// atelier_data 0.0.15
//
// Data Artifacts and I/O for the atelier-rs engine
//! Shared configuration types for all worker variants.
//!
//! [`CommonWorkerFields`] captures the configuration knobs that every worker
//! needs regardless of whether it synchronises events or not.

use std::time::Duration;

use serde::Deserialize;

use crate::clients::connection_manager::ConnectionManagerConfig;
use crate::config::markets::market_config::DataTypesSection;
use crate::config::markets::market_config::MarketSnapshotConfig;

// ─────────────────────────────────────────────────────────────────────────────
// CommonWorkerFields
// ─────────────────────────────────────────────────────────────────────────────

/// Configuration fields shared by both
/// [`DataWorker`](crate::workers::DataWorker) and
/// [`MarketWorker`](crate::workers::MarketWorker).
///
/// These are the *only* fields needed to stand up an ingestion pipeline:
/// which exchange, which symbol, which datatypes, and tuning knobs for
/// reconnection / health monitoring.
///
/// `exchange`, `symbol` and `datatypes` are required when deserializing.
/// Every other field is optional (`#[serde(default)]`) and is stored as
/// `None` when absent; the accessor methods on this type turn `None` into
/// the effective runtime value.
#[derive(Debug, Clone, Deserialize)]
pub struct CommonWorkerFields {
    /// Exchange identifier (e.g. `"bybit"`, `"coinbase"`, `"kraken"`).
    pub exchange: String,
    /// Trading pair (e.g. `"BTCUSDT"`, `"BTC-USD"`).
    pub symbol: String,
    /// Which data feeds to subscribe to.
    pub datatypes: DataTypesSection,
    /// Broadcast channel capacity per topic.
    ///
    /// When `None`, `channel_capacity()` falls back to
    /// `workers::topic_publisher::DEFAULT_CHANNEL_CAPACITY`
    /// (documented here as 8192; the constant is defined outside this file).
    #[serde(default)]
    pub channel_capacity: Option<usize>,
    /// Staleness timeout in whole seconds.
    ///
    /// When `None`, `staleness_timeout()` uses 60 seconds.
    #[serde(default)]
    pub staleness_timeout_secs: Option<u64>,
    /// Gap detection silence threshold in whole seconds.
    ///
    /// When `None`, `gap_threshold()` uses 5 seconds.
    #[serde(default)]
    pub gap_threshold_secs: Option<u64>,
    /// Reconnection tuning knobs.
    ///
    /// When `None`, `reconnect_config()` returns
    /// `ConnectionManagerConfig::default()`.
    #[serde(default)]
    pub reconnect: Option<ReconnectSection>,
}

/// TOML-exposed reconnection knobs.
///
/// Maps directly to [`ConnectionManagerConfig`] at runtime via
/// `to_connection_manager_config()`. Every field is optional; an unset
/// field inherits the corresponding value of
/// `ConnectionManagerConfig::default()`.
///
/// The concrete default values quoted below come from the original field
/// notes; `ConnectionManagerConfig::default()` is defined outside this file,
/// so treat the numbers as documentation to be confirmed, not as a guarantee.
#[derive(Debug, Clone, Deserialize)]
pub struct ReconnectSection {
    /// Base delay before the first retry, in milliseconds. Default: 100.
    #[serde(default)]
    pub initial_delay_ms: Option<u64>,
    /// Upper bound on exponential backoff, in milliseconds. Default: 10_000.
    #[serde(default)]
    pub max_delay_ms: Option<u64>,
    /// Maximum consecutive failures before circuit breaker opens.
    /// `None` = infinite retries.
    ///
    /// NOTE(review): the conversion resolves this field with
    /// `.or(defaults.max_attempts)`, so leaving it unset inherits the library
    /// default. It only means "infinite retries" if that default is itself
    /// `None` — confirm against `ConnectionManagerConfig::default()`.
    #[serde(default)]
    pub max_attempts: Option<u32>,
    /// Fraction of the base delay added as uniform random jitter.
    /// Default: 0.5.
    ///
    /// The value is passed through unchanged; no range check is applied
    /// during conversion.
    #[serde(default)]
    pub jitter_factor: Option<f64>,
}

