tesser_cli/
app.rs

1use crate::alerts::sanitize_webhook;
2use crate::analyze;
3use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
4use crate::live::{run_live, ExecutionBackend, LiveSessionSettings};
5use crate::state;
6use crate::telemetry::init_tracing;
7use crate::PublicChannel;
8use arrow::util::pretty::print_batches;
9use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
10use std::collections::HashMap;
11use std::fs::{self, File};
12use std::io::{BufRead, BufReader};
13use std::net::SocketAddr;
14use std::path::{Path, PathBuf};
15use std::str::FromStr;
16use std::sync::Arc;
17use std::time::Duration as StdDuration;
18
19use anyhow::{anyhow, bail, Context, Result};
20use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
21use clap::{Args, Parser, Subcommand, ValueEnum};
22use csv::Writer;
23use rust_decimal::{
24    prelude::{FromPrimitive, ToPrimitive},
25    Decimal,
26};
27use serde::{Deserialize, Serialize};
28use tesser_backtester::reporting::PerformanceReport;
29use tesser_backtester::{
30    BacktestConfig, BacktestMode, BacktestStream, Backtester, MarketEvent, MarketEventKind,
31};
32use tesser_broker::ExecutionClient;
33use tesser_config::{load_config, AppConfig, RiskManagementConfig};
34use tesser_core::{Candle, DepthUpdate, Interval, OrderBook, OrderBookLevel, Side, Symbol, Tick};
35use tesser_data::analytics::ExecutionAnalysisRequest;
36use tesser_data::download::{BinanceDownloader, BybitDownloader, KlineRequest};
37use tesser_data::parquet::ParquetMarketStream;
38use tesser_execution::{
39    ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PortfolioPercentSizer,
40    RiskAdjustedSizer,
41};
42use tesser_markets::MarketRegistry;
43use tesser_paper::{MatchingEngine, PaperExecutionClient, PaperMarketStream};
44use tesser_strategy::{builtin_strategy_names, load_strategy};
45use tracing::{info, warn};
46
/// Top-level command-line interface for the Tesser trading framework.
#[derive(Parser)]
#[command(author, version, about = "Tesser Trading Framework")]
pub struct Cli {
    /// Increases logging verbosity (-v debug, -vv trace)
    #[arg(short, long, action = clap::ArgAction::Count)]
    verbose: u8,
    /// Selects which configuration environment to load (maps to config/{env}.toml)
    #[arg(long, default_value = "default")]
    env: String,
    /// Subcommand to dispatch to.
    #[command(subcommand)]
    command: Commands,
}
59
/// First-level subcommands of the CLI.
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
pub enum Commands {
    /// Data engineering tasks
    Data {
        #[command(subcommand)]
        action: DataCommand,
    },
    /// Backtesting workflows
    Backtest {
        #[command(subcommand)]
        action: BacktestCommand,
    },
    /// Live trading workflows
    Live {
        #[command(subcommand)]
        action: LiveCommand,
    },
    /// Inspect or repair persisted runtime state
    State {
        #[command(subcommand)]
        action: StateCommand,
    },
    /// Strategy management helpers
    Strategies,
    /// Analyze previously recorded trading sessions
    Analyze {
        #[command(subcommand)]
        action: AnalyzeCommand,
    },
}
91
/// Subcommands under `tesser data`.
#[derive(Subcommand)]
pub enum DataCommand {
    /// Download historical market data
    Download(DataDownloadArgs),
    /// Validate and optionally repair a local data set
    Validate(DataValidateArgs),
    /// Resample existing data (placeholder)
    Resample(DataResampleArgs),
    /// Inspect a parquet file emitted by the flight recorder
    InspectParquet(DataInspectParquetArgs),
}
103
/// Subcommands under `tesser backtest`.
#[derive(Subcommand)]
pub enum BacktestCommand {
    /// Run a single backtest from a strategy config file
    Run(BacktestRunArgs),
    /// Run multiple strategy configs and aggregate the results
    Batch(BacktestBatchArgs),
}
111
/// Subcommands under `tesser live`.
#[derive(Subcommand)]
pub enum LiveCommand {
    /// Start a live trading session (scaffolding)
    Run(LiveRunArgs),
}
117
/// Subcommands under `tesser state`.
#[derive(Subcommand)]
pub enum StateCommand {
    /// Inspect the SQLite state database
    Inspect(StateInspectArgs),
}
123
/// Subcommands under `tesser analyze`.
#[derive(Subcommand)]
pub enum AnalyzeCommand {
    /// Generate a TCA-lite execution report using flight recorder files
    Execution(AnalyzeExecutionArgs),
}
129
/// Arguments for `tesser data download`.
#[derive(Args)]
pub struct DataDownloadArgs {
    /// Exchange profile name to look up in the config's `exchange` table
    #[arg(long, default_value = "bybit")]
    exchange: String,
    /// Instrument symbol to download (exchange-specific format)
    #[arg(long)]
    symbol: String,
    /// Product category (Bybit-specific; ignored by the Binance driver)
    #[arg(long, default_value = "linear")]
    category: String,
    /// Candle interval (parsed into `Interval`, e.g. "1m")
    #[arg(long, default_value = "1m")]
    interval: String,
    /// Inclusive start of the download window
    #[arg(long)]
    start: String,
    /// Optional end of the download window (defaults to now)
    #[arg(long)]
    end: Option<String>,
    /// Output CSV path (defaults to a derived path under the data directory)
    #[arg(long)]
    output: Option<PathBuf>,
    /// Skip automatic validation after download completes
    #[arg(long)]
    skip_validation: bool,
    /// Attempt to repair gaps detected during validation
    #[arg(long)]
    repair_missing: bool,
    /// Max allowed close-to-close jump when auto-validating (fractional)
    #[arg(long, default_value_t = 0.05)]
    validation_jump_threshold: f64,
    /// Allowed divergence between primary and reference closes (fractional)
    #[arg(long, default_value_t = 0.002)]
    validation_reference_tolerance: f64,
}
159
/// Arguments for `tesser state inspect`.
#[derive(Args)]
pub struct StateInspectArgs {
    /// Path to the SQLite state database (defaults to live.state_path)
    #[arg(long)]
    path: Option<PathBuf>,
    /// Emit the raw JSON payload stored inside the database
    #[arg(long)]
    raw: bool,
}
169
/// Arguments for `tesser analyze execution`.
#[derive(Args)]
pub struct AnalyzeExecutionArgs {
    /// Directory containing flight recorder parquet partitions
    #[arg(long, value_name = "PATH", default_value = "data/flight_recorder")]
    data_dir: PathBuf,
    /// Inclusive start of the analysis window (RFC3339, `YYYY-mm-dd`, etc.)
    #[arg(long)]
    start: Option<String>,
    /// Inclusive end of the analysis window
    #[arg(long)]
    end: Option<String>,
}
182
183impl StateInspectArgs {
184    fn resolved_path(&self, config: &AppConfig) -> PathBuf {
185        self.path
186            .clone()
187            .unwrap_or_else(|| config.live.state_path.clone())
188    }
189}
190
191impl AnalyzeExecutionArgs {
192    fn build_request(&self) -> Result<ExecutionAnalysisRequest> {
193        let start = match &self.start {
194            Some(value) => Some(parse_datetime(value)?),
195            None => None,
196        };
197        let end = match &self.end {
198            Some(value) => Some(parse_datetime(value)?),
199            None => None,
200        };
201        Ok(ExecutionAnalysisRequest {
202            data_dir: self.data_dir.clone(),
203            start,
204            end,
205        })
206    }
207}
208
impl DataDownloadArgs {
    /// Downloads candles for the configured exchange/symbol/interval, runs an
    /// optional validation (and repair) pass, and writes the result to CSV.
    ///
    /// # Errors
    /// Fails when the exchange profile is unknown, the interval or timestamps
    /// do not parse, the download fails, or validation/writing fails.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        let exchange_cfg = config
            .exchange
            .get(&self.exchange)
            .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
        let start = parse_datetime(&self.start)?;
        // A missing --end defaults to "now".
        let end = match &self.end {
            Some(value) => parse_datetime(value)?,
            None => Utc::now(),
        };
        if start >= end {
            return Err(anyhow!("start time must be earlier than end time"));
        }

