use std::time::Duration;
use serde::Deserialize;
use crate::clients::connection_manager::ConnectionManagerConfig;
use crate::config::markets::market_config::DataTypesSection;
use crate::config::markets::market_config::MarketSnapshotConfig;
#[derive(Debug, Clone, Deserialize)]
pub struct CommonWorkerFields {
pub exchange: String,
pub symbol: String,
pub datatypes: DataTypesSection,
#[serde(default)]
pub channel_capacity: Option<usize>,
#[serde(default)]
pub staleness_timeout_secs: Option<u64>,
#[serde(default)]
pub gap_threshold_secs: Option<u64>,
#[serde(default)]
pub reconnect: Option<ReconnectSection>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct ReconnectSection {
#[serde(default)]
pub initial_delay_ms: Option<u64>,
#[serde(default)]
pub max_delay_ms: Option<u64>,
#[serde(default)]
pub max_attempts: Option<u32>,
#[serde(default)]
pub jitter_factor: Option<f64>,
}
impl ReconnectSection {
pub fn to_connection_manager_config(&self) -> ConnectionManagerConfig {
let defaults = ConnectionManagerConfig::default();
ConnectionManagerConfig {
initial_delay: self
.initial_delay_ms
.map(Duration::from_millis)
.unwrap_or(defaults.initial_delay),
max_delay: self
.max_delay_ms
.map(Duration::from_millis)
.unwrap_or(defaults.max_delay),
max_attempts: self.max_attempts.or(defaults.max_attempts),
jitter_factor: self.jitter_factor.unwrap_or(defaults.jitter_factor),
}
}
}
impl CommonWorkerFields {
pub fn channel_capacity(&self) -> usize {
self.channel_capacity.unwrap_or(crate::workers::topic_publisher::DEFAULT_CHANNEL_CAPACITY)
}
pub fn staleness_timeout(&self) -> Duration {
Duration::from_secs(self.staleness_timeout_secs.unwrap_or(60))
}
pub fn gap_threshold(&self) -> Duration {
Duration::from_secs(self.gap_threshold_secs.unwrap_or(5))
}
pub fn reconnect_config(&self) -> ConnectionManagerConfig {
self.reconnect
.as_ref()
.map(|r| r.to_connection_manager_config())
.unwrap_or_default()
}
}
impl From<&MarketSnapshotConfig> for CommonWorkerFields {
fn from(cfg: &MarketSnapshotConfig) -> Self {
Self {
exchange: cfg.exchange.name.clone(),
symbol: cfg.symbol.name.clone(),
datatypes: cfg.datatypes.clone(),
channel_capacity: None,
staleness_timeout_secs: None,
gap_threshold_secs: None,
reconnect: None,
}
}
}
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum OutputSinkConfig {
Channel,
Terminal,
Parquet {
dir: String,
},
}