use tokio::sync::mpsc;
use tokio::sync::watch;
use crate::config::MarketSnapshotConfig;
use crate::config::workers::{CommonWorkerFields, DataWorkerConfig, OutputSinkConfig};
use super::ingestion_core::{IngestionCore, IngestionReport};
use super::output::{build_sinks, OutputSinkSet};
use super::topic_publisher::TopicRegistry;
use super::gap_detector::GapStats;
#[derive(Debug, Clone)]
pub struct DataWorkerReport {
pub exchange: String,
pub symbol: String,
pub total_events: u64,
pub events_by_topic: Vec<(String, u64)>,
pub elapsed_secs: f64,
pub effective_rate: f64,
pub reconnect_count: u32,
pub gap_stats: Vec<GapStats>,
pub errors: Vec<String>,
}
impl From<IngestionReport> for DataWorkerReport {
fn from(r: IngestionReport) -> Self {
Self {
exchange: r.exchange,
symbol: r.symbol,
total_events: r.total_events,
events_by_topic: r.events_by_topic,
elapsed_secs: r.elapsed_secs,
effective_rate: r.effective_rate,
reconnect_count: r.reconnect_count,
gap_stats: r.gap_stats,
errors: r.errors,
}
}
}
pub struct DataWorker {
core: IngestionCore,
sinks: OutputSinkSet,
channel_capacity: usize,
}
impl DataWorker {
pub fn from_config(config: DataWorkerConfig) -> anyhow::Result<Self> {
let channel_capacity = config.common.channel_capacity();
let core = IngestionCore::new(config.common.clone())?;
let has_channel = config
.output
.iter()
.any(|s| matches!(s, OutputSinkConfig::Channel));
let registry = if has_channel {
Some(TopicRegistry::from_config(
&config.common.exchange,
&config.common.symbol,
&config.common.datatypes,
channel_capacity,
))
} else {
None
};
let sinks = build_sinks(&config.output, registry);
Ok(Self {
core,
sinks,
channel_capacity,
})
}
pub fn new(config: MarketSnapshotConfig) -> Self {
let common = CommonWorkerFields::from(&config);
let channel_capacity = common.channel_capacity();
let core = IngestionCore::new(common)
.expect("legacy MarketSnapshotConfig should always produce a valid IngestionCore");
Self {
core,
sinks: OutputSinkSet::new(),
channel_capacity,
}
}
pub fn build_registry(&self) -> TopicRegistry {
TopicRegistry::from_config(
self.core.exchange_name(),
self.core.symbol(),
self.core.datatypes(),
self.channel_capacity,
)
}
pub async fn run(
self,
shutdown: watch::Receiver<bool>,
) -> anyhow::Result<DataWorkerReport> {
let channel_capacity = self.channel_capacity;
let exchange_enum: crate::exchanges::Exchange = self.core.exchange_name()
.parse()
.unwrap_or(crate::exchanges::Exchange::Bybit);
let datatypes = self.core.datatypes().clone();
let symbol = self.core.symbol().to_string();
let (core_tx, core_rx) = mpsc::channel(channel_capacity);
let sinks = self.sinks;
let core_handle = tokio::spawn(self.core.run(shutdown.clone(), core_tx));
let pipeline = crate::workers::pipeline::build_pipeline(exchange_enum, &symbol, &datatypes);
let (pipeline_tx, mut rx) = mpsc::channel(channel_capacity);
let _pipeline_handle = tokio::spawn(
pipeline.run(core_rx, pipeline_tx, shutdown),
);
while let Some(msg) = rx.recv().await {
let _ = sinks.emit_raw(&msg.topic, &msg.payload, msg.received_at_ns);
}
sinks.flush()?;
let report = core_handle.await??;
Ok(DataWorkerReport::from(report))
}
pub async fn run_legacy(
self,
shutdown: watch::Receiver<bool>,
registry: TopicRegistry,
) -> anyhow::Result<DataWorkerReport> {
let channel_capacity = self.channel_capacity;
let (tx, mut rx) = mpsc::channel(channel_capacity);
let core_handle = tokio::spawn(self.core.run(shutdown, tx));
while let Some(msg) = rx.recv().await {
if let Err(e) = registry.publish(&msg.topic, msg.clone()) {
tracing::warn!(
topic = msg.topic.as_str(),
error = %e,
"data_worker.publish_failed"
);
}
}
let report = core_handle.await??;
Ok(DataWorkerReport::from(report))
}
}