tesser_cli/
app.rs

1use crate::alerts::sanitize_webhook;
2use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
3use crate::live::{run_live, ExecutionBackend, LiveSessionSettings};
4use crate::state;
5use crate::telemetry::init_tracing;
6use std::collections::HashMap;
7use std::fs::{self, File};
8use std::io::{BufRead, BufReader};
9use std::net::SocketAddr;
10use std::path::{Path, PathBuf};
11use std::str::FromStr;
12use std::sync::Arc;
13use std::time::Duration as StdDuration;
14
15use anyhow::{anyhow, bail, Context, Result};
16use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
17use clap::{Args, Parser, Subcommand, ValueEnum};
18use csv::Writer;
19use rust_decimal::{
20    prelude::{FromPrimitive, ToPrimitive},
21    Decimal,
22};
23use serde::{Deserialize, Serialize};
24use tesser_backtester::reporting::PerformanceReport;
25use tesser_backtester::{BacktestConfig, BacktestMode, Backtester, MarketEvent, MarketEventKind};
26use tesser_broker::ExecutionClient;
27use tesser_bybit::PublicChannel;
28use tesser_config::{load_config, AppConfig, RiskManagementConfig};
29use tesser_core::{Candle, DepthUpdate, Interval, OrderBook, OrderBookLevel, Side, Symbol, Tick};
30use tesser_data::download::{BybitDownloader, KlineRequest};
31use tesser_execution::{
32    ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PortfolioPercentSizer,
33    RiskAdjustedSizer,
34};
35use tesser_markets::MarketRegistry;
36use tesser_paper::{MatchingEngine, PaperExecutionClient};
37use tesser_strategy::{builtin_strategy_names, load_strategy};
38use tracing::{info, warn};
39
// Top-level command-line definition; `run` parses it once with `Cli::parse()`.
// Type-level notes here are plain `//` comments on purpose: clap's derive turns
// `///` doc comments into help text, and the help output should stay as it is.
#[derive(Parser)]
#[command(author, version, about = "Tesser Trading Framework")]
pub struct Cli {
    /// Increases logging verbosity (-v debug, -vv trace)
    // Only consulted by `run` when the RUST_LOG environment variable is unset.
    #[arg(short, long, action = clap::ArgAction::Count)]
    verbose: u8,
    /// Selects which configuration environment to load (maps to config/{env}.toml)
    #[arg(long, default_value = "default")]
    env: String,
    // The workflow to execute; `run` dispatches on this value.
    #[command(subcommand)]
    command: Commands,
}
52
// Top-level subcommands. Each variant (except `Strategies`) wraps a nested
// subcommand enum that `run` matches on.
// NOTE(review): the lint is presumably silenced because the nested argument
// structs differ widely in size — confirm before boxing any variant.
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
pub enum Commands {
    /// Data engineering tasks
    Data {
        #[command(subcommand)]
        action: DataCommand,
    },
    /// Backtesting workflows
    Backtest {
        #[command(subcommand)]
        action: BacktestCommand,
    },
    /// Live trading workflows
    Live {
        #[command(subcommand)]
        action: LiveCommand,
    },
    /// Inspect or repair persisted runtime state
    State {
        #[command(subcommand)]
        action: StateCommand,
    },
    /// Strategy management helpers
    // Handled by `list_strategies`, which prints the built-in strategy names.
    Strategies,
}
79
// Actions available under the `data` subcommand; dispatched by `handle_data`.
#[derive(Subcommand)]
pub enum DataCommand {
    /// Download historical market data
    Download(DataDownloadArgs),
    /// Validate and optionally repair a local data set
    Validate(DataValidateArgs),
    /// Resample existing data (placeholder)
    // `handle_data` currently only logs a "stub" message for this action.
    Resample(DataResampleArgs),
}
89
// Actions available under the `backtest` subcommand; `run` dispatches each
// variant to the matching `run` method on its argument struct.
#[derive(Subcommand)]
pub enum BacktestCommand {
    /// Run a single backtest from a strategy config file
    Run(BacktestRunArgs),
    /// Run multiple strategy configs and aggregate the results
    Batch(BacktestBatchArgs),
}
97
// Actions available under the `live` subcommand. `run` also inspects this
// variant before logging is initialised to pick up the session's log path.
#[derive(Subcommand)]
pub enum LiveCommand {
    /// Start a live trading session (scaffolding)
    Run(LiveRunArgs),
}
103
// Actions available under the `state` subcommand; dispatched by `handle_state`.
#[derive(Subcommand)]
pub enum StateCommand {
    /// Inspect the SQLite state database
    Inspect(StateInspectArgs),
}
109
110#[derive(Args)]
111pub struct DataDownloadArgs {
112    #[arg(long, default_value = "bybit")]
113    exchange: String,
114    #[arg(long)]
115    symbol: String,
116    #[arg(long, default_value = "linear")]
117    category: String,
118    #[arg(long, default_value = "1m")]
119    interval: String,
120    #[arg(long)]
121    start: String,
122    #[arg(long)]
123    end: Option<String>,
124    #[arg(long)]
125    output: Option<PathBuf>,
126    /// Skip automatic validation after download completes
127    #[arg(long)]
128    skip_validation: bool,
129    /// Attempt to repair gaps detected during validation
130    #[arg(long)]
131    repair_missing: bool,
132    /// Max allowed close-to-close jump when auto-validating (fractional)
133    #[arg(long, default_value_t = 0.05)]
134    validation_jump_threshold: f64,
135    /// Allowed divergence between primary and reference closes (fractional)
136    #[arg(long, default_value_t = 0.002)]
137    validation_reference_tolerance: f64,
138}
139
// Arguments for `state inspect`. Both values are forwarded to
// `state::inspect_state` by `handle_state`.
#[derive(Args)]
pub struct StateInspectArgs {
    /// Path to the SQLite state database (defaults to live.state_path)
    #[arg(long)]
    path: Option<PathBuf>,
    /// Emit the raw JSON payload stored inside the database
    #[arg(long)]
    raw: bool,
}
149
150impl StateInspectArgs {
151    fn resolved_path(&self, config: &AppConfig) -> PathBuf {
152        self.path
153            .clone()
154            .unwrap_or_else(|| config.live.state_path.clone())
155    }
156}
157
158impl DataDownloadArgs {
159    async fn run(&self, config: &AppConfig) -> Result<()> {
160        let exchange_cfg = config
161            .exchange
162            .get(&self.exchange)
163            .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
164        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
165        let start = parse_datetime(&self.start)?;
166        let end = match &self.end {
167            Some(value) => parse_datetime(value)?,
168            None => Utc::now(),
169        };
170        if start >= end {
171            return Err(anyhow!("start time must be earlier than end time"));
172        }
173
174        let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
175        let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
176        info!(
177            "Downloading {} candles for {} ({})",
178            self.interval, self.symbol, self.exchange
179        );
180        let mut candles = downloader
181            .download_klines(&request)
182            .await
183            .with_context(|| "failed to download candles from Bybit")?;
184
185        if candles.is_empty() {
186            info!("No candles returned for {}", self.symbol);
187            return Ok(());
188        }
189
190        if !self.skip_validation {
191            let config = ValidationConfig {
192                price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
193                reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
194                repair_missing: self.repair_missing,
195            };
196            let outcome =
197                validate_dataset(candles.clone(), None, config).context("validation failed")?;
198            print_validation_summary(&outcome);
199            if self.repair_missing && outcome.summary.repaired_candles > 0 {
200                candles = outcome.repaired;
201                info!(
202                    "Applied {} synthetic candle(s) to repair gaps",
203                    outcome.summary.repaired_candles
204                );
205            }
206        }
207
208        let output_path = self.output.clone().unwrap_or_else(|| {
209            default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
210        });
211        write_candles_csv(&output_path, &candles)?;
212        info!(
213            "Saved {} candles to {}",
214            candles.len(),
215            output_path.display()
216        );
217        Ok(())
218    }
219}
220
// Arguments for `data validate`, consumed by `DataValidateArgs::run`.
#[derive(Args)]
pub struct DataValidateArgs {
    /// One or more CSV files to inspect
    // `run` rejects an empty list with an explicit error.
    #[arg(
        long = "path",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    paths: Vec<PathBuf>,
    /// Optional reference data set(s) used for cross validation
    #[arg(
        long = "reference",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    reference_paths: Vec<PathBuf>,
    /// Max allowed close-to-close jump before flagging (fractional, 0.05 = 5%)
    #[arg(long, default_value_t = 0.05)]
    jump_threshold: f64,
    /// Allowed divergence between primary and reference closes (fractional)
    #[arg(long, default_value_t = 0.002)]
    reference_tolerance: f64,
    /// Attempt to fill gaps by synthesizing candles
    #[arg(long)]
    repair_missing: bool,
    /// Location to write the repaired dataset
    // When omitted, `run` only warns if synthetic candles were produced.
    #[arg(long)]
    output: Option<PathBuf>,
}
252
253impl DataValidateArgs {
254    fn run(&self) -> Result<()> {
255        if self.paths.is_empty() {
256            bail!("provide at least one --path for validation");
257        }
258        let candles =
259            load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
260        if candles.is_empty() {
261            bail!("loaded dataset is empty; nothing to validate");
262        }
263        let reference = if self.reference_paths.is_empty() {
264            None
265        } else {
266            Some(
267                load_candles_from_paths(&self.reference_paths)
268                    .with_context(|| "failed to load reference dataset")?,
269            )
270        };
271
272        let price_jump_threshold = if self.jump_threshold <= 0.0 {
273            0.0001
274        } else {
275            self.jump_threshold
276        };
277        let reference_tolerance = if self.reference_tolerance <= 0.0 {
278            0.0001
279        } else {
280            self.reference_tolerance
281        };
282
283        let config = ValidationConfig {
284            price_jump_threshold,
285            reference_tolerance,
286            repair_missing: self.repair_missing,
287        };
288
289        let outcome = validate_dataset(candles, reference, config)?;
290        print_validation_summary(&outcome);
291
292        if let Some(output) = &self.output {
293            write_candles_csv(output, &outcome.repaired)?;
294            info!(
295                "Wrote {} candles ({} new) to {}",
296                outcome.repaired.len(),
297                outcome.summary.repaired_candles,
298                output.display()
299            );
300        } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
301            warn!(
302                "Detected {} gap(s) filled with synthetic candles but --output was not provided",
303                outcome.summary.repaired_candles
304            );
305        }
306
307        Ok(())
308    }
309}
310
311#[derive(Args)]
312pub struct DataResampleArgs {
313    #[arg(long)]
314    input: PathBuf,
315    #[arg(long)]
316    output: PathBuf,
317    #[arg(long, default_value = "1h")]
318    interval: String,
319}
320
// CLI-facing mirror of `BacktestMode`; `BacktestRunArgs::run` maps each variant
// to the backtester's own enum. Plain `//` comments are used because `///` on
// a `ValueEnum` variant would become help text for that value.
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
pub enum BacktestModeArg {
    // Maps to `BacktestMode::Candle`.
    Candle,
    // Maps to `BacktestMode::Tick`; `run` then requires `--lob-data`.
    Tick,
}
326
327#[derive(Args)]
328pub struct BacktestRunArgs {
329    #[arg(long)]
330    strategy_config: PathBuf,
331    /// One or more CSV files with historical candles (symbol,timestamp,...)
332    #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
333    data_paths: Vec<PathBuf>,
334    #[arg(long, default_value_t = 500)]
335    candles: usize,
336    #[arg(long, default_value = "0.01")]
337    quantity: Decimal,
338    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
339    #[arg(long, default_value = "0")]
340    slippage_bps: Decimal,
341    /// Trading fees in basis points applied to notional
342    #[arg(long, default_value = "0")]
343    fee_bps: Decimal,
344    /// Number of candles between signal and execution
345    #[arg(long, default_value_t = 1)]
346    latency_candles: usize,
347    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
348    #[arg(long, default_value = "fixed:0.01")]
349    sizer: String,
350    /// Selects the data source driving fills (`candle` or `tick`)
351    #[arg(long, value_enum, default_value = "candle")]
352    mode: BacktestModeArg,
353    /// One or more JSONL files containing tick/order book events (required for `--mode tick`)
354    #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
355    lob_paths: Vec<PathBuf>,
356    #[arg(long)]
357    markets_file: Option<PathBuf>,
358}
359
360#[derive(Args)]
361pub struct BacktestBatchArgs {
362    /// Glob or directory containing strategy config files
363    #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
364    config_paths: Vec<PathBuf>,
365    /// Candle CSVs available to every strategy
366    #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
367    data_paths: Vec<PathBuf>,
368    #[arg(long, default_value = "0.01")]
369    quantity: Decimal,
370    /// Optional output CSV summarizing results
371    #[arg(long)]
372    output: Option<PathBuf>,
373    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
374    #[arg(long, default_value = "0")]
375    slippage_bps: Decimal,
376    /// Trading fees in basis points applied to notional
377    #[arg(long, default_value = "0")]
378    fee_bps: Decimal,
379    /// Number of candles between signal and execution
380    #[arg(long, default_value_t = 1)]
381    latency_candles: usize,
382    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
383    #[arg(long, default_value = "fixed:0.01")]
384    sizer: String,
385    #[arg(long)]
386    markets_file: Option<PathBuf>,
387}
388
389#[derive(Args)]
390pub struct LiveRunArgs {
391    #[arg(long)]
392    strategy_config: PathBuf,
393    #[arg(long, default_value = "bybit_testnet")]
394    exchange: String,
395    #[arg(long, default_value = "linear")]
396    category: String,
397    #[arg(long, default_value = "1m")]
398    interval: String,
399    #[arg(long, default_value = "1")]
400    quantity: Decimal,
401    /// Selects which execution backend to use (`paper` or `bybit`)
402    #[arg(
403        long = "exec",
404        default_value = "paper",
405        value_enum,
406        alias = "live-exec"
407    )]
408    exec: ExecutionBackend,
409    #[arg(long)]
410    state_path: Option<PathBuf>,
411    #[arg(long)]
412    metrics_addr: Option<String>,
413    #[arg(long)]
414    log_path: Option<PathBuf>,
415    #[arg(long)]
416    initial_equity: Option<Decimal>,
417    #[arg(long)]
418    markets_file: Option<PathBuf>,
419    #[arg(long, default_value = "0")]
420    slippage_bps: Decimal,
421    #[arg(long, default_value = "0")]
422    fee_bps: Decimal,
423    #[arg(long, default_value_t = 0)]
424    latency_ms: u64,
425    #[arg(long, default_value_t = 512)]
426    history: usize,
427    #[arg(long)]
428    reconciliation_interval_secs: Option<u64>,
429    #[arg(long)]
430    reconciliation_threshold: Option<Decimal>,
431    #[arg(long)]
432    webhook_url: Option<String>,
433    #[arg(long)]
434    alert_max_data_gap_secs: Option<u64>,
435    #[arg(long)]
436    alert_max_order_failures: Option<u32>,
437    #[arg(long)]
438    alert_max_drawdown: Option<Decimal>,
439    #[arg(long)]
440    risk_max_order_qty: Option<Decimal>,
441    #[arg(long)]
442    risk_max_position_qty: Option<Decimal>,
443    #[arg(long)]
444    risk_max_drawdown: Option<Decimal>,
445    /// Bybit orderbook depth to subscribe to (e.g., 1, 25, 50)
446    #[arg(long)]
447    orderbook_depth: Option<usize>,
448    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
449    #[arg(long, default_value = "fixed:1.0")]
450    sizer: String,
451}
452
453impl LiveRunArgs {
454    fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
455        self.log_path
456            .clone()
457            .unwrap_or_else(|| config.live.log_path.clone())
458    }
459
460    fn resolved_state_path(&self, config: &AppConfig) -> PathBuf {
461        self.state_path
462            .clone()
463            .unwrap_or_else(|| config.live.state_path.clone())
464    }
465
466    fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
467        let addr = self
468            .metrics_addr
469            .clone()
470            .unwrap_or_else(|| config.live.metrics_addr.clone());
471        addr.parse()
472            .with_context(|| format!("invalid metrics address '{addr}'"))
473    }
474
475    fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
476        let secs = self
477            .reconciliation_interval_secs
478            .unwrap_or(config.live.reconciliation_interval_secs)
479            .max(1);
480        StdDuration::from_secs(secs)
481    }
482
483    fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
484        let configured = self
485            .reconciliation_threshold
486            .unwrap_or(config.live.reconciliation_threshold);
487        if configured <= Decimal::ZERO {
488            config.live.reconciliation_threshold.max(Decimal::new(1, 6))
489        } else {
490            configured
491        }
492    }
493
494    fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<Symbol, Decimal> {
495        let mut balances = clone_initial_balances(&config.backtest);
496        if let Some(value) = self.initial_equity {
497            balances.insert(
498                config.backtest.reporting_currency.clone(),
499                value.max(Decimal::ZERO),
500            );
501        }
502        balances
503    }
504
505    fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
506        let mut alerting = config.live.alerting.clone();
507        let webhook = self
508            .webhook_url
509            .clone()
510            .or_else(|| alerting.webhook_url.clone());
511        alerting.webhook_url = sanitize_webhook(webhook);
512        if let Some(sec) = self.alert_max_data_gap_secs {
513            alerting.max_data_gap_secs = sec;
514        }
515        if let Some(limit) = self.alert_max_order_failures {
516            alerting.max_order_failures = limit;
517        }
518        if let Some(limit) = self.alert_max_drawdown {
519            alerting.max_drawdown = limit.max(Decimal::ZERO);
520        }
521        alerting
522    }
523
524    fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
525        let mut risk = config.risk_management.clone();
526        if let Some(limit) = self.risk_max_order_qty {
527            risk.max_order_quantity = limit.max(Decimal::ZERO);
528        }
529        if let Some(limit) = self.risk_max_position_qty {
530            risk.max_position_quantity = limit.max(Decimal::ZERO);
531        }
532        if let Some(limit) = self.risk_max_drawdown {
533            risk.max_drawdown = limit.max(Decimal::ZERO);
534        }
535        risk
536    }
537}
538
/// Shape of a strategy config file as deserialized with `toml::from_str` by
/// the backtest and live `run` methods.
#[derive(Deserialize)]
struct StrategyConfigFile {
    /// Strategy identifier passed to `load_strategy`; stored under the
    /// `strategy_name` key in the file.
    #[serde(rename = "strategy_name")]
    name: String,
    /// Strategy parameters passed to `load_strategy`; an empty table when the
    /// file has no `params` key.
    #[serde(default = "empty_table")]
    params: toml::Value,
}
546
547fn empty_table() -> toml::Value {
548    toml::Value::Table(Default::default())
549}
550
551pub async fn run() -> Result<()> {
552    let cli = Cli::parse();
553    let config = load_config(Some(&cli.env)).context("failed to load configuration")?;
554
555    let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
556        0 => config.log_level.clone(),
557        1 => "debug".to_string(),
558        _ => "trace".to_string(),
559    });
560
561    let log_override = match &cli.command {
562        Commands::Live {
563            action: LiveCommand::Run(args),
564        } => Some(args.resolved_log_path(&config)),
565        _ => None,
566    };
567
568    init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;
569
570    match cli.command {
571        Commands::Data { action } => handle_data(action, &config).await?,
572        Commands::Backtest {
573            action: BacktestCommand::Run(args),
574        } => args.run(&config).await?,
575        Commands::Backtest {
576            action: BacktestCommand::Batch(args),
577        } => args.run(&config).await?,
578        Commands::Live {
579            action: LiveCommand::Run(args),
580        } => args.run(&config).await?,
581        Commands::State { action } => handle_state(action, &config).await?,
582        Commands::Strategies => list_strategies(),
583    }
584
585    Ok(())
586}
587
588async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
589    match cmd {
590        DataCommand::Download(args) => {
591            args.run(config).await?;
592        }
593        DataCommand::Validate(args) => {
594            args.run()?;
595        }
596        DataCommand::Resample(args) => {
597            info!(
598                "stub: resampling {} into {} at {}",
599                args.input.display(),
600                args.output.display(),
601                args.interval
602            );
603        }
604    }
605    Ok(())
606}
607
608async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
609    match cmd {
610        StateCommand::Inspect(args) => {
611            state::inspect_state(args.resolved_path(config), args.raw).await?;
612        }
613    }
614    Ok(())
615}
616
617impl BacktestRunArgs {
618    async fn run(&self, config: &AppConfig) -> Result<()> {
619        let contents = std::fs::read_to_string(&self.strategy_config)
620            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
621        let def: StrategyConfigFile =
622            toml::from_str(&contents).context("failed to parse strategy config file")?;
623        let strategy = load_strategy(&def.name, def.params)
624            .with_context(|| format!("failed to configure strategy {}", def.name))?;
625        let symbols = strategy.subscriptions();
626        if symbols.is_empty() {
627            return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
628        }
629
630        let mode = match self.mode {
631            BacktestModeArg::Candle => BacktestMode::Candle,
632            BacktestModeArg::Tick => BacktestMode::Tick,
633        };
634
635        let markets_path = self
636            .markets_file
637            .clone()
638            .or_else(|| config.backtest.markets_file.clone())
639            .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
640        let market_registry = Arc::new(
641            MarketRegistry::load_from_file(&markets_path).with_context(|| {
642                format!("failed to load markets from {}", markets_path.display())
643            })?,
644        );
645
646        type CandleModeBundle = (
647            Vec<Candle>,
648            Vec<MarketEvent>,
649            Arc<dyn ExecutionClient>,
650            Option<Arc<MatchingEngine>>,
651        );
652
653        let (candles, lob_events, execution_client, matching_engine): CandleModeBundle = match mode
654        {
655            BacktestMode::Candle => {
656                let mut candles = if self.data_paths.is_empty() {
657                    let mut generated = Vec::new();
658                    for (idx, symbol) in symbols.iter().enumerate() {
659                        let offset = idx as i64 * 10;
660                        generated.extend(synth_candles(symbol, self.candles, offset));
661                    }
662                    generated
663                } else {
664                    load_candles_from_paths(&self.data_paths)?
665                };
666
667                if candles.is_empty() {
668                    return Err(anyhow!(
669                        "no candles loaded; provide --data or allow synthetic generation"
670                    ));
671                }
672
673                candles.sort_by_key(|c| c.timestamp);
674                (
675                    candles,
676                    Vec::new(),
677                    Arc::new(PaperExecutionClient::default()) as Arc<dyn ExecutionClient>,
678                    None,
679                )
680            }
681            BacktestMode::Tick => {
682                if self.lob_paths.is_empty() {
683                    bail!("--lob-data is required when --mode tick");
684                }
685                let events = load_lob_events_from_paths(&self.lob_paths)?;
686                if events.is_empty() {
687                    bail!("no order book events loaded from --lob-data");
688                }
689                let engine = Arc::new(MatchingEngine::new(
690                    "matching-engine",
691                    symbols.clone(),
692                    reporting_balance(&config.backtest),
693                ));
694                (
695                    Vec::new(),
696                    events,
697                    engine.clone() as Arc<dyn ExecutionClient>,
698                    Some(engine),
699                )
700            }
701        };
702
703        let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
704        let order_quantity = self.quantity;
705        let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
706
707        let mut cfg = BacktestConfig::new(symbols[0].clone(), candles);
708        cfg.lob_events = lob_events;
709        cfg.order_quantity = order_quantity;
710        cfg.initial_balances = clone_initial_balances(&config.backtest);
711        cfg.reporting_currency = config.backtest.reporting_currency.clone();
712        cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
713        cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
714        cfg.execution.latency_candles = self.latency_candles.max(1);
715        cfg.mode = mode;
716
717        let report = Backtester::new(cfg, strategy, execution, matching_engine, market_registry)
718            .run()
719            .await
720            .context("backtest failed")?;
721        print_report(&report);
722        Ok(())
723    }
724}
725
726impl BacktestBatchArgs {
727    async fn run(&self, config: &AppConfig) -> Result<()> {
728        if self.config_paths.is_empty() {
729            return Err(anyhow!("provide at least one --config path"));
730        }
731        if self.data_paths.is_empty() {
732            return Err(anyhow!("provide at least one --data path for batch mode"));
733        }
734        let markets_path = self
735            .markets_file
736            .clone()
737            .or_else(|| config.backtest.markets_file.clone())
738            .ok_or_else(|| {
739                anyhow!("batch mode requires --markets-file or backtest.markets_file")
740            })?;
741        let market_registry = Arc::new(
742            MarketRegistry::load_from_file(&markets_path).with_context(|| {
743                format!("failed to load markets from {}", markets_path.display())
744            })?,
745        );
746        let mut aggregated = Vec::new();
747        for config_path in &self.config_paths {
748            let contents = std::fs::read_to_string(config_path).with_context(|| {
749                format!("failed to read strategy config {}", config_path.display())
750            })?;
751            let def: StrategyConfigFile =
752                toml::from_str(&contents).context("failed to parse strategy config file")?;
753            let strategy = load_strategy(&def.name, def.params)
754                .with_context(|| format!("failed to configure strategy {}", def.name))?;
755            let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
756            let order_quantity = self.quantity;
757            let mut candles = load_candles_from_paths(&self.data_paths)?;
758            candles.sort_by_key(|c| c.timestamp);
759            let execution_client: Arc<dyn ExecutionClient> =
760                Arc::new(PaperExecutionClient::default());
761            let execution =
762                ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
763            let mut cfg = BacktestConfig::new(strategy.symbol().to_string(), candles);
764            cfg.order_quantity = order_quantity;
765            cfg.initial_balances = clone_initial_balances(&config.backtest);
766            cfg.reporting_currency = config.backtest.reporting_currency.clone();
767            cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
768            cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
769            cfg.execution.latency_candles = self.latency_candles.max(1);
770
771            let report = Backtester::new(cfg, strategy, execution, None, market_registry.clone())
772                .run()
773                .await
774                .with_context(|| format!("backtest failed for {}", config_path.display()))?;
775            aggregated.push(BatchRow {
776                config: config_path.display().to_string(),
777                signals: 0, // Legacy field, can be removed or calculated from report
778                orders: 0,  // Legacy field, can be removed or calculated from report
779                dropped_orders: 0, // Legacy field, can be removed or calculated from report
780                ending_equity: report.ending_equity,
781            });
782        }
783
784        if let Some(output) = &self.output {
785            write_batch_report(output, &aggregated)?;
786            println!("Batch report written to {}", output.display());
787        }
788        if aggregated.is_empty() {
789            return Err(anyhow!("no batch jobs executed"));
790        }
791        Ok(())
792    }
793}
794
795impl LiveRunArgs {
796    async fn run(&self, config: &AppConfig) -> Result<()> {
797        let exchange_cfg = config
798            .exchange
799            .get(&self.exchange)
800            .cloned()
801            .ok_or_else(|| anyhow!("exchange profile {} not found", self.exchange))?;
802
803        let contents = fs::read_to_string(&self.strategy_config)
804            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
805        let def: StrategyConfigFile =
806            toml::from_str(&contents).context("failed to parse strategy config file")?;
807        let strategy = load_strategy(&def.name, def.params)
808            .with_context(|| format!("failed to configure strategy {}", def.name))?;
809        let symbols = strategy.subscriptions();
810        if symbols.is_empty() {
811            bail!("strategy did not declare any subscriptions");
812        }
813        if self.quantity <= Decimal::ZERO {
814            bail!("--quantity must be greater than zero");
815        }
816        let quantity = self.quantity;
817        let initial_balances = self.resolved_initial_balances(config);
818        let reporting_currency = config.backtest.reporting_currency.clone();
819        let markets_file = self
820            .markets_file
821            .clone()
822            .or_else(|| config.backtest.markets_file.clone());
823
824        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
825        let category =
826            PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
827        let metrics_addr = self.resolved_metrics_addr(config)?;
828        let state_path = self.resolved_state_path(config);
829        let alerting = self.build_alerting(config);
830        let history = self.history.max(32);
831        let reconciliation_interval = self.reconciliation_interval(config);
832        let reconciliation_threshold = self.reconciliation_threshold(config);
833
834        let settings = LiveSessionSettings {
835            category,
836            interval,
837            quantity,
838            slippage_bps: self.slippage_bps.max(Decimal::ZERO),
839            fee_bps: self.fee_bps.max(Decimal::ZERO),
840            history,
841            metrics_addr,
842            state_path,
843            initial_balances,
844            reporting_currency,
845            markets_file,
846            alerting,
847            exec_backend: self.exec,
848            risk: self.build_risk_config(config),
849            reconciliation_interval,
850            reconciliation_threshold,
851        };
852
853        info!(
854            strategy = %def.name,
855            symbols = ?symbols,
856            exchange = %self.exchange,
857            interval = %self.interval,
858            exec = ?self.exec,
859            "starting live session"
860        );
861
862        run_live(strategy, symbols, exchange_cfg, settings)
863            .await
864            .context("live session failed")
865    }
866}
867
868fn list_strategies() {
869    println!("Built-in strategies:");
870    for name in builtin_strategy_names() {
871        println!("- {name}");
872    }
873}
874
875fn print_validation_summary(outcome: &ValidationOutcome) {
876    const MAX_EXAMPLES: usize = 5;
877    let summary = &outcome.summary;
878    println!(
879        "Validation summary for {} ({} candles)",
880        summary.symbol, summary.rows
881    );
882    println!(
883        "  Range: {} -> {}",
884        summary.start.to_rfc3339(),
885        summary.end.to_rfc3339()
886    );
887    println!("  Interval: {}", interval_label(summary.interval));
888    println!("  Missing intervals: {}", summary.missing_candles);
889    println!("  Duplicate intervals: {}", summary.duplicate_candles);
890    println!("  Zero-volume candles: {}", summary.zero_volume_candles);
891    println!("  Price spikes flagged: {}", summary.price_spike_count);
892    println!(
893        "  Cross-source mismatches: {}",
894        summary.cross_mismatch_count
895    );
896    println!("  Repaired candles generated: {}", summary.repaired_candles);
897
898    if !outcome.gaps.is_empty() {
899        println!("  Gap examples:");
900        for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
901            println!(
902                "    {} -> {} (missing {})",
903                gap.start.to_rfc3339(),
904                gap.end.to_rfc3339(),
905                gap.missing
906            );
907        }
908        if outcome.gaps.len() > MAX_EXAMPLES {
909            println!(
910                "    ... {} additional gap(s) omitted",
911                outcome.gaps.len() - MAX_EXAMPLES
912            );
913        }
914    }
915
916    if !outcome.price_spikes.is_empty() {
917        println!("  Price spike examples:");
918        for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
919            println!(
920                "    {} (change {:.2}%)",
921                spike.timestamp.to_rfc3339(),
922                spike.change_fraction * 100.0
923            );
924        }
925        if outcome.price_spikes.len() > MAX_EXAMPLES {
926            println!(
927                "    ... {} additional spike(s) omitted",
928                outcome.price_spikes.len() - MAX_EXAMPLES
929            );
930        }
931    }
932
933    if !outcome.cross_mismatches.is_empty() {
934        println!("  Cross-source mismatch examples:");
935        for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
936            println!(
937                "    {} primary {:.4} vs ref {:.4} ({:.2}%)",
938                miss.timestamp.to_rfc3339(),
939                miss.primary_close,
940                miss.reference_close,
941                miss.delta_fraction * 100.0
942            );
943        }
944        if outcome.cross_mismatches.len() > MAX_EXAMPLES {
945            println!(
946                "    ... {} additional mismatch(es) omitted",
947                outcome.cross_mismatches.len() - MAX_EXAMPLES
948            );
949        }
950    }
951}
952
953fn print_report(report: &PerformanceReport) {
954    println!("\n{}", report);
955}
956
957fn synth_candles(symbol: &str, len: usize, offset_minutes: i64) -> Vec<Candle> {
958    let mut candles = Vec::with_capacity(len);
959    for i in 0..len {
960        let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
961        let open = base + (i as f64 % 3.0) * 10.0;
962        let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
963        let open_dec =
964            Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
965        let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
966        let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
967        let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
968        candles.push(Candle {
969            symbol: Symbol::from(symbol),
970            interval: Interval::OneMinute,
971            open: open_dec,
972            high,
973            low,
974            close: close_dec,
975            volume: Decimal::ONE,
976            timestamp: Utc::now() - Duration::minutes((len - i) as i64)
977                + Duration::minutes(offset_minutes),
978        });
979    }
980    candles
981}
982
983fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
984    if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
985        return Ok(dt.with_timezone(&Utc));
986    }
987    if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
988        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
989    }
990    if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
991        let dt = date
992            .and_hms_opt(0, 0, 0)
993            .ok_or_else(|| anyhow!("invalid date"))?;
994        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
995    }
996    Err(anyhow!("unable to parse datetime '{value}'"))
997}
998
999#[derive(Deserialize)]
1000struct CandleCsvRow {
1001    symbol: Option<String>,
1002    timestamp: String,
1003    open: f64,
1004    high: f64,
1005    low: f64,
1006    close: f64,
1007    volume: f64,
1008}
1009
1010fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1011    let mut candles = Vec::new();
1012    for path in paths {
1013        let mut reader = csv::Reader::from_path(path)
1014            .with_context(|| format!("failed to open {}", path.display()))?;
1015        for record in reader.deserialize::<CandleCsvRow>() {
1016            let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1017            let timestamp = parse_datetime(&row.timestamp)?;
1018            let symbol = row
1019                .symbol
1020                .clone()
1021                .or_else(|| infer_symbol_from_path(path))
1022                .ok_or_else(|| {
1023                    anyhow!(
1024                        "missing symbol column and unable to infer from path {}",
1025                        path.display()
1026                    )
1027                })?;
1028            let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1029            let open = Decimal::from_f64(row.open).ok_or_else(|| {
1030                anyhow!("invalid open value '{}' in {}", row.open, path.display())
1031            })?;
1032            let high = Decimal::from_f64(row.high).ok_or_else(|| {
1033                anyhow!("invalid high value '{}' in {}", row.high, path.display())
1034            })?;
1035            let low = Decimal::from_f64(row.low)
1036                .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1037            let close = Decimal::from_f64(row.close).ok_or_else(|| {
1038                anyhow!("invalid close value '{}' in {}", row.close, path.display())
1039            })?;
1040            let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1041                anyhow!(
1042                    "invalid volume value '{}' in {}",
1043                    row.volume,
1044                    path.display()
1045                )
1046            })?;
1047            candles.push(Candle {
1048                symbol,
1049                interval,
1050                open,
1051                high,
1052                low,
1053                close,
1054                volume,
1055                timestamp,
1056            });
1057        }
1058    }
1059    Ok(candles)
1060}
1061
1062#[derive(Deserialize)]
1063#[serde(tag = "event", rename_all = "lowercase")]
1064enum LobEventRow {
1065    Snapshot {
1066        timestamp: String,
1067        symbol: Option<String>,
1068        bids: Vec<[f64; 2]>,
1069        asks: Vec<[f64; 2]>,
1070    },
1071    Depth {
1072        timestamp: String,
1073        symbol: Option<String>,
1074        bids: Vec<[f64; 2]>,
1075        asks: Vec<[f64; 2]>,
1076    },
1077    Trade {
1078        timestamp: String,
1079        symbol: Option<String>,
1080        side: String,
1081        price: f64,
1082        size: f64,
1083    },
1084}
1085
1086fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
1087    let mut events = Vec::new();
1088    for path in paths {
1089        let file = File::open(path)
1090            .with_context(|| format!("failed to open order book file {}", path.display()))?;
1091        let symbol_hint = infer_symbol_from_path(path);
1092        for line in BufReader::new(file).lines() {
1093            let line =
1094                line.with_context(|| format!("failed to read line from {}", path.display()))?;
1095            if line.trim().is_empty() {
1096                continue;
1097            }
1098            let row: LobEventRow = serde_json::from_str(&line)
1099                .with_context(|| format!("invalid order book event in {}", path.display()))?;
1100            match row {
1101                LobEventRow::Snapshot {
1102                    timestamp,
1103                    symbol,
1104                    bids,
1105                    asks,
1106                } => {
1107                    let ts = parse_datetime(&timestamp)?;
1108                    let symbol = symbol
1109                        .or_else(|| symbol_hint.clone())
1110                        .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
1111                    let bids = convert_levels(&bids)?;
1112                    let asks = convert_levels(&asks)?;
1113                    let book = OrderBook {
1114                        symbol: symbol.clone(),
1115                        bids,
1116                        asks,
1117                        timestamp: ts,
1118                    };
1119                    events.push(MarketEvent {
1120                        timestamp: ts,
1121                        kind: MarketEventKind::OrderBook(book),
1122                    });
1123                }
1124                LobEventRow::Depth {
1125                    timestamp,
1126                    symbol,
1127                    bids,
1128                    asks,
1129                } => {
1130                    let ts = parse_datetime(&timestamp)?;
1131                    let symbol = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
1132                        anyhow!("missing symbol in depth update {}", path.display())
1133                    })?;
1134                    let bids = convert_levels(&bids)?;
1135                    let asks = convert_levels(&asks)?;
1136                    let update = DepthUpdate {
1137                        symbol: symbol.clone(),
1138                        bids,
1139                        asks,
1140                        timestamp: ts,
1141                    };
1142                    events.push(MarketEvent {
1143                        timestamp: ts,
1144                        kind: MarketEventKind::Depth(update),
1145                    });
1146                }
1147                LobEventRow::Trade {
1148                    timestamp,
1149                    symbol,
1150                    side,
1151                    price,
1152                    size,
1153                } => {
1154                    let ts = parse_datetime(&timestamp)?;
1155                    let symbol = symbol
1156                        .or_else(|| symbol_hint.clone())
1157                        .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
1158                    let side = match side.to_lowercase().as_str() {
1159                        "buy" | "bid" | "b" => Side::Buy,
1160                        "sell" | "ask" | "s" => Side::Sell,
1161                        other => bail!("unsupported trade side '{other}' in {}", path.display()),
1162                    };
1163                    let price = Decimal::from_f64(price).ok_or_else(|| {
1164                        anyhow!("invalid trade price '{}' in {}", price, path.display())
1165                    })?;
1166                    let size = Decimal::from_f64(size).ok_or_else(|| {
1167                        anyhow!("invalid trade size '{}' in {}", size, path.display())
1168                    })?;
1169                    let tick = Tick {
1170                        symbol: symbol.clone(),
1171                        price,
1172                        size,
1173                        side,
1174                        exchange_timestamp: ts,
1175                        received_at: ts,
1176                    };
1177                    events.push(MarketEvent {
1178                        timestamp: ts,
1179                        kind: MarketEventKind::Trade(tick),
1180                    });
1181                }
1182            }
1183        }
1184    }
1185    events.sort_by_key(|event| event.timestamp);
1186    Ok(events)
1187}
1188
1189fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
1190    levels
1191        .iter()
1192        .map(|pair| {
1193            let price = Decimal::from_f64(pair[0])
1194                .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
1195            let size = Decimal::from_f64(pair[1])
1196                .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
1197            Ok(OrderBookLevel { price, size })
1198        })
1199        .collect()
1200}
1201
/// Derives a symbol from the name of the directory that directly contains
/// `path` (for example `data/BTCUSDT/1m.csv` yields `BTCUSDT`).
///
/// Returns `None` when the path has no parent or the parent has no final
/// component. Non-UTF-8 names are converted lossily.
fn infer_symbol_from_path(path: &Path) -> Option<String> {
    let dir_name = path.parent()?.file_name()?;
    Some(dir_name.to_string_lossy().into_owned())
}
1207
1208fn infer_interval_from_path(path: &Path) -> Option<Interval> {
1209    path.file_stem()
1210        .and_then(|os| os.to_str())
1211        .and_then(|stem| stem.split('_').next())
1212        .and_then(|token| Interval::from_str(token).ok())
1213}
1214
1215fn default_output_path(
1216    config: &AppConfig,
1217    exchange: &str,
1218    symbol: &str,
1219    interval: Interval,
1220    start: DateTime<Utc>,
1221    end: DateTime<Utc>,
1222) -> PathBuf {
1223    let interval_part = interval_label(interval);
1224    let start_part = start.format("%Y%m%d").to_string();
1225    let end_part = end.format("%Y%m%d").to_string();
1226    config
1227        .data_path
1228        .join(exchange)
1229        .join(symbol)
1230        .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
1231}
1232
1233fn interval_label(interval: Interval) -> &'static str {
1234    match interval {
1235        Interval::OneSecond => "1s",
1236        Interval::OneMinute => "1m",
1237        Interval::FiveMinutes => "5m",
1238        Interval::FifteenMinutes => "15m",
1239        Interval::OneHour => "1h",
1240        Interval::FourHours => "4h",
1241        Interval::OneDay => "1d",
1242    }
1243}
1244
1245#[derive(Serialize)]
1246struct CandleRow<'a> {
1247    symbol: &'a str,
1248    timestamp: String,
1249    open: f64,
1250    high: f64,
1251    low: f64,
1252    close: f64,
1253    volume: f64,
1254}
1255
1256fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
1257    if let Some(parent) = path.parent() {
1258        fs::create_dir_all(parent)
1259            .with_context(|| format!("failed to create directory {}", parent.display()))?;
1260    }
1261    let mut writer =
1262        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1263    for candle in candles {
1264        let row = CandleRow {
1265            symbol: &candle.symbol,
1266            timestamp: candle.timestamp.to_rfc3339(),
1267            open: candle.open.to_f64().unwrap_or(0.0),
1268            high: candle.high.to_f64().unwrap_or(0.0),
1269            low: candle.low.to_f64().unwrap_or(0.0),
1270            close: candle.close.to_f64().unwrap_or(0.0),
1271            volume: candle.volume.to_f64().unwrap_or(0.0),
1272        };
1273        writer.serialize(row)?;
1274    }
1275    writer.flush()?;
1276    Ok(())
1277}
1278
1279#[derive(Serialize)]
1280struct BatchRow {
1281    config: String,
1282    signals: usize,
1283    orders: usize,
1284    dropped_orders: usize,
1285    ending_equity: f64,
1286}
1287
1288fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
1289    if let Some(parent) = path.parent() {
1290        fs::create_dir_all(parent)
1291            .with_context(|| format!("failed to create directory {}", parent.display()))?;
1292    }
1293    let mut writer =
1294        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1295    for row in rows {
1296        writer.serialize(row)?;
1297    }
1298    writer.flush()?;
1299    Ok(())
1300}
1301
1302fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<Symbol, Decimal> {
1303    config
1304        .initial_balances
1305        .iter()
1306        .map(|(currency, amount)| (currency.clone(), *amount))
1307        .collect()
1308}
1309
1310fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
1311    config
1312        .initial_balances
1313        .get(&config.reporting_currency)
1314        .copied()
1315        .unwrap_or_default()
1316}
1317
1318fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
1319    let parts: Vec<_> = value.split(':').collect();
1320    match parts.as_slice() {
1321        ["fixed", val] => {
1322            let quantity =
1323                Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
1324            Ok(Box::new(FixedOrderSizer { quantity }))
1325        }
1326        ["fixed"] => {
1327            let quantity = cli_quantity.unwrap_or(Decimal::ONE);
1328            Ok(Box::new(FixedOrderSizer { quantity }))
1329        }
1330        ["percent", val] => {
1331            let percent =
1332                Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
1333            Ok(Box::new(PortfolioPercentSizer {
1334                percent: percent.max(Decimal::ZERO),
1335            }))
1336        }
1337        ["risk-adjusted", val] => {
1338            let risk_fraction = Decimal::from_str(val)
1339                .context("invalid risk fraction value (use decimals)")?;
1340            Ok(Box::new(RiskAdjustedSizer {
1341                risk_fraction: risk_fraction.max(Decimal::ZERO),
1342            }))
1343        }
1344        _ => Err(anyhow!(
1345            "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
1346        )),
1347    }
1348}