        info!(
            "Downloading {} candles for {} ({})",
            self.interval, self.symbol, self.exchange
        );
        // Select the downloader by configured driver name; an empty driver
        // string is treated as Bybit.
        let mut candles = match exchange_cfg.driver.as_str() {
            "bybit" | "" => {
                let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
                let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
                downloader
                    .download_klines(&request)
                    .await
                    .with_context(|| "failed to download candles from Bybit")?
            }
            "binance" => {
                let downloader = BinanceDownloader::new(&exchange_cfg.rest_url);
                // Category is passed empty for the Binance request.
                let request = KlineRequest::new("", &self.symbol, interval, start, end);
                downloader
                    .download_klines(&request)
                    .await
                    .with_context(|| "failed to download candles from Binance")?
            }
            other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
        };

        if candles.is_empty() {
            info!("No candles returned for {}", self.symbol);
            return Ok(());
        }

        if !self.skip_validation {
            // Floor thresholds at machine epsilon so zero/negative CLI values
            // cannot produce a degenerate validation config.
            let config = ValidationConfig {
                price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
                reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
                repair_missing: self.repair_missing,
            };
            let outcome =
                validate_dataset(candles.clone(), None, config).context("validation failed")?;
            print_validation_summary(&outcome);
            // Only swap in the repaired series when repairs were requested and
            // something was actually synthesized.
            if self.repair_missing && outcome.summary.repaired_candles > 0 {
                candles = outcome.repaired;
                info!(
                    "Applied {} synthetic candle(s) to repair gaps",
                    outcome.summary.repaired_candles
                );
            }
        }

