use serde::Deserialize;
use std::path::{Path, PathBuf};
use crate::synchronizers::ClockMode;
/// Root configuration for a market snapshot run, deserialized from TOML
/// (see `MarketSnapshotConfig::from_toml`).
///
/// None of the fields below carries `#[serde(default)]`, so every section
/// must be present in the input, even when all of its own keys are optional.
#[derive(Debug, Clone, Deserialize)]
pub struct MarketSnapshotConfig {
/// Exchange selection; its `name` decides which stream list `wss_streams` builds.
pub exchange: ExchangeSection,
/// Instrument name and the synchronization mode mapped to a `ClockMode`.
pub symbol: SymbolSection,
/// Update period as a value/unit pair; `period_ns` converts it to nanoseconds.
pub update_frequency: UpdateFrequency,
/// Pipeline tuning; `flush_threshold` scales the period in `flush_interval_ns`.
pub pipeline: PipelineSection,
/// Per-feed enable flags (and order book depth) consulted when building streams.
pub datatypes: DataTypesSection,
/// Output location; resolved against the workspace root by `output_dir`.
pub output: OutputSection,
/// Logging-related counters.
/// NOTE(review): how these are consumed is not visible in this file — confirm.
pub logs: LogsSection,
}
/// Identifies the exchange to connect to.
#[derive(Debug, Clone, Deserialize)]
pub struct ExchangeSection {
/// Exchange name. `wss_streams` lowercases it and recognizes `bybit`,
/// `coinbase`, `kraken` and `binance`; any other value yields no streams.
pub name: String,
}
/// Instrument to capture and how snapshots are synchronized.
#[derive(Debug, Clone, Deserialize)]
pub struct SymbolSection {
/// Symbol name. Inserted verbatim into Bybit topics and lowercased for
/// Binance stream names; the Coinbase and Kraken builders do not use it.
pub name: String,
/// Synchronization mode; `clock_mode` maps it to the matching `ClockMode`.
pub sync_mode: SyncMode,
}
/// Update period expressed as a magnitude plus a time unit.
///
/// `MarketSnapshotConfig::period_ns` converts the pair to nanoseconds.
#[derive(Debug, Clone, Deserialize)]
pub struct UpdateFrequency {
/// Magnitude of the period, counted in `unit`.
pub value: u64,
/// Unit that `value` is expressed in.
pub unit: TimeUnit,
}
/// Pipeline tuning parameters.
#[derive(Debug, Clone, Deserialize)]
pub struct PipelineSection {
/// Multiplier applied to the update period by `flush_interval_ns`.
/// NOTE(review): presumably the number of periods between flushes — confirm
/// against the pipeline implementation.
pub flush_threshold: usize,
}
/// Per-feed settings.
///
/// Every field is `#[serde(default)]`, so an omitted entry falls back to its
/// type's `Default`, which leaves the feed disabled.
#[derive(Debug, Clone, Deserialize)]
pub struct DataTypesSection {
/// Order book feed: enable flag plus depth.
#[serde(default)]
pub orderbook: OrderbookConfig,
/// Trade feed toggle.
#[serde(default)]
pub trades: FeedToggle,
/// Liquidation feed toggle; only the Bybit stream builder reads it.
#[serde(default)]
pub liquidations: FeedToggle,
/// Funding rate feed toggle; only the Bybit stream builder reads it.
#[serde(default)]
pub funding_rates: FeedToggle,
/// Open interest feed toggle; only the Bybit stream builder reads it.
#[serde(default)]
pub open_interest: FeedToggle,
}
/// Order book feed settings.
#[derive(Debug, Clone, Deserialize)]
pub struct OrderbookConfig {
/// Whether the order book feed is subscribed; `false` when omitted.
#[serde(default)]
pub enabled: bool,
/// Order book depth; falls back to `default_ob_depth()` when omitted.
/// Within this file only the Bybit topic name embeds it.
#[serde(default = "default_ob_depth")]
pub depth: usize,
}
impl Default for OrderbookConfig {
fn default() -> Self {
Self {
enabled: false,
depth: default_ob_depth(),
}
}
}
/// On/off switch for a single data feed.
///
/// The derived `Default` leaves the feed disabled.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct FeedToggle {
/// Whether the feed is subscribed; `false` when omitted.
#[serde(default)]
pub enabled: bool,
}
/// Per-feed logging counters.
///
/// Every field defaults to `0` when omitted.
/// NOTE(review): the names suggest "how many items of each feed to log", but
/// nothing in this file reads these fields — confirm against the consumer.
#[derive(Debug, Clone, Deserialize)]
pub struct LogsSection {
/// Count associated with order book entries.
#[serde(default)]
pub n_orderbooks: usize,
/// Count associated with trades.
#[serde(default)]
pub n_trades: usize,
/// Count associated with liquidations.
#[serde(default)]
pub n_liquidations: usize,
/// Count associated with funding entries.
#[serde(default)]
pub n_fundings: usize,
/// Count associated with open interest entries.
#[serde(default)]
pub n_open_interests: usize,
}
/// Output location settings.
#[derive(Debug, Clone, Deserialize)]
pub struct OutputSection {
/// Output directory; `MarketSnapshotConfig::output_dir` joins it onto the
/// parent of the crate's manifest directory.
pub dir: String,
}
/// Fallback order book depth used when the configuration omits `depth`.
fn default_ob_depth() -> usize {
    const DEFAULT_DEPTH: usize = 50;
    DEFAULT_DEPTH
}
/// Event source that drives snapshot synchronization.
///
/// Each variant is deserialized from the snake_case string given in its
/// `serde(rename)` attribute; `Display` prints the same string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
pub enum SyncMode {
/// Maps to `ClockMode::OrderbookDriven`.
#[serde(rename = "on_orderbook")]
OnOrderbook,
/// Maps to `ClockMode::TradeDriven`.
#[serde(rename = "on_trade")]
OnTrade,
/// Maps to `ClockMode::LiquidationDriven`.
#[serde(rename = "on_liquidation")]
OnLiquidation,
/// Maps to `ClockMode::ExternalClock`.
#[serde(rename = "on_time")]
OnTime,
}
/// Time unit for `UpdateFrequency::value`.
///
/// No serde rename is applied, so the configuration must spell the variant
/// name exactly (`Nanos`, `Micros`, `Millis`, `Secs`). `Display` prints the
/// short suffix instead (`ns`, `µs`, `ms`, `s`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
pub enum TimeUnit {
/// Nanoseconds.
Nanos,
/// Microseconds (1_000 ns).
Micros,
/// Milliseconds (1_000_000 ns).
Millis,
/// Seconds (1_000_000_000 ns).
Secs,
}
impl MarketSnapshotConfig {
    /// Loads and parses a configuration from the TOML file at `path`.
    ///
    /// # Errors
    ///
    /// Returns an error when the file cannot be read or when its contents do
    /// not deserialize into a `MarketSnapshotConfig`. The message names the
    /// path and embeds the underlying error text.
    pub fn from_toml(path: &Path) -> anyhow::Result<Self> {
        let contents = std::fs::read_to_string(path)
            .map_err(|e| anyhow::anyhow!("failed to read {:?}: {}", path, e))?;
        let config: Self = toml::from_str(&contents)
            .map_err(|e| anyhow::anyhow!("failed to parse {:?}: {}", path, e))?;
        Ok(config)
    }

