tesser_cli/
app.rs

1use crate::alerts::sanitize_webhook;
2use crate::analyze;
3use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
4use crate::live::{
5    run_live, ExecutionBackend, LiveSessionSettings, PersistenceBackend, PersistenceSettings,
6};
7use crate::state;
8use crate::telemetry::init_tracing;
9use crate::tui;
10use crate::PublicChannel;
11use arrow::util::pretty::print_batches;
12use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
13use std::collections::HashMap;
14use std::fs::{self, File};
15use std::io::{BufRead, BufReader};
16use std::net::SocketAddr;
17use std::path::{Path, PathBuf};
18use std::str::FromStr;
19use std::sync::Arc;
20use std::time::Duration as StdDuration;
21
22use anyhow::{anyhow, bail, Context, Result};
23use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
24use clap::{Args, Parser, Subcommand, ValueEnum};
25use csv::Writer;
26use futures::StreamExt;
27use rust_decimal::{
28    prelude::{FromPrimitive, ToPrimitive},
29    Decimal,
30};
31use serde::{Deserialize, Serialize};
32use tesser_backtester::reporting::PerformanceReport;
33use tesser_backtester::{
34    stream_from_events, BacktestConfig, BacktestMode, BacktestStream, Backtester, MarketEvent,
35    MarketEventKind, MarketEventStream,
36};
37use tesser_broker::ExecutionClient;
38use tesser_config::{load_config, AppConfig, PersistenceEngine, RiskManagementConfig};
39use tesser_core::{Candle, DepthUpdate, Interval, OrderBook, OrderBookLevel, Side, Symbol, Tick};
40use tesser_data::analytics::ExecutionAnalysisRequest;
41use tesser_data::download::{BinanceDownloader, BybitDownloader, KlineRequest};
42use tesser_data::io::{self, DatasetFormat as IoDatasetFormat};
43use tesser_data::merger::UnifiedEventStream;
44use tesser_data::parquet::ParquetMarketStream;
45use tesser_data::transform::Resampler;
46use tesser_execution::{
47    ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PortfolioPercentSizer,
48    RiskAdjustedSizer,
49};
50use tesser_markets::MarketRegistry;
51use tesser_paper::{MatchingEngine, PaperExecutionClient, PaperMarketStream};
52use tesser_strategy::{builtin_strategy_names, load_strategy};
53use tracing::{info, warn};
54
55#[derive(Parser)]
56#[command(author, version, about = "Tesser Trading Framework")]
57pub struct Cli {
58    /// Increases logging verbosity (-v debug, -vv trace)
59    #[arg(short, long, action = clap::ArgAction::Count)]
60    verbose: u8,
61    /// Selects which configuration environment to load (maps to config/{env}.toml)
62    #[arg(long, default_value = "default")]
63    env: String,
64    #[command(subcommand)]
65    command: Commands,
66}
67
68#[allow(clippy::large_enum_variant)]
69#[derive(Subcommand)]
70pub enum Commands {
71    /// Data engineering tasks
72    Data {
73        #[command(subcommand)]
74        action: DataCommand,
75    },
76    /// Backtesting workflows
77    Backtest {
78        #[command(subcommand)]
79        action: BacktestCommand,
80    },
81    /// Live trading workflows
82    Live {
83        #[command(subcommand)]
84        action: LiveCommand,
85    },
86    /// Inspect or repair persisted runtime state
87    State {
88        #[command(subcommand)]
89        action: StateCommand,
90    },
91    /// Strategy management helpers
92    Strategies,
93    /// Analyze previously recorded trading sessions
94    Analyze {
95        #[command(subcommand)]
96        action: AnalyzeCommand,
97    },
98    /// Launch the real-time TUI dashboard
99    Monitor(MonitorArgs),
100}
101
102#[derive(Subcommand)]
103pub enum DataCommand {
104    /// Download historical market data
105    Download(DataDownloadArgs),
106    /// Validate and optionally repair a local data set
107    Validate(DataValidateArgs),
108    /// Resample existing data (placeholder)
109    Resample(DataResampleArgs),
110    /// Inspect a parquet file emitted by the flight recorder
111    InspectParquet(DataInspectParquetArgs),
112}
113
114#[derive(Subcommand)]
115pub enum BacktestCommand {
116    /// Run a single backtest from a strategy config file
117    Run(BacktestRunArgs),
118    /// Run multiple strategy configs and aggregate the results
119    Batch(BacktestBatchArgs),
120}
121
122#[derive(Subcommand)]
123pub enum LiveCommand {
124    /// Start a live trading session (scaffolding)
125    Run(LiveRunArgs),
126}
127
128#[derive(Subcommand)]
129pub enum StateCommand {
130    /// Inspect the persisted live state snapshot
131    Inspect(StateInspectArgs),
132}
133
134#[derive(Subcommand)]
135pub enum AnalyzeCommand {
136    /// Generate a TCA-lite execution report using flight recorder files
137    Execution(AnalyzeExecutionArgs),
138}
139
140#[derive(Args)]
141pub struct DataDownloadArgs {
142    #[arg(long, default_value = "bybit")]
143    exchange: String,
144    #[arg(long)]
145    symbol: String,
146    #[arg(long, default_value = "linear")]
147    category: String,
148    #[arg(long, default_value = "1m")]
149    interval: String,
150    #[arg(long)]
151    start: String,
152    #[arg(long)]
153    end: Option<String>,
154    #[arg(long)]
155    output: Option<PathBuf>,
156    /// Skip automatic validation after download completes
157    #[arg(long)]
158    skip_validation: bool,
159    /// Attempt to repair gaps detected during validation
160    #[arg(long)]
161    repair_missing: bool,
162    /// Max allowed close-to-close jump when auto-validating (fractional)
163    #[arg(long, default_value_t = 0.05)]
164    validation_jump_threshold: f64,
165    /// Allowed divergence between primary and reference closes (fractional)
166    #[arg(long, default_value_t = 0.002)]
167    validation_reference_tolerance: f64,
168}
169
170#[derive(Args)]
171pub struct StateInspectArgs {
172    /// Path to the persisted state (file for sqlite, directory for lmdb)
173    #[arg(long)]
174    path: Option<PathBuf>,
175    /// Emit the raw JSON payload stored inside the database
176    #[arg(long)]
177    raw: bool,
178}
179
180#[derive(Args)]
181pub struct AnalyzeExecutionArgs {
182    /// Directory containing flight recorder parquet partitions
183    #[arg(long, value_name = "PATH", default_value = "data/flight_recorder")]
184    data_dir: PathBuf,
185    /// Inclusive start of the analysis window (RFC3339, `YYYY-mm-dd`, etc.)
186    #[arg(long)]
187    start: Option<String>,
188    /// Inclusive end of the analysis window
189    #[arg(long)]
190    end: Option<String>,
191    /// Optional CSV file path for exporting ExecutionStats
192    #[arg(long, value_name = "PATH")]
193    export_csv: Option<PathBuf>,
194}
195
196#[derive(Args)]
197pub struct MonitorArgs {
198    /// Control plane address (overrides config.live.control_addr)
199    #[arg(long)]
200    control_addr: Option<String>,
201    /// UI refresh rate in milliseconds
202    #[arg(long, default_value_t = 250)]
203    tick_rate: u64,
204}
205
206impl StateInspectArgs {
207    fn resolved_path(&self, config: &AppConfig) -> PathBuf {
208        self.path
209            .clone()
210            .unwrap_or_else(|| config.live.persistence_config().path.clone())
211    }
212
213    fn resolved_engine(&self, config: &AppConfig) -> PersistenceEngine {
214        config.live.persistence_config().engine
215    }
216}
217
218impl MonitorArgs {
219    async fn run(&self, config: &AppConfig) -> Result<()> {
220        let addr = self
221            .control_addr
222            .clone()
223            .unwrap_or_else(|| config.live.control_addr.clone());
224        let refresh = self.tick_rate.max(50);
225        let monitor_config = tui::MonitorConfig::new(addr, StdDuration::from_millis(refresh));
226        tui::run_monitor(monitor_config).await
227    }
228}
229
230impl AnalyzeExecutionArgs {
231    fn build_request(&self) -> Result<ExecutionAnalysisRequest> {
232        let start = match &self.start {
233            Some(value) => Some(parse_datetime(value)?),
234            None => None,
235        };
236        let end = match &self.end {
237            Some(value) => Some(parse_datetime(value)?),
238            None => None,
239        };
240        Ok(ExecutionAnalysisRequest {
241            data_dir: self.data_dir.clone(),
242            start,
243            end,
244        })
245    }
246}
247
248impl DataDownloadArgs {
249    async fn run(&self, config: &AppConfig) -> Result<()> {
250        let exchange_cfg = config
251            .exchange
252            .get(&self.exchange)
253            .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
254        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
255        let start = parse_datetime(&self.start)?;
256        let end = match &self.end {
257            Some(value) => parse_datetime(value)?,
258            None => Utc::now(),
259        };
260        if start >= end {
261            return Err(anyhow!("start time must be earlier than end time"));
262        }
263
264        info!(
265            "Downloading {} candles for {} ({})",
266            self.interval, self.symbol, self.exchange
267        );
268        let mut candles = match exchange_cfg.driver.as_str() {
269            "bybit" | "" => {
270                let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
271                let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
272                downloader
273                    .download_klines(&request)
274                    .await
275                    .with_context(|| "failed to download candles from Bybit")?
276            }
277            "binance" => {
278                let downloader = BinanceDownloader::new(&exchange_cfg.rest_url);
279                let request = KlineRequest::new("", &self.symbol, interval, start, end);
280                downloader
281                    .download_klines(&request)
282                    .await
283                    .with_context(|| "failed to download candles from Binance")?
284            }
285            other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
286        };
287
288        if candles.is_empty() {
289            info!("No candles returned for {}", self.symbol);
290            return Ok(());
291        }
292
293        if !self.skip_validation {
294            let config = ValidationConfig {
295                price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
296                reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
297                repair_missing: self.repair_missing,
298            };
299            let outcome =
300                validate_dataset(candles.clone(), None, config).context("validation failed")?;
301            print_validation_summary(&outcome);
302            if self.repair_missing && outcome.summary.repaired_candles > 0 {
303                candles = outcome.repaired;
304                info!(
305                    "Applied {} synthetic candle(s) to repair gaps",
306                    outcome.summary.repaired_candles
307                );
308            }
309        }
310
311        let output_path = self.output.clone().unwrap_or_else(|| {
312            default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
313        });
314        write_candles_csv(&output_path, &candles)?;
315        info!(
316            "Saved {} candles to {}",
317            candles.len(),
318            output_path.display()
319        );
320        Ok(())
321    }
322}
323
324#[derive(Args)]
325pub struct DataValidateArgs {
326    /// One or more CSV files to inspect
327    #[arg(
328        long = "path",
329        value_name = "PATH",
330        num_args = 1..,
331        action = clap::ArgAction::Append
332    )]
333    paths: Vec<PathBuf>,
334    /// Optional reference data set(s) used for cross validation
335    #[arg(
336        long = "reference",
337        value_name = "PATH",
338        num_args = 1..,
339        action = clap::ArgAction::Append
340    )]
341    reference_paths: Vec<PathBuf>,
342    /// Max allowed close-to-close jump before flagging (fractional, 0.05 = 5%)
343    #[arg(long, default_value_t = 0.05)]
344    jump_threshold: f64,
345    /// Allowed divergence between primary and reference closes (fractional)
346    #[arg(long, default_value_t = 0.002)]
347    reference_tolerance: f64,
348    /// Attempt to fill gaps by synthesizing candles
349    #[arg(long)]
350    repair_missing: bool,
351    /// Location to write the repaired dataset
352    #[arg(long)]
353    output: Option<PathBuf>,
354}
355
356impl DataValidateArgs {
357    fn run(&self) -> Result<()> {
358        if self.paths.is_empty() {
359            bail!("provide at least one --path for validation");
360        }
361        let candles =
362            load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
363        if candles.is_empty() {
364            bail!("loaded dataset is empty; nothing to validate");
365        }
366        let reference = if self.reference_paths.is_empty() {
367            None
368        } else {
369            Some(
370                load_candles_from_paths(&self.reference_paths)
371                    .with_context(|| "failed to load reference dataset")?,
372            )
373        };
374
375        let price_jump_threshold = if self.jump_threshold <= 0.0 {
376            0.0001
377        } else {
378            self.jump_threshold
379        };
380        let reference_tolerance = if self.reference_tolerance <= 0.0 {
381            0.0001
382        } else {
383            self.reference_tolerance
384        };
385
386        let config = ValidationConfig {
387            price_jump_threshold,
388            reference_tolerance,
389            repair_missing: self.repair_missing,
390        };
391
392        let outcome = validate_dataset(candles, reference, config)?;
393        print_validation_summary(&outcome);
394
395        if let Some(output) = &self.output {
396            write_candles_csv(output, &outcome.repaired)?;
397            info!(
398                "Wrote {} candles ({} new) to {}",
399                outcome.repaired.len(),
400                outcome.summary.repaired_candles,
401                output.display()
402            );
403        } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
404            warn!(
405                "Detected {} gap(s) filled with synthetic candles but --output was not provided",
406                outcome.summary.repaired_candles
407            );
408        }
409
410        Ok(())
411    }
412}
413
414impl DataResampleArgs {
415    fn run(&self) -> Result<()> {
416        if !self.input.exists() {
417            bail!("input file {} not found", self.input.display());
418        }
419        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
420        let dataset = io::read_dataset(&self.input)
421            .with_context(|| format!("failed to read {}", self.input.display()))?;
422        let io::CandleDataset {
423            format: input_format,
424            candles,
425        } = dataset;
426        if candles.is_empty() {
427            bail!(
428                "dataset {} does not contain any candles",
429                self.input.display()
430            );
431        }
432        let input_len = candles.len();
433        let resampled = Resampler::resample(candles, interval);
434        if resampled.is_empty() {
435            bail!(
436                "no candles produced after resampling {}",
437                self.input.display()
438            );
439        }
440        let output_format = self
441            .format
442            .map(IoDatasetFormat::from)
443            .unwrap_or_else(|| IoDatasetFormat::from_path(&self.output));
444        io::write_dataset(&self.output, output_format, &resampled)
445            .with_context(|| format!("failed to write {}", self.output.display()))?;
446        info!(
447            "Resampled {} -> {} candles (input {:?} -> output {:?}) to {}",
448            input_len,
449            resampled.len(),
450            input_format,
451            output_format,
452            self.output.display()
453        );
454        Ok(())
455    }
456}
457
458#[derive(Args)]
459pub struct DataResampleArgs {
460    #[arg(long)]
461    input: PathBuf,
462    #[arg(long)]
463    output: PathBuf,
464    #[arg(long, default_value = "1h")]
465    interval: String,
466    /// Force the output format (defaults to --output extension)
467    #[arg(long, value_enum)]
468    format: Option<DatasetFormatArg>,
469}
470
471#[derive(Copy, Clone, Debug, ValueEnum)]
472pub enum DatasetFormatArg {
473    Csv,
474    Parquet,
475}
476
477impl From<DatasetFormatArg> for IoDatasetFormat {
478    fn from(value: DatasetFormatArg) -> Self {
479        match value {
480            DatasetFormatArg::Csv => IoDatasetFormat::Csv,
481            DatasetFormatArg::Parquet => IoDatasetFormat::Parquet,
482        }
483    }
484}
485
486#[derive(Args)]
487pub struct DataInspectParquetArgs {
488    /// Path to the parquet file produced by the flight recorder
489    #[arg(value_name = "PATH")]
490    path: PathBuf,
491    /// Number of rows to display (0 prints the entire file)
492    #[arg(long, default_value_t = 10)]
493    rows: usize,
494}
495
496impl DataInspectParquetArgs {
497    fn run(&self) -> Result<()> {
498        let limit = if self.rows == 0 {
499            usize::MAX
500        } else {
501            self.rows
502        };
503        let file = File::open(&self.path)
504            .with_context(|| format!("failed to open {}", self.path.display()))?;
505        let batch_size = limit.clamp(1, 8192);
506        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)?
507            .with_batch_size(batch_size)
508            .build()?;
509
510        let mut printed = 0usize;
511        while printed < limit {
512            match reader.next() {
513                Some(Ok(batch)) => {
514                    if batch.num_rows() == 0 {
515                        continue;
516                    }
517                    let remaining = limit.saturating_sub(printed);
518                    let take = remaining.min(batch.num_rows());
519                    let display_batch = if take == batch.num_rows() {
520                        batch
521                    } else {
522                        batch.slice(0, take)
523                    };
524                    print_batches(std::slice::from_ref(&display_batch))?;
525                    printed += take;
526                }
527                Some(Err(err)) => return Err(err.into()),
528                None => break,
529            }
530        }
531
532        if printed == 0 {
533            println!("no rows available in {}", self.path.display());
534        } else if limit != usize::MAX {
535            println!("displayed {printed} row(s) from {}", self.path.display());
536        }
537
538        Ok(())
539    }
540}
541
542#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
543pub enum BacktestModeArg {
544    Candle,
545    Tick,
546}
547
548#[derive(Args)]
549pub struct BacktestRunArgs {
550    #[arg(long)]
551    strategy_config: PathBuf,
552    /// One or more CSV files with historical candles (symbol,timestamp,...)
553    #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
554    data_paths: Vec<PathBuf>,
555    #[arg(long, default_value_t = 500)]
556    candles: usize,
557    #[arg(long, default_value = "0.01")]
558    quantity: Decimal,
559    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
560    #[arg(long, default_value = "0")]
561    slippage_bps: Decimal,
562    /// Trading fees in basis points applied to notional
563    #[arg(long, default_value = "0")]
564    fee_bps: Decimal,
565    /// Number of candles between signal and execution
566    #[arg(long, default_value_t = 1)]
567    latency_candles: usize,
568    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
569    #[arg(long, default_value = "fixed:0.01")]
570    sizer: String,
571    /// Selects the data source driving fills (`candle` or `tick`)
572    #[arg(long, value_enum, default_value = "candle")]
573    mode: BacktestModeArg,
574    /// One or more JSONL files or a flight-recorder directory containing tick/order book events (required for `--mode tick`)
575    #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
576    lob_paths: Vec<PathBuf>,
577    #[arg(long)]
578    markets_file: Option<PathBuf>,
579}
580
581enum LobSource {
582    Json(Vec<PathBuf>),
583    FlightRecorder(PathBuf),
584}
585
586#[derive(Args)]
587pub struct BacktestBatchArgs {
588    /// Glob or directory containing strategy config files
589    #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
590    config_paths: Vec<PathBuf>,
591    /// Candle CSVs available to every strategy
592    #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
593    data_paths: Vec<PathBuf>,
594    #[arg(long, default_value = "0.01")]
595    quantity: Decimal,
596    /// Optional output CSV summarizing results
597    #[arg(long)]
598    output: Option<PathBuf>,
599    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
600    #[arg(long, default_value = "0")]
601    slippage_bps: Decimal,
602    /// Trading fees in basis points applied to notional
603    #[arg(long, default_value = "0")]
604    fee_bps: Decimal,
605    /// Number of candles between signal and execution
606    #[arg(long, default_value_t = 1)]
607    latency_candles: usize,
608    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
609    #[arg(long, default_value = "fixed:0.01")]
610    sizer: String,
611    #[arg(long)]
612    markets_file: Option<PathBuf>,
613}
614
615#[derive(Args)]
616pub struct LiveRunArgs {
617    #[arg(long)]
618    strategy_config: PathBuf,
619    #[arg(long, default_value = "paper_sandbox")]
620    exchange: String,
621    #[arg(long, default_value = "linear")]
622    category: String,
623    #[arg(long, default_value = "1m")]
624    interval: String,
625    #[arg(long, default_value = "1")]
626    quantity: Decimal,
627    /// Selects which execution backend to use (`paper` or `bybit`)
628    #[arg(
629        long = "exec",
630        default_value = "paper",
631        value_enum,
632        alias = "live-exec"
633    )]
634    exec: ExecutionBackend,
635    /// Path to persisted state (file for sqlite, directory for lmdb)
636    #[arg(long)]
637    state_path: Option<PathBuf>,
638    /// Persistence backend to use for runtime state (overrides config.live.persistence.engine)
639    #[arg(long, value_enum)]
640    persistence: Option<PersistenceBackend>,
641    #[arg(long)]
642    metrics_addr: Option<String>,
643    #[arg(long)]
644    log_path: Option<PathBuf>,
645    /// Directory where parquet market/trade data is recorded
646    #[arg(
647        long = "record-data",
648        value_name = "PATH",
649        default_value = "data/flight_recorder"
650    )]
651    record_data: PathBuf,
652    /// Control plane gRPC bind address (overrides config.live.control_addr)
653    #[arg(long)]
654    control_addr: Option<String>,
655    #[arg(long)]
656    initial_equity: Option<Decimal>,
657    #[arg(long)]
658    markets_file: Option<PathBuf>,
659    #[arg(long, default_value = "0")]
660    slippage_bps: Decimal,
661    #[arg(long, default_value = "0")]
662    fee_bps: Decimal,
663    #[arg(long, default_value_t = 0)]
664    latency_ms: u64,
665    #[arg(long, default_value_t = 512)]
666    history: usize,
667    #[arg(long)]
668    reconciliation_interval_secs: Option<u64>,
669    #[arg(long)]
670    reconciliation_threshold: Option<Decimal>,
671    #[arg(long)]
672    webhook_url: Option<String>,
673    #[arg(long)]
674    alert_max_data_gap_secs: Option<u64>,
675    #[arg(long)]
676    alert_max_order_failures: Option<u32>,
677    #[arg(long)]
678    alert_max_drawdown: Option<Decimal>,
679    #[arg(long)]
680    risk_max_order_qty: Option<Decimal>,
681    #[arg(long)]
682    risk_max_position_qty: Option<Decimal>,
683    #[arg(long)]
684    risk_max_drawdown: Option<Decimal>,
685    /// Bybit orderbook depth to subscribe to (e.g., 1, 25, 50). Ignored for other connectors.
686    #[arg(long)]
687    orderbook_depth: Option<usize>,
688    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
689    #[arg(long, default_value = "fixed:1.0")]
690    sizer: String,
691}
692
693impl LiveRunArgs {
694    fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
695        self.log_path
696            .clone()
697            .unwrap_or_else(|| config.live.log_path.clone())
698    }
699
700    fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
701        let addr = self
702            .metrics_addr
703            .clone()
704            .unwrap_or_else(|| config.live.metrics_addr.clone());
705        addr.parse()
706            .with_context(|| format!("invalid metrics address '{addr}'"))
707    }
708
709    fn resolved_control_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
710        let addr = self
711            .control_addr
712            .clone()
713            .unwrap_or_else(|| config.live.control_addr.clone());
714        addr.parse()
715            .with_context(|| format!("invalid control address '{addr}'"))
716    }
717
718    fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
719        let secs = self
720            .reconciliation_interval_secs
721            .unwrap_or(config.live.reconciliation_interval_secs)
722            .max(1);
723        StdDuration::from_secs(secs)
724    }
725
726    fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
727        let configured = self
728            .reconciliation_threshold
729            .unwrap_or(config.live.reconciliation_threshold);
730        if configured <= Decimal::ZERO {
731            config.live.reconciliation_threshold.max(Decimal::new(1, 6))
732        } else {
733            configured
734        }
735    }
736
737    fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<Symbol, Decimal> {
738        let mut balances = clone_initial_balances(&config.backtest);
739        if let Some(value) = self.initial_equity {
740            balances.insert(
741                config.backtest.reporting_currency.clone(),
742                value.max(Decimal::ZERO),
743            );
744        }
745        balances
746    }
747
748    fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
749        let mut alerting = config.live.alerting.clone();
750        let webhook = self
751            .webhook_url
752            .clone()
753            .or_else(|| alerting.webhook_url.clone());
754        alerting.webhook_url = sanitize_webhook(webhook);
755        if let Some(sec) = self.alert_max_data_gap_secs {
756            alerting.max_data_gap_secs = sec;
757        }
758        if let Some(limit) = self.alert_max_order_failures {
759            alerting.max_order_failures = limit;
760        }
761        if let Some(limit) = self.alert_max_drawdown {
762            alerting.max_drawdown = limit.max(Decimal::ZERO);
763        }
764        alerting
765    }
766
767    fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
768        let mut risk = config.risk_management.clone();
769        if let Some(limit) = self.risk_max_order_qty {
770            risk.max_order_quantity = limit.max(Decimal::ZERO);
771        }
772        if let Some(limit) = self.risk_max_position_qty {
773            risk.max_position_quantity = limit.max(Decimal::ZERO);
774        }
775        if let Some(limit) = self.risk_max_drawdown {
776            risk.max_drawdown = limit.max(Decimal::ZERO);
777        }
778        risk
779    }
780}
781
782#[derive(Deserialize)]
783struct StrategyConfigFile {
784    #[serde(rename = "strategy_name")]
785    name: String,
786    #[serde(default = "empty_table")]
787    params: toml::Value,
788}
789
790fn empty_table() -> toml::Value {
791    toml::Value::Table(Default::default())
792}
793
794pub async fn run() -> Result<()> {
795    let cli = Cli::parse();
796    let config = load_config(Some(&cli.env)).context("failed to load configuration")?;
797
798    let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
799        0 => config.log_level.clone(),
800        1 => "debug".to_string(),
801        _ => "trace".to_string(),
802    });
803
804    let log_override = match &cli.command {
805        Commands::Live {
806            action: LiveCommand::Run(args),
807        } => Some(args.resolved_log_path(&config)),
808        _ => None,
809    };
810
811    init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;
812
813    match cli.command {
814        Commands::Data { action } => handle_data(action, &config).await?,
815        Commands::Backtest {
816            action: BacktestCommand::Run(args),
817        } => args.run(&config).await?,
818        Commands::Backtest {
819            action: BacktestCommand::Batch(args),
820        } => args.run(&config).await?,
821        Commands::Live {
822            action: LiveCommand::Run(args),
823        } => args.run(&config).await?,
824        Commands::State { action } => handle_state(action, &config).await?,
825        Commands::Analyze { action } => handle_analyze(action)?,
826        Commands::Strategies => list_strategies(),
827        Commands::Monitor(args) => args.run(&config).await?,
828    }
829
830    Ok(())
831}
832
833async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
834    match cmd {
835        DataCommand::Download(args) => {
836            args.run(config).await?;
837        }
838        DataCommand::Validate(args) => {
839            args.run()?;
840        }
841        DataCommand::Resample(args) => {
842            args.run()?;
843        }
844        DataCommand::InspectParquet(args) => {
845            args.run()?;
846        }
847    }
848    Ok(())
849}
850
851async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
852    match cmd {
853        StateCommand::Inspect(args) => {
854            state::inspect_state(
855                args.resolved_path(config),
856                args.resolved_engine(config),
857                args.raw,
858            )
859            .await?;
860        }
861    }
862    Ok(())
863}
864
865fn handle_analyze(cmd: AnalyzeCommand) -> Result<()> {
866    match cmd {
867        AnalyzeCommand::Execution(args) => {
868            analyze::run_execution(args.build_request()?, args.export_csv.as_deref())
869        }
870    }
871}
872
873impl BacktestRunArgs {
874    async fn run(&self, config: &AppConfig) -> Result<()> {
875        let contents = std::fs::read_to_string(&self.strategy_config)
876            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
877        let def: StrategyConfigFile =
878            toml::from_str(&contents).context("failed to parse strategy config file")?;
879        let strategy = load_strategy(&def.name, def.params)
880            .with_context(|| format!("failed to configure strategy {}", def.name))?;
881        let symbols = strategy.subscriptions();
882        if symbols.is_empty() {
883            return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
884        }
885
886        let mode = match self.mode {
887            BacktestModeArg::Candle => BacktestMode::Candle,
888            BacktestModeArg::Tick => BacktestMode::Tick,
889        };
890
891        let markets_path = self
892            .markets_file
893            .clone()
894            .or_else(|| config.backtest.markets_file.clone())
895            .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
896        let market_registry = Arc::new(
897            MarketRegistry::load_from_file(&markets_path).with_context(|| {
898                format!("failed to load markets from {}", markets_path.display())
899            })?,
900        );
901
902        let (market_stream, event_stream, execution_client, matching_engine) = match mode {
903            BacktestMode::Candle => {
904                let stream = self.build_candle_stream(&symbols)?;
905                (
906                    Some(stream),
907                    None,
908                    Arc::new(PaperExecutionClient::default()) as Arc<dyn ExecutionClient>,
909                    None,
910                )
911            }
912            BacktestMode::Tick => {
913                if self.lob_paths.is_empty() {
914                    bail!("--lob-data is required when --mode tick");
915                }
916                let source = self.detect_lob_source()?;
917                let engine = Arc::new(MatchingEngine::new(
918                    "matching-engine",
919                    symbols.clone(),
920                    reporting_balance(&config.backtest),
921                ));
922                let stream = match source {
923                    LobSource::Json(paths) => {
924                        let events = load_lob_events_from_paths(&paths)?;
925                        if events.is_empty() {
926                            bail!("no order book events loaded from --lob-data");
927                        }
928                        stream_from_events(events)
929                    }
930                    LobSource::FlightRecorder(root) => self
931                        .build_flight_recorder_stream(&root, &symbols)
932                        .context("failed to initialize flight recorder stream")?,
933                };
934                (
935                    None,
936                    Some(stream),
937                    engine.clone() as Arc<dyn ExecutionClient>,
938                    Some(engine),
939                )
940            }
941        };
942
943        let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
944        let order_quantity = self.quantity;
945        let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
946
947        let mut cfg = BacktestConfig::new(symbols[0].clone());
948        cfg.order_quantity = order_quantity;
949        cfg.initial_balances = clone_initial_balances(&config.backtest);
950        cfg.reporting_currency = config.backtest.reporting_currency.clone();
951        cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
952        cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
953        cfg.execution.latency_candles = self.latency_candles.max(1);
954        cfg.mode = mode;
955
956        let report = Backtester::new(
957            cfg,
958            strategy,
959            execution,
960            matching_engine,
961            market_registry,
962            market_stream,
963            event_stream,
964        )
965        .run()
966        .await
967        .context("backtest failed")?;
968        print_report(&report);
969        Ok(())
970    }
971
972    fn build_candle_stream(&self, symbols: &[Symbol]) -> Result<BacktestStream> {
973        if symbols.is_empty() {
974            bail!("strategy did not declare any subscriptions");
975        }
976        if self.data_paths.is_empty() {
977            let mut generated = Vec::new();
978            for (idx, symbol) in symbols.iter().enumerate() {
979                let offset = idx as i64 * 10;
980                generated.extend(synth_candles(symbol, self.candles, offset));
981            }
982            generated.sort_by_key(|c| c.timestamp);
983            if generated.is_empty() {
984                bail!("no synthetic candles generated; provide --data files instead");
985            }
986            return Ok(memory_market_stream(&symbols[0], generated));
987        }
988
989        match detect_data_format(&self.data_paths)? {
990            DataFormat::Csv => {
991                let mut candles = load_candles_from_paths(&self.data_paths)?;
992                if candles.is_empty() {
993                    bail!("no candles loaded from --data paths");
994                }
995                candles.sort_by_key(|c| c.timestamp);
996                Ok(memory_market_stream(&symbols[0], candles))
997            }
998            DataFormat::Parquet => Ok(parquet_market_stream(symbols, self.data_paths.clone())),
999        }
1000    }
1001
1002    fn detect_lob_source(&self) -> Result<LobSource> {
1003        if self.lob_paths.len() == 1 {
1004            let path = &self.lob_paths[0];
1005            if path.is_dir() {
1006                return Ok(LobSource::FlightRecorder(path.clone()));
1007            }
1008        }
1009        if self.lob_paths.iter().all(|path| path.is_file()) {
1010            return Ok(LobSource::Json(self.lob_paths.clone()));
1011        }
1012        bail!(
1013            "tick mode expects either JSONL files or a single flight-recorder directory via --lob-data"
1014        )
1015    }
1016
1017    fn build_flight_recorder_stream(
1018        &self,
1019        root: &Path,
1020        symbols: &[Symbol],
1021    ) -> Result<MarketEventStream> {
1022        let stream = UnifiedEventStream::from_flight_recorder(root, symbols)?
1023            .into_stream()
1024            .map(|event| event.map(MarketEvent::from));
1025        Ok(Box::pin(stream))
1026    }
1027}
1028
1029impl BacktestBatchArgs {
1030    async fn run(&self, config: &AppConfig) -> Result<()> {
1031        if self.config_paths.is_empty() {
1032            return Err(anyhow!("provide at least one --config path"));
1033        }
1034        if self.data_paths.is_empty() {
1035            return Err(anyhow!("provide at least one --data path for batch mode"));
1036        }
1037        let markets_path = self
1038            .markets_file
1039            .clone()
1040            .or_else(|| config.backtest.markets_file.clone())
1041            .ok_or_else(|| {
1042                anyhow!("batch mode requires --markets-file or backtest.markets_file")
1043            })?;
1044        let market_registry = Arc::new(
1045            MarketRegistry::load_from_file(&markets_path).with_context(|| {
1046                format!("failed to load markets from {}", markets_path.display())
1047            })?,
1048        );
1049        let data_format = detect_data_format(&self.data_paths)?;
1050        let mut aggregated = Vec::new();
1051        for config_path in &self.config_paths {
1052            let contents = std::fs::read_to_string(config_path).with_context(|| {
1053                format!("failed to read strategy config {}", config_path.display())
1054            })?;
1055            let def: StrategyConfigFile =
1056                toml::from_str(&contents).context("failed to parse strategy config file")?;
1057            let strategy = load_strategy(&def.name, def.params)
1058                .with_context(|| format!("failed to configure strategy {}", def.name))?;
1059            let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
1060            let order_quantity = self.quantity;
1061            let symbols = strategy.subscriptions();
1062            if symbols.is_empty() {
1063                bail!("strategy {} did not declare subscriptions", strategy.name());
1064            }
1065            let stream = match data_format {
1066                DataFormat::Csv => {
1067                    let mut candles = load_candles_from_paths(&self.data_paths)?;
1068                    if candles.is_empty() {
1069                        bail!("no candles loaded from --data paths");
1070                    }
1071                    candles.sort_by_key(|c| c.timestamp);
1072                    memory_market_stream(&symbols[0], candles)
1073                }
1074                DataFormat::Parquet => parquet_market_stream(&symbols, self.data_paths.clone()),
1075            };
1076            let execution_client: Arc<dyn ExecutionClient> =
1077                Arc::new(PaperExecutionClient::default());
1078            let execution =
1079                ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
1080            let mut cfg = BacktestConfig::new(symbols[0].clone());
1081            cfg.order_quantity = order_quantity;
1082            cfg.initial_balances = clone_initial_balances(&config.backtest);
1083            cfg.reporting_currency = config.backtest.reporting_currency.clone();
1084            cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
1085            cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
1086            cfg.execution.latency_candles = self.latency_candles.max(1);
1087
1088            let report = Backtester::new(
1089                cfg,
1090                strategy,
1091                execution,
1092                None,
1093                market_registry.clone(),
1094                Some(stream),
1095                None,
1096            )
1097            .run()
1098            .await
1099            .with_context(|| format!("backtest failed for {}", config_path.display()))?;
1100            aggregated.push(BatchRow {
1101                config: config_path.display().to_string(),
1102                signals: 0, // Legacy field, can be removed or calculated from report
1103                orders: 0,  // Legacy field, can be removed or calculated from report
1104                dropped_orders: 0, // Legacy field, can be removed or calculated from report
1105                ending_equity: report.ending_equity,
1106            });
1107        }
1108
1109        if let Some(output) = &self.output {
1110            write_batch_report(output, &aggregated)?;
1111            println!("Batch report written to {}", output.display());
1112        }
1113        if aggregated.is_empty() {
1114            return Err(anyhow!("no batch jobs executed"));
1115        }
1116        Ok(())
1117    }
1118}
1119
1120impl LiveRunArgs {
1121    async fn run(&self, config: &AppConfig) -> Result<()> {
1122        let exchange_cfg = config
1123            .exchange
1124            .get(&self.exchange)
1125            .cloned()
1126            .ok_or_else(|| anyhow!("exchange profile {} not found", self.exchange))?;
1127
1128        let driver = exchange_cfg.driver.clone();
1129
1130        let contents = fs::read_to_string(&self.strategy_config)
1131            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
1132        let def: StrategyConfigFile =
1133            toml::from_str(&contents).context("failed to parse strategy config file")?;
1134        let strategy = load_strategy(&def.name, def.params)
1135            .with_context(|| format!("failed to configure strategy {}", def.name))?;
1136        let symbols = strategy.subscriptions();
1137        if symbols.is_empty() {
1138            bail!("strategy did not declare any subscriptions");
1139        }
1140        if self.quantity <= Decimal::ZERO {
1141            bail!("--quantity must be greater than zero");
1142        }
1143        let quantity = self.quantity;
1144        let initial_balances = self.resolved_initial_balances(config);
1145        let reporting_currency = config.backtest.reporting_currency.clone();
1146        let markets_file = self
1147            .markets_file
1148            .clone()
1149            .or_else(|| config.backtest.markets_file.clone());
1150
1151        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
1152        let category =
1153            PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
1154        let metrics_addr = self.resolved_metrics_addr(config)?;
1155        let control_addr = self.resolved_control_addr(config)?;
1156        let persistence_cfg = config.live.persistence_config();
1157        let persistence_engine = self
1158            .persistence
1159            .map(PersistenceEngine::from)
1160            .unwrap_or(persistence_cfg.engine);
1161        let state_path = self
1162            .state_path
1163            .clone()
1164            .unwrap_or_else(|| persistence_cfg.path.clone());
1165        let persistence = PersistenceSettings::new(persistence_engine, state_path);
1166        let alerting = self.build_alerting(config);
1167        let history = self.history.max(32);
1168        let reconciliation_interval = self.reconciliation_interval(config);
1169        let reconciliation_threshold = self.reconciliation_threshold(config);
1170        let orderbook_depth = self
1171            .orderbook_depth
1172            .unwrap_or(super::live::default_order_book_depth());
1173
1174        let settings = LiveSessionSettings {
1175            category,
1176            interval,
1177            quantity,
1178            slippage_bps: self.slippage_bps.max(Decimal::ZERO),
1179            fee_bps: self.fee_bps.max(Decimal::ZERO),
1180            history,
1181            metrics_addr,
1182            persistence,
1183            initial_balances,
1184            reporting_currency,
1185            markets_file,
1186            alerting,
1187            exec_backend: self.exec,
1188            risk: self.build_risk_config(config),
1189            reconciliation_interval,
1190            reconciliation_threshold,
1191            driver,
1192            orderbook_depth,
1193            record_path: Some(self.record_data.clone()),
1194            control_addr,
1195        };
1196
1197        info!(
1198            strategy = %def.name,
1199            symbols = ?symbols,
1200            exchange = %self.exchange,
1201            interval = %self.interval,
1202            driver = ?settings.driver,
1203            exec = ?self.exec,
1204            persistence_engine = ?settings.persistence.engine,
1205            state_path = %settings.persistence.state_path.display(),
1206            control_addr = %settings.control_addr,
1207            "starting live session"
1208        );
1209
1210        run_live(strategy, symbols, exchange_cfg, settings)
1211            .await
1212            .context("live session failed")
1213    }
1214}
1215
1216fn list_strategies() {
1217    println!("Built-in strategies:");
1218    for name in builtin_strategy_names() {
1219        println!("- {name}");
1220    }
1221}
1222
1223fn print_validation_summary(outcome: &ValidationOutcome) {
1224    const MAX_EXAMPLES: usize = 5;
1225    let summary = &outcome.summary;
1226    println!(
1227        "Validation summary for {} ({} candles)",
1228        summary.symbol, summary.rows
1229    );
1230    println!(
1231        "  Range: {} -> {}",
1232        summary.start.to_rfc3339(),
1233        summary.end.to_rfc3339()
1234    );
1235    println!("  Interval: {}", interval_label(summary.interval));
1236    println!("  Missing intervals: {}", summary.missing_candles);
1237    println!("  Duplicate intervals: {}", summary.duplicate_candles);
1238    println!("  Zero-volume candles: {}", summary.zero_volume_candles);
1239    println!("  Price spikes flagged: {}", summary.price_spike_count);
1240    println!(
1241        "  Cross-source mismatches: {}",
1242        summary.cross_mismatch_count
1243    );
1244    println!("  Repaired candles generated: {}", summary.repaired_candles);
1245
1246    if !outcome.gaps.is_empty() {
1247        println!("  Gap examples:");
1248        for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
1249            println!(
1250                "    {} -> {} (missing {})",
1251                gap.start.to_rfc3339(),
1252                gap.end.to_rfc3339(),
1253                gap.missing
1254            );
1255        }
1256        if outcome.gaps.len() > MAX_EXAMPLES {
1257            println!(
1258                "    ... {} additional gap(s) omitted",
1259                outcome.gaps.len() - MAX_EXAMPLES
1260            );
1261        }
1262    }
1263
1264    if !outcome.price_spikes.is_empty() {
1265        println!("  Price spike examples:");
1266        for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
1267            println!(
1268                "    {} (change {:.2}%)",
1269                spike.timestamp.to_rfc3339(),
1270                spike.change_fraction * 100.0
1271            );
1272        }
1273        if outcome.price_spikes.len() > MAX_EXAMPLES {
1274            println!(
1275                "    ... {} additional spike(s) omitted",
1276                outcome.price_spikes.len() - MAX_EXAMPLES
1277            );
1278        }
1279    }
1280
1281    if !outcome.cross_mismatches.is_empty() {
1282        println!("  Cross-source mismatch examples:");
1283        for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
1284            println!(
1285                "    {} primary {:.4} vs ref {:.4} ({:.2}%)",
1286                miss.timestamp.to_rfc3339(),
1287                miss.primary_close,
1288                miss.reference_close,
1289                miss.delta_fraction * 100.0
1290            );
1291        }
1292        if outcome.cross_mismatches.len() > MAX_EXAMPLES {
1293            println!(
1294                "    ... {} additional mismatch(es) omitted",
1295                outcome.cross_mismatches.len() - MAX_EXAMPLES
1296            );
1297        }
1298    }
1299}
1300
1301fn print_report(report: &PerformanceReport) {
1302    println!("\n{}", report);
1303}
1304
1305fn synth_candles(symbol: &str, len: usize, offset_minutes: i64) -> Vec<Candle> {
1306    let mut candles = Vec::with_capacity(len);
1307    for i in 0..len {
1308        let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
1309        let open = base + (i as f64 % 3.0) * 10.0;
1310        let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
1311        let open_dec =
1312            Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
1313        let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
1314        let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
1315        let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
1316        candles.push(Candle {
1317            symbol: Symbol::from(symbol),
1318            interval: Interval::OneMinute,
1319            open: open_dec,
1320            high,
1321            low,
1322            close: close_dec,
1323            volume: Decimal::ONE,
1324            timestamp: Utc::now() - Duration::minutes((len - i) as i64)
1325                + Duration::minutes(offset_minutes),
1326        });
1327    }
1328    candles
1329}
1330
1331fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
1332    if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
1333        return Ok(dt.with_timezone(&Utc));
1334    }
1335    if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1336        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1337    }
1338    if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
1339        let dt = date
1340            .and_hms_opt(0, 0, 0)
1341            .ok_or_else(|| anyhow!("invalid date"))?;
1342        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1343    }
1344    Err(anyhow!("unable to parse datetime '{value}'"))
1345}
1346
1347#[derive(Deserialize)]
1348struct CandleCsvRow {
1349    symbol: Option<String>,
1350    timestamp: String,
1351    open: f64,
1352    high: f64,
1353    low: f64,
1354    close: f64,
1355    volume: f64,
1356}
1357
1358fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1359    let mut candles = Vec::new();
1360    for path in paths {
1361        let mut reader = csv::Reader::from_path(path)
1362            .with_context(|| format!("failed to open {}", path.display()))?;
1363        for record in reader.deserialize::<CandleCsvRow>() {
1364            let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1365            let timestamp = parse_datetime(&row.timestamp)?;
1366            let symbol = row
1367                .symbol
1368                .clone()
1369                .or_else(|| infer_symbol_from_path(path))
1370                .ok_or_else(|| {
1371                    anyhow!(
1372                        "missing symbol column and unable to infer from path {}",
1373                        path.display()
1374                    )
1375                })?;
1376            let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1377            let open = Decimal::from_f64(row.open).ok_or_else(|| {
1378                anyhow!("invalid open value '{}' in {}", row.open, path.display())
1379            })?;
1380            let high = Decimal::from_f64(row.high).ok_or_else(|| {
1381                anyhow!("invalid high value '{}' in {}", row.high, path.display())
1382            })?;
1383            let low = Decimal::from_f64(row.low)
1384                .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1385            let close = Decimal::from_f64(row.close).ok_or_else(|| {
1386                anyhow!("invalid close value '{}' in {}", row.close, path.display())
1387            })?;
1388            let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1389                anyhow!(
1390                    "invalid volume value '{}' in {}",
1391                    row.volume,
1392                    path.display()
1393                )
1394            })?;
1395            candles.push(Candle {
1396                symbol,
1397                interval,
1398                open,
1399                high,
1400                low,
1401                close,
1402                volume,
1403                timestamp,
1404            });
1405        }
1406    }
1407    Ok(candles)
1408}
1409
1410#[derive(Clone, Copy, Debug, Eq, PartialEq)]
1411enum DataFormat {
1412    Csv,
1413    Parquet,
1414}
1415
1416fn detect_data_format(paths: &[PathBuf]) -> Result<DataFormat> {
1417    let mut detected: Option<DataFormat> = None;
1418    for path in paths {
1419        let ext = path
1420            .extension()
1421            .and_then(|ext| ext.to_str())
1422            .map(|ext| ext.to_ascii_lowercase())
1423            .unwrap_or_else(|| String::from(""));
1424        let current = if ext == "parquet" {
1425            DataFormat::Parquet
1426        } else {
1427            DataFormat::Csv
1428        };
1429        if let Some(existing) = detected {
1430            if existing != current {
1431                bail!("cannot mix CSV and Parquet inputs in --data");
1432            }
1433        } else {
1434            detected = Some(current);
1435        }
1436    }
1437    detected.ok_or_else(|| anyhow!("no data paths provided"))
1438}
1439
1440fn memory_market_stream(symbol: &str, candles: Vec<Candle>) -> BacktestStream {
1441    Box::new(PaperMarketStream::from_data(
1442        symbol.to_string(),
1443        Vec::new(),
1444        candles,
1445    ))
1446}
1447
1448fn parquet_market_stream(symbols: &[Symbol], paths: Vec<PathBuf>) -> BacktestStream {
1449    Box::new(ParquetMarketStream::with_candles(symbols.to_vec(), paths))
1450}
1451
1452#[derive(Deserialize)]
1453#[serde(tag = "event", rename_all = "lowercase")]
1454enum LobEventRow {
1455    Snapshot {
1456        timestamp: String,
1457        symbol: Option<String>,
1458        bids: Vec<[f64; 2]>,
1459        asks: Vec<[f64; 2]>,
1460    },
1461    Depth {
1462        timestamp: String,
1463        symbol: Option<String>,
1464        bids: Vec<[f64; 2]>,
1465        asks: Vec<[f64; 2]>,
1466    },
1467    Trade {
1468        timestamp: String,
1469        symbol: Option<String>,
1470        side: String,
1471        price: f64,
1472        size: f64,
1473    },
1474}
1475
1476fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
1477    let mut events = Vec::new();
1478    for path in paths {
1479        let file = File::open(path)
1480            .with_context(|| format!("failed to open order book file {}", path.display()))?;
1481        let symbol_hint = infer_symbol_from_path(path);
1482        for line in BufReader::new(file).lines() {
1483            let line =
1484                line.with_context(|| format!("failed to read line from {}", path.display()))?;
1485            if line.trim().is_empty() {
1486                continue;
1487            }
1488            let row: LobEventRow = serde_json::from_str(&line)
1489                .with_context(|| format!("invalid order book event in {}", path.display()))?;
1490            match row {
1491                LobEventRow::Snapshot {
1492                    timestamp,
1493                    symbol,
1494                    bids,
1495                    asks,
1496                } => {
1497                    let ts = parse_datetime(&timestamp)?;
1498                    let symbol = symbol
1499                        .or_else(|| symbol_hint.clone())
1500                        .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
1501                    let bids = convert_levels(&bids)?;
1502                    let asks = convert_levels(&asks)?;
1503                    let book = OrderBook {
1504                        symbol: symbol.clone(),
1505                        bids,
1506                        asks,
1507                        timestamp: ts,
1508                        exchange_checksum: None,
1509                        local_checksum: None,
1510                    };
1511                    events.push(MarketEvent {
1512                        timestamp: ts,
1513                        kind: MarketEventKind::OrderBook(book),
1514                    });
1515                }
1516                LobEventRow::Depth {
1517                    timestamp,
1518                    symbol,
1519                    bids,
1520                    asks,
1521                } => {
1522                    let ts = parse_datetime(&timestamp)?;
1523                    let symbol = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
1524                        anyhow!("missing symbol in depth update {}", path.display())
1525                    })?;
1526                    let bids = convert_levels(&bids)?;
1527                    let asks = convert_levels(&asks)?;
1528                    let update = DepthUpdate {
1529                        symbol: symbol.clone(),
1530                        bids,
1531                        asks,
1532                        timestamp: ts,
1533                    };
1534                    events.push(MarketEvent {
1535                        timestamp: ts,
1536                        kind: MarketEventKind::Depth(update),
1537                    });
1538                }
1539                LobEventRow::Trade {
1540                    timestamp,
1541                    symbol,
1542                    side,
1543                    price,
1544                    size,
1545                } => {
1546                    let ts = parse_datetime(&timestamp)?;
1547                    let symbol = symbol
1548                        .or_else(|| symbol_hint.clone())
1549                        .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
1550                    let side = match side.to_lowercase().as_str() {
1551                        "buy" | "bid" | "b" => Side::Buy,
1552                        "sell" | "ask" | "s" => Side::Sell,
1553                        other => bail!("unsupported trade side '{other}' in {}", path.display()),
1554                    };
1555                    let price = Decimal::from_f64(price).ok_or_else(|| {
1556                        anyhow!("invalid trade price '{}' in {}", price, path.display())
1557                    })?;
1558                    let size = Decimal::from_f64(size).ok_or_else(|| {
1559                        anyhow!("invalid trade size '{}' in {}", size, path.display())
1560                    })?;
1561                    let tick = Tick {
1562                        symbol: symbol.clone(),
1563                        price,
1564                        size,
1565                        side,
1566                        exchange_timestamp: ts,
1567                        received_at: ts,
1568                    };
1569                    events.push(MarketEvent {
1570                        timestamp: ts,
1571                        kind: MarketEventKind::Trade(tick),
1572                    });
1573                }
1574            }
1575        }
1576    }
1577    events.sort_by_key(|event| event.timestamp);
1578    Ok(events)
1579}
1580
1581fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
1582    levels
1583        .iter()
1584        .map(|pair| {
1585            let price = Decimal::from_f64(pair[0])
1586                .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
1587            let size = Decimal::from_f64(pair[1])
1588                .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
1589            Ok(OrderBookLevel { price, size })
1590        })
1591        .collect()
1592}
1593
1594fn infer_symbol_from_path(path: &Path) -> Option<String> {
1595    path.parent()
1596        .and_then(|p| p.file_name())
1597        .map(|os| os.to_string_lossy().to_string())
1598}
1599
1600fn infer_interval_from_path(path: &Path) -> Option<Interval> {
1601    path.file_stem()
1602        .and_then(|os| os.to_str())
1603        .and_then(|stem| stem.split('_').next())
1604        .and_then(|token| Interval::from_str(token).ok())
1605}
1606
1607fn default_output_path(
1608    config: &AppConfig,
1609    exchange: &str,
1610    symbol: &str,
1611    interval: Interval,
1612    start: DateTime<Utc>,
1613    end: DateTime<Utc>,
1614) -> PathBuf {
1615    let interval_part = interval_label(interval);
1616    let start_part = start.format("%Y%m%d").to_string();
1617    let end_part = end.format("%Y%m%d").to_string();
1618    config
1619        .data_path
1620        .join(exchange)
1621        .join(symbol)
1622        .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
1623}
1624
1625fn interval_label(interval: Interval) -> &'static str {
1626    match interval {
1627        Interval::OneSecond => "1s",
1628        Interval::OneMinute => "1m",
1629        Interval::FiveMinutes => "5m",
1630        Interval::FifteenMinutes => "15m",
1631        Interval::OneHour => "1h",
1632        Interval::FourHours => "4h",
1633        Interval::OneDay => "1d",
1634    }
1635}
1636
1637#[derive(Serialize)]
1638struct CandleRow<'a> {
1639    symbol: &'a str,
1640    timestamp: String,
1641    open: f64,
1642    high: f64,
1643    low: f64,
1644    close: f64,
1645    volume: f64,
1646}
1647
1648fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
1649    if let Some(parent) = path.parent() {
1650        fs::create_dir_all(parent)
1651            .with_context(|| format!("failed to create directory {}", parent.display()))?;
1652    }
1653    let mut writer =
1654        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1655    for candle in candles {
1656        let row = CandleRow {
1657            symbol: &candle.symbol,
1658            timestamp: candle.timestamp.to_rfc3339(),
1659            open: candle.open.to_f64().unwrap_or(0.0),
1660            high: candle.high.to_f64().unwrap_or(0.0),
1661            low: candle.low.to_f64().unwrap_or(0.0),
1662            close: candle.close.to_f64().unwrap_or(0.0),
1663            volume: candle.volume.to_f64().unwrap_or(0.0),
1664        };
1665        writer.serialize(row)?;
1666    }
1667    writer.flush()?;
1668    Ok(())
1669}
1670
1671#[derive(Serialize)]
1672struct BatchRow {
1673    config: String,
1674    signals: usize,
1675    orders: usize,
1676    dropped_orders: usize,
1677    ending_equity: f64,
1678}
1679
1680fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
1681    if let Some(parent) = path.parent() {
1682        fs::create_dir_all(parent)
1683            .with_context(|| format!("failed to create directory {}", parent.display()))?;
1684    }
1685    let mut writer =
1686        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1687    for row in rows {
1688        writer.serialize(row)?;
1689    }
1690    writer.flush()?;
1691    Ok(())
1692}
1693
1694fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<Symbol, Decimal> {
1695    config
1696        .initial_balances
1697        .iter()
1698        .map(|(currency, amount)| (currency.clone(), *amount))
1699        .collect()
1700}
1701
1702fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
1703    config
1704        .initial_balances
1705        .get(&config.reporting_currency)
1706        .copied()
1707        .unwrap_or_default()
1708}
1709
1710fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
1711    let parts: Vec<_> = value.split(':').collect();
1712    match parts.as_slice() {
1713        ["fixed", val] => {
1714            let quantity =
1715                Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
1716            Ok(Box::new(FixedOrderSizer { quantity }))
1717        }
1718        ["fixed"] => {
1719            let quantity = cli_quantity.unwrap_or(Decimal::ONE);
1720            Ok(Box::new(FixedOrderSizer { quantity }))
1721        }
1722        ["percent", val] => {
1723            let percent =
1724                Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
1725            Ok(Box::new(PortfolioPercentSizer {
1726                percent: percent.max(Decimal::ZERO),
1727            }))
1728        }
1729        ["risk-adjusted", val] => {
1730            let risk_fraction = Decimal::from_str(val)
1731                .context("invalid risk fraction value (use decimals)")?;
1732            Ok(Box::new(RiskAdjustedSizer {
1733                risk_fraction: risk_fraction.max(Decimal::ZERO),
1734            }))
1735        }
1736        _ => Err(anyhow!(
1737            "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
1738        )),
1739    }
1740}