1use crate::alerts::sanitize_webhook;
2use crate::analyze;
3use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
4use crate::live::{
5 run_live, ExecutionBackend, LiveSessionSettings, PersistenceBackend, PersistenceSettings,
6};
7use crate::state;
8use crate::telemetry::init_tracing;
9use crate::tui;
10use crate::PublicChannel;
11use arrow::util::pretty::print_batches;
12use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
13use std::collections::HashMap;
14use std::fs::{self, File};
15use std::io::{BufRead, BufReader};
16use std::net::SocketAddr;
17use std::path::{Path, PathBuf};
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::Duration as StdDuration;
21
22use anyhow::{anyhow, bail, Context, Result};
23use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
24use clap::{Args, Parser, Subcommand, ValueEnum};
25use csv::Writer;
26use futures::StreamExt;
27use rust_decimal::{
28 prelude::{FromPrimitive, ToPrimitive},
29 Decimal,
30};
31use serde::{Deserialize, Serialize};
32use tesser_backtester::reporting::PerformanceReport;
33use tesser_backtester::{
34 stream_from_events, BacktestConfig, BacktestMode, BacktestStream, Backtester, MarketEvent,
35 MarketEventKind, MarketEventStream,
36};
37use tesser_broker::ExecutionClient;
38use tesser_config::{load_config, AppConfig, PersistenceEngine, RiskManagementConfig};
39use tesser_core::{Candle, DepthUpdate, Interval, OrderBook, OrderBookLevel, Side, Symbol, Tick};
40use tesser_data::analytics::ExecutionAnalysisRequest;
41use tesser_data::download::{BinanceDownloader, BybitDownloader, KlineRequest};
42use tesser_data::io::{self, DatasetFormat as IoDatasetFormat};
43use tesser_data::merger::UnifiedEventStream;
44use tesser_data::parquet::ParquetMarketStream;
45use tesser_data::transform::Resampler;
46use tesser_execution::{
47 ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PortfolioPercentSizer,
48 RiskAdjustedSizer,
49};
50use tesser_markets::MarketRegistry;
51use tesser_paper::{MatchingEngine, PaperExecutionClient, PaperMarketStream};
52use tesser_strategy::{builtin_strategy_names, load_strategy};
53use tracing::{info, warn};
54
// Top-level command-line interface parsed by clap.
// NOTE: plain `//` comments are used on purpose — `///` doc comments on clap
// derive items become --help text and would change runtime output.
#[derive(Parser)]
#[command(author, version, about = "Tesser Trading Framework")]
pub struct Cli {
    // Repeatable -v/--verbose flag; the count picks the log level in `run()`
    // (0 => configured default, 1 => debug, 2+ => trace).
    #[arg(short, long, action = clap::ArgAction::Count)]
    verbose: u8,
    // Configuration environment name handed to `load_config`.
    #[arg(long, default_value = "default")]
    env: String,
    // Selected subcommand.
    #[command(subcommand)]
    command: Commands,
}
67
// Top-level subcommands of the Tesser CLI.
// (`//` comments used so clap-generated help text stays unchanged.)
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
pub enum Commands {
    // Dataset management: download, validate, resample, inspect.
    Data {
        #[command(subcommand)]
        action: DataCommand,
    },
    // Historical simulation: single run or batch over many configs.
    Backtest {
        #[command(subcommand)]
        action: BacktestCommand,
    },
    // Live / paper trading session.
    Live {
        #[command(subcommand)]
        action: LiveCommand,
    },
    // Persisted live-session state inspection.
    State {
        #[command(subcommand)]
        action: StateCommand,
    },
    // Print the names of built-in strategies.
    Strategies,
    // Post-hoc analytics over recorded data.
    Analyze {
        #[command(subcommand)]
        action: AnalyzeCommand,
    },
    // Terminal UI monitor attached to a live session's control endpoint.
    Monitor(MonitorArgs),
}
101
// `tesser data …` subcommands.
#[derive(Subcommand)]
pub enum DataCommand {
    // Fetch historical klines from an exchange and save them as CSV.
    Download(DataDownloadArgs),
    // Run gap/jump validation over stored candle datasets.
    Validate(DataValidateArgs),
    // Resample a candle dataset to a coarser interval.
    Resample(DataResampleArgs),
    // Pretty-print rows of a Parquet file.
    InspectParquet(DataInspectParquetArgs),
}
113
// `tesser backtest …` subcommands.
#[derive(Subcommand)]
pub enum BacktestCommand {
    // Run a single strategy config over one dataset.
    Run(BacktestRunArgs),
    // Run several strategy configs and aggregate the reports.
    Batch(BacktestBatchArgs),
}
121
// `tesser live …` subcommands.
#[derive(Subcommand)]
pub enum LiveCommand {
    // Start a live (or paper) trading session.
    Run(LiveRunArgs),
}
127
// `tesser state …` subcommands.
#[derive(Subcommand)]
pub enum StateCommand {
    // Dump the persisted session state for inspection.
    Inspect(StateInspectArgs),
}
133
// `tesser analyze …` subcommands.
#[derive(Subcommand)]
pub enum AnalyzeCommand {
    // Execution-quality analysis over flight-recorder data.
    Execution(AnalyzeExecutionArgs),
}
139
// Arguments for `tesser data download`.
#[derive(Args)]
pub struct DataDownloadArgs {
    // Exchange profile key looked up in the app config.
    #[arg(long, default_value = "bybit")]
    exchange: String,
    // Instrument symbol to download, e.g. BTCUSDT.
    #[arg(long)]
    symbol: String,
    // Exchange market category (used by Bybit; ignored by Binance).
    #[arg(long, default_value = "linear")]
    category: String,
    // Candle interval string parsed into `Interval`.
    #[arg(long, default_value = "1m")]
    interval: String,
    // Inclusive start timestamp (parsed by `parse_datetime`).
    #[arg(long)]
    start: String,
    // Optional end timestamp; defaults to "now" when omitted.
    #[arg(long)]
    end: Option<String>,
    // Optional output file; a default path is derived when omitted.
    #[arg(long)]
    output: Option<PathBuf>,
    // Skip the post-download validation pass entirely.
    #[arg(long)]
    skip_validation: bool,
    // Fill detected gaps with synthetic candles during validation.
    #[arg(long)]
    repair_missing: bool,
    // Relative price-jump threshold used by validation.
    #[arg(long, default_value_t = 0.05)]
    validation_jump_threshold: f64,
    // Tolerance when comparing against a reference series.
    #[arg(long, default_value_t = 0.002)]
    validation_reference_tolerance: f64,
}
169
// Arguments for `tesser state inspect`.
#[derive(Args)]
pub struct StateInspectArgs {
    // Optional state-file path; falls back to the configured persistence path.
    #[arg(long)]
    path: Option<PathBuf>,
    // Print the raw persisted payload instead of a decoded view.
    #[arg(long)]
    raw: bool,
}
179
// Arguments for `tesser analyze execution`.
#[derive(Args)]
pub struct AnalyzeExecutionArgs {
    // Flight-recorder directory to analyze.
    #[arg(long, value_name = "PATH", default_value = "data/flight_recorder")]
    data_dir: PathBuf,
    // Optional inclusive start of the analysis window.
    #[arg(long)]
    start: Option<String>,
    // Optional inclusive end of the analysis window.
    #[arg(long)]
    end: Option<String>,
    // Optional CSV export destination for the analysis results.
    #[arg(long, value_name = "PATH")]
    export_csv: Option<PathBuf>,
}
195
// Arguments for `tesser monitor`.
#[derive(Args)]
pub struct MonitorArgs {
    // Control endpoint of the live session; configured default when omitted.
    #[arg(long)]
    control_addr: Option<String>,
    // UI refresh period in milliseconds (floored to 50 at runtime).
    #[arg(long, default_value_t = 250)]
    tick_rate: u64,
}
205
206impl StateInspectArgs {
207 fn resolved_path(&self, config: &AppConfig) -> PathBuf {
208 self.path
209 .clone()
210 .unwrap_or_else(|| config.live.persistence_config().path.clone())
211 }
212
213 fn resolved_engine(&self, config: &AppConfig) -> PersistenceEngine {
214 config.live.persistence_config().engine
215 }
216}
217
218impl MonitorArgs {
219 async fn run(&self, config: &AppConfig) -> Result<()> {
220 let addr = self
221 .control_addr
222 .clone()
223 .unwrap_or_else(|| config.live.control_addr.clone());
224 let refresh = self.tick_rate.max(50);
225 let monitor_config = tui::MonitorConfig::new(addr, StdDuration::from_millis(refresh));
226 tui::run_monitor(monitor_config).await
227 }
228}
229
230impl AnalyzeExecutionArgs {
231 fn build_request(&self) -> Result<ExecutionAnalysisRequest> {
232 let start = match &self.start {
233 Some(value) => Some(parse_datetime(value)?),
234 None => None,
235 };
236 let end = match &self.end {
237 Some(value) => Some(parse_datetime(value)?),
238 None => None,
239 };
240 Ok(ExecutionAnalysisRequest {
241 data_dir: self.data_dir.clone(),
242 start,
243 end,
244 })
245 }
246}
247
impl DataDownloadArgs {
    /// Download historical klines for one symbol, optionally validate and
    /// repair the series, and write the result as CSV.
    ///
    /// # Errors
    /// Fails when the exchange profile is unknown, interval/start/end do not
    /// parse, the download fails, validation fails, or the file write fails.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        // Resolve the exchange profile; `--exchange` must match a config key.
        let exchange_cfg = config
            .exchange
            .get(&self.exchange)
            .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
        let start = parse_datetime(&self.start)?;
        // A missing --end defaults to "now".
        let end = match &self.end {
            Some(value) => parse_datetime(value)?,
            None => Utc::now(),
        };
        if start >= end {
            return Err(anyhow!("start time must be earlier than end time"));
        }

