tesser_cli/
app.rs

1use crate::alerts::sanitize_webhook;
2use crate::analyze;
3use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
4use crate::live::{
5    run_live, ExecutionBackend, LiveSessionSettings, NamedExchange, PersistenceBackend,
6    PersistenceSettings,
7};
8use crate::state;
9use crate::telemetry::init_tracing;
10use crate::tui;
11use crate::PublicChannel;
12use arrow::util::pretty::print_batches;
13use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
14use std::collections::HashMap;
15use std::fs::{self, File};
16use std::io::{BufRead, BufReader};
17use std::net::SocketAddr;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration as StdDuration;
22
23use anyhow::{anyhow, bail, Context, Result};
24use chrono::{DateTime, Days, Duration, NaiveDate, NaiveDateTime, Utc};
25use clap::{Args, Parser, Subcommand, ValueEnum};
26use csv::Writer;
27use futures::StreamExt;
28use rust_decimal::{
29    prelude::{FromPrimitive, ToPrimitive},
30    Decimal,
31};
32use serde::{Deserialize, Serialize};
33use tesser_backtester::reporting::PerformanceReport;
34use tesser_backtester::{
35    stream_from_events, BacktestConfig, BacktestMode, BacktestStream, Backtester, MarketEvent,
36    MarketEventKind, MarketEventStream,
37};
38use tesser_broker::{ExecutionClient, RouterExecutionClient};
39use tesser_config::{load_config, AppConfig, PersistenceEngine, RiskManagementConfig};
40use tesser_core::{
41    AssetId, Candle, DepthUpdate, ExchangeId, Interval, OrderBook, OrderBookLevel, Side, Symbol,
42    Tick,
43};
44use tesser_data::analytics::ExecutionAnalysisRequest;
45use tesser_data::download::{
46    BinanceDownloader, BybitDownloader, KlineRequest, NormalizedTrade, TradeRequest, TradeSource,
47};
48use tesser_data::io::{self, DatasetFormat as IoDatasetFormat, TicksWriter};
49use tesser_data::merger::UnifiedEventStream;
50use tesser_data::parquet::ParquetMarketStream;
51use tesser_data::transform::Resampler;
52use tesser_execution::{
53    ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PanicCloseConfig,
54    PanicCloseMode, PortfolioPercentSizer, RiskAdjustedSizer,
55};
56use tesser_markets::MarketRegistry;
57use tesser_paper::{
58    FeeModel, FeeScheduleConfig, MatchingEngine, MatchingEngineConfig, PaperExecutionClient,
59    PaperMarketStream, QueueModel,
60};
61use tesser_strategy::{builtin_strategy_names, load_strategy};
62use tracing::{info, warn};
63
// Top-level CLI definition. Global flags (`--verbose`, `--env`) apply to
// every subcommand; the selected subcommand carries its own arguments.
// NOTE: field docs double as clap help text, so they are left untouched.
#[derive(Parser)]
#[command(author, version, about = "Tesser Trading Framework")]
pub struct Cli {
    /// Increases logging verbosity (-v debug, -vv trace)
    #[arg(short, long, action = clap::ArgAction::Count)]
    verbose: u8,
    /// Selects which configuration environment to load (maps to config/{env}.toml)
    #[arg(long, default_value = "default")]
    env: String,
    #[command(subcommand)]
    command: Commands,
}
76
// Root subcommand tree. Variants embed their own `Args`/`Subcommand` payloads,
// which differ widely in size — hence the clippy `large_enum_variant` allow.
#[allow(clippy::large_enum_variant)]
#[derive(Subcommand)]
pub enum Commands {
    /// Data engineering tasks
    Data {
        #[command(subcommand)]
        action: DataCommand,
    },
    /// Backtesting workflows
    Backtest {
        #[command(subcommand)]
        action: BacktestCommand,
    },
    /// Live trading workflows
    Live {
        #[command(subcommand)]
        action: LiveCommand,
    },
    /// Inspect or repair persisted runtime state
    State {
        #[command(subcommand)]
        action: StateCommand,
    },
    /// Strategy management helpers
    Strategies,
    /// Analyze previously recorded trading sessions
    Analyze {
        #[command(subcommand)]
        action: AnalyzeCommand,
    },
    /// Launch the real-time TUI dashboard
    Monitor(MonitorArgs),
}
110
// `tesser data …` subcommands: acquisition, validation, and inspection of
// historical market data.
#[derive(Subcommand)]
pub enum DataCommand {
    /// Download historical market data
    Download(DataDownloadArgs),
    /// Download historical trade ticks
    DownloadTrades(DataDownloadTradesArgs),
    /// Validate and optionally repair a local data set
    Validate(DataValidateArgs),
    /// Resample existing data (placeholder)
    Resample(DataResampleArgs),
    /// Inspect a parquet file emitted by the flight recorder
    InspectParquet(DataInspectParquetArgs),
}
124
// `tesser backtest …` subcommands: single-run and batched strategy backtests.
#[derive(Subcommand)]
pub enum BacktestCommand {
    /// Run a single backtest from a strategy config file
    Run(BacktestRunArgs),
    /// Run multiple strategy configs and aggregate the results
    Batch(BacktestBatchArgs),
}
132
// `tesser live …` subcommands; currently only session startup.
#[derive(Subcommand)]
pub enum LiveCommand {
    /// Start a live trading session (scaffolding)
    Run(LiveRunArgs),
}
138
// `tesser state …` subcommands for working with persisted runtime state.
#[derive(Subcommand)]
pub enum StateCommand {
    /// Inspect the persisted live state snapshot
    Inspect(StateInspectArgs),
}
144
// `tesser analyze …` subcommands for post-hoc session analysis.
#[derive(Subcommand)]
pub enum AnalyzeCommand {
    /// Generate a TCA-lite execution report using flight recorder files
    Execution(AnalyzeExecutionArgs),
}
150
// Arguments for `data download` (historical candles). By default the result
// is validated after download; see `DataDownloadArgs::run`.
#[derive(Args)]
pub struct DataDownloadArgs {
    // Exchange profile key looked up in `config.exchange`.
    #[arg(long, default_value = "bybit")]
    exchange: String,
    // Instrument symbol, passed through to the exchange downloader.
    #[arg(long)]
    symbol: String,
    // Market category (used by the Bybit driver; ignored for Binance).
    #[arg(long, default_value = "linear")]
    category: String,
    // Candle interval string, parsed into `Interval` at run time.
    #[arg(long, default_value = "1m")]
    interval: String,
    // Window start; parsed by `parse_datetime`.
    #[arg(long)]
    start: String,
    // Window end; defaults to "now" when omitted.
    #[arg(long)]
    end: Option<String>,
    // Output CSV path; a default path is derived from config when omitted.
    #[arg(long)]
    output: Option<PathBuf>,
    /// Skip automatic validation after download completes
    #[arg(long)]
    skip_validation: bool,
    /// Attempt to repair gaps detected during validation
    #[arg(long)]
    repair_missing: bool,
    /// Max allowed close-to-close jump when auto-validating (fractional)
    #[arg(long, default_value_t = 0.05)]
    validation_jump_threshold: f64,
    /// Allowed divergence between primary and reference closes (fractional)
    #[arg(long, default_value_t = 0.002)]
    validation_reference_tolerance: f64,
}
180
// Arguments for `data download-trades` (raw tick/trade history), supporting
// REST paging plus Bybit/Binance public archive sources.
#[derive(Args)]
pub struct DataDownloadTradesArgs {
    // Exchange profile key looked up in `config.exchange`.
    #[arg(long, default_value = "bybit")]
    exchange: String,
    // Instrument symbol to fetch trades for.
    #[arg(long)]
    symbol: String,
    /// Bybit market category (e.g., linear, inverse, option, spot)
    #[arg(long, default_value = "linear")]
    category: String,
    // Window start; parsed by `parse_datetime`.
    #[arg(long)]
    start: String,
    // Window end; defaults to "now" when omitted.
    #[arg(long)]
    end: Option<String>,
    // Output file, or base directory when `--partition-by-day` is set.
    #[arg(long)]
    output: Option<PathBuf>,
    /// Partition parquet output by trading day
    #[arg(long)]
    partition_by_day: bool,
    /// Per-request REST chunk size (max 1000)
    #[arg(long, default_value_t = 1000)]
    limit: usize,
    /// Trade data source (Bybit only)
    #[arg(long, value_enum, default_value_t = TradeSourceArg::Rest)]
    source: TradeSourceArg,
    /// Skip partitions that already exist
    #[arg(long)]
    resume: bool,
    /// Override Bybit public archive base URL
    #[arg(long)]
    bybit_public_url: Option<String>,
    /// Binance public archive market (spot/futures)
    #[arg(long, value_enum, default_value_t = BinanceMarketArg::FuturesUm)]
    binance_market: BinanceMarketArg,
}
215
// CLI selector for where trade history is fetched from. Validated against
// the exchange driver in `DataDownloadTradesArgs::run` (e.g. `bybit-public`
// is rejected for the Binance driver and vice versa).
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
pub enum TradeSourceArg {
    // Paged REST endpoint of the exchange itself.
    Rest,
    #[value(name = "bybit-public")]
    BybitPublic,
    #[value(name = "binance-public")]
    BinancePublic,
}
224
// Binance public-archive market segment; maps to an archive base URL via
// `BinanceMarketArg::base_path`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
pub enum BinanceMarketArg {
    #[value(name = "spot")]
    Spot,
    #[value(name = "futures-um")]
    FuturesUm,
    #[value(name = "futures-cm")]
    FuturesCm,
}
234
235impl BinanceMarketArg {
236    fn base_path(self) -> &'static str {
237        match self {
238            Self::Spot => "https://data.binance.vision/data/spot/daily/aggTrades",
239            Self::FuturesUm => "https://data.binance.vision/data/futures/um/daily/aggTrades",
240            Self::FuturesCm => "https://data.binance.vision/data/futures/cm/daily/aggTrades",
241        }
242    }
243}
244
// Arguments for `state inspect`; path/engine resolution falls back to the
// live persistence config (see the `impl StateInspectArgs` accessors).
#[derive(Args)]
pub struct StateInspectArgs {
    /// Path to the persisted state (file for sqlite, directory for lmdb)
    #[arg(long)]
    path: Option<PathBuf>,
    /// Emit the raw JSON payload stored inside the database
    #[arg(long)]
    raw: bool,
}
254
// Arguments for `analyze execution`; converted to an
// `ExecutionAnalysisRequest` by `build_request`.
#[derive(Args)]
pub struct AnalyzeExecutionArgs {
    /// Directory containing flight recorder parquet partitions
    #[arg(long, value_name = "PATH", default_value = "data/flight_recorder")]
    data_dir: PathBuf,
    /// Inclusive start of the analysis window (RFC3339, `YYYY-mm-dd`, etc.)
    #[arg(long)]
    start: Option<String>,
    /// Inclusive end of the analysis window
    #[arg(long)]
    end: Option<String>,
    /// Optional CSV file path for exporting ExecutionStats
    #[arg(long, value_name = "PATH")]
    export_csv: Option<PathBuf>,
}
270
// Arguments for `monitor` (the TUI dashboard). The refresh rate is clamped
// to a 50ms floor in `MonitorArgs::run`.
#[derive(Args)]
pub struct MonitorArgs {
    /// Control plane address (overrides config.live.control_addr)
    #[arg(long)]
    control_addr: Option<String>,
    /// UI refresh rate in milliseconds
    #[arg(long, default_value_t = 250)]
    tick_rate: u64,
}
280
281impl StateInspectArgs {
282    fn resolved_path(&self, config: &AppConfig) -> PathBuf {
283        self.path
284            .clone()
285            .unwrap_or_else(|| config.live.persistence_config().path.clone())
286    }
287
288    fn resolved_engine(&self, config: &AppConfig) -> PersistenceEngine {
289        config.live.persistence_config().engine
290    }
291}
292
293impl MonitorArgs {
294    async fn run(&self, config: &AppConfig) -> Result<()> {
295        let addr = self
296            .control_addr
297            .clone()
298            .unwrap_or_else(|| config.live.control_addr.clone());
299        let refresh = self.tick_rate.max(50);
300        let monitor_config = tui::MonitorConfig::new(addr, StdDuration::from_millis(refresh));
301        tui::run_monitor(monitor_config).await
302    }
303}
304
305impl AnalyzeExecutionArgs {
306    fn build_request(&self) -> Result<ExecutionAnalysisRequest> {
307        let start = match &self.start {
308            Some(value) => Some(parse_datetime(value)?),
309            None => None,
310        };
311        let end = match &self.end {
312            Some(value) => Some(parse_datetime(value)?),
313            None => None,
314        };
315        Ok(ExecutionAnalysisRequest {
316            data_dir: self.data_dir.clone(),
317            start,
318            end,
319        })
320    }
321}
322
impl DataDownloadArgs {
    /// Downloads candles for the configured exchange/symbol/interval window,
    /// optionally validates (and repairs) the result, and writes it to CSV.
    ///
    /// Errors if the exchange profile is unknown, the interval or timestamps
    /// fail to parse, the window is empty, or the download/validation fails.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        let exchange_cfg = config
            .exchange
            .get(&self.exchange)
            .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
        let start = parse_datetime(&self.start)?;
        // Open-ended windows run up to "now".
        let end = match &self.end {
            Some(value) => parse_datetime(value)?,
            None => Utc::now(),
        };
        if start >= end {
            return Err(anyhow!("start time must be earlier than end time"));
        }

