//! tesser_cli/app.rs — CLI argument definitions and subcommand dispatch.
1use crate::alerts::sanitize_webhook;
2use crate::analyze;
3use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
4use crate::live::{
5    run_live, ExecutionBackend, LiveSessionSettings, PersistenceBackend, PersistenceSettings,
6};
7use crate::state;
8use crate::telemetry::init_tracing;
9use crate::PublicChannel;
10use arrow::util::pretty::print_batches;
11use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
12use std::collections::HashMap;
13use std::fs::{self, File};
14use std::io::{BufRead, BufReader};
15use std::net::SocketAddr;
16use std::path::{Path, PathBuf};
17use std::str::FromStr;
18use std::sync::Arc;
19use std::time::Duration as StdDuration;
20
21use anyhow::{anyhow, bail, Context, Result};
22use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
23use clap::{Args, Parser, Subcommand, ValueEnum};
24use csv::Writer;
25use futures::StreamExt;
26use rust_decimal::{
27    prelude::{FromPrimitive, ToPrimitive},
28    Decimal,
29};
30use serde::{Deserialize, Serialize};
31use tesser_backtester::reporting::PerformanceReport;
32use tesser_backtester::{
33    stream_from_events, BacktestConfig, BacktestMode, BacktestStream, Backtester, MarketEvent,
34    MarketEventKind, MarketEventStream,
35};
36use tesser_broker::ExecutionClient;
37use tesser_config::{load_config, AppConfig, PersistenceEngine, RiskManagementConfig};
38use tesser_core::{Candle, DepthUpdate, Interval, OrderBook, OrderBookLevel, Side, Symbol, Tick};
39use tesser_data::analytics::ExecutionAnalysisRequest;
40use tesser_data::download::{BinanceDownloader, BybitDownloader, KlineRequest};
41use tesser_data::merger::UnifiedEventStream;
42use tesser_data::parquet::ParquetMarketStream;
43use tesser_execution::{
44    ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PortfolioPercentSizer,
45    RiskAdjustedSizer,
46};
47use tesser_markets::MarketRegistry;
48use tesser_paper::{MatchingEngine, PaperExecutionClient, PaperMarketStream};
49use tesser_strategy::{builtin_strategy_names, load_strategy};
50use tracing::{info, warn};
51
// Top-level CLI parsed by clap in `run()`. NOTE: field `///` comments become
// clap help text, so new documentation here uses `//` to leave --help intact.
#[derive(Parser)]
#[command(author, version, about = "Tesser Trading Framework")]
pub struct Cli {
    /// Increases logging verbosity (-v debug, -vv trace)
    #[arg(short, long, action = clap::ArgAction::Count)]
    verbose: u8,
    /// Selects which configuration environment to load (maps to config/{env}.toml)
    #[arg(long, default_value = "default")]
    env: String,
    // Dispatched in `run()`.
    #[command(subcommand)]
    command: Commands,
}
64
// Top-level subcommand tree. Variants carrying large Args structs trip
// clippy's large_enum_variant; boxing is intentionally avoided here.
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
pub enum Commands {
    /// Data engineering tasks
    Data {
        #[command(subcommand)]
        action: DataCommand,
    },
    /// Backtesting workflows
    Backtest {
        #[command(subcommand)]
        action: BacktestCommand,
    },
    /// Live trading workflows
    Live {
        #[command(subcommand)]
        action: LiveCommand,
    },
    /// Inspect or repair persisted runtime state
    State {
        #[command(subcommand)]
        action: StateCommand,
    },
    /// Strategy management helpers
    Strategies,
    /// Analyze previously recorded trading sessions
    Analyze {
        #[command(subcommand)]
        action: AnalyzeCommand,
    },
}
96
// `tesser data …` subcommands.
#[derive(Subcommand)]
pub enum DataCommand {
    /// Download historical market data
    Download(DataDownloadArgs),
    /// Validate and optionally repair a local data set
    Validate(DataValidateArgs),
    /// Resample existing data (placeholder)
    Resample(DataResampleArgs),
    /// Inspect a parquet file emitted by the flight recorder
    InspectParquet(DataInspectParquetArgs),
}
108
// `tesser backtest …` subcommands.
#[derive(Subcommand)]
pub enum BacktestCommand {
    /// Run a single backtest from a strategy config file
    Run(BacktestRunArgs),
    /// Run multiple strategy configs and aggregate the results
    Batch(BacktestBatchArgs),
}
116
// `tesser live …` subcommands.
#[derive(Subcommand)]
pub enum LiveCommand {
    /// Start a live trading session (scaffolding)
    Run(LiveRunArgs),
}
122
// `tesser state …` subcommands.
#[derive(Subcommand)]
pub enum StateCommand {
    /// Inspect the persisted live state snapshot
    Inspect(StateInspectArgs),
}
128
// `tesser analyze …` subcommands.
#[derive(Subcommand)]
pub enum AnalyzeCommand {
    /// Generate a TCA-lite execution report using flight recorder files
    Execution(AnalyzeExecutionArgs),
}
134
// Arguments for `data download`. New docs use `//` so --help text is unchanged.
#[derive(Args)]
pub struct DataDownloadArgs {
    // Exchange profile key looked up in config (`config.exchange`).
    #[arg(long, default_value = "bybit")]
    exchange: String,
    // Instrument symbol as the exchange spells it (e.g. BTCUSDT).
    #[arg(long)]
    symbol: String,
    // Bybit product category (linear/inverse/spot); ignored by Binance.
    #[arg(long, default_value = "linear")]
    category: String,
    // Candle interval string parsed into `Interval` (e.g. "1m", "1h").
    #[arg(long, default_value = "1m")]
    interval: String,
    // Inclusive window start; parsed by `parse_datetime`.
    #[arg(long)]
    start: String,
    // Optional window end; defaults to "now" when omitted.
    #[arg(long)]
    end: Option<String>,
    // Output CSV path; a default path is derived from config when omitted.
    #[arg(long)]
    output: Option<PathBuf>,
    /// Skip automatic validation after download completes
    #[arg(long)]
    skip_validation: bool,
    /// Attempt to repair gaps detected during validation
    #[arg(long)]
    repair_missing: bool,
    /// Max allowed close-to-close jump when auto-validating (fractional)
    #[arg(long, default_value_t = 0.05)]
    validation_jump_threshold: f64,
    /// Allowed divergence between primary and reference closes (fractional)
    #[arg(long, default_value_t = 0.002)]
    validation_reference_tolerance: f64,
}
164
// Arguments for `state inspect`.
#[derive(Args)]
pub struct StateInspectArgs {
    /// Path to the persisted state (file for sqlite, directory for lmdb)
    #[arg(long)]
    path: Option<PathBuf>,
    /// Emit the raw JSON payload stored inside the database
    #[arg(long)]
    raw: bool,
}
174
// Arguments for `analyze execution` (TCA-lite report).
#[derive(Args)]
pub struct AnalyzeExecutionArgs {
    /// Directory containing flight recorder parquet partitions
    #[arg(long, value_name = "PATH", default_value = "data/flight_recorder")]
    data_dir: PathBuf,
    /// Inclusive start of the analysis window (RFC3339, `YYYY-mm-dd`, etc.)
    #[arg(long)]
    start: Option<String>,
    /// Inclusive end of the analysis window
    #[arg(long)]
    end: Option<String>,
}
187
188impl StateInspectArgs {
189    fn resolved_path(&self, config: &AppConfig) -> PathBuf {
190        self.path
191            .clone()
192            .unwrap_or_else(|| config.live.persistence_config().path.clone())
193    }
194
195    fn resolved_engine(&self, config: &AppConfig) -> PersistenceEngine {
196        config.live.persistence_config().engine
197    }
198}
199
200impl AnalyzeExecutionArgs {
201    fn build_request(&self) -> Result<ExecutionAnalysisRequest> {
202        let start = match &self.start {
203            Some(value) => Some(parse_datetime(value)?),
204            None => None,
205        };
206        let end = match &self.end {
207            Some(value) => Some(parse_datetime(value)?),
208            None => None,
209        };
210        Ok(ExecutionAnalysisRequest {
211            data_dir: self.data_dir.clone(),
212            start,
213            end,
214        })
215    }
216}
217
impl DataDownloadArgs {
    // Downloads candles from the configured exchange, optionally validates /
    // repairs them, and writes the result to CSV.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        let exchange_cfg = config
            .exchange
            .get(&self.exchange)
            .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
        let start = parse_datetime(&self.start)?;
        // Missing --end means "until now".
        let end = match &self.end {
            Some(value) => parse_datetime(value)?,
            None => Utc::now(),
        };
        if start >= end {
            return Err(anyhow!("start time must be earlier than end time"));
        }