        info!(
            "Downloading {} candles for {} ({})",
            self.interval, self.symbol, self.exchange
        );
        // Choose the downloader from the profile's driver string; an empty
        // driver falls back to Bybit.
        let mut candles = match exchange_cfg.driver.as_str() {
            "bybit" | "" => {
                let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
                let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
                downloader
                    .download_klines(&request)
                    .await
                    .with_context(|| "failed to download candles from Bybit")?
            }
            "binance" => {
                let downloader = BinanceDownloader::new(&exchange_cfg.rest_url);
                // Binance requests carry no category, hence the empty string.
                let request = KlineRequest::new("", &self.symbol, interval, start, end);
                downloader
                    .download_klines(&request)
                    .await
                    .with_context(|| "failed to download candles from Binance")?
            }
            other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
        };

        if candles.is_empty() {
            info!("No candles returned for {}", self.symbol);
            return Ok(());
        }

        // Unless skipped, run gap/jump validation. Thresholds are floored at
        // machine epsilon so a zero CLI value cannot disable the checks.
        if !self.skip_validation {
            let config = ValidationConfig {
                price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
                reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
                repair_missing: self.repair_missing,
            };
            let outcome =
                validate_dataset(candles.clone(), None, config).context("validation failed")?;
            print_validation_summary(&outcome);
            // Swap in the repaired series only when repairs were requested
            // and actually produced synthetic candles.
            if self.repair_missing && outcome.summary.repaired_candles > 0 {
                candles = outcome.repaired;
                info!(
                    "Applied {} synthetic candle(s) to repair gaps",
                    outcome.summary.repaired_candles
                );
            }
        }

