use serde::Deserialize;
use std::path::Path;
use super::common::{CommonWorkerFields, OutputSinkConfig, ReconnectSection};
use crate::config::markets::market_config::{DataTypesSection, MarketSnapshotConfig};
/// Fully resolved configuration for one data worker.
///
/// Produced either by `DataWorkerConfig::from_legacy` or by resolving a
/// `DataWorkerEntry` against the defaults held in a `DataWorkerManifest`.
#[derive(Debug, Clone)]
pub struct DataWorkerConfig {
/// Worker fields shared with other worker kinds (exchange, symbol, datatypes, ...).
pub common: CommonWorkerFields,
/// Sinks this worker's output is sent to.
pub output: Vec<OutputSinkConfig>,
}
impl DataWorkerConfig {
    /// Builds a worker configuration from a legacy [`MarketSnapshotConfig`].
    ///
    /// The common fields are derived via `CommonWorkerFields::from(cfg)`; the
    /// output list always contains exactly one `OutputSinkConfig::Channel` sink.
    pub fn from_legacy(cfg: &MarketSnapshotConfig) -> Self {
        let common = CommonWorkerFields::from(cfg);
        let output = vec![OutputSinkConfig::Channel];
        Self { common, output }
    }
}
/// Deserialized form of a data-worker manifest file (loaded by `from_toml`).
///
/// Holds the shared defaults (`collect`), the per-worker entries (`workers`)
/// and optional session settings (`session`).
#[derive(Debug, Clone, Deserialize)]
pub struct DataWorkerManifest {
/// Defaults applied to every worker entry that does not override them.
pub collect: DataWorkerCollect,
/// One entry per worker to resolve into a `DataWorkerConfig`.
pub workers: Vec<DataWorkerEntry>,
/// Session settings; falls back to `SessionSection::default()` when the
/// section is absent from the input.
#[serde(default)]
pub session: SessionSection,
}
/// Manifest-wide defaults shared by all worker entries.
///
/// `exchange`, `datatypes` and `output` can be overridden per entry; the
/// remaining fields are copied unchanged into every resolved worker config.
#[derive(Debug, Clone, Deserialize)]
pub struct DataWorkerCollect {
/// Exchange used by entries that do not specify their own.
pub exchange: String,
/// Data types used by entries that do not specify their own.
pub datatypes: DataTypesSection,
/// Output sinks used by entries that do not specify their own; defaults to
/// the value of `default_output_sinks()` when omitted from the input.
#[serde(default = "default_output_sinks")]
pub output: Vec<OutputSinkConfig>,
/// Optional channel capacity forwarded to `CommonWorkerFields`.
#[serde(default)]
pub channel_capacity: Option<usize>,
/// Optional staleness timeout forwarded to `CommonWorkerFields`
/// (presumably in seconds, per the field name — confirm with the consumer).
#[serde(default)]
pub staleness_timeout_secs: Option<u64>,
/// Optional gap threshold forwarded to `CommonWorkerFields`
/// (presumably in seconds, per the field name — confirm with the consumer).
#[serde(default)]
pub gap_threshold_secs: Option<u64>,
/// Optional reconnect settings forwarded to `CommonWorkerFields`.
#[serde(default)]
pub reconnect: Option<ReconnectSection>,
}
/// A single worker entry in the manifest.
///
/// Only `symbol` is required; every `Option` field left as `None` falls back
/// to the corresponding value in `DataWorkerCollect` when the entry is resolved.
#[derive(Debug, Clone, Deserialize)]
pub struct DataWorkerEntry {
/// Symbol this worker handles.
pub symbol: String,
/// Per-entry exchange override.
#[serde(default)]
pub exchange: Option<String>,
/// Per-entry data-types override.
#[serde(default)]
pub datatypes: Option<DataTypesSection>,
/// Per-entry output-sink override (replaces the default list entirely).
#[serde(default)]
pub output: Option<Vec<OutputSinkConfig>>,
}
/// Optional session settings of a manifest.
///
/// The derived `Default` leaves `duration_hours` as `None`.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct SessionSection {
/// Session length in hours, if one was configured.
pub duration_hours: Option<f64>,
}
impl SessionSection {
    /// Returns the configured session length converted from hours to seconds.
    ///
    /// Yields `None` when `duration_hours` is not set. The value is converted
    /// as-is; no range validation is performed.
    pub fn duration_secs(&self) -> Option<f64> {
        const SECS_PER_HOUR: f64 = 3600.0;

        let hours = self.duration_hours?;
        Some(hours * SECS_PER_HOUR)
    }
}
/// Serde default for `DataWorkerCollect::output`: a list holding a single
/// `OutputSinkConfig::Channel` sink.
fn default_output_sinks() -> Vec<OutputSinkConfig> {
    Vec::from([OutputSinkConfig::Channel])
}
impl DataWorkerManifest {
    /// Loads a manifest from the TOML file at `path`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read or if its contents do not
    /// deserialize into a `DataWorkerManifest`. Both messages include the path
    /// and the underlying error text.
    pub fn from_toml(path: &Path) -> anyhow::Result<Self> {
        let raw = match std::fs::read_to_string(path) {
            Ok(text) => text,
            Err(e) => return Err(anyhow::anyhow!("failed to read {:?}: {}", path, e)),
        };

        match toml::from_str::<Self>(&raw) {
            Ok(manifest) => Ok(manifest),
            Err(e) => Err(anyhow::anyhow!("failed to parse {:?}: {}", path, e)),
        }
    }

    /// Resolves every entry in `workers` into a standalone `DataWorkerConfig`,
    /// preserving the order of the entries.
    pub fn resolve_all(&self) -> Vec<DataWorkerConfig> {
        let mut resolved = Vec::with_capacity(self.workers.len());
        for entry in &self.workers {
            resolved.push(self.resolve_entry(entry));
        }
        resolved
    }

    /// Session length in seconds, delegated to `SessionSection::duration_secs`.
    pub fn duration_secs(&self) -> Option<f64> {
        let session = &self.session;
        session.duration_secs()
    }

    /// Merges one worker entry with the manifest-wide defaults in `collect`.
    ///
    /// `exchange`, `datatypes` and `output` come from the entry when set and
    /// from the defaults otherwise; the remaining common fields are always
    /// taken from the defaults.
    fn resolve_entry(&self, entry: &DataWorkerEntry) -> DataWorkerConfig {
        let defaults = &self.collect;

        // Pick the borrowed override-or-default first, then clone exactly once.
        let exchange = entry
            .exchange
            .as_deref()
            .unwrap_or(defaults.exchange.as_str())
            .to_owned();
        let datatypes = entry
            .datatypes
            .as_ref()
            .unwrap_or(&defaults.datatypes)
            .clone();
        let output = entry.output.as_ref().unwrap_or(&defaults.output).clone();

        let common = CommonWorkerFields {
            exchange,
            symbol: entry.symbol.clone(),
            datatypes,
            channel_capacity: defaults.channel_capacity,
            staleness_timeout_secs: defaults.staleness_timeout_secs,
            gap_threshold_secs: defaults.gap_threshold_secs,
            reconnect: defaults.reconnect.clone(),
        };

        DataWorkerConfig { common, output }
    }
}