1use crate::alerts::sanitize_webhook;
2use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
3use crate::live::{run_live, ExecutionBackend, LiveSessionSettings};
4use crate::state;
5use crate::telemetry::init_tracing;
6use crate::PublicChannel;
7use std::collections::HashMap;
8use std::fs::{self, File};
9use std::io::{BufRead, BufReader};
10use std::net::SocketAddr;
11use std::path::{Path, PathBuf};
12use std::str::FromStr;
13use std::sync::Arc;
14use std::time::Duration as StdDuration;
15
16use anyhow::{anyhow, bail, Context, Result};
17use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
18use clap::{Args, Parser, Subcommand, ValueEnum};
19use csv::Writer;
20use rust_decimal::{
21 prelude::{FromPrimitive, ToPrimitive},
22 Decimal,
23};
24use serde::{Deserialize, Serialize};
25use tesser_backtester::reporting::PerformanceReport;
26use tesser_backtester::{BacktestConfig, BacktestMode, Backtester, MarketEvent, MarketEventKind};
27use tesser_broker::ExecutionClient;
28use tesser_config::{load_config, AppConfig, RiskManagementConfig};
29use tesser_core::{Candle, DepthUpdate, Interval, OrderBook, OrderBookLevel, Side, Symbol, Tick};
30use tesser_data::download::{BinanceDownloader, BybitDownloader, KlineRequest};
31use tesser_execution::{
32 ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PortfolioPercentSizer,
33 RiskAdjustedSizer,
34};
35use tesser_markets::MarketRegistry;
36use tesser_paper::{MatchingEngine, PaperExecutionClient};
37use tesser_strategy::{builtin_strategy_names, load_strategy};
38use tracing::{info, warn};
39
40#[derive(Parser)]
41#[command(author, version, about = "Tesser Trading Framework")]
42pub struct Cli {
43 #[arg(short, long, action = clap::ArgAction::Count)]
45 verbose: u8,
46 #[arg(long, default_value = "default")]
48 env: String,
49 #[command(subcommand)]
50 command: Commands,
51}
52
53#[allow(clippy::large_enum_variant)]
54#[derive(Subcommand)]
55pub enum Commands {
56 Data {
58 #[command(subcommand)]
59 action: DataCommand,
60 },
61 Backtest {
63 #[command(subcommand)]
64 action: BacktestCommand,
65 },
66 Live {
68 #[command(subcommand)]
69 action: LiveCommand,
70 },
71 State {
73 #[command(subcommand)]
74 action: StateCommand,
75 },
76 Strategies,
78}
79
80#[derive(Subcommand)]
81pub enum DataCommand {
82 Download(DataDownloadArgs),
84 Validate(DataValidateArgs),
86 Resample(DataResampleArgs),
88}
89
90#[derive(Subcommand)]
91pub enum BacktestCommand {
92 Run(BacktestRunArgs),
94 Batch(BacktestBatchArgs),
96}
97
98#[derive(Subcommand)]
99pub enum LiveCommand {
100 Run(LiveRunArgs),
102}
103
104#[derive(Subcommand)]
105pub enum StateCommand {
106 Inspect(StateInspectArgs),
108}
109
110#[derive(Args)]
111pub struct DataDownloadArgs {
112 #[arg(long, default_value = "bybit")]
113 exchange: String,
114 #[arg(long)]
115 symbol: String,
116 #[arg(long, default_value = "linear")]
117 category: String,
118 #[arg(long, default_value = "1m")]
119 interval: String,
120 #[arg(long)]
121 start: String,
122 #[arg(long)]
123 end: Option<String>,
124 #[arg(long)]
125 output: Option<PathBuf>,
126 #[arg(long)]
128 skip_validation: bool,
129 #[arg(long)]
131 repair_missing: bool,
132 #[arg(long, default_value_t = 0.05)]
134 validation_jump_threshold: f64,
135 #[arg(long, default_value_t = 0.002)]
137 validation_reference_tolerance: f64,
138}
139
140#[derive(Args)]
141pub struct StateInspectArgs {
142 #[arg(long)]
144 path: Option<PathBuf>,
145 #[arg(long)]
147 raw: bool,
148}
149
150impl StateInspectArgs {
151 fn resolved_path(&self, config: &AppConfig) -> PathBuf {
152 self.path
153 .clone()
154 .unwrap_or_else(|| config.live.state_path.clone())
155 }
156}
157
158impl DataDownloadArgs {
159 async fn run(&self, config: &AppConfig) -> Result<()> {
160 let exchange_cfg = config
161 .exchange
162 .get(&self.exchange)
163 .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
164 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
165 let start = parse_datetime(&self.start)?;
166 let end = match &self.end {
167 Some(value) => parse_datetime(value)?,
168 None => Utc::now(),
169 };
170 if start >= end {
171 return Err(anyhow!("start time must be earlier than end time"));
172 }
173
174 info!(
175 "Downloading {} candles for {} ({})",
176 self.interval, self.symbol, self.exchange
177 );
178 let mut candles = match exchange_cfg.driver.as_str() {
179 "bybit" | "" => {
180 let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
181 let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
182 downloader
183 .download_klines(&request)
184 .await
185 .with_context(|| "failed to download candles from Bybit")?
186 }
187 "binance" => {
188 let downloader = BinanceDownloader::new(&exchange_cfg.rest_url);
189 let request = KlineRequest::new("", &self.symbol, interval, start, end);
190 downloader
191 .download_klines(&request)
192 .await
193 .with_context(|| "failed to download candles from Binance")?
194 }
195 other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
196 };
197
198 if candles.is_empty() {
199 info!("No candles returned for {}", self.symbol);
200 return Ok(());
201 }
202
203 if !self.skip_validation {
204 let config = ValidationConfig {
205 price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
206 reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
207 repair_missing: self.repair_missing,
208 };
209 let outcome =
210 validate_dataset(candles.clone(), None, config).context("validation failed")?;
211 print_validation_summary(&outcome);
212 if self.repair_missing && outcome.summary.repaired_candles > 0 {
213 candles = outcome.repaired;
214 info!(
215 "Applied {} synthetic candle(s) to repair gaps",
216 outcome.summary.repaired_candles
217 );
218 }
219 }
220
221 let output_path = self.output.clone().unwrap_or_else(|| {
222 default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
223 });
224 write_candles_csv(&output_path, &candles)?;
225 info!(
226 "Saved {} candles to {}",
227 candles.len(),
228 output_path.display()
229 );
230 Ok(())
231 }
232}
233
234#[derive(Args)]
235pub struct DataValidateArgs {
236 #[arg(
238 long = "path",
239 value_name = "PATH",
240 num_args = 1..,
241 action = clap::ArgAction::Append
242 )]
243 paths: Vec<PathBuf>,
244 #[arg(
246 long = "reference",
247 value_name = "PATH",
248 num_args = 1..,
249 action = clap::ArgAction::Append
250 )]
251 reference_paths: Vec<PathBuf>,
252 #[arg(long, default_value_t = 0.05)]
254 jump_threshold: f64,
255 #[arg(long, default_value_t = 0.002)]
257 reference_tolerance: f64,
258 #[arg(long)]
260 repair_missing: bool,
261 #[arg(long)]
263 output: Option<PathBuf>,
264}
265
266impl DataValidateArgs {
267 fn run(&self) -> Result<()> {
268 if self.paths.is_empty() {
269 bail!("provide at least one --path for validation");
270 }
271 let candles =
272 load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
273 if candles.is_empty() {
274 bail!("loaded dataset is empty; nothing to validate");
275 }
276 let reference = if self.reference_paths.is_empty() {
277 None
278 } else {
279 Some(
280 load_candles_from_paths(&self.reference_paths)
281 .with_context(|| "failed to load reference dataset")?,
282 )
283 };
284
285 let price_jump_threshold = if self.jump_threshold <= 0.0 {
286 0.0001
287 } else {
288 self.jump_threshold
289 };
290 let reference_tolerance = if self.reference_tolerance <= 0.0 {
291 0.0001
292 } else {
293 self.reference_tolerance
294 };
295
296 let config = ValidationConfig {
297 price_jump_threshold,
298 reference_tolerance,
299 repair_missing: self.repair_missing,
300 };
301
302 let outcome = validate_dataset(candles, reference, config)?;
303 print_validation_summary(&outcome);
304
305 if let Some(output) = &self.output {
306 write_candles_csv(output, &outcome.repaired)?;
307 info!(
308 "Wrote {} candles ({} new) to {}",
309 outcome.repaired.len(),
310 outcome.summary.repaired_candles,
311 output.display()
312 );
313 } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
314 warn!(
315 "Detected {} gap(s) filled with synthetic candles but --output was not provided",
316 outcome.summary.repaired_candles
317 );
318 }
319
320 Ok(())
321 }
322}
323
324#[derive(Args)]
325pub struct DataResampleArgs {
326 #[arg(long)]
327 input: PathBuf,
328 #[arg(long)]
329 output: PathBuf,
330 #[arg(long, default_value = "1h")]
331 interval: String,
332}
333
334#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
335pub enum BacktestModeArg {
336 Candle,
337 Tick,
338}
339
340#[derive(Args)]
341pub struct BacktestRunArgs {
342 #[arg(long)]
343 strategy_config: PathBuf,
344 #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
346 data_paths: Vec<PathBuf>,
347 #[arg(long, default_value_t = 500)]
348 candles: usize,
349 #[arg(long, default_value = "0.01")]
350 quantity: Decimal,
351 #[arg(long, default_value = "0")]
353 slippage_bps: Decimal,
354 #[arg(long, default_value = "0")]
356 fee_bps: Decimal,
357 #[arg(long, default_value_t = 1)]
359 latency_candles: usize,
360 #[arg(long, default_value = "fixed:0.01")]
362 sizer: String,
363 #[arg(long, value_enum, default_value = "candle")]
365 mode: BacktestModeArg,
366 #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
368 lob_paths: Vec<PathBuf>,
369 #[arg(long)]
370 markets_file: Option<PathBuf>,
371}
372
373#[derive(Args)]
374pub struct BacktestBatchArgs {
375 #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
377 config_paths: Vec<PathBuf>,
378 #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
380 data_paths: Vec<PathBuf>,
381 #[arg(long, default_value = "0.01")]
382 quantity: Decimal,
383 #[arg(long)]
385 output: Option<PathBuf>,
386 #[arg(long, default_value = "0")]
388 slippage_bps: Decimal,
389 #[arg(long, default_value = "0")]
391 fee_bps: Decimal,
392 #[arg(long, default_value_t = 1)]
394 latency_candles: usize,
395 #[arg(long, default_value = "fixed:0.01")]
397 sizer: String,
398 #[arg(long)]
399 markets_file: Option<PathBuf>,
400}
401
402#[derive(Args)]
403pub struct LiveRunArgs {
404 #[arg(long)]
405 strategy_config: PathBuf,
406 #[arg(long, default_value = "paper_sandbox")]
407 exchange: String,
408 #[arg(long, default_value = "linear")]
409 category: String,
410 #[arg(long, default_value = "1m")]
411 interval: String,
412 #[arg(long, default_value = "1")]
413 quantity: Decimal,
414 #[arg(
416 long = "exec",
417 default_value = "paper",
418 value_enum,
419 alias = "live-exec"
420 )]
421 exec: ExecutionBackend,
422 #[arg(long)]
423 state_path: Option<PathBuf>,
424 #[arg(long)]
425 metrics_addr: Option<String>,
426 #[arg(long)]
427 log_path: Option<PathBuf>,
428 #[arg(long)]
429 initial_equity: Option<Decimal>,
430 #[arg(long)]
431 markets_file: Option<PathBuf>,
432 #[arg(long, default_value = "0")]
433 slippage_bps: Decimal,
434 #[arg(long, default_value = "0")]
435 fee_bps: Decimal,
436 #[arg(long, default_value_t = 0)]
437 latency_ms: u64,
438 #[arg(long, default_value_t = 512)]
439 history: usize,
440 #[arg(long)]
441 reconciliation_interval_secs: Option<u64>,
442 #[arg(long)]
443 reconciliation_threshold: Option<Decimal>,
444 #[arg(long)]
445 webhook_url: Option<String>,
446 #[arg(long)]
447 alert_max_data_gap_secs: Option<u64>,
448 #[arg(long)]
449 alert_max_order_failures: Option<u32>,
450 #[arg(long)]
451 alert_max_drawdown: Option<Decimal>,
452 #[arg(long)]
453 risk_max_order_qty: Option<Decimal>,
454 #[arg(long)]
455 risk_max_position_qty: Option<Decimal>,
456 #[arg(long)]
457 risk_max_drawdown: Option<Decimal>,
458 #[arg(long)]
460 orderbook_depth: Option<usize>,
461 #[arg(long, default_value = "fixed:1.0")]
463 sizer: String,
464}
465
466impl LiveRunArgs {
467 fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
468 self.log_path
469 .clone()
470 .unwrap_or_else(|| config.live.log_path.clone())
471 }
472
473 fn resolved_state_path(&self, config: &AppConfig) -> PathBuf {
474 self.state_path
475 .clone()
476 .unwrap_or_else(|| config.live.state_path.clone())
477 }
478
479 fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
480 let addr = self
481 .metrics_addr
482 .clone()
483 .unwrap_or_else(|| config.live.metrics_addr.clone());
484 addr.parse()
485 .with_context(|| format!("invalid metrics address '{addr}'"))
486 }
487
488 fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
489 let secs = self
490 .reconciliation_interval_secs
491 .unwrap_or(config.live.reconciliation_interval_secs)
492 .max(1);
493 StdDuration::from_secs(secs)
494 }
495
496 fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
497 let configured = self
498 .reconciliation_threshold
499 .unwrap_or(config.live.reconciliation_threshold);
500 if configured <= Decimal::ZERO {
501 config.live.reconciliation_threshold.max(Decimal::new(1, 6))
502 } else {
503 configured
504 }
505 }
506
507 fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<Symbol, Decimal> {
508 let mut balances = clone_initial_balances(&config.backtest);
509 if let Some(value) = self.initial_equity {
510 balances.insert(
511 config.backtest.reporting_currency.clone(),
512 value.max(Decimal::ZERO),
513 );
514 }
515 balances
516 }
517
518 fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
519 let mut alerting = config.live.alerting.clone();
520 let webhook = self
521 .webhook_url
522 .clone()
523 .or_else(|| alerting.webhook_url.clone());
524 alerting.webhook_url = sanitize_webhook(webhook);
525 if let Some(sec) = self.alert_max_data_gap_secs {
526 alerting.max_data_gap_secs = sec;
527 }
528 if let Some(limit) = self.alert_max_order_failures {
529 alerting.max_order_failures = limit;
530 }
531 if let Some(limit) = self.alert_max_drawdown {
532 alerting.max_drawdown = limit.max(Decimal::ZERO);
533 }
534 alerting
535 }
536
537 fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
538 let mut risk = config.risk_management.clone();
539 if let Some(limit) = self.risk_max_order_qty {
540 risk.max_order_quantity = limit.max(Decimal::ZERO);
541 }
542 if let Some(limit) = self.risk_max_position_qty {
543 risk.max_position_quantity = limit.max(Decimal::ZERO);
544 }
545 if let Some(limit) = self.risk_max_drawdown {
546 risk.max_drawdown = limit.max(Decimal::ZERO);
547 }
548 risk
549 }
550}
551
552#[derive(Deserialize)]
553struct StrategyConfigFile {
554 #[serde(rename = "strategy_name")]
555 name: String,
556 #[serde(default = "empty_table")]
557 params: toml::Value,
558}
559
560fn empty_table() -> toml::Value {
561 toml::Value::Table(Default::default())
562}
563
564pub async fn run() -> Result<()> {
565 let cli = Cli::parse();
566 let config = load_config(Some(&cli.env)).context("failed to load configuration")?;
567
568 let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
569 0 => config.log_level.clone(),
570 1 => "debug".to_string(),
571 _ => "trace".to_string(),
572 });
573
574 let log_override = match &cli.command {
575 Commands::Live {
576 action: LiveCommand::Run(args),
577 } => Some(args.resolved_log_path(&config)),
578 _ => None,
579 };
580
581 init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;
582
583 match cli.command {
584 Commands::Data { action } => handle_data(action, &config).await?,
585 Commands::Backtest {
586 action: BacktestCommand::Run(args),
587 } => args.run(&config).await?,
588 Commands::Backtest {
589 action: BacktestCommand::Batch(args),
590 } => args.run(&config).await?,
591 Commands::Live {
592 action: LiveCommand::Run(args),
593 } => args.run(&config).await?,
594 Commands::State { action } => handle_state(action, &config).await?,
595 Commands::Strategies => list_strategies(),
596 }
597
598 Ok(())
599}
600
601async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
602 match cmd {
603 DataCommand::Download(args) => {
604 args.run(config).await?;
605 }
606 DataCommand::Validate(args) => {
607 args.run()?;
608 }
609 DataCommand::Resample(args) => {
610 info!(
611 "stub: resampling {} into {} at {}",
612 args.input.display(),
613 args.output.display(),
614 args.interval
615 );
616 }
617 }
618 Ok(())
619}
620
621async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
622 match cmd {
623 StateCommand::Inspect(args) => {
624 state::inspect_state(args.resolved_path(config), args.raw).await?;
625 }
626 }
627 Ok(())
628}
629
630impl BacktestRunArgs {
631 async fn run(&self, config: &AppConfig) -> Result<()> {
632 let contents = std::fs::read_to_string(&self.strategy_config)
633 .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
634 let def: StrategyConfigFile =
635 toml::from_str(&contents).context("failed to parse strategy config file")?;
636 let strategy = load_strategy(&def.name, def.params)
637 .with_context(|| format!("failed to configure strategy {}", def.name))?;
638 let symbols = strategy.subscriptions();
639 if symbols.is_empty() {
640 return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
641 }
642
643 let mode = match self.mode {
644 BacktestModeArg::Candle => BacktestMode::Candle,
645 BacktestModeArg::Tick => BacktestMode::Tick,
646 };
647
648 let markets_path = self
649 .markets_file
650 .clone()
651 .or_else(|| config.backtest.markets_file.clone())
652 .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
653 let market_registry = Arc::new(
654 MarketRegistry::load_from_file(&markets_path).with_context(|| {
655 format!("failed to load markets from {}", markets_path.display())
656 })?,
657 );
658
659 type CandleModeBundle = (
660 Vec<Candle>,
661 Vec<MarketEvent>,
662 Arc<dyn ExecutionClient>,
663 Option<Arc<MatchingEngine>>,
664 );
665
666 let (candles, lob_events, execution_client, matching_engine): CandleModeBundle = match mode
667 {
668 BacktestMode::Candle => {
669 let mut candles = if self.data_paths.is_empty() {
670 let mut generated = Vec::new();
671 for (idx, symbol) in symbols.iter().enumerate() {
672 let offset = idx as i64 * 10;
673 generated.extend(synth_candles(symbol, self.candles, offset));
674 }
675 generated
676 } else {
677 load_candles_from_paths(&self.data_paths)?
678 };
679
680 if candles.is_empty() {
681 return Err(anyhow!(
682 "no candles loaded; provide --data or allow synthetic generation"
683 ));
684 }
685
686 candles.sort_by_key(|c| c.timestamp);
687 (
688 candles,
689 Vec::new(),
690 Arc::new(PaperExecutionClient::default()) as Arc<dyn ExecutionClient>,
691 None,
692 )
693 }
694 BacktestMode::Tick => {
695 if self.lob_paths.is_empty() {
696 bail!("--lob-data is required when --mode tick");
697 }
698 let events = load_lob_events_from_paths(&self.lob_paths)?;
699 if events.is_empty() {
700 bail!("no order book events loaded from --lob-data");
701 }
702 let engine = Arc::new(MatchingEngine::new(
703 "matching-engine",
704 symbols.clone(),
705 reporting_balance(&config.backtest),
706 ));
707 (
708 Vec::new(),
709 events,
710 engine.clone() as Arc<dyn ExecutionClient>,
711 Some(engine),
712 )
713 }
714 };
715
716 let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
717 let order_quantity = self.quantity;
718 let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
719
720 let mut cfg = BacktestConfig::new(symbols[0].clone(), candles);
721 cfg.lob_events = lob_events;
722 cfg.order_quantity = order_quantity;
723 cfg.initial_balances = clone_initial_balances(&config.backtest);
724 cfg.reporting_currency = config.backtest.reporting_currency.clone();
725 cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
726 cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
727 cfg.execution.latency_candles = self.latency_candles.max(1);
728 cfg.mode = mode;
729
730 let report = Backtester::new(cfg, strategy, execution, matching_engine, market_registry)
731 .run()
732 .await
733 .context("backtest failed")?;
734 print_report(&report);
735 Ok(())
736 }
737}
738
739impl BacktestBatchArgs {
740 async fn run(&self, config: &AppConfig) -> Result<()> {
741 if self.config_paths.is_empty() {
742 return Err(anyhow!("provide at least one --config path"));
743 }
744 if self.data_paths.is_empty() {
745 return Err(anyhow!("provide at least one --data path for batch mode"));
746 }
747 let markets_path = self
748 .markets_file
749 .clone()
750 .or_else(|| config.backtest.markets_file.clone())
751 .ok_or_else(|| {
752 anyhow!("batch mode requires --markets-file or backtest.markets_file")
753 })?;
754 let market_registry = Arc::new(
755 MarketRegistry::load_from_file(&markets_path).with_context(|| {
756 format!("failed to load markets from {}", markets_path.display())
757 })?,
758 );
759 let mut aggregated = Vec::new();
760 for config_path in &self.config_paths {
761 let contents = std::fs::read_to_string(config_path).with_context(|| {
762 format!("failed to read strategy config {}", config_path.display())
763 })?;
764 let def: StrategyConfigFile =
765 toml::from_str(&contents).context("failed to parse strategy config file")?;
766 let strategy = load_strategy(&def.name, def.params)
767 .with_context(|| format!("failed to configure strategy {}", def.name))?;
768 let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
769 let order_quantity = self.quantity;
770 let mut candles = load_candles_from_paths(&self.data_paths)?;
771 candles.sort_by_key(|c| c.timestamp);
772 let execution_client: Arc<dyn ExecutionClient> =
773 Arc::new(PaperExecutionClient::default());
774 let execution =
775 ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
776 let mut cfg = BacktestConfig::new(strategy.symbol().to_string(), candles);
777 cfg.order_quantity = order_quantity;
778 cfg.initial_balances = clone_initial_balances(&config.backtest);
779 cfg.reporting_currency = config.backtest.reporting_currency.clone();
780 cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
781 cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
782 cfg.execution.latency_candles = self.latency_candles.max(1);
783
784 let report = Backtester::new(cfg, strategy, execution, None, market_registry.clone())
785 .run()
786 .await
787 .with_context(|| format!("backtest failed for {}", config_path.display()))?;
788 aggregated.push(BatchRow {
789 config: config_path.display().to_string(),
790 signals: 0, orders: 0, dropped_orders: 0, ending_equity: report.ending_equity,
794 });
795 }
796
797 if let Some(output) = &self.output {
798 write_batch_report(output, &aggregated)?;
799 println!("Batch report written to {}", output.display());
800 }
801 if aggregated.is_empty() {
802 return Err(anyhow!("no batch jobs executed"));
803 }
804 Ok(())
805 }
806}
807
808impl LiveRunArgs {
809 async fn run(&self, config: &AppConfig) -> Result<()> {
810 let exchange_cfg = config
811 .exchange
812 .get(&self.exchange)
813 .cloned()
814 .ok_or_else(|| anyhow!("exchange profile {} not found", self.exchange))?;
815
816 let driver = exchange_cfg.driver.clone();
817
818 let contents = fs::read_to_string(&self.strategy_config)
819 .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
820 let def: StrategyConfigFile =
821 toml::from_str(&contents).context("failed to parse strategy config file")?;
822 let strategy = load_strategy(&def.name, def.params)
823 .with_context(|| format!("failed to configure strategy {}", def.name))?;
824 let symbols = strategy.subscriptions();
825 if symbols.is_empty() {
826 bail!("strategy did not declare any subscriptions");
827 }
828 if self.quantity <= Decimal::ZERO {
829 bail!("--quantity must be greater than zero");
830 }
831 let quantity = self.quantity;
832 let initial_balances = self.resolved_initial_balances(config);
833 let reporting_currency = config.backtest.reporting_currency.clone();
834 let markets_file = self
835 .markets_file
836 .clone()
837 .or_else(|| config.backtest.markets_file.clone());
838
839 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
840 let category =
841 PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
842 let metrics_addr = self.resolved_metrics_addr(config)?;
843 let state_path = self.resolved_state_path(config);
844 let alerting = self.build_alerting(config);
845 let history = self.history.max(32);
846 let reconciliation_interval = self.reconciliation_interval(config);
847 let reconciliation_threshold = self.reconciliation_threshold(config);
848 let orderbook_depth = self
849 .orderbook_depth
850 .unwrap_or(super::live::default_order_book_depth());
851
852 let settings = LiveSessionSettings {
853 category,
854 interval,
855 quantity,
856 slippage_bps: self.slippage_bps.max(Decimal::ZERO),
857 fee_bps: self.fee_bps.max(Decimal::ZERO),
858 history,
859 metrics_addr,
860 state_path,
861 initial_balances,
862 reporting_currency,
863 markets_file,
864 alerting,
865 exec_backend: self.exec,
866 risk: self.build_risk_config(config),
867 reconciliation_interval,
868 reconciliation_threshold,
869 driver,
870 orderbook_depth,
871 };
872
873 info!(
874 strategy = %def.name,
875 symbols = ?symbols,
876 exchange = %self.exchange,
877 interval = %self.interval,
878 driver = ?settings.driver,
879 exec = ?self.exec,
880 "starting live session"
881 );
882
883 run_live(strategy, symbols, exchange_cfg, settings)
884 .await
885 .context("live session failed")
886 }
887}
888
889fn list_strategies() {
890 println!("Built-in strategies:");
891 for name in builtin_strategy_names() {
892 println!("- {name}");
893 }
894}
895
896fn print_validation_summary(outcome: &ValidationOutcome) {
897 const MAX_EXAMPLES: usize = 5;
898 let summary = &outcome.summary;
899 println!(
900 "Validation summary for {} ({} candles)",
901 summary.symbol, summary.rows
902 );
903 println!(
904 " Range: {} -> {}",
905 summary.start.to_rfc3339(),
906 summary.end.to_rfc3339()
907 );
908 println!(" Interval: {}", interval_label(summary.interval));
909 println!(" Missing intervals: {}", summary.missing_candles);
910 println!(" Duplicate intervals: {}", summary.duplicate_candles);
911 println!(" Zero-volume candles: {}", summary.zero_volume_candles);
912 println!(" Price spikes flagged: {}", summary.price_spike_count);
913 println!(
914 " Cross-source mismatches: {}",
915 summary.cross_mismatch_count
916 );
917 println!(" Repaired candles generated: {}", summary.repaired_candles);
918
919 if !outcome.gaps.is_empty() {
920 println!(" Gap examples:");
921 for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
922 println!(
923 " {} -> {} (missing {})",
924 gap.start.to_rfc3339(),
925 gap.end.to_rfc3339(),
926 gap.missing
927 );
928 }
929 if outcome.gaps.len() > MAX_EXAMPLES {
930 println!(
931 " ... {} additional gap(s) omitted",
932 outcome.gaps.len() - MAX_EXAMPLES
933 );
934 }
935 }
936
937 if !outcome.price_spikes.is_empty() {
938 println!(" Price spike examples:");
939 for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
940 println!(
941 " {} (change {:.2}%)",
942 spike.timestamp.to_rfc3339(),
943 spike.change_fraction * 100.0
944 );
945 }
946 if outcome.price_spikes.len() > MAX_EXAMPLES {
947 println!(
948 " ... {} additional spike(s) omitted",
949 outcome.price_spikes.len() - MAX_EXAMPLES
950 );
951 }
952 }
953
954 if !outcome.cross_mismatches.is_empty() {
955 println!(" Cross-source mismatch examples:");
956 for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
957 println!(
958 " {} primary {:.4} vs ref {:.4} ({:.2}%)",
959 miss.timestamp.to_rfc3339(),
960 miss.primary_close,
961 miss.reference_close,
962 miss.delta_fraction * 100.0
963 );
964 }
965 if outcome.cross_mismatches.len() > MAX_EXAMPLES {
966 println!(
967 " ... {} additional mismatch(es) omitted",
968 outcome.cross_mismatches.len() - MAX_EXAMPLES
969 );
970 }
971 }
972}
973
974fn print_report(report: &PerformanceReport) {
975 println!("\n{}", report);
976}
977
978fn synth_candles(symbol: &str, len: usize, offset_minutes: i64) -> Vec<Candle> {
979 let mut candles = Vec::with_capacity(len);
980 for i in 0..len {
981 let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
982 let open = base + (i as f64 % 3.0) * 10.0;
983 let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
984 let open_dec =
985 Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
986 let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
987 let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
988 let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
989 candles.push(Candle {
990 symbol: Symbol::from(symbol),
991 interval: Interval::OneMinute,
992 open: open_dec,
993 high,
994 low,
995 close: close_dec,
996 volume: Decimal::ONE,
997 timestamp: Utc::now() - Duration::minutes((len - i) as i64)
998 + Duration::minutes(offset_minutes),
999 });
1000 }
1001 candles
1002}
1003
1004fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
1005 if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
1006 return Ok(dt.with_timezone(&Utc));
1007 }
1008 if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1009 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1010 }
1011 if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
1012 let dt = date
1013 .and_hms_opt(0, 0, 0)
1014 .ok_or_else(|| anyhow!("invalid date"))?;
1015 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1016 }
1017 Err(anyhow!("unable to parse datetime '{value}'"))
1018}
1019
1020#[derive(Deserialize)]
1021struct CandleCsvRow {
1022 symbol: Option<String>,
1023 timestamp: String,
1024 open: f64,
1025 high: f64,
1026 low: f64,
1027 close: f64,
1028 volume: f64,
1029}
1030
1031fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1032 let mut candles = Vec::new();
1033 for path in paths {
1034 let mut reader = csv::Reader::from_path(path)
1035 .with_context(|| format!("failed to open {}", path.display()))?;
1036 for record in reader.deserialize::<CandleCsvRow>() {
1037 let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1038 let timestamp = parse_datetime(&row.timestamp)?;
1039 let symbol = row
1040 .symbol
1041 .clone()
1042 .or_else(|| infer_symbol_from_path(path))
1043 .ok_or_else(|| {
1044 anyhow!(
1045 "missing symbol column and unable to infer from path {}",
1046 path.display()
1047 )
1048 })?;
1049 let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1050 let open = Decimal::from_f64(row.open).ok_or_else(|| {
1051 anyhow!("invalid open value '{}' in {}", row.open, path.display())
1052 })?;
1053 let high = Decimal::from_f64(row.high).ok_or_else(|| {
1054 anyhow!("invalid high value '{}' in {}", row.high, path.display())
1055 })?;
1056 let low = Decimal::from_f64(row.low)
1057 .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1058 let close = Decimal::from_f64(row.close).ok_or_else(|| {
1059 anyhow!("invalid close value '{}' in {}", row.close, path.display())
1060 })?;
1061 let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1062 anyhow!(
1063 "invalid volume value '{}' in {}",
1064 row.volume,
1065 path.display()
1066 )
1067 })?;
1068 candles.push(Candle {
1069 symbol,
1070 interval,
1071 open,
1072 high,
1073 low,
1074 close,
1075 volume,
1076 timestamp,
1077 });
1078 }
1079 }
1080 Ok(candles)
1081}
1082
1083#[derive(Deserialize)]
1084#[serde(tag = "event", rename_all = "lowercase")]
1085enum LobEventRow {
1086 Snapshot {
1087 timestamp: String,
1088 symbol: Option<String>,
1089 bids: Vec<[f64; 2]>,
1090 asks: Vec<[f64; 2]>,
1091 },
1092 Depth {
1093 timestamp: String,
1094 symbol: Option<String>,
1095 bids: Vec<[f64; 2]>,
1096 asks: Vec<[f64; 2]>,
1097 },
1098 Trade {
1099 timestamp: String,
1100 symbol: Option<String>,
1101 side: String,
1102 price: f64,
1103 size: f64,
1104 },
1105}
1106
1107fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
1108 let mut events = Vec::new();
1109 for path in paths {
1110 let file = File::open(path)
1111 .with_context(|| format!("failed to open order book file {}", path.display()))?;
1112 let symbol_hint = infer_symbol_from_path(path);
1113 for line in BufReader::new(file).lines() {
1114 let line =
1115 line.with_context(|| format!("failed to read line from {}", path.display()))?;
1116 if line.trim().is_empty() {
1117 continue;
1118 }
1119 let row: LobEventRow = serde_json::from_str(&line)
1120 .with_context(|| format!("invalid order book event in {}", path.display()))?;
1121 match row {
1122 LobEventRow::Snapshot {
1123 timestamp,
1124 symbol,
1125 bids,
1126 asks,
1127 } => {
1128 let ts = parse_datetime(×tamp)?;
1129 let symbol = symbol
1130 .or_else(|| symbol_hint.clone())
1131 .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
1132 let bids = convert_levels(&bids)?;
1133 let asks = convert_levels(&asks)?;
1134 let book = OrderBook {
1135 symbol: symbol.clone(),
1136 bids,
1137 asks,
1138 timestamp: ts,
1139 };
1140 events.push(MarketEvent {
1141 timestamp: ts,
1142 kind: MarketEventKind::OrderBook(book),
1143 });
1144 }
1145 LobEventRow::Depth {
1146 timestamp,
1147 symbol,
1148 bids,
1149 asks,
1150 } => {
1151 let ts = parse_datetime(×tamp)?;
1152 let symbol = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
1153 anyhow!("missing symbol in depth update {}", path.display())
1154 })?;
1155 let bids = convert_levels(&bids)?;
1156 let asks = convert_levels(&asks)?;
1157 let update = DepthUpdate {
1158 symbol: symbol.clone(),
1159 bids,
1160 asks,
1161 timestamp: ts,
1162 };
1163 events.push(MarketEvent {
1164 timestamp: ts,
1165 kind: MarketEventKind::Depth(update),
1166 });
1167 }
1168 LobEventRow::Trade {
1169 timestamp,
1170 symbol,
1171 side,
1172 price,
1173 size,
1174 } => {
1175 let ts = parse_datetime(×tamp)?;
1176 let symbol = symbol
1177 .or_else(|| symbol_hint.clone())
1178 .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
1179 let side = match side.to_lowercase().as_str() {
1180 "buy" | "bid" | "b" => Side::Buy,
1181 "sell" | "ask" | "s" => Side::Sell,
1182 other => bail!("unsupported trade side '{other}' in {}", path.display()),
1183 };
1184 let price = Decimal::from_f64(price).ok_or_else(|| {
1185 anyhow!("invalid trade price '{}' in {}", price, path.display())
1186 })?;
1187 let size = Decimal::from_f64(size).ok_or_else(|| {
1188 anyhow!("invalid trade size '{}' in {}", size, path.display())
1189 })?;
1190 let tick = Tick {
1191 symbol: symbol.clone(),
1192 price,
1193 size,
1194 side,
1195 exchange_timestamp: ts,
1196 received_at: ts,
1197 };
1198 events.push(MarketEvent {
1199 timestamp: ts,
1200 kind: MarketEventKind::Trade(tick),
1201 });
1202 }
1203 }
1204 }
1205 }
1206 events.sort_by_key(|event| event.timestamp);
1207 Ok(events)
1208}
1209
1210fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
1211 levels
1212 .iter()
1213 .map(|pair| {
1214 let price = Decimal::from_f64(pair[0])
1215 .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
1216 let size = Decimal::from_f64(pair[1])
1217 .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
1218 Ok(OrderBookLevel { price, size })
1219 })
1220 .collect()
1221}
1222
1223fn infer_symbol_from_path(path: &Path) -> Option<String> {
1224 path.parent()
1225 .and_then(|p| p.file_name())
1226 .map(|os| os.to_string_lossy().to_string())
1227}
1228
1229fn infer_interval_from_path(path: &Path) -> Option<Interval> {
1230 path.file_stem()
1231 .and_then(|os| os.to_str())
1232 .and_then(|stem| stem.split('_').next())
1233 .and_then(|token| Interval::from_str(token).ok())
1234}
1235
1236fn default_output_path(
1237 config: &AppConfig,
1238 exchange: &str,
1239 symbol: &str,
1240 interval: Interval,
1241 start: DateTime<Utc>,
1242 end: DateTime<Utc>,
1243) -> PathBuf {
1244 let interval_part = interval_label(interval);
1245 let start_part = start.format("%Y%m%d").to_string();
1246 let end_part = end.format("%Y%m%d").to_string();
1247 config
1248 .data_path
1249 .join(exchange)
1250 .join(symbol)
1251 .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
1252}
1253
1254fn interval_label(interval: Interval) -> &'static str {
1255 match interval {
1256 Interval::OneSecond => "1s",
1257 Interval::OneMinute => "1m",
1258 Interval::FiveMinutes => "5m",
1259 Interval::FifteenMinutes => "15m",
1260 Interval::OneHour => "1h",
1261 Interval::FourHours => "4h",
1262 Interval::OneDay => "1d",
1263 }
1264}
1265
1266#[derive(Serialize)]
1267struct CandleRow<'a> {
1268 symbol: &'a str,
1269 timestamp: String,
1270 open: f64,
1271 high: f64,
1272 low: f64,
1273 close: f64,
1274 volume: f64,
1275}
1276
1277fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
1278 if let Some(parent) = path.parent() {
1279 fs::create_dir_all(parent)
1280 .with_context(|| format!("failed to create directory {}", parent.display()))?;
1281 }
1282 let mut writer =
1283 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1284 for candle in candles {
1285 let row = CandleRow {
1286 symbol: &candle.symbol,
1287 timestamp: candle.timestamp.to_rfc3339(),
1288 open: candle.open.to_f64().unwrap_or(0.0),
1289 high: candle.high.to_f64().unwrap_or(0.0),
1290 low: candle.low.to_f64().unwrap_or(0.0),
1291 close: candle.close.to_f64().unwrap_or(0.0),
1292 volume: candle.volume.to_f64().unwrap_or(0.0),
1293 };
1294 writer.serialize(row)?;
1295 }
1296 writer.flush()?;
1297 Ok(())
1298}
1299
1300#[derive(Serialize)]
1301struct BatchRow {
1302 config: String,
1303 signals: usize,
1304 orders: usize,
1305 dropped_orders: usize,
1306 ending_equity: f64,
1307}
1308
1309fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
1310 if let Some(parent) = path.parent() {
1311 fs::create_dir_all(parent)
1312 .with_context(|| format!("failed to create directory {}", parent.display()))?;
1313 }
1314 let mut writer =
1315 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1316 for row in rows {
1317 writer.serialize(row)?;
1318 }
1319 writer.flush()?;
1320 Ok(())
1321}
1322
1323fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<Symbol, Decimal> {
1324 config
1325 .initial_balances
1326 .iter()
1327 .map(|(currency, amount)| (currency.clone(), *amount))
1328 .collect()
1329}
1330
1331fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
1332 config
1333 .initial_balances
1334 .get(&config.reporting_currency)
1335 .copied()
1336 .unwrap_or_default()
1337}
1338
1339fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
1340 let parts: Vec<_> = value.split(':').collect();
1341 match parts.as_slice() {
1342 ["fixed", val] => {
1343 let quantity =
1344 Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
1345 Ok(Box::new(FixedOrderSizer { quantity }))
1346 }
1347 ["fixed"] => {
1348 let quantity = cli_quantity.unwrap_or(Decimal::ONE);
1349 Ok(Box::new(FixedOrderSizer { quantity }))
1350 }
1351 ["percent", val] => {
1352 let percent =
1353 Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
1354 Ok(Box::new(PortfolioPercentSizer {
1355 percent: percent.max(Decimal::ZERO),
1356 }))
1357 }
1358 ["risk-adjusted", val] => {
1359 let risk_fraction = Decimal::from_str(val)
1360 .context("invalid risk fraction value (use decimals)")?;
1361 Ok(Box::new(RiskAdjustedSizer {
1362 risk_fraction: risk_fraction.max(Decimal::ZERO),
1363 }))
1364 }
1365 _ => Err(anyhow!(
1366 "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
1367 )),
1368 }
1369}