tesser_cli/
app.rs

1use crate::alerts::sanitize_webhook;
2use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
3use crate::live::{run_live, ExecutionBackend, LiveSessionSettings};
4use crate::state;
5use crate::telemetry::init_tracing;
6use crate::PublicChannel;
7use std::collections::HashMap;
8use std::fs::{self, File};
9use std::io::{BufRead, BufReader};
10use std::net::SocketAddr;
11use std::path::{Path, PathBuf};
12use std::str::FromStr;
13use std::sync::Arc;
14use std::time::Duration as StdDuration;
15
16use anyhow::{anyhow, bail, Context, Result};
17use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
18use clap::{Args, Parser, Subcommand, ValueEnum};
19use csv::Writer;
20use rust_decimal::{
21    prelude::{FromPrimitive, ToPrimitive},
22    Decimal,
23};
24use serde::{Deserialize, Serialize};
25use tesser_backtester::reporting::PerformanceReport;
26use tesser_backtester::{BacktestConfig, BacktestMode, Backtester, MarketEvent, MarketEventKind};
27use tesser_broker::ExecutionClient;
28use tesser_config::{load_config, AppConfig, RiskManagementConfig};
29use tesser_core::{Candle, DepthUpdate, Interval, OrderBook, OrderBookLevel, Side, Symbol, Tick};
30use tesser_data::download::{BinanceDownloader, BybitDownloader, KlineRequest};
31use tesser_execution::{
32    ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PortfolioPercentSizer,
33    RiskAdjustedSizer,
34};
35use tesser_markets::MarketRegistry;
36use tesser_paper::{MatchingEngine, PaperExecutionClient};
37use tesser_strategy::{builtin_strategy_names, load_strategy};
38use tracing::{info, warn};
39
// Top-level command-line interface, parsed once in `run()` via `Cli::parse()`.
// Plain `//` comments are used for maintainer notes here because clap turns
// `///` doc comments into user-visible help text.
#[derive(Parser)]
#[command(author, version, about = "Tesser Trading Framework")]
pub struct Cli {
    /// Increases logging verbosity (-v debug, -vv trace)
    // Only consulted when the RUST_LOG environment variable is unset; a count
    // of zero falls back to `log_level` from the loaded configuration.
    #[arg(short, long, action = clap::ArgAction::Count)]
    verbose: u8,
    /// Selects which configuration environment to load (maps to config/{env}.toml)
    #[arg(long, default_value = "default")]
    env: String,
    // The subcommand tree selected by the user.
    #[command(subcommand)]
    command: Commands,
}
52
// Top-level subcommands. Each variant (except `Strategies`) wraps a nested
// subcommand enum that is dispatched from `run()`.
// NOTE(review): the lint is presumably silenced because the nested argument
// structs differ widely in size — confirm and record the reason here.
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
pub enum Commands {
    /// Data engineering tasks
    Data {
        #[command(subcommand)]
        action: DataCommand,
    },
    /// Backtesting workflows
    Backtest {
        #[command(subcommand)]
        action: BacktestCommand,
    },
    /// Live trading workflows
    Live {
        #[command(subcommand)]
        action: LiveCommand,
    },
    /// Inspect or repair persisted runtime state
    State {
        #[command(subcommand)]
        action: StateCommand,
    },
    /// Strategy management helpers
    // Takes no arguments; `run()` handles it by calling `list_strategies()`.
    Strategies,
}
79
// Actions available under the `data` subcommand; dispatched by `handle_data`.
#[derive(Subcommand)]
pub enum DataCommand {
    /// Download historical market data
    Download(DataDownloadArgs),
    /// Validate and optionally repair a local data set
    Validate(DataValidateArgs),
    /// Resample existing data (placeholder)
    // `handle_data` currently only logs a "stub" message for this variant.
    Resample(DataResampleArgs),
}
89
// Actions available under the `backtest` subcommand; both variants are
// dispatched directly from `run()` to the matching `run` method.
#[derive(Subcommand)]
pub enum BacktestCommand {
    /// Run a single backtest from a strategy config file
    Run(BacktestRunArgs),
    /// Run multiple strategy configs and aggregate the results
    Batch(BacktestBatchArgs),
}
97
// Actions available under the `live` subcommand. `run()` also inspects this
// variant before logging is initialised to pick up the session log path.
#[derive(Subcommand)]
pub enum LiveCommand {
    /// Start a live trading session (scaffolding)
    Run(LiveRunArgs),
}
103
// Actions available under the `state` subcommand; dispatched by `handle_state`.
#[derive(Subcommand)]
pub enum StateCommand {
    /// Inspect the SQLite state database
    Inspect(StateInspectArgs),
}
109
// Arguments for `data download`. Consumed by `DataDownloadArgs::run`.
#[derive(Args)]
pub struct DataDownloadArgs {
    // Name of the exchange profile looked up in the `exchange` table of the
    // loaded configuration; the profile's `driver` selects the downloader.
    #[arg(long, default_value = "bybit")]
    exchange: String,
    // Instrument symbol forwarded to the kline request.
    #[arg(long)]
    symbol: String,
    // Forwarded to the kline request for the Bybit driver only; the Binance
    // branch passes an empty category instead.
    #[arg(long, default_value = "linear")]
    category: String,
    // Candle interval string; parsed into `Interval` at run time.
    #[arg(long, default_value = "1m")]
    interval: String,
    // Start of the download window; parsed by `parse_datetime`.
    #[arg(long)]
    start: String,
    // End of the download window; defaults to the current UTC time when omitted.
    #[arg(long)]
    end: Option<String>,
    // Destination CSV; when omitted the path comes from `default_output_path`.
    #[arg(long)]
    output: Option<PathBuf>,
    /// Skip automatic validation after download completes
    #[arg(long)]
    skip_validation: bool,
    /// Attempt to repair gaps detected during validation
    // Has no effect when `skip_validation` is set, because repair happens
    // inside the validation step.
    #[arg(long)]
    repair_missing: bool,
    /// Max allowed close-to-close jump when auto-validating (fractional)
    #[arg(long, default_value_t = 0.05)]
    validation_jump_threshold: f64,
    /// Allowed divergence between primary and reference closes (fractional)
    #[arg(long, default_value_t = 0.002)]
    validation_reference_tolerance: f64,
}
139
// Arguments for `state inspect`. Both values are forwarded to
// `state::inspect_state` by `handle_state`.
#[derive(Args)]
pub struct StateInspectArgs {
    /// Path to the SQLite state database (defaults to live.state_path)
    #[arg(long)]
    path: Option<PathBuf>,
    /// Emit the raw JSON payload stored inside the database
    #[arg(long)]
    raw: bool,
}
149
150impl StateInspectArgs {
151    fn resolved_path(&self, config: &AppConfig) -> PathBuf {
152        self.path
153            .clone()
154            .unwrap_or_else(|| config.live.state_path.clone())
155    }
156}
157
158impl DataDownloadArgs {
159    async fn run(&self, config: &AppConfig) -> Result<()> {
160        let exchange_cfg = config
161            .exchange
162            .get(&self.exchange)
163            .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
164        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
165        let start = parse_datetime(&self.start)?;
166        let end = match &self.end {
167            Some(value) => parse_datetime(value)?,
168            None => Utc::now(),
169        };
170        if start >= end {
171            return Err(anyhow!("start time must be earlier than end time"));
172        }
173
174        info!(
175            "Downloading {} candles for {} ({})",
176            self.interval, self.symbol, self.exchange
177        );
178        let mut candles = match exchange_cfg.driver.as_str() {
179            "bybit" | "" => {
180                let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
181                let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
182                downloader
183                    .download_klines(&request)
184                    .await
185                    .with_context(|| "failed to download candles from Bybit")?
186            }
187            "binance" => {
188                let downloader = BinanceDownloader::new(&exchange_cfg.rest_url);
189                let request = KlineRequest::new("", &self.symbol, interval, start, end);
190                downloader
191                    .download_klines(&request)
192                    .await
193                    .with_context(|| "failed to download candles from Binance")?
194            }
195            other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
196        };
197
198        if candles.is_empty() {
199            info!("No candles returned for {}", self.symbol);
200            return Ok(());
201        }
202
203        if !self.skip_validation {
204            let config = ValidationConfig {
205                price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
206                reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
207                repair_missing: self.repair_missing,
208            };
209            let outcome =
210                validate_dataset(candles.clone(), None, config).context("validation failed")?;
211            print_validation_summary(&outcome);
212            if self.repair_missing && outcome.summary.repaired_candles > 0 {
213                candles = outcome.repaired;
214                info!(
215                    "Applied {} synthetic candle(s) to repair gaps",
216                    outcome.summary.repaired_candles
217                );
218            }
219        }
220
221        let output_path = self.output.clone().unwrap_or_else(|| {
222            default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
223        });
224        write_candles_csv(&output_path, &candles)?;
225        info!(
226            "Saved {} candles to {}",
227            candles.len(),
228            output_path.display()
229        );
230        Ok(())
231    }
232}
233
// Arguments for `data validate`. Consumed by `DataValidateArgs::run`.
#[derive(Args)]
pub struct DataValidateArgs {
    /// One or more CSV files to inspect
    // clap accepts the flag with no occurrences, so `run` rejects an empty
    // list itself.
    #[arg(
        long = "path",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    paths: Vec<PathBuf>,
    /// Optional reference data set(s) used for cross validation
    #[arg(
        long = "reference",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    reference_paths: Vec<PathBuf>,
    /// Max allowed close-to-close jump before flagging (fractional, 0.05 = 5%)
    // Values <= 0 are replaced with 0.0001 in `run`.
    #[arg(long, default_value_t = 0.05)]
    jump_threshold: f64,
    /// Allowed divergence between primary and reference closes (fractional)
    // Values <= 0 are replaced with 0.0001 in `run`.
    #[arg(long, default_value_t = 0.002)]
    reference_tolerance: f64,
    /// Attempt to fill gaps by synthesizing candles
    #[arg(long)]
    repair_missing: bool,
    /// Location to write the repaired dataset
    // When omitted and repairs were made, `run` only emits a warning.
    #[arg(long)]
    output: Option<PathBuf>,
}
265
266impl DataValidateArgs {
267    fn run(&self) -> Result<()> {
268        if self.paths.is_empty() {
269            bail!("provide at least one --path for validation");
270        }
271        let candles =
272            load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
273        if candles.is_empty() {
274            bail!("loaded dataset is empty; nothing to validate");
275        }
276        let reference = if self.reference_paths.is_empty() {
277            None
278        } else {
279            Some(
280                load_candles_from_paths(&self.reference_paths)
281                    .with_context(|| "failed to load reference dataset")?,
282            )
283        };
284
285        let price_jump_threshold = if self.jump_threshold <= 0.0 {
286            0.0001
287        } else {
288            self.jump_threshold
289        };
290        let reference_tolerance = if self.reference_tolerance <= 0.0 {
291            0.0001
292        } else {
293            self.reference_tolerance
294        };
295
296        let config = ValidationConfig {
297            price_jump_threshold,
298            reference_tolerance,
299            repair_missing: self.repair_missing,
300        };
301
302        let outcome = validate_dataset(candles, reference, config)?;
303        print_validation_summary(&outcome);
304
305        if let Some(output) = &self.output {
306            write_candles_csv(output, &outcome.repaired)?;
307            info!(
308                "Wrote {} candles ({} new) to {}",
309                outcome.repaired.len(),
310                outcome.summary.repaired_candles,
311                output.display()
312            );
313        } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
314            warn!(
315                "Detected {} gap(s) filled with synthetic candles but --output was not provided",
316                outcome.summary.repaired_candles
317            );
318        }
319
320        Ok(())
321    }
322}
323
// Arguments for `data resample`. The command is a placeholder: `handle_data`
// only logs these three values and performs no resampling.
#[derive(Args)]
pub struct DataResampleArgs {
    // Source file path (only displayed in the stub log message).
    #[arg(long)]
    input: PathBuf,
    // Destination file path (only displayed in the stub log message).
    #[arg(long)]
    output: PathBuf,
    // Target interval string (only displayed in the stub log message).
    #[arg(long, default_value = "1h")]
    interval: String,
}
333
// CLI-facing mirror of `BacktestMode`; `BacktestRunArgs::run` maps each
// variant onto the backtester variant of the same name.
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
pub enum BacktestModeArg {
    // Backtest driven by candle data (`--data` or synthetic candles).
    Candle,
    // Backtest driven by tick/order book events (`--lob-data` is required).
    Tick,
}
339
// Arguments for `backtest run`. Consumed by `BacktestRunArgs::run`.
#[derive(Args)]
pub struct BacktestRunArgs {
    // TOML file providing `strategy_name` and optional `params`.
    #[arg(long)]
    strategy_config: PathBuf,
    /// One or more CSV files with historical candles (symbol,timestamp,...)
    // When empty in candle mode, synthetic candles are generated instead.
    #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
    data_paths: Vec<PathBuf>,
    // Number of synthetic candles generated per subscribed symbol when no
    // `--data` files are given.
    #[arg(long, default_value_t = 500)]
    candles: usize,
    // Order quantity: stored on the backtest config and also passed to
    // `parse_sizer` alongside `sizer`.
    #[arg(long, default_value = "0.01")]
    quantity: Decimal,
    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
    // Negative values are clamped to zero in `run`.
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    /// Trading fees in basis points applied to notional
    // Negative values are clamped to zero in `run`.
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    /// Number of candles between signal and execution
    // `run` enforces a minimum of 1.
    #[arg(long, default_value_t = 1)]
    latency_candles: usize,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    #[arg(long, default_value = "fixed:0.01")]
    sizer: String,
    /// Selects the data source driving fills (`candle` or `tick`)
    #[arg(long, value_enum, default_value = "candle")]
    mode: BacktestModeArg,
    /// One or more JSONL files containing tick/order book events (required for `--mode tick`)
    #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
    lob_paths: Vec<PathBuf>,
    // Market definitions file; falls back to `backtest.markets_file` from the
    // configuration, and `run` fails when neither is set.
    #[arg(long)]
    markets_file: Option<PathBuf>,
}
372
373#[derive(Args)]
374pub struct BacktestBatchArgs {
375    /// Glob or directory containing strategy config files
376    #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
377    config_paths: Vec<PathBuf>,
378    /// Candle CSVs available to every strategy
379    #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
380    data_paths: Vec<PathBuf>,
381    #[arg(long, default_value = "0.01")]
382    quantity: Decimal,
383    /// Optional output CSV summarizing results
384    #[arg(long)]
385    output: Option<PathBuf>,
386    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
387    #[arg(long, default_value = "0")]
388    slippage_bps: Decimal,
389    /// Trading fees in basis points applied to notional
390    #[arg(long, default_value = "0")]
391    fee_bps: Decimal,
392    /// Number of candles between signal and execution
393    #[arg(long, default_value_t = 1)]
394    latency_candles: usize,
395    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
396    #[arg(long, default_value = "fixed:0.01")]
397    sizer: String,
398    #[arg(long)]
399    markets_file: Option<PathBuf>,
400}
401
// Arguments for `live run`. CLI values override the matching entries of the
// loaded configuration; see the `LiveRunArgs` helper methods for the exact
// fallback and clamping rules.
#[derive(Args)]
pub struct LiveRunArgs {
    // TOML file providing `strategy_name` and optional `params`.
    #[arg(long)]
    strategy_config: PathBuf,
    // Name of the exchange profile looked up in the configuration.
    #[arg(long, default_value = "paper_sandbox")]
    exchange: String,
    // Parsed into `PublicChannel` at run time.
    #[arg(long, default_value = "linear")]
    category: String,
    // Parsed into `Interval` at run time.
    #[arg(long, default_value = "1m")]
    interval: String,
    // Must be strictly greater than zero; `run` rejects other values.
    #[arg(long, default_value = "1")]
    quantity: Decimal,
    /// Selects which execution backend to use (`paper` or `bybit`)
    #[arg(
        long = "exec",
        default_value = "paper",
        value_enum,
        alias = "live-exec"
    )]
    exec: ExecutionBackend,
    // Overrides `live.state_path`.
    #[arg(long)]
    state_path: Option<PathBuf>,
    // Overrides `live.metrics_addr`; must parse as a socket address.
    #[arg(long)]
    metrics_addr: Option<String>,
    // Overrides `live.log_path`; resolved in `run()` before logging starts.
    #[arg(long)]
    log_path: Option<PathBuf>,
    // Replaces the reporting-currency entry of the initial balances; negative
    // values are clamped to zero.
    #[arg(long)]
    initial_equity: Option<Decimal>,
    // Overrides `backtest.markets_file`.
    #[arg(long)]
    markets_file: Option<PathBuf>,
    // Negative values are clamped to zero.
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    // Negative values are clamped to zero.
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    // NOTE(review): not read by `LiveRunArgs::run` in the visible part of this
    // file — confirm whether it should be forwarded to the session settings.
    #[arg(long, default_value_t = 0)]
    latency_ms: u64,
    // Raised to at least 32 in `run`.
    #[arg(long, default_value_t = 512)]
    history: usize,
    // Overrides `live.reconciliation_interval_secs`; at least 1 second is used.
    #[arg(long)]
    reconciliation_interval_secs: Option<u64>,
    // Overrides `live.reconciliation_threshold`; a non-positive value falls
    // back to the configured threshold with a floor of 0.000001.
    #[arg(long)]
    reconciliation_threshold: Option<Decimal>,
    // Overrides the configured alerting webhook; passed through
    // `sanitize_webhook` before use.
    #[arg(long)]
    webhook_url: Option<String>,
    // Overrides `max_data_gap_secs` of the alerting configuration.
    #[arg(long)]
    alert_max_data_gap_secs: Option<u64>,
    // Overrides `max_order_failures` of the alerting configuration.
    #[arg(long)]
    alert_max_order_failures: Option<u32>,
    // Overrides `max_drawdown` of the alerting configuration (clamped to >= 0).
    #[arg(long)]
    alert_max_drawdown: Option<Decimal>,
    // Overrides `max_order_quantity` of the risk configuration (clamped to >= 0).
    #[arg(long)]
    risk_max_order_qty: Option<Decimal>,
    // Overrides `max_position_quantity` of the risk configuration (clamped to >= 0).
    #[arg(long)]
    risk_max_position_qty: Option<Decimal>,
    // Overrides `max_drawdown` of the risk configuration (clamped to >= 0).
    #[arg(long)]
    risk_max_drawdown: Option<Decimal>,
    /// Bybit orderbook depth to subscribe to (e.g., 1, 25, 50). Ignored for other connectors.
    #[arg(long)]
    orderbook_depth: Option<usize>,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    // NOTE(review): not read by `LiveRunArgs::run` in the visible part of this
    // file — confirm where (or whether) the live sizer is applied.
    #[arg(long, default_value = "fixed:1.0")]
    sizer: String,
}
465
466impl LiveRunArgs {
467    fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
468        self.log_path
469            .clone()
470            .unwrap_or_else(|| config.live.log_path.clone())
471    }
472
473    fn resolved_state_path(&self, config: &AppConfig) -> PathBuf {
474        self.state_path
475            .clone()
476            .unwrap_or_else(|| config.live.state_path.clone())
477    }
478
479    fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
480        let addr = self
481            .metrics_addr
482            .clone()
483            .unwrap_or_else(|| config.live.metrics_addr.clone());
484        addr.parse()
485            .with_context(|| format!("invalid metrics address '{addr}'"))
486    }
487
488    fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
489        let secs = self
490            .reconciliation_interval_secs
491            .unwrap_or(config.live.reconciliation_interval_secs)
492            .max(1);
493        StdDuration::from_secs(secs)
494    }
495
496    fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
497        let configured = self
498            .reconciliation_threshold
499            .unwrap_or(config.live.reconciliation_threshold);
500        if configured <= Decimal::ZERO {
501            config.live.reconciliation_threshold.max(Decimal::new(1, 6))
502        } else {
503            configured
504        }
505    }
506
507    fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<Symbol, Decimal> {
508        let mut balances = clone_initial_balances(&config.backtest);
509        if let Some(value) = self.initial_equity {
510            balances.insert(
511                config.backtest.reporting_currency.clone(),
512                value.max(Decimal::ZERO),
513            );
514        }
515        balances
516    }
517
518    fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
519        let mut alerting = config.live.alerting.clone();
520        let webhook = self
521            .webhook_url
522            .clone()
523            .or_else(|| alerting.webhook_url.clone());
524        alerting.webhook_url = sanitize_webhook(webhook);
525        if let Some(sec) = self.alert_max_data_gap_secs {
526            alerting.max_data_gap_secs = sec;
527        }
528        if let Some(limit) = self.alert_max_order_failures {
529            alerting.max_order_failures = limit;
530        }
531        if let Some(limit) = self.alert_max_drawdown {
532            alerting.max_drawdown = limit.max(Decimal::ZERO);
533        }
534        alerting
535    }
536
537    fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
538        let mut risk = config.risk_management.clone();
539        if let Some(limit) = self.risk_max_order_qty {
540            risk.max_order_quantity = limit.max(Decimal::ZERO);
541        }
542        if let Some(limit) = self.risk_max_position_qty {
543            risk.max_position_quantity = limit.max(Decimal::ZERO);
544        }
545        if let Some(limit) = self.risk_max_drawdown {
546            risk.max_drawdown = limit.max(Decimal::ZERO);
547        }
548        risk
549    }
550}
551
/// On-disk shape of a strategy config file, deserialized from TOML by the
/// backtest and live `run` methods before calling `load_strategy`.
#[derive(Deserialize)]
struct StrategyConfigFile {
    /// Strategy identifier; stored under the `strategy_name` key in the file.
    #[serde(rename = "strategy_name")]
    name: String,
    /// Strategy parameters; defaults to an empty TOML table when the key is
    /// absent.
    #[serde(default = "empty_table")]
    params: toml::Value,
}
559
560fn empty_table() -> toml::Value {
561    toml::Value::Table(Default::default())
562}
563
564pub async fn run() -> Result<()> {
565    let cli = Cli::parse();
566    let config = load_config(Some(&cli.env)).context("failed to load configuration")?;
567
568    let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
569        0 => config.log_level.clone(),
570        1 => "debug".to_string(),
571        _ => "trace".to_string(),
572    });
573
574    let log_override = match &cli.command {
575        Commands::Live {
576            action: LiveCommand::Run(args),
577        } => Some(args.resolved_log_path(&config)),
578        _ => None,
579    };
580
581    init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;
582
583    match cli.command {
584        Commands::Data { action } => handle_data(action, &config).await?,
585        Commands::Backtest {
586            action: BacktestCommand::Run(args),
587        } => args.run(&config).await?,
588        Commands::Backtest {
589            action: BacktestCommand::Batch(args),
590        } => args.run(&config).await?,
591        Commands::Live {
592            action: LiveCommand::Run(args),
593        } => args.run(&config).await?,
594        Commands::State { action } => handle_state(action, &config).await?,
595        Commands::Strategies => list_strategies(),
596    }
597
598    Ok(())
599}
600
601async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
602    match cmd {
603        DataCommand::Download(args) => {
604            args.run(config).await?;
605        }
606        DataCommand::Validate(args) => {
607            args.run()?;
608        }
609        DataCommand::Resample(args) => {
610            info!(
611                "stub: resampling {} into {} at {}",
612                args.input.display(),
613                args.output.display(),
614                args.interval
615            );
616        }
617    }
618    Ok(())
619}
620
621async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
622    match cmd {
623        StateCommand::Inspect(args) => {
624            state::inspect_state(args.resolved_path(config), args.raw).await?;
625        }
626    }
627    Ok(())
628}
629
630impl BacktestRunArgs {
631    async fn run(&self, config: &AppConfig) -> Result<()> {
632        let contents = std::fs::read_to_string(&self.strategy_config)
633            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
634        let def: StrategyConfigFile =
635            toml::from_str(&contents).context("failed to parse strategy config file")?;
636        let strategy = load_strategy(&def.name, def.params)
637            .with_context(|| format!("failed to configure strategy {}", def.name))?;
638        let symbols = strategy.subscriptions();
639        if symbols.is_empty() {
640            return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
641        }
642
643        let mode = match self.mode {
644            BacktestModeArg::Candle => BacktestMode::Candle,
645            BacktestModeArg::Tick => BacktestMode::Tick,
646        };
647
648        let markets_path = self
649            .markets_file
650            .clone()
651            .or_else(|| config.backtest.markets_file.clone())
652            .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
653        let market_registry = Arc::new(
654            MarketRegistry::load_from_file(&markets_path).with_context(|| {
655                format!("failed to load markets from {}", markets_path.display())
656            })?,
657        );
658
659        type CandleModeBundle = (
660            Vec<Candle>,
661            Vec<MarketEvent>,
662            Arc<dyn ExecutionClient>,
663            Option<Arc<MatchingEngine>>,
664        );
665
666        let (candles, lob_events, execution_client, matching_engine): CandleModeBundle = match mode
667        {
668            BacktestMode::Candle => {
669                let mut candles = if self.data_paths.is_empty() {
670                    let mut generated = Vec::new();
671                    for (idx, symbol) in symbols.iter().enumerate() {
672                        let offset = idx as i64 * 10;
673                        generated.extend(synth_candles(symbol, self.candles, offset));
674                    }
675                    generated
676                } else {
677                    load_candles_from_paths(&self.data_paths)?
678                };
679
680                if candles.is_empty() {
681                    return Err(anyhow!(
682                        "no candles loaded; provide --data or allow synthetic generation"
683                    ));
684                }
685
686                candles.sort_by_key(|c| c.timestamp);
687                (
688                    candles,
689                    Vec::new(),
690                    Arc::new(PaperExecutionClient::default()) as Arc<dyn ExecutionClient>,
691                    None,
692                )
693            }
694            BacktestMode::Tick => {
695                if self.lob_paths.is_empty() {
696                    bail!("--lob-data is required when --mode tick");
697                }
698                let events = load_lob_events_from_paths(&self.lob_paths)?;
699                if events.is_empty() {
700                    bail!("no order book events loaded from --lob-data");
701                }
702                let engine = Arc::new(MatchingEngine::new(
703                    "matching-engine",
704                    symbols.clone(),
705                    reporting_balance(&config.backtest),
706                ));
707                (
708                    Vec::new(),
709                    events,
710                    engine.clone() as Arc<dyn ExecutionClient>,
711                    Some(engine),
712                )
713            }
714        };
715
716        let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
717        let order_quantity = self.quantity;
718        let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
719
720        let mut cfg = BacktestConfig::new(symbols[0].clone(), candles);
721        cfg.lob_events = lob_events;
722        cfg.order_quantity = order_quantity;
723        cfg.initial_balances = clone_initial_balances(&config.backtest);
724        cfg.reporting_currency = config.backtest.reporting_currency.clone();
725        cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
726        cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
727        cfg.execution.latency_candles = self.latency_candles.max(1);
728        cfg.mode = mode;
729
730        let report = Backtester::new(cfg, strategy, execution, matching_engine, market_registry)
731            .run()
732            .await
733            .context("backtest failed")?;
734        print_report(&report);
735        Ok(())
736    }
737}
738
739impl BacktestBatchArgs {
740    async fn run(&self, config: &AppConfig) -> Result<()> {
741        if self.config_paths.is_empty() {
742            return Err(anyhow!("provide at least one --config path"));
743        }
744        if self.data_paths.is_empty() {
745            return Err(anyhow!("provide at least one --data path for batch mode"));
746        }
747        let markets_path = self
748            .markets_file
749            .clone()
750            .or_else(|| config.backtest.markets_file.clone())
751            .ok_or_else(|| {
752                anyhow!("batch mode requires --markets-file or backtest.markets_file")
753            })?;
754        let market_registry = Arc::new(
755            MarketRegistry::load_from_file(&markets_path).with_context(|| {
756                format!("failed to load markets from {}", markets_path.display())
757            })?,
758        );
759        let mut aggregated = Vec::new();
760        for config_path in &self.config_paths {
761            let contents = std::fs::read_to_string(config_path).with_context(|| {
762                format!("failed to read strategy config {}", config_path.display())
763            })?;
764            let def: StrategyConfigFile =
765                toml::from_str(&contents).context("failed to parse strategy config file")?;
766            let strategy = load_strategy(&def.name, def.params)
767                .with_context(|| format!("failed to configure strategy {}", def.name))?;
768            let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
769            let order_quantity = self.quantity;
770            let mut candles = load_candles_from_paths(&self.data_paths)?;
771            candles.sort_by_key(|c| c.timestamp);
772            let execution_client: Arc<dyn ExecutionClient> =
773                Arc::new(PaperExecutionClient::default());
774            let execution =
775                ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
776            let mut cfg = BacktestConfig::new(strategy.symbol().to_string(), candles);
777            cfg.order_quantity = order_quantity;
778            cfg.initial_balances = clone_initial_balances(&config.backtest);
779            cfg.reporting_currency = config.backtest.reporting_currency.clone();
780            cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
781            cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
782            cfg.execution.latency_candles = self.latency_candles.max(1);
783
784            let report = Backtester::new(cfg, strategy, execution, None, market_registry.clone())
785                .run()
786                .await
787                .with_context(|| format!("backtest failed for {}", config_path.display()))?;
788            aggregated.push(BatchRow {
789                config: config_path.display().to_string(),
790                signals: 0, // Legacy field, can be removed or calculated from report
791                orders: 0,  // Legacy field, can be removed or calculated from report
792                dropped_orders: 0, // Legacy field, can be removed or calculated from report
793                ending_equity: report.ending_equity,
794            });
795        }
796
797        if let Some(output) = &self.output {
798            write_batch_report(output, &aggregated)?;
799            println!("Batch report written to {}", output.display());
800        }
801        if aggregated.is_empty() {
802            return Err(anyhow!("no batch jobs executed"));
803        }
804        Ok(())
805    }
806}
807
808impl LiveRunArgs {
809    async fn run(&self, config: &AppConfig) -> Result<()> {
810        let exchange_cfg = config
811            .exchange
812            .get(&self.exchange)
813            .cloned()
814            .ok_or_else(|| anyhow!("exchange profile {} not found", self.exchange))?;
815
816        let driver = exchange_cfg.driver.clone();
817
818        let contents = fs::read_to_string(&self.strategy_config)
819            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
820        let def: StrategyConfigFile =
821            toml::from_str(&contents).context("failed to parse strategy config file")?;
822        let strategy = load_strategy(&def.name, def.params)
823            .with_context(|| format!("failed to configure strategy {}", def.name))?;
824        let symbols = strategy.subscriptions();
825        if symbols.is_empty() {
826            bail!("strategy did not declare any subscriptions");
827        }
828        if self.quantity <= Decimal::ZERO {
829            bail!("--quantity must be greater than zero");
830        }
831        let quantity = self.quantity;
832        let initial_balances = self.resolved_initial_balances(config);
833        let reporting_currency = config.backtest.reporting_currency.clone();
834        let markets_file = self
835            .markets_file
836            .clone()
837            .or_else(|| config.backtest.markets_file.clone());
838
839        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
840        let category =
841            PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
842        let metrics_addr = self.resolved_metrics_addr(config)?;
843        let state_path = self.resolved_state_path(config);
844        let alerting = self.build_alerting(config);
845        let history = self.history.max(32);
846        let reconciliation_interval = self.reconciliation_interval(config);
847        let reconciliation_threshold = self.reconciliation_threshold(config);
848        let orderbook_depth = self
849            .orderbook_depth
850            .unwrap_or(super::live::default_order_book_depth());
851
852        let settings = LiveSessionSettings {
853            category,
854            interval,
855            quantity,
856            slippage_bps: self.slippage_bps.max(Decimal::ZERO),
857            fee_bps: self.fee_bps.max(Decimal::ZERO),
858            history,
859            metrics_addr,
860            state_path,
861            initial_balances,
862            reporting_currency,
863            markets_file,
864            alerting,
865            exec_backend: self.exec,
866            risk: self.build_risk_config(config),
867            reconciliation_interval,
868            reconciliation_threshold,
869            driver,
870            orderbook_depth,
871        };
872
873        info!(
874            strategy = %def.name,
875            symbols = ?symbols,
876            exchange = %self.exchange,
877            interval = %self.interval,
878            driver = ?settings.driver,
879            exec = ?self.exec,
880            "starting live session"
881        );
882
883        run_live(strategy, symbols, exchange_cfg, settings)
884            .await
885            .context("live session failed")
886    }
887}
888
889fn list_strategies() {
890    println!("Built-in strategies:");
891    for name in builtin_strategy_names() {
892        println!("- {name}");
893    }
894}
895
896fn print_validation_summary(outcome: &ValidationOutcome) {
897    const MAX_EXAMPLES: usize = 5;
898    let summary = &outcome.summary;
899    println!(
900        "Validation summary for {} ({} candles)",
901        summary.symbol, summary.rows
902    );
903    println!(
904        "  Range: {} -> {}",
905        summary.start.to_rfc3339(),
906        summary.end.to_rfc3339()
907    );
908    println!("  Interval: {}", interval_label(summary.interval));
909    println!("  Missing intervals: {}", summary.missing_candles);
910    println!("  Duplicate intervals: {}", summary.duplicate_candles);
911    println!("  Zero-volume candles: {}", summary.zero_volume_candles);
912    println!("  Price spikes flagged: {}", summary.price_spike_count);
913    println!(
914        "  Cross-source mismatches: {}",
915        summary.cross_mismatch_count
916    );
917    println!("  Repaired candles generated: {}", summary.repaired_candles);
918
919    if !outcome.gaps.is_empty() {
920        println!("  Gap examples:");
921        for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
922            println!(
923                "    {} -> {} (missing {})",
924                gap.start.to_rfc3339(),
925                gap.end.to_rfc3339(),
926                gap.missing
927            );
928        }
929        if outcome.gaps.len() > MAX_EXAMPLES {
930            println!(
931                "    ... {} additional gap(s) omitted",
932                outcome.gaps.len() - MAX_EXAMPLES
933            );
934        }
935    }
936
937    if !outcome.price_spikes.is_empty() {
938        println!("  Price spike examples:");
939        for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
940            println!(
941                "    {} (change {:.2}%)",
942                spike.timestamp.to_rfc3339(),
943                spike.change_fraction * 100.0
944            );
945        }
946        if outcome.price_spikes.len() > MAX_EXAMPLES {
947            println!(
948                "    ... {} additional spike(s) omitted",
949                outcome.price_spikes.len() - MAX_EXAMPLES
950            );
951        }
952    }
953
954    if !outcome.cross_mismatches.is_empty() {
955        println!("  Cross-source mismatch examples:");
956        for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
957            println!(
958                "    {} primary {:.4} vs ref {:.4} ({:.2}%)",
959                miss.timestamp.to_rfc3339(),
960                miss.primary_close,
961                miss.reference_close,
962                miss.delta_fraction * 100.0
963            );
964        }
965        if outcome.cross_mismatches.len() > MAX_EXAMPLES {
966            println!(
967                "    ... {} additional mismatch(es) omitted",
968                outcome.cross_mismatches.len() - MAX_EXAMPLES
969            );
970        }
971    }
972}
973
974fn print_report(report: &PerformanceReport) {
975    println!("\n{}", report);
976}
977
978fn synth_candles(symbol: &str, len: usize, offset_minutes: i64) -> Vec<Candle> {
979    let mut candles = Vec::with_capacity(len);
980    for i in 0..len {
981        let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
982        let open = base + (i as f64 % 3.0) * 10.0;
983        let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
984        let open_dec =
985            Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
986        let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
987        let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
988        let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
989        candles.push(Candle {
990            symbol: Symbol::from(symbol),
991            interval: Interval::OneMinute,
992            open: open_dec,
993            high,
994            low,
995            close: close_dec,
996            volume: Decimal::ONE,
997            timestamp: Utc::now() - Duration::minutes((len - i) as i64)
998                + Duration::minutes(offset_minutes),
999        });
1000    }
1001    candles
1002}
1003
1004fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
1005    if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
1006        return Ok(dt.with_timezone(&Utc));
1007    }
1008    if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1009        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1010    }
1011    if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
1012        let dt = date
1013            .and_hms_opt(0, 0, 0)
1014            .ok_or_else(|| anyhow!("invalid date"))?;
1015        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1016    }
1017    Err(anyhow!("unable to parse datetime '{value}'"))
1018}
1019
1020#[derive(Deserialize)]
1021struct CandleCsvRow {
1022    symbol: Option<String>,
1023    timestamp: String,
1024    open: f64,
1025    high: f64,
1026    low: f64,
1027    close: f64,
1028    volume: f64,
1029}
1030
1031fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1032    let mut candles = Vec::new();
1033    for path in paths {
1034        let mut reader = csv::Reader::from_path(path)
1035            .with_context(|| format!("failed to open {}", path.display()))?;
1036        for record in reader.deserialize::<CandleCsvRow>() {
1037            let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1038            let timestamp = parse_datetime(&row.timestamp)?;
1039            let symbol = row
1040                .symbol
1041                .clone()
1042                .or_else(|| infer_symbol_from_path(path))
1043                .ok_or_else(|| {
1044                    anyhow!(
1045                        "missing symbol column and unable to infer from path {}",
1046                        path.display()
1047                    )
1048                })?;
1049            let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1050            let open = Decimal::from_f64(row.open).ok_or_else(|| {
1051                anyhow!("invalid open value '{}' in {}", row.open, path.display())
1052            })?;
1053            let high = Decimal::from_f64(row.high).ok_or_else(|| {
1054                anyhow!("invalid high value '{}' in {}", row.high, path.display())
1055            })?;
1056            let low = Decimal::from_f64(row.low)
1057                .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1058            let close = Decimal::from_f64(row.close).ok_or_else(|| {
1059                anyhow!("invalid close value '{}' in {}", row.close, path.display())
1060            })?;
1061            let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1062                anyhow!(
1063                    "invalid volume value '{}' in {}",
1064                    row.volume,
1065                    path.display()
1066                )
1067            })?;
1068            candles.push(Candle {
1069                symbol,
1070                interval,
1071                open,
1072                high,
1073                low,
1074                close,
1075                volume,
1076                timestamp,
1077            });
1078        }
1079    }
1080    Ok(candles)
1081}
1082
1083#[derive(Deserialize)]
1084#[serde(tag = "event", rename_all = "lowercase")]
1085enum LobEventRow {
1086    Snapshot {
1087        timestamp: String,
1088        symbol: Option<String>,
1089        bids: Vec<[f64; 2]>,
1090        asks: Vec<[f64; 2]>,
1091    },
1092    Depth {
1093        timestamp: String,
1094        symbol: Option<String>,
1095        bids: Vec<[f64; 2]>,
1096        asks: Vec<[f64; 2]>,
1097    },
1098    Trade {
1099        timestamp: String,
1100        symbol: Option<String>,
1101        side: String,
1102        price: f64,
1103        size: f64,
1104    },
1105}
1106
1107fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
1108    let mut events = Vec::new();
1109    for path in paths {
1110        let file = File::open(path)
1111            .with_context(|| format!("failed to open order book file {}", path.display()))?;
1112        let symbol_hint = infer_symbol_from_path(path);
1113        for line in BufReader::new(file).lines() {
1114            let line =
1115                line.with_context(|| format!("failed to read line from {}", path.display()))?;
1116            if line.trim().is_empty() {
1117                continue;
1118            }
1119            let row: LobEventRow = serde_json::from_str(&line)
1120                .with_context(|| format!("invalid order book event in {}", path.display()))?;
1121            match row {
1122                LobEventRow::Snapshot {
1123                    timestamp,
1124                    symbol,
1125                    bids,
1126                    asks,
1127                } => {
1128                    let ts = parse_datetime(&timestamp)?;
1129                    let symbol = symbol
1130                        .or_else(|| symbol_hint.clone())
1131                        .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
1132                    let bids = convert_levels(&bids)?;
1133                    let asks = convert_levels(&asks)?;
1134                    let book = OrderBook {
1135                        symbol: symbol.clone(),
1136                        bids,
1137                        asks,
1138                        timestamp: ts,
1139                    };
1140                    events.push(MarketEvent {
1141                        timestamp: ts,
1142                        kind: MarketEventKind::OrderBook(book),
1143                    });
1144                }
1145                LobEventRow::Depth {
1146                    timestamp,
1147                    symbol,
1148                    bids,
1149                    asks,
1150                } => {
1151                    let ts = parse_datetime(&timestamp)?;
1152                    let symbol = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
1153                        anyhow!("missing symbol in depth update {}", path.display())
1154                    })?;
1155                    let bids = convert_levels(&bids)?;
1156                    let asks = convert_levels(&asks)?;
1157                    let update = DepthUpdate {
1158                        symbol: symbol.clone(),
1159                        bids,
1160                        asks,
1161                        timestamp: ts,
1162                    };
1163                    events.push(MarketEvent {
1164                        timestamp: ts,
1165                        kind: MarketEventKind::Depth(update),
1166                    });
1167                }
1168                LobEventRow::Trade {
1169                    timestamp,
1170                    symbol,
1171                    side,
1172                    price,
1173                    size,
1174                } => {
1175                    let ts = parse_datetime(&timestamp)?;
1176                    let symbol = symbol
1177                        .or_else(|| symbol_hint.clone())
1178                        .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
1179                    let side = match side.to_lowercase().as_str() {
1180                        "buy" | "bid" | "b" => Side::Buy,
1181                        "sell" | "ask" | "s" => Side::Sell,
1182                        other => bail!("unsupported trade side '{other}' in {}", path.display()),
1183                    };
1184                    let price = Decimal::from_f64(price).ok_or_else(|| {
1185                        anyhow!("invalid trade price '{}' in {}", price, path.display())
1186                    })?;
1187                    let size = Decimal::from_f64(size).ok_or_else(|| {
1188                        anyhow!("invalid trade size '{}' in {}", size, path.display())
1189                    })?;
1190                    let tick = Tick {
1191                        symbol: symbol.clone(),
1192                        price,
1193                        size,
1194                        side,
1195                        exchange_timestamp: ts,
1196                        received_at: ts,
1197                    };
1198                    events.push(MarketEvent {
1199                        timestamp: ts,
1200                        kind: MarketEventKind::Trade(tick),
1201                    });
1202                }
1203            }
1204        }
1205    }
1206    events.sort_by_key(|event| event.timestamp);
1207    Ok(events)
1208}
1209
1210fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
1211    levels
1212        .iter()
1213        .map(|pair| {
1214            let price = Decimal::from_f64(pair[0])
1215                .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
1216            let size = Decimal::from_f64(pair[1])
1217                .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
1218            Ok(OrderBookLevel { price, size })
1219        })
1220        .collect()
1221}
1222
/// Returns the name of the directory that directly contains `path`.
///
/// Callers use it as the instrument symbol for data files that carry none.
/// Yields `None` when `path` has no parent or the parent has no final component
/// (for example a bare file name or a file directly under the root).
fn infer_symbol_from_path(path: &Path) -> Option<String> {
    let directory = path.parent()?;
    let name = directory.file_name()?;
    // Non-UTF-8 bytes are replaced rather than rejected.
    Some(name.to_string_lossy().into_owned())
}
1228
1229fn infer_interval_from_path(path: &Path) -> Option<Interval> {
1230    path.file_stem()
1231        .and_then(|os| os.to_str())
1232        .and_then(|stem| stem.split('_').next())
1233        .and_then(|token| Interval::from_str(token).ok())
1234}
1235
1236fn default_output_path(
1237    config: &AppConfig,
1238    exchange: &str,
1239    symbol: &str,
1240    interval: Interval,
1241    start: DateTime<Utc>,
1242    end: DateTime<Utc>,
1243) -> PathBuf {
1244    let interval_part = interval_label(interval);
1245    let start_part = start.format("%Y%m%d").to_string();
1246    let end_part = end.format("%Y%m%d").to_string();
1247    config
1248        .data_path
1249        .join(exchange)
1250        .join(symbol)
1251        .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
1252}
1253
1254fn interval_label(interval: Interval) -> &'static str {
1255    match interval {
1256        Interval::OneSecond => "1s",
1257        Interval::OneMinute => "1m",
1258        Interval::FiveMinutes => "5m",
1259        Interval::FifteenMinutes => "15m",
1260        Interval::OneHour => "1h",
1261        Interval::FourHours => "4h",
1262        Interval::OneDay => "1d",
1263    }
1264}
1265
1266#[derive(Serialize)]
1267struct CandleRow<'a> {
1268    symbol: &'a str,
1269    timestamp: String,
1270    open: f64,
1271    high: f64,
1272    low: f64,
1273    close: f64,
1274    volume: f64,
1275}
1276
1277fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
1278    if let Some(parent) = path.parent() {
1279        fs::create_dir_all(parent)
1280            .with_context(|| format!("failed to create directory {}", parent.display()))?;
1281    }
1282    let mut writer =
1283        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1284    for candle in candles {
1285        let row = CandleRow {
1286            symbol: &candle.symbol,
1287            timestamp: candle.timestamp.to_rfc3339(),
1288            open: candle.open.to_f64().unwrap_or(0.0),
1289            high: candle.high.to_f64().unwrap_or(0.0),
1290            low: candle.low.to_f64().unwrap_or(0.0),
1291            close: candle.close.to_f64().unwrap_or(0.0),
1292            volume: candle.volume.to_f64().unwrap_or(0.0),
1293        };
1294        writer.serialize(row)?;
1295    }
1296    writer.flush()?;
1297    Ok(())
1298}
1299
1300#[derive(Serialize)]
1301struct BatchRow {
1302    config: String,
1303    signals: usize,
1304    orders: usize,
1305    dropped_orders: usize,
1306    ending_equity: f64,
1307}
1308
1309fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
1310    if let Some(parent) = path.parent() {
1311        fs::create_dir_all(parent)
1312            .with_context(|| format!("failed to create directory {}", parent.display()))?;
1313    }
1314    let mut writer =
1315        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1316    for row in rows {
1317        writer.serialize(row)?;
1318    }
1319    writer.flush()?;
1320    Ok(())
1321}
1322
1323fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<Symbol, Decimal> {
1324    config
1325        .initial_balances
1326        .iter()
1327        .map(|(currency, amount)| (currency.clone(), *amount))
1328        .collect()
1329}
1330
1331fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
1332    config
1333        .initial_balances
1334        .get(&config.reporting_currency)
1335        .copied()
1336        .unwrap_or_default()
1337}
1338
1339fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
1340    let parts: Vec<_> = value.split(':').collect();
1341    match parts.as_slice() {
1342        ["fixed", val] => {
1343            let quantity =
1344                Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
1345            Ok(Box::new(FixedOrderSizer { quantity }))
1346        }
1347        ["fixed"] => {
1348            let quantity = cli_quantity.unwrap_or(Decimal::ONE);
1349            Ok(Box::new(FixedOrderSizer { quantity }))
1350        }
1351        ["percent", val] => {
1352            let percent =
1353                Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
1354            Ok(Box::new(PortfolioPercentSizer {
1355                percent: percent.max(Decimal::ZERO),
1356            }))
1357        }
1358        ["risk-adjusted", val] => {
1359            let risk_fraction = Decimal::from_str(val)
1360                .context("invalid risk fraction value (use decimals)")?;
1361            Ok(Box::new(RiskAdjustedSizer {
1362                risk_fraction: risk_fraction.max(Decimal::ZERO),
1363            }))
1364        }
1365        _ => Err(anyhow!(
1366            "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
1367        )),
1368    }
1369}