// atelier_data 0.0.15
//
// Data Artifacts and I/O for the atelier-rs engine
//! Configuration for [`DataWorker`](crate::workers::data_worker::DataWorker).
//!
//! [`DataWorkerConfig`] is the lean, DataWorker-specific config — no
//! synchronisation fields, no Parquet flush cadence, no grid spacing.
//! Just exchange + symbol + datatypes + output sinks.
//!
//! [`DataWorkerManifest`] is the multi-worker TOML manifest that resolves
//! into a `Vec<DataWorkerConfig>`.

use serde::Deserialize;
use std::path::Path;

use super::common::{CommonWorkerFields, OutputSinkConfig, ReconnectSection};
use crate::config::markets::market_config::{DataTypesSection, MarketSnapshotConfig};

// ─────────────────────────────────────────────────────────────────────────────
// DataWorkerConfig — per-worker resolved config
// ─────────────────────────────────────────────────────────────────────────────

/// Fully-resolved configuration for a single [`DataWorker`](crate::workers::data_worker::DataWorker).
///
/// Pairs the shared worker fields with the list of output sinks for one
/// worker.
///
/// Created by [`DataWorkerManifest::resolve_all()`], by
/// [`DataWorkerConfig::from_legacy()`], or constructed directly for
/// library / test usage.
#[derive(Debug, Clone)]
pub struct DataWorkerConfig {
    /// Shared worker fields (exchange, symbol, datatypes, tuning).
    pub common: CommonWorkerFields,
    /// Output sinks to fan events into.
    pub output: Vec<OutputSinkConfig>,
}