        info!(
            "Downloading {} candles for {} ({})",
            self.interval, self.symbol, self.exchange
        );
        // An empty driver string is treated as the Bybit default.
        let mut candles = match exchange_cfg.driver.as_str() {
            "bybit" | "" => {
                let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
                let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
                downloader
                    .download_klines(&request)
                    .await
                    .with_context(|| "failed to download candles from Bybit")?
            }
            "binance" => {
                let downloader = BinanceDownloader::new(&exchange_cfg.rest_url);
                // Binance has no category concept here, so pass an empty one.
                let request = KlineRequest::new("", &self.symbol, interval, start, end);
                downloader
                    .download_klines(&request)
                    .await
                    .with_context(|| "failed to download candles from Binance")?
            }
            other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
        };

        if candles.is_empty() {
            info!("No candles returned for {}", self.symbol);
            return Ok(());
        }

        if !self.skip_validation {
            // NOTE: this `config` shadows the `AppConfig` parameter only
            // inside this block; the outer `config` is used again below.
            // Thresholds are floored at f64::EPSILON so zero/negative CLI
            // values cannot disable the checks.
            let config = ValidationConfig {
                price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
                reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
                repair_missing: self.repair_missing,
            };
            let outcome =
                validate_dataset(candles.clone(), None, config).context("validation failed")?;
            print_validation_summary(&outcome);
            // Only swap in the repaired series when repairs were requested
            // and actually happened.
            if self.repair_missing && outcome.summary.repaired_candles > 0 {
                candles = outcome.repaired;
                info!(
                    "Applied {} synthetic candle(s) to repair gaps",
                    outcome.summary.repaired_candles
                );
            }
        }

        let output_path = self.output.clone().unwrap_or_else(|| {
            default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
        });
        write_candles_csv(&output_path, &candles)?;
        info!(
            "Saved {} candles to {}",
            candles.len(),
            output_path.display()
        );
        Ok(())
    }
}
398
impl DataDownloadTradesArgs {
    /// Downloads raw trades for the configured window, either into one file
    /// or into per-day partitions (`--partition-by-day`).
    ///
    /// Validates that the chosen `--source` is compatible with the exchange
    /// driver before dispatching (e.g. Binance archives are rejected on the
    /// Bybit driver and vice versa).
    async fn run(&self, config: &AppConfig) -> Result<()> {
        let exchange_cfg = config
            .exchange
            .get(&self.exchange)
            .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
        let start = parse_datetime(&self.start)?;
        // Open-ended windows run up to "now".
        let end = match &self.end {
            Some(value) => parse_datetime(value)?,
            None => Utc::now(),
        };
        if start >= end {
            return Err(anyhow!("start time must be earlier than end time"));
        }

        info!(
            "Downloading trades for {} on {} ({} -> {})",
            self.symbol, self.exchange, start, end
        );

        // Archive downloads are cached per exchange/symbol under data_path.
        let cache_dir = config
            .data_path
            .join("ticks")
            .join("cache")
            .join(&self.exchange)
            .join(&self.symbol);
        let mut base_request = TradeRequest::new(&self.symbol, start, end)
            .with_limit(self.limit)
            .with_archive_cache_dir(cache_dir)
            .with_resume_archives(self.resume);
        let driver = exchange_cfg.driver.as_str();
        // An empty driver string is treated as the Bybit default.
        match driver {
            "bybit" | "" => {
                if self.source == TradeSourceArg::BinancePublic {
                    bail!("Binance public archives are not available for Bybit");
                }
                base_request = base_request.with_category(&self.category);
                if self.source == TradeSourceArg::BybitPublic {
                    base_request = base_request.with_source(TradeSource::BybitPublicArchive);
                    if let Some(url) = &self.bybit_public_url {
                        base_request = base_request.with_public_data_url(url);
                    }
                }
            }
            "binance" => match self.source {
                TradeSourceArg::Rest => {}
                TradeSourceArg::BinancePublic => {
                    base_request = base_request
                        .with_source(TradeSource::BinancePublicArchive)
                        .with_public_data_url(self.binance_market.base_path());
                }
                TradeSourceArg::BybitPublic => {
                    bail!("Bybit public archives are not available for Binance");
                }
            },
            other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
        }

        if self.partition_by_day {
            self.download_partitioned(config, exchange_cfg, base_request, start, end)
                .await
        } else {
            let trades = self
                .fetch_trades(driver, &exchange_cfg.rest_url, base_request)
                .await?;
            if trades.is_empty() {
                info!("No trades returned for {}", self.symbol);
                return Ok(());
            }
            self.write_single(config, trades, start, end)
        }
    }

    /// Walks the window one UTC calendar day at a time, fetching each day's
    /// trades and writing them to a per-day partition file.
    ///
    /// Day boundaries are clamped to the overall [start, end) window, and
    /// `--resume` skips days whose partition file already exists.
    async fn download_partitioned(
        &self,
        config: &AppConfig,
        exchange_cfg: &tesser_config::ExchangeConfig,
        base_request: TradeRequest<'_>,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    ) -> Result<()> {
        let base_dir = self
            .output
            .clone()
            .unwrap_or_else(|| default_tick_partition_dir(config, &self.exchange, &self.symbol));
        let mut current = start.date_naive();
        let final_date = end.date_naive();
        let mut total = 0usize;
        let mut files_written = 0usize;

        while current <= final_date {
            // `checked_add_days` only fails at the calendar's limits; the
            // `next_date == current` checks below then terminate the loop
            // instead of spinning forever.
            let next_date = current.checked_add_days(Days::new(1)).unwrap_or(current);
            // Clamp the day's span to the requested window.
            let day_start = DateTime::<Utc>::from_naive_utc_and_offset(
                current
                    .and_hms_opt(0, 0, 0)
                    .ok_or_else(|| anyhow!("invalid date {}", current))?,
                Utc,
            )
            .max(start);
            let day_end = DateTime::<Utc>::from_naive_utc_and_offset(
                next_date
                    .and_hms_opt(0, 0, 0)
                    .ok_or_else(|| anyhow!("invalid date {}", next_date))?,
                Utc,
            )
            .min(end);

            if day_start >= day_end {
                if next_date == current {
                    break;
                }
                current = next_date;
                continue;
            }

            let partition_path = partition_path(&base_dir, current);
            if self.resume && partition_path.exists() {
                info!(
                    "Skipping {} because {} already exists",
                    current,
                    partition_path.display()
                );
                if next_date == current {
                    break;
                }
                current = next_date;
                continue;
            }

            // Reuse the shared request template, narrowing it to this day.
            let mut day_request = base_request.clone();
            day_request.start = day_start;
            day_request.end = day_end;
            let trades = self
                .fetch_trades(
                    exchange_cfg.driver.as_str(),
                    &exchange_cfg.rest_url,
                    day_request,
                )
                .await?;

            if trades.is_empty() {
                info!("No trades returned for {}", current);
                if next_date == current {
                    break;
                }
                current = next_date;
                continue;
            }

            let mut writer = TicksWriter::new(&partition_path);
            let count = trades.len();
            writer.extend(trades.into_iter().map(|trade| (trade.trade_id, trade.tick)));
            writer.finish()?;
            total += count;
            files_written += 1;
            info!(
                "Saved {} raw trades for {} to {}",
                count,
                current,
                partition_path.display()
            );

            if next_date == current {
                break;
            }
            current = next_date;
        }

        info!(
            "Partitioned {} raw trades across {} file(s) under {}",
            total,
            files_written,
            base_dir.display()
        );
        Ok(())
    }

    /// Dispatches a trade download to the driver-specific downloader.
    /// An empty driver string is treated as the Bybit default.
    async fn fetch_trades(
        &self,
        driver: &str,
        rest_url: &str,
        request: TradeRequest<'_>,
    ) -> Result<Vec<NormalizedTrade>> {
        match driver {
            "bybit" | "" => {
                let downloader = BybitDownloader::new(rest_url);
                downloader
                    .download_trades(&request)
                    .await
                    .with_context(|| "failed to download trades from Bybit")
            }
            "binance" => {
                let downloader = BinanceDownloader::new(rest_url);
                downloader
                    .download_trades(&request)
                    .await
                    .with_context(|| "failed to download trades from Binance")
            }
            other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
        }
    }