        // Derive a default output path when --output was not provided.
        let output_path = self.output.clone().unwrap_or_else(|| {
            default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
        });
        write_candles_csv(&output_path, &candles)?;
        info!(
            "Saved {} candles to {}",
            candles.len(),
            output_path.display()
        );
        Ok(())
    }
}
323
// Arguments for `tesser data validate`.
#[derive(Args)]
pub struct DataValidateArgs {
    // One or more dataset files to validate (repeatable --path).
    #[arg(
        long = "path",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    paths: Vec<PathBuf>,
    // Optional reference dataset files for cross-checking (repeatable).
    #[arg(
        long = "reference",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    reference_paths: Vec<PathBuf>,
    // Relative price-jump threshold (non-positive values are floored at runtime).
    #[arg(long, default_value_t = 0.05)]
    jump_threshold: f64,
    // Tolerance when comparing against the reference series.
    #[arg(long, default_value_t = 0.002)]
    reference_tolerance: f64,
    // Fill detected gaps with synthetic candles.
    #[arg(long)]
    repair_missing: bool,
    // Optional destination for the (possibly repaired) dataset.
    #[arg(long)]
    output: Option<PathBuf>,
}
355
356impl DataValidateArgs {
357 fn run(&self) -> Result<()> {
358 if self.paths.is_empty() {
359 bail!("provide at least one --path for validation");
360 }
361 let candles =
362 load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
363 if candles.is_empty() {
364 bail!("loaded dataset is empty; nothing to validate");
365 }
366 let reference = if self.reference_paths.is_empty() {
367 None
368 } else {
369 Some(
370 load_candles_from_paths(&self.reference_paths)
371 .with_context(|| "failed to load reference dataset")?,
372 )
373 };
374
375 let price_jump_threshold = if self.jump_threshold <= 0.0 {
376 0.0001
377 } else {
378 self.jump_threshold
379 };
380 let reference_tolerance = if self.reference_tolerance <= 0.0 {
381 0.0001
382 } else {
383 self.reference_tolerance
384 };
385
386 let config = ValidationConfig {
387 price_jump_threshold,
388 reference_tolerance,
389 repair_missing: self.repair_missing,
390 };
391
392 let outcome = validate_dataset(candles, reference, config)?;
393 print_validation_summary(&outcome);
394
395 if let Some(output) = &self.output {
396 write_candles_csv(output, &outcome.repaired)?;
397 info!(
398 "Wrote {} candles ({} new) to {}",
399 outcome.repaired.len(),
400 outcome.summary.repaired_candles,
401 output.display()
402 );
403 } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
404 warn!(
405 "Detected {} gap(s) filled with synthetic candles but --output was not provided",
406 outcome.summary.repaired_candles
407 );
408 }
409
410 Ok(())
411 }
412}
413
414impl DataResampleArgs {
415 fn run(&self) -> Result<()> {
416 if !self.input.exists() {
417 bail!("input file {} not found", self.input.display());
418 }
419 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
420 let dataset = io::read_dataset(&self.input)
421 .with_context(|| format!("failed to read {}", self.input.display()))?;
422 let io::CandleDataset {
423 format: input_format,
424 candles,
425 } = dataset;
426 if candles.is_empty() {
427 bail!(
428 "dataset {} does not contain any candles",
429 self.input.display()
430 );
431 }
432 let input_len = candles.len();
433 let resampled = Resampler::resample(candles, interval);
434 if resampled.is_empty() {
435 bail!(
436 "no candles produced after resampling {}",
437 self.input.display()
438 );
439 }
440 let output_format = self
441 .format
442 .map(IoDatasetFormat::from)
443 .unwrap_or_else(|| IoDatasetFormat::from_path(&self.output));
444 io::write_dataset(&self.output, output_format, &resampled)
445 .with_context(|| format!("failed to write {}", self.output.display()))?;
446 info!(
447 "Resampled {} -> {} candles (input {:?} -> output {:?}) to {}",
448 input_len,
449 resampled.len(),
450 input_format,
451 output_format,
452 self.output.display()
453 );
454 Ok(())
455 }
456}
457
// Arguments for `tesser data resample`.
#[derive(Args)]
pub struct DataResampleArgs {
    // Source dataset file.
    #[arg(long)]
    input: PathBuf,
    // Destination file for the resampled dataset.
    #[arg(long)]
    output: PathBuf,
    // Target candle interval.
    #[arg(long, default_value = "1h")]
    interval: String,
    // Explicit output format; inferred from the output extension when omitted.
    #[arg(long, value_enum)]
    format: Option<DatasetFormatArg>,
}
470
// CLI-facing dataset format choice, mapped onto the storage-layer enum.
#[derive(Copy, Clone, Debug, ValueEnum)]
pub enum DatasetFormatArg {
    Csv,
    Parquet,
}
476
477impl From<DatasetFormatArg> for IoDatasetFormat {
478 fn from(value: DatasetFormatArg) -> Self {
479 match value {
480 DatasetFormatArg::Csv => IoDatasetFormat::Csv,
481 DatasetFormatArg::Parquet => IoDatasetFormat::Parquet,
482 }
483 }
484}
485
// Arguments for `tesser data inspect-parquet`.
#[derive(Args)]
pub struct DataInspectParquetArgs {
    // Parquet file to inspect (positional).
    #[arg(value_name = "PATH")]
    path: PathBuf,
    // Maximum number of rows to print; 0 means "all rows".
    #[arg(long, default_value_t = 10)]
    rows: usize,
}
495
496impl DataInspectParquetArgs {
497 fn run(&self) -> Result<()> {
498 let limit = if self.rows == 0 {
499 usize::MAX
500 } else {
501 self.rows
502 };
503 let file = File::open(&self.path)
504 .with_context(|| format!("failed to open {}", self.path.display()))?;
505 let batch_size = limit.clamp(1, 8192);
506 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)?
507 .with_batch_size(batch_size)
508 .build()?;
509
510 let mut printed = 0usize;
511 while printed < limit {
512 match reader.next() {
513 Some(Ok(batch)) => {
514 if batch.num_rows() == 0 {
515 continue;
516 }
517 let remaining = limit.saturating_sub(printed);
518 let take = remaining.min(batch.num_rows());
519 let display_batch = if take == batch.num_rows() {
520 batch
521 } else {
522 batch.slice(0, take)
523 };
524 print_batches(std::slice::from_ref(&display_batch))?;
525 printed += take;
526 }
527 Some(Err(err)) => return Err(err.into()),
528 None => break,
529 }
530 }
531
532 if printed == 0 {
533 println!("no rows available in {}", self.path.display());
534 } else if limit != usize::MAX {
535 println!("displayed {printed} row(s) from {}", self.path.display());
536 }
537
538 Ok(())
539 }
540}
541
// CLI-facing backtest mode, mapped onto `BacktestMode` at run time.
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
pub enum BacktestModeArg {
    Candle,
    Tick,
}
547
// Arguments for `tesser backtest run`.
#[derive(Args)]
pub struct BacktestRunArgs {
    // Strategy definition file (TOML with `strategy_name` + params).
    #[arg(long)]
    strategy_config: PathBuf,
    // Candle data files; synthetic candles are generated when omitted.
    #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
    data_paths: Vec<PathBuf>,
    // Number of synthetic candles per symbol when no --data is given.
    #[arg(long, default_value_t = 500)]
    candles: usize,
    // Default order quantity.
    #[arg(long, default_value = "0.01")]
    quantity: Decimal,
    // Simulated slippage in basis points (negative values clamped to 0).
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    // Simulated fee in basis points (negative values clamped to 0).
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    // Fill latency expressed in candles (floored to 1).
    #[arg(long, default_value_t = 1)]
    latency_candles: usize,
    // Position sizer spec, e.g. "fixed:0.01" (parsed by `parse_sizer`).
    #[arg(long, default_value = "fixed:0.01")]
    sizer: String,
    // Candle- or tick-level simulation.
    #[arg(long, value_enum, default_value = "candle")]
    mode: BacktestModeArg,
    // Order-book data for tick mode: JSONL files or one flight-recorder dir.
    #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
    lob_paths: Vec<PathBuf>,
    // Market registry file; falls back to backtest.markets_file in config.
    #[arg(long)]
    markets_file: Option<PathBuf>,
}
580
// Where tick-mode order-book data comes from: a set of JSONL files or a
// single flight-recorder directory.
enum LobSource {
    Json(Vec<PathBuf>),
    FlightRecorder(PathBuf),
}
585
// Arguments for `tesser backtest batch`.
#[derive(Args)]
pub struct BacktestBatchArgs {
    // One or more strategy config files to run sequentially.
    #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
    config_paths: Vec<PathBuf>,
    // Candle data files shared by every batch job.
    #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
    data_paths: Vec<PathBuf>,
    // Default order quantity.
    #[arg(long, default_value = "0.01")]
    quantity: Decimal,
    // Optional CSV report destination for the aggregated results.
    #[arg(long)]
    output: Option<PathBuf>,
    // Simulated slippage in basis points (negative values clamped to 0).
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    // Simulated fee in basis points (negative values clamped to 0).
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    // Fill latency expressed in candles (floored to 1).
    #[arg(long, default_value_t = 1)]
    latency_candles: usize,
    // Position sizer spec shared by every job.
    #[arg(long, default_value = "fixed:0.01")]
    sizer: String,
    // Market registry file; falls back to backtest.markets_file in config.
    #[arg(long)]
    markets_file: Option<PathBuf>,
}
614
// Arguments for `tesser live run`. CLI values override the corresponding
// entries from the loaded `AppConfig` (see the resolver helpers below).
#[derive(Args)]
pub struct LiveRunArgs {
    // Strategy definition file (TOML with `strategy_name` + params).
    #[arg(long)]
    strategy_config: PathBuf,
    // Exchange profile key looked up in the app config.
    #[arg(long, default_value = "paper_sandbox")]
    exchange: String,
    // Public-channel category, parsed into `PublicChannel`.
    #[arg(long, default_value = "linear")]
    category: String,
    // Candle interval for the session.
    #[arg(long, default_value = "1m")]
    interval: String,
    // Default order quantity; must be > 0.
    #[arg(long, default_value = "1")]
    quantity: Decimal,
    // Execution backend (paper or real); `--live-exec` kept as an alias.
    #[arg(
        long = "exec",
        default_value = "paper",
        value_enum,
        alias = "live-exec"
    )]
    exec: ExecutionBackend,
    // Optional override of the persisted-state path.
    #[arg(long)]
    state_path: Option<PathBuf>,
    // Optional override of the persistence engine.
    #[arg(long, value_enum)]
    persistence: Option<PersistenceBackend>,
    // Optional override of the Prometheus metrics listen address.
    #[arg(long)]
    metrics_addr: Option<String>,
    // Optional override of the log-file path.
    #[arg(long)]
    log_path: Option<PathBuf>,
    // Directory where the flight recorder writes market data.
    #[arg(
        long = "record-data",
        value_name = "PATH",
        default_value = "data/flight_recorder"
    )]
    record_data: PathBuf,
    // Optional override of the control/monitor listen address.
    #[arg(long)]
    control_addr: Option<String>,
    // Optional starting equity in the reporting currency.
    #[arg(long)]
    initial_equity: Option<Decimal>,
    // Market registry file; falls back to backtest.markets_file in config.
    #[arg(long)]
    markets_file: Option<PathBuf>,
    // Simulated slippage in basis points (negative values clamped to 0).
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    // Simulated fee in basis points (negative values clamped to 0).
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    // Simulated execution latency in milliseconds.
    #[arg(long, default_value_t = 0)]
    latency_ms: u64,
    // Candle history kept in memory (floored to 32 at runtime).
    #[arg(long, default_value_t = 512)]
    history: usize,
    // Optional override of the position-reconciliation interval.
    #[arg(long)]
    reconciliation_interval_secs: Option<u64>,
    // Optional override of the reconciliation quantity threshold.
    #[arg(long)]
    reconciliation_threshold: Option<Decimal>,
    // Optional alert webhook URL (sanitized before use).
    #[arg(long)]
    webhook_url: Option<String>,
    // Alerting overrides.
    #[arg(long)]
    alert_max_data_gap_secs: Option<u64>,
    #[arg(long)]
    alert_max_order_failures: Option<u32>,
    #[arg(long)]
    alert_max_drawdown: Option<Decimal>,
    // Risk-management overrides (negative values clamped to 0).
    #[arg(long)]
    risk_max_order_qty: Option<Decimal>,
    #[arg(long)]
    risk_max_position_qty: Option<Decimal>,
    #[arg(long)]
    risk_max_drawdown: Option<Decimal>,
    // Optional order-book depth; defaults from the live module when omitted.
    #[arg(long)]
    orderbook_depth: Option<usize>,
    // Position sizer spec for the live session.
    #[arg(long, default_value = "fixed:1.0")]
    sizer: String,
}
692
/// Helpers that merge CLI overrides with the loaded configuration.
/// Every method follows the same pattern: CLI value wins when present,
/// otherwise the value from `AppConfig` is used.
impl LiveRunArgs {
    /// Log-file path: CLI override or the configured live log path.
    fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
        self.log_path
            .clone()
            .unwrap_or_else(|| config.live.log_path.clone())
    }

    /// Metrics listen address, parsed into a `SocketAddr`.
    ///
    /// # Errors
    /// Fails when the resolved string is not a valid socket address.
    fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
        let addr = self
            .metrics_addr
            .clone()
            .unwrap_or_else(|| config.live.metrics_addr.clone());
        addr.parse()
            .with_context(|| format!("invalid metrics address '{addr}'"))
    }

    /// Control/monitor listen address, parsed into a `SocketAddr`.
    ///
    /// # Errors
    /// Fails when the resolved string is not a valid socket address.
    fn resolved_control_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
        let addr = self
            .control_addr
            .clone()
            .unwrap_or_else(|| config.live.control_addr.clone());
        addr.parse()
            .with_context(|| format!("invalid control address '{addr}'"))
    }

    /// Reconciliation interval, floored to one second.
    fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
        let secs = self
            .reconciliation_interval_secs
            .unwrap_or(config.live.reconciliation_interval_secs)
            .max(1);
        StdDuration::from_secs(secs)
    }

    /// Reconciliation threshold; non-positive values fall back to the
    /// configured threshold floored at 1e-6.
    fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
        let configured = self
            .reconciliation_threshold
            .unwrap_or(config.live.reconciliation_threshold);
        if configured <= Decimal::ZERO {
            // Decimal::new(1, 6) == 0.000001
            config.live.reconciliation_threshold.max(Decimal::new(1, 6))
        } else {
            configured
        }
    }

    /// Starting balances: the configured backtest balances, with
    /// `--initial-equity` (clamped to >= 0) overriding the reporting currency.
    fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<Symbol, Decimal> {
        let mut balances = clone_initial_balances(&config.backtest);
        if let Some(value) = self.initial_equity {
            balances.insert(
                config.backtest.reporting_currency.clone(),
                value.max(Decimal::ZERO),
            );
        }
        balances
    }

    /// Alerting configuration with CLI overrides applied; the webhook URL is
    /// sanitized before use.
    fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
        let mut alerting = config.live.alerting.clone();
        let webhook = self
            .webhook_url
            .clone()
            .or_else(|| alerting.webhook_url.clone());
        alerting.webhook_url = sanitize_webhook(webhook);
        if let Some(sec) = self.alert_max_data_gap_secs {
            alerting.max_data_gap_secs = sec;
        }
        if let Some(limit) = self.alert_max_order_failures {
            alerting.max_order_failures = limit;
        }
        if let Some(limit) = self.alert_max_drawdown {
            alerting.max_drawdown = limit.max(Decimal::ZERO);
        }
        alerting
    }

    /// Risk-management configuration with CLI overrides applied; all limits
    /// are clamped to be non-negative.
    fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
        let mut risk = config.risk_management.clone();
        if let Some(limit) = self.risk_max_order_qty {
            risk.max_order_quantity = limit.max(Decimal::ZERO);
        }
        if let Some(limit) = self.risk_max_position_qty {
            risk.max_position_quantity = limit.max(Decimal::ZERO);
        }
        if let Some(limit) = self.risk_max_drawdown {
            risk.max_drawdown = limit.max(Decimal::ZERO);
        }
        risk
    }
}
781
/// On-disk strategy definition: a `strategy_name` plus an arbitrary TOML
/// table of strategy parameters (empty table when omitted).
#[derive(Deserialize)]
struct StrategyConfigFile {
    #[serde(rename = "strategy_name")]
    name: String,
    #[serde(default = "empty_table")]
    params: toml::Value,
}
789
790fn empty_table() -> toml::Value {
791 toml::Value::Table(Default::default())
792}
793
/// CLI entry point: parse arguments, load the configuration for the chosen
/// environment, initialize tracing, and dispatch to the selected subcommand.
///
/// # Errors
/// Propagates configuration, logging-setup, and subcommand failures.
pub async fn run() -> Result<()> {
    let cli = Cli::parse();
    let config = load_config(Some(&cli.env)).context("failed to load configuration")?;

    // RUST_LOG wins; otherwise the -v count selects the level
    // (0 => configured default, 1 => debug, 2+ => trace).
    let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
        0 => config.log_level.clone(),
        1 => "debug".to_string(),
        _ => "trace".to_string(),
    });

    // Live runs may redirect logs to a file; resolve that before tracing init.
    let log_override = match &cli.command {
        Commands::Live {
            action: LiveCommand::Run(args),
        } => Some(args.resolved_log_path(&config)),
        _ => None,
    };

    init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;

    match cli.command {
        Commands::Data { action } => handle_data(action, &config).await?,
        Commands::Backtest {
            action: BacktestCommand::Run(args),
        } => args.run(&config).await?,
        Commands::Backtest {
            action: BacktestCommand::Batch(args),
        } => args.run(&config).await?,
        Commands::Live {
            action: LiveCommand::Run(args),
        } => args.run(&config).await?,
        Commands::State { action } => handle_state(action, &config).await?,
        Commands::Analyze { action } => handle_analyze(action)?,
        Commands::Strategies => list_strategies(),
        Commands::Monitor(args) => args.run(&config).await?,
    }

    Ok(())
}
832
833async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
834 match cmd {
835 DataCommand::Download(args) => {
836 args.run(config).await?;
837 }
838 DataCommand::Validate(args) => {
839 args.run()?;
840 }
841 DataCommand::Resample(args) => {
842 args.run()?;
843 }
844 DataCommand::InspectParquet(args) => {
845 args.run()?;
846 }
847 }
848 Ok(())
849}
850
851async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
852 match cmd {
853 StateCommand::Inspect(args) => {
854 state::inspect_state(
855 args.resolved_path(config),
856 args.resolved_engine(config),
857 args.raw,
858 )
859 .await?;
860 }
861 }
862 Ok(())
863}
864
865fn handle_analyze(cmd: AnalyzeCommand) -> Result<()> {
866 match cmd {
867 AnalyzeCommand::Execution(args) => {
868 analyze::run_execution(args.build_request()?, args.export_csv.as_deref())
869 }
870 }
871}
872
impl BacktestRunArgs {
    /// Run a single backtest: load the strategy, build the market data
    /// stream for the chosen mode, wire up execution, and print the report.
    ///
    /// # Errors
    /// Fails when the strategy config cannot be read/parsed, the strategy
    /// declares no subscriptions, market/LOB data cannot be loaded, or the
    /// backtest itself errors.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        let contents = std::fs::read_to_string(&self.strategy_config)
            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
        let def: StrategyConfigFile =
            toml::from_str(&contents).context("failed to parse strategy config file")?;
        let strategy = load_strategy(&def.name, def.params)
            .with_context(|| format!("failed to configure strategy {}", def.name))?;
        let symbols = strategy.subscriptions();
        if symbols.is_empty() {
            return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
        }