    /// Returns the configured update period in nanoseconds.
    ///
    /// The multiplication saturates at `u64::MAX` rather than overflowing, so
    /// an oversized configured value cannot panic (debug builds) or silently
    /// wrap around to a tiny period (release builds).
    pub fn period_ns(&self) -> u64 {
        let nanos_per_unit: u64 = match self.update_frequency.unit {
            TimeUnit::Nanos => 1,
            TimeUnit::Micros => 1_000,
            TimeUnit::Millis => 1_000_000,
            TimeUnit::Secs => 1_000_000_000,
        };
        self.update_frequency.value.saturating_mul(nanos_per_unit)
    }

    /// Returns `period_ns()` multiplied by `pipeline.flush_threshold`, in
    /// nanoseconds, saturating at `u64::MAX` instead of overflowing.
    pub fn flush_interval_ns(&self) -> u64 {
        // `usize` is at most 64 bits wide on supported targets, so this
        // widening cast does not lose information.
        let threshold = self.pipeline.flush_threshold as u64;
        self.period_ns().saturating_mul(threshold)
    }

    /// Maps the configured `SyncMode` to the corresponding `ClockMode`.
    pub fn clock_mode(&self) -> ClockMode {
        match self.symbol.sync_mode {
            SyncMode::OnOrderbook => ClockMode::OrderbookDriven,
            SyncMode::OnTrade => ClockMode::TradeDriven,
            SyncMode::OnLiquidation => ClockMode::LiquidationDriven,
            SyncMode::OnTime => ClockMode::ExternalClock,
        }
    }