        info!(
            "Downloading {} candles for {} ({})",
            self.interval, self.symbol, self.exchange
        );
        // An empty driver string falls back to the Bybit downloader.
        let mut candles = match exchange_cfg.driver.as_str() {
            "bybit" | "" => {
                let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
                let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
                downloader
                    .download_klines(&request)
                    .await
                    .with_context(|| "failed to download candles from Bybit")?
            }
            "binance" => {
                let downloader = BinanceDownloader::new(&exchange_cfg.rest_url);
                // Binance has no category concept; an empty string is passed.
                let request = KlineRequest::new("", &self.symbol, interval, start, end);
                downloader
                    .download_klines(&request)
                    .await
                    .with_context(|| "failed to download candles from Binance")?
            }
            other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
        };

        if candles.is_empty() {
            info!("No candles returned for {}", self.symbol);
            return Ok(());
        }

        if !self.skip_validation {
            // Clamp thresholds away from zero so validation never runs with a
            // non-positive tolerance. Shadows the function's `config` argument.
            let config = ValidationConfig {
                price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
                reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
                repair_missing: self.repair_missing,
            };
            let outcome =
                validate_dataset(candles.clone(), None, config).context("validation failed")?;
            print_validation_summary(&outcome);
            if self.repair_missing && outcome.summary.repaired_candles > 0 {
                // Swap in the gap-filled series before writing to disk.
                candles = outcome.repaired;
                info!(
                    "Applied {} synthetic candle(s) to repair gaps",
                    outcome.summary.repaired_candles
                );
            }
        }

        // NOTE(review): `config` here is the outer AppConfig again — the
        // shadowing ValidationConfig above has gone out of scope.
        let output_path = self.output.clone().unwrap_or_else(|| {
            default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
        });
        write_candles_csv(&output_path, &candles)?;
        info!(
            "Saved {} candles to {}",
            candles.len(),
            output_path.display()
        );
        Ok(())
    }
}
293
// Arguments for `data validate`.
#[derive(Args)]
pub struct DataValidateArgs {
    /// One or more CSV files to inspect
    #[arg(
        long = "path",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    paths: Vec<PathBuf>,
    /// Optional reference data set(s) used for cross validation
    #[arg(
        long = "reference",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    reference_paths: Vec<PathBuf>,
    /// Max allowed close-to-close jump before flagging (fractional, 0.05 = 5%)
    #[arg(long, default_value_t = 0.05)]
    jump_threshold: f64,
    /// Allowed divergence between primary and reference closes (fractional)
    #[arg(long, default_value_t = 0.002)]
    reference_tolerance: f64,
    /// Attempt to fill gaps by synthesizing candles
    #[arg(long)]
    repair_missing: bool,
    /// Location to write the repaired dataset
    #[arg(long)]
    output: Option<PathBuf>,
}
325
impl DataValidateArgs {
    // Loads the primary (and optional reference) datasets, validates them,
    // prints a summary, and optionally writes the repaired series.
    fn run(&self) -> Result<()> {
        if self.paths.is_empty() {
            bail!("provide at least one --path for validation");
        }
        let candles =
            load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
        if candles.is_empty() {
            bail!("loaded dataset is empty; nothing to validate");
        }
        let reference = if self.reference_paths.is_empty() {
            None
        } else {
            Some(
                load_candles_from_paths(&self.reference_paths)
                    .with_context(|| "failed to load reference dataset")?,
            )
        };

        // Non-positive thresholds are replaced with a small floor (0.0001);
        // note this is intentionally NOT `max()` — a tiny positive value
        // such as 0.00005 is kept as-is.
        let price_jump_threshold = if self.jump_threshold <= 0.0 {
            0.0001
        } else {
            self.jump_threshold
        };
        let reference_tolerance = if self.reference_tolerance <= 0.0 {
            0.0001
        } else {
            self.reference_tolerance
        };

        let config = ValidationConfig {
            price_jump_threshold,
            reference_tolerance,
            repair_missing: self.repair_missing,
        };

        let outcome = validate_dataset(candles, reference, config)?;
        print_validation_summary(&outcome);

        if let Some(output) = &self.output {
            write_candles_csv(output, &outcome.repaired)?;
            info!(
                "Wrote {} candles ({} new) to {}",
                outcome.repaired.len(),
                outcome.summary.repaired_candles,
                output.display()
            );
        } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
            // Repairs were computed but there is nowhere to persist them.
            warn!(
                "Detected {} gap(s) filled with synthetic candles but --output was not provided",
                outcome.summary.repaired_candles
            );
        }

