//! atelier_data 0.0.15
//!
//! Data Artifacts and I/O for the atelier-rs engine.
//!
//! Lean ingestion-only data worker.
//!
//! `DataWorker` is the core runtime for the `data_worker` binary.
//! It establishes a persistent WebSocket connection to a single exchange
//! + symbol pair, decodes raw frames through the existing exchange
//! decoders, and publishes them — **without any pre-processing** — to
//! configured output sinks.
//!
//! # Design
//!
//! | Concern | Approach |
//! |---------|----------|
//! | Output | Pluggable `OutputSink`s (channel, terminal, parquet) |
//! | Processing | **None** — raw decoded events |
//! | Reconnection | Delegated to `IngestionCore` |
//! | Gap tracking | Delegated to `IngestionCore` |
//! | Stale detection | Delegated to `IngestionCore` |
//!
//! # Event classification
//!
//! Each decoded `ExchangeEvent` variant is mapped to a canonical topic
//! name.  The mapping is exchange-specific:
//!
//! | Exchange | Event | Topic |
//! |----------|-------|-------|
//! | Bybit | `OrderbookData` | `orderbook.{depth}.{symbol}` |
//! | Bybit | `TradeData` | `trade.all.{symbol}` |
//! | Bybit | `LiquidationData` | `liquidation.all.{symbol}` |
//! | Bybit | `TickerData` (funding) | `funding.all.{symbol}` |
//! | Bybit | `TickerData` (OI) | `open_interest.all.{symbol}` |
//! | Coinbase | `OrderbookData` | `orderbook.{depth}.{symbol}` |
//! | Coinbase | `TradeData` | `trade.all.{symbol}` |
//! | Kraken | `OrderbookData` | `orderbook.{depth}.{symbol}` |
//! | Kraken | `TradeData` | `trade.all.{symbol}` |

use tokio::sync::mpsc;
use tokio::sync::watch;

use crate::config::MarketSnapshotConfig;
use crate::config::workers::{CommonWorkerFields, DataWorkerConfig, OutputSinkConfig};

use super::ingestion_core::{IngestionCore, IngestionReport};
use super::output::{build_sinks, OutputSinkSet};
use super::topic_publisher::TopicRegistry;
use super::gap_detector::GapStats;

// ─────────────────────────────────────────────────────────────────────────────
// DataWorkerReport
// ─────────────────────────────────────────────────────────────────────────────

/// Summary statistics returned when a [`DataWorker`] finishes.
///
/// A field-for-field mirror of [`IngestionReport`] (converted via `From`),
/// so the worker's public API does not expose the internal core type.
#[derive(Debug, Clone)]
pub struct DataWorkerReport {
    /// Exchange name (e.g. `"bybit"`).
    pub exchange: String,
    /// Trading pair (e.g. `"BTCUSDT"`).
    pub symbol: String,
    /// Total raw events published across all topics.
    pub total_events: u64,
    /// Per-topic event counts, as `(topic, count)` pairs.
    pub events_by_topic: Vec<(String, u64)>,
    /// Wall-clock seconds the worker was active.
    pub elapsed_secs: f64,
    /// `total_events / elapsed_secs`.
    pub effective_rate: f64,
    /// Number of reconnection attempts.
    pub reconnect_count: u32,
    /// Per-topic gap statistics.
    pub gap_stats: Vec<GapStats>,
    /// Non-fatal errors logged during the run.
    pub errors: Vec<String>,
}