impl DataWorkerConfig {
    /// Build a worker config from a legacy [`MarketSnapshotConfig`].
    ///
    /// The shared fields are converted with `CommonWorkerFields::from`, and
    /// the output list contains exactly one [`OutputSinkConfig::Channel`]
    /// sink.  Kept for backward compatibility.
    pub fn from_legacy(cfg: &MarketSnapshotConfig) -> Self {
        let common = CommonWorkerFields::from(cfg);
        let output = Vec::from([OutputSinkConfig::Channel]);
        Self { common, output }
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// DataWorkerManifest — multi-worker TOML manifest
// ─────────────────────────────────────────────────────────────────────────────

/// Top-level TOML manifest for spawning multiple [`DataWorker`](crate::workers::data_worker::DataWorker)s.
///
/// The manifest has a required `[collect]` table with settings shared by
/// all workers, a required `[[workers]]` array with one entry per symbol,
/// and an optional `[session]` table.  Call
/// [`DataWorkerManifest::resolve_all()`] to turn it into one
/// [`DataWorkerConfig`] per worker entry.
///
/// # Example TOML
///
/// ```toml
/// [collect]
/// exchange = "bybit"
///
/// [collect.datatypes.orderbook]
/// enabled = true
/// depth   = 50
///
/// [collect.datatypes.trades]
/// enabled = true
///
/// [[collect.output]]
/// type = "channel"
///
/// [[workers]]
/// symbol = "BTCUSDT"
///
/// [[workers]]
/// symbol = "SOLUSDT"
///
/// [session]
/// duration_hours = 8
/// ```
#[derive(Debug, Clone, Deserialize)]
pub struct DataWorkerManifest {
    /// Shared `[collect]` settings, used by every worker unless the worker
    /// entry overrides them.
    pub collect: DataWorkerCollect,
    /// Per-symbol worker definitions (the `[[workers]]` array).
    pub workers: Vec<DataWorkerEntry>,
    /// Session parameters (duration, etc.).  Falls back to
    /// `SessionSection::default()` when the `[session]` table is omitted.
    #[serde(default)]
    pub session: SessionSection,
}

/// Shared collect section in a [`DataWorkerManifest`].
///
/// `exchange`, `datatypes` and `output` act as defaults that a
/// [`DataWorkerEntry`] may override.  The tuning fields
/// (`channel_capacity`, `staleness_timeout_secs`, `gap_threshold_secs`,
/// `reconnect`) are copied unchanged into every resolved worker config.
#[derive(Debug, Clone, Deserialize)]
pub struct DataWorkerCollect {
    /// Default exchange for all workers.
    pub exchange: String,
    /// Which data feeds to subscribe to.
    pub datatypes: DataTypesSection,
    /// Output sinks.  Defaults to a single channel sink when omitted.
    #[serde(default = "default_output_sinks")]
    pub output: Vec<OutputSinkConfig>,
    /// Broadcast channel capacity per topic.  `None` when omitted.
    #[serde(default)]
    pub channel_capacity: Option<usize>,
    /// Staleness timeout in seconds.  `None` when omitted.
    #[serde(default)]
    pub staleness_timeout_secs: Option<u64>,
    /// Gap detection silence threshold in seconds.  `None` when omitted.
    #[serde(default)]
    pub gap_threshold_secs: Option<u64>,
    /// Reconnection tuning.  `None` when omitted.
    #[serde(default)]
    pub reconnect: Option<ReconnectSection>,
}

/// A single worker entry in a [`DataWorkerManifest`].
///
/// Only `symbol` is required.  Each optional field, when present,
/// replaces the corresponding value from the shared
/// [`DataWorkerCollect`] section for this worker only; when absent the
/// shared value is used.
#[derive(Debug, Clone, Deserialize)]
pub struct DataWorkerEntry {
    /// Trading pair (e.g. `"BTCUSDT"`).
    pub symbol: String,
    /// Override the default exchange for this worker.
    #[serde(default)]
    pub exchange: Option<String>,
    /// Override datatypes for this worker.
    #[serde(default)]
    pub datatypes: Option<DataTypesSection>,
    /// Override output sinks for this worker.
    #[serde(default)]
    pub output: Option<Vec<OutputSinkConfig>>,
}

/// Session parameters (shared with MarketWorkerManifest).
///
/// The derived `Default` leaves `duration_hours` as `None`.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct SessionSection {
    /// Total collection duration in hours.  `None` = run until Ctrl-C.
    pub duration_hours: Option<f64>,
}

impl SessionSection {
    /// Convert the configured session length from hours to seconds.
    ///
    /// Returns `None` when no duration was configured.
    pub fn duration_secs(&self) -> Option<f64> {
        let hours = self.duration_hours?;
        Some(hours * 3600.0)
    }
}

/// Serde default for [`DataWorkerCollect::output`]: a single channel sink.
fn default_output_sinks() -> Vec<OutputSinkConfig> {
    Vec::from([OutputSinkConfig::Channel])
}

impl DataWorkerManifest {
    /// Load and parse from a TOML file.
    pub fn from_toml(path: &Path) -> anyhow::Result<Self> {
        let contents = std::fs::read_to_string(path)
            .map_err(|e| anyhow::anyhow!("failed to read {:?}: {}", path, e))?;
        let manifest: Self = toml::from_str(&contents)
            .map_err(|e| anyhow::anyhow!("failed to parse {:?}: {}", path, e))?;
        Ok(manifest)
    }

    /// Resolve every [`DataWorkerEntry`] into a fully-specified
    /// [`DataWorkerConfig`] by merging with shared collect.
    pub fn resolve_all(&self) -> Vec<DataWorkerConfig> {
        self.workers
            .iter()
            .map(|w| self.resolve_entry(w))
            .collect()
    }

    /// Session duration in seconds, if specified.
    pub fn duration_secs(&self) -> Option<f64> {
        self.session.duration_secs()
    }

    fn resolve_entry(&self, entry: &DataWorkerEntry) -> DataWorkerConfig {
        let d = &self.collect;
        DataWorkerConfig {
            common: CommonWorkerFields {
                exchange: entry.exchange.clone().unwrap_or_else(|| d.exchange.clone()),
                symbol: entry.symbol.clone(),
                datatypes: entry.datatypes.clone().unwrap_or_else(|| d.datatypes.clone()),
                channel_capacity: d.channel_capacity,
                staleness_timeout_secs: d.staleness_timeout_secs,
                gap_threshold_secs: d.gap_threshold_secs,
                reconnect: d.reconnect.clone(),
            },
            output: entry
                .output
                .clone()
                .unwrap_or_else(|| d.output.clone()),
        }
    }
}