use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use crate::clients::connection_manager::{ConnectionManager, ConnectionManagerConfig};
use crate::clients::connection_state::ConnectionState;
use crate::clients::disconnect::DisconnectReason;
use crate::clients::reconnect::{HealthMonitor, ReconnectAction};
use crate::config::markets::market_config::DataTypesSection;
use crate::config::workers::CommonWorkerFields;
use crate::exchanges::Exchange;
use crate::sources::ExchangeEvent;
use crate::sources::binance::client::BinanceWssClient;
use crate::sources::binance::events::BinanceWssEvent;
use crate::sources::bybit::client::BybitWssClient;
use crate::sources::bybit::events::BybitWssEvent;
use crate::sources::coinbase::client::CoinbaseWssClient;
use crate::sources::coinbase::events::CoinbaseWssEvent;
use crate::sources::kraken::client::KrakenWssClient;
use crate::sources::kraken::events::KrakenWssEvent;
use super::gap_detector::{GapDetectorSet, GapStats};
use super::topic_publisher::TopicMessage;
#[derive(Debug, Clone)]
pub struct IngestionReport {
pub exchange: String,
pub symbol: String,
pub total_events: u64,
pub events_by_topic: Vec<(String, u64)>,
pub elapsed_secs: f64,
pub effective_rate: f64,
pub reconnect_count: u32,
pub gap_stats: Vec<GapStats>,
pub errors: Vec<String>,
}
pub struct IngestionCore {
exchange: Exchange,
common: CommonWorkerFields,
label: String,
gap_threshold: Duration,
reconnect_config: ConnectionManagerConfig,
staleness_timeout: Duration,
topics: Vec<String>,
}
impl IngestionCore {
pub fn new(common: CommonWorkerFields) -> anyhow::Result<Self> {
let exchange: Exchange = common
.exchange
.parse()
.map_err(|e: String| anyhow::anyhow!(e))?;
let label = format!("{}:{}", common.exchange, common.symbol);
let gap_threshold = common.gap_threshold();
let reconnect_config = common.reconnect_config();
let staleness_timeout = common.staleness_timeout();
let topics = wss_topic_names(&common.exchange, &common.symbol, &common.datatypes);
Ok(Self {
exchange,
common,
label,
gap_threshold,
reconnect_config,
staleness_timeout,
topics,
})
}
pub fn topics(&self) -> &[String] {
&self.topics
}
pub fn label(&self) -> &str {
&self.label
}
pub fn exchange_name(&self) -> &str {
&self.common.exchange
}
pub fn symbol(&self) -> &str {
&self.common.symbol
}
pub fn datatypes(&self) -> &DataTypesSection {
&self.common.datatypes
}
pub async fn run(
self,
mut shutdown: watch::Receiver<bool>,
event_tx: mpsc::Sender<TopicMessage>,
) -> anyhow::Result<IngestionReport> {
let exchange = self.exchange;
let exchange_name = self.common.exchange.clone();
let symbol = self.common.symbol.clone();
let ob_depth = self.common.datatypes.orderbook.depth;
let datatypes = self.common.datatypes.clone();
let start = Instant::now();
let topic_refs: Vec<&str> = self.topics.iter().map(|s| s.as_str()).collect();
let mut gaps = GapDetectorSet::new(&topic_refs, self.gap_threshold);
let mut manager =
ConnectionManager::new(self.label.clone(), self.reconnect_config.clone());
let mut total_events: u64 = 0;
let mut reconnect_count: u32 = 0;
let mut events_by_topic: std::collections::HashMap<String, u64> =
std::collections::HashMap::new();
let mut errors: Vec<String> = Vec::new();
tracing::info!(
label = self.label.as_str(),
exchange = %exchange,
topics = ?topic_refs,
gap_threshold_ms = self.gap_threshold.as_millis() as u64,
staleness_timeout_ms = self.staleness_timeout.as_millis() as u64,
"ingestion_core.started"
);
'reconnect: loop {
if *shutdown.borrow() {
break;
}
manager.transition(ConnectionState::Connecting, "initiating connection");
let (tx, mut rx) = mpsc::channel::<ExchangeEvent>(2048);
let mut wss_handle = Some(spawn_exchange_task(
exchange,
&exchange_name,
&symbol,
&datatypes,
tx,
));
manager.transition(
ConnectionState::Subscribing,
"exchange client spawned, awaiting first event",
);
let mut health = HealthMonitor::new(self.staleness_timeout);
let mut first_event = true;
let mut disconnect_reason = DisconnectReason::CleanClose;
loop {
tokio::select! {
result = shutdown.changed() => {
if result.is_err() || *shutdown.borrow() {
if let Some(h) = wss_handle.take() {
h.abort();
}
break 'reconnect;
}
}
_ = tokio::time::sleep_until(health.deadline()) => {
disconnect_reason = DisconnectReason::StaleConnection {
silence_duration: self.staleness_timeout,
};
tracing::warn!(
label = self.label.as_str(),
timeout_ms = self.staleness_timeout.as_millis() as u64,
"ingestion_core.stale_connection_detected"
);
break;
}
event = rx.recv() => {
match event {
Some(event) => {
health.record_activity();
if first_event {
manager.transition(
ConnectionState::Streaming,
"first event received",
);
manager.on_connected();
first_event = false;
}
let now_ns = wall_clock_ns();
let topic_names = classify_event(
&event, &exchange_name, &symbol, ob_depth,
&datatypes,
);
for topic in &topic_names {
let msg = TopicMessage {
topic: topic.clone(),
received_at_ns: now_ns,
exchange: exchange_name.clone(),
payload: event.clone(),
};
if event_tx.send(msg).await.is_err() {
tracing::info!(
label = self.label.as_str(),
"ingestion_core.event_tx_closed"
);
if let Some(h) = wss_handle.take() {
h.abort();
}
break 'reconnect;
}
gaps.record_event(topic);
*events_by_topic.entry(topic.clone()).or_insert(0) += 1;
total_events += 1;
}
}
None => {
if let Some(h) = wss_handle.take() {
disconnect_reason = resolve_disconnect_reason(h).await;
}
break;
}
}
}
}
}
if let Some(h) = wss_handle.take() {
h.abort();
}
if *shutdown.borrow() {
break;
}
reconnect_count += 1;
let action = manager.on_disconnect(&disconnect_reason);
match action {
ReconnectAction::RetryAfter(d) => {
let msg = format!(
"reconnecting in {:?} (attempt {}): {}",
d,
manager.consecutive_failures(),
disconnect_reason,
);
tracing::warn!(label = self.label.as_str(), "{}", msg);
errors.push(msg);
tokio::select! {
_ = tokio::time::sleep(d) => {}
result = shutdown.changed() => {
if result.is_err() || *shutdown.borrow() {
break;
}
}
}
}
ReconnectAction::RetryImmediately => {
tracing::info!(
label = self.label.as_str(),
"ingestion_core.retry_immediately"
);
}
ReconnectAction::GiveUp { reason } => {
let msg = format!("giving up: {}", reason);
tracing::error!(label = self.label.as_str(), "{}", msg);
errors.push(msg);
break;
}
ReconnectAction::CircuitOpen { until } => {
let remaining = until.duration_since(Instant::now());
let msg = format!("circuit breaker open, waiting {:?}", remaining);
tracing::error!(label = self.label.as_str(), "{}", msg);
errors.push(msg);
tokio::select! {
_ = tokio::time::sleep(remaining) => {}
result = shutdown.changed() => {
if result.is_err() || *shutdown.borrow() {
break;
}
}
}
}
}
}
let elapsed = start.elapsed().as_secs_f64();
let report = IngestionReport {
exchange: exchange_name.clone(),
symbol: symbol.clone(),
total_events,
events_by_topic: events_by_topic.into_iter().collect(),
elapsed_secs: elapsed,
effective_rate: if elapsed > 0.0 {
total_events as f64 / elapsed
} else {
0.0
},
reconnect_count,
gap_stats: gaps.stats(),
errors,
};
tracing::info!(
label = self.label.as_str(),
total_events = total_events,
elapsed_secs = format!("{:.1}", elapsed),
transitions = manager.transitions().len(),
"ingestion_core.stopped"
);
Ok(report)
}
}
fn spawn_exchange_task(
exchange: Exchange,
exchange_name: &str,
symbol: &str,
datatypes: &DataTypesSection,
tx: mpsc::Sender<ExchangeEvent>,
) -> JoinHandle<DisconnectReason> {
let streams = wss_streams(exchange_name, symbol, datatypes);
let symbol = symbol.to_string();
match exchange {
Exchange::Bybit => {
let client = BybitWssClient::new(streams);
tokio::spawn(async move {
let (inner_tx, mut inner_rx) = mpsc::channel::<BybitWssEvent>(2048);
let recv_handle =
tokio::spawn(async move { client.receive_data(inner_tx).await });
while let Some(event) = inner_rx.recv().await {
if tx.send(ExchangeEvent::Bybit(event)).await.is_err() {
recv_handle.abort();
return DisconnectReason::ReceiverDropped;
}
}
collect_exit_reason(recv_handle, "bybit").await
})
}
Exchange::Coinbase => {
let product_ids = vec![symbol];
let client = CoinbaseWssClient::new(streams, product_ids);
tokio::spawn(async move {
let (inner_tx, mut inner_rx) = mpsc::channel::<CoinbaseWssEvent>(2048);
let recv_handle =
tokio::spawn(async move { client.receive_data(inner_tx).await });
while let Some(event) = inner_rx.recv().await {
if tx.send(ExchangeEvent::Coinbase(event)).await.is_err() {
recv_handle.abort();
return DisconnectReason::ReceiverDropped;
}
}
collect_exit_reason(recv_handle, "coinbase").await
})
}
Exchange::Kraken => {
let symbols = vec![symbol];
let book_depth = datatypes.orderbook.depth;
let client = KrakenWssClient::new(streams, symbols, book_depth);
tokio::spawn(async move {
let (inner_tx, mut inner_rx) = mpsc::channel::<KrakenWssEvent>(2048);
let recv_handle =
tokio::spawn(async move { client.receive_data(inner_tx).await });
while let Some(event) = inner_rx.recv().await {
if tx.send(ExchangeEvent::Kraken(event)).await.is_err() {
recv_handle.abort();
return DisconnectReason::ReceiverDropped;
}
}
collect_exit_reason(recv_handle, "kraken").await
})
}
Exchange::Binance => {
let client = BinanceWssClient::new(streams);
tokio::spawn(async move {
let (inner_tx, mut inner_rx) = mpsc::channel::<BinanceWssEvent>(2048);
let recv_handle =
tokio::spawn(async move { client.receive_data(inner_tx).await });
while let Some(event) = inner_rx.recv().await {
if tx.send(ExchangeEvent::Binance(event)).await.is_err() {
recv_handle.abort();
return DisconnectReason::ReceiverDropped;
}
}
collect_exit_reason(recv_handle, "binance").await
})
}
}
}
async fn collect_exit_reason(
handle: JoinHandle<crate::clients::disconnect::WssExitReason>,
exchange_label: &str,
) -> DisconnectReason {
match handle.await {
Ok(exit_reason) => {
tracing::debug!(
exchange = exchange_label,
exit_reason = %exit_reason,
"exchange_client.exited"
);
DisconnectReason::from(exit_reason)
}
Err(join_err) => {
tracing::error!(
exchange = exchange_label,
error = %join_err,
"exchange_client.task_panicked"
);
DisconnectReason::TransportError {
source: format!("{exchange_label} task panicked: {join_err}").into(),
}
}
}
}
async fn resolve_disconnect_reason(handle: JoinHandle<DisconnectReason>) -> DisconnectReason {
match handle.await {
Ok(reason) => reason,
Err(join_err) => DisconnectReason::TransportError {
source: format!("exchange task join error: {join_err}").into(),
},
}
}
pub fn wss_streams(exchange: &str, symbol: &str, datatypes: &DataTypesSection) -> Vec<String> {
match exchange.to_lowercase().as_str() {
"bybit" => {
let mut streams = Vec::new();
if datatypes.orderbook.enabled {
streams.push(format!("orderbook.{}.{}", datatypes.orderbook.depth, symbol));
}
if datatypes.trades.enabled {
streams.push(format!("publicTrade.{}", symbol));
}
if datatypes.liquidations.enabled {
streams.push(format!("allLiquidation.{}", symbol));
}
if datatypes.funding_rates.enabled || datatypes.open_interest.enabled {
streams.push(format!("tickers.{}", symbol));
}
streams
}
"coinbase" => {
let mut channels = Vec::new();
if datatypes.orderbook.enabled {
channels.push("level2".to_string());
}
if datatypes.trades.enabled {
channels.push("market_trades".to_string());
}
channels
}
"kraken" => {
let mut channels = Vec::new();
if datatypes.orderbook.enabled {
channels.push("book".to_string());
}
if datatypes.trades.enabled {
channels.push("trade".to_string());
}
channels
}
"binance" => {
let sym = symbol.to_lowercase();
let mut streams = Vec::new();
if datatypes.orderbook.enabled {
streams.push(format!("{}@depth@100ms", sym));
}
if datatypes.trades.enabled {
streams.push(format!("{}@trade", sym));
}
streams
}
other => {
tracing::warn!("Unknown exchange '{}'; returning empty streams", other);
vec![]
}
}
}
pub fn wss_topic_names(
exchange: &str,
symbol: &str,
datatypes: &DataTypesSection,
) -> Vec<String> {
let _ = exchange; let mut topics = Vec::new();
if datatypes.orderbook.enabled {
topics.push(format!("orderbook.{}.{}", datatypes.orderbook.depth, symbol));
}
if datatypes.trades.enabled {
topics.push(format!("trade.all.{}", symbol));
}
if datatypes.liquidations.enabled {
topics.push(format!("liquidation.all.{}", symbol));
}
if datatypes.funding_rates.enabled {
topics.push(format!("funding.all.{}", symbol));
}
if datatypes.open_interest.enabled {
topics.push(format!("open_interest.all.{}", symbol));
}
topics
}
pub fn classify_event(
event: &ExchangeEvent,
_exchange: &str,
symbol: &str,
ob_depth: usize,
datatypes: &DataTypesSection,
) -> Vec<String> {
match event {
ExchangeEvent::Bybit(bybit) => classify_bybit(bybit, symbol, ob_depth, datatypes),
ExchangeEvent::Coinbase(coinbase) => {
classify_coinbase(coinbase, symbol, ob_depth, datatypes)
}
ExchangeEvent::Kraken(kraken) => classify_kraken(kraken, symbol, ob_depth, datatypes),
ExchangeEvent::Binance(binance) => {
classify_binance(binance, symbol, ob_depth, datatypes)
}
}
}
fn classify_bybit(
event: &BybitWssEvent,
symbol: &str,
ob_depth: usize,
dt: &DataTypesSection,
) -> Vec<String> {
match event {
BybitWssEvent::OrderbookData(_) if dt.orderbook.enabled => {
vec![format!("orderbook.{}.{}", ob_depth, symbol)]
}
BybitWssEvent::TradeData(_) if dt.trades.enabled => {
vec![format!("trade.all.{}", symbol)]
}
BybitWssEvent::LiquidationData(_) if dt.liquidations.enabled => {
vec![format!("liquidation.all.{}", symbol)]
}
BybitWssEvent::TickerData(data) => {
let mut topics = Vec::new();
if dt.funding_rates.enabled && data.funding_rate.is_some() {
topics.push(format!("funding.all.{}", symbol));
}
if dt.open_interest.enabled && data.open_interest.is_some() {
topics.push(format!("open_interest.all.{}", symbol));
}
topics
}
_ => vec![],
}
}
fn classify_coinbase(
event: &CoinbaseWssEvent,
symbol: &str,
ob_depth: usize,
dt: &DataTypesSection,
) -> Vec<String> {
match event {
CoinbaseWssEvent::OrderbookData(_) if dt.orderbook.enabled => {
vec![format!("orderbook.{}.{}", ob_depth, symbol)]
}
CoinbaseWssEvent::TradeData(_) if dt.trades.enabled => {
vec![format!("trade.all.{}", symbol)]
}
_ => vec![],
}
}
fn classify_kraken(
event: &KrakenWssEvent,
symbol: &str,
ob_depth: usize,
dt: &DataTypesSection,
) -> Vec<String> {
match event {
KrakenWssEvent::OrderbookData(_) if dt.orderbook.enabled => {
vec![format!("orderbook.{}.{}", ob_depth, symbol)]
}
KrakenWssEvent::TradeData(_) if dt.trades.enabled => {
vec![format!("trade.all.{}", symbol)]
}
_ => vec![],
}
}
fn classify_binance(
event: &BinanceWssEvent,
symbol: &str,
ob_depth: usize,
dt: &DataTypesSection,
) -> Vec<String> {
use crate::sources::binance::events::BinanceWssEvent;
match event {
BinanceWssEvent::DepthUpdate(_) | BinanceWssEvent::DepthSnapshot(_)
if dt.orderbook.enabled =>
{
vec![format!("orderbook.{}.{}", ob_depth, symbol)]
}
BinanceWssEvent::TradeData(_) if dt.trades.enabled => {
vec![format!("trade.all.{}", symbol)]
}
_ => vec![],
}
}
pub fn wall_clock_ns() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64
}