1use crate::alerts::sanitize_webhook;
2use crate::analyze;
3use crate::data_validation::{validate_dataset, ValidationConfig, ValidationOutcome};
4use crate::live::{
5 run_live, ExecutionBackend, LiveSessionSettings, NamedExchange, PersistenceBackend,
6 PersistenceSettings,
7};
8use crate::state;
9use crate::telemetry::init_tracing;
10use crate::tui;
11use crate::PublicChannel;
12use arrow::util::pretty::print_batches;
13use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
14use std::collections::HashMap;
15use std::fs::{self, File};
16use std::io::{BufRead, BufReader};
17use std::net::SocketAddr;
18use std::path::{Path, PathBuf};
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::Duration as StdDuration;
22
23use anyhow::{anyhow, bail, Context, Result};
24use chrono::{DateTime, Days, Duration, NaiveDate, NaiveDateTime, Utc};
25use clap::{Args, Parser, Subcommand, ValueEnum};
26use csv::Writer;
27use futures::StreamExt;
28use rust_decimal::{
29 prelude::{FromPrimitive, ToPrimitive},
30 Decimal,
31};
32use serde::{Deserialize, Serialize};
33use tesser_backtester::reporting::PerformanceReport;
34use tesser_backtester::{
35 stream_from_events, BacktestConfig, BacktestMode, BacktestStream, Backtester, MarketEvent,
36 MarketEventKind, MarketEventStream,
37};
38use tesser_broker::{ExecutionClient, RouterExecutionClient};
39use tesser_config::{load_config, AppConfig, PersistenceEngine, RiskManagementConfig};
40use tesser_core::{
41 AssetId, Candle, DepthUpdate, ExchangeId, Interval, OrderBook, OrderBookLevel, Side, Symbol,
42 Tick,
43};
44use tesser_data::analytics::ExecutionAnalysisRequest;
45use tesser_data::download::{
46 BinanceDownloader, BybitDownloader, KlineRequest, NormalizedTrade, TradeRequest, TradeSource,
47};
48use tesser_data::io::{self, DatasetFormat as IoDatasetFormat, TicksWriter};
49use tesser_data::merger::UnifiedEventStream;
50use tesser_data::parquet::ParquetMarketStream;
51use tesser_data::transform::Resampler;
52use tesser_execution::{
53 ExecutionEngine, FixedOrderSizer, NoopRiskChecker, OrderSizer, PanicCloseConfig,
54 PanicCloseMode, PortfolioPercentSizer, RiskAdjustedSizer,
55};
56use tesser_markets::MarketRegistry;
57use tesser_paper::{
58 FeeModel, FeeScheduleConfig, MatchingEngine, MatchingEngineConfig, PaperExecutionClient,
59 PaperMarketStream, QueueModel,
60};
61use tesser_strategy::{builtin_strategy_names, load_strategy};
62use tracing::{info, warn};
63
64#[derive(Parser)]
65#[command(author, version, about = "Tesser Trading Framework")]
66pub struct Cli {
67 #[arg(short, long, action = clap::ArgAction::Count)]
69 verbose: u8,
70 #[arg(long, default_value = "default")]
72 env: String,
73 #[command(subcommand)]
74 command: Commands,
75}
76
77#[allow(clippy::large_enum_variant)]
78#[derive(Subcommand)]
79pub enum Commands {
80 Data {
82 #[command(subcommand)]
83 action: DataCommand,
84 },
85 Backtest {
87 #[command(subcommand)]
88 action: BacktestCommand,
89 },
90 Live {
92 #[command(subcommand)]
93 action: LiveCommand,
94 },
95 State {
97 #[command(subcommand)]
98 action: StateCommand,
99 },
100 Strategies,
102 Analyze {
104 #[command(subcommand)]
105 action: AnalyzeCommand,
106 },
107 Monitor(MonitorArgs),
109}
110
111#[derive(Subcommand)]
112pub enum DataCommand {
113 Download(DataDownloadArgs),
115 DownloadTrades(DataDownloadTradesArgs),
117 Validate(DataValidateArgs),
119 Resample(DataResampleArgs),
121 InspectParquet(DataInspectParquetArgs),
123}
124
125#[derive(Subcommand)]
126pub enum BacktestCommand {
127 Run(BacktestRunArgs),
129 Batch(BacktestBatchArgs),
131}
132
133#[derive(Subcommand)]
134pub enum LiveCommand {
135 Run(LiveRunArgs),
137}
138
139#[derive(Subcommand)]
140pub enum StateCommand {
141 Inspect(StateInspectArgs),
143}
144
145#[derive(Subcommand)]
146pub enum AnalyzeCommand {
147 Execution(AnalyzeExecutionArgs),
149}
150
151#[derive(Args)]
152pub struct DataDownloadArgs {
153 #[arg(long, default_value = "bybit")]
154 exchange: String,
155 #[arg(long)]
156 symbol: String,
157 #[arg(long, default_value = "linear")]
158 category: String,
159 #[arg(long, default_value = "1m")]
160 interval: String,
161 #[arg(long)]
162 start: String,
163 #[arg(long)]
164 end: Option<String>,
165 #[arg(long)]
166 output: Option<PathBuf>,
167 #[arg(long)]
169 skip_validation: bool,
170 #[arg(long)]
172 repair_missing: bool,
173 #[arg(long, default_value_t = 0.05)]
175 validation_jump_threshold: f64,
176 #[arg(long, default_value_t = 0.002)]
178 validation_reference_tolerance: f64,
179}
180
181#[derive(Args)]
182pub struct DataDownloadTradesArgs {
183 #[arg(long, default_value = "bybit")]
184 exchange: String,
185 #[arg(long)]
186 symbol: String,
187 #[arg(long, default_value = "linear")]
189 category: String,
190 #[arg(long)]
191 start: String,
192 #[arg(long)]
193 end: Option<String>,
194 #[arg(long)]
195 output: Option<PathBuf>,
196 #[arg(long)]
198 partition_by_day: bool,
199 #[arg(long, default_value_t = 1000)]
201 limit: usize,
202 #[arg(long, value_enum, default_value_t = TradeSourceArg::Rest)]
204 source: TradeSourceArg,
205 #[arg(long)]
207 resume: bool,
208 #[arg(long)]
210 bybit_public_url: Option<String>,
211 #[arg(long, value_enum, default_value_t = BinanceMarketArg::FuturesUm)]
213 binance_market: BinanceMarketArg,
214}
215
216#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
217pub enum TradeSourceArg {
218 Rest,
219 #[value(name = "bybit-public")]
220 BybitPublic,
221 #[value(name = "binance-public")]
222 BinancePublic,
223}
224
225#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
226pub enum BinanceMarketArg {
227 #[value(name = "spot")]
228 Spot,
229 #[value(name = "futures-um")]
230 FuturesUm,
231 #[value(name = "futures-cm")]
232 FuturesCm,
233}
234
235impl BinanceMarketArg {
236 fn base_path(self) -> &'static str {
237 match self {
238 Self::Spot => "https://data.binance.vision/data/spot/daily/aggTrades",
239 Self::FuturesUm => "https://data.binance.vision/data/futures/um/daily/aggTrades",
240 Self::FuturesCm => "https://data.binance.vision/data/futures/cm/daily/aggTrades",
241 }
242 }
243}
244
245#[derive(Args)]
246pub struct StateInspectArgs {
247 #[arg(long)]
249 path: Option<PathBuf>,
250 #[arg(long)]
252 raw: bool,
253}
254
255#[derive(Args)]
256pub struct AnalyzeExecutionArgs {
257 #[arg(long, value_name = "PATH", default_value = "data/flight_recorder")]
259 data_dir: PathBuf,
260 #[arg(long)]
262 start: Option<String>,
263 #[arg(long)]
265 end: Option<String>,
266 #[arg(long, value_name = "PATH")]
268 export_csv: Option<PathBuf>,
269}
270
271#[derive(Args)]
272pub struct MonitorArgs {
273 #[arg(long)]
275 control_addr: Option<String>,
276 #[arg(long, default_value_t = 250)]
278 tick_rate: u64,
279}
280
281impl StateInspectArgs {
282 fn resolved_path(&self, config: &AppConfig) -> PathBuf {
283 self.path
284 .clone()
285 .unwrap_or_else(|| config.live.persistence_config().path.clone())
286 }
287
288 fn resolved_engine(&self, config: &AppConfig) -> PersistenceEngine {
289 config.live.persistence_config().engine
290 }
291}
292
293impl MonitorArgs {
294 async fn run(&self, config: &AppConfig) -> Result<()> {
295 let addr = self
296 .control_addr
297 .clone()
298 .unwrap_or_else(|| config.live.control_addr.clone());
299 let refresh = self.tick_rate.max(50);
300 let monitor_config = tui::MonitorConfig::new(addr, StdDuration::from_millis(refresh));
301 tui::run_monitor(monitor_config).await
302 }
303}
304
305impl AnalyzeExecutionArgs {
306 fn build_request(&self) -> Result<ExecutionAnalysisRequest> {
307 let start = match &self.start {
308 Some(value) => Some(parse_datetime(value)?),
309 None => None,
310 };
311 let end = match &self.end {
312 Some(value) => Some(parse_datetime(value)?),
313 None => None,
314 };
315 Ok(ExecutionAnalysisRequest {
316 data_dir: self.data_dir.clone(),
317 start,
318 end,
319 })
320 }
321}
322
323impl DataDownloadArgs {
324 async fn run(&self, config: &AppConfig) -> Result<()> {
325 let exchange_cfg = config
326 .exchange
327 .get(&self.exchange)
328 .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
329 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
330 let start = parse_datetime(&self.start)?;
331 let end = match &self.end {
332 Some(value) => parse_datetime(value)?,
333 None => Utc::now(),
334 };
335 if start >= end {
336 return Err(anyhow!("start time must be earlier than end time"));
337 }
338
339 info!(
340 "Downloading {} candles for {} ({})",
341 self.interval, self.symbol, self.exchange
342 );
343 let mut candles = match exchange_cfg.driver.as_str() {
344 "bybit" | "" => {
345 let downloader = BybitDownloader::new(&exchange_cfg.rest_url);
346 let request = KlineRequest::new(&self.category, &self.symbol, interval, start, end);
347 downloader
348 .download_klines(&request)
349 .await
350 .with_context(|| "failed to download candles from Bybit")?
351 }
352 "binance" => {
353 let downloader = BinanceDownloader::new(&exchange_cfg.rest_url);
354 let request = KlineRequest::new("", &self.symbol, interval, start, end);
355 downloader
356 .download_klines(&request)
357 .await
358 .with_context(|| "failed to download candles from Binance")?
359 }
360 other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
361 };
362
363 if candles.is_empty() {
364 info!("No candles returned for {}", self.symbol);
365 return Ok(());
366 }
367
368 if !self.skip_validation {
369 let config = ValidationConfig {
370 price_jump_threshold: self.validation_jump_threshold.max(f64::EPSILON),
371 reference_tolerance: self.validation_reference_tolerance.max(f64::EPSILON),
372 repair_missing: self.repair_missing,
373 };
374 let outcome =
375 validate_dataset(candles.clone(), None, config).context("validation failed")?;
376 print_validation_summary(&outcome);
377 if self.repair_missing && outcome.summary.repaired_candles > 0 {
378 candles = outcome.repaired;
379 info!(
380 "Applied {} synthetic candle(s) to repair gaps",
381 outcome.summary.repaired_candles
382 );
383 }
384 }
385
386 let output_path = self.output.clone().unwrap_or_else(|| {
387 default_output_path(config, &self.exchange, &self.symbol, interval, start, end)
388 });
389 write_candles_csv(&output_path, &candles)?;
390 info!(
391 "Saved {} candles to {}",
392 candles.len(),
393 output_path.display()
394 );
395 Ok(())
396 }
397}
398
399impl DataDownloadTradesArgs {
400 async fn run(&self, config: &AppConfig) -> Result<()> {
401 let exchange_cfg = config
402 .exchange
403 .get(&self.exchange)
404 .ok_or_else(|| anyhow!("exchange profile '{}' not found in config", self.exchange))?;
405 let start = parse_datetime(&self.start)?;
406 let end = match &self.end {
407 Some(value) => parse_datetime(value)?,
408 None => Utc::now(),
409 };
410 if start >= end {
411 return Err(anyhow!("start time must be earlier than end time"));
412 }
413
414 info!(
415 "Downloading trades for {} on {} ({} -> {})",
416 self.symbol, self.exchange, start, end
417 );
418
419 let cache_dir = config
420 .data_path
421 .join("ticks")
422 .join("cache")
423 .join(&self.exchange)
424 .join(&self.symbol);
425 let mut base_request = TradeRequest::new(&self.symbol, start, end)
426 .with_limit(self.limit)
427 .with_archive_cache_dir(cache_dir)
428 .with_resume_archives(self.resume);
429 let driver = exchange_cfg.driver.as_str();
430 match driver {
431 "bybit" | "" => {
432 if self.source == TradeSourceArg::BinancePublic {
433 bail!("Binance public archives are not available for Bybit");
434 }
435 base_request = base_request.with_category(&self.category);
436 if self.source == TradeSourceArg::BybitPublic {
437 base_request = base_request.with_source(TradeSource::BybitPublicArchive);
438 if let Some(url) = &self.bybit_public_url {
439 base_request = base_request.with_public_data_url(url);
440 }
441 }
442 }
443 "binance" => match self.source {
444 TradeSourceArg::Rest => {}
445 TradeSourceArg::BinancePublic => {
446 base_request = base_request
447 .with_source(TradeSource::BinancePublicArchive)
448 .with_public_data_url(self.binance_market.base_path());
449 }
450 TradeSourceArg::BybitPublic => {
451 bail!("Bybit public archives are not available for Binance");
452 }
453 },
454 other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
455 }
456
457 if self.partition_by_day {
458 self.download_partitioned(config, exchange_cfg, base_request, start, end)
459 .await
460 } else {
461 let trades = self
462 .fetch_trades(driver, &exchange_cfg.rest_url, base_request)
463 .await?;
464 if trades.is_empty() {
465 info!("No trades returned for {}", self.symbol);
466 return Ok(());
467 }
468 self.write_single(config, trades, start, end)
469 }
470 }
471
472 async fn download_partitioned(
473 &self,
474 config: &AppConfig,
475 exchange_cfg: &tesser_config::ExchangeConfig,
476 base_request: TradeRequest<'_>,
477 start: DateTime<Utc>,
478 end: DateTime<Utc>,
479 ) -> Result<()> {
480 let base_dir = self
481 .output
482 .clone()
483 .unwrap_or_else(|| default_tick_partition_dir(config, &self.exchange, &self.symbol));
484 let mut current = start.date_naive();
485 let final_date = end.date_naive();
486 let mut total = 0usize;
487 let mut files_written = 0usize;
488
489 while current <= final_date {
490 let next_date = current.checked_add_days(Days::new(1)).unwrap_or(current);
491 let day_start = DateTime::<Utc>::from_naive_utc_and_offset(
492 current
493 .and_hms_opt(0, 0, 0)
494 .ok_or_else(|| anyhow!("invalid date {}", current))?,
495 Utc,
496 )
497 .max(start);
498 let day_end = DateTime::<Utc>::from_naive_utc_and_offset(
499 next_date
500 .and_hms_opt(0, 0, 0)
501 .ok_or_else(|| anyhow!("invalid date {}", next_date))?,
502 Utc,
503 )
504 .min(end);
505
506 if day_start >= day_end {
507 if next_date == current {
508 break;
509 }
510 current = next_date;
511 continue;
512 }
513
514 let partition_path = partition_path(&base_dir, current);
515 if self.resume && partition_path.exists() {
516 info!(
517 "Skipping {} because {} already exists",
518 current,
519 partition_path.display()
520 );
521 if next_date == current {
522 break;
523 }
524 current = next_date;
525 continue;
526 }
527
528 let mut day_request = base_request.clone();
529 day_request.start = day_start;
530 day_request.end = day_end;
531 let trades = self
532 .fetch_trades(
533 exchange_cfg.driver.as_str(),
534 &exchange_cfg.rest_url,
535 day_request,
536 )
537 .await?;
538
539 if trades.is_empty() {
540 info!("No trades returned for {}", current);
541 if next_date == current {
542 break;
543 }
544 current = next_date;
545 continue;
546 }
547
548 let mut writer = TicksWriter::new(&partition_path);
549 let count = trades.len();
550 writer.extend(trades.into_iter().map(|trade| (trade.trade_id, trade.tick)));
551 writer.finish()?;
552 total += count;
553 files_written += 1;
554 info!(
555 "Saved {} raw trades for {} to {}",
556 count,
557 current,
558 partition_path.display()
559 );
560
561 if next_date == current {
562 break;
563 }
564 current = next_date;
565 }
566
567 info!(
568 "Partitioned {} raw trades across {} file(s) under {}",
569 total,
570 files_written,
571 base_dir.display()
572 );
573 Ok(())
574 }
575
576 async fn fetch_trades(
577 &self,
578 driver: &str,
579 rest_url: &str,
580 request: TradeRequest<'_>,
581 ) -> Result<Vec<NormalizedTrade>> {
582 match driver {
583 "bybit" | "" => {
584 let downloader = BybitDownloader::new(rest_url);
585 downloader
586 .download_trades(&request)
587 .await
588 .with_context(|| "failed to download trades from Bybit")
589 }
590 "binance" => {
591 let downloader = BinanceDownloader::new(rest_url);
592 downloader
593 .download_trades(&request)
594 .await
595 .with_context(|| "failed to download trades from Binance")
596 }
597 other => bail!("unknown exchange driver '{other}' for {}", self.exchange),
598 }
599 }
600
601 fn write_single(
602 &self,
603 config: &AppConfig,
604 trades: Vec<NormalizedTrade>,
605 start: DateTime<Utc>,
606 end: DateTime<Utc>,
607 ) -> Result<()> {
608 let output_path = self.output.clone().unwrap_or_else(|| {
609 default_tick_output_path(config, &self.exchange, &self.symbol, start, end)
610 });
611 let total = trades.len();
612 let mut writer = TicksWriter::new(&output_path);
613 writer.extend(trades.into_iter().map(|trade| (trade.trade_id, trade.tick)));
614 writer.finish()?;
615 info!("Saved {} raw trades to {}", total, output_path.display());
616 Ok(())
617 }
618}
619
620#[derive(Args)]
621pub struct DataValidateArgs {
622 #[arg(
624 long = "path",
625 value_name = "PATH",
626 num_args = 1..,
627 action = clap::ArgAction::Append
628 )]
629 paths: Vec<PathBuf>,
630 #[arg(
632 long = "reference",
633 value_name = "PATH",
634 num_args = 1..,
635 action = clap::ArgAction::Append
636 )]
637 reference_paths: Vec<PathBuf>,
638 #[arg(long, default_value_t = 0.05)]
640 jump_threshold: f64,
641 #[arg(long, default_value_t = 0.002)]
643 reference_tolerance: f64,
644 #[arg(long)]
646 repair_missing: bool,
647 #[arg(long)]
649 output: Option<PathBuf>,
650}
651
652impl DataValidateArgs {
653 fn run(&self) -> Result<()> {
654 if self.paths.is_empty() {
655 bail!("provide at least one --path for validation");
656 }
657 let candles =
658 load_candles_from_paths(&self.paths).with_context(|| "failed to load dataset")?;
659 if candles.is_empty() {
660 bail!("loaded dataset is empty; nothing to validate");
661 }
662 let reference = if self.reference_paths.is_empty() {
663 None
664 } else {
665 Some(
666 load_candles_from_paths(&self.reference_paths)
667 .with_context(|| "failed to load reference dataset")?,
668 )
669 };
670
671 let price_jump_threshold = if self.jump_threshold <= 0.0 {
672 0.0001
673 } else {
674 self.jump_threshold
675 };
676 let reference_tolerance = if self.reference_tolerance <= 0.0 {
677 0.0001
678 } else {
679 self.reference_tolerance
680 };
681
682 let config = ValidationConfig {
683 price_jump_threshold,
684 reference_tolerance,
685 repair_missing: self.repair_missing,
686 };
687
688 let outcome = validate_dataset(candles, reference, config)?;
689 print_validation_summary(&outcome);
690
691 if let Some(output) = &self.output {
692 write_candles_csv(output, &outcome.repaired)?;
693 info!(
694 "Wrote {} candles ({} new) to {}",
695 outcome.repaired.len(),
696 outcome.summary.repaired_candles,
697 output.display()
698 );
699 } else if self.repair_missing && outcome.summary.repaired_candles > 0 {
700 warn!(
701 "Detected {} gap(s) filled with synthetic candles but --output was not provided",
702 outcome.summary.repaired_candles
703 );
704 }
705
706 Ok(())
707 }
708}
709
710impl DataResampleArgs {
711 fn run(&self) -> Result<()> {
712 if !self.input.exists() {
713 bail!("input file {} not found", self.input.display());
714 }
715 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
716 let dataset = io::read_dataset(&self.input)
717 .with_context(|| format!("failed to read {}", self.input.display()))?;
718 let io::CandleDataset {
719 format: input_format,
720 candles,
721 } = dataset;
722 if candles.is_empty() {
723 bail!(
724 "dataset {} does not contain any candles",
725 self.input.display()
726 );
727 }
728 let input_len = candles.len();
729 let resampled = Resampler::resample(candles, interval);
730 if resampled.is_empty() {
731 bail!(
732 "no candles produced after resampling {}",
733 self.input.display()
734 );
735 }
736 let output_format = self
737 .format
738 .map(IoDatasetFormat::from)
739 .unwrap_or_else(|| IoDatasetFormat::from_path(&self.output));
740 io::write_dataset(&self.output, output_format, &resampled)
741 .with_context(|| format!("failed to write {}", self.output.display()))?;
742 info!(
743 "Resampled {} -> {} candles (input {:?} -> output {:?}) to {}",
744 input_len,
745 resampled.len(),
746 input_format,
747 output_format,
748 self.output.display()
749 );
750 Ok(())
751 }
752}
753
754#[derive(Args)]
755pub struct DataResampleArgs {
756 #[arg(long)]
757 input: PathBuf,
758 #[arg(long)]
759 output: PathBuf,
760 #[arg(long, default_value = "1h")]
761 interval: String,
762 #[arg(long, value_enum)]
764 format: Option<DatasetFormatArg>,
765}
766
767#[derive(Copy, Clone, Debug, ValueEnum)]
768pub enum DatasetFormatArg {
769 Csv,
770 Parquet,
771}
772
773impl From<DatasetFormatArg> for IoDatasetFormat {
774 fn from(value: DatasetFormatArg) -> Self {
775 match value {
776 DatasetFormatArg::Csv => IoDatasetFormat::Csv,
777 DatasetFormatArg::Parquet => IoDatasetFormat::Parquet,
778 }
779 }
780}
781
782#[derive(Args)]
783pub struct DataInspectParquetArgs {
784 #[arg(value_name = "PATH")]
786 path: PathBuf,
787 #[arg(long, default_value_t = 10)]
789 rows: usize,
790}
791
792impl DataInspectParquetArgs {
793 fn run(&self) -> Result<()> {
794 let limit = if self.rows == 0 {
795 usize::MAX
796 } else {
797 self.rows
798 };
799 let file = File::open(&self.path)
800 .with_context(|| format!("failed to open {}", self.path.display()))?;
801 let batch_size = limit.clamp(1, 8192);
802 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)?
803 .with_batch_size(batch_size)
804 .build()?;
805
806 let mut printed = 0usize;
807 while printed < limit {
808 match reader.next() {
809 Some(Ok(batch)) => {
810 if batch.num_rows() == 0 {
811 continue;
812 }
813 let remaining = limit.saturating_sub(printed);
814 let take = remaining.min(batch.num_rows());
815 let display_batch = if take == batch.num_rows() {
816 batch
817 } else {
818 batch.slice(0, take)
819 };
820 print_batches(std::slice::from_ref(&display_batch))?;
821 printed += take;
822 }
823 Some(Err(err)) => return Err(err.into()),
824 None => break,
825 }
826 }
827
828 if printed == 0 {
829 println!("no rows available in {}", self.path.display());
830 } else if limit != usize::MAX {
831 println!("displayed {printed} row(s) from {}", self.path.display());
832 }
833
834 Ok(())
835 }
836}
837
838#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
839pub enum BacktestModeArg {
840 Candle,
841 Tick,
842}
843
844#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
845pub enum QueueModelArg {
846 Conserv,
847 Optimistic,
848}
849
850impl From<QueueModelArg> for QueueModel {
851 fn from(value: QueueModelArg) -> Self {
852 match value {
853 QueueModelArg::Conserv => QueueModel::Conservative,
854 QueueModelArg::Optimistic => QueueModel::Optimistic,
855 }
856 }
857}
858
859#[derive(Args)]
860pub struct BacktestRunArgs {
861 #[arg(long)]
862 strategy_config: PathBuf,
863 #[arg(long = "data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
865 data_paths: Vec<PathBuf>,
866 #[arg(long, default_value_t = 500)]
867 candles: usize,
868 #[arg(long, default_value = "0.01")]
869 quantity: Decimal,
870 #[arg(long, default_value = "0")]
872 slippage_bps: Decimal,
873 #[arg(long, default_value = "0")]
875 fee_bps: Decimal,
876 #[arg(long = "fee-schedule")]
878 fee_schedule: Option<PathBuf>,
879 #[arg(long, default_value_t = 1)]
881 latency_candles: usize,
882 #[arg(long, default_value = "fixed:0.01")]
884 sizer: String,
885 #[arg(long, value_enum, default_value = "candle")]
887 mode: BacktestModeArg,
888 #[arg(long = "lob-data", value_name = "PATH", num_args = 0.., action = clap::ArgAction::Append)]
890 lob_paths: Vec<PathBuf>,
891 #[arg(long = "sim-latency-ms", default_value_t = 0)]
893 sim_latency_ms: u64,
894 #[arg(long = "sim-queue-model", value_enum, default_value = "conserv")]
896 sim_queue_model: QueueModelArg,
897 #[arg(long)]
898 markets_file: Option<PathBuf>,
899}
900
901enum LobSource {
902 Json(Vec<PathBuf>),
903 FlightRecorder(PathBuf),
904}
905
906#[derive(Args)]
907pub struct BacktestBatchArgs {
908 #[arg(long = "config", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
910 config_paths: Vec<PathBuf>,
911 #[arg(long = "data", value_name = "PATH", num_args = 1.., action = clap::ArgAction::Append)]
913 data_paths: Vec<PathBuf>,
914 #[arg(long, default_value = "0.01")]
915 quantity: Decimal,
916 #[arg(long)]
918 output: Option<PathBuf>,
919 #[arg(long, default_value = "0")]
921 slippage_bps: Decimal,
922 #[arg(long, default_value = "0")]
924 fee_bps: Decimal,
925 #[arg(long = "fee-schedule")]
927 fee_schedule: Option<PathBuf>,
928 #[arg(long, default_value_t = 1)]
930 latency_candles: usize,
931 #[arg(long, default_value = "fixed:0.01")]
933 sizer: String,
934 #[arg(long)]
935 markets_file: Option<PathBuf>,
936}
937
938#[derive(Args)]
939pub struct LiveRunArgs {
940 #[arg(long)]
941 strategy_config: PathBuf,
942 #[arg(long, default_value = "paper_sandbox")]
943 exchange: String,
944 #[arg(long = "exchanges", value_name = "NAME", num_args = 0.., action = clap::ArgAction::Append)]
948 exchanges: Vec<String>,
949 #[arg(long, default_value = "linear")]
950 category: String,
951 #[arg(long, default_value = "1m")]
952 interval: String,
953 #[arg(long, default_value = "1")]
954 quantity: Decimal,
955 #[arg(
957 long = "exec",
958 default_value = "paper",
959 value_enum,
960 alias = "live-exec"
961 )]
962 exec: ExecutionBackend,
963 #[arg(long)]
965 state_path: Option<PathBuf>,
966 #[arg(long, value_enum)]
968 persistence: Option<PersistenceBackend>,
969 #[arg(long)]
970 metrics_addr: Option<String>,
971 #[arg(long)]
972 log_path: Option<PathBuf>,
973 #[arg(
975 long = "record-data",
976 value_name = "PATH",
977 default_value = "data/flight_recorder"
978 )]
979 record_data: PathBuf,
980 #[arg(long)]
982 control_addr: Option<String>,
983 #[arg(long)]
984 initial_equity: Option<Decimal>,
985 #[arg(long)]
986 markets_file: Option<PathBuf>,
987 #[arg(long, default_value = "0")]
988 slippage_bps: Decimal,
989 #[arg(long, default_value = "0")]
990 fee_bps: Decimal,
991 #[arg(long, default_value_t = 0)]
992 latency_ms: u64,
993 #[arg(long, default_value_t = 512)]
994 history: usize,
995 #[arg(long)]
996 reconciliation_interval_secs: Option<u64>,
997 #[arg(long)]
998 reconciliation_threshold: Option<Decimal>,
999 #[arg(long)]
1000 webhook_url: Option<String>,
1001 #[arg(long)]
1002 alert_max_data_gap_secs: Option<u64>,
1003 #[arg(long)]
1004 alert_max_order_failures: Option<u32>,
1005 #[arg(long)]
1006 alert_max_drawdown: Option<Decimal>,
1007 #[arg(long)]
1008 risk_max_order_qty: Option<Decimal>,
1009 #[arg(long)]
1010 risk_max_order_notional: Option<Decimal>,
1011 #[arg(long)]
1012 risk_max_position_qty: Option<Decimal>,
1013 #[arg(long)]
1014 risk_max_drawdown: Option<Decimal>,
1015 #[arg(long)]
1017 orderbook_depth: Option<usize>,
1018 #[arg(long, default_value = "fixed:1.0")]
1020 sizer: String,
1021 #[arg(long, value_enum, default_value = "market")]
1023 panic_mode: PanicModeArg,
1024 #[arg(long, default_value = "50")]
1026 panic_limit_offset_bps: Decimal,
1027}
1028
1029impl LiveRunArgs {
1030 fn resolved_log_path(&self, config: &AppConfig) -> PathBuf {
1031 self.log_path
1032 .clone()
1033 .unwrap_or_else(|| config.live.log_path.clone())
1034 }
1035
1036 fn resolved_metrics_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
1037 let addr = self
1038 .metrics_addr
1039 .clone()
1040 .unwrap_or_else(|| config.live.metrics_addr.clone());
1041 addr.parse()
1042 .with_context(|| format!("invalid metrics address '{addr}'"))
1043 }
1044
1045 fn resolved_control_addr(&self, config: &AppConfig) -> Result<SocketAddr> {
1046 let addr = self
1047 .control_addr
1048 .clone()
1049 .unwrap_or_else(|| config.live.control_addr.clone());
1050 addr.parse()
1051 .with_context(|| format!("invalid control address '{addr}'"))
1052 }
1053
1054 fn reconciliation_interval(&self, config: &AppConfig) -> StdDuration {
1055 let secs = self
1056 .reconciliation_interval_secs
1057 .unwrap_or(config.live.reconciliation_interval_secs)
1058 .max(1);
1059 StdDuration::from_secs(secs)
1060 }
1061
1062 fn reconciliation_threshold(&self, config: &AppConfig) -> Decimal {
1063 let configured = self
1064 .reconciliation_threshold
1065 .unwrap_or(config.live.reconciliation_threshold);
1066 if configured <= Decimal::ZERO {
1067 config.live.reconciliation_threshold.max(Decimal::new(1, 6))
1068 } else {
1069 configured
1070 }
1071 }
1072
1073 fn resolved_initial_balances(&self, config: &AppConfig) -> HashMap<AssetId, Decimal> {
1074 let mut balances = clone_initial_balances(&config.backtest);
1075 if let Some(value) = self.initial_equity {
1076 let reporting = AssetId::from(config.backtest.reporting_currency.as_str());
1077 balances.insert(reporting, value.max(Decimal::ZERO));
1078 }
1079 balances
1080 }
1081
1082 fn build_alerting(&self, config: &AppConfig) -> tesser_config::AlertingConfig {
1083 let mut alerting = config.live.alerting.clone();
1084 let webhook = self
1085 .webhook_url
1086 .clone()
1087 .or_else(|| alerting.webhook_url.clone());
1088 alerting.webhook_url = sanitize_webhook(webhook);
1089 if let Some(sec) = self.alert_max_data_gap_secs {
1090 alerting.max_data_gap_secs = sec;
1091 }
1092 if let Some(limit) = self.alert_max_order_failures {
1093 alerting.max_order_failures = limit;
1094 }
1095 if let Some(limit) = self.alert_max_drawdown {
1096 alerting.max_drawdown = limit.max(Decimal::ZERO);
1097 }
1098 alerting
1099 }
1100
1101 fn build_risk_config(&self, config: &AppConfig) -> RiskManagementConfig {
1102 let mut risk = config.risk_management.clone();
1103 if let Some(limit) = self.risk_max_order_qty {
1104 risk.max_order_quantity = limit.max(Decimal::ZERO);
1105 }
1106 if let Some(limit) = self.risk_max_order_notional {
1107 risk.max_order_notional = (limit > Decimal::ZERO).then_some(limit);
1108 }
1109 if let Some(limit) = self.risk_max_position_qty {
1110 risk.max_position_quantity = limit.max(Decimal::ZERO);
1111 }
1112 if let Some(limit) = self.risk_max_drawdown {
1113 risk.max_drawdown = limit.max(Decimal::ZERO);
1114 }
1115 risk
1116 }
1117}
1118
1119#[derive(Deserialize)]
1120struct StrategyConfigFile {
1121 #[serde(rename = "strategy_name")]
1122 name: String,
1123 #[serde(default = "empty_table")]
1124 params: toml::Value,
1125}
1126
1127fn empty_table() -> toml::Value {
1128 toml::Value::Table(Default::default())
1129}
1130
1131pub async fn run() -> Result<()> {
1132 let cli = Cli::parse();
1133 let config = load_config(Some(&cli.env)).context("failed to load configuration")?;
1134
1135 let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| match cli.verbose {
1136 0 => config.log_level.clone(),
1137 1 => "debug".to_string(),
1138 _ => "trace".to_string(),
1139 });
1140
1141 let log_override = match &cli.command {
1142 Commands::Live {
1143 action: LiveCommand::Run(args),
1144 } => Some(args.resolved_log_path(&config)),
1145 _ => None,
1146 };
1147
1148 init_tracing(&filter, log_override.as_deref()).context("failed to initialize logging")?;
1149
1150 match cli.command {
1151 Commands::Data { action } => handle_data(action, &config).await?,
1152 Commands::Backtest {
1153 action: BacktestCommand::Run(args),
1154 } => args.run(&config).await?,
1155 Commands::Backtest {
1156 action: BacktestCommand::Batch(args),
1157 } => args.run(&config).await?,
1158 Commands::Live {
1159 action: LiveCommand::Run(args),
1160 } => args.run(&config).await?,
1161 Commands::State { action } => handle_state(action, &config).await?,
1162 Commands::Analyze { action } => handle_analyze(action)?,
1163 Commands::Strategies => list_strategies(),
1164 Commands::Monitor(args) => args.run(&config).await?,
1165 }
1166
1167 Ok(())
1168}
1169
1170async fn handle_data(cmd: DataCommand, config: &AppConfig) -> Result<()> {
1171 match cmd {
1172 DataCommand::Download(args) => {
1173 args.run(config).await?;
1174 }
1175 DataCommand::DownloadTrades(args) => {
1176 args.run(config).await?;
1177 }
1178 DataCommand::Validate(args) => {
1179 args.run()?;
1180 }
1181 DataCommand::Resample(args) => {
1182 args.run()?;
1183 }
1184 DataCommand::InspectParquet(args) => {
1185 args.run()?;
1186 }
1187 }
1188 Ok(())
1189}
1190
1191async fn handle_state(cmd: StateCommand, config: &AppConfig) -> Result<()> {
1192 match cmd {
1193 StateCommand::Inspect(args) => {
1194 state::inspect_state(
1195 args.resolved_path(config),
1196 args.resolved_engine(config),
1197 args.raw,
1198 )
1199 .await?;
1200 }
1201 }
1202 Ok(())
1203}
1204
1205fn handle_analyze(cmd: AnalyzeCommand) -> Result<()> {
1206 match cmd {
1207 AnalyzeCommand::Execution(args) => {
1208 analyze::run_execution(args.build_request()?, args.export_csv.as_deref())
1209 }
1210 }
1211}
1212
1213impl BacktestRunArgs {
1214 async fn run(&self, config: &AppConfig) -> Result<()> {
1215 let contents = std::fs::read_to_string(&self.strategy_config)
1216 .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
1217 let def: StrategyConfigFile =
1218 toml::from_str(&contents).context("failed to parse strategy config file")?;
1219 let strategy = load_strategy(&def.name, def.params)
1220 .with_context(|| format!("failed to configure strategy {}", def.name))?;
1221 let symbols = strategy.subscriptions();
1222 if symbols.is_empty() {
1223 return Err(anyhow::anyhow!("strategy did not declare subscriptions"));
1224 }
1225
1226 let mode = match self.mode {
1227 BacktestModeArg::Candle => BacktestMode::Candle,
1228 BacktestModeArg::Tick => BacktestMode::Tick,
1229 };
1230
1231 let markets_path = self
1232 .markets_file
1233 .clone()
1234 .or_else(|| config.backtest.markets_file.clone())
1235 .ok_or_else(|| anyhow!("backtest requires --markets-file or backtest.markets_file"))?;
1236 let market_registry = Arc::new(
1237 MarketRegistry::load_from_file(&markets_path).with_context(|| {
1238 format!("failed to load markets from {}", markets_path.display())
1239 })?,
1240 );
1241 let fee_schedule = self.resolve_fee_schedule()?;
1242 let fee_model_template = fee_schedule.build_model();
1243 let reporting_currency = AssetId::from(config.backtest.reporting_currency.as_str());
1244 let initial_balances = clone_initial_balances(&config.backtest);
1245
1246 let (market_stream, event_stream, execution_client, matching_engine) = match mode {
1247 BacktestMode::Candle => {
1248 let stream = self.build_candle_stream(&symbols)?;
1249 let exec_client = build_sim_execution_client(
1250 "paper-backtest",
1251 &symbols,
1252 self.slippage_bps,
1253 fee_model_template.clone(),
1254 &initial_balances,
1255 reporting_currency,
1256 )
1257 .await;
1258 (Some(stream), None, exec_client, None)
1259 }
1260 BacktestMode::Tick => {
1261 if self.lob_paths.is_empty() {
1262 bail!("--lob-data is required when --mode tick");
1263 }
1264 let source = self.detect_lob_source()?;
1265 let latency_ms = self.sim_latency_ms.min(i64::MAX as u64);
1266 let fee_model = fee_model_template.clone();
1267 let execution_client = build_sim_execution_client(
1268 "paper-tick",
1269 &symbols,
1270 self.slippage_bps,
1271 fee_model_template.clone(),
1272 &initial_balances,
1273 reporting_currency,
1274 )
1275 .await;
1276 let engine = Arc::new(MatchingEngine::with_config(
1277 "matching-engine",
1278 symbols.clone(),
1279 reporting_balance(&config.backtest),
1280 MatchingEngineConfig {
1281 latency: Duration::milliseconds(latency_ms as i64),
1282 queue_model: self.sim_queue_model.into(),
1283 fee_model: fee_model.clone(),
1284 cash_asset: Some(reporting_currency),
1285 },
1286 ));
1287 let stream = match source {
1288 LobSource::Json(paths) => {
1289 let events = load_lob_events_from_paths(&paths)?;
1290 if events.is_empty() {
1291 bail!("no order book events loaded from --lob-data");
1292 }
1293 stream_from_events(events)
1294 }
1295 LobSource::FlightRecorder(root) => self
1296 .build_flight_recorder_stream(&root, &symbols)
1297 .context("failed to initialize flight recorder stream")?,
1298 };
1299 (None, Some(stream), execution_client, Some(engine))
1300 }
1301 };
1302
1303 let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
1304 let order_quantity = self.quantity;
1305 let execution = ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
1306
1307 let mut cfg = BacktestConfig::new(symbols[0]);
1308 cfg.order_quantity = order_quantity;
1309 cfg.initial_balances = initial_balances.clone();
1310 cfg.reporting_currency = reporting_currency;
1311 cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
1312 cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
1313 cfg.execution.latency_candles = self.latency_candles.max(1);
1314 cfg.mode = mode;
1315
1316 let report = Backtester::new(
1317 cfg,
1318 strategy,
1319 execution,
1320 matching_engine,
1321 market_registry,
1322 market_stream,
1323 event_stream,
1324 )
1325 .run()
1326 .await
1327 .context("backtest failed")?;
1328 print_report(&report);
1329 Ok(())
1330 }
1331
1332 fn resolve_fee_schedule(&self) -> Result<FeeScheduleConfig> {
1333 if let Some(path) = &self.fee_schedule {
1334 load_fee_schedule_file(path)
1335 } else {
1336 Ok(FeeScheduleConfig::with_defaults(
1337 self.fee_bps.max(Decimal::ZERO),
1338 self.fee_bps.max(Decimal::ZERO),
1339 ))
1340 }
1341 }
1342
1343 fn build_candle_stream(&self, symbols: &[Symbol]) -> Result<BacktestStream> {
1344 if symbols.is_empty() {
1345 bail!("strategy did not declare any subscriptions");
1346 }
1347 if self.data_paths.is_empty() {
1348 let mut generated = Vec::new();
1349 for (idx, symbol) in symbols.iter().enumerate() {
1350 let offset = idx as i64 * 10;
1351 generated.extend(synth_candles(*symbol, self.candles, offset));
1352 }
1353 generated.sort_by_key(|c| c.timestamp);
1354 if generated.is_empty() {
1355 bail!("no synthetic candles generated; provide --data files instead");
1356 }
1357 return Ok(memory_market_stream(symbols[0], generated));
1358 }
1359
1360 match detect_data_format(&self.data_paths)? {
1361 DataFormat::Csv => {
1362 let mut candles = load_candles_from_paths(&self.data_paths)?;
1363 if candles.is_empty() {
1364 bail!("no candles loaded from --data paths");
1365 }
1366 candles.sort_by_key(|c| c.timestamp);
1367 Ok(memory_market_stream(symbols[0], candles))
1368 }
1369 DataFormat::Parquet => Ok(parquet_market_stream(symbols, self.data_paths.clone())),
1370 }
1371 }
1372
1373 fn detect_lob_source(&self) -> Result<LobSource> {
1374 if self.lob_paths.len() == 1 {
1375 let path = &self.lob_paths[0];
1376 if path.is_dir() {
1377 return Ok(LobSource::FlightRecorder(path.clone()));
1378 }
1379 }
1380 if self.lob_paths.iter().all(|path| path.is_file()) {
1381 return Ok(LobSource::Json(self.lob_paths.clone()));
1382 }
1383 bail!(
1384 "tick mode expects either JSONL files or a single flight-recorder directory via --lob-data"
1385 )
1386 }
1387
1388 fn build_flight_recorder_stream(
1389 &self,
1390 root: &Path,
1391 symbols: &[Symbol],
1392 ) -> Result<MarketEventStream> {
1393 let stream = UnifiedEventStream::from_flight_recorder(root, symbols)?
1394 .into_stream()
1395 .map(|event| event.map(MarketEvent::from));
1396 Ok(Box::pin(stream))
1397 }
1398}
1399
1400fn load_fee_schedule_file(path: &Path) -> Result<FeeScheduleConfig> {
1401 let contents = fs::read_to_string(path)
1402 .with_context(|| format!("failed to read fee schedule {}", path.display()))?;
1403 let cfg = match path.extension().and_then(|ext| ext.to_str()) {
1404 Some(ext) if ext.eq_ignore_ascii_case("json") => serde_json::from_str(&contents)
1405 .with_context(|| format!("failed to parse JSON fee schedule {}", path.display()))?,
1406 _ => toml::from_str(&contents)
1407 .with_context(|| format!("failed to parse TOML fee schedule {}", path.display()))?,
1408 };
1409 Ok(cfg)
1410}
1411
1412impl BacktestBatchArgs {
1413 async fn run(&self, config: &AppConfig) -> Result<()> {
1414 if self.config_paths.is_empty() {
1415 return Err(anyhow!("provide at least one --config path"));
1416 }
1417 if self.data_paths.is_empty() {
1418 return Err(anyhow!("provide at least one --data path for batch mode"));
1419 }
1420 let markets_path = self
1421 .markets_file
1422 .clone()
1423 .or_else(|| config.backtest.markets_file.clone())
1424 .ok_or_else(|| {
1425 anyhow!("batch mode requires --markets-file or backtest.markets_file")
1426 })?;
1427 let market_registry = Arc::new(
1428 MarketRegistry::load_from_file(&markets_path).with_context(|| {
1429 format!("failed to load markets from {}", markets_path.display())
1430 })?,
1431 );
1432 let data_format = detect_data_format(&self.data_paths)?;
1433 let mut aggregated = Vec::new();
1434 let fee_schedule = if let Some(path) = &self.fee_schedule {
1435 load_fee_schedule_file(path)?
1436 } else {
1437 FeeScheduleConfig::with_defaults(
1438 self.fee_bps.max(Decimal::ZERO),
1439 self.fee_bps.max(Decimal::ZERO),
1440 )
1441 };
1442 let reporting_currency = AssetId::from(config.backtest.reporting_currency.as_str());
1443 let initial_balances = clone_initial_balances(&config.backtest);
1444 for config_path in &self.config_paths {
1445 let contents = std::fs::read_to_string(config_path).with_context(|| {
1446 format!("failed to read strategy config {}", config_path.display())
1447 })?;
1448 let def: StrategyConfigFile =
1449 toml::from_str(&contents).context("failed to parse strategy config file")?;
1450 let strategy = load_strategy(&def.name, def.params)
1451 .with_context(|| format!("failed to configure strategy {}", def.name))?;
1452 let sizer = parse_sizer(&self.sizer, Some(self.quantity))?;
1453 let order_quantity = self.quantity;
1454 let symbols = strategy.subscriptions();
1455 if symbols.is_empty() {
1456 bail!("strategy {} did not declare subscriptions", strategy.name());
1457 }
1458 let stream = match data_format {
1459 DataFormat::Csv => {
1460 let mut candles = load_candles_from_paths(&self.data_paths)?;
1461 if candles.is_empty() {
1462 bail!("no candles loaded from --data paths");
1463 }
1464 candles.sort_by_key(|c| c.timestamp);
1465 memory_market_stream(symbols[0], candles)
1466 }
1467 DataFormat::Parquet => parquet_market_stream(&symbols, self.data_paths.clone()),
1468 };
1469 let execution_client = build_sim_execution_client(
1470 &format!("paper-batch-{}", def.name),
1471 &symbols,
1472 self.slippage_bps,
1473 fee_schedule.build_model(),
1474 &initial_balances,
1475 reporting_currency,
1476 )
1477 .await;
1478 let execution =
1479 ExecutionEngine::new(execution_client, sizer, Arc::new(NoopRiskChecker));
1480 let mut cfg = BacktestConfig::new(symbols[0]);
1481 cfg.order_quantity = order_quantity;
1482 cfg.initial_balances = initial_balances.clone();
1483 cfg.reporting_currency = reporting_currency;
1484 cfg.execution.slippage_bps = self.slippage_bps.max(Decimal::ZERO);
1485 cfg.execution.fee_bps = self.fee_bps.max(Decimal::ZERO);
1486 cfg.execution.latency_candles = self.latency_candles.max(1);
1487
1488 let report = Backtester::new(
1489 cfg,
1490 strategy,
1491 execution,
1492 None,
1493 market_registry.clone(),
1494 Some(stream),
1495 None,
1496 )
1497 .run()
1498 .await
1499 .with_context(|| format!("backtest failed for {}", config_path.display()))?;
1500 aggregated.push(BatchRow {
1501 config: config_path.display().to_string(),
1502 signals: 0, orders: 0, dropped_orders: 0, ending_equity: report.ending_equity,
1506 });
1507 }
1508
1509 if let Some(output) = &self.output {
1510 write_batch_report(output, &aggregated)?;
1511 println!("Batch report written to {}", output.display());
1512 }
1513 if aggregated.is_empty() {
1514 return Err(anyhow!("no batch jobs executed"));
1515 }
1516 Ok(())
1517 }
1518}
1519
1520async fn build_sim_execution_client(
1521 name_prefix: &str,
1522 symbols: &[Symbol],
1523 slippage_bps: Decimal,
1524 fee_model: Arc<dyn FeeModel>,
1525 initial_balances: &HashMap<AssetId, Decimal>,
1526 reporting_currency: AssetId,
1527) -> Arc<dyn ExecutionClient> {
1528 let mut grouped: HashMap<ExchangeId, Vec<Symbol>> = HashMap::new();
1529 for symbol in symbols {
1530 grouped.entry(symbol.exchange).or_default().push(*symbol);
1531 }
1532 if grouped.len() <= 1 {
1533 let (exchange, group) = grouped
1534 .into_iter()
1535 .next()
1536 .unwrap_or((ExchangeId::UNSPECIFIED, symbols.to_vec()));
1537 return build_paper_client_for_exchange(
1538 name_prefix,
1539 exchange,
1540 group,
1541 slippage_bps,
1542 fee_model,
1543 initial_balances,
1544 reporting_currency,
1545 )
1546 .await;
1547 }
1548 let mut routes = HashMap::new();
1549 for (exchange, group) in grouped {
1550 let client = build_paper_client_for_exchange(
1551 name_prefix,
1552 exchange,
1553 group,
1554 slippage_bps,
1555 fee_model.clone(),
1556 initial_balances,
1557 reporting_currency,
1558 )
1559 .await;
1560 routes.insert(exchange, client);
1561 }
1562 Arc::new(RouterExecutionClient::new(routes))
1563}
1564
1565async fn build_paper_client_for_exchange(
1566 name_prefix: &str,
1567 exchange: ExchangeId,
1568 symbols: Vec<Symbol>,
1569 slippage_bps: Decimal,
1570 fee_model: Arc<dyn FeeModel>,
1571 initial_balances: &HashMap<AssetId, Decimal>,
1572 reporting_currency: AssetId,
1573) -> Arc<dyn ExecutionClient> {
1574 let cash_asset = cash_asset_for_exchange(reporting_currency, exchange);
1575 let client = Arc::new(PaperExecutionClient::with_cash_asset(
1576 format!("{name_prefix}-{}", exchange),
1577 symbols,
1578 slippage_bps,
1579 fee_model,
1580 cash_asset,
1581 ));
1582 let balance = initial_balance_for_asset(initial_balances, cash_asset);
1583 client.initialize_balance(cash_asset, balance).await;
1584 let exec: Arc<dyn ExecutionClient> = client;
1585 exec
1586}
1587
1588fn cash_asset_for_exchange(reporting: AssetId, exchange: ExchangeId) -> AssetId {
1589 if reporting.exchange.is_specified() {
1590 if reporting.exchange == exchange {
1591 reporting
1592 } else {
1593 AssetId::from_code(exchange, reporting.code())
1594 }
1595 } else if exchange.is_specified() {
1596 AssetId::from_code(exchange, reporting.code())
1597 } else {
1598 reporting
1599 }
1600}
1601
1602fn initial_balance_for_asset(balances: &HashMap<AssetId, Decimal>, target: AssetId) -> Decimal {
1603 if let Some(amount) = balances
1604 .iter()
1605 .find(|(asset, _)| asset.exchange == target.exchange && asset.code() == target.code())
1606 .map(|(_, amount)| *amount)
1607 {
1608 return amount;
1609 }
1610 if target.exchange.is_specified() {
1611 if let Some(amount) = balances
1612 .iter()
1613 .find(|(asset, _)| !asset.exchange.is_specified() && asset.code() == target.code())
1614 .map(|(_, amount)| *amount)
1615 {
1616 return amount;
1617 }
1618 }
1619 Decimal::from(10_000)
1620}
1621
1622#[cfg(test)]
1623mod tests {
1624 use super::*;
1625
1626 #[test]
1627 fn initial_balance_prefers_exchange_specific_entry() {
1628 let mut balances = HashMap::new();
1629 let exchange = ExchangeId::from("bybit_linear");
1630 let global = AssetId::from("USDT");
1631 let target = AssetId::from_code(exchange, "USDT");
1632 balances.insert(global, Decimal::new(5_000, 0));
1633 balances.insert(target, Decimal::new(2_500, 0));
1634 assert_eq!(
1635 initial_balance_for_asset(&balances, target),
1636 Decimal::new(2_500, 0)
1637 );
1638 }
1639
1640 #[test]
1641 fn initial_balance_falls_back_to_unspecified_exchange() {
1642 let mut balances = HashMap::new();
1643 balances.insert(AssetId::from("USDT"), Decimal::new(7_500, 0));
1644 let exchange = ExchangeId::from("binance_perp");
1645 let target = AssetId::from_code(exchange, "USDT");
1646 assert_eq!(
1647 initial_balance_for_asset(&balances, target),
1648 Decimal::new(7_500, 0)
1649 );
1650 }
1651}
1652
1653impl LiveRunArgs {
1654 async fn run(&self, config: &AppConfig) -> Result<()> {
1655 let exchange_names = if self.exchanges.is_empty() {
1656 vec![self.exchange.clone()]
1657 } else {
1658 let mut names = self.exchanges.clone();
1659 if !names.contains(&self.exchange) {
1660 names.insert(0, self.exchange.clone());
1661 }
1662 names
1663 };
1664 let profiles = config.exchange_profiles();
1665 let mut named_exchanges = Vec::new();
1666 for name in exchange_names {
1667 let config_entry = profiles
1668 .get(&name)
1669 .cloned()
1670 .ok_or_else(|| anyhow!("exchange profile {} not found", name))?;
1671 named_exchanges.push(NamedExchange {
1672 name,
1673 config: config_entry,
1674 });
1675 }
1676
1677 let contents = fs::read_to_string(&self.strategy_config)
1678 .with_context(|| format!("failed to read {}", self.strategy_config.display()))?;
1679 let def: StrategyConfigFile =
1680 toml::from_str(&contents).context("failed to parse strategy config file")?;
1681 let strategy = load_strategy(&def.name, def.params)
1682 .with_context(|| format!("failed to configure strategy {}", def.name))?;
1683 let symbols = strategy.subscriptions();
1684 if symbols.is_empty() {
1685 bail!("strategy did not declare any subscriptions");
1686 }
1687 if self.quantity <= Decimal::ZERO {
1688 bail!("--quantity must be greater than zero");
1689 }
1690 let quantity = self.quantity;
1691 let initial_balances = self.resolved_initial_balances(config);
1692 let reporting_currency = AssetId::from(config.backtest.reporting_currency.as_str());
1693 let markets_file = self
1694 .markets_file
1695 .clone()
1696 .or_else(|| config.backtest.markets_file.clone());
1697
1698 let interval: Interval = self.interval.parse().map_err(|err: String| anyhow!(err))?;
1699 let category =
1700 PublicChannel::from_str(&self.category).map_err(|err| anyhow!(err.to_string()))?;
1701 let metrics_addr = self.resolved_metrics_addr(config)?;
1702 let control_addr = self.resolved_control_addr(config)?;
1703 let persistence_cfg = config.live.persistence_config();
1704 let persistence_engine = self
1705 .persistence
1706 .map(PersistenceEngine::from)
1707 .unwrap_or(persistence_cfg.engine);
1708 let state_path = self
1709 .state_path
1710 .clone()
1711 .unwrap_or_else(|| persistence_cfg.path.clone());
1712 let persistence = PersistenceSettings::new(persistence_engine, state_path);
1713 let alerting = self.build_alerting(config);
1714 let history = self.history.max(32);
1715 let reconciliation_interval = self.reconciliation_interval(config);
1716 let reconciliation_threshold = self.reconciliation_threshold(config);
1717 let orderbook_depth = self
1718 .orderbook_depth
1719 .unwrap_or(super::live::default_order_book_depth());
1720 let panic_close = PanicCloseConfig {
1721 mode: self.panic_mode.into(),
1722 limit_offset_bps: self.panic_limit_offset_bps.max(Decimal::ZERO),
1723 };
1724
1725 let settings = LiveSessionSettings {
1726 category,
1727 interval,
1728 quantity,
1729 slippage_bps: self.slippage_bps.max(Decimal::ZERO),
1730 fee_bps: self.fee_bps.max(Decimal::ZERO),
1731 history,
1732 metrics_addr,
1733 persistence,
1734 initial_balances,
1735 reporting_currency,
1736 markets_file,
1737 alerting,
1738 exec_backend: self.exec,
1739 risk: self.build_risk_config(config),
1740 reconciliation_interval,
1741 reconciliation_threshold,
1742 orderbook_depth,
1743 record_path: Some(self.record_data.clone()),
1744 control_addr,
1745 panic_close,
1746 };
1747
1748 let exchange_labels: Vec<String> = named_exchanges
1749 .iter()
1750 .map(|ex| format!("{} ({})", ex.name, ex.config.driver))
1751 .collect();
1752
1753 info!(
1754 strategy = %def.name,
1755 symbols = ?symbols,
1756 exchanges = ?exchange_labels,
1757 interval = %self.interval,
1758 exec = ?self.exec,
1759 persistence_engine = ?settings.persistence.engine,
1760 state_path = %settings.persistence.state_path.display(),
1761 control_addr = %settings.control_addr,
1762 "starting live session"
1763 );
1764
1765 run_live(strategy, symbols, named_exchanges, settings)
1766 .await
1767 .context("live session failed")
1768 }
1769}
1770
1771fn list_strategies() {
1772 println!("Built-in strategies:");
1773 for name in builtin_strategy_names() {
1774 println!("- {name}");
1775 }
1776}
1777
1778fn print_validation_summary(outcome: &ValidationOutcome) {
1779 const MAX_EXAMPLES: usize = 5;
1780 let summary = &outcome.summary;
1781 println!(
1782 "Validation summary for {} ({} candles)",
1783 summary.symbol, summary.rows
1784 );
1785 println!(
1786 " Range: {} -> {}",
1787 summary.start.to_rfc3339(),
1788 summary.end.to_rfc3339()
1789 );
1790 println!(" Interval: {}", interval_label(summary.interval));
1791 println!(" Missing intervals: {}", summary.missing_candles);
1792 println!(" Duplicate intervals: {}", summary.duplicate_candles);
1793 println!(" Zero-volume candles: {}", summary.zero_volume_candles);
1794 println!(" Price spikes flagged: {}", summary.price_spike_count);
1795 println!(
1796 " Cross-source mismatches: {}",
1797 summary.cross_mismatch_count
1798 );
1799 println!(" Repaired candles generated: {}", summary.repaired_candles);
1800
1801 if !outcome.gaps.is_empty() {
1802 println!(" Gap examples:");
1803 for gap in outcome.gaps.iter().take(MAX_EXAMPLES) {
1804 println!(
1805 " {} -> {} (missing {})",
1806 gap.start.to_rfc3339(),
1807 gap.end.to_rfc3339(),
1808 gap.missing
1809 );
1810 }
1811 if outcome.gaps.len() > MAX_EXAMPLES {
1812 println!(
1813 " ... {} additional gap(s) omitted",
1814 outcome.gaps.len() - MAX_EXAMPLES
1815 );
1816 }
1817 }
1818
1819 if !outcome.price_spikes.is_empty() {
1820 println!(" Price spike examples:");
1821 for spike in outcome.price_spikes.iter().take(MAX_EXAMPLES) {
1822 println!(
1823 " {} (change {:.2}%)",
1824 spike.timestamp.to_rfc3339(),
1825 spike.change_fraction * 100.0
1826 );
1827 }
1828 if outcome.price_spikes.len() > MAX_EXAMPLES {
1829 println!(
1830 " ... {} additional spike(s) omitted",
1831 outcome.price_spikes.len() - MAX_EXAMPLES
1832 );
1833 }
1834 }
1835
1836 if !outcome.cross_mismatches.is_empty() {
1837 println!(" Cross-source mismatch examples:");
1838 for miss in outcome.cross_mismatches.iter().take(MAX_EXAMPLES) {
1839 println!(
1840 " {} primary {:.4} vs ref {:.4} ({:.2}%)",
1841 miss.timestamp.to_rfc3339(),
1842 miss.primary_close,
1843 miss.reference_close,
1844 miss.delta_fraction * 100.0
1845 );
1846 }
1847 if outcome.cross_mismatches.len() > MAX_EXAMPLES {
1848 println!(
1849 " ... {} additional mismatch(es) omitted",
1850 outcome.cross_mismatches.len() - MAX_EXAMPLES
1851 );
1852 }
1853 }
1854}
1855
1856fn print_report(report: &PerformanceReport) {
1857 println!("\n{}", report);
1858}
1859
1860fn synth_candles(symbol: Symbol, len: usize, offset_minutes: i64) -> Vec<Candle> {
1861 let mut candles = Vec::with_capacity(len);
1862 for i in 0..len {
1863 let base = 50_000.0 + ((i as f64) + offset_minutes as f64).sin() * 500.0;
1864 let open = base + (i as f64 % 3.0) * 10.0;
1865 let close = open + (i as f64 % 5.0) * 5.0 - 10.0;
1866 let open_dec =
1867 Decimal::from_f64(open).unwrap_or_else(|| Decimal::from_i64(base as i64).unwrap());
1868 let close_dec = Decimal::from_f64(close).unwrap_or(open_dec);
1869 let high = Decimal::from_f64(open.max(close) + 20.0).unwrap_or(open_dec);
1870 let low = Decimal::from_f64(open.min(close) - 20.0).unwrap_or(close_dec);
1871 candles.push(Candle {
1872 symbol,
1873 interval: Interval::OneMinute,
1874 open: open_dec,
1875 high,
1876 low,
1877 close: close_dec,
1878 volume: Decimal::ONE,
1879 timestamp: Utc::now() - Duration::minutes((len - i) as i64)
1880 + Duration::minutes(offset_minutes),
1881 });
1882 }
1883 candles
1884}
1885
1886fn parse_datetime(value: &str) -> Result<DateTime<Utc>> {
1887 if let Ok(dt) = DateTime::parse_from_rfc3339(value) {
1888 return Ok(dt.with_timezone(&Utc));
1889 }
1890 if let Ok(dt) = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1891 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1892 }
1893 if let Ok(date) = NaiveDate::parse_from_str(value, "%Y-%m-%d") {
1894 let dt = date
1895 .and_hms_opt(0, 0, 0)
1896 .ok_or_else(|| anyhow!("invalid date"))?;
1897 return Ok(DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc));
1898 }
1899 Err(anyhow!("unable to parse datetime '{value}'"))
1900}
1901
1902#[derive(Deserialize)]
1903struct CandleCsvRow {
1904 symbol: Option<String>,
1905 timestamp: String,
1906 open: f64,
1907 high: f64,
1908 low: f64,
1909 close: f64,
1910 volume: f64,
1911}
1912
1913fn load_candles_from_paths(paths: &[PathBuf]) -> Result<Vec<Candle>> {
1914 let mut candles = Vec::new();
1915 for path in paths {
1916 let mut reader = csv::Reader::from_path(path)
1917 .with_context(|| format!("failed to open {}", path.display()))?;
1918 for record in reader.deserialize::<CandleCsvRow>() {
1919 let row = record.with_context(|| format!("invalid row in {}", path.display()))?;
1920 let timestamp = parse_datetime(&row.timestamp)?;
1921 let symbol_code = row
1922 .symbol
1923 .clone()
1924 .or_else(|| infer_symbol_from_path(path))
1925 .ok_or_else(|| {
1926 anyhow!(
1927 "missing symbol column and unable to infer from path {}",
1928 path.display()
1929 )
1930 })?;
1931 let symbol = Symbol::from(symbol_code.as_str());
1932 let interval = infer_interval_from_path(path).unwrap_or(Interval::OneMinute);
1933 let open = Decimal::from_f64(row.open).ok_or_else(|| {
1934 anyhow!("invalid open value '{}' in {}", row.open, path.display())
1935 })?;
1936 let high = Decimal::from_f64(row.high).ok_or_else(|| {
1937 anyhow!("invalid high value '{}' in {}", row.high, path.display())
1938 })?;
1939 let low = Decimal::from_f64(row.low)
1940 .ok_or_else(|| anyhow!("invalid low value '{}' in {}", row.low, path.display()))?;
1941 let close = Decimal::from_f64(row.close).ok_or_else(|| {
1942 anyhow!("invalid close value '{}' in {}", row.close, path.display())
1943 })?;
1944 let volume = Decimal::from_f64(row.volume).ok_or_else(|| {
1945 anyhow!(
1946 "invalid volume value '{}' in {}",
1947 row.volume,
1948 path.display()
1949 )
1950 })?;
1951 candles.push(Candle {
1952 symbol,
1953 interval,
1954 open,
1955 high,
1956 low,
1957 close,
1958 volume,
1959 timestamp,
1960 });
1961 }
1962 }
1963 Ok(candles)
1964}
1965
1966#[derive(Clone, Copy, Debug, Eq, PartialEq)]
1967enum DataFormat {
1968 Csv,
1969 Parquet,
1970}
1971
1972fn detect_data_format(paths: &[PathBuf]) -> Result<DataFormat> {
1973 let mut detected: Option<DataFormat> = None;
1974 for path in paths {
1975 let ext = path
1976 .extension()
1977 .and_then(|ext| ext.to_str())
1978 .map(|ext| ext.to_ascii_lowercase())
1979 .unwrap_or_else(|| String::from(""));
1980 let current = if ext == "parquet" {
1981 DataFormat::Parquet
1982 } else {
1983 DataFormat::Csv
1984 };
1985 if let Some(existing) = detected {
1986 if existing != current {
1987 bail!("cannot mix CSV and Parquet inputs in --data");
1988 }
1989 } else {
1990 detected = Some(current);
1991 }
1992 }
1993 detected.ok_or_else(|| anyhow!("no data paths provided"))
1994}
1995
1996fn memory_market_stream(symbol: Symbol, candles: Vec<Candle>) -> BacktestStream {
1997 Box::new(PaperMarketStream::from_data(symbol, Vec::new(), candles))
1998}
1999
2000fn parquet_market_stream(symbols: &[Symbol], paths: Vec<PathBuf>) -> BacktestStream {
2001 Box::new(ParquetMarketStream::with_candles(symbols.to_vec(), paths))
2002}
2003
2004#[derive(Deserialize)]
2005#[serde(tag = "event", rename_all = "lowercase")]
2006enum LobEventRow {
2007 Snapshot {
2008 timestamp: String,
2009 symbol: Option<String>,
2010 bids: Vec<[f64; 2]>,
2011 asks: Vec<[f64; 2]>,
2012 },
2013 Depth {
2014 timestamp: String,
2015 symbol: Option<String>,
2016 bids: Vec<[f64; 2]>,
2017 asks: Vec<[f64; 2]>,
2018 },
2019 Trade {
2020 timestamp: String,
2021 symbol: Option<String>,
2022 side: String,
2023 price: f64,
2024 size: f64,
2025 },
2026}
2027
2028fn load_lob_events_from_paths(paths: &[PathBuf]) -> Result<Vec<MarketEvent>> {
2029 let mut events = Vec::new();
2030 for path in paths {
2031 let file = File::open(path)
2032 .with_context(|| format!("failed to open order book file {}", path.display()))?;
2033 let symbol_hint = infer_symbol_from_path(path);
2034 for line in BufReader::new(file).lines() {
2035 let line =
2036 line.with_context(|| format!("failed to read line from {}", path.display()))?;
2037 if line.trim().is_empty() {
2038 continue;
2039 }
2040 let row: LobEventRow = serde_json::from_str(&line)
2041 .with_context(|| format!("invalid order book event in {}", path.display()))?;
2042 match row {
2043 LobEventRow::Snapshot {
2044 timestamp,
2045 symbol,
2046 bids,
2047 asks,
2048 } => {
2049 let ts = parse_datetime(×tamp)?;
2050 let symbol_code = symbol
2051 .or_else(|| symbol_hint.clone())
2052 .ok_or_else(|| anyhow!("missing symbol in snapshot {}", path.display()))?;
2053 let symbol = Symbol::from(symbol_code.as_str());
2054 let bids = convert_levels(&bids)?;
2055 let asks = convert_levels(&asks)?;
2056 let book = OrderBook {
2057 symbol,
2058 bids,
2059 asks,
2060 timestamp: ts,
2061 exchange_checksum: None,
2062 local_checksum: None,
2063 };
2064 events.push(MarketEvent {
2065 timestamp: ts,
2066 kind: MarketEventKind::OrderBook(book),
2067 });
2068 }
2069 LobEventRow::Depth {
2070 timestamp,
2071 symbol,
2072 bids,
2073 asks,
2074 } => {
2075 let ts = parse_datetime(×tamp)?;
2076 let symbol_code = symbol.or_else(|| symbol_hint.clone()).ok_or_else(|| {
2077 anyhow!("missing symbol in depth update {}", path.display())
2078 })?;
2079 let symbol = Symbol::from(symbol_code.as_str());
2080 let bids = convert_levels(&bids)?;
2081 let asks = convert_levels(&asks)?;
2082 let update = DepthUpdate {
2083 symbol,
2084 bids,
2085 asks,
2086 timestamp: ts,
2087 };
2088 events.push(MarketEvent {
2089 timestamp: ts,
2090 kind: MarketEventKind::Depth(update),
2091 });
2092 }
2093 LobEventRow::Trade {
2094 timestamp,
2095 symbol,
2096 side,
2097 price,
2098 size,
2099 } => {
2100 let ts = parse_datetime(×tamp)?;
2101 let symbol_code = symbol
2102 .or_else(|| symbol_hint.clone())
2103 .ok_or_else(|| anyhow!("missing symbol in trade {}", path.display()))?;
2104 let symbol = Symbol::from(symbol_code.as_str());
2105 let side = match side.to_lowercase().as_str() {
2106 "buy" | "bid" | "b" => Side::Buy,
2107 "sell" | "ask" | "s" => Side::Sell,
2108 other => bail!("unsupported trade side '{other}' in {}", path.display()),
2109 };
2110 let price = Decimal::from_f64(price).ok_or_else(|| {
2111 anyhow!("invalid trade price '{}' in {}", price, path.display())
2112 })?;
2113 let size = Decimal::from_f64(size).ok_or_else(|| {
2114 anyhow!("invalid trade size '{}' in {}", size, path.display())
2115 })?;
2116 let tick = Tick {
2117 symbol,
2118 price,
2119 size,
2120 side,
2121 exchange_timestamp: ts,
2122 received_at: ts,
2123 };
2124 events.push(MarketEvent {
2125 timestamp: ts,
2126 kind: MarketEventKind::Trade(tick),
2127 });
2128 }
2129 }
2130 }
2131 }
2132 events.sort_by_key(|event| event.timestamp);
2133 Ok(events)
2134}
2135
2136fn convert_levels(levels: &[[f64; 2]]) -> Result<Vec<OrderBookLevel>> {
2137 levels
2138 .iter()
2139 .map(|pair| {
2140 let price = Decimal::from_f64(pair[0])
2141 .ok_or_else(|| anyhow!("invalid depth price {}", pair[0]))?;
2142 let size = Decimal::from_f64(pair[1])
2143 .ok_or_else(|| anyhow!("invalid depth size {}", pair[1]))?;
2144 Ok(OrderBookLevel { price, size })
2145 })
2146 .collect()
2147}
2148
2149fn infer_symbol_from_path(path: &Path) -> Option<String> {
2150 path.parent()
2151 .and_then(|p| p.file_name())
2152 .map(|os| os.to_string_lossy().to_string())
2153}
2154
2155fn infer_interval_from_path(path: &Path) -> Option<Interval> {
2156 path.file_stem()
2157 .and_then(|os| os.to_str())
2158 .and_then(|stem| stem.split('_').next())
2159 .and_then(|token| Interval::from_str(token).ok())
2160}
2161
2162fn default_output_path(
2163 config: &AppConfig,
2164 exchange: &str,
2165 symbol: &str,
2166 interval: Interval,
2167 start: DateTime<Utc>,
2168 end: DateTime<Utc>,
2169) -> PathBuf {
2170 let interval_part = interval_label(interval);
2171 let start_part = start.format("%Y%m%d").to_string();
2172 let end_part = end.format("%Y%m%d").to_string();
2173 config
2174 .data_path
2175 .join(exchange)
2176 .join(symbol)
2177 .join(format!("{}_{}-{}.csv", interval_part, start_part, end_part))
2178}
2179
2180fn default_tick_output_path(
2181 config: &AppConfig,
2182 exchange: &str,
2183 symbol: &str,
2184 start: DateTime<Utc>,
2185 end: DateTime<Utc>,
2186) -> PathBuf {
2187 let start_part = start.format("%Y%m%dT%H%M%S").to_string();
2188 let end_part = end.format("%Y%m%dT%H%M%S").to_string();
2189 config
2190 .data_path
2191 .join("ticks")
2192 .join(exchange)
2193 .join(format!("{symbol}_{start_part}-{end_part}.parquet"))
2194}
2195
2196fn default_tick_partition_dir(config: &AppConfig, exchange: &str, symbol: &str) -> PathBuf {
2197 config.data_path.join("ticks").join(exchange).join(symbol)
2198}
2199
2200fn partition_path(base_dir: &Path, date: NaiveDate) -> PathBuf {
2201 base_dir.join(format!("{}.parquet", date.format("%Y-%m-%d")))
2202}
2203
2204fn interval_label(interval: Interval) -> &'static str {
2205 match interval {
2206 Interval::OneSecond => "1s",
2207 Interval::OneMinute => "1m",
2208 Interval::FiveMinutes => "5m",
2209 Interval::FifteenMinutes => "15m",
2210 Interval::OneHour => "1h",
2211 Interval::FourHours => "4h",
2212 Interval::OneDay => "1d",
2213 }
2214}
2215
2216#[derive(Serialize)]
2217struct CandleRow {
2218 symbol: String,
2219 timestamp: String,
2220 open: f64,
2221 high: f64,
2222 low: f64,
2223 close: f64,
2224 volume: f64,
2225}
2226
2227fn write_candles_csv(path: &Path, candles: &[Candle]) -> Result<()> {
2228 if let Some(parent) = path.parent() {
2229 fs::create_dir_all(parent)
2230 .with_context(|| format!("failed to create directory {}", parent.display()))?;
2231 }
2232 let mut writer =
2233 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
2234 for candle in candles {
2235 let row = CandleRow {
2236 symbol: candle.symbol.code().to_string(),
2237 timestamp: candle.timestamp.to_rfc3339(),
2238 open: candle.open.to_f64().unwrap_or(0.0),
2239 high: candle.high.to_f64().unwrap_or(0.0),
2240 low: candle.low.to_f64().unwrap_or(0.0),
2241 close: candle.close.to_f64().unwrap_or(0.0),
2242 volume: candle.volume.to_f64().unwrap_or(0.0),
2243 };
2244 writer.serialize(&row)?;
2245 }
2246 writer.flush()?;
2247 Ok(())
2248}
2249
2250#[derive(Serialize)]
2251struct BatchRow {
2252 config: String,
2253 signals: usize,
2254 orders: usize,
2255 dropped_orders: usize,
2256 ending_equity: f64,
2257}
2258
2259fn write_batch_report(path: &Path, rows: &[BatchRow]) -> Result<()> {
2260 if let Some(parent) = path.parent() {
2261 fs::create_dir_all(parent)
2262 .with_context(|| format!("failed to create directory {}", parent.display()))?;
2263 }
2264 let mut writer =
2265 Writer::from_path(path).with_context(|| format!("failed to create {}", path.display()))?;
2266 for row in rows {
2267 writer.serialize(row)?;
2268 }
2269 writer.flush()?;
2270 Ok(())
2271}
2272
2273fn clone_initial_balances(config: &tesser_config::BacktestConfig) -> HashMap<AssetId, Decimal> {
2274 config
2275 .initial_balances
2276 .iter()
2277 .map(|(currency, amount)| (AssetId::from(currency.as_str()), *amount))
2278 .collect()
2279}
2280
2281fn reporting_balance(config: &tesser_config::BacktestConfig) -> Decimal {
2282 config
2283 .initial_balances
2284 .get(&config.reporting_currency)
2285 .copied()
2286 .unwrap_or_default()
2287}
2288
2289fn parse_sizer(value: &str, cli_quantity: Option<Decimal>) -> Result<Box<dyn OrderSizer>> {
2290 let parts: Vec<_> = value.split(':').collect();
2291 match parts.as_slice() {
2292 ["fixed", val] => {
2293 let quantity =
2294 Decimal::from_str(val).context("invalid fixed sizer quantity (use decimals)")?;
2295 Ok(Box::new(FixedOrderSizer { quantity }))
2296 }
2297 ["fixed"] => {
2298 let quantity = cli_quantity.unwrap_or(Decimal::ONE);
2299 Ok(Box::new(FixedOrderSizer { quantity }))
2300 }
2301 ["percent", val] => {
2302 let percent =
2303 Decimal::from_str(val).context("invalid percent sizer value (use decimals)")?;
2304 Ok(Box::new(PortfolioPercentSizer {
2305 percent: percent.max(Decimal::ZERO),
2306 }))
2307 }
2308 ["risk-adjusted", val] => {
2309 let risk_fraction = Decimal::from_str(val)
2310 .context("invalid risk fraction value (use decimals)")?;
2311 Ok(Box::new(RiskAdjustedSizer {
2312 risk_fraction: risk_fraction.max(Decimal::ZERO),
2313 }))
2314 }
2315 _ => Err(anyhow!(
2316 "invalid sizer format, expected 'fixed:value', 'percent:value', or 'risk-adjusted:value'"
2317 )),
2318 }
2319}
2320
2321#[derive(Clone, Copy, Debug, ValueEnum)]
2322pub enum PanicModeArg {
2323 Market,
2324 AggressiveLimit,
2325}
2326
2327impl From<PanicModeArg> for PanicCloseMode {
2328 fn from(arg: PanicModeArg) -> Self {
2329 match arg {
2330 PanicModeArg::Market => PanicCloseMode::Market,
2331 PanicModeArg::AggressiveLimit => PanicCloseMode::AggressiveLimit,
2332 }
2333 }
2334}