    /// Writes the full trade batch to a single output file (explicit
    /// `--output` or a default path derived from config and the window).
    fn write_single(
        &self,
        config: &AppConfig,
        trades: Vec<NormalizedTrade>,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    ) -> Result<()> {
        let output_path = self.output.clone().unwrap_or_else(|| {
            default_tick_output_path(config, &self.exchange, &self.symbol, start, end)
        });
        let total = trades.len();
        let mut writer = TicksWriter::new(&output_path);
        writer.extend(trades.into_iter().map(|trade| (trade.trade_id, trade.tick)));
        writer.finish()?;
        info!("Saved {} raw trades to {}", total, output_path.display());
        Ok(())
    }
}
619
// Arguments for `data validate`; multiple `--path`/`--reference` flags may be
// given and are loaded into a single dataset each.
#[derive(Args)]
pub struct DataValidateArgs {
    /// One or more CSV files to inspect
    #[arg(
        long = "path",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    paths: Vec<PathBuf>,
    /// Optional reference data set(s) used for cross validation
    #[arg(
        long = "reference",
        value_name = "PATH",
        num_args = 1..,
        action = clap::ArgAction::Append
    )]
    reference_paths: Vec<PathBuf>,
    /// Max allowed close-to-close jump before flagging (fractional, 0.05 = 5%)
    #[arg(long, default_value_t = 0.05)]
    jump_threshold: f64,
    /// Allowed divergence between primary and reference closes (fractional)
    #[arg(long, default_value_t = 0.002)]
    reference_tolerance: f64,
    /// Attempt to fill gaps by synthesizing candles
    #[arg(long)]
    repair_missing: bool,
    /// Location to write the repaired dataset
    #[arg(long)]
    output: Option<PathBuf>,
}
651
652impl DataValidateArgs {
653    fn run(&self) -> Result<()> {
654        if self.paths.is_empty() {
655            bail!("provide at least one --path for validation");
656        }
657        let candles =
658            load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
659        if candles.is_empty() {
660            bail!("loaded dataset is empty; nothing to validate");
661        }
662        let reference = if self.reference_paths.is_empty() {
663            None
664        } else {
665            Some(
666                load_candles_from_paths(&self.reference_paths)
667                    .with_context(|| "failed to load reference dataset")?,
668            )
669        };
670
671        let price_jump_threshold = if self.jump_threshold <= 0.0 {
672            0.0001
673        } else {
674            self.jump_threshold
675        };
676        let reference_tolerance = if self.reference_tolerance <= 0.0 {
677            0.0001
678        } else {
679            self.reference_tolerance
680        };
681
682        let config = ValidationConfig {
683            price_jump_threshold,
684            reference_tolerance,
685            repair_missing: self.repair_missing,
686        };
687
688        let outcome = validate_dataset(candles, reference, config)?;
689        print_validation_summary(&outcome);
690
691        if let Some(output) = &self.output {
692            write_candles_csv(output, &outcome.repaired)?;
693            info!(
694                "Wrote {} candles ({} new) to {}",
695                outcome.repaired.len(),
696                outcome.summary.repaired_candles,
697                output.display()
698            );
699        } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
700            warn!(
701                "Detected {} gap(s) filled with synthetic candles but --output was not provided",
702                outcome.summary.repaired_candles
703            );
704        }
705
706        Ok(())
707    }
708}
709
710impl DataResampleArgs {
711    fn run(&self) -> Result<()> {
712        if !self.input.exists() {
713            bail!("input file {} not found", self.input.display());
714        }
715        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
716        let dataset = io::read_dataset(&self.input)
717            .with_context(|| format!("failed to read {}", self.input.display()))?;
718        let io::CandleDataset {
719            format: input_format,
720            candles,
721        } = dataset;
722        if candles.is_empty() {
723            bail!(
724                "dataset {} does not contain any candles",
725                self.input.display()
726            );
727        }
728        let input_len = candles.len();
729        let resampled = Resampler::resample(candles, interval);
730        if resampled.is_empty() {
731            bail!(
732                "no candles produced after resampling {}",
733                self.input.display()
734            );
735        }
736        let output_format = self
737            .format
738            .map(IoDatasetFormat::from)
739            .unwrap_or_else(|| IoDatasetFormat::from_path(&self.output));
740        io::write_dataset(&self.output, output_format, &resampled)
741            .with_context(|| format!("failed to write {}", self.output.display()))?;
742        info!(
743            "Resampled {} -> {} candles (input {:?} -> output {:?}) to {}",
744            input_len,
745            resampled.len(),
746            input_format,
747            output_format,
748            self.output.display()
749        );
750        Ok(())
751    }
752}
753
// Arguments for `data resample`; logic lives in the `impl` above.
#[derive(Args)]
pub struct DataResampleArgs {
    // Source dataset (CSV or parquet, detected by `io::read_dataset`).
    #[arg(long)]
    input: PathBuf,
    // Destination path; its extension drives format inference.
    #[arg(long)]
    output: PathBuf,
    // Target interval string, parsed into `Interval` at run time.
    #[arg(long, default_value = "1h")]
    interval: String,
    /// Force the output format (defaults to --output extension)
    #[arg(long, value_enum)]
    format: Option<DatasetFormatArg>,
}
766
// CLI-facing dataset format; converted to `tesser_data::io::DatasetFormat`
// via the `From` impl below.
#[derive(Copy, Clone, Debug, ValueEnum)]
pub enum DatasetFormatArg {
    Csv,
    Parquet,
}
772
773impl From<DatasetFormatArg> for IoDatasetFormat {
774    fn from(value: DatasetFormatArg) -> Self {
775        match value {
776            DatasetFormatArg::Csv => IoDatasetFormat::Csv,
777            DatasetFormatArg::Parquet => IoDatasetFormat::Parquet,
778        }
779    }
780}
781
// Arguments for `data inspect-parquet`; `path` is positional.
#[derive(Args)]
pub struct DataInspectParquetArgs {
    /// Path to the parquet file produced by the flight recorder
    #[arg(value_name = "PATH")]
    path: PathBuf,
    /// Number of rows to display (0 prints the entire file)
    #[arg(long, default_value_t = 10)]
    rows: usize,
}
791
792impl DataInspectParquetArgs {
793    fn run(&self) -> Result<()> {
794        let limit = if self.rows == 0 {
795            usize::MAX
796        } else {
797            self.rows
798        };
799        let file = File::open(&self.path)
800            .with_context(|| format!("failed to open {}", self.path.display()))?;
801        let batch_size = limit.clamp(1, 8192);
802        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)?
803            .with_batch_size(batch_size)
804            .build()?;
805
806        let mut printed = 0usize;
807        while printed < limit {
808            match reader.next() {
809                Some(Ok(batch)) => {
810                    if batch.num_rows() == 0 {
811                        continue;
812                    }
813                    let remaining = limit.saturating_sub(printed);
814                    let take = remaining.min(batch.num_rows());
815                    let display_batch = if take == batch.num_rows() {
816                        batch
817                    } else {
818                        batch.slice(0, take)
819                    };
820                    print_batches(std::slice::from_ref(&display_batch))?;
821                    printed += take;
822                }
823                Some(Err(err)) => return Err(err.into()),
824                None => break,
825            }
826        }
827
828        if printed == 0 {
829            println!("no rows available in {}", self.path.display());
830        } else if limit != usize::MAX {
831            println!("displayed {printed} row(s) from {}", self.path.display());
832        }
833
834        Ok(())
835    }
836}
837
// Selects the data source driving backtest fills (`--mode candle|tick`).
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
pub enum BacktestModeArg {
    Candle,
    Tick,
}
843
// Passive-fill queue assumption for tick-mode simulation; mapped onto
// `tesser_paper::QueueModel` by the `From` impl below.
#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
pub enum QueueModelArg {
    // Short CLI spelling for the conservative queue model.
    Conserv,
    Optimistic,
}
849
850impl From<QueueModelArg> for QueueModel {
851    fn from(value: QueueModelArg) -> Self {
852        match value {
853            QueueModelArg::Conserv => QueueModel::Conservative,
854            QueueModelArg::Optimistic => QueueModel::Optimistic,
855        }
856    }
857}
858
// Arguments for `backtest run` (single strategy config).
#[derive(Args)]
pub struct BacktestRunArgs {
    // Strategy configuration file driving the run.
    #[arg(long)]
    strategy_config: PathBuf,
    /// One or more CSV files with historical candles (symbol,timestamp,...)
    #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
    data_paths: Vec<PathBuf>,
    // Candle count used when no data files are provided.
    #[arg(long, default_value_t = 500)]
    candles: usize,
    // Default order quantity.
    #[arg(long, default_value = "0.01")]
    quantity: Decimal,
    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    /// Trading fees in basis points applied to notional
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    /// Optional fee schedule describing maker/taker rates
    #[arg(long = "fee-schedule")]
    fee_schedule: Option<PathBuf>,
    /// Number of candles between signal and execution
    #[arg(long, default_value_t = 1)]
    latency_candles: usize,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    #[arg(long, default_value = "fixed:0.01")]
    sizer: String,
    /// Selects the data source driving fills (`candle` or `tick`)
    #[arg(long, value_enum, default_value = "candle")]
    mode: BacktestModeArg,
    /// One or more JSONL files or a flight-recorder directory containing tick/order book events (required for `--mode tick`)
    #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
    lob_paths: Vec<PathBuf>,
    /// Round-trip latency, in milliseconds, applied to limit order placements/cancellations during tick-mode sims
    #[arg(long = "sim-latency-ms", default_value_t = 0)]
    sim_latency_ms: u64,
    /// Queue modeling assumption used when simulating passive fills
    #[arg(long = "sim-queue-model", value_enum, default_value = "conserv")]
    sim_queue_model: QueueModelArg,
    // Optional market registry definition file.
    #[arg(long)]
    markets_file: Option<PathBuf>,
}
900
// How tick-mode order book data was supplied on the command line:
// either explicit JSONL files or a single flight-recorder directory.
enum LobSource {
    Json(Vec<PathBuf>),
    FlightRecorder(PathBuf),
}
905
// Arguments for `backtest batch`: run several strategy configs over the same
// candle data and optionally summarize results to CSV. New notes use `//`
// so the clap help text is unchanged.
#[derive(Args)]
pub struct BacktestBatchArgs {
    /// Glob or directory containing strategy config files
    #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
    config_paths: Vec<PathBuf>,
    /// Candle CSVs available to every strategy
    #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
    data_paths: Vec<PathBuf>,
    // Base order quantity shared by every job in the batch.
    #[arg(long, default_value = "0.01")]
    quantity: Decimal,
    /// Optional output CSV summarizing results
    #[arg(long)]
    output: Option<PathBuf>,
    /// Symmetric slippage in basis points (1 bp = 0.01%) applied to fills
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    /// Trading fees in basis points applied to notional
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    /// Optional fee schedule describing maker/taker rates
    #[arg(long = "fee-schedule")]
    fee_schedule: Option<PathBuf>,
    /// Number of candles between signal and execution
    #[arg(long, default_value_t = 1)]
    latency_candles: usize,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    #[arg(long, default_value = "fixed:0.01")]
    sizer: String,
    // Market registry file; falls back to `backtest.markets_file` from config.
    #[arg(long)]
    markets_file: Option<PathBuf>,
}
937
// Arguments for `live run`. CLI flags override the corresponding values in
// the loaded AppConfig (see the resolver methods on this type). New notes
// use `//` (not `///`) so the clap-generated help text is unchanged.
#[derive(Args)]
pub struct LiveRunArgs {
    // TOML file containing `strategy_name` and a `params` table.
    #[arg(long)]
    strategy_config: PathBuf,
    // Primary exchange profile name; must exist in the config's profiles.
    #[arg(long, default_value = "paper_sandbox")]
    exchange: String,
    /// Optional list of additional exchange profiles to load (comma separated or repeated)
    ///
    /// Each entry must match a `[exchange.NAME]` table or a `[[exchanges]]` block in the selected config.
    #[arg(long = "exchanges", value_name = "NAME", num_args = 0.., action = clap::ArgAction::Append)]
    exchanges: Vec<String>,
    // Market category (parsed into a PublicChannel, e.g. "linear").
    #[arg(long, default_value = "linear")]
    category: String,
    // Candle interval subscription (parsed into Interval, e.g. "1m").
    #[arg(long, default_value = "1m")]
    interval: String,
    // Base order quantity; must be > 0 (validated in `run`).
    #[arg(long, default_value = "1")]
    quantity: Decimal,
    /// Selects which execution backend to use (`paper` or `bybit`)
    #[arg(
        long = "exec",
        default_value = "paper",
        value_enum,
        alias = "live-exec"
    )]
    exec: ExecutionBackend,
    /// Path to persisted state (file for sqlite, directory for lmdb)
    #[arg(long)]
    state_path: Option<PathBuf>,
    /// Persistence backend to use for runtime state (overrides config.live.persistence.engine)
    #[arg(long, value_enum)]
    persistence: Option<PersistenceBackend>,
    // Prometheus metrics bind address (overrides config.live.metrics_addr).
    #[arg(long)]
    metrics_addr: Option<String>,
    // Log file path (overrides config.live.log_path).
    #[arg(long)]
    log_path: Option<PathBuf>,
    /// Directory where parquet market/trade data is recorded
    #[arg(
        long = "record-data",
        value_name = "PATH",
        default_value = "data/flight_recorder"
    )]
    record_data: PathBuf,
    /// Control plane gRPC bind address (overrides config.live.control_addr)
    #[arg(long)]
    control_addr: Option<String>,
    // Overrides the reporting-currency starting balance from config.
    #[arg(long)]
    initial_equity: Option<Decimal>,
    // Market registry file; falls back to `backtest.markets_file` from config.
    #[arg(long)]
    markets_file: Option<PathBuf>,
    // Paper-execution slippage in basis points.
    #[arg(long, default_value = "0")]
    slippage_bps: Decimal,
    // Paper-execution fees in basis points.
    #[arg(long, default_value = "0")]
    fee_bps: Decimal,
    // Simulated execution latency in milliseconds.
    #[arg(long, default_value_t = 0)]
    latency_ms: u64,
    // Number of historical candles preloaded for the strategy.
    #[arg(long, default_value_t = 512)]
    history: usize,
    // Position reconciliation cadence (seconds); clamped to >= 1.
    #[arg(long)]
    reconciliation_interval_secs: Option<u64>,
    // Position drift threshold before reconciliation alerts.
    #[arg(long)]
    reconciliation_threshold: Option<Decimal>,
    // Alert webhook endpoint (sanitized before use).
    #[arg(long)]
    webhook_url: Option<String>,
    // Alerting overrides; see `build_alerting`.
    #[arg(long)]
    alert_max_data_gap_secs: Option<u64>,
    #[arg(long)]
    alert_max_order_failures: Option<u32>,
    #[arg(long)]
    alert_max_drawdown: Option<Decimal>,
    // Risk-limit overrides; see `build_risk_config`.
    #[arg(long)]
    risk_max_order_qty: Option<Decimal>,
    #[arg(long)]
    risk_max_order_notional: Option<Decimal>,
    #[arg(long)]
    risk_max_position_qty: Option<Decimal>,
    #[arg(long)]
    risk_max_drawdown: Option<Decimal>,
    /// Bybit orderbook depth to subscribe to (e.g., 1, 25, 50). Ignored for other connectors.
    #[arg(long)]
    orderbook_depth: Option<usize>,
    /// Order sizer (e.g. "fixed:0.01", "percent:0.02")
    #[arg(long, default_value = "fixed:1.0")]
    sizer: String,
    /// Panic-close mode used when multi-leg groups desync
    #[arg(long, value_enum, default_value = "market")]
    panic_mode: PanicModeArg,
    /// Basis points added/subtracted from the last price when using `panic_mode=aggressive-limit`
    #[arg(long, default_value = "50")]
    panic_limit_offset_bps: Decimal,
}
1028
// Resolver helpers: each one merges a CLI override with the loaded config,
// with the CLI flag taking precedence when present.
impl LiveRunArgs {
    /// Log file path: `--log-path` if given, otherwise `config.live.log_path`.
    fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
        self.log_path
            .clone()
            .unwrap_or_else(|| config.live.log_path.clone())
    }

    /// Metrics bind address: `--metrics-addr` or `config.live.metrics_addr`.
    ///
    /// # Errors
    /// Fails when the resolved string is not a valid socket address.
    fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
        let addr = self
            .metrics_addr
            .clone()
            .unwrap_or_else(|| config.live.metrics_addr.clone());
        addr.parse()
            .with_context(|| format!("invalid metrics address '{addr}'"))
    }

    /// Control-plane gRPC address: `--control-addr` or `config.live.control_addr`.
    ///
    /// # Errors
    /// Fails when the resolved string is not a valid socket address.
    fn resolved_control_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
        let addr = self
            .control_addr
            .clone()
            .unwrap_or_else(|| config.live.control_addr.clone());
        addr.parse()
            .with_context(|| format!("invalid control address '{addr}'"))
    }

    /// Reconciliation cadence, clamped to at least one second.
    fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
        let secs = self
            .reconciliation_interval_secs
            .unwrap_or(config.live.reconciliation_interval_secs)
            .max(1);
        StdDuration::from_secs(secs)
    }

    /// Reconciliation threshold. A non-positive override is rejected and the
    /// configured value is used instead, floored at 1e-6 so the threshold is
    /// always strictly positive.
    fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
        let configured = self
            .reconciliation_threshold
            .unwrap_or(config.live.reconciliation_threshold);
        if configured <= Decimal::ZERO {
            config.live.reconciliation_threshold.max(Decimal::new(1, 6))
        } else {
            configured
        }
    }

    /// Starting balances from config, with `--initial-equity` (clamped to
    /// >= 0) overriding the reporting-currency entry when provided.
    fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<AssetId, Decimal> {
        let mut balances = clone_initial_balances(&config.backtest);
        if let Some(value) = self.initial_equity {
            let reporting = AssetId::from(config.backtest.reporting_currency.as_str());
            balances.insert(reporting, value.max(Decimal::ZERO));
        }
        balances
    }

    /// Alerting config from `config.live.alerting` with CLI overrides applied;
    /// the webhook URL (CLI first, then config) is sanitized before use.
    fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
        let mut alerting = config.live.alerting.clone();
        let webhook = self
            .webhook_url
            .clone()
            .or_else(|| alerting.webhook_url.clone());
        alerting.webhook_url = sanitize_webhook(webhook);
        if let Some(sec) = self.alert_max_data_gap_secs {
            alerting.max_data_gap_secs = sec;
        }
        if let Some(limit) = self.alert_max_order_failures {
            alerting.max_order_failures = limit;
        }
        if let Some(limit) = self.alert_max_drawdown {
            alerting.max_drawdown = limit.max(Decimal::ZERO);
        }
        alerting
    }

    /// Risk config from `config.risk_management` with CLI overrides applied.
    /// Quantity/drawdown limits are clamped to >= 0; a notional limit of zero
    /// or less clears the configured cap (treated as "no limit").
    fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
        let mut risk = config.risk_management.clone();
        if let Some(limit) = self.risk_max_order_qty {
            risk.max_order_quantity = limit.max(Decimal::ZERO);
        }
        if let Some(limit) = self.risk_max_order_notional {
            risk.max_order_notional = (limit > Decimal::ZERO).then_some(limit);
        }
        if let Some(limit) = self.risk_max_position_qty {
            risk.max_position_quantity = limit.max(Decimal::ZERO);
        }
        if let Some(limit) = self.risk_max_drawdown {
            risk.max_drawdown = limit.max(Decimal::ZERO);
        }
        risk
    }
}
1118
/// On-disk strategy definition: a `strategy_name` key plus a free-form
/// `params` table handed to `load_strategy`.
#[derive(Deserialize)]
struct StrategyConfigFile {
    // Strategy identifier used to look up the implementation.
    #[serde(rename = "strategy_name")]
    name: String,
    // Arbitrary TOML parameters; defaults to an empty table when omitted.
    #[serde(default = "empty_table")]
    params: toml::Value,
}
1126
1127fn empty_table() -> toml::Value {
1128    toml::Value::Table(Default::default())
1129}
1130
/// CLI entry point: parse arguments, load config, initialize tracing, and
/// dispatch to the selected subcommand.
///
/// # Errors
/// Propagates configuration, logging-setup, and subcommand failures.
pub async fn run() -> Result<()> {
    let cli = Cli::parse();
    let config = load_config(Some(&cli.env)).context("failed to load configuration")?;

    // RUST_LOG wins outright; otherwise -v/-vv escalate from the configured level.
    let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
        0 => config.log_level.clone(),
        1 => "debug".to_string(),
        _ => "trace".to_string(),
    });

    // `live run` may redirect logs to a file; resolve that before tracing init.
    let log_override = match &cli.command {
        Commands::Live {
            action: LiveCommand::Run(args),
        } => Some(args.resolved_log_path(&config)),
        _ => None,
    };

    init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;

    match cli.command {
        Commands::Data { action } => handle_data(action, &config).await?,
        Commands::Backtest {
            action: BacktestCommand::Run(args),
        } => args.run(&config).await?,
        Commands::Backtest {
            action: BacktestCommand::Batch(args),
        } => args.run(&config).await?,
        Commands::Live {
            action: LiveCommand::Run(args),
        } => args.run(&config).await?,
        Commands::State { action } => handle_state(action, &config).await?,
        Commands::Analyze { action } => handle_analyze(action)?,
        Commands::Strategies => list_strategies(),
        Commands::Monitor(args) => args.run(&config).await?,
    }

    Ok(())
}
1169
1170async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
1171    match cmd {
1172        DataCommand::Download(args) => {
1173            args.run(config).await?;
1174        }
1175        DataCommand::DownloadTrades(args) => {
1176            args.run(config).await?;
1177        }
1178        DataCommand::Validate(args) => {
1179            args.run()?;
1180        }
1181        DataCommand::Resample(args) => {
1182            args.run()?;
1183        }
1184        DataCommand::InspectParquet(args) => {
1185            args.run()?;
1186        }
1187    }
1188    Ok(())
1189}
1190
/// Dispatch a `state` subcommand; currently only `inspect` exists, which
/// dumps persisted runtime state using the resolved path/engine.
async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
    match cmd {
        StateCommand::Inspect(args) => {
            state::inspect_state(
                args.resolved_path(config),
                args.resolved_engine(config),
                args.raw,
            )
            .await?;
        }
    }
    Ok(())
}
1204
/// Dispatch an `analyze` subcommand; `execution` runs fill-quality analysis
/// and optionally exports the result to CSV.
fn handle_analyze(cmd: AnalyzeCommand) -> Result<()> {
    match cmd {
        AnalyzeCommand::Execution(args) => {
            analyze::run_execution(args.build_request()?, args.export_csv.as_deref())
        }
    }
}
1212
impl BacktestRunArgs {
    /// Execute a single backtest: load the strategy, build the market data
    /// stream (candle or tick mode), wire up a simulated execution client,
    /// run the backtester, and print the performance report.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        let contents = std::fs::read_to_string(&self.strategy_config)
            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
        let def: StrategyConfigFile =
            toml::from_str(&contents).context("failed to parse strategy config file")?;
        let strategy = load_strategy(&def.name, def.params)
            .with_context(|| format!("failed to configure strategy {}", def.name))?;
        // The strategy's declared subscriptions drive data loading and client setup.
        let symbols = strategy.subscriptions();
        if symbols.is_empty() {
            return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
        }

