use tokio::sync::mpsc;
use tokio::sync::watch;
use crate::config::markets::market_config::SyncMode;
use crate::config::workers::{MarketWorkerConfig, OutputSinkConfig};
use crate::levels::Level;
use crate::orders::OrderSide;
use crate::synchronizers::{ClockMode, MarketSynchronizer};
use crate::sources::ExchangeEvent;
use super::ingestion_core::{IngestionCore, IngestionReport};
use super::output::{build_sinks, OutputSinkSet};
use super::topic_publisher::{TopicMessage, TopicRegistry};
/// Summary returned by `MarketWorker::run` once the worker has stopped.
#[derive(Debug, Clone)]
pub struct MarketWorkerReport {
/// Report produced by the ingestion core task.
pub ingestion: IngestionReport,
/// Number of snapshots drained from the synchronizer and handed to the sinks.
pub snapshots_produced: u64,
/// Number of sink flushes performed, including the final flush at the end of `run`.
pub flushes: u32,
}
/// Per-market worker combining an ingestion core, a synchronizer and a set of output sinks.
pub struct MarketWorker {
/// Ingestion core; spawned as its own task by `run`.
core: IngestionCore,
/// Synchronizer that is fed exchange events and from which ready snapshots are drained.
sync: MarketSynchronizer,
/// Output sinks that receive every drained snapshot.
sinks: OutputSinkSet,
/// Number of snapshots since the last flush that triggers a sink flush; `0` disables
/// threshold-based flushing.
flush_threshold: usize,
/// Market symbol, copied from the common config.
symbol: String,
/// Exchange name as configured; parsed into an `Exchange` when the worker runs.
exchange_name: String,
/// Capacity of the bounded channels created in `run`.
channel_capacity: usize,
/// Clock mode derived from the configured sync mode.
clock_mode: ClockMode,
/// Synchronization period in nanoseconds; also sets the timer interval in external-clock mode.
period_ns: u64,
}
impl MarketWorker {
    /// Builds a worker from its configuration.
    ///
    /// Maps the configured `SyncMode` onto the synchronizer's `ClockMode`, constructs the
    /// ingestion core and builds the output sinks. A `TopicRegistry` is created only when at
    /// least one configured sink is `OutputSinkConfig::Channel`.
    ///
    /// # Errors
    ///
    /// Returns the error from `IngestionCore::new` if the core cannot be built.
    pub fn from_config(config: MarketWorkerConfig) -> anyhow::Result<Self> {
        let period_ns = config.sync.period_ns();
        let clock_mode = match config.sync.sync_mode {
            SyncMode::OnOrderbook => ClockMode::OrderbookDriven,
            SyncMode::OnTrade => ClockMode::TradeDriven,
            SyncMode::OnLiquidation => ClockMode::LiquidationDriven,
            SyncMode::OnTime => ClockMode::ExternalClock,
        };
        let sync = MarketSynchronizer::with_clock_mode(period_ns, clock_mode);
        let flush_threshold = config.sync.flush_threshold;
        let channel_capacity = config.common.channel_capacity();
        let symbol = config.common.symbol.clone();
        let exchange_name = config.common.exchange.clone();
        let core = IngestionCore::new(config.common.clone())?;

        // The topic registry is only needed when a channel sink is configured.
        let has_channel = config
            .output
            .iter()
            .any(|sink| matches!(sink, OutputSinkConfig::Channel));
        let registry = has_channel.then(|| {
            TopicRegistry::from_config(
                &config.common.exchange,
                &config.common.symbol,
                &config.common.datatypes,
                channel_capacity,
            )
        });
        let sinks = build_sinks(&config.output, registry);

        Ok(Self {
            core,
            sync,
            sinks,
            flush_threshold,
            symbol,
            exchange_name,
            channel_capacity,
            clock_mode,
            period_ns,
        })
    }