        let output_path = self.output.clone().unwrap_or_else(|| {
            default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
        });
        write_candles_csv(&output_path, &candles)?;
        info!(
            "Saved {} candles to {}",
            candles.len(),
            output_path.display()
        );
        Ok(())
    }
}
284
/// Arguments for `tesser data validate`.
#[derive(Args)]
pub struct DataValidateArgs {
    /// One or more CSV files to inspect
    #[arg(
        long = "path",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    paths: Vec<PathBuf>,
    /// Optional reference data set(s) used for cross validation
    #[arg(
        long = "reference",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    reference_paths: Vec<PathBuf>,
    /// Max allowed close-to-close jump before flagging (fractional, 0.05 = 5%)
    #[arg(long, default_value_t = 0.05)]
    jump_threshold: f64,
    /// Allowed divergence between primary and reference closes (fractional)
    #[arg(long, default_value_t = 0.002)]
    reference_tolerance: f64,
    /// Attempt to fill gaps by synthesizing candles
    #[arg(long)]
    repair_missing: bool,
    /// Location to write the repaired dataset
    #[arg(long)]
    output: Option<PathBuf>,
}
316
317impl DataValidateArgs {
318    fn run(&self) -> Result<()> {
319        if self.paths.is_empty() {
320            bail!("provide at least one --path for validation");
321        }
322        let candles =
323            load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
324        if candles.is_empty() {
325            bail!("loaded dataset is empty; nothing to validate");
326        }
327        let reference = if self.reference_paths.is_empty() {
328            None
329        } else {
330            Some(
331                load_candles_from_paths(&self.reference_paths)
332                    .with_context(|| "failed to load reference dataset")?,
333            )
334        };
335
336        let price_jump_threshold = if self.jump_threshold <= 0.0 {
337            0.0001
338        } else {
339            self.jump_threshold
340        };
341        let reference_tolerance = if self.reference_tolerance <= 0.0 {
342            0.0001
343        } else {
344            self.reference_tolerance
345        };
346
347        let config = ValidationConfig {
348            price_jump_threshold,
349            reference_tolerance,
350            repair_missing: self.repair_missing,
351        };
352
353        let outcome = validate_dataset(candles, reference, config)?;
354        print_validation_summary(&outcome);
355
356        if let Some(output) = &self.output {
357            write_candles_csv(output, &outcome.repaired)?;
358            info!(
359                "Wrote {} candles ({} new) to {}",
360                outcome.repaired.len(),
361                outcome.summary.repaired_candles,
362                output.display()
363            );
364        } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
365            warn!(
366                "Detected {} gap(s) filled with synthetic candles but --output was not provided",
367                outcome.summary.repaired_candles
368            );
369        }
370
371        Ok(())
372    }
373}
374
/// Arguments for `tesser data resample` (currently a stub command).
#[derive(Args)]
pub struct DataResampleArgs {
    /// Input dataset to resample
    #[arg(long)]
    input: PathBuf,
    /// Destination for the resampled dataset
    #[arg(long)]
    output: PathBuf,
    /// Target interval for resampling
    #[arg(long, default_value = "1h")]
    interval: String,
}
384
/// Arguments for `tesser data inspect-parquet`.
#[derive(Args)]
pub struct DataInspectParquetArgs {
    /// Path to the parquet file produced by the flight recorder
    #[arg(value_name = "PATH")]
    path: PathBuf,
    /// Number of rows to display (0 prints the entire file)
    #[arg(long, default_value_t = 10)]
    rows: usize,
}
394
395impl DataInspectParquetArgs {
396    fn run(&self) -> Result<()> {
397        let limit = if self.rows == 0 {
398            usize::MAX
399        } else {
400            self.rows
401        };
402        let file = File::open(&self.path)
403            .with_context(|| format!("failed to open {}", self.path.display()))?;
404        let batch_size = limit.clamp(1, 8192);
405        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)?
406            .with_batch_size(batch_size)
407            .build()?;
408
409        let mut printed = 0usize;
410        while printed < limit {
411            match reader.next() {
412                Some(Ok(batch)) => {
413                    if batch.num_rows() == 0 {
414                        continue;
415                    }
416                    let remaining = limit.saturating_sub(printed);
417                    let take = remaining.min(batch.num_rows());
418                    let display_batch = if take == batch.num_rows() {
419                        batch
420                    } else {
421                        batch.slice(0, take)
422                    };
423                    print_batches(std::slice::from_ref(&display_batch))?;
424                    printed += take;
425                }
426                Some(Err(err)) => return Err(err.into()),
427                None => break,
428            }
429        }
430
431        if printed == 0 {
432            println!("no rows available in {}", self.path.display());
433        } else if limit != usize::MAX {
434            println!("displayed {printed} row(s) from {}", self.path.display());
435        }
436
437        Ok(())
438    }
439}
440
/// CLI-facing selector for the backtest data source (maps onto `BacktestMode`).
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
pub enum BacktestModeArg {
    /// Candle-driven replay
    Candle,
    /// Tick/order-book-driven replay
    Tick,
}
446
/// Arguments for `tesser backtest run`.
#[derive(Args)]
pub struct BacktestRunArgs {
    /// TOML file declaring the strategy name and its parameters
    #[arg(long)]
    strategy_config: PathBuf,
    /// One or more CSV files with historical candles (symbol,timestamp,...)
    #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
    data_paths: Vec<PathBuf>,
    /// Number of synthetic candles per symbol when no --data files are given
    #[arg(long, default_value_t = 500)]
    candles: usize,
    /// Base order quantity fed into the sizer
    #[arg(long, default_value = "0.01")]
    quantity: Decimal,
    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    /// Trading fees in basis points applied to notional
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    /// Number of candles between signal and execution
    #[arg(long, default_value_t = 1)]
    latency_candles: usize,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    #[arg(long, default_value = "fixed:0.01")]
    sizer: String,
    /// Selects the data source driving fills (`candle` or `tick`)
    #[arg(long, value_enum, default_value = "candle")]
    mode: BacktestModeArg,
    /// One or more JSONL files containing tick/order book events (required for `--mode tick`)
    #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
    lob_paths: Vec<PathBuf>,
    /// Market definitions file (falls back to backtest.markets_file)
    #[arg(long)]
    markets_file: Option<PathBuf>,
}
479
/// Arguments for `tesser backtest batch`.
#[derive(Args)]
pub struct BacktestBatchArgs {
    /// Glob or directory containing strategy config files
    #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
    config_paths: Vec<PathBuf>,
    /// Candle CSVs available to every strategy
    #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
    data_paths: Vec<PathBuf>,
    /// Base order quantity fed into the sizer
    #[arg(long, default_value = "0.01")]
    quantity: Decimal,
    /// Optional output CSV summarizing results
    #[arg(long)]
    output: Option<PathBuf>,
    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    /// Trading fees in basis points applied to notional
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    /// Number of candles between signal and execution
    #[arg(long, default_value_t = 1)]
    latency_candles: usize,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    #[arg(long, default_value = "fixed:0.01")]
    sizer: String,
    /// Market definitions file (falls back to backtest.markets_file)
    #[arg(long)]
    markets_file: Option<PathBuf>,
}
508
/// Arguments for `tesser live run`.
#[derive(Args)]
pub struct LiveRunArgs {
    /// TOML file declaring the strategy name and its parameters
    #[arg(long)]
    strategy_config: PathBuf,
    /// Exchange profile name from the config's `exchange` table
    #[arg(long, default_value = "paper_sandbox")]
    exchange: String,
    /// Product category for the market data subscription
    #[arg(long, default_value = "linear")]
    category: String,
    /// Candle interval for the live feed
    #[arg(long, default_value = "1m")]
    interval: String,
    /// Base order quantity fed into the sizer
    #[arg(long, default_value = "1")]
    quantity: Decimal,
    /// Selects which execution backend to use (`paper` or `bybit`)
    #[arg(
        long = "exec",
        default_value = "paper",
        value_enum,
        alias = "live-exec"
    )]
    exec: ExecutionBackend,
    /// SQLite state database path (overrides config.live.state_path)
    #[arg(long)]
    state_path: Option<PathBuf>,
    /// Metrics HTTP bind address (overrides config.live.metrics_addr)
    #[arg(long)]
    metrics_addr: Option<String>,
    /// Log file path (overrides config.live.log_path)
    #[arg(long)]
    log_path: Option<PathBuf>,
    /// Directory where parquet market/trade data is recorded
    #[arg(
        long = "record-data",
        value_name = "PATH",
        default_value = "data/flight_recorder"
    )]
    record_data: PathBuf,
    /// Control plane gRPC bind address (overrides config.live.control_addr)
    #[arg(long)]
    control_addr: Option<String>,
    /// Starting equity in the reporting currency (overrides configured balances)
    #[arg(long)]
    initial_equity: Option<Decimal>,
    /// Market definitions file
    #[arg(long)]
    markets_file: Option<PathBuf>,
    /// Simulated slippage in basis points (paper execution)
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    /// Simulated fees in basis points (paper execution)
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    /// Simulated order latency in milliseconds
    #[arg(long, default_value_t = 0)]
    latency_ms: u64,
    /// Number of historical candles preloaded for the strategy
    #[arg(long, default_value_t = 512)]
    history: usize,
    /// Seconds between position reconciliation passes (overrides config)
    #[arg(long)]
    reconciliation_interval_secs: Option<u64>,
    /// Position divergence that triggers reconciliation alerts (overrides config)
    #[arg(long)]
    reconciliation_threshold: Option<Decimal>,
    /// Alert webhook URL (overrides config.live.alerting.webhook_url)
    #[arg(long)]
    webhook_url: Option<String>,
    /// Max seconds without market data before alerting (overrides config)
    #[arg(long)]
    alert_max_data_gap_secs: Option<u64>,
    /// Max consecutive order failures before alerting (overrides config)
    #[arg(long)]
    alert_max_order_failures: Option<u32>,
    /// Max drawdown before alerting (overrides config)
    #[arg(long)]
    alert_max_drawdown: Option<Decimal>,
    /// Risk limit: maximum single-order quantity (overrides config)
    #[arg(long)]
    risk_max_order_qty: Option<Decimal>,
    /// Risk limit: maximum position quantity (overrides config)
    #[arg(long)]
    risk_max_position_qty: Option<Decimal>,
    /// Risk limit: maximum drawdown (overrides config)
    #[arg(long)]
    risk_max_drawdown: Option<Decimal>,
    /// Bybit orderbook depth to subscribe to (e.g., 1, 25, 50). Ignored for other connectors.
    #[arg(long)]
    orderbook_depth: Option<usize>,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    #[arg(long, default_value = "fixed:1.0")]
    sizer: String,
}
582
583impl LiveRunArgs {
584    fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
585        self.log_path
586            .clone()
587            .unwrap_or_else(|| config.live.log_path.clone())
588    }
589
590    fn resolved_state_path(&self, config: &AppConfig) -> PathBuf {
591        self.state_path
592            .clone()
593            .unwrap_or_else(|| config.live.state_path.clone())
594    }
595
596    fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
597        let addr = self
598            .metrics_addr
599            .clone()
600            .unwrap_or_else(|| config.live.metrics_addr.clone());
601        addr.parse()
602            .with_context(|| format!("invalid metrics address '{addr}'"))
603    }
604
605    fn resolved_control_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
606        let addr = self
607            .control_addr
608            .clone()
609            .unwrap_or_else(|| config.live.control_addr.clone());
610        addr.parse()
611            .with_context(|| format!("invalid control address '{addr}'"))
612    }
613
614    fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
615        let secs = self
616            .reconciliation_interval_secs
617            .unwrap_or(config.live.reconciliation_interval_secs)
618            .max(1);
619        StdDuration::from_secs(secs)
620    }
621
622    fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
623        let configured = self
624            .reconciliation_threshold
625            .unwrap_or(config.live.reconciliation_threshold);
626        if configured <= Decimal::ZERO {
627            config.live.reconciliation_threshold.max(Decimal::new(1, 6))
628        } else {
629            configured
630        }
631    }
632
633    fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<Symbol, Decimal> {
634        let mut balances = clone_initial_balances(&config.backtest);
635        if let Some(value) = self.initial_equity {
636            balances.insert(
637                config.backtest.reporting_currency.clone(),
638                value.max(Decimal::ZERO),
639            );
640        }
641        balances
642    }
643
644    fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
645        let mut alerting = config.live.alerting.clone();
646        let webhook = self
647            .webhook_url
648            .clone()
649            .or_else(|| alerting.webhook_url.clone());
650        alerting.webhook_url = sanitize_webhook(webhook);
651        if let Some(sec) = self.alert_max_data_gap_secs {
652            alerting.max_data_gap_secs = sec;
653        }
654        if let Some(limit) = self.alert_max_order_failures {
655            alerting.max_order_failures = limit;
656        }
657        if let Some(limit) = self.alert_max_drawdown {
658            alerting.max_drawdown = limit.max(Decimal::ZERO);
659        }
660        alerting
661    }
662
663    fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
664        let mut risk = config.risk_management.clone();
665        if let Some(limit) = self.risk_max_order_qty {
666            risk.max_order_quantity = limit.max(Decimal::ZERO);
667        }
668        if let Some(limit) = self.risk_max_position_qty {
669            risk.max_position_quantity = limit.max(Decimal::ZERO);
670        }
671        if let Some(limit) = self.risk_max_drawdown {
672            risk.max_drawdown = limit.max(Decimal::ZERO);
673        }
674        risk
675    }
676}
677
/// On-disk shape of a strategy config TOML file.
#[derive(Deserialize)]
struct StrategyConfigFile {
    // The TOML key is `strategy_name`; it is exposed here simply as `name`.
    #[serde(rename = "strategy_name")]
    name: String,
    // Strategy parameters are optional and default to an empty TOML table.
    #[serde(default = "empty_table")]
    params: toml::Value,
}
685
686fn empty_table() -> toml::Value {
687    toml::Value::Table(Default::default())
688}
689
/// CLI entry point: parses arguments, loads configuration, initializes
/// tracing, and dispatches to the selected subcommand.
///
/// # Errors
/// Propagates configuration, logging-setup, and subcommand failures.
pub async fn run() -> Result<()> {
    let cli = Cli::parse();
    let config = load_config(Some(&cli.env)).context("failed to load configuration")?;

    // RUST_LOG always wins; otherwise -v flags escalate above the configured level.
    let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
        0 => config.log_level.clone(),
        1 => "debug".to_string(),
        _ => "trace".to_string(),
    });

    // Live runs redirect logs to a (possibly overridden) file path; all other
    // commands use the default sink.
    let log_override = match &cli.command {
        Commands::Live {
            action: LiveCommand::Run(args),
        } => Some(args.resolved_log_path(&config)),
        _ => None,
    };

    init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;

    match cli.command {
        Commands::Data { action } => handle_data(action, &config).await?,
        Commands::Backtest {
            action: BacktestCommand::Run(args),
        } => args.run(&config).await?,
        Commands::Backtest {
            action: BacktestCommand::Batch(args),
        } => args.run(&config).await?,
        Commands::Live {
            action: LiveCommand::Run(args),
        } => args.run(&config).await?,
        Commands::State { action } => handle_state(action, &config).await?,
        Commands::Analyze { action } => handle_analyze(action)?,
        Commands::Strategies => list_strategies(),
    }

    Ok(())
}
727
728async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
729    match cmd {
730        DataCommand::Download(args) => {
731            args.run(config).await?;
732        }
733        DataCommand::Validate(args) => {
734            args.run()?;
735        }
736        DataCommand::Resample(args) => {
737            info!(
738                "stub: resampling {} into {} at {}",
739                args.input.display(),
740                args.output.display(),
741                args.interval
742            );
743        }
744        DataCommand::InspectParquet(args) => {
745            args.run()?;
746        }
747    }
748    Ok(())
749}
750
751async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
752    match cmd {
753        StateCommand::Inspect(args) => {
754            state::inspect_state(args.resolved_path(config), args.raw).await?;
755        }
756    }
757    Ok(())
758}
759
760fn handle_analyze(cmd: AnalyzeCommand) -> Result<()> {
761    match cmd {
762        AnalyzeCommand::Execution(args) => analyze::run_execution(args.build_request()?),
763    }
764}
765
impl BacktestRunArgs {
    /// Runs a single backtest: loads the strategy config, builds the market
    /// data source for the selected mode, executes the backtester, and prints
    /// the performance report.
    ///
    /// # Errors
    /// Fails when the strategy config cannot be read/parsed, the strategy
    /// declares no subscriptions, markets cannot be loaded, tick mode is
    /// missing LOB data, or the backtest itself fails.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        let contents = std::fs::read_to_string(&self.strategy_config)
            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
        let def: StrategyConfigFile =
            toml::from_str(&contents).context("failed to parse strategy config file")?;
        let strategy = load_strategy(&def.name, def.params)
            .with_context(|| format!("failed to configure strategy {}", def.name))?;
        let symbols = strategy.subscriptions();
        if symbols.is_empty() {
            return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
        }