        // Map the CLI enum to the engine-facing mode enum.
        let mode = match self.mode {
            BacktestModeArg::Candle => BacktestMode::Candle,
            BacktestModeArg::Tick => BacktestMode::Tick,
        };

        // A markets file is mandatory: CLI flag first, then config fallback.
        let markets_path = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone())
            .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
        let market_registry = Arc::new(
            MarketRegistry::load_from_file(&markets_path).with_context(|| {
                format!("failed to load markets from {}", markets_path.display())
            })?,
        );

        // The two modes use different stream/execution plumbing:
        // - Candle: a candle stream plus the simple paper execution client.
        // - Tick: a market-event stream plus a full matching engine (which
        //   doubles as the execution client).
        let (market_stream, event_stream, execution_client, matching_engine) = match mode {
            BacktestMode::Candle => {
                let stream = self.build_candle_stream(&symbols)?;
                (
                    Some(stream),
                    None,
                    Arc::new(PaperExecutionClient::default()) as Arc<dyn ExecutionClient>,
                    None,
                )
            }
            BacktestMode::Tick => {
                if self.lob_paths.is_empty() {
                    bail!("--lob-data is required when --mode tick");
                }
                let source = self.detect_lob_source()?;
                let engine = Arc::new(MatchingEngine::new(
                    "matching-engine",
                    symbols.clone(),
                    reporting_balance(&config.backtest),
                ));
                let stream = match source {
                    LobSource::Json(paths) => {
                        let events = load_lob_events_from_paths(&paths)?;
                        if events.is_empty() {
                            bail!("no order book events loaded from --lob-data");
                        }
                        stream_from_events(events)
                    }
                    LobSource::FlightRecorder(root) => self
                        .build_flight_recorder_stream(&root, &symbols)
                        .context("failed to initialize flight recorder stream")?,
                };
                (
                    None,
                    Some(stream),
                    engine.clone() as Arc<dyn ExecutionClient>,
                    Some(engine),
                )
            }
        };

        let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
        let order_quantity = self.quantity;
        // Backtests use the no-op risk checker; risk limits apply live only.
        let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));

        // Negative CLI values are clamped; latency is floored at one candle.
        let mut cfg = BacktestConfig::new(symbols[0].clone());
        cfg.order_quantity = order_quantity;
        cfg.initial_balances = clone_initial_balances(&config.backtest);
        cfg.reporting_currency = config.backtest.reporting_currency.clone();
        cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
        cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
        cfg.execution.latency_candles = self.latency_candles.max(1);
        cfg.mode = mode;

        let report = Backtester::new(
            cfg,
            strategy,
            execution,
            matching_engine,
            market_registry,
            market_stream,
            event_stream,
        )
        .run()
        .await
        .context("backtest failed")?;
        print_report(&report);
        Ok(())
    }

    /// Build the candle stream for candle-mode backtests: synthetic candles
    /// when no --data was given, otherwise CSV or Parquet depending on the
    /// detected file format.
    fn build_candle_stream(&self, symbols: &[Symbol]) -> Result<BacktestStream> {
        if symbols.is_empty() {
            bail!("strategy did not declare any subscriptions");
        }
        if self.data_paths.is_empty() {
            // No data files: generate deterministic synthetic candles, with a
            // per-symbol offset so the series differ.
            let mut generated = Vec::new();
            for (idx, symbol) in symbols.iter().enumerate() {
                let offset = idx as i64 * 10;
                generated.extend(synth_candles(symbol, self.candles, offset));
            }
            generated.sort_by_key(|c| c.timestamp);
            if generated.is_empty() {
                bail!("no synthetic candles generated; provide --data files instead");
            }
            return Ok(memory_market_stream(&symbols[0], generated));
        }

        match detect_data_format(&self.data_paths)? {
            DataFormat::Csv => {
                let mut candles = load_candles_from_paths(&self.data_paths)?;
                if candles.is_empty() {
                    bail!("no candles loaded from --data paths");
                }
                candles.sort_by_key(|c| c.timestamp);
                Ok(memory_market_stream(&symbols[0], candles))
            }
            DataFormat::Parquet => Ok(parquet_market_stream(symbols, self.data_paths.clone())),
        }
    }

    /// Classify --lob-data inputs: a single directory means flight-recorder
    /// data, otherwise all paths must be JSONL files.
    fn detect_lob_source(&self) -> Result<LobSource> {
        if self.lob_paths.len() == 1 {
            let path = &self.lob_paths[0];
            if path.is_dir() {
                return Ok(LobSource::FlightRecorder(path.clone()));
            }
        }
        if self.lob_paths.iter().all(|path| path.is_file()) {
            return Ok(LobSource::Json(self.lob_paths.clone()));
        }
        bail!(
            "tick mode expects either JSONL files or a single flight-recorder directory via --lob-data"
        )
    }

    /// Adapt a flight-recorder directory into the backtester's market-event
    /// stream for the subscribed symbols.
    fn build_flight_recorder_stream(
        &self,
        root: &Path,
        symbols: &[Symbol],
    ) -> Result<MarketEventStream> {
        let stream = UnifiedEventStream::from_flight_recorder(root, symbols)?
            .into_stream()
            .map(|event| event.map(MarketEvent::from));
        Ok(Box::pin(stream))
    }
}
1028
impl BacktestBatchArgs {
    /// Run every strategy config against the shared dataset and collect one
    /// summary row per job, optionally writing a CSV batch report.
    ///
    /// # Errors
    /// Fails when no configs/data are provided, the markets file is missing,
    /// a strategy config cannot be parsed, or any individual backtest errors.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        if self.config_paths.is_empty() {
            return Err(anyhow!("provide at least one --config path"));
        }
        if self.data_paths.is_empty() {
            return Err(anyhow!("provide at least one --data path for batch mode"));
        }
        // Markets file: CLI flag first, then config fallback.
        let markets_path = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone())
            .ok_or_else(|| {
                anyhow!("batch mode requires --markets-file or backtest.markets_file")
            })?;
        let market_registry = Arc::new(
            MarketRegistry::load_from_file(&markets_path).with_context(|| {
                format!("failed to load markets from {}", markets_path.display())
            })?,
        );
        // Detect the data format once; every job reloads the same paths.
        let data_format = detect_data_format(&self.data_paths)?;
        let mut aggregated = Vec::new();
        for config_path in &self.config_paths {
            let contents = std::fs::read_to_string(config_path).with_context(|| {
                format!("failed to read strategy config {}", config_path.display())
            })?;
            let def: StrategyConfigFile =
                toml::from_str(&contents).context("failed to parse strategy config file")?;
            let strategy = load_strategy(&def.name, def.params)
                .with_context(|| format!("failed to configure strategy {}", def.name))?;
            let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
            let order_quantity = self.quantity;
            let symbols = strategy.subscriptions();
            if symbols.is_empty() {
                bail!("strategy {} did not declare subscriptions", strategy.name());
            }
            let stream = match data_format {
                DataFormat::Csv => {
                    let mut candles = load_candles_from_paths(&self.data_paths)?;
                    if candles.is_empty() {
                        bail!("no candles loaded from --data paths");
                    }
                    candles.sort_by_key(|c| c.timestamp);
                    memory_market_stream(&symbols[0], candles)
                }
                DataFormat::Parquet => parquet_market_stream(&symbols, self.data_paths.clone()),
            };
            // Batch mode always uses the paper client and no risk checks.
            let execution_client: Arc<dyn ExecutionClient> =
                Arc::new(PaperExecutionClient::default());
            let execution =
                ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
            let mut cfg = BacktestConfig::new(symbols[0].clone());
            cfg.order_quantity = order_quantity;
            cfg.initial_balances = clone_initial_balances(&config.backtest);
            cfg.reporting_currency = config.backtest.reporting_currency.clone();
            cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
            cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
            cfg.execution.latency_candles = self.latency_candles.max(1);

            // Candle-mode only: matching engine and event stream stay None.
            let report = Backtester::new(
                cfg,
                strategy,
                execution,
                None,
                market_registry.clone(),
                Some(stream),
                None,
            )
            .run()
            .await
            .with_context(|| format!("backtest failed for {}", config_path.display()))?;
            // NOTE(review): signal/order counters are hard-coded to 0 here —
            // presumably not yet surfaced by the report; confirm upstream.
            aggregated.push(BatchRow {
                config: config_path.display().to_string(),
                signals: 0, orders: 0, dropped_orders: 0, ending_equity: report.ending_equity,
            });
        }

        if let Some(output) = &self.output {
            write_batch_report(output, &aggregated)?;
            println!("Batch report written to {}", output.display());
        }
        // Defensive: config_paths was non-empty, so this should be unreachable.
        if aggregated.is_empty() {
            return Err(anyhow!("no batch jobs executed"));
        }
        Ok(())
    }
}
1119
impl LiveRunArgs {
    /// Assemble a live-trading session from CLI arguments plus the loaded
    /// application config, then run it to completion.
    ///
    /// CLI flags generally take precedence over config-file values (markets
    /// file, persistence engine, state path).
    ///
    /// # Errors
    /// Fails when the exchange profile is missing, the strategy config cannot
    /// be read or parsed, the strategy declares no subscriptions, `--quantity`
    /// is not positive, address/interval/category parsing fails, or the live
    /// session itself errors out.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        // Resolve the named exchange profile; the CLI only carries the name.
        let exchange_cfg = config
            .exchange
            .get(&self.exchange)
            .cloned()
            .ok_or_else(|| anyhow!("exchange profile {} not found", self.exchange))?;

