1use crate::alerts::sanitize_webhook;
2use crate::analyze;
3use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
4use crate::live::{run_live, ExecutionBackend, LiveSessionSettings};
5use crate::state;
6use crate::telemetry::init_tracing;
7use crate::PublicChannel;
8use arrow::util::pretty::print_batches;
9use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
10use std::collections::HashMap;
11use std::fs::{self, File};
12use std::io::{BufRead, BufReader};
13use std::net::SocketAddr;
14use std::path::{Path, PathBuf};
15use std::str::FromStr;
16use std::sync::Arc;
17use std::time::Duration as StdDuration;
18
19use anyhow::{anyhow, bail, Context, Result};
20use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
21use clap::{Args, Parser, Subcommand, ValueEnum};
22use csv::Writer;
23use rust_decimal::{
24 prelude::{FromPrimitive, ToPrimitive},
25 Decimal,
26};
27use serde::{Deserialize, Serialize};
28use tesser_backtester::reporting::PerformanceReport;
29use tesser_backtester::{
30 BacktestConfig, BacktestMode, BacktestStream, Backtester, MarketEvent, MarketEventKind,
31};
32use tesser_broker::ExecutionClient;
33use tesser_config::{load_config, AppConfig, RiskManagementConfig};
34use tesser_core::{Candle, DepthUpdate, Interval, OrderBook, OrderBookLevel, Side, Symbol, Tick};
35use tesser_data::analytics::ExecutionAnalysisRequest;
36use tesser_data::download::{BinanceDownloader, BybitDownloader, KlineRequest};
37use tesser_data::parquet::ParquetMarketStream;
38use tesser_execution::{
39 ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PortfolioPercentSizer,
40 RiskAdjustedSizer,
41};
42use tesser_markets::MarketRegistry;
43use tesser_paper::{MatchingEngine, PaperExecutionClient, PaperMarketStream};
44use tesser_strategy::{builtin_strategy_names, load_strategy};
45use tracing::{info, warn};
46
47#[derive(Parser)]
48#[command(author, version, about = "Tesser Trading Framework")]
49pub struct Cli {
50 #[arg(short, long, action = clap::ArgAction::Count)]
52 verbose: u8,
53 #[arg(long, default_value = "default")]
55 env: String,
56 #[command(subcommand)]
57 command: Commands,
58}
59
60#[allow(clippy::large_enum_variant)]
61#[derive(Subcommand)]
62pub enum Commands {
63 Data {
65 #[command(subcommand)]
66 action: DataCommand,
67 },
68 Backtest {
70 #[command(subcommand)]
71 action: BacktestCommand,
72 },
73 Live {
75 #[command(subcommand)]
76 action: LiveCommand,
77 },
78 State {
80 #[command(subcommand)]
81 action: StateCommand,
82 },
83 Strategies,
85 Analyze {
87 #[command(subcommand)]
88 action: AnalyzeCommand,
89 },
90}
91
92#[derive(Subcommand)]
93pub enum DataCommand {
94 Download(DataDownloadArgs),
96 Validate(DataValidateArgs),
98 Resample(DataResampleArgs),
100 InspectParquet(DataInspectParquetArgs),
102}
103
104#[derive(Subcommand)]
105pub enum BacktestCommand {
106 Run(BacktestRunArgs),
108 Batch(BacktestBatchArgs),
110}
111
112#[derive(Subcommand)]
113pub enum LiveCommand {
114 Run(LiveRunArgs),
116}
117
118#[derive(Subcommand)]
119pub enum StateCommand {
120 Inspect(StateInspectArgs),
122}
123
124#[derive(Subcommand)]
125pub enum AnalyzeCommand {
126 Execution(AnalyzeExecutionArgs),
128}
129
130#[derive(Args)]
131pub struct DataDownloadArgs {
132 #[arg(long, default_value = "bybit")]
133 exchange: String,
134 #[arg(long)]
135 symbol: String,
136 #[arg(long, default_value = "linear")]
137 category: String,
138 #[arg(long, default_value = "1m")]
139 interval: String,
140 #[arg(long)]
141 start: String,
142 #[arg(long)]
143 end: Option<String>,
144 #[arg(long)]
145 output: Option<PathBuf>,
146 #[arg(long)]
148 skip_validation: bool,
149 #[arg(long)]
151 repair_missing: bool,
152 #[arg(long, default_value_t = 0.05)]
154 validation_jump_threshold: f64,
155 #[arg(long, default_value_t = 0.002)]
157 validation_reference_tolerance: f64,
158}
159
160#[derive(Args)]
161pub struct StateInspectArgs {
162 #[arg(long)]
164 path: Option<PathBuf>,
165 #[arg(long)]
167 raw: bool,
168}
169
170#[derive(Args)]
171pub struct AnalyzeExecutionArgs {
172 #[arg(long, value_name = "PATH", default_value = "data/flight_recorder")]
174 data_dir: PathBuf,
175 #[arg(long)]
177 start: Option<String>,
178 #[arg(long)]
180 end: Option<String>,
181}
182
183impl StateInspectArgs {
184 fn resolved_path(&self, config: &AppConfig) -> PathBuf {
185 self.path
186 .clone()
187 .unwrap_or_else(|| config.live.state_path.clone())
188 }
189}
190
191impl AnalyzeExecutionArgs {
192 fn build_request(&self) -> Result<ExecutionAnalysisRequest> {
193 let start = match &self.start {
194 Some(value) => Some(parse_datetime(value)?),
195 None => None,
196 };
197 let end = match &self.end {
198 Some(value) => Some(parse_datetime(value)?),
199 None => None,
200 };
201 Ok(ExecutionAnalysisRequest {
202 data_dir: self.data_dir.clone(),
203 start,
204 end,
205 })
206 }
207}
208
209impl DataDownloadArgs {
210 async fn run(&self, config: &AppConfig) -> Result<()> {
211 let exchange_cfg = config
212 .exchange
213 .get(&self.exchange)
214 .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
215 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
216 let start = parse_datetime(&self.start)?;
217 let end = match &self.end {
218 Some(value) => parse_datetime(value)?,
219 None => Utc::now(),
220 };
221 if start >= end {
222 return Err(anyhow!("start time must be earlier than end time"));
223 }
224
225 info!(
226 "Downloading {} candles for {} ({})",
227 self.interval, self.symbol, self.exchange
228 );
229 let mut candles = match exchange_cfg.driver.as_str() {
230 "bybit" | "" => {
231 let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
232 let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
233 downloader
234 .download_klines(&request)
235 .await
236 .with_context(|| "failed to download candles from Bybit")?
237 }
238 "binance" => {
239 let downloader = BinanceDownloader::new(&exchange_cfg.rest_url);
240 let request = KlineRequest::new("", &self.symbol, interval, start, end);
241 downloader
242 .download_klines(&request)
243 .await
244 .with_context(|| "failed to download candles from Binance")?
245 }
246 other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
247 };
248
249 if candles.is_empty() {
250 info!("No candles returned for {}", self.symbol);
251 return Ok(());
252 }
253
254 if !self.skip_validation {
255 let config = ValidationConfig {
256 price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
257 reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
258 repair_missing: self.repair_missing,
259 };
260 let outcome =
261 validate_dataset(candles.clone(), None, config).context("validation failed")?;
262 print_validation_summary(&outcome);
263 if self.repair_missing && outcome.summary.repaired_candles > 0 {
264 candles = outcome.repaired;
265 info!(
266 "Applied {} synthetic candle(s) to repair gaps",
267 outcome.summary.repaired_candles
268 );
269 }
270 }
271
272 let output_path = self.output.clone().unwrap_or_else(|| {
273 default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
274 });
275 write_candles_csv(&output_path, &candles)?;
276 info!(
277 "Saved {} candles to {}",
278 candles.len(),
279 output_path.display()
280 );
281 Ok(())
282 }
283}
284
285#[derive(Args)]
286pub struct DataValidateArgs {
287 #[arg(
289 long = "path",
290 value_name = "PATH",
291 num_args = 1..,
292 action = clap::ArgAction::Append
293 )]
294 paths: Vec<PathBuf>,
295 #[arg(
297 long = "reference",
298 value_name = "PATH",
299 num_args = 1..,
300 action = clap::ArgAction::Append
301 )]
302 reference_paths: Vec<PathBuf>,
303 #[arg(long, default_value_t = 0.05)]
305 jump_threshold: f64,
306 #[arg(long, default_value_t = 0.002)]
308 reference_tolerance: f64,
309 #[arg(long)]
311 repair_missing: bool,
312 #[arg(long)]
314 output: Option<PathBuf>,
315}
316
317impl DataValidateArgs {
318 fn run(&self) -> Result<()> {
319 if self.paths.is_empty() {
320 bail!("provide at least one --path for validation");
321 }
322 let candles =
323 load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
324 if candles.is_empty() {
325 bail!("loaded dataset is empty; nothing to validate");
326 }
327 let reference = if self.reference_paths.is_empty() {
328 None
329 } else {
330 Some(
331 load_candles_from_paths(&self.reference_paths)
332 .with_context(|| "failed to load reference dataset")?,
333 )
334 };
335
336 let price_jump_threshold = if self.jump_threshold <= 0.0 {
337 0.0001
338 } else {
339 self.jump_threshold
340 };
341 let reference_tolerance = if self.reference_tolerance <= 0.0 {
342 0.0001
343 } else {
344 self.reference_tolerance
345 };
346
347 let config = ValidationConfig {
348 price_jump_threshold,
349 reference_tolerance,
350 repair_missing: self.repair_missing,
351 };
352
353 let outcome = validate_dataset(candles, reference, config)?;
354 print_validation_summary(&outcome);
355
356 if let Some(output) = &self.output {
357 write_candles_csv(output, &outcome.repaired)?;
358 info!(
359 "Wrote {} candles ({} new) to {}",
360 outcome.repaired.len(),
361 outcome.summary.repaired_candles,
362 output.display()
363 );
364 } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
365 warn!(
366 "Detected {} gap(s) filled with synthetic candles but --output was not provided",
367 outcome.summary.repaired_candles
368 );
369 }
370
371 Ok(())
372 }
373}
374
375#[derive(Args)]
376pub struct DataResampleArgs {
377 #[arg(long)]
378 input: PathBuf,
379 #[arg(long)]
380 output: PathBuf,
381 #[arg(long, default_value = "1h")]
382 interval: String,
383}
384
385#[derive(Args)]
386pub struct DataInspectParquetArgs {
387 #[arg(value_name = "PATH")]
389 path: PathBuf,
390 #[arg(long, default_value_t = 10)]
392 rows: usize,
393}
394
395impl DataInspectParquetArgs {
396 fn run(&self) -> Result<()> {
397 let limit = if self.rows == 0 {
398 usize::MAX
399 } else {
400 self.rows
401 };
402 let file = File::open(&self.path)
403 .with_context(|| format!("failed to open {}", self.path.display()))?;
404 let batch_size = limit.clamp(1, 8192);
405 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)?
406 .with_batch_size(batch_size)
407 .build()?;
408
409 let mut printed = 0usize;
410 while printed < limit {
411 match reader.next() {
412 Some(Ok(batch)) => {
413 if batch.num_rows() == 0 {
414 continue;
415 }
416 let remaining = limit.saturating_sub(printed);
417 let take = remaining.min(batch.num_rows());
418 let display_batch = if take == batch.num_rows() {
419 batch
420 } else {
421 batch.slice(0, take)
422 };
423 print_batches(std::slice::from_ref(&display_batch))?;
424 printed += take;
425 }
426 Some(Err(err)) => return Err(err.into()),
427 None => break,
428 }
429 }
430
431 if printed == 0 {
432 println!("no rows available in {}", self.path.display());
433 } else if limit != usize::MAX {
434 println!("displayed {printed} row(s) from {}", self.path.display());
435 }
436
437 Ok(())
438 }
439}
440
441#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
442pub enum BacktestModeArg {
443 Candle,
444 Tick,
445}
446
447#[derive(Args)]
448pub struct BacktestRunArgs {
449 #[arg(long)]
450 strategy_config: PathBuf,
451 #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
453 data_paths: Vec<PathBuf>,
454 #[arg(long, default_value_t = 500)]
455 candles: usize,
456 #[arg(long, default_value = "0.01")]
457 quantity: Decimal,
458 #[arg(long, default_value = "0")]
460 slippage_bps: Decimal,
461 #[arg(long, default_value = "0")]
463 fee_bps: Decimal,
464 #[arg(long, default_value_t = 1)]
466 latency_candles: usize,
467 #[arg(long, default_value = "fixed:0.01")]
469 sizer: String,
470 #[arg(long, value_enum, default_value = "candle")]
472 mode: BacktestModeArg,
473 #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
475 lob_paths: Vec<PathBuf>,
476 #[arg(long)]
477 markets_file: Option<PathBuf>,
478}
479
480#[derive(Args)]
481pub struct BacktestBatchArgs {
482 #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
484 config_paths: Vec<PathBuf>,
485 #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
487 data_paths: Vec<PathBuf>,
488 #[arg(long, default_value = "0.01")]
489 quantity: Decimal,
490 #[arg(long)]
492 output: Option<PathBuf>,
493 #[arg(long, default_value = "0")]
495 slippage_bps: Decimal,
496 #[arg(long, default_value = "0")]
498 fee_bps: Decimal,
499 #[arg(long, default_value_t = 1)]
501 latency_candles: usize,
502 #[arg(long, default_value = "fixed:0.01")]
504 sizer: String,
505 #[arg(long)]
506 markets_file: Option<PathBuf>,
507}
508
509#[derive(Args)]
510pub struct LiveRunArgs {
511 #[arg(long)]
512 strategy_config: PathBuf,
513 #[arg(long, default_value = "paper_sandbox")]
514 exchange: String,
515 #[arg(long, default_value = "linear")]
516 category: String,
517 #[arg(long, default_value = "1m")]
518 interval: String,
519 #[arg(long, default_value = "1")]
520 quantity: Decimal,
521 #[arg(
523 long = "exec",
524 default_value = "paper",
525 value_enum,
526 alias = "live-exec"
527 )]
528 exec: ExecutionBackend,
529 #[arg(long)]
530 state_path: Option<PathBuf>,
531 #[arg(long)]
532 metrics_addr: Option<String>,
533 #[arg(long)]
534 log_path: Option<PathBuf>,
535 #[arg(
537 long = "record-data",
538 value_name = "PATH",
539 default_value = "data/flight_recorder"
540 )]
541 record_data: PathBuf,
542 #[arg(long)]
544 control_addr: Option<String>,
545 #[arg(long)]
546 initial_equity: Option<Decimal>,
547 #[arg(long)]
548 markets_file: Option<PathBuf>,
549 #[arg(long, default_value = "0")]
550 slippage_bps: Decimal,
551 #[arg(long, default_value = "0")]
552 fee_bps: Decimal,
553 #[arg(long, default_value_t = 0)]
554 latency_ms: u64,
555 #[arg(long, default_value_t = 512)]
556 history: usize,
557 #[arg(long)]
558 reconciliation_interval_secs: Option<u64>,
559 #[arg(long)]
560 reconciliation_threshold: Option<Decimal>,
561 #[arg(long)]
562 webhook_url: Option<String>,
563 #[arg(long)]
564 alert_max_data_gap_secs: Option<u64>,
565 #[arg(long)]
566 alert_max_order_failures: Option<u32>,
567 #[arg(long)]
568 alert_max_drawdown: Option<Decimal>,
569 #[arg(long)]
570 risk_max_order_qty: Option<Decimal>,
571 #[arg(long)]
572 risk_max_position_qty: Option<Decimal>,
573 #[arg(long)]
574 risk_max_drawdown: Option<Decimal>,
575 #[arg(long)]
577 orderbook_depth: Option<usize>,
578 #[arg(long, default_value = "fixed:1.0")]
580 sizer: String,
581}
582
583impl LiveRunArgs {
584 fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
585 self.log_path
586 .clone()
587 .unwrap_or_else(|| config.live.log_path.clone())
588 }
589
590 fn resolved_state_path(&self, config: &AppConfig) -> PathBuf {
591 self.state_path
592 .clone()
593 .unwrap_or_else(|| config.live.state_path.clone())
594 }
595
596 fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
597 let addr = self
598 .metrics_addr
599 .clone()
600 .unwrap_or_else(|| config.live.metrics_addr.clone());
601 addr.parse()
602 .with_context(|| format!("invalid metrics address '{addr}'"))
603 }
604
605 fn resolved_control_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
606 let addr = self
607 .control_addr
608 .clone()
609 .unwrap_or_else(|| config.live.control_addr.clone());
610 addr.parse()
611 .with_context(|| format!("invalid control address '{addr}'"))
612 }
613
614 fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
615 let secs = self
616 .reconciliation_interval_secs
617 .unwrap_or(config.live.reconciliation_interval_secs)
618 .max(1);
619 StdDuration::from_secs(secs)
620 }
621
622 fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
623 let configured = self
624 .reconciliation_threshold
625 .unwrap_or(config.live.reconciliation_threshold);
626 if configured <= Decimal::ZERO {
627 config.live.reconciliation_threshold.max(Decimal::new(1, 6))
628 } else {
629 configured
630 }
631 }
632
633 fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<Symbol, Decimal> {
634 let mut balances = clone_initial_balances(&config.backtest);
635 if let Some(value) = self.initial_equity {
636 balances.insert(
637 config.backtest.reporting_currency.clone(),
638 value.max(Decimal::ZERO),
639 );
640 }
641 balances
642 }
643
644 fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
645 let mut alerting = config.live.alerting.clone();
646 let webhook = self
647 .webhook_url
648 .clone()
649 .or_else(|| alerting.webhook_url.clone());
650 alerting.webhook_url = sanitize_webhook(webhook);
651 if let Some(sec) = self.alert_max_data_gap_secs {
652 alerting.max_data_gap_secs = sec;
653 }
654 if let Some(limit) = self.alert_max_order_failures {
655 alerting.max_order_failures = limit;
656 }
657 if let Some(limit) = self.alert_max_drawdown {
658 alerting.max_drawdown = limit.max(Decimal::ZERO);
659 }
660 alerting
661 }
662
663 fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
664 let mut risk = config.risk_management.clone();
665 if let Some(limit) = self.risk_max_order_qty {
666 risk.max_order_quantity = limit.max(Decimal::ZERO);
667 }
668 if let Some(limit) = self.risk_max_position_qty {
669 risk.max_position_quantity = limit.max(Decimal::ZERO);
670 }
671 if let Some(limit) = self.risk_max_drawdown {
672 risk.max_drawdown = limit.max(Decimal::ZERO);
673 }
674 risk
675 }
676}
677
678#[derive(Deserialize)]
679struct StrategyConfigFile {
680 #[serde(rename = "strategy_name")]
681 name: String,
682 #[serde(default = "empty_table")]
683 params: toml::Value,
684}
685
686fn empty_table() -> toml::Value {
687 toml::Value::Table(Default::default())
688}
689
690pub async fn run() -> Result<()> {
691 let cli = Cli::parse();
692 let config = load_config(Some(&cli.env)).context("failed to load configuration")?;
693
694 let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
695 0 => config.log_level.clone(),
696 1 => "debug".to_string(),
697 _ => "trace".to_string(),
698 });
699
700 let log_override = match &cli.command {
701 Commands::Live {
702 action: LiveCommand::Run(args),
703 } => Some(args.resolved_log_path(&config)),
704 _ => None,
705 };
706
707 init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;
708
709 match cli.command {
710 Commands::Data { action } => handle_data(action, &config).await?,
711 Commands::Backtest {
712 action: BacktestCommand::Run(args),
713 } => args.run(&config).await?,
714 Commands::Backtest {
715 action: BacktestCommand::Batch(args),
716 } => args.run(&config).await?,
717 Commands::Live {
718 action: LiveCommand::Run(args),
719 } => args.run(&config).await?,
720 Commands::State { action } => handle_state(action, &config).await?,
721 Commands::Analyze { action } => handle_analyze(action)?,
722 Commands::Strategies => list_strategies(),
723 }
724
725 Ok(())
726}
727
728async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
729 match cmd {
730 DataCommand::Download(args) => {
731 args.run(config).await?;
732 }
733 DataCommand::Validate(args) => {
734 args.run()?;
735 }
736 DataCommand::Resample(args) => {
737 info!(
738 "stub: resampling {} into {} at {}",
739 args.input.display(),
740 args.output.display(),
741 args.interval
742 );
743 }
744 DataCommand::InspectParquet(args) => {
745 args.run()?;
746 }
747 }
748 Ok(())
749}
750
751async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
752 match cmd {
753 StateCommand::Inspect(args) => {
754 state::inspect_state(args.resolved_path(config), args.raw).await?;
755 }
756 }
757 Ok(())
758}
759
760fn handle_analyze(cmd: AnalyzeCommand) -> Result<()> {
761 match cmd {
762 AnalyzeCommand::Execution(args) => analyze::run_execution(args.build_request()?),
763 }
764}
765
766impl BacktestRunArgs {
767 async fn run(&self, config: &AppConfig) -> Result<()> {
768 let contents = std::fs::read_to_string(&self.strategy_config)
769 .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
770 let def: StrategyConfigFile =
771 toml::from_str(&contents).context("failed to parse strategy config file")?;
772 let strategy = load_strategy(&def.name, def.params)
773 .with_context(|| format!("failed to configure strategy {}", def.name))?;
774 let symbols = strategy.subscriptions();
775 if symbols.is_empty() {
776 return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
777 }
778
779 let mode = match self.mode {
780 BacktestModeArg::Candle => BacktestMode::Candle,
781 BacktestModeArg::Tick => BacktestMode::Tick,
782 };
783
784 let markets_path = self
785 .markets_file
786 .clone()
787 .or_else(|| config.backtest.markets_file.clone())
788 .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
789 let market_registry = Arc::new(
790 MarketRegistry::load_from_file(&markets_path).with_context(|| {
791 format!("failed to load markets from {}", markets_path.display())
792 })?,
793 );
794
795 let (market_stream, lob_events, execution_client, matching_engine) = match mode {
796 BacktestMode::Candle => {
797 let stream = self.build_candle_stream(&symbols)?;
798 (
799 Some(stream),
800 Vec::new(),
801 Arc::new(PaperExecutionClient::default()) as Arc<dyn ExecutionClient>,
802 None,
803 )
804 }
805 BacktestMode::Tick => {
806 if self.lob_paths.is_empty() {
807 bail!("--lob-data is required when --mode tick");
808 }
809 let events = load_lob_events_from_paths(&self.lob_paths)?;
810 if events.is_empty() {
811 bail!("no order book events loaded from --lob-data");
812 }
813 let engine = Arc::new(MatchingEngine::new(
814 "matching-engine",
815 symbols.clone(),
816 reporting_balance(&config.backtest),
817 ));
818 (
819 None,
820 events,
821 engine.clone() as Arc<dyn ExecutionClient>,
822 Some(engine),
823 )
824 }
825 };
826
827 let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
828 let order_quantity = self.quantity;
829 let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
830
831 let mut cfg = BacktestConfig::new(symbols[0].clone());
832 cfg.lob_events = lob_events;
833 cfg.order_quantity = order_quantity;
834 cfg.initial_balances = clone_initial_balances(&config.backtest);
835 cfg.reporting_currency = config.backtest.reporting_currency.clone();
836 cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
837 cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
838 cfg.execution.latency_candles = self.latency_candles.max(1);
839 cfg.mode = mode;
840
841 let report = Backtester::new(
842 cfg,
843 strategy,
844 execution,
845 matching_engine,
846 market_registry,
847 market_stream,
848 )
849 .run()
850 .await
851 .context("backtest failed")?;
852 print_report(&report);
853 Ok(())
854 }
855
856 fn build_candle_stream(&self, symbols: &[Symbol]) -> Result<BacktestStream> {
857 if symbols.is_empty() {
858 bail!("strategy did not declare any subscriptions");
859 }
860 if self.data_paths.is_empty() {
861 let mut generated = Vec::new();
862 for (idx, symbol) in symbols.iter().enumerate() {
863 let offset = idx as i64 * 10;
864 generated.extend(synth_candles(symbol, self.candles, offset));
865 }
866 generated.sort_by_key(|c| c.timestamp);
867 if generated.is_empty() {
868 bail!("no synthetic candles generated; provide --data files instead");
869 }
870 return Ok(memory_market_stream(&symbols[0], generated));
871 }
872
873 match detect_data_format(&self.data_paths)? {
874 DataFormat::Csv => {
875 let mut candles = load_candles_from_paths(&self.data_paths)?;
876 if candles.is_empty() {
877 bail!("no candles loaded from --data paths");
878 }
879 candles.sort_by_key(|c| c.timestamp);
880 Ok(memory_market_stream(&symbols[0], candles))
881 }
882 DataFormat::Parquet => Ok(parquet_market_stream(symbols, self.data_paths.clone())),
883 }
884 }
885}
886
887impl BacktestBatchArgs {
888 async fn run(&self, config: &AppConfig) -> Result<()> {
889 if self.config_paths.is_empty() {
890 return Err(anyhow!("provide at least one --config path"));
891 }
892 if self.data_paths.is_empty() {
893 return Err(anyhow!("provide at least one --data path for batch mode"));
894 }
895 let markets_path = self
896 .markets_file
897 .clone()
898 .or_else(|| config.backtest.markets_file.clone())
899 .ok_or_else(|| {
900 anyhow!("batch mode requires --markets-file or backtest.markets_file")
901 })?;
902 let market_registry = Arc::new(
903 MarketRegistry::load_from_file(&markets_path).with_context(|| {
904 format!("failed to load markets from {}", markets_path.display())
905 })?,
906 );
907 let data_format = detect_data_format(&self.data_paths)?;
908 let mut aggregated = Vec::new();
909 for config_path in &self.config_paths {
910 let contents = std::fs::read_to_string(config_path).with_context(|| {
911 format!("failed to read strategy config {}", config_path.display())
912 })?;
913 let def: StrategyConfigFile =
914 toml::from_str(&contents).context("failed to parse strategy config file")?;
915 let strategy = load_strategy(&def.name, def.params)
916 .with_context(|| format!("failed to configure strategy {}", def.name))?;
917 let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
918 let order_quantity = self.quantity;
919 let symbols = strategy.subscriptions();
920 if symbols.is_empty() {
921 bail!("strategy {} did not declare subscriptions", strategy.name());
922 }
923 let stream = match data_format {
924 DataFormat::Csv => {
925 let mut candles = load_candles_from_paths(&self.data_paths)?;
926 if candles.is_empty() {
927 bail!("no candles loaded from --data paths");
928 }
929 candles.sort_by_key(|c| c.timestamp);
930 memory_market_stream(&symbols[0], candles)
931 }
932 DataFormat::Parquet => parquet_market_stream(&symbols, self.data_paths.clone()),
933 };
934 let execution_client: Arc<dyn ExecutionClient> =
935 Arc::new(PaperExecutionClient::default());
936 let execution =
937 ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
938 let mut cfg = BacktestConfig::new(symbols[0].clone());
939 cfg.order_quantity = order_quantity;
940 cfg.initial_balances = clone_initial_balances(&config.backtest);
941 cfg.reporting_currency = config.backtest.reporting_currency.clone();
942 cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
943 cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
944 cfg.execution.latency_candles = self.latency_candles.max(1);
945
946 let report = Backtester::new(
947 cfg,
948 strategy,
949 execution,
950 None,
951 market_registry.clone(),
952 Some(stream),
953 )
954 .run()
955 .await
956 .with_context(|| format!("backtest failed for {}", config_path.display()))?;
957 aggregated.push(BatchRow {
958 config: config_path.display().to_string(),
959 signals: 0, orders: 0, dropped_orders: 0, ending_equity: report.ending_equity,
963 });
964 }
965
966 if let Some(output) = &self.output {
967 write_batch_report(output, &aggregated)?;
968 println!("Batch report written to {}", output.display());
969 }
970 if aggregated.is_empty() {
971 return Err(anyhow!("no batch jobs executed"));
972 }
973 Ok(())
974 }
975}
976
977impl LiveRunArgs {
978 async fn run(&self, config: &AppConfig) -> Result<()> {
979 let exchange_cfg = config
980 .exchange
981 .get(&self.exchange)
982 .cloned()
983 .ok_or_else(|| anyhow!("exchange profile {} not found", self.exchange))?;
984
985 let driver = exchange_cfg.driver.clone();
986
987 let contents = fs::read_to_string(&self.strategy_config)
988 .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
989 let def: StrategyConfigFile =
990 toml::from_str(&contents).context("failed to parse strategy config file")?;
991 let strategy = load_strategy(&def.name, def.params)
992 .with_context(|| format!("failed to configure strategy {}", def.name))?;
993 let symbols = strategy.subscriptions();
994 if symbols.is_empty() {
995 bail!("strategy did not declare any subscriptions");
996 }
997 if self.quantity <= Decimal::ZERO {
998 bail!("--quantity must be greater than zero");
999 }
1000 let quantity = self.quantity;
1001 let initial_balances = self.resolved_initial_balances(config);
1002 let reporting_currency = config.backtest.reporting_currency.clone();
1003 let markets_file = self
1004 .markets_file
1005 .clone()
1006 .or_else(|| config.backtest.markets_file.clone());
1007
1008 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
1009 let category =
1010 PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
1011 let metrics_addr = self.resolved_metrics_addr(config)?;
1012 let control_addr = self.resolved_control_addr(config)?;
1013 let state_path = self.resolved_state_path(config);
1014 let alerting = self.build_alerting(config);
1015 let history = self.history.max(32);
1016 let reconciliation_interval = self.reconciliation_interval(config);
1017 let reconciliation_threshold = self.reconciliation_threshold(config);
1018 let orderbook_depth = self
1019 .orderbook_depth
1020 .unwrap_or(super::live::default_order_book_depth());
1021
1022 let settings = LiveSessionSettings {
1023 category,
1024 interval,
1025 quantity,
1026 slippage_bps: self.slippage_bps.max(Decimal::ZERO),
1027 fee_bps: self.fee_bps.max(Decimal::ZERO),
1028 history,
1029 metrics_addr,
1030 state_path,
1031 initial_balances,
1032 reporting_currency,
1033 markets_file,
1034 alerting,
1035 exec_backend: self.exec,
1036 risk: self.build_risk_config(config),
1037 reconciliation_interval,
1038 reconciliation_threshold,
1039 driver,
1040 orderbook_depth,
1041 record_path: Some(self.record_data.clone()),
1042 control_addr,
1043 };
1044
1045 info!(
1046 strategy = %def.name,
1047 symbols = ?symbols,
1048 exchange = %self.exchange,
1049 interval = %self.interval,
1050 driver = ?settings.driver,
1051 exec = ?self.exec,
1052 control_addr = %settings.control_addr,
1053 "starting live session"
1054 );
1055
1056 run_live(strategy, symbols, exchange_cfg, settings)
1057 .await
1058 .context("live session failed")
1059 }
1060}
1061
1062fn list_strategies() {
1063 println!("Built-in strategies:");
1064 for name in builtin_strategy_names() {
1065 println!("- {name}");
1066 }
1067}
1068
1069fn print_validation_summary(outcome: &ValidationOutcome) {
1070 const MAX_EXAMPLES: usize = 5;
1071 let summary = &outcome.summary;
1072 println!(
1073 "Validation summary for {} ({} candles)",
1074 summary.symbol, summary.rows
1075 );
1076 println!(
1077 " Range: {} -> {}",
1078 summary.start.to_rfc3339(),
1079 summary.end.to_rfc3339()
1080 );
1081 println!(" Interval: {}", interval_label(summary.interval));
1082 println!(" Missing intervals: {}", summary.missing_candles);
1083 println!(" Duplicate intervals: {}", summary.duplicate_candles);
1084 println!(" Zero-volume candles: {}", summary.zero_volume_candles);
1085 println!(" Price spikes flagged: {}", summary.price_spike_count);
1086 println!(
1087 " Cross-source mismatches: {}",
1088 summary.cross_mismatch_count
1089 );
1090 println!(" Repaired candles generated: {}", summary.repaired_candles);
1091
1092 if !outcome.gaps.is_empty() {
1093 println!(" Gap examples:");
1094 for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
1095 println!(
1096 " {} -> {} (missing {})",
1097 gap.start.to_rfc3339(),
1098 gap.end.to_rfc3339(),
1099 gap.missing
1100 );
1101 }
1102 if outcome.gaps.len() > MAX_EXAMPLES {
1103 println!(
1104 " ... {} additional gap(s) omitted",
1105 outcome.gaps.len() - MAX_EXAMPLES
1106 );
1107 }
1108 }
1109
1110 if !outcome.price_spikes.is_empty() {
1111 println!(" Price spike examples:");
1112 for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
1113 println!(
1114 " {} (change {:.2}%)",
1115 spike.timestamp.to_rfc3339(),
1116 spike.change_fraction * 100.0
1117 );
1118 }
1119 if outcome.price_spikes.len() > MAX_EXAMPLES {
1120 println!(
1121 " ... {} additional spike(s) omitted",
1122 outcome.price_spikes.len() - MAX_EXAMPLES
1123 );
1124 }
1125 }
1126
1127 if !outcome.cross_mismatches.is_empty() {
1128 println!(" Cross-source mismatch examples:");
1129 for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
1130 println!(
1131 " {} primary {:.4} vs ref {:.4} ({:.2}%)",
1132 miss.timestamp.to_rfc3339(),
1133 miss.primary_close,
1134 miss.reference_close,
1135 miss.delta_fraction * 100.0
1136 );
1137 }
1138 if outcome.cross_mismatches.len() > MAX_EXAMPLES {
1139 println!(
1140 " ... {} additional mismatch(es) omitted",
1141 outcome.cross_mismatches.len() - MAX_EXAMPLES
1142 );
1143 }
1144 }
1145}
1146
1147fn print_report(report: &PerformanceReport) {
1148 println!("\n{}", report);
1149}
1150
1151fn synth_candles(symbol: &str, len: usize, offset_minutes: i64) -> Vec<Candle> {
1152 let mut candles = Vec::with_capacity(len);
1153 for i in 0..len {
1154 let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
1155 let open = base + (i as f64 % 3.0) * 10.0;
1156 let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
1157 let open_dec =
1158 Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
1159 let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
1160 let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
1161 let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
1162 candles.push(Candle {
1163 symbol: Symbol::from(symbol),
1164 interval: Interval::OneMinute,
1165 open: open_dec,
1166 high,
1167 low,
1168 close: close_dec,
1169 volume: Decimal::ONE,
1170 timestamp: Utc::now() - Duration::minutes((len - i) as i64)
1171 + Duration::minutes(offset_minutes),
1172 });
1173 }
1174 candles
1175}
1176
1177fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
1178 if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
1179 return Ok(dt.with_timezone(&Utc));
1180 }
1181 if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1182 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1183 }
1184 if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
1185 let dt = date
1186 .and_hms_opt(0, 0, 0)
1187 .ok_or_else(|| anyhow!("invalid date"))?;
1188 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1189 }
1190 Err(anyhow!("unable to parse datetime '{value}'"))
1191}
1192
1193#[derive(Deserialize)]
1194struct CandleCsvRow {
1195 symbol: Option<String>,
1196 timestamp: String,
1197 open: f64,
1198 high: f64,
1199 low: f64,
1200 close: f64,
1201 volume: f64,
1202}
1203
1204fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1205 let mut candles = Vec::new();
1206 for path in paths {
1207 let mut reader = csv::Reader::from_path(path)
1208 .with_context(|| format!("failed to open {}", path.display()))?;
1209 for record in reader.deserialize::<CandleCsvRow>() {
1210 let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1211 let timestamp = parse_datetime(&row.timestamp)?;
1212 let symbol = row
1213 .symbol
1214 .clone()
1215 .or_else(|| infer_symbol_from_path(path))
1216 .ok_or_else(|| {
1217 anyhow!(
1218 "missing symbol column and unable to infer from path {}",
1219 path.display()
1220 )
1221 })?;
1222 let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1223 let open = Decimal::from_f64(row.open).ok_or_else(|| {
1224 anyhow!("invalid open value '{}' in {}", row.open, path.display())
1225 })?;
1226 let high = Decimal::from_f64(row.high).ok_or_else(|| {
1227 anyhow!("invalid high value '{}' in {}", row.high, path.display())
1228 })?;
1229 let low = Decimal::from_f64(row.low)
1230 .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1231 let close = Decimal::from_f64(row.close).ok_or_else(|| {
1232 anyhow!("invalid close value '{}' in {}", row.close, path.display())
1233 })?;
1234 let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1235 anyhow!(
1236 "invalid volume value '{}' in {}",
1237 row.volume,
1238 path.display()
1239 )
1240 })?;
1241 candles.push(Candle {
1242 symbol,
1243 interval,
1244 open,
1245 high,
1246 low,
1247 close,
1248 volume,
1249 timestamp,
1250 });
1251 }
1252 }
1253 Ok(candles)
1254}
1255
1256#[derive(Clone, Copy, Debug, Eq, PartialEq)]
1257enum DataFormat {
1258 Csv,
1259 Parquet,
1260}
1261
1262fn detect_data_format(paths: &[PathBuf]) -> Result<DataFormat> {
1263 let mut detected: Option<DataFormat> = None;
1264 for path in paths {
1265 let ext = path
1266 .extension()
1267 .and_then(|ext| ext.to_str())
1268 .map(|ext| ext.to_ascii_lowercase())
1269 .unwrap_or_else(|| String::from(""));
1270 let current = if ext == "parquet" {
1271 DataFormat::Parquet
1272 } else {
1273 DataFormat::Csv
1274 };
1275 if let Some(existing) = detected {
1276 if existing != current {
1277 bail!("cannot mix CSV and Parquet inputs in --data");
1278 }
1279 } else {
1280 detected = Some(current);
1281 }
1282 }
1283 detected.ok_or_else(|| anyhow!("no data paths provided"))
1284}
1285
1286fn memory_market_stream(symbol: &str, candles: Vec<Candle>) -> BacktestStream {
1287 Box::new(PaperMarketStream::from_data(
1288 symbol.to_string(),
1289 Vec::new(),
1290 candles,
1291 ))
1292}
1293
1294fn parquet_market_stream(symbols: &[Symbol], paths: Vec<PathBuf>) -> BacktestStream {
1295 Box::new(ParquetMarketStream::with_candles(symbols.to_vec(), paths))
1296}
1297
1298#[derive(Deserialize)]
1299#[serde(tag = "event", rename_all = "lowercase")]
1300enum LobEventRow {
1301 Snapshot {
1302 timestamp: String,
1303 symbol: Option<String>,
1304 bids: Vec<[f64; 2]>,
1305 asks: Vec<[f64; 2]>,
1306 },
1307 Depth {
1308 timestamp: String,
1309 symbol: Option<String>,
1310 bids: Vec<[f64; 2]>,
1311 asks: Vec<[f64; 2]>,
1312 },
1313 Trade {
1314 timestamp: String,
1315 symbol: Option<String>,
1316 side: String,
1317 price: f64,
1318 size: f64,
1319 },
1320}
1321
1322fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
1323 let mut events = Vec::new();
1324 for path in paths {
1325 let file = File::open(path)
1326 .with_context(|| format!("failed to open order book file {}", path.display()))?;
1327 let symbol_hint = infer_symbol_from_path(path);
1328 for line in BufReader::new(file).lines() {
1329 let line =
1330 line.with_context(|| format!("failed to read line from {}", path.display()))?;
1331 if line.trim().is_empty() {
1332 continue;
1333 }
1334 let row: LobEventRow = serde_json::from_str(&line)
1335 .with_context(|| format!("invalid order book event in {}", path.display()))?;
1336 match row {
1337 LobEventRow::Snapshot {
1338 timestamp,
1339 symbol,
1340 bids,
1341 asks,
1342 } => {
1343 let ts = parse_datetime(×tamp)?;
1344 let symbol = symbol
1345 .or_else(|| symbol_hint.clone())
1346 .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
1347 let bids = convert_levels(&bids)?;
1348 let asks = convert_levels(&asks)?;
1349 let book = OrderBook {
1350 symbol: symbol.clone(),
1351 bids,
1352 asks,
1353 timestamp: ts,
1354 };
1355 events.push(MarketEvent {
1356 timestamp: ts,
1357 kind: MarketEventKind::OrderBook(book),
1358 });
1359 }
1360 LobEventRow::Depth {
1361 timestamp,
1362 symbol,
1363 bids,
1364 asks,
1365 } => {
1366 let ts = parse_datetime(×tamp)?;
1367 let symbol = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
1368 anyhow!("missing symbol in depth update {}", path.display())
1369 })?;
1370 let bids = convert_levels(&bids)?;
1371 let asks = convert_levels(&asks)?;
1372 let update = DepthUpdate {
1373 symbol: symbol.clone(),
1374 bids,
1375 asks,
1376 timestamp: ts,
1377 };
1378 events.push(MarketEvent {
1379 timestamp: ts,
1380 kind: MarketEventKind::Depth(update),
1381 });
1382 }
1383 LobEventRow::Trade {
1384 timestamp,
1385 symbol,
1386 side,
1387 price,
1388 size,
1389 } => {
1390 let ts = parse_datetime(×tamp)?;
1391 let symbol = symbol
1392 .or_else(|| symbol_hint.clone())
1393 .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
1394 let side = match side.to_lowercase().as_str() {
1395 "buy" | "bid" | "b" => Side::Buy,
1396 "sell" | "ask" | "s" => Side::Sell,
1397 other => bail!("unsupported trade side '{other}' in {}", path.display()),
1398 };
1399 let price = Decimal::from_f64(price).ok_or_else(|| {
1400 anyhow!("invalid trade price '{}' in {}", price, path.display())
1401 })?;
1402 let size = Decimal::from_f64(size).ok_or_else(|| {
1403 anyhow!("invalid trade size '{}' in {}", size, path.display())
1404 })?;
1405 let tick = Tick {
1406 symbol: symbol.clone(),
1407 price,
1408 size,
1409 side,
1410 exchange_timestamp: ts,
1411 received_at: ts,
1412 };
1413 events.push(MarketEvent {
1414 timestamp: ts,
1415 kind: MarketEventKind::Trade(tick),
1416 });
1417 }
1418 }
1419 }
1420 }
1421 events.sort_by_key(|event| event.timestamp);
1422 Ok(events)
1423}
1424
1425fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
1426 levels
1427 .iter()
1428 .map(|pair| {
1429 let price = Decimal::from_f64(pair[0])
1430 .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
1431 let size = Decimal::from_f64(pair[1])
1432 .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
1433 Ok(OrderBookLevel { price, size })
1434 })
1435 .collect()
1436}
1437
1438fn infer_symbol_from_path(path: &Path) -> Option<String> {
1439 path.parent()
1440 .and_then(|p| p.file_name())
1441 .map(|os| os.to_string_lossy().to_string())
1442}
1443
1444fn infer_interval_from_path(path: &Path) -> Option<Interval> {
1445 path.file_stem()
1446 .and_then(|os| os.to_str())
1447 .and_then(|stem| stem.split('_').next())
1448 .and_then(|token| Interval::from_str(token).ok())
1449}
1450
1451fn default_output_path(
1452 config: &AppConfig,
1453 exchange: &str,
1454 symbol: &str,
1455 interval: Interval,
1456 start: DateTime<Utc>,
1457 end: DateTime<Utc>,
1458) -> PathBuf {
1459 let interval_part = interval_label(interval);
1460 let start_part = start.format("%Y%m%d").to_string();
1461 let end_part = end.format("%Y%m%d").to_string();
1462 config
1463 .data_path
1464 .join(exchange)
1465 .join(symbol)
1466 .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
1467}
1468
1469fn interval_label(interval: Interval) -> &'static str {
1470 match interval {
1471 Interval::OneSecond => "1s",
1472 Interval::OneMinute => "1m",
1473 Interval::FiveMinutes => "5m",
1474 Interval::FifteenMinutes => "15m",
1475 Interval::OneHour => "1h",
1476 Interval::FourHours => "4h",
1477 Interval::OneDay => "1d",
1478 }
1479}
1480
1481#[derive(Serialize)]
1482struct CandleRow<'a> {
1483 symbol: &'a str,
1484 timestamp: String,
1485 open: f64,
1486 high: f64,
1487 low: f64,
1488 close: f64,
1489 volume: f64,
1490}
1491
1492fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
1493 if let Some(parent) = path.parent() {
1494 fs::create_dir_all(parent)
1495 .with_context(|| format!("failed to create directory {}", parent.display()))?;
1496 }
1497 let mut writer =
1498 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1499 for candle in candles {
1500 let row = CandleRow {
1501 symbol: &candle.symbol,
1502 timestamp: candle.timestamp.to_rfc3339(),
1503 open: candle.open.to_f64().unwrap_or(0.0),
1504 high: candle.high.to_f64().unwrap_or(0.0),
1505 low: candle.low.to_f64().unwrap_or(0.0),
1506 close: candle.close.to_f64().unwrap_or(0.0),
1507 volume: candle.volume.to_f64().unwrap_or(0.0),
1508 };
1509 writer.serialize(row)?;
1510 }
1511 writer.flush()?;
1512 Ok(())
1513}
1514
1515#[derive(Serialize)]
1516struct BatchRow {
1517 config: String,
1518 signals: usize,
1519 orders: usize,
1520 dropped_orders: usize,
1521 ending_equity: f64,
1522}
1523
1524fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
1525 if let Some(parent) = path.parent() {
1526 fs::create_dir_all(parent)
1527 .with_context(|| format!("failed to create directory {}", parent.display()))?;
1528 }
1529 let mut writer =
1530 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1531 for row in rows {
1532 writer.serialize(row)?;
1533 }
1534 writer.flush()?;
1535 Ok(())
1536}
1537
1538fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<Symbol, Decimal> {
1539 config
1540 .initial_balances
1541 .iter()
1542 .map(|(currency, amount)| (currency.clone(), *amount))
1543 .collect()
1544}
1545
1546fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
1547 config
1548 .initial_balances
1549 .get(&config.reporting_currency)
1550 .copied()
1551 .unwrap_or_default()
1552}
1553
1554fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
1555 let parts: Vec<_> = value.split(':').collect();
1556 match parts.as_slice() {
1557 ["fixed", val] => {
1558 let quantity =
1559 Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
1560 Ok(Box::new(FixedOrderSizer { quantity }))
1561 }
1562 ["fixed"] => {
1563 let quantity = cli_quantity.unwrap_or(Decimal::ONE);
1564 Ok(Box::new(FixedOrderSizer { quantity }))
1565 }
1566 ["percent", val] => {
1567 let percent =
1568 Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
1569 Ok(Box::new(PortfolioPercentSizer {
1570 percent: percent.max(Decimal::ZERO),
1571 }))
1572 }
1573 ["risk-adjusted", val] => {
1574 let risk_fraction = Decimal::from_str(val)
1575 .context("invalid risk fraction value (use decimals)")?;
1576 Ok(Box::new(RiskAdjustedSizer {
1577 risk_fraction: risk_fraction.max(Decimal::ZERO),
1578 }))
1579 }
1580 _ => Err(anyhow!(
1581 "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
1582 )),
1583 }
1584}