        let mode = match self.mode {
            BacktestModeArg::Candle => BacktestMode::Candle,
            BacktestModeArg::Tick => BacktestMode::Tick,
        };

        // Market metadata is mandatory: CLI flag first, then config fallback.
        let markets_path = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone())
            .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
        let market_registry = Arc::new(
            MarketRegistry::load_from_file(&markets_path).with_context(|| {
                format!("failed to load markets from {}", markets_path.display())
            })?,
        );
        let fee_schedule = self.resolve_fee_schedule()?;
        let fee_model_template = fee_schedule.build_model();
        let reporting_currency = AssetId::from(config.backtest.reporting_currency.as_str());
        let initial_balances = clone_initial_balances(&config.backtest);

        // Candle mode yields (stream, None, client, None);
        // tick mode yields (None, events, client, matching engine).
        let (market_stream, event_stream, execution_client, matching_engine) = match mode {
            BacktestMode::Candle => {
                let stream = self.build_candle_stream(&symbols)?;
                let exec_client = build_sim_execution_client(
                    "paper-backtest",
                    &symbols,
                    self.slippage_bps,
                    fee_model_template.clone(),
                    &initial_balances,
                    reporting_currency,
                )
                .await;
                (Some(stream), None, exec_client, None)
            }
            BacktestMode::Tick => {
                if self.lob_paths.is_empty() {
                    bail!("--lob-data is required when --mode tick");
                }
                let source = self.detect_lob_source()?;
                // Clamp so the later `as i64` cast cannot wrap negative.
                let latency_ms = self.sim_latency_ms.min(i64::MAX as u64);
                let fee_model = fee_model_template.clone();
                let execution_client = build_sim_execution_client(
                    "paper-tick",
                    &symbols,
                    self.slippage_bps,
                    fee_model_template.clone(),
                    &initial_balances,
                    reporting_currency,
                )
                .await;
                let engine = Arc::new(MatchingEngine::with_config(
                    "matching-engine",
                    symbols.clone(),
                    reporting_balance(&config.backtest),
                    MatchingEngineConfig {
                        latency: Duration::milliseconds(latency_ms as i64),
                        queue_model: self.sim_queue_model.into(),
                        fee_model: fee_model.clone(),
                        cash_asset: Some(reporting_currency),
                    },
                ));
                let stream = match source {
                    LobSource::Json(paths) => {
                        let events = load_lob_events_from_paths(&paths)?;
                        if events.is_empty() {
                            bail!("no order book events loaded from --lob-data");
                        }
                        stream_from_events(events)
                    }
                    LobSource::FlightRecorder(root) => self
                        .build_flight_recorder_stream(&root, &symbols)
                        .context("failed to initialize flight recorder stream")?,
                };
                (None, Some(stream), execution_client, Some(engine))
            }
        };

        let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
        let order_quantity = self.quantity;
        // Risk checks are deliberately disabled for offline simulation.
        let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));

        // Negative CLI values are clamped; latency is floored at one candle.
        let mut cfg = BacktestConfig::new(symbols[0]);
        cfg.order_quantity = order_quantity;
        cfg.initial_balances = initial_balances.clone();
        cfg.reporting_currency = reporting_currency;
        cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
        cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
        cfg.execution.latency_candles = self.latency_candles.max(1);
        cfg.mode = mode;

        let report = Backtester::new(
            cfg,
            strategy,
            execution,
            matching_engine,
            market_registry,
            market_stream,
            event_stream,
        )
        .run()
        .await
        .context("backtest failed")?;
        print_report(&report);
        Ok(())
    }

    /// Fee schedule: load from `--fee-schedule` when given, otherwise derive a
    /// flat maker/taker schedule from `--fee-bps` (clamped to >= 0).
    fn resolve_fee_schedule(&self) -> Result<FeeScheduleConfig> {
        if let Some(path) = &self.fee_schedule {
            load_fee_schedule_file(path)
        } else {
            Ok(FeeScheduleConfig::with_defaults(
                self.fee_bps.max(Decimal::ZERO),
                self.fee_bps.max(Decimal::ZERO),
            ))
        }
    }

    /// Build the candle stream for candle mode: synthetic candles when no
    /// `--data` was given, otherwise CSV or parquet depending on the files.
    fn build_candle_stream(&self, symbols: &[Symbol]) -> Result<BacktestStream> {
        if symbols.is_empty() {
            bail!("strategy did not declare any subscriptions");
        }
        if self.data_paths.is_empty() {
            // No data files: generate deterministic synthetic candles per
            // symbol, offsetting each series so they don't overlap exactly.
            let mut generated = Vec::new();
            for (idx, symbol) in symbols.iter().enumerate() {
                let offset = idx as i64 * 10;
                generated.extend(synth_candles(*symbol, self.candles, offset));
            }
            generated.sort_by_key(|c| c.timestamp);
            if generated.is_empty() {
                bail!("no synthetic candles generated; provide --data files instead");
            }
            return Ok(memory_market_stream(symbols[0], generated));
        }

        match detect_data_format(&self.data_paths)? {
            DataFormat::Csv => {
                let mut candles = load_candles_from_paths(&self.data_paths)?;
                if candles.is_empty() {
                    bail!("no candles loaded from --data paths");
                }
                candles.sort_by_key(|c| c.timestamp);
                Ok(memory_market_stream(symbols[0], candles))
            }
            DataFormat::Parquet => Ok(parquet_market_stream(symbols, self.data_paths.clone())),
        }
    }

    /// Classify `--lob-data`: a single directory means flight-recorder output;
    /// one or more regular files mean JSONL event logs. Anything else errors.
    fn detect_lob_source(&self) -> Result<LobSource> {
        if self.lob_paths.len() == 1 {
            let path = &self.lob_paths[0];
            if path.is_dir() {
                return Ok(LobSource::FlightRecorder(path.clone()));
            }
        }
        if self.lob_paths.iter().all(|path| path.is_file()) {
            return Ok(LobSource::Json(self.lob_paths.clone()));
        }
        bail!(
            "tick mode expects either JSONL files or a single flight-recorder directory via --lob-data"
        )
    }

    /// Open a flight-recorder directory as a unified market event stream
    /// restricted to the strategy's symbols.
    fn build_flight_recorder_stream(
        &self,
        root: &Path,
        symbols: &[Symbol],
    ) -> Result<MarketEventStream> {
        let stream = UnifiedEventStream::from_flight_recorder(root, symbols)?
            .into_stream()
            .map(|event| event.map(MarketEvent::from));
        Ok(Box::pin(stream))
    }
}
1399
1400fn load_fee_schedule_file(path: &Path) -> Result<FeeScheduleConfig> {
1401    let contents = fs::read_to_string(path)
1402        .with_context(|| format!("failed to read fee schedule {}", path.display()))?;
1403    let cfg = match path.extension().and_then(|ext| ext.to_str()) {
1404        Some(ext) if ext.eq_ignore_ascii_case("json") => serde_json::from_str(&contents)
1405            .with_context(|| format!("failed to parse JSON fee schedule {}", path.display()))?,
1406        _ => toml::from_str(&contents)
1407            .with_context(|| format!("failed to parse TOML fee schedule {}", path.display()))?,
1408    };
1409    Ok(cfg)
1410}
1411
impl BacktestBatchArgs {
    /// Run every strategy config against the shared data set, collecting one
    /// summary row per job and optionally writing a CSV report.
    ///
    /// # Errors
    /// Fails fast on missing inputs, unloadable configs/data, or any
    /// individual backtest failure.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        if self.config_paths.is_empty() {
            return Err(anyhow!("provide at least one --config path"));
        }
        if self.data_paths.is_empty() {
            return Err(anyhow!("provide at least one --data path for batch mode"));
        }
        // Market metadata is mandatory: CLI flag first, then config fallback.
        let markets_path = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone())
            .ok_or_else(|| {
                anyhow!("batch mode requires --markets-file or backtest.markets_file")
            })?;
        let market_registry = Arc::new(
            MarketRegistry::load_from_file(&markets_path).with_context(|| {
                format!("failed to load markets from {}", markets_path.display())
            })?,
        );
        // Shared across all jobs: data format, fee schedule, balances.
        let data_format = detect_data_format(&self.data_paths)?;
        let mut aggregated = Vec::new();
        let fee_schedule = if let Some(path) = &self.fee_schedule {
            load_fee_schedule_file(path)?
        } else {
            FeeScheduleConfig::with_defaults(
                self.fee_bps.max(Decimal::ZERO),
                self.fee_bps.max(Decimal::ZERO),
            )
        };
        let reporting_currency = AssetId::from(config.backtest.reporting_currency.as_str());
        let initial_balances = clone_initial_balances(&config.backtest);
        // One independent backtest per strategy config file.
        for config_path in &self.config_paths {
            let contents = std::fs::read_to_string(config_path).with_context(|| {
                format!("failed to read strategy config {}", config_path.display())
            })?;
            let def: StrategyConfigFile =
                toml::from_str(&contents).context("failed to parse strategy config file")?;
            let strategy = load_strategy(&def.name, def.params)
                .with_context(|| format!("failed to configure strategy {}", def.name))?;
            let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
            let order_quantity = self.quantity;
            let symbols = strategy.subscriptions();
            if symbols.is_empty() {
                bail!("strategy {} did not declare subscriptions", strategy.name());
            }
            // NOTE(review): CSV candles are reloaded from disk for every job;
            // presumably acceptable for batch sizes in practice.
            let stream = match data_format {
                DataFormat::Csv => {
                    let mut candles = load_candles_from_paths(&self.data_paths)?;
                    if candles.is_empty() {
                        bail!("no candles loaded from --data paths");
                    }
                    candles.sort_by_key(|c| c.timestamp);
                    memory_market_stream(symbols[0], candles)
                }
                DataFormat::Parquet => parquet_market_stream(&symbols, self.data_paths.clone()),
            };
            let execution_client = build_sim_execution_client(
                &format!("paper-batch-{}", def.name),
                &symbols,
                self.slippage_bps,
                fee_schedule.build_model(),
                &initial_balances,
                reporting_currency,
            )
            .await;
            // Risk checks are deliberately disabled for offline simulation.
            let execution =
                ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
            let mut cfg = BacktestConfig::new(symbols[0]);
            cfg.order_quantity = order_quantity;
            cfg.initial_balances = initial_balances.clone();
            cfg.reporting_currency = reporting_currency;
            cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
            cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
            cfg.execution.latency_candles = self.latency_candles.max(1);

            let report = Backtester::new(
                cfg,
                strategy,
                execution,
                None,
                market_registry.clone(),
                Some(stream),
                None,
            )
            .run()
            .await
            .with_context(|| format!("backtest failed for {}", config_path.display()))?;
            aggregated.push(BatchRow {
                config: config_path.display().to_string(),
                signals: 0, // Legacy field, can be removed or calculated from report
                orders: 0,  // Legacy field, can be removed or calculated from report
                dropped_orders: 0, // Legacy field, can be removed or calculated from report
                ending_equity: report.ending_equity,
            });
        }

        if let Some(output) = &self.output {
            write_batch_report(output, &aggregated)?;
            println!("Batch report written to {}", output.display());
        }
        if aggregated.is_empty() {
            return Err(anyhow!("no batch jobs executed"));
        }
        Ok(())
    }
}
1519
/// Build a simulated execution client covering all `symbols`.
///
/// Symbols are grouped by exchange. With at most one exchange, a single
/// paper client is returned directly; otherwise a router fans orders out
/// to one paper client per exchange.
async fn build_sim_execution_client(
    name_prefix: &str,
    symbols: &[Symbol],
    slippage_bps: Decimal,
    fee_model: Arc<dyn FeeModel>,
    initial_balances: &HashMap<AssetId, Decimal>,
    reporting_currency: AssetId,
) -> Arc<dyn ExecutionClient> {
    let mut grouped: HashMap<ExchangeId, Vec<Symbol>> = HashMap::new();
    for symbol in symbols {
        grouped.entry(symbol.exchange).or_default().push(*symbol);
    }
    if grouped.len() <= 1 {
        // Zero symbols falls back to an UNSPECIFIED-exchange client.
        let (exchange, group) = grouped
            .into_iter()
            .next()
            .unwrap_or((ExchangeId::UNSPECIFIED, symbols.to_vec()));
        return build_paper_client_for_exchange(
            name_prefix,
            exchange,
            group,
            slippage_bps,
            fee_model,
            initial_balances,
            reporting_currency,
        )
        .await;
    }
    // Multiple exchanges: build one client each and route by exchange id.
    let mut routes = HashMap::new();
    for (exchange, group) in grouped {
        let client = build_paper_client_for_exchange(
            name_prefix,
            exchange,
            group,
            slippage_bps,
            fee_model.clone(),
            initial_balances,
            reporting_currency,
        )
        .await;
        routes.insert(exchange, client);
    }
    Arc::new(RouterExecutionClient::new(routes))
}
1564
1565async fn build_paper_client_for_exchange(
1566    name_prefix: &str,
1567    exchange: ExchangeId,
1568    symbols: Vec<Symbol>,
1569    slippage_bps: Decimal,
1570    fee_model: Arc<dyn FeeModel>,
1571    initial_balances: &HashMap<AssetId, Decimal>,
1572    reporting_currency: AssetId,
1573) -> Arc<dyn ExecutionClient> {
1574    let cash_asset = cash_asset_for_exchange(reporting_currency, exchange);
1575    let client = Arc::new(PaperExecutionClient::with_cash_asset(
1576        format!("{name_prefix}-{}", exchange),
1577        symbols,
1578        slippage_bps,
1579        fee_model,
1580        cash_asset,
1581    ));
1582    let balance = initial_balance_for_asset(initial_balances, cash_asset);
1583    client.initialize_balance(cash_asset, balance).await;
1584    let exec: Arc<dyn ExecutionClient> = client;
1585    exec
1586}
1587
1588fn cash_asset_for_exchange(reporting: AssetId, exchange: ExchangeId) -> AssetId {
1589    if reporting.exchange.is_specified() {
1590        if reporting.exchange == exchange {
1591            reporting
1592        } else {
1593            AssetId::from_code(exchange, reporting.code())
1594        }
1595    } else if exchange.is_specified() {
1596        AssetId::from_code(exchange, reporting.code())
1597    } else {
1598        reporting
1599    }
1600}
1601
1602fn initial_balance_for_asset(balances: &HashMap<AssetId, Decimal>, target: AssetId) -> Decimal {
1603    if let Some(amount) = balances
1604        .iter()
1605        .find(|(asset, _)| asset.exchange == target.exchange && asset.code() == target.code())
1606        .map(|(_, amount)| *amount)
1607    {
1608        return amount;
1609    }
1610    if target.exchange.is_specified() {
1611        if let Some(amount) = balances
1612            .iter()
1613            .find(|(asset, _)| !asset.exchange.is_specified() && asset.code() == target.code())
1614            .map(|(_, amount)| *amount)
1615        {
1616            return amount;
1617        }
1618    }
1619    Decimal::from(10_000)
1620}
1621
#[cfg(test)]
mod tests {
    use super::*;