        let driver = exchange_cfg.driver.clone();

        // Load and instantiate the strategy from its TOML definition file.
        let contents = fs::read_to_string(&self.strategy_config)
            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
        let def: StrategyConfigFile =
            toml::from_str(&contents).context("failed to parse strategy config file")?;
        let strategy = load_strategy(&def.name, def.params)
            .with_context(|| format!("failed to configure strategy {}", def.name))?;
        let symbols = strategy.subscriptions();
        if symbols.is_empty() {
            bail!("strategy did not declare any subscriptions");
        }
        if self.quantity <= Decimal::ZERO {
            bail!("--quantity must be greater than zero");
        }
        let quantity = self.quantity;
        let initial_balances = self.resolved_initial_balances(config);
        let reporting_currency = config.backtest.reporting_currency.clone();
        // CLI --markets-file wins over the config-file setting when both exist.
        let markets_file = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone());

        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
        let category =
            PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
        let metrics_addr = self.resolved_metrics_addr(config)?;
        let control_addr = self.resolved_control_addr(config)?;
        // Persistence: CLI overrides (engine, state path) fall back to the
        // config-file persistence settings.
        let persistence_cfg = config.live.persistence_config();
        let persistence_engine = self
            .persistence
            .map(PersistenceEngine::from)
            .unwrap_or(persistence_cfg.engine);
        let state_path = self
            .state_path
            .clone()
            .unwrap_or_else(|| persistence_cfg.path.clone());
        let persistence = PersistenceSettings::new(persistence_engine, state_path);
        let alerting = self.build_alerting(config);
        // Clamp history to a minimum of 32 entries.
        let history = self.history.max(32);
        let reconciliation_interval = self.reconciliation_interval(config);
        let reconciliation_threshold = self.reconciliation_threshold(config);
        let orderbook_depth = self
            .orderbook_depth
            .unwrap_or(super::live::default_order_book_depth());

        // Slippage/fee are clamped at zero so negative CLI values cannot
        // produce favorable fills.
        let settings = LiveSessionSettings {
            category,
            interval,
            quantity,
            slippage_bps: self.slippage_bps.max(Decimal::ZERO),
            fee_bps: self.fee_bps.max(Decimal::ZERO),
            history,
            metrics_addr,
            persistence,
            initial_balances,
            reporting_currency,
            markets_file,
            alerting,
            exec_backend: self.exec,
            risk: self.build_risk_config(config),
            reconciliation_interval,
            reconciliation_threshold,
            driver,
            orderbook_depth,
            record_path: Some(self.record_data.clone()),
            control_addr,
        };

        // Log the effective session parameters before handing off control.
        info!(
            strategy = %def.name,
            symbols = ?symbols,
            exchange = %self.exchange,
            interval = %self.interval,
            driver = ?settings.driver,
            exec = ?self.exec,
            persistence_engine = ?settings.persistence.engine,
            state_path = %settings.persistence.state_path.display(),
            control_addr = %settings.control_addr,
            "starting live session"
        );

        run_live(strategy, symbols, exchange_cfg, settings)
            .await
            .context("live session failed")
    }
}
1215
1216fn list_strategies() {
1217 println!("Built-in strategies:");
1218 for name in builtin_strategy_names() {
1219 println!("- {name}");
1220 }
1221}
1222
/// Print a human-readable summary of a dataset validation run: aggregate
/// counters followed by up to `MAX_EXAMPLES` sample gaps, price spikes, and
/// cross-source mismatches (with a note for any omitted entries).
fn print_validation_summary(outcome: &ValidationOutcome) {
    // Cap each example list so large validation reports stay readable.
    const MAX_EXAMPLES: usize = 5;
    let summary = &outcome.summary;
    println!(
        "Validation summary for {} ({} candles)",
        summary.symbol, summary.rows
    );
    println!(
        "  Range: {} -> {}",
        summary.start.to_rfc3339(),
        summary.end.to_rfc3339()
    );
    println!("  Interval: {}", interval_label(summary.interval));
    println!("  Missing intervals: {}", summary.missing_candles);
    println!("  Duplicate intervals: {}", summary.duplicate_candles);
    println!("  Zero-volume candles: {}", summary.zero_volume_candles);
    println!("  Price spikes flagged: {}", summary.price_spike_count);
    println!(
        "  Cross-source mismatches: {}",
        summary.cross_mismatch_count
    );
    println!("  Repaired candles generated: {}", summary.repaired_candles);

    // Sample gap intervals, truncated to MAX_EXAMPLES.
    if !outcome.gaps.is_empty() {
        println!("  Gap examples:");
        for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
            println!(
                "    {} -> {} (missing {})",
                gap.start.to_rfc3339(),
                gap.end.to_rfc3339(),
                gap.missing
            );
        }
        if outcome.gaps.len() > MAX_EXAMPLES {
            println!(
                "    ... {} additional gap(s) omitted",
                outcome.gaps.len() - MAX_EXAMPLES
            );
        }
    }

    // Sample price spikes, with the change rendered as a percentage.
    if !outcome.price_spikes.is_empty() {
        println!("  Price spike examples:");
        for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
            println!(
                "    {} (change {:.2}%)",
                spike.timestamp.to_rfc3339(),
                spike.change_fraction * 100.0
            );
        }
        if outcome.price_spikes.len() > MAX_EXAMPLES {
            println!(
                "    ... {} additional spike(s) omitted",
                outcome.price_spikes.len() - MAX_EXAMPLES
            );
        }
    }

    // Sample close-price disagreements between the primary and reference source.
    if !outcome.cross_mismatches.is_empty() {
        println!("  Cross-source mismatch examples:");
        for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
            println!(
                "    {} primary {:.4} vs ref {:.4} ({:.2}%)",
                miss.timestamp.to_rfc3339(),
                miss.primary_close,
                miss.reference_close,
                miss.delta_fraction * 100.0
            );
        }
        if outcome.cross_mismatches.len() > MAX_EXAMPLES {
            println!(
                "    ... {} additional mismatch(es) omitted",
                outcome.cross_mismatches.len() - MAX_EXAMPLES
            );
        }
    }
}
1300
1301fn print_report(report: &PerformanceReport) {
1302 println!("\n{}", report);
1303}
1304
1305fn synth_candles(symbol: &str, len: usize, offset_minutes: i64) -> Vec<Candle> {
1306 let mut candles = Vec::with_capacity(len);
1307 for i in 0..len {
1308 let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
1309 let open = base + (i as f64 % 3.0) * 10.0;
1310 let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
1311 let open_dec =
1312 Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
1313 let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
1314 let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
1315 let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
1316 candles.push(Candle {
1317 symbol: Symbol::from(symbol),
1318 interval: Interval::OneMinute,
1319 open: open_dec,
1320 high,
1321 low,
1322 close: close_dec,
1323 volume: Decimal::ONE,
1324 timestamp: Utc::now() - Duration::minutes((len - i) as i64)
1325 + Duration::minutes(offset_minutes),
1326 });
1327 }
1328 candles
1329}
1330
1331fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
1332 if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
1333 return Ok(dt.with_timezone(&Utc));
1334 }
1335 if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1336 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1337 }
1338 if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
1339 let dt = date
1340 .and_hms_opt(0, 0, 0)
1341 .ok_or_else(|| anyhow!("invalid date"))?;
1342 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1343 }
1344 Err(anyhow!("unable to parse datetime '{value}'"))
1345}
1346
/// One row of a candle CSV file as deserialized via `csv` + `serde`.
#[derive(Deserialize)]
struct CandleCsvRow {
    /// Optional symbol column; when absent the symbol is inferred from the
    /// file path (see `infer_symbol_from_path`).
    symbol: Option<String>,
    /// Raw timestamp string, parsed later by `parse_datetime`.
    timestamp: String,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}