        Ok(())
    }
}
383
// Arguments for `data resample` (currently a stub — see `handle_data`).
#[derive(Args)]
pub struct DataResampleArgs {
    // Source dataset path.
    #[arg(long)]
    input: PathBuf,
    // Destination path for the resampled output.
    #[arg(long)]
    output: PathBuf,
    // Target interval string (e.g. "1h").
    #[arg(long, default_value = "1h")]
    interval: String,
}
393
// Arguments for `data inspect-parquet`.
#[derive(Args)]
pub struct DataInspectParquetArgs {
    /// Path to the parquet file produced by the flight recorder
    #[arg(value_name = "PATH")]
    path: PathBuf,
    /// Number of rows to display (0 prints the entire file)
    #[arg(long, default_value_t = 10)]
    rows: usize,
}
403
404impl DataInspectParquetArgs {
405    fn run(&self) -> Result<()> {
406        let limit = if self.rows == 0 {
407            usize::MAX
408        } else {
409            self.rows
410        };
411        let file = File::open(&self.path)
412            .with_context(|| format!("failed to open {}", self.path.display()))?;
413        let batch_size = limit.clamp(1, 8192);
414        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)?
415            .with_batch_size(batch_size)
416            .build()?;
417
418        let mut printed = 0usize;
419        while printed < limit {
420            match reader.next() {
421                Some(Ok(batch)) => {
422                    if batch.num_rows() == 0 {
423                        continue;
424                    }
425                    let remaining = limit.saturating_sub(printed);
426                    let take = remaining.min(batch.num_rows());
427                    let display_batch = if take == batch.num_rows() {
428                        batch
429                    } else {
430                        batch.slice(0, take)
431                    };
432                    print_batches(std::slice::from_ref(&display_batch))?;
433                    printed += take;
434                }
435                Some(Err(err)) => return Err(err.into()),
436                None => break,
437            }
438        }
439
440        if printed == 0 {
441            println!("no rows available in {}", self.path.display());
442        } else if limit != usize::MAX {
443            println!("displayed {printed} row(s) from {}", self.path.display());
444        }
445
446        Ok(())
447    }
448}
449
// CLI mirror of `BacktestMode`; mapped in `BacktestRunArgs::run`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
pub enum BacktestModeArg {
    Candle,
    Tick,
}
455
// Arguments for `backtest run`.
#[derive(Args)]
pub struct BacktestRunArgs {
    // TOML file with `strategy_name` and a `params` table.
    #[arg(long)]
    strategy_config: PathBuf,
    /// One or more CSV files with historical candles (symbol,timestamp,...)
    #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
    data_paths: Vec<PathBuf>,
    // Number of synthetic candles generated when no --data is supplied.
    #[arg(long, default_value_t = 500)]
    candles: usize,
    // Fallback order quantity fed to the sizer.
    #[arg(long, default_value = "0.01")]
    quantity: Decimal,
    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    /// Trading fees in basis points applied to notional
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    /// Number of candles between signal and execution
    #[arg(long, default_value_t = 1)]
    latency_candles: usize,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    #[arg(long, default_value = "fixed:0.01")]
    sizer: String,
    /// Selects the data source driving fills (`candle` or `tick`)
    #[arg(long, value_enum, default_value = "candle")]
    mode: BacktestModeArg,
    /// One or more JSONL files or a flight-recorder directory containing tick/order book events (required for `--mode tick`)
    #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
    lob_paths: Vec<PathBuf>,
    // Market definitions file; falls back to backtest.markets_file in config.
    #[arg(long)]
    markets_file: Option<PathBuf>,
}
488
// Where tick-mode order book events come from: loose JSONL files, or a
// single flight-recorder directory (classified in `detect_lob_source`).
enum LobSource {
    Json(Vec<PathBuf>),
    FlightRecorder(PathBuf),
}
493
// Arguments for `backtest batch` (multiple strategy configs, shared data).
#[derive(Args)]
pub struct BacktestBatchArgs {
    /// Glob or directory containing strategy config files
    #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
    config_paths: Vec<PathBuf>,
    /// Candle CSVs available to every strategy
    #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
    data_paths: Vec<PathBuf>,
    // Fallback order quantity fed to the sizer.
    #[arg(long, default_value = "0.01")]
    quantity: Decimal,
    /// Optional output CSV summarizing results
    #[arg(long)]
    output: Option<PathBuf>,
    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    /// Trading fees in basis points applied to notional
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    /// Number of candles between signal and execution
    #[arg(long, default_value_t = 1)]
    latency_candles: usize,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    #[arg(long, default_value = "fixed:0.01")]
    sizer: String,
    // Market definitions file; falls back to backtest.markets_file in config.
    #[arg(long)]
    markets_file: Option<PathBuf>,
}
522
// Arguments for `live run`. Most options override values from config; the
// `resolved_*`/`build_*` helpers below implement the fallback logic.
#[derive(Args)]
pub struct LiveRunArgs {
    // TOML file with `strategy_name` and a `params` table.
    #[arg(long)]
    strategy_config: PathBuf,
    // Exchange profile key looked up in config.
    #[arg(long, default_value = "paper_sandbox")]
    exchange: String,
    // Product category for the connector subscription.
    #[arg(long, default_value = "linear")]
    category: String,
    // Candle interval for the market data subscription.
    #[arg(long, default_value = "1m")]
    interval: String,
    // Fallback order quantity fed to the sizer.
    #[arg(long, default_value = "1")]
    quantity: Decimal,
    /// Selects which execution backend to use (`paper` or `bybit`)
    #[arg(
        long = "exec",
        default_value = "paper",
        value_enum,
        alias = "live-exec"
    )]
    exec: ExecutionBackend,
    /// Path to persisted state (file for sqlite, directory for lmdb)
    #[arg(long)]
    state_path: Option<PathBuf>,
    /// Persistence backend to use for runtime state (overrides config.live.persistence.engine)
    #[arg(long, value_enum)]
    persistence: Option<PersistenceBackend>,
    // Prometheus metrics bind address (overrides config.live.metrics_addr).
    #[arg(long)]
    metrics_addr: Option<String>,
    // Log file path (overrides config.live.log_path).
    #[arg(long)]
    log_path: Option<PathBuf>,
    /// Directory where parquet market/trade data is recorded
    #[arg(
        long = "record-data",
        value_name = "PATH",
        default_value = "data/flight_recorder"
    )]
    record_data: PathBuf,
    /// Control plane gRPC bind address (overrides config.live.control_addr)
    #[arg(long)]
    control_addr: Option<String>,
    // Overrides the reporting-currency starting balance from config.
    #[arg(long)]
    initial_equity: Option<Decimal>,
    // Market definitions file override.
    #[arg(long)]
    markets_file: Option<PathBuf>,
    // Paper-execution slippage in basis points.
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    // Paper-execution fees in basis points.
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    // Simulated order latency in milliseconds.
    #[arg(long, default_value_t = 0)]
    latency_ms: u64,
    // Candle history depth kept in memory for the strategy.
    #[arg(long, default_value_t = 512)]
    history: usize,
    // Position reconciliation cadence override (seconds).
    #[arg(long)]
    reconciliation_interval_secs: Option<u64>,
    // Position drift threshold override for reconciliation.
    #[arg(long)]
    reconciliation_threshold: Option<Decimal>,
    // Alerting webhook override; sanitized in `build_alerting`.
    #[arg(long)]
    webhook_url: Option<String>,
    // Alerting overrides (data gap, order failures, drawdown).
    #[arg(long)]
    alert_max_data_gap_secs: Option<u64>,
    #[arg(long)]
    alert_max_order_failures: Option<u32>,
    #[arg(long)]
    alert_max_drawdown: Option<Decimal>,
    // Risk-management overrides (order size, position size, drawdown).
    #[arg(long)]
    risk_max_order_qty: Option<Decimal>,
    #[arg(long)]
    risk_max_position_qty: Option<Decimal>,
    #[arg(long)]
    risk_max_drawdown: Option<Decimal>,
    /// Bybit orderbook depth to subscribe to (e.g., 1, 25, 50). Ignored for other connectors.
    #[arg(long)]
    orderbook_depth: Option<usize>,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    #[arg(long, default_value = "fixed:1.0")]
    sizer: String,
}
600
impl LiveRunArgs {
    // CLI value wins; otherwise the configured log path.
    fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
        self.log_path
            .clone()
            .unwrap_or_else(|| config.live.log_path.clone())
    }

    // CLI value wins; either way the string must parse as a socket address.
    fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
        let addr = self
            .metrics_addr
            .clone()
            .unwrap_or_else(|| config.live.metrics_addr.clone());
        addr.parse()
            .with_context(|| format!("invalid metrics address '{addr}'"))
    }

    // Same fallback pattern for the control-plane bind address.
    fn resolved_control_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
        let addr = self
            .control_addr
            .clone()
            .unwrap_or_else(|| config.live.control_addr.clone());
        addr.parse()
            .with_context(|| format!("invalid control address '{addr}'"))
    }

    // Reconciliation cadence, floored at one second.
    fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
        let secs = self
            .reconciliation_interval_secs
            .unwrap_or(config.live.reconciliation_interval_secs)
            .max(1);
        StdDuration::from_secs(secs)
    }

    // Drift threshold for reconciliation. Non-positive values fall back to
    // the configured threshold, floored at 1e-6 (Decimal::new(1, 6)).
    fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
        let configured = self
            .reconciliation_threshold
            .unwrap_or(config.live.reconciliation_threshold);
        if configured <= Decimal::ZERO {
            config.live.reconciliation_threshold.max(Decimal::new(1, 6))
        } else {
            configured
        }
    }

    // Starting balances from backtest config; --initial-equity overrides the
    // reporting-currency entry (negative values are clamped to zero).
    fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<Symbol, Decimal> {
        let mut balances = clone_initial_balances(&config.backtest);
        if let Some(value) = self.initial_equity {
            balances.insert(
                config.backtest.reporting_currency.clone(),
                value.max(Decimal::ZERO),
            );
        }
        balances
    }

    // Merges CLI alerting overrides onto the configured alerting settings.
    fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
        let mut alerting = config.live.alerting.clone();
        // The webhook is sanitized even when it comes straight from config.
        let webhook = self
            .webhook_url
            .clone()
            .or_else(|| alerting.webhook_url.clone());
        alerting.webhook_url = sanitize_webhook(webhook);
        if let Some(sec) = self.alert_max_data_gap_secs {
            alerting.max_data_gap_secs = sec;
        }
        if let Some(limit) = self.alert_max_order_failures {
            alerting.max_order_failures = limit;
        }
        if let Some(limit) = self.alert_max_drawdown {
            alerting.max_drawdown = limit.max(Decimal::ZERO);
        }
        alerting
    }

    // Merges CLI risk overrides onto the configured risk limits; negative
    // overrides are clamped to zero rather than rejected.
    fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
        let mut risk = config.risk_management.clone();
        if let Some(limit) = self.risk_max_order_qty {
            risk.max_order_quantity = limit.max(Decimal::ZERO);
        }
        if let Some(limit) = self.risk_max_position_qty {
            risk.max_position_quantity = limit.max(Decimal::ZERO);
        }
        if let Some(limit) = self.risk_max_drawdown {
            risk.max_drawdown = limit.max(Decimal::ZERO);
        }
        risk
    }
}
689
// On-disk strategy definition (TOML): a registered strategy name plus an
// arbitrary parameter table forwarded to `load_strategy`.
#[derive(Deserialize)]
struct StrategyConfigFile {
    #[serde(rename = "strategy_name")]
    name: String,
    // Defaults to an empty table when the file omits `params`.
    #[serde(default = "empty_table")]
    params: toml::Value,
}
697
698fn empty_table() -> toml::Value {
699    toml::Value::Table(Default::default())
700}
701
// CLI entry point: parses arguments, loads the selected config environment,
// initializes tracing, and dispatches to the chosen subcommand.
pub async fn run() -> Result<()> {
    let cli = Cli::parse();
    let config = load_config(Some(&cli.env)).context("failed to load configuration")?;

    // RUST_LOG always wins; otherwise -v/-vv override the configured level.
    let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
        0 => config.log_level.clone(),
        1 => "debug".to_string(),
        _ => "trace".to_string(),
    });

    // Live runs redirect logs to a file; the path must be resolved before
    // tracing is initialized.
    let log_override = match &cli.command {
        Commands::Live {
            action: LiveCommand::Run(args),
        } => Some(args.resolved_log_path(&config)),
        _ => None,
    };

    init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;

    match cli.command {
        Commands::Data { action } => handle_data(action, &config).await?,
        Commands::Backtest {
            action: BacktestCommand::Run(args),
        } => args.run(&config).await?,
        Commands::Backtest {
            action: BacktestCommand::Batch(args),
        } => args.run(&config).await?,
        Commands::Live {
            action: LiveCommand::Run(args),
        } => args.run(&config).await?,
        Commands::State { action } => handle_state(action, &config).await?,
        Commands::Analyze { action } => handle_analyze(action)?,
        Commands::Strategies => list_strategies(),
    }

    Ok(())
}
739
740async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
741    match cmd {
742        DataCommand::Download(args) => {
743            args.run(config).await?;
744        }
745        DataCommand::Validate(args) => {
746            args.run()?;
747        }
748        DataCommand::Resample(args) => {
749            info!(
750                "stub: resampling {} into {} at {}",
751                args.input.display(),
752                args.output.display(),
753                args.interval
754            );
755        }
756        DataCommand::InspectParquet(args) => {
757            args.run()?;
758        }
759    }
760    Ok(())
761}
762
763async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
764    match cmd {
765        StateCommand::Inspect(args) => {
766            state::inspect_state(
767                args.resolved_path(config),
768                args.resolved_engine(config),
769                args.raw,
770            )
771            .await?;
772        }
773    }
774    Ok(())
775}
776
777fn handle_analyze(cmd: AnalyzeCommand) -> Result<()> {
778    match cmd {
779        AnalyzeCommand::Execution(args) => analyze::run_execution(args.build_request()?),
780    }
781}
782
783impl BacktestRunArgs {
    // Loads the strategy config, assembles the requested market-data pipeline
    // (candle or tick mode), runs the backtest, and prints the report.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        let contents = std::fs::read_to_string(&self.strategy_config)
            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
        let def: StrategyConfigFile =
            toml::from_str(&contents).context("failed to parse strategy config file")?;
        let strategy = load_strategy(&def.name, def.params)
            .with_context(|| format!("failed to configure strategy {}", def.name))?;
        let symbols = strategy.subscriptions();
        if symbols.is_empty() {
            return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
        }

        let mode = match self.mode {
            BacktestModeArg::Candle => BacktestMode::Candle,
            BacktestModeArg::Tick => BacktestMode::Tick,
        };

        // CLI flag wins over the config's markets file; one of them must exist.
        let markets_path = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone())
            .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
        let market_registry = Arc::new(
            MarketRegistry::load_from_file(&markets_path).with_context(|| {
                format!("failed to load markets from {}", markets_path.display())
            })?,
        );

        // Candle mode: candle stream + default paper client, no matching
        // engine. Tick mode: event stream + MatchingEngine acting as both
        // execution client and matcher.
        let (market_stream, event_stream, execution_client, matching_engine) = match mode {
            BacktestMode::Candle => {
                let stream = self.build_candle_stream(&symbols)?;
                (
                    Some(stream),
                    None,
                    Arc::new(PaperExecutionClient::default()) as Arc<dyn ExecutionClient>,
                    None,
                )
            }
            BacktestMode::Tick => {
                if self.lob_paths.is_empty() {
                    bail!("--lob-data is required when --mode tick");
                }
                let source = self.detect_lob_source()?;
                let engine = Arc::new(MatchingEngine::new(
                    "matching-engine",
                    symbols.clone(),
                    reporting_balance(&config.backtest),
                ));
                let stream = match source {
                    LobSource::Json(paths) => {
                        let events = load_lob_events_from_paths(&paths)?;
                        if events.is_empty() {
                            bail!("no order book events loaded from --lob-data");
                        }
                        stream_from_events(events)
                    }
                    LobSource::FlightRecorder(root) => self
                        .build_flight_recorder_stream(&root, &symbols)
                        .context("failed to initialize flight recorder stream")?,
                };
                (
                    None,
                    Some(stream),
                    engine.clone() as Arc<dyn ExecutionClient>,
                    Some(engine),
                )
            }
        };

        let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
        let order_quantity = self.quantity;
        // Backtests deliberately skip real risk checks.
        let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));

        // The first subscribed symbol is treated as the primary instrument.
        let mut cfg = BacktestConfig::new(symbols[0].clone());
        cfg.order_quantity = order_quantity;
        cfg.initial_balances = clone_initial_balances(&config.backtest);
        cfg.reporting_currency = config.backtest.reporting_currency.clone();
        // Negative bps inputs are clamped to zero; latency is at least one candle.
        cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
        cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
        cfg.execution.latency_candles = self.latency_candles.max(1);
        cfg.mode = mode;

        let report = Backtester::new(
            cfg,
            strategy,
            execution,
            matching_engine,
            market_registry,
            market_stream,
            event_stream,
        )
        .run()
        .await
        .context("backtest failed")?;
        print_report(&report);
        Ok(())
    }