    /// Runs the worker until its event stream ends, then returns a report.
    ///
    /// Spawns the ingestion core and the exchange pipeline as separate tasks, feeds every
    /// pipeline message into the synchronizer and forwards ready snapshots to the sinks.
    /// In `ClockMode::ExternalClock` snapshots are produced on a wall-clock timer; in every
    /// other mode they are drained after each incoming event. The `shutdown` receiver is
    /// handed to the spawned tasks; this loop itself ends when the pipeline channel closes.
    ///
    /// # Errors
    ///
    /// Returns an error if a sink flush fails, if the ingestion core task cannot be joined,
    /// or if the ingestion core itself returns an error.
    pub async fn run(
        self,
        shutdown: watch::Receiver<bool>,
    ) -> anyhow::Result<MarketWorkerReport> {
        /// Running totals kept while the worker loop is active.
        #[derive(Default)]
        struct Counters {
            produced: u64,
            since_flush: u64,
            flushes: u32,
        }

        /// Drains every ready snapshot into the sinks and returns how many were drained.
        fn drain_and_emit(sync: &mut MarketSynchronizer, sinks: &OutputSinkSet) -> u64 {
            let ready = sync.drain();
            for snapshot in &ready {
                // The emit result is intentionally discarded, as before.
                let _ = sinks.emit_snapshot(snapshot);
            }
            ready.len() as u64
        }

        /// Emits ready snapshots, updates the counters and flushes the sinks once the
        /// threshold is reached. A threshold of `0` disables threshold-based flushing.
        fn emit_ready(
            sync: &mut MarketSynchronizer,
            sinks: &OutputSinkSet,
            flush_threshold: usize,
            counters: &mut Counters,
        ) -> anyhow::Result<()> {
            let drained = drain_and_emit(sync, sinks);
            counters.produced += drained;
            counters.since_flush += drained;
            if flush_threshold > 0 && counters.since_flush >= flush_threshold as u64 {
                sinks.flush()?;
                counters.since_flush = 0;
                counters.flushes += 1;
            }
            Ok(())
        }

        let MarketWorker {
            core,
            mut sync,
            sinks,
            flush_threshold,
            channel_capacity,
            exchange_name,
            symbol,
            clock_mode,
            period_ns,
        } = self;

        // An unknown exchange name still falls back to Bybit, but no longer silently.
        let exchange_enum = exchange_name
            .parse::<crate::exchanges::Exchange>()
            .unwrap_or_else(|_| {
                tracing::warn!(
                    exchange = exchange_name.as_str(),
                    "market_worker.unknown_exchange_fallback"
                );
                crate::exchanges::Exchange::Bybit
            });
        let pipeline = crate::workers::pipeline::build_pipeline(
            exchange_enum,
            &symbol,
            core.datatypes(),
        );

        // core -> pipeline -> this loop, each hop over a bounded channel.
        let (core_tx, core_rx) = mpsc::channel(channel_capacity);
        let core_handle = tokio::spawn(core.run(shutdown.clone(), core_tx));
        let (pipeline_tx, mut rx) = mpsc::channel(channel_capacity);
        let pipeline_handle = tokio::spawn(pipeline.run(core_rx, pipeline_tx, shutdown));

        let mut counters = Counters::default();

        if clock_mode == ClockMode::ExternalClock {
            // `interval` rejects a zero period, so clamp to at least one millisecond.
            let tick_ms = (period_ns / 1_000_000).max(1);
            let mut timer = tokio::time::interval(
                std::time::Duration::from_millis(tick_ms),
            );
            tracing::info!(
                tick_ms = tick_ms,
                "market_worker.external_clock_started"
            );
            loop {
                tokio::select! {
                    _ = timer.tick() => {
                        // A system clock set before the Unix epoch skips the tick
                        // instead of panicking the worker.
                        let now_ns = match std::time::SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                        {
                            Ok(since_epoch) => since_epoch.as_nanos() as u64,
                            Err(err) => {
                                tracing::warn!(
                                    error = %err,
                                    "market_worker.clock_before_epoch"
                                );
                                continue;
                            }
                        };
                        sync.on_time(now_ns);
                        emit_ready(&mut sync, &sinks, flush_threshold, &mut counters)?;
                    }
                    msg = rx.recv() => {
                        match msg {
                            Some(msg) => feed_event(&mut sync, &exchange_name, &symbol, &msg),
                            // All senders are gone: the pipeline has finished.
                            None => break,
                        }
                    }
                }
            }
        } else {
            while let Some(msg) = rx.recv().await {
                feed_event(&mut sync, &exchange_name, &symbol, &msg);
                emit_ready(&mut sync, &sinks, flush_threshold, &mut counters)?;
            }
        }

        // Emit whatever the synchronizer still holds, then flush unconditionally.
        sync.finalize();
        counters.produced += drain_and_emit(&mut sync, &sinks);
        sinks.flush()?;
        counters.flushes += 1;

        // The pipeline channel is closed at this point, so the task is finishing or done.
        // Joining it surfaces a panic or cancellation that would otherwise go unnoticed.
        if let Err(join_err) = pipeline_handle.await {
            tracing::warn!(
                error = %join_err,
                "market_worker.pipeline_task_failed"
            );
        }
        let ingestion = core_handle.await??;

        let snapshots_produced = counters.produced;
        let flushes = counters.flushes;
        tracing::info!(
            exchange = exchange_name.as_str(),
            symbol = symbol.as_str(),
            snapshots_produced = snapshots_produced,
            flushes = flushes,
            "market_worker.stopped"
        );
        Ok(MarketWorkerReport {
            ingestion,
            snapshots_produced,
            flushes,
        })
    }
}
/// Routes a pipeline message to the exchange-specific feeder matching its payload.
///
/// `_exchange_name` is accepted but not used; dispatch is driven by the payload variant.
/// `symbol` is forwarded only to the Bybit and Binance feeders.
fn feed_event(
    sync: &mut MarketSynchronizer,
    _exchange_name: &str,
    symbol: &str,
    msg: &TopicMessage,
) {
    let payload = &msg.payload;
    match payload {
        ExchangeEvent::Binance(ev) => feed_binance(sync, symbol, ev),
        ExchangeEvent::Bybit(ev) => feed_bybit(sync, symbol, ev),
        ExchangeEvent::Coinbase(ev) => feed_coinbase(sync, ev),
        ExchangeEvent::Kraken(ev) => feed_kraken(sync, ev),
    }
}
fn feed_bybit(
sync: &mut MarketSynchronizer,
symbol: &str,
event: &crate::sources::bybit::events::BybitWssEvent,
) {
use crate::sources::bybit::events::BybitWssEvent;
match event {
BybitWssEvent::TradeData(data) => {
let trade = crate::trades::Trade {
trade_ts: data.trade_ts,
symbol: data.symbol.clone(),
side: data.side.clone(),
amount: data.amount.parse::<f64>().unwrap_or(0.0),
price: data.price.parse::<f64>().unwrap_or(0.0),
exchange: "bybit".to_string(),
id: data.trade_id.clone(),
};
sync.on_trade(trade);
}
BybitWssEvent::OrderbookData(resp) => {
let ob = crate::orderbooks::Orderbook {
orderbook_id: 0,
orderbook_ts: resp.orderbook_ts,
symbol: symbol.to_string(),
exchange: "bybit".to_string(),
bids: resp
.data
.bids
.iter()
.enumerate()
.map(|(i, level)| Level::new(
i as u32, OrderSide::Bids, level.price(), level.size(), vec![],
))
.collect(),
asks: resp
.data
.asks
.iter()
.enumerate()
.map(|(i, level)| Level::new(
i as u32, OrderSide::Asks, level.price(), level.size(), vec![],
))
.collect(),
};
sync.on_orderbook(symbol, resp.orderbook_ts, ob);
}
BybitWssEvent::LiquidationData(data) => {
let liq = crate::liquidations::Liquidation {
liquidation_ts: data.liquidation_ts,
symbol: data.symbol.clone(),
side: data.side.clone(),
price: data.price.parse::<f64>().unwrap_or(0.0),
amount: data.amount.parse::<f64>().unwrap_or(0.0),
exchange: "bybit".to_string(),
};
sync.on_liquidation(liq);
}
BybitWssEvent::TickerData(data) => {
if let Some(ref fr_str) = data.funding_rate {
if let Ok(fr_val) = fr_str.parse::<f64>() {
let next_ts: u64 = data
.next_funding_time
.as_deref()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let fr = crate::funding::FundingRate {
funding_rate_ts: data.ts.unwrap_or(0),
symbol: data.symbol.clone(),
funding_rate: fr_val,
next_funding_ts: next_ts,
exchange: "bybit".to_string(),
};
sync.on_funding(fr);
}
}
if let Some(ref oi_str) = data.open_interest {
if let Ok(oi_val) = oi_str.parse::<f64>() {
let oi_value: f64 = data
.open_interest_value
.as_deref()
.and_then(|s| s.parse::<f64>().ok())
.unwrap_or(0.0);
let oi = crate::open_interest::OpenInterest {
open_interest_ts: data.ts.unwrap_or(0),
symbol: data.symbol.clone(),
open_interest: oi_val,
open_interest_value: oi_value,
exchange: "bybit".to_string(),
};
sync.on_open_interest(oi);
}
}
}
}
}
fn feed_coinbase(
sync: &mut MarketSynchronizer,
event: &crate::sources::coinbase::events::CoinbaseWssEvent,
) {
use crate::sources::coinbase::events::CoinbaseWssEvent;
match event {
CoinbaseWssEvent::TradeData(data) => {
let trade = crate::trades::Trade {
trade_ts: data.timestamp_ms(),
symbol: data.product_id.clone(),
side: data.side.clone(),
amount: data.size.parse::<f64>().unwrap_or(0.0),
price: data.price.parse::<f64>().unwrap_or(0.0),
exchange: "coinbase".to_string(),
id: data.trade_id.clone(),
};
sync.on_trade(trade);
}
CoinbaseWssEvent::OrderbookData(resp) => {
let Some(event) = resp.events.first() else {
return;
};
let ts_ms: u64 = chrono::DateTime::parse_from_rfc3339(&resp.timestamp)
.map(|dt| dt.timestamp_millis() as u64)
.unwrap_or(0);
let mut bids = Vec::new();
let mut asks = Vec::new();
for (i, update) in event.updates.iter().enumerate() {
let price = update.price_level.parse::<f64>().unwrap_or(0.0);
let volume = update.new_quantity.parse::<f64>().unwrap_or(0.0);
match update.side.as_str() {
"bid" => bids.push(Level::new(
i as u32, OrderSide::Bids, price, volume, vec![],
)),
"offer" | "ask" => asks.push(Level::new(
i as u32, OrderSide::Asks, price, volume, vec![],
)),
_ => {}
}
}
let ob = crate::orderbooks::Orderbook {
orderbook_id: 0,
orderbook_ts: ts_ms,
symbol: event.product_id.clone(),
exchange: "coinbase".to_string(),
bids,
asks,
};
sync.on_orderbook(&event.product_id, ts_ms, ob);
}
}
}
/// Translates one Kraken websocket event into the matching synchronizer call.
///
/// Trades are forwarded with their numeric quantity and price as-is. For order-book
/// messages only the first entry of `resp.data` is used, and a message with no entries is
/// ignored. A book timestamp that is not valid RFC 3339 yields a timestamp of `0`. Each
/// level's id is its position in the incoming list.
fn feed_kraken(
    sync: &mut MarketSynchronizer,
    event: &crate::sources::kraken::events::KrakenWssEvent,
) {
    use crate::sources::kraken::events::KrakenWssEvent;
    match event {
        KrakenWssEvent::TradeData(data) => {
            sync.on_trade(crate::trades::Trade {
                trade_ts: data.timestamp_ms(),
                symbol: data.symbol.clone(),
                side: data.side.clone(),
                amount: data.qty,
                price: data.price,
                exchange: "kraken".to_string(),
                id: data.trade_id.to_string(),
            });
        }
        KrakenWssEvent::OrderbookData(resp) => {
            if let Some(book) = resp.data.first() {
                let ts_ms: u64 = match chrono::DateTime::parse_from_rfc3339(&book.timestamp) {
                    Ok(parsed) => parsed.timestamp_millis() as u64,
                    Err(_) => 0,
                };
                let bid_levels = book
                    .bids
                    .iter()
                    .enumerate()
                    .map(|(pos, lvl)| {
                        Level::new(pos as u32, OrderSide::Bids, lvl.price, lvl.qty, vec![])
                    })
                    .collect();
                let ask_levels = book
                    .asks
                    .iter()
                    .enumerate()
                    .map(|(pos, lvl)| {
                        Level::new(pos as u32, OrderSide::Asks, lvl.price, lvl.qty, vec![])
                    })
                    .collect();
                let snapshot = crate::orderbooks::Orderbook {
                    orderbook_id: 0,
                    orderbook_ts: ts_ms,
                    symbol: book.symbol.clone(),
                    exchange: "kraken".to_string(),
                    bids: bid_levels,
                    asks: ask_levels,
                };
                sync.on_orderbook(&book.symbol, ts_ms, snapshot);
            }
        }
    }
}
fn feed_binance(
sync: &mut MarketSynchronizer,
symbol: &str,
event: &crate::sources::binance::events::BinanceWssEvent,
) {
use crate::sources::binance::events::BinanceWssEvent;
match event {
BinanceWssEvent::TradeData(data) => {
let trade = crate::trades::Trade {
trade_ts: data.trade_time,
symbol: data.symbol.clone(),
side: data.taker_side().to_string(),
amount: data.quantity.parse::<f64>().unwrap_or(0.0),
price: data.price.parse::<f64>().unwrap_or(0.0),
exchange: "binance".to_string(),
id: data.trade_id.to_string(),
};
sync.on_trade(trade);
}
BinanceWssEvent::DepthUpdate(upd) => {
let ob = crate::orderbooks::Orderbook {
orderbook_id: 0,
orderbook_ts: upd.event_time,
symbol: symbol.to_string(),
exchange: "binance".to_string(),
bids: upd
.bids
.iter()
.enumerate()
.map(|(i, level)| Level::new(
i as u32,
OrderSide::Bids,
level[0].parse::<f64>().unwrap_or(0.0),
level[1].parse::<f64>().unwrap_or(0.0),
vec![],
))
.collect(),
asks: upd
.asks
.iter()
.enumerate()
.map(|(i, level)| Level::new(
i as u32,
OrderSide::Asks,
level[0].parse::<f64>().unwrap_or(0.0),
level[1].parse::<f64>().unwrap_or(0.0),
vec![],
))
.collect(),
};
sync.on_orderbook(symbol, upd.event_time, ob);
}
BinanceWssEvent::DepthSnapshot(snap) => {
let ob = crate::orderbooks::Orderbook {
orderbook_id: 0,
orderbook_ts: 0, symbol: symbol.to_string(),
exchange: "binance".to_string(),
bids: snap
.bids
.iter()
.enumerate()
.map(|(i, level)| Level::new(
i as u32,
OrderSide::Bids,
level[0].parse::<f64>().unwrap_or(0.0),
level[1].parse::<f64>().unwrap_or(0.0),
vec![],
))
.collect(),
asks: snap
.asks
.iter()
.enumerate()
.map(|(i, level)| Level::new(
i as u32,
OrderSide::Asks,
level[0].parse::<f64>().unwrap_or(0.0),
level[1].parse::<f64>().unwrap_or(0.0),
vec![],
))
.collect(),
};
sync.on_orderbook(symbol, 0, ob);
}
}
}