1use crate::alerts::sanitize_webhook;
2use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
3use crate::live::{run_live, ExecutionBackend, LiveSessionSettings};
4use crate::state;
5use crate::telemetry::init_tracing;
6use std::collections::HashMap;
7use std::fs::{self, File};
8use std::io::{BufRead, BufReader};
9use std::net::SocketAddr;
10use std::path::{Path, PathBuf};
11use std::str::FromStr;
12use std::sync::Arc;
13use std::time::Duration as StdDuration;
14
15use anyhow::{anyhow, bail, Context, Result};
16use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
17use clap::{Args, Parser, Subcommand, ValueEnum};
18use csv::Writer;
19use rust_decimal::{
20 prelude::{FromPrimitive, ToPrimitive},
21 Decimal,
22};
23use serde::{Deserialize, Serialize};
24use tesser_backtester::reporting::PerformanceReport;
25use tesser_backtester::{BacktestConfig, BacktestMode, Backtester, MarketEvent, MarketEventKind};
26use tesser_broker::ExecutionClient;
27use tesser_bybit::PublicChannel;
28use tesser_config::{load_config, AppConfig, RiskManagementConfig};
29use tesser_core::{Candle, DepthUpdate, Interval, OrderBook, OrderBookLevel, Side, Symbol, Tick};
30use tesser_data::download::{BybitDownloader, KlineRequest};
31use tesser_execution::{
32 ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PortfolioPercentSizer,
33 RiskAdjustedSizer,
34};
35use tesser_markets::MarketRegistry;
36use tesser_paper::{MatchingEngine, PaperExecutionClient};
37use tesser_strategy::{builtin_strategy_names, load_strategy};
38use tracing::{info, warn};
39
40#[derive(Parser)]
41#[command(author, version, about = "Tesser Trading Framework")]
42pub struct Cli {
43 #[arg(short, long, action = clap::ArgAction::Count)]
45 verbose: u8,
46 #[arg(long, default_value = "default")]
48 env: String,
49 #[command(subcommand)]
50 command: Commands,
51}
52
53#[allow(clippy::large_enum_variant)]
54#[derive(Subcommand)]
55pub enum Commands {
56 Data {
58 #[command(subcommand)]
59 action: DataCommand,
60 },
61 Backtest {
63 #[command(subcommand)]
64 action: BacktestCommand,
65 },
66 Live {
68 #[command(subcommand)]
69 action: LiveCommand,
70 },
71 State {
73 #[command(subcommand)]
74 action: StateCommand,
75 },
76 Strategies,
78}
79
80#[derive(Subcommand)]
81pub enum DataCommand {
82 Download(DataDownloadArgs),
84 Validate(DataValidateArgs),
86 Resample(DataResampleArgs),
88}
89
90#[derive(Subcommand)]
91pub enum BacktestCommand {
92 Run(BacktestRunArgs),
94 Batch(BacktestBatchArgs),
96}
97
98#[derive(Subcommand)]
99pub enum LiveCommand {
100 Run(LiveRunArgs),
102}
103
104#[derive(Subcommand)]
105pub enum StateCommand {
106 Inspect(StateInspectArgs),
108}
109
110#[derive(Args)]
111pub struct DataDownloadArgs {
112 #[arg(long, default_value = "bybit")]
113 exchange: String,
114 #[arg(long)]
115 symbol: String,
116 #[arg(long, default_value = "linear")]
117 category: String,
118 #[arg(long, default_value = "1m")]
119 interval: String,
120 #[arg(long)]
121 start: String,
122 #[arg(long)]
123 end: Option<String>,
124 #[arg(long)]
125 output: Option<PathBuf>,
126 #[arg(long)]
128 skip_validation: bool,
129 #[arg(long)]
131 repair_missing: bool,
132 #[arg(long, default_value_t = 0.05)]
134 validation_jump_threshold: f64,
135 #[arg(long, default_value_t = 0.002)]
137 validation_reference_tolerance: f64,
138}
139
140#[derive(Args)]
141pub struct StateInspectArgs {
142 #[arg(long)]
144 path: Option<PathBuf>,
145 #[arg(long)]
147 raw: bool,
148}
149
150impl StateInspectArgs {
151 fn resolved_path(&self, config: &AppConfig) -> PathBuf {
152 self.path
153 .clone()
154 .unwrap_or_else(|| config.live.state_path.clone())
155 }
156}
157
158impl DataDownloadArgs {
159 async fn run(&self, config: &AppConfig) -> Result<()> {
160 let exchange_cfg = config
161 .exchange
162 .get(&self.exchange)
163 .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
164 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
165 let start = parse_datetime(&self.start)?;
166 let end = match &self.end {
167 Some(value) => parse_datetime(value)?,
168 None => Utc::now(),
169 };
170 if start >= end {
171 return Err(anyhow!("start time must be earlier than end time"));
172 }
173
174 let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
175 let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
176 info!(
177 "Downloading {} candles for {} ({})",
178 self.interval, self.symbol, self.exchange
179 );
180 let mut candles = downloader
181 .download_klines(&request)
182 .await
183 .with_context(|| "failed to download candles from Bybit")?;
184
185 if candles.is_empty() {
186 info!("No candles returned for {}", self.symbol);
187 return Ok(());
188 }
189
190 if !self.skip_validation {
191 let config = ValidationConfig {
192 price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
193 reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
194 repair_missing: self.repair_missing,
195 };
196 let outcome =
197 validate_dataset(candles.clone(), None, config).context("validation failed")?;
198 print_validation_summary(&outcome);
199 if self.repair_missing && outcome.summary.repaired_candles > 0 {
200 candles = outcome.repaired;
201 info!(
202 "Applied {} synthetic candle(s) to repair gaps",
203 outcome.summary.repaired_candles
204 );
205 }
206 }
207
208 let output_path = self.output.clone().unwrap_or_else(|| {
209 default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
210 });
211 write_candles_csv(&output_path, &candles)?;
212 info!(
213 "Saved {} candles to {}",
214 candles.len(),
215 output_path.display()
216 );
217 Ok(())
218 }
219}
220
221#[derive(Args)]
222pub struct DataValidateArgs {
223 #[arg(
225 long = "path",
226 value_name = "PATH",
227 num_args = 1..,
228 action = clap::ArgAction::Append
229 )]
230 paths: Vec<PathBuf>,
231 #[arg(
233 long = "reference",
234 value_name = "PATH",
235 num_args = 1..,
236 action = clap::ArgAction::Append
237 )]
238 reference_paths: Vec<PathBuf>,
239 #[arg(long, default_value_t = 0.05)]
241 jump_threshold: f64,
242 #[arg(long, default_value_t = 0.002)]
244 reference_tolerance: f64,
245 #[arg(long)]
247 repair_missing: bool,
248 #[arg(long)]
250 output: Option<PathBuf>,
251}
252
253impl DataValidateArgs {
254 fn run(&self) -> Result<()> {
255 if self.paths.is_empty() {
256 bail!("provide at least one --path for validation");
257 }
258 let candles =
259 load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
260 if candles.is_empty() {
261 bail!("loaded dataset is empty; nothing to validate");
262 }
263 let reference = if self.reference_paths.is_empty() {
264 None
265 } else {
266 Some(
267 load_candles_from_paths(&self.reference_paths)
268 .with_context(|| "failed to load reference dataset")?,
269 )
270 };
271
272 let price_jump_threshold = if self.jump_threshold <= 0.0 {
273 0.0001
274 } else {
275 self.jump_threshold
276 };
277 let reference_tolerance = if self.reference_tolerance <= 0.0 {
278 0.0001
279 } else {
280 self.reference_tolerance
281 };
282
283 let config = ValidationConfig {
284 price_jump_threshold,
285 reference_tolerance,
286 repair_missing: self.repair_missing,
287 };
288
289 let outcome = validate_dataset(candles, reference, config)?;
290 print_validation_summary(&outcome);
291
292 if let Some(output) = &self.output {
293 write_candles_csv(output, &outcome.repaired)?;
294 info!(
295 "Wrote {} candles ({} new) to {}",
296 outcome.repaired.len(),
297 outcome.summary.repaired_candles,
298 output.display()
299 );
300 } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
301 warn!(
302 "Detected {} gap(s) filled with synthetic candles but --output was not provided",
303 outcome.summary.repaired_candles
304 );
305 }
306
307 Ok(())
308 }
309}
310
311#[derive(Args)]
312pub struct DataResampleArgs {
313 #[arg(long)]
314 input: PathBuf,
315 #[arg(long)]
316 output: PathBuf,
317 #[arg(long, default_value = "1h")]
318 interval: String,
319}
320
321#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
322pub enum BacktestModeArg {
323 Candle,
324 Tick,
325}
326
327#[derive(Args)]
328pub struct BacktestRunArgs {
329 #[arg(long)]
330 strategy_config: PathBuf,
331 #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
333 data_paths: Vec<PathBuf>,
334 #[arg(long, default_value_t = 500)]
335 candles: usize,
336 #[arg(long, default_value = "0.01")]
337 quantity: Decimal,
338 #[arg(long, default_value = "0")]
340 slippage_bps: Decimal,
341 #[arg(long, default_value = "0")]
343 fee_bps: Decimal,
344 #[arg(long, default_value_t = 1)]
346 latency_candles: usize,
347 #[arg(long, default_value = "fixed:0.01")]
349 sizer: String,
350 #[arg(long, value_enum, default_value = "candle")]
352 mode: BacktestModeArg,
353 #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
355 lob_paths: Vec<PathBuf>,
356 #[arg(long)]
357 markets_file: Option<PathBuf>,
358}
359
360#[derive(Args)]
361pub struct BacktestBatchArgs {
362 #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
364 config_paths: Vec<PathBuf>,
365 #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
367 data_paths: Vec<PathBuf>,
368 #[arg(long, default_value = "0.01")]
369 quantity: Decimal,
370 #[arg(long)]
372 output: Option<PathBuf>,
373 #[arg(long, default_value = "0")]
375 slippage_bps: Decimal,
376 #[arg(long, default_value = "0")]
378 fee_bps: Decimal,
379 #[arg(long, default_value_t = 1)]
381 latency_candles: usize,
382 #[arg(long, default_value = "fixed:0.01")]
384 sizer: String,
385 #[arg(long)]
386 markets_file: Option<PathBuf>,
387}
388
389#[derive(Args)]
390pub struct LiveRunArgs {
391 #[arg(long)]
392 strategy_config: PathBuf,
393 #[arg(long, default_value = "bybit_testnet")]
394 exchange: String,
395 #[arg(long, default_value = "linear")]
396 category: String,
397 #[arg(long, default_value = "1m")]
398 interval: String,
399 #[arg(long, default_value = "1")]
400 quantity: Decimal,
401 #[arg(
403 long = "exec",
404 default_value = "paper",
405 value_enum,
406 alias = "live-exec"
407 )]
408 exec: ExecutionBackend,
409 #[arg(long)]
410 state_path: Option<PathBuf>,
411 #[arg(long)]
412 metrics_addr: Option<String>,
413 #[arg(long)]
414 log_path: Option<PathBuf>,
415 #[arg(long)]
416 initial_equity: Option<Decimal>,
417 #[arg(long)]
418 markets_file: Option<PathBuf>,
419 #[arg(long, default_value = "0")]
420 slippage_bps: Decimal,
421 #[arg(long, default_value = "0")]
422 fee_bps: Decimal,
423 #[arg(long, default_value_t = 0)]
424 latency_ms: u64,
425 #[arg(long, default_value_t = 512)]
426 history: usize,
427 #[arg(long)]
428 reconciliation_interval_secs: Option<u64>,
429 #[arg(long)]
430 reconciliation_threshold: Option<Decimal>,
431 #[arg(long)]
432 webhook_url: Option<String>,
433 #[arg(long)]
434 alert_max_data_gap_secs: Option<u64>,
435 #[arg(long)]
436 alert_max_order_failures: Option<u32>,
437 #[arg(long)]
438 alert_max_drawdown: Option<Decimal>,
439 #[arg(long)]
440 risk_max_order_qty: Option<Decimal>,
441 #[arg(long)]
442 risk_max_position_qty: Option<Decimal>,
443 #[arg(long)]
444 risk_max_drawdown: Option<Decimal>,
445 #[arg(long)]
447 orderbook_depth: Option<usize>,
448 #[arg(long, default_value = "fixed:1.0")]
450 sizer: String,
451}
452
453impl LiveRunArgs {
454 fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
455 self.log_path
456 .clone()
457 .unwrap_or_else(|| config.live.log_path.clone())
458 }
459
460 fn resolved_state_path(&self, config: &AppConfig) -> PathBuf {
461 self.state_path
462 .clone()
463 .unwrap_or_else(|| config.live.state_path.clone())
464 }
465
466 fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
467 let addr = self
468 .metrics_addr
469 .clone()
470 .unwrap_or_else(|| config.live.metrics_addr.clone());
471 addr.parse()
472 .with_context(|| format!("invalid metrics address '{addr}'"))
473 }
474
475 fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
476 let secs = self
477 .reconciliation_interval_secs
478 .unwrap_or(config.live.reconciliation_interval_secs)
479 .max(1);
480 StdDuration::from_secs(secs)
481 }
482
483 fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
484 let configured = self
485 .reconciliation_threshold
486 .unwrap_or(config.live.reconciliation_threshold);
487 if configured <= Decimal::ZERO {
488 config.live.reconciliation_threshold.max(Decimal::new(1, 6))
489 } else {
490 configured
491 }
492 }
493
494 fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<Symbol, Decimal> {
495 let mut balances = clone_initial_balances(&config.backtest);
496 if let Some(value) = self.initial_equity {
497 balances.insert(
498 config.backtest.reporting_currency.clone(),
499 value.max(Decimal::ZERO),
500 );
501 }
502 balances
503 }
504
505 fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
506 let mut alerting = config.live.alerting.clone();
507 let webhook = self
508 .webhook_url
509 .clone()
510 .or_else(|| alerting.webhook_url.clone());
511 alerting.webhook_url = sanitize_webhook(webhook);
512 if let Some(sec) = self.alert_max_data_gap_secs {
513 alerting.max_data_gap_secs = sec;
514 }
515 if let Some(limit) = self.alert_max_order_failures {
516 alerting.max_order_failures = limit;
517 }
518 if let Some(limit) = self.alert_max_drawdown {
519 alerting.max_drawdown = limit.max(Decimal::ZERO);
520 }
521 alerting
522 }
523
524 fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
525 let mut risk = config.risk_management.clone();
526 if let Some(limit) = self.risk_max_order_qty {
527 risk.max_order_quantity = limit.max(Decimal::ZERO);
528 }
529 if let Some(limit) = self.risk_max_position_qty {
530 risk.max_position_quantity = limit.max(Decimal::ZERO);
531 }
532 if let Some(limit) = self.risk_max_drawdown {
533 risk.max_drawdown = limit.max(Decimal::ZERO);
534 }
535 risk
536 }
537}
538
539#[derive(Deserialize)]
540struct StrategyConfigFile {
541 #[serde(rename = "strategy_name")]
542 name: String,
543 #[serde(default = "empty_table")]
544 params: toml::Value,
545}
546
547fn empty_table() -> toml::Value {
548 toml::Value::Table(Default::default())
549}
550
551pub async fn run() -> Result<()> {
552 let cli = Cli::parse();
553 let config = load_config(Some(&cli.env)).context("failed to load configuration")?;
554
555 let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
556 0 => config.log_level.clone(),
557 1 => "debug".to_string(),
558 _ => "trace".to_string(),
559 });
560
561 let log_override = match &cli.command {
562 Commands::Live {
563 action: LiveCommand::Run(args),
564 } => Some(args.resolved_log_path(&config)),
565 _ => None,
566 };
567
568 init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;
569
570 match cli.command {
571 Commands::Data { action } => handle_data(action, &config).await?,
572 Commands::Backtest {
573 action: BacktestCommand::Run(args),
574 } => args.run(&config).await?,
575 Commands::Backtest {
576 action: BacktestCommand::Batch(args),
577 } => args.run(&config).await?,
578 Commands::Live {
579 action: LiveCommand::Run(args),
580 } => args.run(&config).await?,
581 Commands::State { action } => handle_state(action, &config).await?,
582 Commands::Strategies => list_strategies(),
583 }
584
585 Ok(())
586}
587
588async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
589 match cmd {
590 DataCommand::Download(args) => {
591 args.run(config).await?;
592 }
593 DataCommand::Validate(args) => {
594 args.run()?;
595 }
596 DataCommand::Resample(args) => {
597 info!(
598 "stub: resampling {} into {} at {}",
599 args.input.display(),
600 args.output.display(),
601 args.interval
602 );
603 }
604 }
605 Ok(())
606}
607
608async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
609 match cmd {
610 StateCommand::Inspect(args) => {
611 state::inspect_state(args.resolved_path(config), args.raw).await?;
612 }
613 }
614 Ok(())
615}
616
617impl BacktestRunArgs {
618 async fn run(&self, config: &AppConfig) -> Result<()> {
619 let contents = std::fs::read_to_string(&self.strategy_config)
620 .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
621 let def: StrategyConfigFile =
622 toml::from_str(&contents).context("failed to parse strategy config file")?;
623 let strategy = load_strategy(&def.name, def.params)
624 .with_context(|| format!("failed to configure strategy {}", def.name))?;
625 let symbols = strategy.subscriptions();
626 if symbols.is_empty() {
627 return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
628 }
629
630 let mode = match self.mode {
631 BacktestModeArg::Candle => BacktestMode::Candle,
632 BacktestModeArg::Tick => BacktestMode::Tick,
633 };
634
635 let markets_path = self
636 .markets_file
637 .clone()
638 .or_else(|| config.backtest.markets_file.clone())
639 .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
640 let market_registry = Arc::new(
641 MarketRegistry::load_from_file(&markets_path).with_context(|| {
642 format!("failed to load markets from {}", markets_path.display())
643 })?,
644 );
645
646 type CandleModeBundle = (
647 Vec<Candle>,
648 Vec<MarketEvent>,
649 Arc<dyn ExecutionClient>,
650 Option<Arc<MatchingEngine>>,
651 );
652
653 let (candles, lob_events, execution_client, matching_engine): CandleModeBundle = match mode
654 {
655 BacktestMode::Candle => {
656 let mut candles = if self.data_paths.is_empty() {
657 let mut generated = Vec::new();
658 for (idx, symbol) in symbols.iter().enumerate() {
659 let offset = idx as i64 * 10;
660 generated.extend(synth_candles(symbol, self.candles, offset));
661 }
662 generated
663 } else {
664 load_candles_from_paths(&self.data_paths)?
665 };
666
667 if candles.is_empty() {
668 return Err(anyhow!(
669 "no candles loaded; provide --data or allow synthetic generation"
670 ));
671 }
672
673 candles.sort_by_key(|c| c.timestamp);
674 (
675 candles,
676 Vec::new(),
677 Arc::new(PaperExecutionClient::default()) as Arc<dyn ExecutionClient>,
678 None,
679 )
680 }
681 BacktestMode::Tick => {
682 if self.lob_paths.is_empty() {
683 bail!("--lob-data is required when --mode tick");
684 }
685 let events = load_lob_events_from_paths(&self.lob_paths)?;
686 if events.is_empty() {
687 bail!("no order book events loaded from --lob-data");
688 }
689 let engine = Arc::new(MatchingEngine::new(
690 "matching-engine",
691 symbols.clone(),
692 reporting_balance(&config.backtest),
693 ));
694 (
695 Vec::new(),
696 events,
697 engine.clone() as Arc<dyn ExecutionClient>,
698 Some(engine),
699 )
700 }
701 };
702
703 let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
704 let order_quantity = self.quantity;
705 let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
706
707 let mut cfg = BacktestConfig::new(symbols[0].clone(), candles);
708 cfg.lob_events = lob_events;
709 cfg.order_quantity = order_quantity;
710 cfg.initial_balances = clone_initial_balances(&config.backtest);
711 cfg.reporting_currency = config.backtest.reporting_currency.clone();
712 cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
713 cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
714 cfg.execution.latency_candles = self.latency_candles.max(1);
715 cfg.mode = mode;
716
717 let report = Backtester::new(cfg, strategy, execution, matching_engine, market_registry)
718 .run()
719 .await
720 .context("backtest failed")?;
721 print_report(&report);
722 Ok(())
723 }
724}
725
726impl BacktestBatchArgs {
727 async fn run(&self, config: &AppConfig) -> Result<()> {
728 if self.config_paths.is_empty() {
729 return Err(anyhow!("provide at least one --config path"));
730 }
731 if self.data_paths.is_empty() {
732 return Err(anyhow!("provide at least one --data path for batch mode"));
733 }
734 let markets_path = self
735 .markets_file
736 .clone()
737 .or_else(|| config.backtest.markets_file.clone())
738 .ok_or_else(|| {
739 anyhow!("batch mode requires --markets-file or backtest.markets_file")
740 })?;
741 let market_registry = Arc::new(
742 MarketRegistry::load_from_file(&markets_path).with_context(|| {
743 format!("failed to load markets from {}", markets_path.display())
744 })?,
745 );
746 let mut aggregated = Vec::new();
747 for config_path in &self.config_paths {
748 let contents = std::fs::read_to_string(config_path).with_context(|| {
749 format!("failed to read strategy config {}", config_path.display())
750 })?;
751 let def: StrategyConfigFile =
752 toml::from_str(&contents).context("failed to parse strategy config file")?;
753 let strategy = load_strategy(&def.name, def.params)
754 .with_context(|| format!("failed to configure strategy {}", def.name))?;
755 let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
756 let order_quantity = self.quantity;
757 let mut candles = load_candles_from_paths(&self.data_paths)?;
758 candles.sort_by_key(|c| c.timestamp);
759 let execution_client: Arc<dyn ExecutionClient> =
760 Arc::new(PaperExecutionClient::default());
761 let execution =
762 ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
763 let mut cfg = BacktestConfig::new(strategy.symbol().to_string(), candles);
764 cfg.order_quantity = order_quantity;
765 cfg.initial_balances = clone_initial_balances(&config.backtest);
766 cfg.reporting_currency = config.backtest.reporting_currency.clone();
767 cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
768 cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
769 cfg.execution.latency_candles = self.latency_candles.max(1);
770
771 let report = Backtester::new(cfg, strategy, execution, None, market_registry.clone())
772 .run()
773 .await
774 .with_context(|| format!("backtest failed for {}", config_path.display()))?;
775 aggregated.push(BatchRow {
776 config: config_path.display().to_string(),
777 signals: 0, orders: 0, dropped_orders: 0, ending_equity: report.ending_equity,
781 });
782 }
783
784 if let Some(output) = &self.output {
785 write_batch_report(output, &aggregated)?;
786 println!("Batch report written to {}", output.display());
787 }
788 if aggregated.is_empty() {
789 return Err(anyhow!("no batch jobs executed"));
790 }
791 Ok(())
792 }
793}
794
795impl LiveRunArgs {
796 async fn run(&self, config: &AppConfig) -> Result<()> {
797 let exchange_cfg = config
798 .exchange
799 .get(&self.exchange)
800 .cloned()
801 .ok_or_else(|| anyhow!("exchange profile {} not found", self.exchange))?;
802
803 let contents = fs::read_to_string(&self.strategy_config)
804 .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
805 let def: StrategyConfigFile =
806 toml::from_str(&contents).context("failed to parse strategy config file")?;
807 let strategy = load_strategy(&def.name, def.params)
808 .with_context(|| format!("failed to configure strategy {}", def.name))?;
809 let symbols = strategy.subscriptions();
810 if symbols.is_empty() {
811 bail!("strategy did not declare any subscriptions");
812 }
813 if self.quantity <= Decimal::ZERO {
814 bail!("--quantity must be greater than zero");
815 }
816 let quantity = self.quantity;
817 let initial_balances = self.resolved_initial_balances(config);
818 let reporting_currency = config.backtest.reporting_currency.clone();
819 let markets_file = self
820 .markets_file
821 .clone()
822 .or_else(|| config.backtest.markets_file.clone());
823
824 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
825 let category =
826 PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
827 let metrics_addr = self.resolved_metrics_addr(config)?;
828 let state_path = self.resolved_state_path(config);
829 let alerting = self.build_alerting(config);
830 let history = self.history.max(32);
831 let reconciliation_interval = self.reconciliation_interval(config);
832 let reconciliation_threshold = self.reconciliation_threshold(config);
833
834 let settings = LiveSessionSettings {
835 category,
836 interval,
837 quantity,
838 slippage_bps: self.slippage_bps.max(Decimal::ZERO),
839 fee_bps: self.fee_bps.max(Decimal::ZERO),
840 history,
841 metrics_addr,
842 state_path,
843 initial_balances,
844 reporting_currency,
845 markets_file,
846 alerting,
847 exec_backend: self.exec,
848 risk: self.build_risk_config(config),
849 reconciliation_interval,
850 reconciliation_threshold,
851 };
852
853 info!(
854 strategy = %def.name,
855 symbols = ?symbols,
856 exchange = %self.exchange,
857 interval = %self.interval,
858 exec = ?self.exec,
859 "starting live session"
860 );
861
862 run_live(strategy, symbols, exchange_cfg, settings)
863 .await
864 .context("live session failed")
865 }
866}
867
868fn list_strategies() {
869 println!("Built-in strategies:");
870 for name in builtin_strategy_names() {
871 println!("- {name}");
872 }
873}
874
875fn print_validation_summary(outcome: &ValidationOutcome) {
876 const MAX_EXAMPLES: usize = 5;
877 let summary = &outcome.summary;
878 println!(
879 "Validation summary for {} ({} candles)",
880 summary.symbol, summary.rows
881 );
882 println!(
883 " Range: {} -> {}",
884 summary.start.to_rfc3339(),
885 summary.end.to_rfc3339()
886 );
887 println!(" Interval: {}", interval_label(summary.interval));
888 println!(" Missing intervals: {}", summary.missing_candles);
889 println!(" Duplicate intervals: {}", summary.duplicate_candles);
890 println!(" Zero-volume candles: {}", summary.zero_volume_candles);
891 println!(" Price spikes flagged: {}", summary.price_spike_count);
892 println!(
893 " Cross-source mismatches: {}",
894 summary.cross_mismatch_count
895 );
896 println!(" Repaired candles generated: {}", summary.repaired_candles);
897
898 if !outcome.gaps.is_empty() {
899 println!(" Gap examples:");
900 for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
901 println!(
902 " {} -> {} (missing {})",
903 gap.start.to_rfc3339(),
904 gap.end.to_rfc3339(),
905 gap.missing
906 );
907 }
908 if outcome.gaps.len() > MAX_EXAMPLES {
909 println!(
910 " ... {} additional gap(s) omitted",
911 outcome.gaps.len() - MAX_EXAMPLES
912 );
913 }
914 }
915
916 if !outcome.price_spikes.is_empty() {
917 println!(" Price spike examples:");
918 for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
919 println!(
920 " {} (change {:.2}%)",
921 spike.timestamp.to_rfc3339(),
922 spike.change_fraction * 100.0
923 );
924 }
925 if outcome.price_spikes.len() > MAX_EXAMPLES {
926 println!(
927 " ... {} additional spike(s) omitted",
928 outcome.price_spikes.len() - MAX_EXAMPLES
929 );
930 }
931 }
932
933 if !outcome.cross_mismatches.is_empty() {
934 println!(" Cross-source mismatch examples:");
935 for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
936 println!(
937 " {} primary {:.4} vs ref {:.4} ({:.2}%)",
938 miss.timestamp.to_rfc3339(),
939 miss.primary_close,
940 miss.reference_close,
941 miss.delta_fraction * 100.0
942 );
943 }
944 if outcome.cross_mismatches.len() > MAX_EXAMPLES {
945 println!(
946 " ... {} additional mismatch(es) omitted",
947 outcome.cross_mismatches.len() - MAX_EXAMPLES
948 );
949 }
950 }
951}
952
953fn print_report(report: &PerformanceReport) {
954 println!("\n{}", report);
955}
956
957fn synth_candles(symbol: &str, len: usize, offset_minutes: i64) -> Vec<Candle> {
958 let mut candles = Vec::with_capacity(len);
959 for i in 0..len {
960 let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
961 let open = base + (i as f64 % 3.0) * 10.0;
962 let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
963 let open_dec =
964 Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
965 let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
966 let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
967 let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
968 candles.push(Candle {
969 symbol: Symbol::from(symbol),
970 interval: Interval::OneMinute,
971 open: open_dec,
972 high,
973 low,
974 close: close_dec,
975 volume: Decimal::ONE,
976 timestamp: Utc::now() - Duration::minutes((len - i) as i64)
977 + Duration::minutes(offset_minutes),
978 });
979 }
980 candles
981}
982
983fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
984 if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
985 return Ok(dt.with_timezone(&Utc));
986 }
987 if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
988 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
989 }
990 if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
991 let dt = date
992 .and_hms_opt(0, 0, 0)
993 .ok_or_else(|| anyhow!("invalid date"))?;
994 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
995 }
996 Err(anyhow!("unable to parse datetime '{value}'"))
997}
998
999#[derive(Deserialize)]
1000struct CandleCsvRow {
1001 symbol: Option<String>,
1002 timestamp: String,
1003 open: f64,
1004 high: f64,
1005 low: f64,
1006 close: f64,
1007 volume: f64,
1008}
1009
1010fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1011 let mut candles = Vec::new();
1012 for path in paths {
1013 let mut reader = csv::Reader::from_path(path)
1014 .with_context(|| format!("failed to open {}", path.display()))?;
1015 for record in reader.deserialize::<CandleCsvRow>() {
1016 let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1017 let timestamp = parse_datetime(&row.timestamp)?;
1018 let symbol = row
1019 .symbol
1020 .clone()
1021 .or_else(|| infer_symbol_from_path(path))
1022 .ok_or_else(|| {
1023 anyhow!(
1024 "missing symbol column and unable to infer from path {}",
1025 path.display()
1026 )
1027 })?;
1028 let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1029 let open = Decimal::from_f64(row.open).ok_or_else(|| {
1030 anyhow!("invalid open value '{}' in {}", row.open, path.display())
1031 })?;
1032 let high = Decimal::from_f64(row.high).ok_or_else(|| {
1033 anyhow!("invalid high value '{}' in {}", row.high, path.display())
1034 })?;
1035 let low = Decimal::from_f64(row.low)
1036 .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1037 let close = Decimal::from_f64(row.close).ok_or_else(|| {
1038 anyhow!("invalid close value '{}' in {}", row.close, path.display())
1039 })?;
1040 let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1041 anyhow!(
1042 "invalid volume value '{}' in {}",
1043 row.volume,
1044 path.display()
1045 )
1046 })?;
1047 candles.push(Candle {
1048 symbol,
1049 interval,
1050 open,
1051 high,
1052 low,
1053 close,
1054 volume,
1055 timestamp,
1056 });
1057 }
1058 }
1059 Ok(candles)
1060}
1061
1062#[derive(Deserialize)]
1063#[serde(tag = "event", rename_all = "lowercase")]
1064enum LobEventRow {
1065 Snapshot {
1066 timestamp: String,
1067 symbol: Option<String>,
1068 bids: Vec<[f64; 2]>,
1069 asks: Vec<[f64; 2]>,
1070 },
1071 Depth {
1072 timestamp: String,
1073 symbol: Option<String>,
1074 bids: Vec<[f64; 2]>,
1075 asks: Vec<[f64; 2]>,
1076 },
1077 Trade {
1078 timestamp: String,
1079 symbol: Option<String>,
1080 side: String,
1081 price: f64,
1082 size: f64,
1083 },
1084}
1085
1086fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
1087 let mut events = Vec::new();
1088 for path in paths {
1089 let file = File::open(path)
1090 .with_context(|| format!("failed to open order book file {}", path.display()))?;
1091 let symbol_hint = infer_symbol_from_path(path);
1092 for line in BufReader::new(file).lines() {
1093 let line =
1094 line.with_context(|| format!("failed to read line from {}", path.display()))?;
1095 if line.trim().is_empty() {
1096 continue;
1097 }
1098 let row: LobEventRow = serde_json::from_str(&line)
1099 .with_context(|| format!("invalid order book event in {}", path.display()))?;
1100 match row {
1101 LobEventRow::Snapshot {
1102 timestamp,
1103 symbol,
1104 bids,
1105 asks,
1106 } => {
1107 let ts = parse_datetime(×tamp)?;
1108 let symbol = symbol
1109 .or_else(|| symbol_hint.clone())
1110 .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
1111 let bids = convert_levels(&bids)?;
1112 let asks = convert_levels(&asks)?;
1113 let book = OrderBook {
1114 symbol: symbol.clone(),
1115 bids,
1116 asks,
1117 timestamp: ts,
1118 };
1119 events.push(MarketEvent {
1120 timestamp: ts,
1121 kind: MarketEventKind::OrderBook(book),
1122 });
1123 }
1124 LobEventRow::Depth {
1125 timestamp,
1126 symbol,
1127 bids,
1128 asks,
1129 } => {
1130 let ts = parse_datetime(×tamp)?;
1131 let symbol = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
1132 anyhow!("missing symbol in depth update {}", path.display())
1133 })?;
1134 let bids = convert_levels(&bids)?;
1135 let asks = convert_levels(&asks)?;
1136 let update = DepthUpdate {
1137 symbol: symbol.clone(),
1138 bids,
1139 asks,
1140 timestamp: ts,
1141 };
1142 events.push(MarketEvent {
1143 timestamp: ts,
1144 kind: MarketEventKind::Depth(update),
1145 });
1146 }
1147 LobEventRow::Trade {
1148 timestamp,
1149 symbol,
1150 side,
1151 price,
1152 size,
1153 } => {
1154 let ts = parse_datetime(×tamp)?;
1155 let symbol = symbol
1156 .or_else(|| symbol_hint.clone())
1157 .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
1158 let side = match side.to_lowercase().as_str() {
1159 "buy" | "bid" | "b" => Side::Buy,
1160 "sell" | "ask" | "s" => Side::Sell,
1161 other => bail!("unsupported trade side '{other}' in {}", path.display()),
1162 };
1163 let price = Decimal::from_f64(price).ok_or_else(|| {
1164 anyhow!("invalid trade price '{}' in {}", price, path.display())
1165 })?;
1166 let size = Decimal::from_f64(size).ok_or_else(|| {
1167 anyhow!("invalid trade size '{}' in {}", size, path.display())
1168 })?;
1169 let tick = Tick {
1170 symbol: symbol.clone(),
1171 price,
1172 size,
1173 side,
1174 exchange_timestamp: ts,
1175 received_at: ts,
1176 };
1177 events.push(MarketEvent {
1178 timestamp: ts,
1179 kind: MarketEventKind::Trade(tick),
1180 });
1181 }
1182 }
1183 }
1184 }
1185 events.sort_by_key(|event| event.timestamp);
1186 Ok(events)
1187}
1188
1189fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
1190 levels
1191 .iter()
1192 .map(|pair| {
1193 let price = Decimal::from_f64(pair[0])
1194 .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
1195 let size = Decimal::from_f64(pair[1])
1196 .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
1197 Ok(OrderBookLevel { price, size })
1198 })
1199 .collect()
1200}
1201
1202fn infer_symbol_from_path(path: &Path) -> Option<String> {
1203 path.parent()
1204 .and_then(|p| p.file_name())
1205 .map(|os| os.to_string_lossy().to_string())
1206}
1207
1208fn infer_interval_from_path(path: &Path) -> Option<Interval> {
1209 path.file_stem()
1210 .and_then(|os| os.to_str())
1211 .and_then(|stem| stem.split('_').next())
1212 .and_then(|token| Interval::from_str(token).ok())
1213}
1214
1215fn default_output_path(
1216 config: &AppConfig,
1217 exchange: &str,
1218 symbol: &str,
1219 interval: Interval,
1220 start: DateTime<Utc>,
1221 end: DateTime<Utc>,
1222) -> PathBuf {
1223 let interval_part = interval_label(interval);
1224 let start_part = start.format("%Y%m%d").to_string();
1225 let end_part = end.format("%Y%m%d").to_string();
1226 config
1227 .data_path
1228 .join(exchange)
1229 .join(symbol)
1230 .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
1231}
1232
1233fn interval_label(interval: Interval) -> &'static str {
1234 match interval {
1235 Interval::OneSecond => "1s",
1236 Interval::OneMinute => "1m",
1237 Interval::FiveMinutes => "5m",
1238 Interval::FifteenMinutes => "15m",
1239 Interval::OneHour => "1h",
1240 Interval::FourHours => "4h",
1241 Interval::OneDay => "1d",
1242 }
1243}
1244
1245#[derive(Serialize)]
1246struct CandleRow<'a> {
1247 symbol: &'a str,
1248 timestamp: String,
1249 open: f64,
1250 high: f64,
1251 low: f64,
1252 close: f64,
1253 volume: f64,
1254}
1255
1256fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
1257 if let Some(parent) = path.parent() {
1258 fs::create_dir_all(parent)
1259 .with_context(|| format!("failed to create directory {}", parent.display()))?;
1260 }
1261 let mut writer =
1262 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1263 for candle in candles {
1264 let row = CandleRow {
1265 symbol: &candle.symbol,
1266 timestamp: candle.timestamp.to_rfc3339(),
1267 open: candle.open.to_f64().unwrap_or(0.0),
1268 high: candle.high.to_f64().unwrap_or(0.0),
1269 low: candle.low.to_f64().unwrap_or(0.0),
1270 close: candle.close.to_f64().unwrap_or(0.0),
1271 volume: candle.volume.to_f64().unwrap_or(0.0),
1272 };
1273 writer.serialize(row)?;
1274 }
1275 writer.flush()?;
1276 Ok(())
1277}
1278
1279#[derive(Serialize)]
1280struct BatchRow {
1281 config: String,
1282 signals: usize,
1283 orders: usize,
1284 dropped_orders: usize,
1285 ending_equity: f64,
1286}
1287
1288fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
1289 if let Some(parent) = path.parent() {
1290 fs::create_dir_all(parent)
1291 .with_context(|| format!("failed to create directory {}", parent.display()))?;
1292 }
1293 let mut writer =
1294 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1295 for row in rows {
1296 writer.serialize(row)?;
1297 }
1298 writer.flush()?;
1299 Ok(())
1300}
1301
1302fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<Symbol, Decimal> {
1303 config
1304 .initial_balances
1305 .iter()
1306 .map(|(currency, amount)| (currency.clone(), *amount))
1307 .collect()
1308}
1309
1310fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
1311 config
1312 .initial_balances
1313 .get(&config.reporting_currency)
1314 .copied()
1315 .unwrap_or_default()
1316}
1317
1318fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
1319 let parts: Vec<_> = value.split(':').collect();
1320 match parts.as_slice() {
1321 ["fixed", val] => {
1322 let quantity =
1323 Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
1324 Ok(Box::new(FixedOrderSizer { quantity }))
1325 }
1326 ["fixed"] => {
1327 let quantity = cli_quantity.unwrap_or(Decimal::ONE);
1328 Ok(Box::new(FixedOrderSizer { quantity }))
1329 }
1330 ["percent", val] => {
1331 let percent =
1332 Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
1333 Ok(Box::new(PortfolioPercentSizer {
1334 percent: percent.max(Decimal::ZERO),
1335 }))
1336 }
1337 ["risk-adjusted", val] => {
1338 let risk_fraction = Decimal::from_str(val)
1339 .context("invalid risk fraction value (use decimals)")?;
1340 Ok(Box::new(RiskAdjustedSizer {
1341 risk_fraction: risk_fraction.max(Decimal::ZERO),
1342 }))
1343 }
1344 _ => Err(anyhow!(
1345 "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
1346 )),
1347 }
1348}