881
    /// Builds the candle market stream for a backtest run.
    ///
    /// With no `--data` paths, synthesizes one-minute candle series (one per
    /// subscribed symbol, phase-shifted so their timestamps differ) and serves
    /// them from memory. Otherwise dispatches on the detected input format:
    /// CSV files are fully loaded and sorted in memory, Parquet files are
    /// streamed via `ParquetMarketStream`.
    ///
    /// # Errors
    /// Fails when no symbols are subscribed, no candles could be produced,
    /// or the `--data` paths mix formats / fail to parse.
    fn build_candle_stream(&self, symbols: &[Symbol]) -> Result<BacktestStream> {
        if symbols.is_empty() {
            bail!("strategy did not declare any subscriptions");
        }
        if self.data_paths.is_empty() {
            // No data supplied: fall back to synthetic candles.
            let mut generated = Vec::new();
            for (idx, symbol) in symbols.iter().enumerate() {
                // Stagger each symbol's series by 10 minutes.
                let offset = idx as i64 * 10;
                generated.extend(synth_candles(symbol, self.candles, offset));
            }
            generated.sort_by_key(|c| c.timestamp);
            if generated.is_empty() {
                bail!("no synthetic candles generated; provide --data files instead");
            }
            return Ok(memory_market_stream(&symbols[0], generated));
        }

        match detect_data_format(&self.data_paths)? {
            DataFormat::Csv => {
                let mut candles = load_candles_from_paths(&self.data_paths)?;
                if candles.is_empty() {
                    bail!("no candles loaded from --data paths");
                }
                // Ensure chronological order before replay.
                candles.sort_by_key(|c| c.timestamp);
                Ok(memory_market_stream(&symbols[0], candles))
            }
            DataFormat::Parquet => Ok(parquet_market_stream(symbols, self.data_paths.clone())),
        }
    }