    // An exchange-specific balance entry must win over a global one
    // with the same currency code.
    #[test]
    fn initial_balance_prefers_exchange_specific_entry() {
        let mut balances = HashMap::new();
        let exchange = ExchangeId::from("bybit_linear");
        let global = AssetId::from("USDT");
        let target = AssetId::from_code(exchange, "USDT");
        balances.insert(global, Decimal::new(5_000, 0));
        balances.insert(target, Decimal::new(2_500, 0));
        assert_eq!(
            initial_balance_for_asset(&balances, target),
            Decimal::new(2_500, 0)
        );
    }

    // Without an exchange-specific entry, an exchange-pinned target should
    // fall back to the global entry for the same code.
    #[test]
    fn initial_balance_falls_back_to_unspecified_exchange() {
        let mut balances = HashMap::new();
        balances.insert(AssetId::from("USDT"), Decimal::new(7_500, 0));
        let exchange = ExchangeId::from("binance_perp");
        let target = AssetId::from_code(exchange, "USDT");
        assert_eq!(
            initial_balance_for_asset(&balances, target),
            Decimal::new(7_500, 0)
        );
    }
}
1652
impl LiveRunArgs {
    /// Resolves CLI arguments against the loaded `AppConfig` and launches a
    /// live trading session.
    ///
    /// Precedence throughout is: explicit CLI flag first, then the value from
    /// the config file. Returns an error if any exchange profile, the strategy
    /// config, or a numeric argument fails validation.
    async fn run(&self, config: &AppConfig) -> Result<()> {
        // Build the ordered exchange list: the primary `--exchange` always
        // participates and is placed first when `--exchanges` omits it.
        let exchange_names = if self.exchanges.is_empty() {
            vec![self.exchange.clone()]
        } else {
            let mut names = self.exchanges.clone();
            if !names.contains(&self.exchange) {
                names.insert(0, self.exchange.clone());
            }
            names
        };
        // Every requested exchange must have a matching profile in the config.
        let profiles = config.exchange_profiles();
        let mut named_exchanges = Vec::new();
        for name in exchange_names {
            let config_entry = profiles
                .get(&name)
                .cloned()
                .ok_or_else(|| anyhow!("exchange profile {} not found", name))?;
            named_exchanges.push(NamedExchange {
                name,
                config: config_entry,
            });
        }

        // Load and instantiate the strategy from its TOML definition file.
        let contents = fs::read_to_string(&self.strategy_config)
            .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
        let def: StrategyConfigFile =
            toml::from_str(&contents).context("failed to parse strategy config file")?;
        let strategy = load_strategy(&def.name, def.params)
            .with_context(|| format!("failed to configure strategy {}", def.name))?;
        let symbols = strategy.subscriptions();
        if symbols.is_empty() {
            bail!("strategy did not declare any subscriptions");
        }
        if self.quantity <= Decimal::ZERO {
            bail!("--quantity must be greater than zero");
        }
        let quantity = self.quantity;
        let initial_balances = self.resolved_initial_balances(config);
        let reporting_currency = AssetId::from(config.backtest.reporting_currency.as_str());
        // CLI-provided markets file wins over the backtest config's default.
        let markets_file = self
            .markets_file
            .clone()
            .or_else(|| config.backtest.markets_file.clone());

        let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
        let category =
            PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
        let metrics_addr = self.resolved_metrics_addr(config)?;
        let control_addr = self.resolved_control_addr(config)?;
        // Persistence: CLI flags override the engine/path from the config file.
        let persistence_cfg = config.live.persistence_config();
        let persistence_engine = self
            .persistence
            .map(PersistenceEngine::from)
            .unwrap_or(persistence_cfg.engine);
        let state_path = self
            .state_path
            .clone()
            .unwrap_or_else(|| persistence_cfg.path.clone());
        let persistence = PersistenceSettings::new(persistence_engine, state_path);
        let alerting = self.build_alerting(config);
        // Enforce a minimum candle-history window of 32 entries.
        let history = self.history.max(32);
        let reconciliation_interval = self.reconciliation_interval(config);
        let reconciliation_threshold = self.reconciliation_threshold(config);
        let orderbook_depth = self
            .orderbook_depth
            .unwrap_or(super::live::default_order_book_depth());
        // Panic-close behaviour; the limit offset is clamped to be non-negative.
        let panic_close = PanicCloseConfig {
            mode: self.panic_mode.into(),
            limit_offset_bps: self.panic_limit_offset_bps.max(Decimal::ZERO),
        };

        let settings = LiveSessionSettings {
            category,
            interval,
            quantity,
            // Negative fee/slippage inputs are clamped to zero.
            slippage_bps: self.slippage_bps.max(Decimal::ZERO),
            fee_bps: self.fee_bps.max(Decimal::ZERO),
            history,
            metrics_addr,
            persistence,
            initial_balances,
            reporting_currency,
            markets_file,
            alerting,
            exec_backend: self.exec,
            risk: self.build_risk_config(config),
            reconciliation_interval,
            reconciliation_threshold,
            orderbook_depth,
            record_path: Some(self.record_data.clone()),
            control_addr,
            panic_close,
        };

        // Human-readable "name (driver)" labels for the startup log line.
        let exchange_labels: Vec<String> = named_exchanges
            .iter()
            .map(|ex| format!("{} ({})", ex.name, ex.config.driver))
            .collect();

        info!(
            strategy = %def.name,
            symbols = ?symbols,
            exchanges = ?exchange_labels,
            interval = %self.interval,
            exec = ?self.exec,
            persistence_engine = ?settings.persistence.engine,
            state_path = %settings.persistence.state_path.display(),
            control_addr = %settings.control_addr,
            "starting live session"
        );

        run_live(strategy, symbols, named_exchanges, settings)
            .await
            .context("live session failed")
    }
}
1770
1771fn list_strategies() {
1772    println!("Built-in strategies:");
1773    for name in builtin_strategy_names() {
1774        println!("- {name}");
1775    }
1776}
1777
/// Prints a human-readable report of a dataset validation run to stdout.
///
/// Shows aggregate counts from `outcome.summary`, followed by up to
/// `MAX_EXAMPLES` concrete examples per finding category (gaps, price
/// spikes, cross-source mismatches), with an "omitted" footer when a
/// category has more entries than shown.
fn print_validation_summary(outcome: &ValidationOutcome) {
    // Cap per-category example output so large reports stay readable.
    const MAX_EXAMPLES: usize = 5;
    let summary = &outcome.summary;
    println!(
        "Validation summary for {} ({} candles)",
        summary.symbol, summary.rows
    );
    println!(
        "  Range: {} -> {}",
        summary.start.to_rfc3339(),
        summary.end.to_rfc3339()
    );
    println!("  Interval: {}", interval_label(summary.interval));
    println!("  Missing intervals: {}", summary.missing_candles);
    println!("  Duplicate intervals: {}", summary.duplicate_candles);
    println!("  Zero-volume candles: {}", summary.zero_volume_candles);
    println!("  Price spikes flagged: {}", summary.price_spike_count);
    println!(
        "  Cross-source mismatches: {}",
        summary.cross_mismatch_count
    );
    println!("  Repaired candles generated: {}", summary.repaired_candles);

    // Example section: detected gaps (missing candle runs).
    if !outcome.gaps.is_empty() {
        println!("  Gap examples:");
        for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
            println!(
                "    {} -> {} (missing {})",
                gap.start.to_rfc3339(),
                gap.end.to_rfc3339(),
                gap.missing
            );
        }
        if outcome.gaps.len() > MAX_EXAMPLES {
            println!(
                "    ... {} additional gap(s) omitted",
                outcome.gaps.len() - MAX_EXAMPLES
            );
        }
    }

    // Example section: flagged price spikes (change shown as a percentage).
    if !outcome.price_spikes.is_empty() {
        println!("  Price spike examples:");
        for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
            println!(
                "    {} (change {:.2}%)",
                spike.timestamp.to_rfc3339(),
                spike.change_fraction * 100.0
            );
        }
        if outcome.price_spikes.len() > MAX_EXAMPLES {
            println!(
                "    ... {} additional spike(s) omitted",
                outcome.price_spikes.len() - MAX_EXAMPLES
            );
        }
    }

    // Example section: closes disagreeing between primary and reference feed.
    if !outcome.cross_mismatches.is_empty() {
        println!("  Cross-source mismatch examples:");
        for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
            println!(
                "    {} primary {:.4} vs ref {:.4} ({:.2}%)",
                miss.timestamp.to_rfc3339(),
                miss.primary_close,
                miss.reference_close,
                miss.delta_fraction * 100.0
            );
        }
        if outcome.cross_mismatches.len() > MAX_EXAMPLES {
            println!(
                "    ... {} additional mismatch(es) omitted",
                outcome.cross_mismatches.len() - MAX_EXAMPLES
            );
        }
    }
}
1855
1856fn print_report(report: &PerformanceReport) {
1857    println!("\n{}", report);
1858}
1859
1860fn synth_candles(symbol: Symbol, len: usize, offset_minutes: i64) -> Vec<Candle> {
1861    let mut candles = Vec::with_capacity(len);
1862    for i in 0..len {
1863        let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
1864        let open = base + (i as f64 % 3.0) * 10.0;
1865        let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
1866        let open_dec =
1867            Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
1868        let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
1869        let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
1870        let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
1871        candles.push(Candle {
1872            symbol,
1873            interval: Interval::OneMinute,
1874            open: open_dec,
1875            high,
1876            low,
1877            close: close_dec,
1878            volume: Decimal::ONE,
1879            timestamp: Utc::now() - Duration::minutes((len - i) as i64)
1880                + Duration::minutes(offset_minutes),
1881        });
1882    }
1883    candles
1884}
1885
1886fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
1887    if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
1888        return Ok(dt.with_timezone(&Utc));
1889    }
1890    if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1891        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1892    }
1893    if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
1894        let dt = date
1895            .and_hms_opt(0, 0, 0)
1896            .ok_or_else(|| anyhow!("invalid date"))?;
1897        return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1898    }
1899    Err(anyhow!("unable to parse datetime '{value}'"))
1900}
1901
/// Row shape expected when deserializing candle CSV files in
/// `load_candles_from_paths`.
#[derive(Deserialize)]
struct CandleCsvRow {
    /// Optional symbol column; when absent the symbol is inferred from the
    /// file's parent directory name.
    symbol: Option<String>,
    /// Timestamp string, parsed via `parse_datetime` (RFC 3339, naive
    /// datetime, or bare date).
    timestamp: String,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}