1357
1358fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1359 let mut candles = Vec::new();
1360 for path in paths {
1361 let mut reader = csv::Reader::from_path(path)
1362 .with_context(|| format!("failed to open {}", path.display()))?;
1363 for record in reader.deserialize::<CandleCsvRow>() {
1364 let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1365 let timestamp = parse_datetime(&row.timestamp)?;
1366 let symbol = row
1367 .symbol
1368 .clone()
1369 .or_else(|| infer_symbol_from_path(path))
1370 .ok_or_else(|| {
1371 anyhow!(
1372 "missing symbol column and unable to infer from path {}",
1373 path.display()
1374 )
1375 })?;
1376 let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1377 let open = Decimal::from_f64(row.open).ok_or_else(|| {
1378 anyhow!("invalid open value '{}' in {}", row.open, path.display())
1379 })?;
1380 let high = Decimal::from_f64(row.high).ok_or_else(|| {
1381 anyhow!("invalid high value '{}' in {}", row.high, path.display())
1382 })?;
1383 let low = Decimal::from_f64(row.low)
1384 .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1385 let close = Decimal::from_f64(row.close).ok_or_else(|| {
1386 anyhow!("invalid close value '{}' in {}", row.close, path.display())
1387 })?;
1388 let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1389 anyhow!(
1390 "invalid volume value '{}' in {}",
1391 row.volume,
1392 path.display()
1393 )
1394 })?;
1395 candles.push(Candle {
1396 symbol,
1397 interval,
1398 open,
1399 high,
1400 low,
1401 close,
1402 volume,
1403 timestamp,
1404 });
1405 }
1406 }
1407 Ok(candles)
1408}
1409
/// Input dataset format detected from the file extensions passed to `--data`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DataFormat {
    /// Comma-separated values (the default for any non-`parquet` extension).
    Csv,
    /// Apache Parquet (`.parquet` extension).
    Parquet,
}
1415
1416fn detect_data_format(paths: &[PathBuf]) -> Result<DataFormat> {
1417 let mut detected: Option<DataFormat> = None;
1418 for path in paths {
1419 let ext = path
1420 .extension()
1421 .and_then(|ext| ext.to_str())
1422 .map(|ext| ext.to_ascii_lowercase())
1423 .unwrap_or_else(|| String::from(""));
1424 let current = if ext == "parquet" {
1425 DataFormat::Parquet
1426 } else {
1427 DataFormat::Csv
1428 };
1429 if let Some(existing) = detected {
1430 if existing != current {
1431 bail!("cannot mix CSV and Parquet inputs in --data");
1432 }
1433 } else {
1434 detected = Some(current);
1435 }
1436 }
1437 detected.ok_or_else(|| anyhow!("no data paths provided"))
1438}
1439
1440fn memory_market_stream(symbol: &str, candles: Vec<Candle>) -> BacktestStream {
1441 Box::new(PaperMarketStream::from_data(
1442 symbol.to_string(),
1443 Vec::new(),
1444 candles,
1445 ))
1446}
1447
1448fn parquet_market_stream(symbols: &[Symbol], paths: Vec<PathBuf>) -> BacktestStream {
1449 Box::new(ParquetMarketStream::with_candles(symbols.to_vec(), paths))
1450}
1451
/// One JSON line of a limit-order-book event file.
///
/// The `event` field selects the variant; variant tags are lowercased by the
/// serde attribute (`snapshot`, `depth`, `trade`). Price levels arrive as raw
/// `[price, size]` float pairs and are converted by `convert_levels`.
#[derive(Deserialize)]
#[serde(tag = "event", rename_all = "lowercase")]
enum LobEventRow {
    /// Full order-book snapshot with complete bid/ask ladders.
    Snapshot {
        timestamp: String,
        symbol: Option<String>,
        bids: Vec<[f64; 2]>,
        asks: Vec<[f64; 2]>,
    },
    /// Incremental depth update carrying changed levels only.
    Depth {
        timestamp: String,
        symbol: Option<String>,
        bids: Vec<[f64; 2]>,
        asks: Vec<[f64; 2]>,
    },
    /// A single trade print.
    Trade {
        timestamp: String,
        symbol: Option<String>,
        side: String,
        price: f64,
        size: f64,
    },
}
1475
1476fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
1477 let mut events = Vec::new();
1478 for path in paths {
1479 let file = File::open(path)
1480 .with_context(|| format!("failed to open order book file {}", path.display()))?;
1481 let symbol_hint = infer_symbol_from_path(path);
1482 for line in BufReader::new(file).lines() {
1483 let line =
1484 line.with_context(|| format!("failed to read line from {}", path.display()))?;
1485 if line.trim().is_empty() {
1486 continue;
1487 }
1488 let row: LobEventRow = serde_json::from_str(&line)
1489 .with_context(|| format!("invalid order book event in {}", path.display()))?;
1490 match row {
1491 LobEventRow::Snapshot {
1492 timestamp,
1493 symbol,
1494 bids,
1495 asks,
1496 } => {
1497 let ts = parse_datetime(×tamp)?;
1498 let symbol = symbol
1499 .or_else(|| symbol_hint.clone())
1500 .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
1501 let bids = convert_levels(&bids)?;
1502 let asks = convert_levels(&asks)?;
1503 let book = OrderBook {
1504 symbol: symbol.clone(),
1505 bids,
1506 asks,
1507 timestamp: ts,
1508 exchange_checksum: None,
1509 local_checksum: None,
1510 };
1511 events.push(MarketEvent {
1512 timestamp: ts,
1513 kind: MarketEventKind::OrderBook(book),
1514 });
1515 }
1516 LobEventRow::Depth {
1517 timestamp,
1518 symbol,
1519 bids,
1520 asks,
1521 } => {
1522 let ts = parse_datetime(×tamp)?;
1523 let symbol = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
1524 anyhow!("missing symbol in depth update {}", path.display())
1525 })?;
1526 let bids = convert_levels(&bids)?;
1527 let asks = convert_levels(&asks)?;
1528 let update = DepthUpdate {
1529 symbol: symbol.clone(),
1530 bids,
1531 asks,
1532 timestamp: ts,
1533 };
1534 events.push(MarketEvent {
1535 timestamp: ts,
1536 kind: MarketEventKind::Depth(update),
1537 });
1538 }
1539 LobEventRow::Trade {
1540 timestamp,
1541 symbol,
1542 side,
1543 price,
1544 size,
1545 } => {
1546 let ts = parse_datetime(×tamp)?;
1547 let symbol = symbol
1548 .or_else(|| symbol_hint.clone())
1549 .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
1550 let side = match side.to_lowercase().as_str() {
1551 "buy" | "bid" | "b" => Side::Buy,
1552 "sell" | "ask" | "s" => Side::Sell,
1553 other => bail!("unsupported trade side '{other}' in {}", path.display()),
1554 };
1555 let price = Decimal::from_f64(price).ok_or_else(|| {
1556 anyhow!("invalid trade price '{}' in {}", price, path.display())
1557 })?;
1558 let size = Decimal::from_f64(size).ok_or_else(|| {
1559 anyhow!("invalid trade size '{}' in {}", size, path.display())
1560 })?;
1561 let tick = Tick {
1562 symbol: symbol.clone(),
1563 price,
1564 size,
1565 side,
1566 exchange_timestamp: ts,
1567 received_at: ts,
1568 };
1569 events.push(MarketEvent {
1570 timestamp: ts,
1571 kind: MarketEventKind::Trade(tick),
1572 });
1573 }
1574 }
1575 }
1576 }
1577 events.sort_by_key(|event| event.timestamp);
1578 Ok(events)
1579}
1580
1581fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
1582 levels
1583 .iter()
1584 .map(|pair| {
1585 let price = Decimal::from_f64(pair[0])
1586 .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
1587 let size = Decimal::from_f64(pair[1])
1588 .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
1589 Ok(OrderBookLevel { price, size })
1590 })
1591 .collect()
1592}
1593
/// Infer a symbol from the file's parent directory name, e.g.
/// `data/BTCUSDT/1m.csv` -> `Some("BTCUSDT")`. Returns `None` when the path
/// has no parent directory component.
fn infer_symbol_from_path(path: &Path) -> Option<String> {
    let dir_name = path.parent()?.file_name()?;
    Some(dir_name.to_string_lossy().into_owned())
}
1599
1600fn infer_interval_from_path(path: &Path) -> Option<Interval> {
1601 path.file_stem()
1602 .and_then(|os| os.to_str())
1603 .and_then(|stem| stem.split('_').next())
1604 .and_then(|token| Interval::from_str(token).ok())
1605}
1606
1607fn default_output_path(
1608 config: &AppConfig,
1609 exchange: &str,
1610 symbol: &str,
1611 interval: Interval,
1612 start: DateTime<Utc>,
1613 end: DateTime<Utc>,
1614) -> PathBuf {
1615 let interval_part = interval_label(interval);
1616 let start_part = start.format("%Y%m%d").to_string();
1617 let end_part = end.format("%Y%m%d").to_string();
1618 config
1619 .data_path
1620 .join(exchange)
1621 .join(symbol)
1622 .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
1623}
1624
/// Map a candle interval to the short label used in file names and summary
/// output (e.g. `OneMinute` -> "1m").
fn interval_label(interval: Interval) -> &'static str {
    match interval {
        Interval::OneSecond => "1s",
        Interval::OneMinute => "1m",
        Interval::FiveMinutes => "5m",
        Interval::FifteenMinutes => "15m",
        Interval::OneHour => "1h",
        Interval::FourHours => "4h",
        Interval::OneDay => "1d",
    }
}
1636
/// Serializable CSV row for exporting candles (see `write_candles_csv`).
#[derive(Serialize)]
struct CandleRow<'a> {
    symbol: &'a str,
    /// RFC 3339-formatted timestamp string.
    timestamp: String,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}