        // Map the CLI enum onto the backtester's mode enum.
        let mode = match self.mode {
            BacktestModeArg::Candle => BacktestMode::Candle,
            BacktestModeArg::Tick => BacktestMode::Tick,
        };

        let markets_path = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone())
            .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
        let market_registry = Arc::new(
            MarketRegistry::load_from_file(&markets_path).with_context(|| {
                format!("failed to load markets from {}", markets_path.display())
            })?,
        );

        // Candle mode replays a candle stream against the paper client; tick
        // mode replays order-book events through the matching engine, which
        // doubles as the execution client.
        let (market_stream, lob_events, execution_client, matching_engine) = match mode {
            BacktestMode::Candle => {
                let stream = self.build_candle_stream(&symbols)?;
                (
                    Some(stream),
                    Vec::new(),
                    Arc::new(PaperExecutionClient::default()) as Arc<dyn ExecutionClient>,
                    None,
                )
            }
            BacktestMode::Tick => {
                if self.lob_paths.is_empty() {
                    bail!("--lob-data is required when --mode tick");
                }
                let events = load_lob_events_from_paths(&self.lob_paths)?;
                if events.is_empty() {
                    bail!("no order book events loaded from --lob-data");
                }
                let engine = Arc::new(MatchingEngine::new(
                    "matching-engine",
                    symbols.clone(),
                    reporting_balance(&config.backtest),
                ));
                (
                    None,
                    events,
                    engine.clone() as Arc<dyn ExecutionClient>,
                    Some(engine),
                )
            }
        };

        let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
        let order_quantity = self.quantity;
        // Risk checks are a no-op in backtests; sizing still applies.
        let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));

        let mut cfg = BacktestConfig::new(symbols[0].clone());
        cfg.lob_events = lob_events;
        cfg.order_quantity = order_quantity;
        cfg.initial_balances = clone_initial_balances(&config.backtest);
        cfg.reporting_currency = config.backtest.reporting_currency.clone();
        // Negative CLI values are clamped to zero bps; latency to at least 1 candle.
        cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
        cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
        cfg.execution.latency_candles = self.latency_candles.max(1);
        cfg.mode = mode;

        let report = Backtester::new(
            cfg,
            strategy,
            execution,
            matching_engine,
            market_registry,
            market_stream,
        )
        .run()
        .await
        .context("backtest failed")?;
        print_report(&report);
        Ok(())
    }

    /// Builds the candle stream backing candle-mode backtests: synthetic
    /// candles when no `--data` files were given, otherwise CSV or parquet
    /// input depending on the detected file format.
    fn build_candle_stream(&self, symbols: &[Symbol]) -> Result<BacktestStream> {
        if symbols.is_empty() {
            bail!("strategy did not declare any subscriptions");
        }
        if self.data_paths.is_empty() {
            // No data provided: synthesize a candle series per symbol, each
            // offset so the series are not identical.
            let mut generated = Vec::new();
            for (idx, symbol) in symbols.iter().enumerate() {
                let offset = idx as i64 * 10;
                generated.extend(synth_candles(symbol, self.candles, offset));
            }
            generated.sort_by_key(|c| c.timestamp);
            if generated.is_empty() {
                bail!("no synthetic candles generated; provide --data files instead");
            }
            return Ok(memory_market_stream(&symbols[0], generated));
        }

        match detect_data_format(&self.data_paths)? {
            DataFormat::Csv => {
                let mut candles = load_candles_from_paths(&self.data_paths)?;
                if candles.is_empty() {
                    bail!("no candles loaded from --data paths");
                }
                candles.sort_by_key(|c| c.timestamp);
                Ok(memory_market_stream(&symbols[0], candles))
            }
            DataFormat::Parquet => Ok(parquet_market_stream(symbols, self.data_paths.clone())),
        }
    }
}
886
impl BacktestBatchArgs {
    /// Run one backtest per `--config` strategy file over the shared `--data`
    /// inputs, collecting an ending-equity summary row per run.
    ///
    /// Requires a market registry (`--markets-file` or `backtest.markets_file`)
    /// and fails fast if any individual backtest errors.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        if self.config_paths.is_empty() {
            return Err(anyhow!("provide at least one --config path"));
        }
        if self.data_paths.is_empty() {
            return Err(anyhow!("provide at least one --data path for batch mode"));
        }
        // The CLI flag takes precedence over the app-level config value.
        let markets_path = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone())
            .ok_or_else(|| {
                anyhow!("batch mode requires --markets-file or backtest.markets_file")
            })?;
        // Loaded once and shared (via Arc) across every batch job.
        let market_registry = Arc::new(
            MarketRegistry::load_from_file(&markets_path).with_context(|| {
                format!("failed to load markets from {}", markets_path.display())
            })?,
        );
        // All jobs consume the same data paths, so detect the format once.
        let data_format = detect_data_format(&self.data_paths)?;
        let mut aggregated = Vec::new();
        for config_path in &self.config_paths {
            let contents = std::fs::read_to_string(config_path).with_context(|| {
                format!("failed to read strategy config {}", config_path.display())
            })?;
            let def: StrategyConfigFile =
                toml::from_str(&contents).context("failed to parse strategy config file")?;
            let strategy = load_strategy(&def.name, def.params)
                .with_context(|| format!("failed to configure strategy {}", def.name))?;
            let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
            let order_quantity = self.quantity;
            let symbols = strategy.subscriptions();
            if symbols.is_empty() {
                bail!("strategy {} did not declare subscriptions", strategy.name());
            }
            // Fresh stream per job: CSV candles are re-read and re-sorted so
            // each strategy consumes the full series from the start.
            let stream = match data_format {
                DataFormat::Csv => {
                    let mut candles = load_candles_from_paths(&self.data_paths)?;
                    if candles.is_empty() {
                        bail!("no candles loaded from --data paths");
                    }
                    candles.sort_by_key(|c| c.timestamp);
                    memory_market_stream(&symbols[0], candles)
                }
                DataFormat::Parquet => parquet_market_stream(&symbols, self.data_paths.clone()),
            };
            // Batch runs always execute against the paper broker with no risk checks.
            let execution_client: Arc<dyn ExecutionClient> =
                Arc::new(PaperExecutionClient::default());
            let execution =
                ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
            let mut cfg = BacktestConfig::new(symbols[0].clone());
            cfg.order_quantity = order_quantity;
            cfg.initial_balances = clone_initial_balances(&config.backtest);
            cfg.reporting_currency = config.backtest.reporting_currency.clone();
            // Clamp user input: slippage/fees cannot be negative, and at least
            // one candle of execution latency is enforced.
            cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
            cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
            cfg.execution.latency_candles = self.latency_candles.max(1);

            let report = Backtester::new(
                cfg,
                strategy,
                execution,
                None,
                market_registry.clone(),
                Some(stream),
            )
            .run()
            .await
            .with_context(|| format!("backtest failed for {}", config_path.display()))?;
            aggregated.push(BatchRow {
                config: config_path.display().to_string(),
                signals: 0, // Legacy field, can be removed or calculated from report
                orders: 0,  // Legacy field, can be removed or calculated from report
                dropped_orders: 0, // Legacy field, can be removed or calculated from report
                ending_equity: report.ending_equity,
            });
        }

        if let Some(output) = &self.output {
            write_batch_report(output, &aggregated)?;
            println!("Batch report written to {}", output.display());
        }
        // Defensive: with config_paths verified non-empty above, the loop either
        // pushed a row per config or returned an error, so this should not fire.
        if aggregated.is_empty() {
            return Err(anyhow!("no batch jobs executed"));
        }
        Ok(())
    }
}
976
impl LiveRunArgs {
    /// Resolve CLI flags plus app config into `LiveSessionSettings`, then
    /// start a live trading session for the configured strategy.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        // The named exchange profile must exist in the loaded configuration.
        let exchange_cfg = config
            .exchange
            .get(&self.exchange)
            .cloned()
            .ok_or_else(|| anyhow!("exchange profile {} not found", self.exchange))?;