911
912    fn detect_lob_source(&self) -> Result<LobSource> {
913        if self.lob_paths.len() == 1 {
914            let path = &self.lob_paths[0];
915            if path.is_dir() {
916                return Ok(LobSource::FlightRecorder(path.clone()));
917            }
918        }
919        if self.lob_paths.iter().all(|path| path.is_file()) {
920            return Ok(LobSource::Json(self.lob_paths.clone()));
921        }
922        bail!(
923            "tick mode expects either JSONL files or a single flight-recorder directory via --lob-data"
924        )
925    }
926
    /// Builds a market-event stream from a flight-recorder directory:
    /// `UnifiedEventStream` reads the recorded events for `symbols` under
    /// `root`, and each item is converted into a backtester `MarketEvent`.
    fn build_flight_recorder_stream(
        &self,
        root: &Path,
        symbols: &[Symbol],
    ) -> Result<MarketEventStream> {
        let stream = UnifiedEventStream::from_flight_recorder(root, symbols)?
            .into_stream()
            .map(|event| event.map(MarketEvent::from));
        Ok(Box::pin(stream))
    }
937}
938
939impl BacktestBatchArgs {
940    async fn run(&self, config: &AppConfig) -> Result<()> {
941        if self.config_paths.is_empty() {
942            return Err(anyhow!("provide at least one --config path"));
943        }
944        if self.data_paths.is_empty() {
945            return Err(anyhow!("provide at least one --data path for batch mode"));
946        }
947        let markets_path = self
948            .markets_file
949            .clone()
950            .or_else(|| config.backtest.markets_file.clone())
951            .ok_or_else(|| {
952                anyhow!("batch mode requires --markets-file or backtest.markets_file")
953            })?;
954        let market_registry = Arc::new(
955            MarketRegistry::load_from_file(&markets_path).with_context(|| {
956                format!("failed to load markets from {}", markets_path.display())
957            })?,
958        );
959        let data_format = detect_data_format(&self.data_paths)?;
960        let mut aggregated = Vec::new();
961        for config_path in &self.config_paths {
962            let contents = std::fs::read_to_string(config_path).with_context(|| {
963                format!("failed to read strategy config {}", config_path.display())
964            })?;
965            let def: StrategyConfigFile =
966                toml::from_str(&contents).context("failed to parse strategy config file")?;
967            let strategy = load_strategy(&def.name, def.params)
968                .with_context(|| format!("failed to configure strategy {}", def.name))?;
969            let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
970            let order_quantity = self.quantity;
971            let symbols = strategy.subscriptions();
972            if symbols.is_empty() {
973                bail!("strategy {} did not declare subscriptions", strategy.name());
974            }
975            let stream = match data_format {
976                DataFormat::Csv => {
977                    let mut candles = load_candles_from_paths(&self.data_paths)?;
978                    if candles.is_empty() {
979                        bail!("no candles loaded from --data paths");
980                    }
981                    candles.sort_by_key(|c| c.timestamp);
982                    memory_market_stream(&symbols[0], candles)
983                }
984                DataFormat::Parquet => parquet_market_stream(&symbols, self.data_paths.clone()),
985            };
986            let execution_client: Arc<dyn ExecutionClient> =
987                Arc::new(PaperExecutionClient::default());
988            let execution =
989                ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
990            let mut cfg = BacktestConfig::new(symbols[0].clone());
991            cfg.order_quantity = order_quantity;
992            cfg.initial_balances = clone_initial_balances(&config.backtest);
993            cfg.reporting_currency = config.backtest.reporting_currency.clone();
994            cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
995            cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
996            cfg.execution.latency_candles = self.latency_candles.max(1);
997
998            let report = Backtester::new(
999                cfg,
1000                strategy,
1001                execution,
1002                None,
1003                market_registry.clone(),
1004                Some(stream),
1005                None,
1006            )
1007            .run()
1008            .await
1009            .with_context(|| format!("backtest failed for {}", config_path.display()))?;
1010            aggregated.push(BatchRow {
1011                config: config_path.display().to_string(),
1012                signals: 0, // Legacy field, can be removed or calculated from report
1013                orders: 0,  // Legacy field, can be removed or calculated from report
1014                dropped_orders: 0, // Legacy field, can be removed or calculated from report
1015                ending_equity: report.ending_equity,
1016            });
1017        }
1018
1019        if let Some(output) = &self.output {
1020            write_batch_report(output, &aggregated)?;
1021            println!("Batch report written to {}", output.display());
1022        }
1023        if aggregated.is_empty() {
1024            return Err(anyhow!("no batch jobs executed"));
1025        }
1026        Ok(())
1027    }
1028}
1029
impl LiveRunArgs {
    /// Launches a live trading session: resolves the exchange profile, loads
    /// and validates the strategy TOML, assembles `LiveSessionSettings` from
    /// CLI flags (with config-file fallbacks), logs the effective setup, and
    /// hands off to `run_live` until the session ends or fails.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        // The requested exchange profile must exist in the config map.
        let exchange_cfg = config
            .exchange
            .get(&self.exchange)
            .cloned()
            .ok_or_else(|| anyhow!("exchange profile {} not found", self.exchange))?;