1912
1913fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1914    let mut candles = Vec::new();
1915    for path in paths {
1916        let mut reader = csv::Reader::from_path(path)
1917            .with_context(|| format!("failed to open {}", path.display()))?;
1918        for record in reader.deserialize::<CandleCsvRow>() {
1919            let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1920            let timestamp = parse_datetime(&row.timestamp)?;
1921            let symbol_code = row
1922                .symbol
1923                .clone()
1924                .or_else(|| infer_symbol_from_path(path))
1925                .ok_or_else(|| {
1926                    anyhow!(
1927                        "missing symbol column and unable to infer from path {}",
1928                        path.display()
1929                    )
1930                })?;
1931            let symbol = Symbol::from(symbol_code.as_str());
1932            let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1933            let open = Decimal::from_f64(row.open).ok_or_else(|| {
1934                anyhow!("invalid open value '{}' in {}", row.open, path.display())
1935            })?;
1936            let high = Decimal::from_f64(row.high).ok_or_else(|| {
1937                anyhow!("invalid high value '{}' in {}", row.high, path.display())
1938            })?;
1939            let low = Decimal::from_f64(row.low)
1940                .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1941            let close = Decimal::from_f64(row.close).ok_or_else(|| {
1942                anyhow!("invalid close value '{}' in {}", row.close, path.display())
1943            })?;
1944            let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1945                anyhow!(
1946                    "invalid volume value '{}' in {}",
1947                    row.volume,
1948                    path.display()
1949                )
1950            })?;
1951            candles.push(Candle {
1952                symbol,
1953                interval,
1954                open,
1955                high,
1956                low,
1957                close,
1958                volume,
1959                timestamp,
1960            });
1961        }
1962    }
1963    Ok(candles)
1964}
1965
/// On-disk format of a `--data` input set, detected from file extensions.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DataFormat {
    /// Any file without a `.parquet` extension is treated as CSV.
    Csv,
    Parquet,
}
1971
1972fn detect_data_format(paths: &[PathBuf]) -> Result<DataFormat> {
1973    let mut detected: Option<DataFormat> = None;
1974    for path in paths {
1975        let ext = path
1976            .extension()
1977            .and_then(|ext| ext.to_str())
1978            .map(|ext| ext.to_ascii_lowercase())
1979            .unwrap_or_else(|| String::from(""));
1980        let current = if ext == "parquet" {
1981            DataFormat::Parquet
1982        } else {
1983            DataFormat::Csv
1984        };
1985        if let Some(existing) = detected {
1986            if existing != current {
1987                bail!("cannot mix CSV and Parquet inputs in --data");
1988            }
1989        } else {
1990            detected = Some(current);
1991        }
1992    }
1993    detected.ok_or_else(|| anyhow!("no data paths provided"))
1994}
1995
/// Wraps an in-memory candle series in the paper market stream used by
/// backtests; the empty `Vec` is the (absent) tick data.
fn memory_market_stream(symbol: Symbol, candles: Vec<Candle>) -> BacktestStream {
    Box::new(PaperMarketStream::from_data(symbol, Vec::new(), candles))
}
1999
/// Builds a backtest stream that reads candles for `symbols` from the given
/// Parquet files.
fn parquet_market_stream(symbols: &[Symbol], paths: Vec<PathBuf>) -> BacktestStream {
    Box::new(ParquetMarketStream::with_candles(symbols.to_vec(), paths))
}
2003
/// One newline-delimited JSON record in an order-book event file, dispatched
/// on its lowercase `event` tag (`snapshot`, `depth`, or `trade`).
///
/// In every variant, `symbol` is optional and falls back to the symbol
/// inferred from the file path; depth levels are `[price, size]` pairs.
#[derive(Deserialize)]
#[serde(tag = "event", rename_all = "lowercase")]
enum LobEventRow {
    /// Full book snapshot: replaces bid/ask state.
    Snapshot {
        timestamp: String,
        symbol: Option<String>,
        bids: Vec<[f64; 2]>,
        asks: Vec<[f64; 2]>,
    },
    /// Incremental depth update with changed levels only.
    Depth {
        timestamp: String,
        symbol: Option<String>,
        bids: Vec<[f64; 2]>,
        asks: Vec<[f64; 2]>,
    },
    /// A single executed trade.
    Trade {
        timestamp: String,
        symbol: Option<String>,
        /// Accepted values (case-insensitive): buy/bid/b, sell/ask/s.
        side: String,
        price: f64,
        size: f64,
    },
}
2027
/// Loads order-book events (snapshots, depth updates, trades) from
/// newline-delimited JSON files and returns them sorted by timestamp.
///
/// Blank lines are skipped. A record missing its `symbol` falls back to the
/// symbol inferred from the file's parent directory; if neither is available
/// the load fails.
///
/// # Errors
/// Fails on unreadable files, invalid JSON, unparseable timestamps, unknown
/// trade sides, missing symbols, or price/size values not representable as
/// `Decimal`.
fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
    let mut events = Vec::new();
    for path in paths {
        let file = File::open(path)
            .with_context(|| format!("failed to open order book file {}", path.display()))?;
        // One fallback symbol per file, cloned into rows that omit their own.
        let symbol_hint = infer_symbol_from_path(path);
        for line in BufReader::new(file).lines() {
            let line =
                line.with_context(|| format!("failed to read line from {}", path.display()))?;
            if line.trim().is_empty() {
                continue;
            }
            let row: LobEventRow = serde_json::from_str(&line)
                .with_context(|| format!("invalid order book event in {}", path.display()))?;
            match row {
                LobEventRow::Snapshot {
                    timestamp,
                    symbol,
                    bids,
                    asks,
                } => {
                    let ts = parse_datetime(&timestamp)?;
                    let symbol_code = symbol
                        .or_else(|| symbol_hint.clone())
                        .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
                    let symbol = Symbol::from(symbol_code.as_str());
                    let bids = convert_levels(&bids)?;
                    let asks = convert_levels(&asks)?;
                    // Checksums are unknown for file-sourced books, so both
                    // are left unset.
                    let book = OrderBook {
                        symbol,
                        bids,
                        asks,
                        timestamp: ts,
                        exchange_checksum: None,
                        local_checksum: None,
                    };
                    events.push(MarketEvent {
                        timestamp: ts,
                        kind: MarketEventKind::OrderBook(book),
                    });
                }
                LobEventRow::Depth {
                    timestamp,
                    symbol,
                    bids,
                    asks,
                } => {
                    let ts = parse_datetime(&timestamp)?;
                    let symbol_code = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
                        anyhow!("missing symbol in depth update {}", path.display())
                    })?;
                    let symbol = Symbol::from(symbol_code.as_str());
                    let bids = convert_levels(&bids)?;
                    let asks = convert_levels(&asks)?;
                    let update = DepthUpdate {
                        symbol,
                        bids,
                        asks,
                        timestamp: ts,
                    };
                    events.push(MarketEvent {
                        timestamp: ts,
                        kind: MarketEventKind::Depth(update),
                    });
                }
                LobEventRow::Trade {
                    timestamp,
                    symbol,
                    side,
                    price,
                    size,
                } => {
                    let ts = parse_datetime(&timestamp)?;
                    let symbol_code = symbol
                        .or_else(|| symbol_hint.clone())
                        .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
                    let symbol = Symbol::from(symbol_code.as_str());
                    // Accept several side spellings; anything else is an error.
                    let side = match side.to_lowercase().as_str() {
                        "buy" | "bid" | "b" => Side::Buy,
                        "sell" | "ask" | "s" => Side::Sell,
                        other => bail!("unsupported trade side '{other}' in {}", path.display()),
                    };
                    let price = Decimal::from_f64(price).ok_or_else(|| {
                        anyhow!("invalid trade price '{}' in {}", price, path.display())
                    })?;
                    let size = Decimal::from_f64(size).ok_or_else(|| {
                        anyhow!("invalid trade size '{}' in {}", size, path.display())
                    })?;
                    // File events carry one timestamp, so exchange and local
                    // receive times are set to the same value.
                    let tick = Tick {
                        symbol,
                        price,
                        size,
                        side,
                        exchange_timestamp: ts,
                        received_at: ts,
                    };
                    events.push(MarketEvent {
                        timestamp: ts,
                        kind: MarketEventKind::Trade(tick),
                    });
                }
            }
        }
    }
    // Events may interleave across files; present one time-ordered stream.
    events.sort_by_key(|event| event.timestamp);
    Ok(events)
}
2135
2136fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
2137    levels
2138        .iter()
2139        .map(|pair| {
2140            let price = Decimal::from_f64(pair[0])
2141                .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
2142            let size = Decimal::from_f64(pair[1])
2143                .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
2144            Ok(OrderBookLevel { price, size })
2145        })
2146        .collect()
2147}
2148
/// Infers a symbol code from a data file path: the name of the directory
/// containing the file (e.g. `data/BTCUSDT/1m.csv` -> `BTCUSDT`).
fn infer_symbol_from_path(path: &Path) -> Option<String> {
    let dir = path.parent()?;
    let name = dir.file_name()?;
    Some(name.to_string_lossy().into_owned())
}
2154
2155fn infer_interval_from_path(path: &Path) -> Option<Interval> {
2156    path.file_stem()
2157        .and_then(|os| os.to_str())
2158        .and_then(|stem| stem.split('_').next())
2159        .and_then(|token| Interval::from_str(token).ok())
2160}
2161
2162fn default_output_path(
2163    config: &AppConfig,
2164    exchange: &str,
2165    symbol: &str,
2166    interval: Interval,
2167    start: DateTime<Utc>,
2168    end: DateTime<Utc>,
2169) -> PathBuf {
2170    let interval_part = interval_label(interval);
2171    let start_part = start.format("%Y%m%d").to_string();
2172    let end_part = end.format("%Y%m%d").to_string();
2173    config
2174        .data_path
2175        .join(exchange)
2176        .join(symbol)
2177        .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
2178}
2179
2180fn default_tick_output_path(
2181    config: &AppConfig,
2182    exchange: &str,
2183    symbol: &str,
2184    start: DateTime<Utc>,
2185    end: DateTime<Utc>,
2186) -> PathBuf {
2187    let start_part = start.format("%Y%m%dT%H%M%S").to_string();
2188    let end_part = end.format("%Y%m%dT%H%M%S").to_string();
2189    config
2190        .data_path
2191        .join("ticks")
2192        .join(exchange)
2193        .join(format!("{symbol}_{start_part}-{end_part}.parquet"))
2194}
2195
2196fn default_tick_partition_dir(config: &AppConfig, exchange: &str, symbol: &str) -> PathBuf {
2197    config.data_path.join("ticks").join(exchange).join(symbol)
2198}
2199
2200fn partition_path(base_dir: &Path, date: NaiveDate) -> PathBuf {
2201    base_dir.join(format!("{}.parquet", date.format("%Y-%m-%d")))
2202}
2203
2204fn interval_label(interval: Interval) -> &'static str {
2205    match interval {
2206        Interval::OneSecond => "1s",
2207        Interval::OneMinute => "1m",
2208        Interval::FiveMinutes => "5m",
2209        Interval::FifteenMinutes => "15m",
2210        Interval::OneHour => "1h",
2211        Interval::FourHours => "4h",
2212        Interval::OneDay => "1d",
2213    }
2214}
2215
/// Serializable row for candle CSV output (`write_candles_csv`); decimals are
/// down-converted to f64 for the CSV representation.
#[derive(Serialize)]
struct CandleRow {
    symbol: String,
    /// RFC 3339 timestamp string.
    timestamp: String,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}
