1use crate::alerts::sanitize_webhook;
2use crate::analyze;
3use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
4use crate::live::{
5 run_live, ExecutionBackend, LiveSessionSettings, PersistenceBackend, PersistenceSettings,
6};
7use crate::state;
8use crate::telemetry::init_tracing;
9use crate::PublicChannel;
10use arrow::util::pretty::print_batches;
11use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
12use std::collections::HashMap;
13use std::fs::{self, File};
14use std::io::{BufRead, BufReader};
15use std::net::SocketAddr;
16use std::path::{Path, PathBuf};
17use std::str::FromStr;
18use std::sync::Arc;
19use std::time::Duration as StdDuration;
20
21use anyhow::{anyhow, bail, Context, Result};
22use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
23use clap::{Args, Parser, Subcommand, ValueEnum};
24use csv::Writer;
25use futures::StreamExt;
26use rust_decimal::{
27 prelude::{FromPrimitive, ToPrimitive},
28 Decimal,
29};
30use serde::{Deserialize, Serialize};
31use tesser_backtester::reporting::PerformanceReport;
32use tesser_backtester::{
33 stream_from_events, BacktestConfig, BacktestMode, BacktestStream, Backtester, MarketEvent,
34 MarketEventKind, MarketEventStream,
35};
36use tesser_broker::ExecutionClient;
37use tesser_config::{load_config, AppConfig, PersistenceEngine, RiskManagementConfig};
38use tesser_core::{Candle, DepthUpdate, Interval, OrderBook, OrderBookLevel, Side, Symbol, Tick};
39use tesser_data::analytics::ExecutionAnalysisRequest;
40use tesser_data::download::{BinanceDownloader, BybitDownloader, KlineRequest};
41use tesser_data::merger::UnifiedEventStream;
42use tesser_data::parquet::ParquetMarketStream;
43use tesser_execution::{
44 ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PortfolioPercentSizer,
45 RiskAdjustedSizer,
46};
47use tesser_markets::MarketRegistry;
48use tesser_paper::{MatchingEngine, PaperExecutionClient, PaperMarketStream};
49use tesser_strategy::{builtin_strategy_names, load_strategy};
50use tracing::{info, warn};
51
52#[derive(Parser)]
53#[command(author, version, about = "Tesser Trading Framework")]
54pub struct Cli {
55 #[arg(short, long, action = clap::ArgAction::Count)]
57 verbose: u8,
58 #[arg(long, default_value = "default")]
60 env: String,
61 #[command(subcommand)]
62 command: Commands,
63}
64
65#[allow(clippy::large_enum_variant)]
66#[derive(Subcommand)]
67pub enum Commands {
68 Data {
70 #[command(subcommand)]
71 action: DataCommand,
72 },
73 Backtest {
75 #[command(subcommand)]
76 action: BacktestCommand,
77 },
78 Live {
80 #[command(subcommand)]
81 action: LiveCommand,
82 },
83 State {
85 #[command(subcommand)]
86 action: StateCommand,
87 },
88 Strategies,
90 Analyze {
92 #[command(subcommand)]
93 action: AnalyzeCommand,
94 },
95}
96
97#[derive(Subcommand)]
98pub enum DataCommand {
99 Download(DataDownloadArgs),
101 Validate(DataValidateArgs),
103 Resample(DataResampleArgs),
105 InspectParquet(DataInspectParquetArgs),
107}
108
109#[derive(Subcommand)]
110pub enum BacktestCommand {
111 Run(BacktestRunArgs),
113 Batch(BacktestBatchArgs),
115}
116
117#[derive(Subcommand)]
118pub enum LiveCommand {
119 Run(LiveRunArgs),
121}
122
123#[derive(Subcommand)]
124pub enum StateCommand {
125 Inspect(StateInspectArgs),
127}
128
129#[derive(Subcommand)]
130pub enum AnalyzeCommand {
131 Execution(AnalyzeExecutionArgs),
133}
134
135#[derive(Args)]
136pub struct DataDownloadArgs {
137 #[arg(long, default_value = "bybit")]
138 exchange: String,
139 #[arg(long)]
140 symbol: String,
141 #[arg(long, default_value = "linear")]
142 category: String,
143 #[arg(long, default_value = "1m")]
144 interval: String,
145 #[arg(long)]
146 start: String,
147 #[arg(long)]
148 end: Option<String>,
149 #[arg(long)]
150 output: Option<PathBuf>,
151 #[arg(long)]
153 skip_validation: bool,
154 #[arg(long)]
156 repair_missing: bool,
157 #[arg(long, default_value_t = 0.05)]
159 validation_jump_threshold: f64,
160 #[arg(long, default_value_t = 0.002)]
162 validation_reference_tolerance: f64,
163}
164
165#[derive(Args)]
166pub struct StateInspectArgs {
167 #[arg(long)]
169 path: Option<PathBuf>,
170 #[arg(long)]
172 raw: bool,
173}
174
175#[derive(Args)]
176pub struct AnalyzeExecutionArgs {
177 #[arg(long, value_name = "PATH", default_value = "data/flight_recorder")]
179 data_dir: PathBuf,
180 #[arg(long)]
182 start: Option<String>,
183 #[arg(long)]
185 end: Option<String>,
186}
187
188impl StateInspectArgs {
189 fn resolved_path(&self, config: &AppConfig) -> PathBuf {
190 self.path
191 .clone()
192 .unwrap_or_else(|| config.live.persistence_config().path.clone())
193 }
194
195 fn resolved_engine(&self, config: &AppConfig) -> PersistenceEngine {
196 config.live.persistence_config().engine
197 }
198}
199
200impl AnalyzeExecutionArgs {
201 fn build_request(&self) -> Result<ExecutionAnalysisRequest> {
202 let start = match &self.start {
203 Some(value) => Some(parse_datetime(value)?),
204 None => None,
205 };
206 let end = match &self.end {
207 Some(value) => Some(parse_datetime(value)?),
208 None => None,
209 };
210 Ok(ExecutionAnalysisRequest {
211 data_dir: self.data_dir.clone(),
212 start,
213 end,
214 })
215 }
216}
217
218impl DataDownloadArgs {
219 async fn run(&self, config: &AppConfig) -> Result<()> {
220 let exchange_cfg = config
221 .exchange
222 .get(&self.exchange)
223 .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
224 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
225 let start = parse_datetime(&self.start)?;
226 let end = match &self.end {
227 Some(value) => parse_datetime(value)?,
228 None => Utc::now(),
229 };
230 if start >= end {
231 return Err(anyhow!("start time must be earlier than end time"));
232 }
233
234 info!(
235 "Downloading {} candles for {} ({})",
236 self.interval, self.symbol, self.exchange
237 );
238 let mut candles = match exchange_cfg.driver.as_str() {
239 "bybit" | "" => {
240 let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
241 let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
242 downloader
243 .download_klines(&request)
244 .await
245 .with_context(|| "failed to download candles from Bybit")?
246 }
247 "binance" => {
248 let downloader = BinanceDownloader::new(&exchange_cfg.rest_url);
249 let request = KlineRequest::new("", &self.symbol, interval, start, end);
250 downloader
251 .download_klines(&request)
252 .await
253 .with_context(|| "failed to download candles from Binance")?
254 }
255 other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
256 };
257
258 if candles.is_empty() {
259 info!("No candles returned for {}", self.symbol);
260 return Ok(());
261 }
262
263 if !self.skip_validation {
264 let config = ValidationConfig {
265 price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
266 reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
267 repair_missing: self.repair_missing,
268 };
269 let outcome =
270 validate_dataset(candles.clone(), None, config).context("validation failed")?;
271 print_validation_summary(&outcome);
272 if self.repair_missing && outcome.summary.repaired_candles > 0 {
273 candles = outcome.repaired;
274 info!(
275 "Applied {} synthetic candle(s) to repair gaps",
276 outcome.summary.repaired_candles
277 );
278 }
279 }
280
281 let output_path = self.output.clone().unwrap_or_else(|| {
282 default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
283 });
284 write_candles_csv(&output_path, &candles)?;
285 info!(
286 "Saved {} candles to {}",
287 candles.len(),
288 output_path.display()
289 );
290 Ok(())
291 }
292}
293
294#[derive(Args)]
295pub struct DataValidateArgs {
296 #[arg(
298 long = "path",
299 value_name = "PATH",
300 num_args = 1..,
301 action = clap::ArgAction::Append
302 )]
303 paths: Vec<PathBuf>,
304 #[arg(
306 long = "reference",
307 value_name = "PATH",
308 num_args = 1..,
309 action = clap::ArgAction::Append
310 )]
311 reference_paths: Vec<PathBuf>,
312 #[arg(long, default_value_t = 0.05)]
314 jump_threshold: f64,
315 #[arg(long, default_value_t = 0.002)]
317 reference_tolerance: f64,
318 #[arg(long)]
320 repair_missing: bool,
321 #[arg(long)]
323 output: Option<PathBuf>,
324}
325
326impl DataValidateArgs {
327 fn run(&self) -> Result<()> {
328 if self.paths.is_empty() {
329 bail!("provide at least one --path for validation");
330 }
331 let candles =
332 load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
333 if candles.is_empty() {
334 bail!("loaded dataset is empty; nothing to validate");
335 }
336 let reference = if self.reference_paths.is_empty() {
337 None
338 } else {
339 Some(
340 load_candles_from_paths(&self.reference_paths)
341 .with_context(|| "failed to load reference dataset")?,
342 )
343 };
344
345 let price_jump_threshold = if self.jump_threshold <= 0.0 {
346 0.0001
347 } else {
348 self.jump_threshold
349 };
350 let reference_tolerance = if self.reference_tolerance <= 0.0 {
351 0.0001
352 } else {
353 self.reference_tolerance
354 };
355
356 let config = ValidationConfig {
357 price_jump_threshold,
358 reference_tolerance,
359 repair_missing: self.repair_missing,
360 };
361
362 let outcome = validate_dataset(candles, reference, config)?;
363 print_validation_summary(&outcome);
364
365 if let Some(output) = &self.output {
366 write_candles_csv(output, &outcome.repaired)?;
367 info!(
368 "Wrote {} candles ({} new) to {}",
369 outcome.repaired.len(),
370 outcome.summary.repaired_candles,
371 output.display()
372 );
373 } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
374 warn!(
375 "Detected {} gap(s) filled with synthetic candles but --output was not provided",
376 outcome.summary.repaired_candles
377 );
378 }
379
380 Ok(())
381 }
382}
383
384#[derive(Args)]
385pub struct DataResampleArgs {
386 #[arg(long)]
387 input: PathBuf,
388 #[arg(long)]
389 output: PathBuf,
390 #[arg(long, default_value = "1h")]
391 interval: String,
392}
393
394#[derive(Args)]
395pub struct DataInspectParquetArgs {
396 #[arg(value_name = "PATH")]
398 path: PathBuf,
399 #[arg(long, default_value_t = 10)]
401 rows: usize,
402}
403
404impl DataInspectParquetArgs {
405 fn run(&self) -> Result<()> {
406 let limit = if self.rows == 0 {
407 usize::MAX
408 } else {
409 self.rows
410 };
411 let file = File::open(&self.path)
412 .with_context(|| format!("failed to open {}", self.path.display()))?;
413 let batch_size = limit.clamp(1, 8192);
414 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)?
415 .with_batch_size(batch_size)
416 .build()?;
417
418 let mut printed = 0usize;
419 while printed < limit {
420 match reader.next() {
421 Some(Ok(batch)) => {
422 if batch.num_rows() == 0 {
423 continue;
424 }
425 let remaining = limit.saturating_sub(printed);
426 let take = remaining.min(batch.num_rows());
427 let display_batch = if take == batch.num_rows() {
428 batch
429 } else {
430 batch.slice(0, take)
431 };
432 print_batches(std::slice::from_ref(&display_batch))?;
433 printed += take;
434 }
435 Some(Err(err)) => return Err(err.into()),
436 None => break,
437 }
438 }
439
440 if printed == 0 {
441 println!("no rows available in {}", self.path.display());
442 } else if limit != usize::MAX {
443 println!("displayed {printed} row(s) from {}", self.path.display());
444 }
445
446 Ok(())
447 }
448}
449
450#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
451pub enum BacktestModeArg {
452 Candle,
453 Tick,
454}
455
456#[derive(Args)]
457pub struct BacktestRunArgs {
458 #[arg(long)]
459 strategy_config: PathBuf,
460 #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
462 data_paths: Vec<PathBuf>,
463 #[arg(long, default_value_t = 500)]
464 candles: usize,
465 #[arg(long, default_value = "0.01")]
466 quantity: Decimal,
467 #[arg(long, default_value = "0")]
469 slippage_bps: Decimal,
470 #[arg(long, default_value = "0")]
472 fee_bps: Decimal,
473 #[arg(long, default_value_t = 1)]
475 latency_candles: usize,
476 #[arg(long, default_value = "fixed:0.01")]
478 sizer: String,
479 #[arg(long, value_enum, default_value = "candle")]
481 mode: BacktestModeArg,
482 #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
484 lob_paths: Vec<PathBuf>,
485 #[arg(long)]
486 markets_file: Option<PathBuf>,
487}
488
/// Where tick-mode order-book events come from, as decided by
/// `BacktestRunArgs::detect_lob_source`.
enum LobSource {
    /// One or more regular files, loaded through `load_lob_events_from_paths`.
    Json(Vec<PathBuf>),
    /// A single directory used as the root of a flight-recorder stream.
    FlightRecorder(PathBuf),
}
493
494#[derive(Args)]
495pub struct BacktestBatchArgs {
496 #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
498 config_paths: Vec<PathBuf>,
499 #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
501 data_paths: Vec<PathBuf>,
502 #[arg(long, default_value = "0.01")]
503 quantity: Decimal,
504 #[arg(long)]
506 output: Option<PathBuf>,
507 #[arg(long, default_value = "0")]
509 slippage_bps: Decimal,
510 #[arg(long, default_value = "0")]
512 fee_bps: Decimal,
513 #[arg(long, default_value_t = 1)]
515 latency_candles: usize,
516 #[arg(long, default_value = "fixed:0.01")]
518 sizer: String,
519 #[arg(long)]
520 markets_file: Option<PathBuf>,
521}
522
523#[derive(Args)]
524pub struct LiveRunArgs {
525 #[arg(long)]
526 strategy_config: PathBuf,
527 #[arg(long, default_value = "paper_sandbox")]
528 exchange: String,
529 #[arg(long, default_value = "linear")]
530 category: String,
531 #[arg(long, default_value = "1m")]
532 interval: String,
533 #[arg(long, default_value = "1")]
534 quantity: Decimal,
535 #[arg(
537 long = "exec",
538 default_value = "paper",
539 value_enum,
540 alias = "live-exec"
541 )]
542 exec: ExecutionBackend,
543 #[arg(long)]
545 state_path: Option<PathBuf>,
546 #[arg(long, value_enum)]
548 persistence: Option<PersistenceBackend>,
549 #[arg(long)]
550 metrics_addr: Option<String>,
551 #[arg(long)]
552 log_path: Option<PathBuf>,
553 #[arg(
555 long = "record-data",
556 value_name = "PATH",
557 default_value = "data/flight_recorder"
558 )]
559 record_data: PathBuf,
560 #[arg(long)]
562 control_addr: Option<String>,
563 #[arg(long)]
564 initial_equity: Option<Decimal>,
565 #[arg(long)]
566 markets_file: Option<PathBuf>,
567 #[arg(long, default_value = "0")]
568 slippage_bps: Decimal,
569 #[arg(long, default_value = "0")]
570 fee_bps: Decimal,
571 #[arg(long, default_value_t = 0)]
572 latency_ms: u64,
573 #[arg(long, default_value_t = 512)]
574 history: usize,
575 #[arg(long)]
576 reconciliation_interval_secs: Option<u64>,
577 #[arg(long)]
578 reconciliation_threshold: Option<Decimal>,
579 #[arg(long)]
580 webhook_url: Option<String>,
581 #[arg(long)]
582 alert_max_data_gap_secs: Option<u64>,
583 #[arg(long)]
584 alert_max_order_failures: Option<u32>,
585 #[arg(long)]
586 alert_max_drawdown: Option<Decimal>,
587 #[arg(long)]
588 risk_max_order_qty: Option<Decimal>,
589 #[arg(long)]
590 risk_max_position_qty: Option<Decimal>,
591 #[arg(long)]
592 risk_max_drawdown: Option<Decimal>,
593 #[arg(long)]
595 orderbook_depth: Option<usize>,
596 #[arg(long, default_value = "fixed:1.0")]
598 sizer: String,
599}
600
601impl LiveRunArgs {
602 fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
603 self.log_path
604 .clone()
605 .unwrap_or_else(|| config.live.log_path.clone())
606 }
607
608 fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
609 let addr = self
610 .metrics_addr
611 .clone()
612 .unwrap_or_else(|| config.live.metrics_addr.clone());
613 addr.parse()
614 .with_context(|| format!("invalid metrics address '{addr}'"))
615 }
616
617 fn resolved_control_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
618 let addr = self
619 .control_addr
620 .clone()
621 .unwrap_or_else(|| config.live.control_addr.clone());
622 addr.parse()
623 .with_context(|| format!("invalid control address '{addr}'"))
624 }
625
626 fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
627 let secs = self
628 .reconciliation_interval_secs
629 .unwrap_or(config.live.reconciliation_interval_secs)
630 .max(1);
631 StdDuration::from_secs(secs)
632 }
633
634 fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
635 let configured = self
636 .reconciliation_threshold
637 .unwrap_or(config.live.reconciliation_threshold);
638 if configured <= Decimal::ZERO {
639 config.live.reconciliation_threshold.max(Decimal::new(1, 6))
640 } else {
641 configured
642 }
643 }
644
645 fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<Symbol, Decimal> {
646 let mut balances = clone_initial_balances(&config.backtest);
647 if let Some(value) = self.initial_equity {
648 balances.insert(
649 config.backtest.reporting_currency.clone(),
650 value.max(Decimal::ZERO),
651 );
652 }
653 balances
654 }
655
656 fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
657 let mut alerting = config.live.alerting.clone();
658 let webhook = self
659 .webhook_url
660 .clone()
661 .or_else(|| alerting.webhook_url.clone());
662 alerting.webhook_url = sanitize_webhook(webhook);
663 if let Some(sec) = self.alert_max_data_gap_secs {
664 alerting.max_data_gap_secs = sec;
665 }
666 if let Some(limit) = self.alert_max_order_failures {
667 alerting.max_order_failures = limit;
668 }
669 if let Some(limit) = self.alert_max_drawdown {
670 alerting.max_drawdown = limit.max(Decimal::ZERO);
671 }
672 alerting
673 }
674
675 fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
676 let mut risk = config.risk_management.clone();
677 if let Some(limit) = self.risk_max_order_qty {
678 risk.max_order_quantity = limit.max(Decimal::ZERO);
679 }
680 if let Some(limit) = self.risk_max_position_qty {
681 risk.max_position_quantity = limit.max(Decimal::ZERO);
682 }
683 if let Some(limit) = self.risk_max_drawdown {
684 risk.max_drawdown = limit.max(Decimal::ZERO);
685 }
686 risk
687 }
688}
689
690#[derive(Deserialize)]
691struct StrategyConfigFile {
692 #[serde(rename = "strategy_name")]
693 name: String,
694 #[serde(default = "empty_table")]
695 params: toml::Value,
696}
697
698fn empty_table() -> toml::Value {
699 toml::Value::Table(Default::default())
700}
701
702pub async fn run() -> Result<()> {
703 let cli = Cli::parse();
704 let config = load_config(Some(&cli.env)).context("failed to load configuration")?;
705
706 let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
707 0 => config.log_level.clone(),
708 1 => "debug".to_string(),
709 _ => "trace".to_string(),
710 });
711
712 let log_override = match &cli.command {
713 Commands::Live {
714 action: LiveCommand::Run(args),
715 } => Some(args.resolved_log_path(&config)),
716 _ => None,
717 };
718
719 init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;
720
721 match cli.command {
722 Commands::Data { action } => handle_data(action, &config).await?,
723 Commands::Backtest {
724 action: BacktestCommand::Run(args),
725 } => args.run(&config).await?,
726 Commands::Backtest {
727 action: BacktestCommand::Batch(args),
728 } => args.run(&config).await?,
729 Commands::Live {
730 action: LiveCommand::Run(args),
731 } => args.run(&config).await?,
732 Commands::State { action } => handle_state(action, &config).await?,
733 Commands::Analyze { action } => handle_analyze(action)?,
734 Commands::Strategies => list_strategies(),
735 }
736
737 Ok(())
738}
739
740async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
741 match cmd {
742 DataCommand::Download(args) => {
743 args.run(config).await?;
744 }
745 DataCommand::Validate(args) => {
746 args.run()?;
747 }
748 DataCommand::Resample(args) => {
749 info!(
750 "stub: resampling {} into {} at {}",
751 args.input.display(),
752 args.output.display(),
753 args.interval
754 );
755 }
756 DataCommand::InspectParquet(args) => {
757 args.run()?;
758 }
759 }
760 Ok(())
761}
762
763async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
764 match cmd {
765 StateCommand::Inspect(args) => {
766 state::inspect_state(
767 args.resolved_path(config),
768 args.resolved_engine(config),
769 args.raw,
770 )
771 .await?;
772 }
773 }
774 Ok(())
775}
776
777fn handle_analyze(cmd: AnalyzeCommand) -> Result<()> {
778 match cmd {
779 AnalyzeCommand::Execution(args) => analyze::run_execution(args.build_request()?),
780 }
781}
782
783impl BacktestRunArgs {
784 async fn run(&self, config: &AppConfig) -> Result<()> {
785 let contents = std::fs::read_to_string(&self.strategy_config)
786 .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
787 let def: StrategyConfigFile =
788 toml::from_str(&contents).context("failed to parse strategy config file")?;
789 let strategy = load_strategy(&def.name, def.params)
790 .with_context(|| format!("failed to configure strategy {}", def.name))?;
791 let symbols = strategy.subscriptions();
792 if symbols.is_empty() {
793 return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
794 }
795
796 let mode = match self.mode {
797 BacktestModeArg::Candle => BacktestMode::Candle,
798 BacktestModeArg::Tick => BacktestMode::Tick,
799 };
800
801 let markets_path = self
802 .markets_file
803 .clone()
804 .or_else(|| config.backtest.markets_file.clone())
805 .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
806 let market_registry = Arc::new(
807 MarketRegistry::load_from_file(&markets_path).with_context(|| {
808 format!("failed to load markets from {}", markets_path.display())
809 })?,
810 );
811
812 let (market_stream, event_stream, execution_client, matching_engine) = match mode {
813 BacktestMode::Candle => {
814 let stream = self.build_candle_stream(&symbols)?;
815 (
816 Some(stream),
817 None,
818 Arc::new(PaperExecutionClient::default()) as Arc<dyn ExecutionClient>,
819 None,
820 )
821 }
822 BacktestMode::Tick => {
823 if self.lob_paths.is_empty() {
824 bail!("--lob-data is required when --mode tick");
825 }
826 let source = self.detect_lob_source()?;
827 let engine = Arc::new(MatchingEngine::new(
828 "matching-engine",
829 symbols.clone(),
830 reporting_balance(&config.backtest),
831 ));
832 let stream = match source {
833 LobSource::Json(paths) => {
834 let events = load_lob_events_from_paths(&paths)?;
835 if events.is_empty() {
836 bail!("no order book events loaded from --lob-data");
837 }
838 stream_from_events(events)
839 }
840 LobSource::FlightRecorder(root) => self
841 .build_flight_recorder_stream(&root, &symbols)
842 .context("failed to initialize flight recorder stream")?,
843 };
844 (
845 None,
846 Some(stream),
847 engine.clone() as Arc<dyn ExecutionClient>,
848 Some(engine),
849 )
850 }
851 };
852
853 let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
854 let order_quantity = self.quantity;
855 let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
856
857 let mut cfg = BacktestConfig::new(symbols[0].clone());
858 cfg.order_quantity = order_quantity;
859 cfg.initial_balances = clone_initial_balances(&config.backtest);
860 cfg.reporting_currency = config.backtest.reporting_currency.clone();
861 cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
862 cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
863 cfg.execution.latency_candles = self.latency_candles.max(1);
864 cfg.mode = mode;
865
866 let report = Backtester::new(
867 cfg,
868 strategy,
869 execution,
870 matching_engine,
871 market_registry,
872 market_stream,
873 event_stream,
874 )
875 .run()
876 .await
877 .context("backtest failed")?;
878 print_report(&report);
879 Ok(())
880 }
881
882 fn build_candle_stream(&self, symbols: &[Symbol]) -> Result<BacktestStream> {
883 if symbols.is_empty() {
884 bail!("strategy did not declare any subscriptions");
885 }
886 if self.data_paths.is_empty() {
887 let mut generated = Vec::new();
888 for (idx, symbol) in symbols.iter().enumerate() {
889 let offset = idx as i64 * 10;
890 generated.extend(synth_candles(symbol, self.candles, offset));
891 }
892 generated.sort_by_key(|c| c.timestamp);
893 if generated.is_empty() {
894 bail!("no synthetic candles generated; provide --data files instead");
895 }
896 return Ok(memory_market_stream(&symbols[0], generated));
897 }
898
899 match detect_data_format(&self.data_paths)? {
900 DataFormat::Csv => {
901 let mut candles = load_candles_from_paths(&self.data_paths)?;
902 if candles.is_empty() {
903 bail!("no candles loaded from --data paths");
904 }
905 candles.sort_by_key(|c| c.timestamp);
906 Ok(memory_market_stream(&symbols[0], candles))
907 }
908 DataFormat::Parquet => Ok(parquet_market_stream(symbols, self.data_paths.clone())),
909 }
910 }
911
912 fn detect_lob_source(&self) -> Result<LobSource> {
913 if self.lob_paths.len() == 1 {
914 let path = &self.lob_paths[0];
915 if path.is_dir() {
916 return Ok(LobSource::FlightRecorder(path.clone()));
917 }
918 }
919 if self.lob_paths.iter().all(|path| path.is_file()) {
920 return Ok(LobSource::Json(self.lob_paths.clone()));
921 }
922 bail!(
923 "tick mode expects either JSONL files or a single flight-recorder directory via --lob-data"
924 )
925 }
926
927 fn build_flight_recorder_stream(
928 &self,
929 root: &Path,
930 symbols: &[Symbol],
931 ) -> Result<MarketEventStream> {
932 let stream = UnifiedEventStream::from_flight_recorder(root, symbols)?
933 .into_stream()
934 .map(|event| event.map(MarketEvent::from));
935 Ok(Box::pin(stream))
936 }
937}
938
939impl BacktestBatchArgs {
940 async fn run(&self, config: &AppConfig) -> Result<()> {
941 if self.config_paths.is_empty() {
942 return Err(anyhow!("provide at least one --config path"));
943 }
944 if self.data_paths.is_empty() {
945 return Err(anyhow!("provide at least one --data path for batch mode"));
946 }
947 let markets_path = self
948 .markets_file
949 .clone()
950 .or_else(|| config.backtest.markets_file.clone())
951 .ok_or_else(|| {
952 anyhow!("batch mode requires --markets-file or backtest.markets_file")
953 })?;
954 let market_registry = Arc::new(
955 MarketRegistry::load_from_file(&markets_path).with_context(|| {
956 format!("failed to load markets from {}", markets_path.display())
957 })?,
958 );
959 let data_format = detect_data_format(&self.data_paths)?;
960 let mut aggregated = Vec::new();
961 for config_path in &self.config_paths {
962 let contents = std::fs::read_to_string(config_path).with_context(|| {
963 format!("failed to read strategy config {}", config_path.display())
964 })?;
965 let def: StrategyConfigFile =
966 toml::from_str(&contents).context("failed to parse strategy config file")?;
967 let strategy = load_strategy(&def.name, def.params)
968 .with_context(|| format!("failed to configure strategy {}", def.name))?;
969 let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
970 let order_quantity = self.quantity;
971 let symbols = strategy.subscriptions();
972 if symbols.is_empty() {
973 bail!("strategy {} did not declare subscriptions", strategy.name());
974 }
975 let stream = match data_format {
976 DataFormat::Csv => {
977 let mut candles = load_candles_from_paths(&self.data_paths)?;
978 if candles.is_empty() {
979 bail!("no candles loaded from --data paths");
980 }
981 candles.sort_by_key(|c| c.timestamp);
982 memory_market_stream(&symbols[0], candles)
983 }
984 DataFormat::Parquet => parquet_market_stream(&symbols, self.data_paths.clone()),
985 };
986 let execution_client: Arc<dyn ExecutionClient> =
987 Arc::new(PaperExecutionClient::default());
988 let execution =
989 ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
990 let mut cfg = BacktestConfig::new(symbols[0].clone());
991 cfg.order_quantity = order_quantity;
992 cfg.initial_balances = clone_initial_balances(&config.backtest);
993 cfg.reporting_currency = config.backtest.reporting_currency.clone();
994 cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
995 cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
996 cfg.execution.latency_candles = self.latency_candles.max(1);
997
998 let report = Backtester::new(
999 cfg,
1000 strategy,
1001 execution,
1002 None,
1003 market_registry.clone(),
1004 Some(stream),
1005 None,
1006 )
1007 .run()
1008 .await
1009 .with_context(|| format!("backtest failed for {}", config_path.display()))?;
1010 aggregated.push(BatchRow {
1011 config: config_path.display().to_string(),
1012 signals: 0, orders: 0, dropped_orders: 0, ending_equity: report.ending_equity,
1016 });
1017 }
1018
1019 if let Some(output) = &self.output {
1020 write_batch_report(output, &aggregated)?;
1021 println!("Batch report written to {}", output.display());
1022 }
1023 if aggregated.is_empty() {
1024 return Err(anyhow!("no batch jobs executed"));
1025 }
1026 Ok(())
1027 }
1028}
1029
1030impl LiveRunArgs {
1031 async fn run(&self, config: &AppConfig) -> Result<()> {
1032 let exchange_cfg = config
1033 .exchange
1034 .get(&self.exchange)
1035 .cloned()
1036 .ok_or_else(|| anyhow!("exchange profile {} not found", self.exchange))?;
1037
1038 let driver = exchange_cfg.driver.clone();
1039
1040 let contents = fs::read_to_string(&self.strategy_config)
1041 .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
1042 let def: StrategyConfigFile =
1043 toml::from_str(&contents).context("failed to parse strategy config file")?;
1044 let strategy = load_strategy(&def.name, def.params)
1045 .with_context(|| format!("failed to configure strategy {}", def.name))?;
1046 let symbols = strategy.subscriptions();
1047 if symbols.is_empty() {
1048 bail!("strategy did not declare any subscriptions");
1049 }
1050 if self.quantity <= Decimal::ZERO {
1051 bail!("--quantity must be greater than zero");
1052 }
1053 let quantity = self.quantity;
1054 let initial_balances = self.resolved_initial_balances(config);
1055 let reporting_currency = config.backtest.reporting_currency.clone();
1056 let markets_file = self
1057 .markets_file
1058 .clone()
1059 .or_else(|| config.backtest.markets_file.clone());
1060
1061 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
1062 let category =
1063 PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
1064 let metrics_addr = self.resolved_metrics_addr(config)?;
1065 let control_addr = self.resolved_control_addr(config)?;
1066 let persistence_cfg = config.live.persistence_config();
1067 let persistence_engine = self
1068 .persistence
1069 .map(PersistenceEngine::from)
1070 .unwrap_or(persistence_cfg.engine);
1071 let state_path = self
1072 .state_path
1073 .clone()
1074 .unwrap_or_else(|| persistence_cfg.path.clone());
1075 let persistence = PersistenceSettings::new(persistence_engine, state_path);
1076 let alerting = self.build_alerting(config);
1077 let history = self.history.max(32);
1078 let reconciliation_interval = self.reconciliation_interval(config);
1079 let reconciliation_threshold = self.reconciliation_threshold(config);
1080 let orderbook_depth = self
1081 .orderbook_depth
1082 .unwrap_or(super::live::default_order_book_depth());
1083
1084 let settings = LiveSessionSettings {
1085 category,
1086 interval,
1087 quantity,
1088 slippage_bps: self.slippage_bps.max(Decimal::ZERO),
1089 fee_bps: self.fee_bps.max(Decimal::ZERO),
1090 history,
1091 metrics_addr,
1092 persistence,
1093 initial_balances,
1094 reporting_currency,
1095 markets_file,
1096 alerting,
1097 exec_backend: self.exec,
1098 risk: self.build_risk_config(config),
1099 reconciliation_interval,
1100 reconciliation_threshold,
1101 driver,
1102 orderbook_depth,
1103 record_path: Some(self.record_data.clone()),
1104 control_addr,
1105 };
1106
1107 info!(
1108 strategy = %def.name,
1109 symbols = ?symbols,
1110 exchange = %self.exchange,
1111 interval = %self.interval,
1112 driver = ?settings.driver,
1113 exec = ?self.exec,
1114 persistence_engine = ?settings.persistence.engine,
1115 state_path = %settings.persistence.state_path.display(),
1116 control_addr = %settings.control_addr,
1117 "starting live session"
1118 );
1119
1120 run_live(strategy, symbols, exchange_cfg, settings)
1121 .await
1122 .context("live session failed")
1123 }
1124}
1125
1126fn list_strategies() {
1127 println!("Built-in strategies:");
1128 for name in builtin_strategy_names() {
1129 println!("- {name}");
1130 }
1131}
1132
1133fn print_validation_summary(outcome: &ValidationOutcome) {
1134 const MAX_EXAMPLES: usize = 5;
1135 let summary = &outcome.summary;
1136 println!(
1137 "Validation summary for {} ({} candles)",
1138 summary.symbol, summary.rows
1139 );
1140 println!(
1141 " Range: {} -> {}",
1142 summary.start.to_rfc3339(),
1143 summary.end.to_rfc3339()
1144 );
1145 println!(" Interval: {}", interval_label(summary.interval));
1146 println!(" Missing intervals: {}", summary.missing_candles);
1147 println!(" Duplicate intervals: {}", summary.duplicate_candles);
1148 println!(" Zero-volume candles: {}", summary.zero_volume_candles);
1149 println!(" Price spikes flagged: {}", summary.price_spike_count);
1150 println!(
1151 " Cross-source mismatches: {}",
1152 summary.cross_mismatch_count
1153 );
1154 println!(" Repaired candles generated: {}", summary.repaired_candles);
1155
1156 if !outcome.gaps.is_empty() {
1157 println!(" Gap examples:");
1158 for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
1159 println!(
1160 " {} -> {} (missing {})",
1161 gap.start.to_rfc3339(),
1162 gap.end.to_rfc3339(),
1163 gap.missing
1164 );
1165 }
1166 if outcome.gaps.len() > MAX_EXAMPLES {
1167 println!(
1168 " ... {} additional gap(s) omitted",
1169 outcome.gaps.len() - MAX_EXAMPLES
1170 );
1171 }
1172 }
1173
1174 if !outcome.price_spikes.is_empty() {
1175 println!(" Price spike examples:");
1176 for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
1177 println!(
1178 " {} (change {:.2}%)",
1179 spike.timestamp.to_rfc3339(),
1180 spike.change_fraction * 100.0
1181 );
1182 }
1183 if outcome.price_spikes.len() > MAX_EXAMPLES {
1184 println!(
1185 " ... {} additional spike(s) omitted",
1186 outcome.price_spikes.len() - MAX_EXAMPLES
1187 );
1188 }
1189 }
1190
1191 if !outcome.cross_mismatches.is_empty() {
1192 println!(" Cross-source mismatch examples:");
1193 for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
1194 println!(
1195 " {} primary {:.4} vs ref {:.4} ({:.2}%)",
1196 miss.timestamp.to_rfc3339(),
1197 miss.primary_close,
1198 miss.reference_close,
1199 miss.delta_fraction * 100.0
1200 );
1201 }
1202 if outcome.cross_mismatches.len() > MAX_EXAMPLES {
1203 println!(
1204 " ... {} additional mismatch(es) omitted",
1205 outcome.cross_mismatches.len() - MAX_EXAMPLES
1206 );
1207 }
1208 }
1209}
1210
/// Prints the performance report to stdout using its `Display` output,
/// preceded by a blank line.
fn print_report(report: &PerformanceReport) {
    println!("\n{}", report);
}
1214
1215fn synth_candles(symbol: &str, len: usize, offset_minutes: i64) -> Vec<Candle> {
1216 let mut candles = Vec::with_capacity(len);
1217 for i in 0..len {
1218 let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
1219 let open = base + (i as f64 % 3.0) * 10.0;
1220 let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
1221 let open_dec =
1222 Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
1223 let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
1224 let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
1225 let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
1226 candles.push(Candle {
1227 symbol: Symbol::from(symbol),
1228 interval: Interval::OneMinute,
1229 open: open_dec,
1230 high,
1231 low,
1232 close: close_dec,
1233 volume: Decimal::ONE,
1234 timestamp: Utc::now() - Duration::minutes((len - i) as i64)
1235 + Duration::minutes(offset_minutes),
1236 });
1237 }
1238 candles
1239}
1240
1241fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
1242 if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
1243 return Ok(dt.with_timezone(&Utc));
1244 }
1245 if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1246 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1247 }
1248 if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
1249 let dt = date
1250 .and_hms_opt(0, 0, 0)
1251 .ok_or_else(|| anyhow!("invalid date"))?;
1252 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1253 }
1254 Err(anyhow!("unable to parse datetime '{value}'"))
1255}
1256
/// One row of a candle CSV file, as deserialized by `load_candles_from_paths`.
#[derive(Deserialize)]
struct CandleCsvRow {
    // Optional symbol column; when absent the loader falls back to
    // `infer_symbol_from_path` (the file's parent directory name).
    symbol: Option<String>,
    // Raw timestamp text, parsed later with `parse_datetime`.
    timestamp: String,
    // OHLCV values are read as floats and converted to `Decimal` by the loader.
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}
1267
1268fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1269 let mut candles = Vec::new();
1270 for path in paths {
1271 let mut reader = csv::Reader::from_path(path)
1272 .with_context(|| format!("failed to open {}", path.display()))?;
1273 for record in reader.deserialize::<CandleCsvRow>() {
1274 let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1275 let timestamp = parse_datetime(&row.timestamp)?;
1276 let symbol = row
1277 .symbol
1278 .clone()
1279 .or_else(|| infer_symbol_from_path(path))
1280 .ok_or_else(|| {
1281 anyhow!(
1282 "missing symbol column and unable to infer from path {}",
1283 path.display()
1284 )
1285 })?;
1286 let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1287 let open = Decimal::from_f64(row.open).ok_or_else(|| {
1288 anyhow!("invalid open value '{}' in {}", row.open, path.display())
1289 })?;
1290 let high = Decimal::from_f64(row.high).ok_or_else(|| {
1291 anyhow!("invalid high value '{}' in {}", row.high, path.display())
1292 })?;
1293 let low = Decimal::from_f64(row.low)
1294 .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1295 let close = Decimal::from_f64(row.close).ok_or_else(|| {
1296 anyhow!("invalid close value '{}' in {}", row.close, path.display())
1297 })?;
1298 let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1299 anyhow!(
1300 "invalid volume value '{}' in {}",
1301 row.volume,
1302 path.display()
1303 )
1304 })?;
1305 candles.push(Candle {
1306 symbol,
1307 interval,
1308 open,
1309 high,
1310 low,
1311 close,
1312 volume,
1313 timestamp,
1314 });
1315 }
1316 }
1317 Ok(candles)
1318}
1319
/// On-disk format of the market data inputs, as classified by
/// `detect_data_format`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DataFormat {
    // Any path whose extension is not `parquet` (including no extension).
    Csv,
    // Paths whose extension is `parquet`, compared case-insensitively.
    Parquet,
}
1325
1326fn detect_data_format(paths: &[PathBuf]) -> Result<DataFormat> {
1327 let mut detected: Option<DataFormat> = None;
1328 for path in paths {
1329 let ext = path
1330 .extension()
1331 .and_then(|ext| ext.to_str())
1332 .map(|ext| ext.to_ascii_lowercase())
1333 .unwrap_or_else(|| String::from(""));
1334 let current = if ext == "parquet" {
1335 DataFormat::Parquet
1336 } else {
1337 DataFormat::Csv
1338 };
1339 if let Some(existing) = detected {
1340 if existing != current {
1341 bail!("cannot mix CSV and Parquet inputs in --data");
1342 }
1343 } else {
1344 detected = Some(current);
1345 }
1346 }
1347 detected.ok_or_else(|| anyhow!("no data paths provided"))
1348}
1349
1350fn memory_market_stream(symbol: &str, candles: Vec<Candle>) -> BacktestStream {
1351 Box::new(PaperMarketStream::from_data(
1352 symbol.to_string(),
1353 Vec::new(),
1354 candles,
1355 ))
1356}
1357
1358fn parquet_market_stream(symbols: &[Symbol], paths: Vec<PathBuf>) -> BacktestStream {
1359 Box::new(ParquetMarketStream::with_candles(symbols.to_vec(), paths))
1360}
1361
/// One JSON-encoded order book event, as read line by line by
/// `load_lob_events_from_paths`.
///
/// The variant is selected by the `event` field of the JSON object, using the
/// lowercase variant name (`snapshot`, `depth`, `trade`).
#[derive(Deserialize)]
#[serde(tag = "event", rename_all = "lowercase")]
enum LobEventRow {
    // Order book image; the loader turns it into an `OrderBook` event.
    Snapshot {
        // Raw timestamp text, parsed with `parse_datetime`.
        timestamp: String,
        // Optional; the loader falls back to the symbol inferred from the path.
        symbol: Option<String>,
        // Two-element levels; `convert_levels` reads index 0 as price and
        // index 1 as size.
        bids: Vec<[f64; 2]>,
        asks: Vec<[f64; 2]>,
    },
    // Order book update; the loader turns it into a `DepthUpdate` event.
    // Fields have the same layout as `Snapshot`.
    Depth {
        timestamp: String,
        symbol: Option<String>,
        bids: Vec<[f64; 2]>,
        asks: Vec<[f64; 2]>,
    },
    // Executed trade; the loader turns it into a `Tick` event.
    Trade {
        timestamp: String,
        symbol: Option<String>,
        // Matched case-insensitively by the loader: `buy`/`bid`/`b` or
        // `sell`/`ask`/`s`.
        side: String,
        price: f64,
        size: f64,
    },
}
1385
1386fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
1387 let mut events = Vec::new();
1388 for path in paths {
1389 let file = File::open(path)
1390 .with_context(|| format!("failed to open order book file {}", path.display()))?;
1391 let symbol_hint = infer_symbol_from_path(path);
1392 for line in BufReader::new(file).lines() {
1393 let line =
1394 line.with_context(|| format!("failed to read line from {}", path.display()))?;
1395 if line.trim().is_empty() {
1396 continue;
1397 }
1398 let row: LobEventRow = serde_json::from_str(&line)
1399 .with_context(|| format!("invalid order book event in {}", path.display()))?;
1400 match row {
1401 LobEventRow::Snapshot {
1402 timestamp,
1403 symbol,
1404 bids,
1405 asks,
1406 } => {
1407 let ts = parse_datetime(×tamp)?;
1408 let symbol = symbol
1409 .or_else(|| symbol_hint.clone())
1410 .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
1411 let bids = convert_levels(&bids)?;
1412 let asks = convert_levels(&asks)?;
1413 let book = OrderBook {
1414 symbol: symbol.clone(),
1415 bids,
1416 asks,
1417 timestamp: ts,
1418 exchange_checksum: None,
1419 local_checksum: None,
1420 };
1421 events.push(MarketEvent {
1422 timestamp: ts,
1423 kind: MarketEventKind::OrderBook(book),
1424 });
1425 }
1426 LobEventRow::Depth {
1427 timestamp,
1428 symbol,
1429 bids,
1430 asks,
1431 } => {
1432 let ts = parse_datetime(×tamp)?;
1433 let symbol = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
1434 anyhow!("missing symbol in depth update {}", path.display())
1435 })?;
1436 let bids = convert_levels(&bids)?;
1437 let asks = convert_levels(&asks)?;
1438 let update = DepthUpdate {
1439 symbol: symbol.clone(),
1440 bids,
1441 asks,
1442 timestamp: ts,
1443 };
1444 events.push(MarketEvent {
1445 timestamp: ts,
1446 kind: MarketEventKind::Depth(update),
1447 });
1448 }
1449 LobEventRow::Trade {
1450 timestamp,
1451 symbol,
1452 side,
1453 price,
1454 size,
1455 } => {
1456 let ts = parse_datetime(×tamp)?;
1457 let symbol = symbol
1458 .or_else(|| symbol_hint.clone())
1459 .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
1460 let side = match side.to_lowercase().as_str() {
1461 "buy" | "bid" | "b" => Side::Buy,
1462 "sell" | "ask" | "s" => Side::Sell,
1463 other => bail!("unsupported trade side '{other}' in {}", path.display()),
1464 };
1465 let price = Decimal::from_f64(price).ok_or_else(|| {
1466 anyhow!("invalid trade price '{}' in {}", price, path.display())
1467 })?;
1468 let size = Decimal::from_f64(size).ok_or_else(|| {
1469 anyhow!("invalid trade size '{}' in {}", size, path.display())
1470 })?;
1471 let tick = Tick {
1472 symbol: symbol.clone(),
1473 price,
1474 size,
1475 side,
1476 exchange_timestamp: ts,
1477 received_at: ts,
1478 };
1479 events.push(MarketEvent {
1480 timestamp: ts,
1481 kind: MarketEventKind::Trade(tick),
1482 });
1483 }
1484 }
1485 }
1486 }
1487 events.sort_by_key(|event| event.timestamp);
1488 Ok(events)
1489}
1490
1491fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
1492 levels
1493 .iter()
1494 .map(|pair| {
1495 let price = Decimal::from_f64(pair[0])
1496 .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
1497 let size = Decimal::from_f64(pair[1])
1498 .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
1499 Ok(OrderBookLevel { price, size })
1500 })
1501 .collect()
1502}
1503
/// Uses the name of the file's parent directory as the symbol.
///
/// Returns `None` when the path has no parent or the parent has no final
/// component (for example a bare file name). Non-UTF-8 names are converted
/// lossily.
fn infer_symbol_from_path(path: &Path) -> Option<String> {
    let directory = path.parent()?.file_name()?;
    Some(directory.to_string_lossy().into_owned())
}
1509
1510fn infer_interval_from_path(path: &Path) -> Option<Interval> {
1511 path.file_stem()
1512 .and_then(|os| os.to_str())
1513 .and_then(|stem| stem.split('_').next())
1514 .and_then(|token| Interval::from_str(token).ok())
1515}
1516
1517fn default_output_path(
1518 config: &AppConfig,
1519 exchange: &str,
1520 symbol: &str,
1521 interval: Interval,
1522 start: DateTime<Utc>,
1523 end: DateTime<Utc>,
1524) -> PathBuf {
1525 let interval_part = interval_label(interval);
1526 let start_part = start.format("%Y%m%d").to_string();
1527 let end_part = end.format("%Y%m%d").to_string();
1528 config
1529 .data_path
1530 .join(exchange)
1531 .join(symbol)
1532 .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
1533}
1534
/// Returns the short textual label for an interval (`1s`, `1m`, `5m`, `15m`,
/// `1h`, `4h`, `1d`).
///
/// The match is exhaustive on purpose: adding an `Interval` variant becomes a
/// compile error here until a label is assigned.
fn interval_label(interval: Interval) -> &'static str {
    match interval {
        Interval::OneSecond => "1s",
        Interval::OneMinute => "1m",
        Interval::FiveMinutes => "5m",
        Interval::FifteenMinutes => "15m",
        Interval::OneHour => "1h",
        Interval::FourHours => "4h",
        Interval::OneDay => "1d",
    }
}
1546
/// One CSV output row for a candle, as written by `write_candles_csv`.
///
/// The field names mirror those of `CandleCsvRow`, the row type used when
/// candle CSV files are loaded.
#[derive(Serialize)]
struct CandleRow<'a> {
    // Borrowed from the candle being written.
    symbol: &'a str,
    // RFC 3339 rendering of the candle timestamp.
    timestamp: String,
    // Decimal values converted to `f64` by the writer (0.0 if the conversion
    // fails).
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}
1557
1558fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
1559 if let Some(parent) = path.parent() {
1560 fs::create_dir_all(parent)
1561 .with_context(|| format!("failed to create directory {}", parent.display()))?;
1562 }
1563 let mut writer =
1564 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1565 for candle in candles {
1566 let row = CandleRow {
1567 symbol: &candle.symbol,
1568 timestamp: candle.timestamp.to_rfc3339(),
1569 open: candle.open.to_f64().unwrap_or(0.0),
1570 high: candle.high.to_f64().unwrap_or(0.0),
1571 low: candle.low.to_f64().unwrap_or(0.0),
1572 close: candle.close.to_f64().unwrap_or(0.0),
1573 volume: candle.volume.to_f64().unwrap_or(0.0),
1574 };
1575 writer.serialize(row)?;
1576 }
1577 writer.flush()?;
1578 Ok(())
1579}
1580
/// One CSV row of the batch report written by `write_batch_report`.
// NOTE(review): rows are built outside this part of the file; the field notes
// below are inferred from the names only and should be confirmed there.
#[derive(Serialize)]
struct BatchRow {
    // Presumably identifies the configuration this row summarizes.
    config: String,
    // Presumably event counts collected during that run.
    signals: usize,
    orders: usize,
    dropped_orders: usize,
    // Presumably the final equity of the run, as a float.
    ending_equity: f64,
}
1589
1590fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
1591 if let Some(parent) = path.parent() {
1592 fs::create_dir_all(parent)
1593 .with_context(|| format!("failed to create directory {}", parent.display()))?;
1594 }
1595 let mut writer =
1596 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1597 for row in rows {
1598 writer.serialize(row)?;
1599 }
1600 writer.flush()?;
1601 Ok(())
1602}
1603
1604fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<Symbol, Decimal> {
1605 config
1606 .initial_balances
1607 .iter()
1608 .map(|(currency, amount)| (currency.clone(), *amount))
1609 .collect()
1610}
1611
1612fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
1613 config
1614 .initial_balances
1615 .get(&config.reporting_currency)
1616 .copied()
1617 .unwrap_or_default()
1618}
1619
1620fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
1621 let parts: Vec<_> = value.split(':').collect();
1622 match parts.as_slice() {
1623 ["fixed", val] => {
1624 let quantity =
1625 Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
1626 Ok(Box::new(FixedOrderSizer { quantity }))
1627 }
1628 ["fixed"] => {
1629 let quantity = cli_quantity.unwrap_or(Decimal::ONE);
1630 Ok(Box::new(FixedOrderSizer { quantity }))
1631 }
1632 ["percent", val] => {
1633 let percent =
1634 Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
1635 Ok(Box::new(PortfolioPercentSizer {
1636 percent: percent.max(Decimal::ZERO),
1637 }))
1638 }
1639 ["risk-adjusted", val] => {
1640 let risk_fraction = Decimal::from_str(val)
1641 .context("invalid risk fraction value (use decimals)")?;
1642 Ok(Box::new(RiskAdjustedSizer {
1643 risk_fraction: risk_fraction.max(Decimal::ZERO),
1644 }))
1645 }
1646 _ => Err(anyhow!(
1647 "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
1648 )),
1649 }
1650}