        let driver = exchange_cfg.driver.clone();

        // Load and instantiate the strategy from its TOML definition file.
        let contents = fs::read_to_string(&self.strategy_config)
            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
        let def: StrategyConfigFile =
            toml::from_str(&contents).context("failed to parse strategy config file")?;
        let strategy = load_strategy(&def.name, def.params)
            .with_context(|| format!("failed to configure strategy {}", def.name))?;
        let symbols = strategy.subscriptions();
        if symbols.is_empty() {
            bail!("strategy did not declare any subscriptions");
        }
        if self.quantity <= Decimal::ZERO {
            bail!("--quantity must be greater than zero");
        }
        let quantity = self.quantity;
        // CLI overrides fall back to config defaults via the resolver helpers.
        let initial_balances = self.resolved_initial_balances(config);
        let reporting_currency = config.backtest.reporting_currency.clone();
        let markets_file = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone());

        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
        let category =
            PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
        let metrics_addr = self.resolved_metrics_addr(config)?;
        let control_addr = self.resolved_control_addr(config)?;
        let state_path = self.resolved_state_path(config);
        let alerting = self.build_alerting(config);
        // History is floored at 32 candles (presumably for indicator warm-up —
        // TODO confirm against the strategy runtime).
        let history = self.history.max(32);
        let reconciliation_interval = self.reconciliation_interval(config);
        let reconciliation_threshold = self.reconciliation_threshold(config);
        let orderbook_depth = self
            .orderbook_depth
            .unwrap_or(super::live::default_order_book_depth());

        let settings = LiveSessionSettings {
            category,
            interval,
            quantity,
            // Negative slippage/fee inputs are clamped to zero.
            slippage_bps: self.slippage_bps.max(Decimal::ZERO),
            fee_bps: self.fee_bps.max(Decimal::ZERO),
            history,
            metrics_addr,
            state_path,
            initial_balances,
            reporting_currency,
            markets_file,
            alerting,
            exec_backend: self.exec,
            risk: self.build_risk_config(config),
            reconciliation_interval,
            reconciliation_threshold,
            driver,
            orderbook_depth,
            record_path: Some(self.record_data.clone()),
            control_addr,
        };

        // Log the resolved session parameters before handing off control.
        info!(
            strategy = %def.name,
            symbols = ?symbols,
            exchange = %self.exchange,
            interval = %self.interval,
            driver = ?settings.driver,
            exec = ?self.exec,
            control_addr = %settings.control_addr,
            "starting live session"
        );

        run_live(strategy, symbols, exchange_cfg, settings)
            .await
            .context("live session failed")
    }
}
1061
1062fn list_strategies() {
1063    println!("Built-in strategies:");
1064    for name in builtin_strategy_names() {
1065        println!("- {name}");
1066    }
1067}
1068
1069fn print_validation_summary(outcome: &ValidationOutcome) {
1070    const MAX_EXAMPLES: usize = 5;
1071    let summary = &outcome.summary;
1072    println!(
1073        "Validation summary for {} ({} candles)",
1074        summary.symbol, summary.rows
1075    );
1076    println!(
1077        "  Range: {} -> {}",
1078        summary.start.to_rfc3339(),
1079        summary.end.to_rfc3339()
1080    );
1081    println!("  Interval: {}", interval_label(summary.interval));
1082    println!("  Missing intervals: {}", summary.missing_candles);
1083    println!("  Duplicate intervals: {}", summary.duplicate_candles);
1084    println!("  Zero-volume candles: {}", summary.zero_volume_candles);
1085    println!("  Price spikes flagged: {}", summary.price_spike_count);
1086    println!(
1087        "  Cross-source mismatches: {}",
1088        summary.cross_mismatch_count
1089    );
1090    println!("  Repaired candles generated: {}", summary.repaired_candles);
1091
1092    if !outcome.gaps.is_empty() {
1093        println!("  Gap examples:");
1094        for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
1095            println!(
1096                "    {} -> {} (missing {})",
1097                gap.start.to_rfc3339(),
1098                gap.end.to_rfc3339(),
1099                gap.missing
1100            );
1101        }
1102        if outcome.gaps.len() > MAX_EXAMPLES {
1103            println!(
1104                "    ... {} additional gap(s) omitted",
1105                outcome.gaps.len() - MAX_EXAMPLES
1106            );
1107        }
1108    }
1109
1110    if !outcome.price_spikes.is_empty() {
1111        println!("  Price spike examples:");
1112        for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
1113            println!(
1114                "    {} (change {:.2}%)",
1115                spike.timestamp.to_rfc3339(),
1116                spike.change_fraction * 100.0
1117            );
1118        }
1119        if outcome.price_spikes.len() > MAX_EXAMPLES {
1120            println!(
1121                "    ... {} additional spike(s) omitted",
1122                outcome.price_spikes.len() - MAX_EXAMPLES
1123            );
1124        }
1125    }
1126
1127    if !outcome.cross_mismatches.is_empty() {
1128        println!("  Cross-source mismatch examples:");
1129        for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
1130            println!(
1131                "    {} primary {:.4} vs ref {:.4} ({:.2}%)",
1132                miss.timestamp.to_rfc3339(),
1133                miss.primary_close,
1134                miss.reference_close,
1135                miss.delta_fraction * 100.0
1136            );
1137        }
1138        if outcome.cross_mismatches.len() > MAX_EXAMPLES {
1139            println!(
1140                "    ... {} additional mismatch(es) omitted",
1141                outcome.cross_mismatches.len() - MAX_EXAMPLES
1142            );
1143        }
1144    }
1145}
1146
1147fn print_report(report: &PerformanceReport) {
1148    println!("\n{}", report);
1149}
1150
1151fn synth_candles(symbol: &str, len: usize, offset_minutes: i64) -> Vec<Candle> {
1152    let mut candles = Vec::with_capacity(len);
1153    for i in 0..len {
1154        let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
1155        let open = base + (i as f64 % 3.0) * 10.0;
1156        let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
1157        let open_dec =
1158            Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
1159        let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
1160        let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
1161        let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
1162        candles.push(Candle {
1163            symbol: Symbol::from(symbol),
1164            interval: Interval::OneMinute,
1165            open: open_dec,
1166            high,
1167            low,
1168            close: close_dec,
1169            volume: Decimal::ONE,
1170            timestamp: Utc::now() - Duration::minutes((len - i) as i64)
1171                + Duration::minutes(offset_minutes),
1172        });
1173    }
1174    candles
1175}
1176
1177fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
1178    if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
1179        return Ok(dt.with_timezone(&Utc));
1180    }
1181    if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1182        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1183    }
1184    if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
1185        let dt = date
1186            .and_hms_opt(0, 0, 0)
1187            .ok_or_else(|| anyhow!("invalid date"))?;
1188        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1189    }
1190    Err(anyhow!("unable to parse datetime '{value}'"))
1191}
1192
/// One row of an input candle CSV. `symbol` is optional; when absent it is
/// inferred from the file's parent directory (see `load_candles_from_paths`).
#[derive(Deserialize)]
struct CandleCsvRow {
    symbol: Option<String>,
    // Parsed by `parse_datetime`: RFC 3339, "%Y-%m-%d %H:%M:%S", or "%Y-%m-%d".
    timestamp: String,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}