2226
2227fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
2228    if let Some(parent) = path.parent() {
2229        fs::create_dir_all(parent)
2230            .with_context(|| format!("failed to create directory {}", parent.display()))?;
2231    }
2232    let mut writer =
2233        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
2234    for candle in candles {
2235        let row = CandleRow {
2236            symbol: candle.symbol.code().to_string(),
2237            timestamp: candle.timestamp.to_rfc3339(),
2238            open: candle.open.to_f64().unwrap_or(0.0),
2239            high: candle.high.to_f64().unwrap_or(0.0),
2240            low: candle.low.to_f64().unwrap_or(0.0),
2241            close: candle.close.to_f64().unwrap_or(0.0),
2242            volume: candle.volume.to_f64().unwrap_or(0.0),
2243        };
2244        writer.serialize(&row)?;
2245    }
2246    writer.flush()?;
2247    Ok(())
2248}
2249
/// One line of the batch-backtest CSV report: per-config run statistics.
#[derive(Serialize)]
struct BatchRow {
    /// Identifier of the config this run used.
    config: String,
    signals: usize,
    orders: usize,
    dropped_orders: usize,
    ending_equity: f64,
}
2258
2259fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
2260    if let Some(parent) = path.parent() {
2261        fs::create_dir_all(parent)
2262            .with_context(|| format!("failed to create directory {}", parent.display()))?;
2263    }
2264    let mut writer =
2265        Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
2266    for row in rows {
2267        writer.serialize(row)?;
2268    }
2269    writer.flush()?;
2270    Ok(())
2271}
2272
2273fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<AssetId, Decimal> {
2274    config
2275        .initial_balances
2276        .iter()
2277        .map(|(currency, amount)| (AssetId::from(currency.as_str()), *amount))
2278        .collect()
2279}
2280
2281fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
2282    config
2283        .initial_balances
2284        .get(&config.reporting_currency)
2285        .copied()
2286        .unwrap_or_default()
2287}
2288
2289fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
2290    let parts: Vec<_> = value.split(':').collect();
2291    match parts.as_slice() {
2292        ["fixed", val] => {
2293            let quantity =
2294                Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
2295            Ok(Box::new(FixedOrderSizer { quantity }))
2296        }
2297        ["fixed"] => {
2298            let quantity = cli_quantity.unwrap_or(Decimal::ONE);
2299            Ok(Box::new(FixedOrderSizer { quantity }))
2300        }
2301        ["percent", val] => {
2302            let percent =
2303                Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
2304            Ok(Box::new(PortfolioPercentSizer {
2305                percent: percent.max(Decimal::ZERO),
2306            }))
2307        }
2308        ["risk-adjusted", val] => {
2309            let risk_fraction = Decimal::from_str(val)
2310                .context("invalid risk fraction value (use decimals)")?;
2311            Ok(Box::new(RiskAdjustedSizer {
2312                risk_fraction: risk_fraction.max(Decimal::ZERO),
2313            }))
2314        }
2315        _ => Err(anyhow!(
2316            "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
2317        )),
2318    }
2319}
2320
/// CLI value for the panic-close order style; converted into the runtime
/// `PanicCloseMode` via `From`.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum PanicModeArg {
    /// Close positions with market orders.
    Market,
    /// Close positions with aggressively priced limit orders
    /// (see the `panic_limit_offset_bps` argument — TODO confirm pricing details).
    AggressiveLimit,
}
2326
2327impl From<PanicModeArg> for PanicCloseMode {
2328    fn from(arg: PanicModeArg) -> Self {
2329        match arg {
2330            PanicModeArg::Market => PanicCloseMode::Market,
2331            PanicModeArg::AggressiveLimit => PanicCloseMode::AggressiveLimit,
2332        }
2333    }
2334}