        let driver = exchange_cfg.driver.clone();

        // Strategy definition: name + free-form params from a TOML file.
        let contents = fs::read_to_string(&self.strategy_config)
            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
        let def: StrategyConfigFile =
            toml::from_str(&contents).context("failed to parse strategy config file")?;
        let strategy = load_strategy(&def.name, def.params)
            .with_context(|| format!("failed to configure strategy {}", def.name))?;
        let symbols = strategy.subscriptions();
        if symbols.is_empty() {
            bail!("strategy did not declare any subscriptions");
        }
        if self.quantity <= Decimal::ZERO {
            bail!("--quantity must be greater than zero");
        }
        let quantity = self.quantity;
        let initial_balances = self.resolved_initial_balances(config);
        let reporting_currency = config.backtest.reporting_currency.clone();
        // CLI --markets-file takes precedence over the config file's entry.
        let markets_file = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone());

        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
        let category =
            PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
        let metrics_addr = self.resolved_metrics_addr(config)?;
        let control_addr = self.resolved_control_addr(config)?;
        // Persistence: CLI flags override the engine/path from config.live.
        let persistence_cfg = config.live.persistence_config();
        let persistence_engine = self
            .persistence
            .map(PersistenceEngine::from)
            .unwrap_or(persistence_cfg.engine);
        let state_path = self
            .state_path
            .clone()
            .unwrap_or_else(|| persistence_cfg.path.clone());
        let persistence = PersistenceSettings::new(persistence_engine, state_path);
        let alerting = self.build_alerting(config);
        // Clamp candle history buffer to a minimum of 32 entries.
        let history = self.history.max(32);
        let reconciliation_interval = self.reconciliation_interval(config);
        let reconciliation_threshold = self.reconciliation_threshold(config);
        let orderbook_depth = self
            .orderbook_depth
            .unwrap_or(super::live::default_order_book_depth());

        let settings = LiveSessionSettings {
            category,
            interval,
            quantity,
            // Negative cost inputs are clamped to zero.
            slippage_bps: self.slippage_bps.max(Decimal::ZERO),
            fee_bps: self.fee_bps.max(Decimal::ZERO),
            history,
            metrics_addr,
            persistence,
            initial_balances,
            reporting_currency,
            markets_file,
            alerting,
            exec_backend: self.exec,
            risk: self.build_risk_config(config),
            reconciliation_interval,
            reconciliation_threshold,
            driver,
            orderbook_depth,
            record_path: Some(self.record_data.clone()),
            control_addr,
        };

        // Log the effective session configuration before starting.
        info!(
            strategy = %def.name,
            symbols = ?symbols,
            exchange = %self.exchange,
            interval = %self.interval,
            driver = ?settings.driver,
            exec = ?self.exec,
            persistence_engine = ?settings.persistence.engine,
            state_path = %settings.persistence.state_path.display(),
            control_addr = %settings.control_addr,
            "starting live session"
        );

        run_live(strategy, symbols, exchange_cfg, settings)
            .await
            .context("live session failed")
    }
}
1125
1126fn list_strategies() {
1127    println!("Built-in strategies:");
1128    for name in builtin_strategy_names() {
1129        println!("- {name}");
1130    }
1131}
1132
/// Prints a human-readable validation summary to stdout: overall counts
/// followed by up to `MAX_EXAMPLES` examples each of gaps, price spikes,
/// and cross-source mismatches (with an "omitted" note when truncated).
fn print_validation_summary(outcome: &ValidationOutcome) {
    // Cap the number of example rows per category to keep output readable.
    const MAX_EXAMPLES: usize = 5;
    let summary = &outcome.summary;
    println!(
        "Validation summary for {} ({} candles)",
        summary.symbol, summary.rows
    );
    println!(
        "  Range: {} -> {}",
        summary.start.to_rfc3339(),
        summary.end.to_rfc3339()
    );
    println!("  Interval: {}", interval_label(summary.interval));
    println!("  Missing intervals: {}", summary.missing_candles);
    println!("  Duplicate intervals: {}", summary.duplicate_candles);
    println!("  Zero-volume candles: {}", summary.zero_volume_candles);
    println!("  Price spikes flagged: {}", summary.price_spike_count);
    println!(
        "  Cross-source mismatches: {}",
        summary.cross_mismatch_count
    );
    println!("  Repaired candles generated: {}", summary.repaired_candles);

    // Gap examples (time ranges with missing candles).
    if !outcome.gaps.is_empty() {
        println!("  Gap examples:");
        for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
            println!(
                "    {} -> {} (missing {})",
                gap.start.to_rfc3339(),
                gap.end.to_rfc3339(),
                gap.missing
            );
        }
        if outcome.gaps.len() > MAX_EXAMPLES {
            println!(
                "    ... {} additional gap(s) omitted",
                outcome.gaps.len() - MAX_EXAMPLES
            );
        }
    }

    // Price spike examples (change_fraction is a ratio, printed as percent).
    if !outcome.price_spikes.is_empty() {
        println!("  Price spike examples:");
        for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
            println!(
                "    {} (change {:.2}%)",
                spike.timestamp.to_rfc3339(),
                spike.change_fraction * 100.0
            );
        }
        if outcome.price_spikes.len() > MAX_EXAMPLES {
            println!(
                "    ... {} additional spike(s) omitted",
                outcome.price_spikes.len() - MAX_EXAMPLES
            );
        }
    }

    // Cross-source mismatch examples (primary vs reference closes).
    if !outcome.cross_mismatches.is_empty() {
        println!("  Cross-source mismatch examples:");
        for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
            println!(
                "    {} primary {:.4} vs ref {:.4} ({:.2}%)",
                miss.timestamp.to_rfc3339(),
                miss.primary_close,
                miss.reference_close,
                miss.delta_fraction * 100.0
            );
        }
        if outcome.cross_mismatches.len() > MAX_EXAMPLES {
            println!(
                "    ... {} additional mismatch(es) omitted",
                outcome.cross_mismatches.len() - MAX_EXAMPLES
            );
        }
    }
}
1210
1211fn print_report(report: &PerformanceReport) {
1212    println!("\n{}", report);
1213}
1214
/// Generates `len` synthetic one-minute candles for `symbol`, with timestamps
/// ending near "now". Prices oscillate around 50,000 via a sine wave;
/// `offset_minutes` shifts both the wave phase and the timestamps so that
/// multiple symbols produce distinct series.
fn synth_candles(symbol: &str, len: usize, offset_minutes: i64) -> Vec<Candle> {
    let mut candles = Vec::with_capacity(len);
    for i in 0..len {
        // Base price: 50k +/- 500 sine oscillation, phase-shifted per symbol.
        let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
        let open = base + (i as f64 % 3.0) * 10.0;
        let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
        // f64 -> Decimal conversions fall back to nearby values rather than failing.
        let open_dec =
            Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
        let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
        let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
        let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
        candles.push(Candle {
            symbol: Symbol::from(symbol),
            interval: Interval::OneMinute,
            open: open_dec,
            high,
            low,
            close: close_dec,
            volume: Decimal::ONE,
            // Oldest candle first: i = 0 is `len` minutes in the past.
            timestamp: Utc::now() - Duration::minutes((len - i) as i64)
                + Duration::minutes(offset_minutes),
        });
    }
    candles
}
1240
/// Parses a datetime from one of three accepted formats, tried in order:
/// RFC 3339, `YYYY-MM-DD HH:MM:SS`, or bare `YYYY-MM-DD` (midnight).
/// The last two are interpreted as UTC.
fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
    if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
        return Ok(dt.with_timezone(&Utc));
    }
    if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
    }
    if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
        let dt = date
            .and_hms_opt(0, 0, 0)
            .ok_or_else(|| anyhow!("invalid date"))?;
        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
    }
    Err(anyhow!("unable to parse datetime '{value}'"))
}
1256
/// One deserialized row of a candle CSV file. `symbol` is optional because
/// it may instead be inferred from the file's parent directory name; the
/// timestamp is kept as a string and parsed by `parse_datetime`.
#[derive(Deserialize)]
struct CandleCsvRow {
    symbol: Option<String>,
    timestamp: String,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}