impl ReconnectSection {
    /// Build the runtime [`ConnectionManagerConfig`] described by this section.
    ///
    /// Each knob that was left unset takes the matching value from
    /// `ConnectionManagerConfig::default()`. Millisecond fields are converted
    /// to [`Duration`]s; `max_attempts` and `jitter_factor` are passed through
    /// as-is when present.
    pub fn to_connection_manager_config(&self) -> ConnectionManagerConfig {
        // Single source of fallback values for every unset knob.
        let fallback = ConnectionManagerConfig::default();

        let initial_delay = self
            .initial_delay_ms
            .map_or(fallback.initial_delay, Duration::from_millis);
        let max_delay = self
            .max_delay_ms
            .map_or(fallback.max_delay, Duration::from_millis);
        // An unset attempt limit inherits whatever the library default is.
        let max_attempts = self.max_attempts.or(fallback.max_attempts);
        let jitter_factor = self.jitter_factor.unwrap_or(fallback.jitter_factor);

        ConnectionManagerConfig {
            initial_delay,
            max_delay,
            max_attempts,
            jitter_factor,
        }
    }
}

impl CommonWorkerFields {
    /// Resolved channel capacity with fallback to the library default.
    pub fn channel_capacity(&self) -> usize {
        self.channel_capacity.unwrap_or(crate::workers::topic_publisher::DEFAULT_CHANNEL_CAPACITY)
    }

    /// Resolved staleness timeout with fallback to 60 s.
    pub fn staleness_timeout(&self) -> Duration {
        Duration::from_secs(self.staleness_timeout_secs.unwrap_or(60))
    }

    /// Resolved gap detection threshold with fallback to 5 s.
    pub fn gap_threshold(&self) -> Duration {
        Duration::from_secs(self.gap_threshold_secs.unwrap_or(5))
    }

    /// Resolved reconnection config with fallback to library defaults.
    pub fn reconnect_config(&self) -> ConnectionManagerConfig {
        self.reconnect
            .as_ref()
            .map(|r| r.to_connection_manager_config())
            .unwrap_or_default()
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// From<MarketSnapshotConfig>
// ─────────────────────────────────────────────────────────────────────────────

impl From<&MarketSnapshotConfig> for CommonWorkerFields {
    /// Build shared worker fields from a legacy [`MarketSnapshotConfig`].
    ///
    /// The exchange name, symbol name and datatypes section are cloned from
    /// `cfg`. Every optional tuning knob is left as `None`, so the accessor
    /// methods resolve to the library defaults.
    fn from(cfg: &MarketSnapshotConfig) -> Self {
        // Copy the identifying data out of the borrowed legacy config first.
        let exchange = cfg.exchange.name.clone();
        let symbol = cfg.symbol.name.clone();
        let datatypes = cfg.datatypes.clone();

        Self {
            exchange,
            symbol,
            datatypes,
            // The legacy config supplies no tuning knobs.
            channel_capacity: None,
            staleness_timeout_secs: None,
            gap_threshold_secs: None,
            reconnect: None,
        }
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// OutputSinkConfig
// ─────────────────────────────────────────────────────────────────────────────

/// Describes a single output sink, deserializable from TOML.
///
/// Multiple sinks can be active simultaneously — events are fanned out
/// to every configured sink.
///
/// The enum is internally tagged: the variant is selected by a `type` key
/// whose value is the lowercase variant name (`"channel"`, `"terminal"` or
/// `"parquet"`). Any variant fields sit next to that key, for example
/// `{ type = "parquet", dir = "./out" }`.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum OutputSinkConfig {
    /// Publish to in-memory broadcast channels (existing `TopicRegistry`).
    Channel,
    /// Print events to the terminal via `tracing::debug!`.
    Terminal,
    /// Write events to local Parquet files.
    Parquet {
        /// Base directory for Parquet output.
        ///
        /// Stored as the raw string from the configuration; nothing in this
        /// type validates or normalises the path.
        dir: String,
    },
}