1647
1648fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
1649 if let Some(parent) = path.parent() {
1650 fs::create_dir_all(parent)
1651 .with_context(|| format!("failed to create directory {}", parent.display()))?;
1652 }
1653 let mut writer =
1654 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1655 for candle in candles {
1656 let row = CandleRow {
1657 symbol: &candle.symbol,
1658 timestamp: candle.timestamp.to_rfc3339(),
1659 open: candle.open.to_f64().unwrap_or(0.0),
1660 high: candle.high.to_f64().unwrap_or(0.0),
1661 low: candle.low.to_f64().unwrap_or(0.0),
1662 close: candle.close.to_f64().unwrap_or(0.0),
1663 volume: candle.volume.to_f64().unwrap_or(0.0),
1664 };
1665 writer.serialize(row)?;
1666 }
1667 writer.flush()?;
1668 Ok(())
1669}
1670
/// One row of the batch backtest summary CSV (see `write_batch_report`).
#[derive(Serialize)]
struct BatchRow {
    /// Path of the config file this run was produced from.
    config: String,
    signals: usize,
    orders: usize,
    dropped_orders: usize,
    ending_equity: f64,
}
1679
1680fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
1681 if let Some(parent) = path.parent() {
1682 fs::create_dir_all(parent)
1683 .with_context(|| format!("failed to create directory {}", parent.display()))?;
1684 }
1685 let mut writer =
1686 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1687 for row in rows {
1688 writer.serialize(row)?;
1689 }
1690 writer.flush()?;
1691 Ok(())
1692}
1693
1694fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<Symbol, Decimal> {
1695 config
1696 .initial_balances
1697 .iter()
1698 .map(|(currency, amount)| (currency.clone(), *amount))
1699 .collect()
1700}
1701
1702fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
1703 config
1704 .initial_balances
1705 .get(&config.reporting_currency)
1706 .copied()
1707 .unwrap_or_default()
1708}
1709
1710fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
1711 let parts: Vec<_> = value.split(':').collect();
1712 match parts.as_slice() {
1713 ["fixed", val] => {
1714 let quantity =
1715 Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
1716 Ok(Box::new(FixedOrderSizer { quantity }))
1717 }
1718 ["fixed"] => {
1719 let quantity = cli_quantity.unwrap_or(Decimal::ONE);
1720 Ok(Box::new(FixedOrderSizer { quantity }))
1721 }
1722 ["percent", val] => {
1723 let percent =
1724 Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
1725 Ok(Box::new(PortfolioPercentSizer {
1726 percent: percent.max(Decimal::ZERO),
1727 }))
1728 }
1729 ["risk-adjusted", val] => {
1730 let risk_fraction = Decimal::from_str(val)
1731 .context("invalid risk fraction value (use decimals)")?;
1732 Ok(Box::new(RiskAdjustedSizer {
1733 risk_fraction: risk_fraction.max(Decimal::ZERO),
1734 }))
1735 }
1736 _ => Err(anyhow!(
1737 "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
1738 )),
1739 }
1740}