1267
/// Loads candles from one or more CSV files into a single vector
/// (no sorting — callers sort by timestamp if they need order).
///
/// Per row: the timestamp is parsed via `parse_datetime`; a missing symbol
/// column falls back to the parent directory name; the interval is inferred
/// from the file stem, defaulting to one minute; all prices/volume are
/// converted from `f64` to `Decimal`, failing on unrepresentable values.
fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
    let mut candles = Vec::new();
    for path in paths {
        let mut reader = csv::Reader::from_path(path)
            .with_context(|| format!("failed to open {}", path.display()))?;
        for record in reader.deserialize::<CandleCsvRow>() {
            let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
            let timestamp = parse_datetime(&row.timestamp)?;
            // Symbol: explicit column wins, else infer from directory layout.
            let symbol = row
                .symbol
                .clone()
                .or_else(|| infer_symbol_from_path(path))
                .ok_or_else(|| {
                    anyhow!(
                        "missing symbol column and unable to infer from path {}",
                        path.display()
                    )
                })?;
            let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
            let open = Decimal::from_f64(row.open).ok_or_else(|| {
                anyhow!("invalid open value '{}' in {}", row.open, path.display())
            })?;
            let high = Decimal::from_f64(row.high).ok_or_else(|| {
                anyhow!("invalid high value '{}' in {}", row.high, path.display())
            })?;
            let low = Decimal::from_f64(row.low)
                .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
            let close = Decimal::from_f64(row.close).ok_or_else(|| {
                anyhow!("invalid close value '{}' in {}", row.close, path.display())
            })?;
            let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
                anyhow!(
                    "invalid volume value '{}' in {}",
                    row.volume,
                    path.display()
                )
            })?;
            candles.push(Candle {
                symbol,
                interval,
                open,
                high,
                low,
                close,
                volume,
                timestamp,
            });
        }
    }
    Ok(candles)
}
1319
/// Input format detected from the `--data` file extensions.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DataFormat {
    // Any non-`.parquet` extension is treated as CSV (see `detect_data_format`).
    Csv,
    Parquet,
}
1325
1326fn detect_data_format(paths: &[PathBuf]) -> Result<DataFormat> {
1327    let mut detected: Option<DataFormat> = None;
1328    for path in paths {
1329        let ext = path
1330            .extension()
1331            .and_then(|ext| ext.to_str())
1332            .map(|ext| ext.to_ascii_lowercase())
1333            .unwrap_or_else(|| String::from(""));
1334        let current = if ext == "parquet" {
1335            DataFormat::Parquet
1336        } else {
1337            DataFormat::Csv
1338        };
1339        if let Some(existing) = detected {
1340            if existing != current {
1341                bail!("cannot mix CSV and Parquet inputs in --data");
1342            }
1343        } else {
1344            detected = Some(current);
1345        }
1346    }
1347    detected.ok_or_else(|| anyhow!("no data paths provided"))
1348}
1349
/// Wraps pre-loaded candles in an in-memory paper market stream.
fn memory_market_stream(symbol: &str, candles: Vec<Candle>) -> BacktestStream {
    Box::new(PaperMarketStream::from_data(
        symbol.to_string(),
        Vec::new(), // no tick data — candles only
        candles,
    ))
}
1357
/// Builds a candle stream that reads the given Parquet files for `symbols`.
fn parquet_market_stream(symbols: &[Symbol], paths: Vec<PathBuf>) -> BacktestStream {
    Box::new(ParquetMarketStream::with_candles(symbols.to_vec(), paths))
}
1361
/// One JSONL order-book event as serialized on disk. The `event` field
/// (lowercased variant name: "snapshot" | "depth" | "trade") selects the
/// variant; `symbol` is optional everywhere because it can be inferred
/// from the file's parent directory. Price/size levels arrive as raw
/// `[price, size]` f64 pairs and are converted later by `convert_levels`.
#[derive(Deserialize)]
#[serde(tag = "event", rename_all = "lowercase")]
enum LobEventRow {
    Snapshot {
        timestamp: String,
        symbol: Option<String>,
        bids: Vec<[f64; 2]>,
        asks: Vec<[f64; 2]>,
    },
    Depth {
        timestamp: String,
        symbol: Option<String>,
        bids: Vec<[f64; 2]>,
        asks: Vec<[f64; 2]>,
    },
    Trade {
        timestamp: String,
        symbol: Option<String>,
        side: String,
        price: f64,
        size: f64,
    },
}
1385
/// Loads order-book events from JSONL files (one JSON object per line,
/// blank lines skipped) and returns them sorted by timestamp.
///
/// Each line deserializes into a `LobEventRow` and becomes a `MarketEvent`:
/// snapshots -> `OrderBook`, depth rows -> `DepthUpdate`, trades -> `Tick`.
/// A missing `symbol` field falls back to the file's parent directory name.
///
/// # Errors
/// Fails on unreadable files, malformed JSON, unparseable timestamps,
/// unknown trade sides, f64 values not representable as `Decimal`, or a
/// missing symbol with no directory fallback.
fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
    let mut events = Vec::new();
    for path in paths {
        let file = File::open(path)
            .with_context(|| format!("failed to open order book file {}", path.display()))?;
        // Fallback symbol for rows without an explicit `symbol` field.
        let symbol_hint = infer_symbol_from_path(path);
        for line in BufReader::new(file).lines() {
            let line =
                line.with_context(|| format!("failed to read line from {}", path.display()))?;
            if line.trim().is_empty() {
                continue;
            }
            let row: LobEventRow = serde_json::from_str(&line)
                .with_context(|| format!("invalid order book event in {}", path.display()))?;
            match row {
                LobEventRow::Snapshot {
                    timestamp,
                    symbol,
                    bids,
                    asks,
                } => {
                    let ts = parse_datetime(&timestamp)?;
                    let symbol = symbol
                        .or_else(|| symbol_hint.clone())
                        .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
                    let bids = convert_levels(&bids)?;
                    let asks = convert_levels(&asks)?;
                    // Full book snapshot; checksums are not present in JSONL input.
                    let book = OrderBook {
                        symbol: symbol.clone(),
                        bids,
                        asks,
                        timestamp: ts,
                        exchange_checksum: None,
                        local_checksum: None,
                    };
                    events.push(MarketEvent {
                        timestamp: ts,
                        kind: MarketEventKind::OrderBook(book),
                    });
                }
                LobEventRow::Depth {
                    timestamp,
                    symbol,
                    bids,
                    asks,
                } => {
                    let ts = parse_datetime(&timestamp)?;
                    let symbol = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
                        anyhow!("missing symbol in depth update {}", path.display())
                    })?;
                    let bids = convert_levels(&bids)?;
                    let asks = convert_levels(&asks)?;
                    let update = DepthUpdate {
                        symbol: symbol.clone(),
                        bids,
                        asks,
                        timestamp: ts,
                    };
                    events.push(MarketEvent {
                        timestamp: ts,
                        kind: MarketEventKind::Depth(update),
                    });
                }
                LobEventRow::Trade {
                    timestamp,
                    symbol,
                    side,
                    price,
                    size,
                } => {
                    let ts = parse_datetime(&timestamp)?;
                    let symbol = symbol
                        .or_else(|| symbol_hint.clone())
                        .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
                    // Accept several common spellings of the trade side.
                    let side = match side.to_lowercase().as_str() {
                        "buy" | "bid" | "b" => Side::Buy,
                        "sell" | "ask" | "s" => Side::Sell,
                        other => bail!("unsupported trade side '{other}' in {}", path.display()),
                    };
                    let price = Decimal::from_f64(price).ok_or_else(|| {
                        anyhow!("invalid trade price '{}' in {}", price, path.display())
                    })?;
                    let size = Decimal::from_f64(size).ok_or_else(|| {
                        anyhow!("invalid trade size '{}' in {}", size, path.display())
                    })?;
                    // JSONL rows carry one timestamp, so exchange/receive times coincide.
                    let tick = Tick {
                        symbol: symbol.clone(),
                        price,
                        size,
                        side,
                        exchange_timestamp: ts,
                        received_at: ts,
                    };
                    events.push(MarketEvent {
                        timestamp: ts,
                        kind: MarketEventKind::Trade(tick),
                    });
                }
            }
        }
    }
    // Merge events from all files into one chronological sequence.
    events.sort_by_key(|event| event.timestamp);
    Ok(events)
}
1490
1491fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
1492    levels
1493        .iter()
1494        .map(|pair| {
1495            let price = Decimal::from_f64(pair[0])
1496                .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
1497            let size = Decimal::from_f64(pair[1])
1498                .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
1499            Ok(OrderBookLevel { price, size })
1500        })
1501        .collect()
1502}
1503
/// Infers the symbol from the file's parent directory name, matching the
/// `<data_path>/<exchange>/<symbol>/<file>` layout used by `default_output_path`.
fn infer_symbol_from_path(path: &Path) -> Option<String> {
    let directory = path.parent()?;
    let name = directory.file_name()?;
    Some(name.to_string_lossy().into_owned())
}
1509
1510fn infer_interval_from_path(path: &Path) -> Option<Interval> {
1511    path.file_stem()
1512        .and_then(|os| os.to_str())
1513        .and_then(|stem| stem.split('_').next())
1514        .and_then(|token| Interval::from_str(token).ok())
1515}
1516
/// Builds the default download destination:
/// `<data_path>/<exchange>/<symbol>/<interval>_<startYYYYMMDD>-<endYYYYMMDD>.csv`.
/// This layout is what `infer_symbol_from_path` / `infer_interval_from_path`
/// read back when loading data.
fn default_output_path(
    config: &AppConfig,
    exchange: &str,
    symbol: &str,
    interval: Interval,
    start: DateTime<Utc>,
    end: DateTime<Utc>,
) -> PathBuf {
    let interval_part = interval_label(interval);
    let start_part = start.format("%Y%m%d").to_string();
    let end_part = end.format("%Y%m%d").to_string();
    config
        .data_path
        .join(exchange)
        .join(symbol)
        .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
}
1534
/// Short text label for an interval, used in filenames and printed summaries.
fn interval_label(interval: Interval) -> &'static str {
    match interval {
        Interval::OneSecond => "1s",
        Interval::OneMinute => "1m",
        Interval::FiveMinutes => "5m",
        Interval::FifteenMinutes => "15m",
        Interval::OneHour => "1h",
        Interval::FourHours => "4h",
        Interval::OneDay => "1d",
    }
}
1546
/// CSV output row for a candle; `Decimal` fields are written as `f64`
/// and the timestamp as an RFC 3339 string (see `write_candles_csv`).
#[derive(Serialize)]
struct CandleRow<'a> {
    symbol: &'a str,
    timestamp: String,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}