1203
1204fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1205    let mut candles = Vec::new();
1206    for path in paths {
1207        let mut reader = csv::Reader::from_path(path)
1208            .with_context(|| format!("failed to open {}", path.display()))?;
1209        for record in reader.deserialize::<CandleCsvRow>() {
1210            let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1211            let timestamp = parse_datetime(&row.timestamp)?;
1212            let symbol = row
1213                .symbol
1214                .clone()
1215                .or_else(|| infer_symbol_from_path(path))
1216                .ok_or_else(|| {
1217                    anyhow!(
1218                        "missing symbol column and unable to infer from path {}",
1219                        path.display()
1220                    )
1221                })?;
1222            let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1223            let open = Decimal::from_f64(row.open).ok_or_else(|| {
1224                anyhow!("invalid open value '{}' in {}", row.open, path.display())
1225            })?;
1226            let high = Decimal::from_f64(row.high).ok_or_else(|| {
1227                anyhow!("invalid high value '{}' in {}", row.high, path.display())
1228            })?;
1229            let low = Decimal::from_f64(row.low)
1230                .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1231            let close = Decimal::from_f64(row.close).ok_or_else(|| {
1232                anyhow!("invalid close value '{}' in {}", row.close, path.display())
1233            })?;
1234            let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1235                anyhow!(
1236                    "invalid volume value '{}' in {}",
1237                    row.volume,
1238                    path.display()
1239                )
1240            })?;
1241            candles.push(Candle {
1242                symbol,
1243                interval,
1244                open,
1245                high,
1246                low,
1247                close,
1248                volume,
1249                timestamp,
1250            });
1251        }
1252    }
1253    Ok(candles)
1254}
1255
/// Input data format detected from `--data` file extensions; a single run
/// must use one format only (see `detect_data_format`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DataFormat {
    Csv,
    Parquet,
}
1261
1262fn detect_data_format(paths: &[PathBuf]) -> Result<DataFormat> {
1263    let mut detected: Option<DataFormat> = None;
1264    for path in paths {
1265        let ext = path
1266            .extension()
1267            .and_then(|ext| ext.to_str())
1268            .map(|ext| ext.to_ascii_lowercase())
1269            .unwrap_or_else(|| String::from(""));
1270        let current = if ext == "parquet" {
1271            DataFormat::Parquet
1272        } else {
1273            DataFormat::Csv
1274        };
1275        if let Some(existing) = detected {
1276            if existing != current {
1277                bail!("cannot mix CSV and Parquet inputs in --data");
1278            }
1279        } else {
1280            detected = Some(current);
1281        }
1282    }
1283    detected.ok_or_else(|| anyhow!("no data paths provided"))
1284}
1285
1286fn memory_market_stream(symbol: &str, candles: Vec<Candle>) -> BacktestStream {
1287    Box::new(PaperMarketStream::from_data(
1288        symbol.to_string(),
1289        Vec::new(),
1290        candles,
1291    ))
1292}
1293
1294fn parquet_market_stream(symbols: &[Symbol], paths: Vec<PathBuf>) -> BacktestStream {
1295    Box::new(ParquetMarketStream::with_candles(symbols.to_vec(), paths))
1296}
1297
/// One line of a JSONL order-book event file, tagged by its `event` field
/// (`snapshot`, `depth`, or `trade`). Price/size levels arrive as
/// `[price, size]` f64 pairs; `symbol` may be omitted per row, in which case
/// it is inferred from the file path (see `load_lob_events_from_paths`).
#[derive(Deserialize)]
#[serde(tag = "event", rename_all = "lowercase")]
enum LobEventRow {
    // Full book state; converted into `MarketEventKind::OrderBook`.
    Snapshot {
        timestamp: String,
        symbol: Option<String>,
        bids: Vec<[f64; 2]>,
        asks: Vec<[f64; 2]>,
    },
    // Incremental change; converted into `MarketEventKind::Depth`.
    Depth {
        timestamp: String,
        symbol: Option<String>,
        bids: Vec<[f64; 2]>,
        asks: Vec<[f64; 2]>,
    },
    // Executed trade; converted into `MarketEventKind::Trade`.
    Trade {
        timestamp: String,
        symbol: Option<String>,
        side: String,
        price: f64,
        size: f64,
    },
}
1321
/// Load JSONL order-book event files into a single time-sorted `MarketEvent`
/// list.
///
/// Each non-blank line is parsed as a `LobEventRow` (snapshot, depth, or
/// trade). A row without a symbol falls back to the file's parent directory
/// name; if that is also unavailable, loading fails.
fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
    let mut events = Vec::new();
    for path in paths {
        let file = File::open(path)
            .with_context(|| format!("failed to open order book file {}", path.display()))?;
        // Fallback symbol derived from the directory containing the file.
        let symbol_hint = infer_symbol_from_path(path);
        for line in BufReader::new(file).lines() {
            let line =
                line.with_context(|| format!("failed to read line from {}", path.display()))?;
            // Skip blank lines rather than failing on them.
            if line.trim().is_empty() {
                continue;
            }
            let row: LobEventRow = serde_json::from_str(&line)
                .with_context(|| format!("invalid order book event in {}", path.display()))?;
            match row {
                LobEventRow::Snapshot {
                    timestamp,
                    symbol,
                    bids,
                    asks,
                } => {
                    let ts = parse_datetime(&timestamp)?;
                    let symbol = symbol
                        .or_else(|| symbol_hint.clone())
                        .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
                    let bids = convert_levels(&bids)?;
                    let asks = convert_levels(&asks)?;
                    // Full book snapshot becomes an OrderBook event.
                    let book = OrderBook {
                        symbol: symbol.clone(),
                        bids,
                        asks,
                        timestamp: ts,
                    };
                    events.push(MarketEvent {
                        timestamp: ts,
                        kind: MarketEventKind::OrderBook(book),
                    });
                }
                LobEventRow::Depth {
                    timestamp,
                    symbol,
                    bids,
                    asks,
                } => {
                    let ts = parse_datetime(&timestamp)?;
                    let symbol = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
                        anyhow!("missing symbol in depth update {}", path.display())
                    })?;
                    let bids = convert_levels(&bids)?;
                    let asks = convert_levels(&asks)?;
                    // Incremental update becomes a Depth event.
                    let update = DepthUpdate {
                        symbol: symbol.clone(),
                        bids,
                        asks,
                        timestamp: ts,
                    };
                    events.push(MarketEvent {
                        timestamp: ts,
                        kind: MarketEventKind::Depth(update),
                    });
                }
                LobEventRow::Trade {
                    timestamp,
                    symbol,
                    side,
                    price,
                    size,
                } => {
                    let ts = parse_datetime(&timestamp)?;
                    let symbol = symbol
                        .or_else(|| symbol_hint.clone())
                        .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
                    // Accept several spellings for each side, case-insensitively.
                    let side = match side.to_lowercase().as_str() {
                        "buy" | "bid" | "b" => Side::Buy,
                        "sell" | "ask" | "s" => Side::Sell,
                        other => bail!("unsupported trade side '{other}' in {}", path.display()),
                    };
                    let price = Decimal::from_f64(price).ok_or_else(|| {
                        anyhow!("invalid trade price '{}' in {}", price, path.display())
                    })?;
                    let size = Decimal::from_f64(size).ok_or_else(|| {
                        anyhow!("invalid trade size '{}' in {}", size, path.display())
                    })?;
                    // Historical replay: receive time is taken to equal the
                    // exchange timestamp.
                    let tick = Tick {
                        symbol: symbol.clone(),
                        price,
                        size,
                        side,
                        exchange_timestamp: ts,
                        received_at: ts,
                    };
                    events.push(MarketEvent {
                        timestamp: ts,
                        kind: MarketEventKind::Trade(tick),
                    });
                }
            }
        }
    }
    // Merge all files into one chronologically ordered stream.
    events.sort_by_key(|event| event.timestamp);
    Ok(events)
}
1424
1425fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
1426    levels
1427        .iter()
1428        .map(|pair| {
1429            let price = Decimal::from_f64(pair[0])
1430                .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
1431            let size = Decimal::from_f64(pair[1])
1432                .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
1433            Ok(OrderBookLevel { price, size })
1434        })
1435        .collect()
1436}
1437
/// Infer a symbol from a data file path: the name of the directory that
/// contains the file (e.g. `data/BTCUSDT/1m.csv` -> `BTCUSDT`).
fn infer_symbol_from_path(path: &Path) -> Option<String> {
    let directory = path.parent()?;
    let name = directory.file_name()?;
    Some(name.to_string_lossy().into_owned())
}
1443
1444fn infer_interval_from_path(path: &Path) -> Option<Interval> {
1445    path.file_stem()
1446        .and_then(|os| os.to_str())
1447        .and_then(|stem| stem.split('_').next())
1448        .and_then(|token| Interval::from_str(token).ok())
1449}
1450
1451fn default_output_path(
1452    config: &AppConfig,
1453    exchange: &str,
1454    symbol: &str,
1455    interval: Interval,
1456    start: DateTime<Utc>,
1457    end: DateTime<Utc>,
1458) -> PathBuf {
1459    let interval_part = interval_label(interval);
1460    let start_part = start.format("%Y%m%d").to_string();
1461    let end_part = end.format("%Y%m%d").to_string();
1462    config
1463        .data_path
1464        .join(exchange)
1465        .join(symbol)
1466        .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
1467}
1468
/// Short label for a candle interval (e.g. `1m`, `1h`), used in file names
/// and the validation summary output.
fn interval_label(interval: Interval) -> &'static str {
    match interval {
        Interval::OneSecond => "1s",
        Interval::OneMinute => "1m",
        Interval::FiveMinutes => "5m",
        Interval::FifteenMinutes => "15m",
        Interval::OneHour => "1h",
        Interval::FourHours => "4h",
        Interval::OneDay => "1d",
    }
}
1480
/// CSV output row for a candle; `Decimal` fields are serialized as f64
/// (see `write_candles_csv`).
#[derive(Serialize)]
struct CandleRow<'a> {
    symbol: &'a str,
    // RFC 3339 timestamp string.
    timestamp: String,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}