impl From<IngestionReport> for DataWorkerReport {
    fn from(r: IngestionReport) -> Self {
        Self {
            exchange: r.exchange,
            symbol: r.symbol,
            total_events: r.total_events,
            events_by_topic: r.events_by_topic,
            elapsed_secs: r.elapsed_secs,
            effective_rate: r.effective_rate,
            reconnect_count: r.reconnect_count,
            gap_stats: r.gap_stats,
            errors: r.errors,
        }
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// DataWorker
// ─────────────────────────────────────────────────────────────────────────────

/// A single-symbol raw data ingestion worker.
///
/// Connects to the configured exchange, decodes WebSocket events, and
/// publishes them without pre-processing to the configured output sinks.
///
/// Composed from an [`IngestionCore`] (handles connection lifecycle) and
/// an [`OutputSinkSet`] (handles event delivery).
pub struct DataWorker {
    /// The ingestion engine.
    core: IngestionCore,
    /// Fan-out output sinks.
    ///
    /// Empty when constructed through the legacy `new()` path, where
    /// delivery goes through an external [`TopicRegistry`] instead.
    sinks: OutputSinkSet,
    /// Channel capacity for the core → worker pipe.
    ///
    /// Also used as the capacity when building a [`TopicRegistry`].
    channel_capacity: usize,
}

impl DataWorker {
    /// Create a DataWorker from a [`DataWorkerConfig`].
    ///
    /// The registry is built internally and wrapped in a `ChannelSink`
    /// if the config requests channel output.
    pub fn from_config(config: DataWorkerConfig) -> anyhow::Result<Self> {
        let channel_capacity = config.common.channel_capacity();
        let core = IngestionCore::new(config.common.clone())?;

        // Build registry for channel sink (if requested).
        let has_channel = config
            .output
            .iter()
            .any(|s| matches!(s, OutputSinkConfig::Channel));

        let registry = if has_channel {
            Some(TopicRegistry::from_config(
                &config.common.exchange,
                &config.common.symbol,
                &config.common.datatypes,
                channel_capacity,
            ))
        } else {
            None
        };

        let sinks = build_sinks(&config.output, registry);

        Ok(Self {
            core,
            sinks,
            channel_capacity,
        })
    }

    /// Create a DataWorker from a legacy [`MarketSnapshotConfig`].
    ///
    /// Uses channel-only output for backward compatibility.
    /// The registry is returned separately via `build_registry()`.
    pub fn new(config: MarketSnapshotConfig) -> Self {
        let common = CommonWorkerFields::from(&config);
        let channel_capacity = common.channel_capacity();
        let core = IngestionCore::new(common)
            .expect("legacy MarketSnapshotConfig should always produce a valid IngestionCore");

        // Don't build sinks here — legacy callers use build_registry() + run(rx, registry).
        Self {
            core,
            sinks: OutputSinkSet::new(),
            channel_capacity,
        }
    }

    /// Build a topic registry for downstream subscription.
    ///
    /// This is the legacy API — callers build the registry, subscribe
    /// to topics, then pass it to `run_legacy()`.
    pub fn build_registry(&self) -> TopicRegistry {
        TopicRegistry::from_config(
            self.core.exchange_name(),
            self.core.symbol(),
            self.core.datatypes(),
            self.channel_capacity,
        )
    }

    /// Run the ingestion loop (new API with built-in sinks).
    ///
    /// Returns a [`DataWorkerReport`] with session statistics.
    pub async fn run(
        self,
        shutdown: watch::Receiver<bool>,
    ) -> anyhow::Result<DataWorkerReport> {
        let channel_capacity = self.channel_capacity;

        // Pipeline: passthrough for most exchanges, BookInitializer for Binance.
        let exchange_enum: crate::exchanges::Exchange = self.core.exchange_name()
            .parse()
            .unwrap_or(crate::exchanges::Exchange::Bybit);
        let datatypes = self.core.datatypes().clone();
        let symbol = self.core.symbol().to_string();

        let (core_tx, core_rx) = mpsc::channel(channel_capacity);
        let sinks = self.sinks;
        let core_handle = tokio::spawn(self.core.run(shutdown.clone(), core_tx));

        let pipeline = crate::workers::pipeline::build_pipeline(exchange_enum, &symbol, &datatypes);
        let (pipeline_tx, mut rx) = mpsc::channel(channel_capacity);
        let _pipeline_handle = tokio::spawn(
            pipeline.run(core_rx, pipeline_tx, shutdown),
        );

        // Fan out events to sinks.
        while let Some(msg) = rx.recv().await {
            // Best-effort: log errors but don't abort.
            let _ = sinks.emit_raw(&msg.topic, &msg.payload, msg.received_at_ns);
        }

        sinks.flush()?;

        let report = core_handle.await??;
        Ok(DataWorkerReport::from(report))
    }

    /// Run the ingestion loop (legacy API with external registry).
    ///
    /// This preserves the original `DataWorker::run(shutdown, registry)`
    /// signature for backward compatibility with existing binaries.
    pub async fn run_legacy(
        self,
        shutdown: watch::Receiver<bool>,
        registry: TopicRegistry,
    ) -> anyhow::Result<DataWorkerReport> {
        let channel_capacity = self.channel_capacity;
        let (tx, mut rx) = mpsc::channel(channel_capacity);

        let core_handle = tokio::spawn(self.core.run(shutdown, tx));

        // Publish to the legacy registry directly.
        while let Some(msg) = rx.recv().await {
            if let Err(e) = registry.publish(&msg.topic, msg.clone()) {
                tracing::warn!(
                    topic = msg.topic.as_str(),
                    error = %e,
                    "data_worker.publish_failed"
                );
            }
        }

        let report = core_handle.await??;
        Ok(DataWorkerReport::from(report))
    }
}