    /// Returns the textual name of the `ClockMode` variant that `clock_mode`
    /// selects, including the `ClockMode::` prefix.
    pub fn clock_mode_label(&self) -> &'static str {
        // Keep these arms in step with `clock_mode`.
        match self.symbol.sync_mode {
            SyncMode::OnOrderbook => "ClockMode::OrderbookDriven",
            SyncMode::OnTrade => "ClockMode::TradeDriven",
            SyncMode::OnLiquidation => "ClockMode::LiquidationDriven",
            SyncMode::OnTime => "ClockMode::ExternalClock",
        }
    }

    /// Builds the list of stream/channel names for the configured exchange.
    ///
    /// The exchange name is lowercased before matching. An unrecognized
    /// exchange logs a warning and yields an empty list.
    pub fn wss_streams(&self) -> Vec<String> {
        match self.exchange.name.to_lowercase().as_str() {
            "bybit" => self.bybit_wss_streams(),
            "coinbase" => self.coinbase_wss_channels(),
            "kraken" => self.kraken_wss_channels(),
            "binance" => self.binance_wss_streams(),
            other => {
                tracing::warn!("Unknown exchange '{}'; returning empty streams", other);
                vec![]
            }
        }
    }

    /// Bybit topics for every enabled feed, with the symbol used verbatim.
    fn bybit_wss_streams(&self) -> Vec<String> {
        let sym = &self.symbol.name;
        let mut streams = Vec::new();
        if self.datatypes.orderbook.enabled {
            streams.push(format!(
                "orderbook.{}.{}",
                self.datatypes.orderbook.depth, sym,
            ));
        }
        if self.datatypes.trades.enabled {
            streams.push(format!("publicTrade.{}", sym));
        }
        if self.datatypes.liquidations.enabled {
            streams.push(format!("allLiquidation.{}", sym));
        }
        // Funding rates and open interest share one topic, so it is added
        // once when either feed is enabled.
        if self.datatypes.funding_rates.enabled || self.datatypes.open_interest.enabled {
            streams.push(format!("tickers.{}", sym));
        }
        streams
    }

    /// Coinbase channel names; only the order book and trade toggles are read.
    fn coinbase_wss_channels(&self) -> Vec<String> {
        let mut channels = Vec::new();
        if self.datatypes.orderbook.enabled {
            channels.push("level2".to_string());
        }
        if self.datatypes.trades.enabled {
            channels.push("market_trades".to_string());
        }
        channels
    }

    /// Kraken channel names; only the order book and trade toggles are read.
    fn kraken_wss_channels(&self) -> Vec<String> {
        let mut channels = Vec::new();
        if self.datatypes.orderbook.enabled {
            channels.push("book".to_string());
        }
        if self.datatypes.trades.enabled {
            channels.push("trade".to_string());
        }
        channels
    }

    /// Binance stream names built from the lowercased symbol; only the order
    /// book and trade toggles are read.
    fn binance_wss_streams(&self) -> Vec<String> {
        let sym = self.symbol.name.to_lowercase();
        let mut streams = Vec::new();
        if self.datatypes.orderbook.enabled {
            streams.push(format!("{}@depth@100ms", sym));
        }
        if self.datatypes.trades.enabled {
            streams.push(format!("{}@trade", sym));
        }
        streams
    }

    /// Resolves `output.dir` against the parent directory of this crate's
    /// manifest directory.
    ///
    /// `CARGO_MANIFEST_DIR` is captured at compile time, so the result refers
    /// to the build-time source layout rather than the runtime working
    /// directory. An absolute `output.dir` replaces the base entirely, as
    /// `Path::join` does for absolute arguments.
    ///
    /// # Panics
    ///
    /// Panics if the manifest directory has no parent.
    pub fn output_dir(&self) -> PathBuf {
        let manifest_dir = env!("CARGO_MANIFEST_DIR");
        let workspace_root = Path::new(manifest_dir)
            .parent()
            .expect("failed to resolve workspace root");
        workspace_root.join(&self.output.dir)
    }
}
impl std::fmt::Display for SyncMode {
    /// Writes the snake_case name of the mode, matching its configuration spelling.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::OnOrderbook => "on_orderbook",
            Self::OnTrade => "on_trade",
            Self::OnLiquidation => "on_liquidation",
            Self::OnTime => "on_time",
        };
        f.write_str(label)
    }
}
impl std::fmt::Display for TimeUnit {
    /// Writes the short unit suffix (`ns`, `µs`, `ms` or `s`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let suffix = match self {
            Self::Nanos => "ns",
            Self::Micros => "µs",
            Self::Millis => "ms",
            Self::Secs => "s",
        };
        f.write_str(suffix)
    }
}