use std::path::PathBuf;
use std::time::Duration;
use clap::Parser;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tracing_subscriber::EnvFilter;
use atelier_data::config::workers::MarketWorkerManifest;
use atelier_data::workers::market_worker::MarketWorker;
// Command-line arguments for the `market_worker` binary.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns
// `///` doc comments into help text, so adding them would change `--help`.
#[derive(Parser, Debug)]
#[command(name = "market_worker", version, about)]
struct Cli {
// Path to the manifest file; `main` hands it to
// `MarketWorkerManifest::from_toml`. `short` and `long` carry no explicit
// values, so clap derives the flag names from the field name.
#[arg(short, long)]
config: PathBuf,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
)
.with_target(true)
.with_thread_ids(false)
.init();
let cli = Cli::parse();
let manifest = MarketWorkerManifest::from_toml(&cli.config)?;
let configs = manifest.resolve_all();
if configs.is_empty() {
anyhow::bail!("manifest contains no worker entries");
}
tracing::info!(
workers = configs.len(),
config = %cli.config.display(),
"market_worker.starting"
);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
tracing::info!("market_worker.shutdown_requested");
let _ = shutdown_tx_clone.send(true);
});
if let Some(dur_secs) = manifest.duration_secs() {
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs_f64(dur_secs)).await;
tracing::info!(
duration_secs = dur_secs,
"market_worker.session_duration_elapsed"
);
let _ = shutdown_tx_clone.send(true);
});
}
let mut join_set = JoinSet::new();
for (i, cfg) in configs.into_iter().enumerate() {
let rx = shutdown_rx.clone();
let label = format!("{}:{}", cfg.common.exchange, cfg.common.symbol);
let worker = MarketWorker::from_config(cfg)?;
tracing::info!(
worker = i,
label = %label,
"market_worker.spawning_worker"
);
if i > 0 {
tokio::time::sleep(Duration::from_millis(250)).await;
}
join_set.spawn(async move { worker.run(rx).await });
}
let mut total_snapshots: u64 = 0;
while let Some(result) = join_set.join_next().await {
match result {
Ok(Ok(report)) => {
tracing::info!(
exchange = %report.ingestion.exchange,
symbol = %report.ingestion.symbol,
total_events = report.ingestion.total_events,
snapshots = report.snapshots_produced,
flushes = report.flushes,
elapsed_secs = format!("{:.1}", report.ingestion.elapsed_secs),
reconnects = report.ingestion.reconnect_count,
"market_worker.worker_finished"
);
total_snapshots += report.snapshots_produced;
}
Ok(Err(e)) => {
tracing::error!(error = %e, "market_worker.worker_error");
}
Err(e) => {
tracing::error!(error = %e, "market_worker.worker_panic");
}
}
}
tracing::info!(
total_snapshots = total_snapshots,
"market_worker.all_workers_finished"
);
Ok(())
}