1557
/// Writes candles to a CSV file at `path`, creating parent directories as
/// needed. Decimal fields that cannot convert to `f64` are written as 0.0.
fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)
            .with_context(|| format!("failed to create directory {}", parent.display()))?;
    }
    let mut writer =
        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
    for candle in candles {
        let row = CandleRow {
            symbol: &candle.symbol,
            timestamp: candle.timestamp.to_rfc3339(),
            open: candle.open.to_f64().unwrap_or(0.0),
            high: candle.high.to_f64().unwrap_or(0.0),
            low: candle.low.to_f64().unwrap_or(0.0),
            close: candle.close.to_f64().unwrap_or(0.0),
            volume: candle.volume.to_f64().unwrap_or(0.0),
        };
        writer.serialize(row)?;
    }
    // Flush explicitly so write errors surface here rather than on drop.
    writer.flush()?;
    Ok(())
}
1580
/// One CSV row of the batch backtest report (one row per strategy config).
#[derive(Serialize)]
struct BatchRow {
    config: String,
    // The three count fields below are legacy and currently always written
    // as 0 by `BacktestBatchArgs::run`.
    signals: usize,
    orders: usize,
    dropped_orders: usize,
    ending_equity: f64,
}
1589
1590fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
1591    if let Some(parent) = path.parent() {
1592        fs::create_dir_all(parent)
1593            .with_context(|| format!("failed to create directory {}", parent.display()))?;
1594    }
1595    let mut writer =
1596        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
1597    for row in rows {
1598        writer.serialize(row)?;
1599    }
1600    writer.flush()?;
1601    Ok(())
1602}
1603
1604fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<Symbol, Decimal> {
1605    config
1606        .initial_balances
1607        .iter()
1608        .map(|(currency, amount)| (currency.clone(), *amount))
1609        .collect()
1610}
1611
1612fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
1613    config
1614        .initial_balances
1615        .get(&config.reporting_currency)
1616        .copied()
1617        .unwrap_or_default()
1618}
1619
1620fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
1621    let parts: Vec<_> = value.split(':').collect();
1622    match parts.as_slice() {
1623        ["fixed", val] => {
1624            let quantity =
1625                Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
1626            Ok(Box::new(FixedOrderSizer { quantity }))
1627        }
1628        ["fixed"] => {
1629            let quantity = cli_quantity.unwrap_or(Decimal::ONE);
1630            Ok(Box::new(FixedOrderSizer { quantity }))
1631        }
1632        ["percent", val] => {
1633            let percent =
1634                Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
1635            Ok(Box::new(PortfolioPercentSizer {
1636                percent: percent.max(Decimal::ZERO),
1637            }))
1638        }
1639        ["risk-adjusted", val] => {
1640            let risk_fraction = Decimal::from_str(val)
1641                .context("invalid risk fraction value (use decimals)")?;
1642            Ok(Box::new(RiskAdjustedSizer {
1643                risk_fraction: risk_fraction.max(Decimal::ZERO),
1644            }))
1645        }
1646        _ => Err(anyhow!(
1647            "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
1648        )),
1649    }
1650}