1491
1492fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
1493    if let Some(parent) = path.parent() {
1494        fs::create_dir_all(parent)
1495            .with_context(|| format!("failed to create directory {}", parent.display()))?;
1496    }
1497    let mut writer =
1498        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1499    for candle in candles {
1500        let row = CandleRow {
1501            symbol: &candle.symbol,
1502            timestamp: candle.timestamp.to_rfc3339(),
1503            open: candle.open.to_f64().unwrap_or(0.0),
1504            high: candle.high.to_f64().unwrap_or(0.0),
1505            low: candle.low.to_f64().unwrap_or(0.0),
1506            close: candle.close.to_f64().unwrap_or(0.0),
1507            volume: candle.volume.to_f64().unwrap_or(0.0),
1508        };
1509        writer.serialize(row)?;
1510    }
1511    writer.flush()?;
1512    Ok(())
1513}
1514
/// One row of the batch backtest summary CSV.
#[derive(Serialize)]
struct BatchRow {
    // Path of the strategy config file this row was produced from.
    config: String,
    // Legacy counters: currently always written as 0 by
    // `BacktestBatchArgs::run` (see the comments at the push site).
    signals: usize,
    orders: usize,
    dropped_orders: usize,
    // `PerformanceReport::ending_equity` for the run.
    ending_equity: f64,
}
1523
1524fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
1525    if let Some(parent) = path.parent() {
1526        fs::create_dir_all(parent)
1527            .with_context(|| format!("failed to create directory {}", parent.display()))?;
1528    }
1529    let mut writer =
1530        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1531    for row in rows {
1532        writer.serialize(row)?;
1533    }
1534    writer.flush()?;
1535    Ok(())
1536}
1537
1538fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<Symbol, Decimal> {
1539    config
1540        .initial_balances
1541        .iter()
1542        .map(|(currency, amount)| (currency.clone(), *amount))
1543        .collect()
1544}
1545
1546fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
1547    config
1548        .initial_balances
1549        .get(&config.reporting_currency)
1550        .copied()
1551        .unwrap_or_default()
1552}
1553
1554fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
1555    let parts: Vec<_> = value.split(':').collect();
1556    match parts.as_slice() {
1557        ["fixed", val] => {
1558            let quantity =
1559                Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
1560            Ok(Box::new(FixedOrderSizer { quantity }))
1561        }
1562        ["fixed"] => {
1563            let quantity = cli_quantity.unwrap_or(Decimal::ONE);
1564            Ok(Box::new(FixedOrderSizer { quantity }))
1565        }
1566        ["percent", val] => {
1567            let percent =
1568                Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
1569            Ok(Box::new(PortfolioPercentSizer {
1570                percent: percent.max(Decimal::ZERO),
1571            }))
1572        }
1573        ["risk-adjusted", val] => {
1574            let risk_fraction = Decimal::from_str(val)
1575                .context("invalid risk fraction value (use decimals)")?;
1576            Ok(Box::new(RiskAdjustedSizer {
1577                risk_fraction: risk_fraction.max(Decimal::ZERO),
1578            }))
1579        }
1580        _ => Err(anyhow!(
1581            "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
1582        )),
1583    }
1584}