use chrono::{DateTime, Utc};
use polars::prelude::*;
#[allow(unused_imports)]
use quantwave_core::traits::Next; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum BacktestError {
#[error("Polars error during simulation: {0}")]
Polars(#[from] PolarsError),
#[error("Invalid input: {0}")]
InvalidInput(String),
#[error("Data must be sorted by timestamp (and symbol for multi-symbol runs)")]
UnsortedData,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostModel {
pub commission_bps: f64,
pub slippage_bps: f64,
pub initial_cash: f64,
}
impl Default for CostModel {
fn default() -> Self {
Self {
commission_bps: 5.0, slippage_bps: 2.0, initial_cash: 100_000.0,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BacktestConfig {
pub cost_model: CostModel,
pub timestamp_col: String,
pub symbol_col: Option<String>,
pub close_col: String,
pub signal_col: String,
pub entry_filter_col: Option<String>,
pub size_multiplier_col: Option<String>,
}
impl Default for BacktestConfig {
fn default() -> Self {
Self {
cost_model: CostModel::default(),
timestamp_col: "timestamp".to_string(),
symbol_col: None,
close_col: "close".to_string(),
signal_col: "signal".to_string(),
entry_filter_col: None,
size_multiplier_col: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Trade {
pub trade_id: u32,
pub symbol: Option<String>,
pub side: i8, pub entry_ts: DateTime<Utc>,
pub entry_price: f64,
pub entry_fill_price: f64, pub exit_ts: Option<DateTime<Utc>>,
pub exit_price: Option<f64>,
pub exit_fill_price: Option<f64>,
pub pnl_gross: f64,
pub costs: f64,
pub pnl_net: f64,
pub quantity: f64,
pub entry_metadata: Option<HashMap<String, f64>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EquityPoint {
pub ts: DateTime<Utc>,
pub symbol: Option<String>, pub equity: f64,
pub cash: f64,
pub position: f64, pub close: f64,
}
#[derive(Debug)]
pub struct BacktestResult {
pub trades: DataFrame,
pub equity_curve: DataFrame,
pub stats: HashMap<String, f64>,
}
#[derive(Debug, Clone)]
pub struct Bar {
pub ts: DateTime<Utc>,
pub close: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StrategySignal {
pub exposure: f64,
pub metadata: Option<HashMap<String, f64>>,
}
impl Default for StrategySignal {
fn default() -> Self {
Self {
exposure: 0.0,
metadata: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct PAEvent {
pub long: bool,
pub pole_height: Option<f64>,
pub strength: Option<f64>,
}
pub struct BacktestEngine {
config: BacktestConfig,
}
impl BacktestEngine {
pub fn new(config: BacktestConfig) -> Self {
Self { config }
}
pub fn with_default_costs() -> Self {
Self::new(BacktestConfig::default())
}
pub fn run(&self, lf: LazyFrame) -> Result<BacktestResult, BacktestError> {
let df = lf.collect()?;
if df.height() == 0 {
return Err(BacktestError::InvalidInput("empty dataframe".into()));
}
let ts_col = &self.config.timestamp_col;
let close_col = &self.config.close_col;
let sig_col = &self.config.signal_col;
for c in [ts_col, close_col, sig_col] {
if df.column(c).is_err() {
return Err(BacktestError::InvalidInput(format!("missing column: {}", c)));
}
}
let ts_series = df.column(ts_col)?.clone();
let close_ca = df.column(close_col)?.f64()?.clone();
let signal_series = df.column(sig_col)?;
let signal_vals: Vec<f64> = if signal_series.dtype().is_bool() {
signal_series
.bool()?
.into_iter()
.map(|b| if b.unwrap_or(false) { 1.0 } else { 0.0 })
.collect()
} else {
signal_series
.f64()?
.into_iter()
.map(|v| v.unwrap_or(0.0))
.collect()
};
let timestamps: Vec<DateTime<Utc>> = self.extract_timestamps(&ts_series)?;
let closes: Vec<f64> = close_ca.into_iter().map(|v| v.unwrap_or(f64::NAN)).collect();
if timestamps.len() != closes.len() || closes.len() != signal_vals.len() {
return Err(BacktestError::InvalidInput("column length mismatch".into()));
}
let cm = &self.config.cost_model;
let metas: Vec<Option<HashMap<String, f64>>> = vec![None; signal_vals.len()];
let (trades, equity_points) = run_simulation(
×tamps,
&closes,
|i| (signal_vals[i], metas[i].clone()),
cm,
);
let trades_df = self.trades_to_df(&trades)?;
let equity_df = self.equity_to_df(&equity_points)?;
let final_equity = equity_points.last().map(|e| e.equity).unwrap_or(cm.initial_cash);
let total_return = (final_equity - cm.initial_cash) / cm.initial_cash;
let num_trades = trades.len() as f64;
let mut stats = HashMap::new();
stats.insert("initial_cash".to_string(), cm.initial_cash);
stats.insert("final_equity".to_string(), final_equity);
stats.insert("total_return".to_string(), total_return);
stats.insert("num_trades".to_string(), num_trades);
stats.insert("net_pnl".to_string(), final_equity - cm.initial_cash);
Ok(BacktestResult {
trades: trades_df,
equity_curve: equity_df,
stats,
})
}
fn extract_timestamps(&self, col: &Column) -> Result<Vec<DateTime<Utc>>, BacktestError> {
let s = col.as_series().ok_or_else(|| BacktestError::InvalidInput("column has no series backing".into()))?;
if let Ok(ca) = s.datetime() {
return Ok(ca
.into_iter()
.map(|opt| {
opt.map(|v| {
let secs = v / 1000;
let nanos = ((v % 1000) * 1_000_000) as u32;
DateTime::<Utc>::from_timestamp(secs, nanos).unwrap_or_else(Utc::now)
})
.unwrap_or_else(Utc::now)
})
.collect());
}
if let Ok(ca) = s.i64() {
return Ok(ca
.into_iter()
.enumerate()
.map(|(i, opt)| {
let v = opt.unwrap_or(i as i64);
DateTime::<Utc>::from_timestamp(v, 0).unwrap_or_else(Utc::now)
})
.collect());
}
Err(BacktestError::InvalidInput(
"timestamp column must be Datetime or Int64 for this MVP".into(),
))
}
fn trades_to_df(&self, trades: &[Trade]) -> Result<DataFrame, PolarsError> {
if trades.is_empty() {
return Ok(DataFrame::new(vec![
Column::new("trade_id".into(), Vec::<u32>::new()),
Column::new("side".into(), Vec::<i8>::new()),
Column::new("entry_ts".into(), Vec::<i64>::new()),
Column::new("entry_price".into(), Vec::<f64>::new()),
Column::new("pnl_net".into(), Vec::<f64>::new()),
])?);
}
let ids: Vec<u32> = trades.iter().map(|t| t.trade_id).collect();
let sides: Vec<i8> = trades.iter().map(|t| t.side).collect();
let entry_ts: Vec<i64> = trades.iter().map(|t| t.entry_ts.timestamp()).collect();
let entry_px: Vec<f64> = trades.iter().map(|t| t.entry_price).collect();
let exit_ts: Vec<Option<i64>> = trades
.iter()
.map(|t| t.exit_ts.map(|d| d.timestamp()))
.collect();
let pnl: Vec<f64> = trades.iter().map(|t| t.pnl_net).collect();
DataFrame::new(vec![
Column::new("trade_id".into(), ids),
Column::new("side".into(), sides),
Column::new("entry_ts".into(), entry_ts),
Column::new("entry_price".into(), entry_px),
Column::new("exit_ts".into(), exit_ts),
Column::new("pnl_net".into(), pnl),
])
}
fn equity_to_df(&self, points: &[EquityPoint]) -> Result<DataFrame, PolarsError> {
if points.is_empty() {
return Ok(DataFrame::new(vec![
Column::new("ts".into(), Vec::<i64>::new()),
Column::new("equity".into(), Vec::<f64>::new()),
Column::new("position".into(), Vec::<f64>::new()),
])?);
}
let ts: Vec<i64> = points.iter().map(|p| p.ts.timestamp()).collect();
let eq: Vec<f64> = points.iter().map(|p| p.equity).collect();
let pos: Vec<f64> = points.iter().map(|p| p.position).collect();
let cash: Vec<f64> = points.iter().map(|p| p.cash).collect();
let close: Vec<f64> = points.iter().map(|p| p.close).collect();
DataFrame::new(vec![
Column::new("ts".into(), ts),
Column::new("equity".into(), eq),
Column::new("cash".into(), cash),
Column::new("position".into(), pos),
Column::new("close".into(), close),
])
}
}
pub fn backtest_simple_bool_signal(
ohlcv: DataFrame,
signal_col: &str,
) -> Result<BacktestResult, BacktestError> {
let config = BacktestConfig {
signal_col: signal_col.to_string(),
..Default::default()
};
let engine = BacktestEngine::new(config);
engine.run(ohlcv.lazy())
}
fn run_simulation(
timestamps: &[DateTime<Utc>],
closes: &[f64],
mut next_signal: impl FnMut(usize) -> (f64, Option<HashMap<String, f64>>),
cm: &CostModel,
) -> (Vec<Trade>, Vec<EquityPoint>) {
let slip = cm.slippage_bps / 10000.0;
let comm = cm.commission_bps / 10000.0;
let mut cash = cm.initial_cash;
let mut current_exposure: f64 = 0.0;
let mut entry_price: f64 = 0.0;
let mut entry_ts: Option<DateTime<Utc>> = None;
let mut trade_id: u32 = 0;
let mut trades: Vec<Trade> = Vec::new();
let mut equity_points: Vec<EquityPoint> = Vec::with_capacity(closes.len());
for i in 0..closes.len() {
let close = closes[i];
if !close.is_finite() {
let equity = cash + current_exposure * close;
equity_points.push(EquityPoint {
ts: timestamps[i],
symbol: None,
equity,
cash,
position: current_exposure,
close,
});
continue;
}
let (desired_exposure, meta) = next_signal(i);
let desired = if desired_exposure > 0.0 { desired_exposure } else { 0.0 };
let currently_in = current_exposure > 0.0;
if desired > 0.0 && !currently_in {
let fill_price = close * (1.0 + slip);
let notional = fill_price * desired;
let cost = notional * comm;
cash -= notional + cost;
current_exposure = desired;
entry_price = fill_price;
entry_ts = Some(timestamps[i]);
trade_id += 1;
} else if desired == 0.0 && currently_in {
let fill_price = close * (1.0 - slip);
let notional = fill_price * current_exposure;
let cost = notional * comm;
let gross_pnl = (fill_price - entry_price) * current_exposure;
let net_pnl = gross_pnl - cost;
cash += notional - cost;
if let Some(ets) = entry_ts {
trades.push(Trade {
trade_id,
symbol: None,
side: 1,
entry_ts: ets,
entry_price,
entry_fill_price: entry_price,
exit_ts: Some(timestamps[i]),
exit_price: Some(close),
exit_fill_price: Some(fill_price),
pnl_gross: gross_pnl,
costs: cost,
pnl_net: net_pnl,
quantity: current_exposure,
entry_metadata: meta.clone(),
});
}
current_exposure = 0.0;
entry_price = 0.0;
entry_ts = None;
}
let equity = cash + current_exposure * close;
equity_points.push(EquityPoint {
ts: timestamps[i],
symbol: None,
equity,
cash,
position: current_exposure,
close,
});
}
if current_exposure > 0.0 {
let last_close = *closes.last().unwrap();
let gross = (last_close - entry_price) * current_exposure;
if let Some(ets) = entry_ts {
trades.push(Trade {
trade_id,
symbol: None,
side: 1,
entry_ts: ets,
entry_price,
entry_fill_price: entry_price,
exit_ts: None,
exit_price: Some(last_close),
exit_fill_price: None,
pnl_gross: gross,
costs: 0.0,
pnl_net: gross,
quantity: current_exposure,
entry_metadata: None, });
}
}
(trades, equity_points)
}
pub fn run_streaming_simulation<G>(
bars: &[Bar],
mut generator: G,
config: BacktestConfig,
) -> Result<BacktestResult, BacktestError>
where
G: for<'a> Next<&'a Bar, Output = StrategySignal>,
{
if bars.is_empty() {
return Err(BacktestError::InvalidInput("empty bars".into()));
}
let timestamps: Vec<DateTime<Utc>> = bars.iter().map(|b| b.ts).collect();
let closes: Vec<f64> = bars.iter().map(|b| b.close).collect();
let cm = &config.cost_model;
let (trades, equity_points) = run_simulation(
×tamps,
&closes,
|i| {
let sig = generator.next(&bars[i]);
(sig.exposure, sig.metadata.clone())
},
cm,
);
let trades_df = if trades.is_empty() {
DataFrame::new(vec![
Column::new("trade_id".into(), Vec::<u32>::new()),
Column::new("side".into(), Vec::<i8>::new()),
Column::new("entry_ts".into(), Vec::<i64>::new()),
Column::new("entry_price".into(), Vec::<f64>::new()),
Column::new("pnl_net".into(), Vec::<f64>::new()),
])?
} else {
let ids: Vec<u32> = trades.iter().map(|t| t.trade_id).collect();
let sides: Vec<i8> = trades.iter().map(|t| t.side).collect();
let entry_ts: Vec<i64> = trades.iter().map(|t| t.entry_ts.timestamp()).collect();
let entry_px: Vec<f64> = trades.iter().map(|t| t.entry_price).collect();
let exit_ts: Vec<Option<i64>> = trades
.iter()
.map(|t| t.exit_ts.map(|d| d.timestamp()))
.collect();
let pnl: Vec<f64> = trades.iter().map(|t| t.pnl_net).collect();
DataFrame::new(vec![
Column::new("trade_id".into(), ids),
Column::new("side".into(), sides),
Column::new("entry_ts".into(), entry_ts),
Column::new("entry_price".into(), entry_px),
Column::new("exit_ts".into(), exit_ts),
Column::new("pnl_net".into(), pnl),
])?
};
let equity_df = if equity_points.is_empty() {
DataFrame::new(vec![
Column::new("ts".into(), Vec::<i64>::new()),
Column::new("equity".into(), Vec::<f64>::new()),
Column::new("position".into(), Vec::<f64>::new()),
])?
} else {
let ts: Vec<i64> = equity_points.iter().map(|p| p.ts.timestamp()).collect();
let eq: Vec<f64> = equity_points.iter().map(|p| p.equity).collect();
let pos: Vec<f64> = equity_points.iter().map(|p| p.position).collect();
let cash: Vec<f64> = equity_points.iter().map(|p| p.cash).collect();
let close: Vec<f64> = equity_points.iter().map(|p| p.close).collect();
DataFrame::new(vec![
Column::new("ts".into(), ts),
Column::new("equity".into(), eq),
Column::new("cash".into(), cash),
Column::new("position".into(), pos),
Column::new("close".into(), close),
])?
};
let final_equity = equity_points.last().map(|e| e.equity).unwrap_or(cm.initial_cash);
let total_return = (final_equity - cm.initial_cash) / cm.initial_cash;
let num_trades = trades.len() as f64;
let mut stats = HashMap::new();
stats.insert("initial_cash".to_string(), cm.initial_cash);
stats.insert("final_equity".to_string(), final_equity);
stats.insert("total_return".to_string(), total_return);
stats.insert("num_trades".to_string(), num_trades);
stats.insert("net_pnl".to_string(), final_equity - cm.initial_cash);
Ok(BacktestResult {
trades: trades_df,
equity_curve: equity_df,
stats,
})
}
#[cfg(test)]
mod tests {
use super::*;
use approx::assert_relative_eq;
use polars::prelude::*;
use rand::Rng;
use quantwave_core::features::CyberCycleFeatureExtractor;
use quantwave_core::regimes::tar::TAR;
use quantwave_core::regimes::MarketRegime;
use quantwave_core::traits::Next;
use std::collections::HashMap;
#[test]
fn test_basic_long_only_flip_on_synthetic() {
let n: usize = 6;
let timestamps: Vec<i64> = (0..n).map(|i| 1_700_000_000i64 + (i as i64) * 3600).collect(); let closes = vec![100.0, 101.0, 102.5, 103.0, 102.0, 101.0];
let signals = vec![0.0, 1.0, 1.0, 1.0, 0.0, 0.0];
let df = DataFrame::new(vec![
Column::new("timestamp".into(), timestamps),
Column::new("close".into(), closes.clone()),
Column::new("signal".into(), signals),
])
.unwrap();
let result = backtest_simple_bool_signal(df, "signal").expect("sim should succeed");
assert_eq!(result.trades.height(), 1);
let num_trades: f64 = *result.stats.get("num_trades").unwrap();
assert_relative_eq!(num_trades, 1.0, epsilon = 1e-9);
let final_eq = *result.stats.get("final_equity").unwrap();
let init = 100_000.0;
assert!(final_eq > init, "equity should grow on winning long: {} vs {}", final_eq, init);
assert_eq!(result.equity_curve.height(), n);
let last_equity = result
.equity_curve
.column("equity")
.unwrap()
.f64()
.unwrap()
.get(n - 1)
.unwrap();
assert_relative_eq!(last_equity, final_eq, epsilon = 1e-6);
}
#[test]
fn test_flat_always_signal_produces_no_trades_and_flat_equity() {
let n: usize = 5;
let ts: Vec<i64> = (0..n).map(|i| 1_700_000_100 + i as i64).collect();
let closes = vec![100.0; n];
let signals = vec![0.0; n];
let df = DataFrame::new(vec![
Column::new("timestamp".into(), ts),
Column::new("close".into(), closes),
Column::new("signal".into(), signals),
])
.unwrap();
let result = backtest_simple_bool_signal(df, "signal").unwrap();
assert_eq!(result.trades.height(), 0);
let num = *result.stats.get("num_trades").unwrap();
assert_relative_eq!(num, 0.0, epsilon = 1e-9);
let final_equity_val = *result.stats.get("final_equity").unwrap();
assert_relative_eq!(final_equity_val, 100_000.0, epsilon = 1e-4);
}
#[test]
fn test_synthetic_with_small_random_walk_and_bool_signal_matches_manual_calc() {
let mut rng = rand::thread_rng();
let n: usize = 8;
let mut price = 100.0_f64;
let mut closes = Vec::with_capacity(n);
let signals = vec![0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0]; let mut ts = Vec::with_capacity(n);
for i in 0..n {
ts.push(1_700_000_200 + i as i64);
closes.push(price);
price += rng.gen_range(-0.8..1.2);
}
let df = DataFrame::new(vec![
Column::new("timestamp".into(), ts.clone()),
Column::new("close".into(), closes.clone()),
Column::new("signal".into(), signals.clone()),
])
.unwrap();
let result = backtest_simple_bool_signal(df.clone(), "signal").unwrap();
let slip = 0.0002;
let comm = 0.0005;
let init = 100_000.0;
let mut cash = init;
let mut pos = 0.0;
let mut entry = 0.0;
let mut manual_equity = init;
for i in 0..n {
let c = closes[i];
let s = signals[i] > 0.0;
if s && pos == 0.0 {
let fp = c * (1.0 + slip);
cash -= fp * (1.0 + comm);
pos = 1.0;
entry = fp;
} else if !s && pos > 0.0 {
let fp = c * (1.0 - slip);
cash += fp * (1.0 - comm);
let _g = (fp - entry) * pos;
let cost = fp * comm;
cash += -cost; pos = 0.0;
}
manual_equity = cash + pos * c;
}
let engine_final = *result.stats.get("final_equity").unwrap();
assert_relative_eq!(engine_final, manual_equity, epsilon = 0.5);
}
#[derive(Debug, Clone)]
struct SyntheticPoleHeightDetector {
window: Vec<f64>,
max_len: usize,
}
impl SyntheticPoleHeightDetector {
fn new(max_len: usize) -> Self {
Self {
window: Vec::with_capacity(max_len),
max_len,
}
}
}
#[derive(Debug, Clone, Copy)]
struct PoleOutput {
pole_height: f64,
_strength: f64, }
impl Next<f64> for SyntheticPoleHeightDetector {
type Output = PoleOutput;
fn next(&mut self, price: f64) -> PoleOutput {
self.window.push(price);
if self.window.len() > self.max_len {
self.window.remove(0);
}
let h = if self.window.len() >= 3 {
let mn = self.window.iter().fold(f64::INFINITY, |a, &b| a.min(b));
let mx = self.window.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
(mx - mn).max(0.1)
} else {
1.0
};
PoleOutput {
pole_height: h,
_strength: (h / 8.0).clamp(0.3, 1.0),
}
}
}
#[derive(Debug, Clone)]
struct RegimeFeaturePAStrategy {
regime: TAR,
cycle: CyberCycleFeatureExtractor,
pa: SyntheticPoleHeightDetector,
feat_thresh: f64,
}
impl RegimeFeaturePAStrategy {
fn new() -> Self {
Self {
regime: TAR::new(105.0), cycle: CyberCycleFeatureExtractor::new(14),
pa: SyntheticPoleHeightDetector::new(6),
feat_thresh: 0.02,
}
}
}
impl Next<&Bar> for RegimeFeaturePAStrategy {
type Output = StrategySignal;
fn next(&mut self, bar: &Bar) -> StrategySignal {
let regime = self.regime.next(bar.close);
let feat = self.cycle.next(bar.close);
let pa = self.pa.next(bar.close);
let regime_ok = matches!(
regime,
MarketRegime::Steady | MarketRegime::Cluster(_) | MarketRegime::Bull
);
let feat_ok = feat.cycle_momentum.abs() > self.feat_thresh;
let exposure = if regime_ok && feat_ok {
(pa.pole_height / 4.0).clamp(0.4, 2.2)
} else {
0.0
};
let mut meta = HashMap::new();
meta.insert("pole_height".to_string(), pa.pole_height);
meta.insert("cycle_momentum".to_string(), feat.cycle_momentum);
meta.insert(
"regime_ok".to_string(),
if regime_ok { 1.0 } else { 0.0 },
);
StrategySignal {
exposure,
metadata: Some(meta),
}
}
}
#[test]
fn test_batch_vs_streaming_parity_regime_feature_rich_pa_pole_sizing() {
let n: usize = 120;
let mut timestamps = Vec::with_capacity(n);
let mut closes = Vec::with_capacity(n);
let mut price = 100.0_f64;
for i in 0..n {
let secs = 1_700_000_500i64 + (i as i64) * 3600;
timestamps.push(chrono::DateTime::<chrono::Utc>::from_timestamp(secs, 0).unwrap());
let wave = (i as f64 * 0.18).sin() * 4.5;
price = 101.5 + wave + (i as f64 * 0.008);
closes.push(price);
}
let bars: Vec<Bar> = timestamps
.iter()
.zip(closes.iter())
.map(|(&ts, &close)| Bar { ts, close })
.collect();
let mut batch_gen = RegimeFeaturePAStrategy::new();
let mut exposures: Vec<f64> = Vec::with_capacity(n);
for bar in &bars {
let s = batch_gen.next(bar);
exposures.push(s.exposure);
}
let df = DataFrame::new(vec![
Column::new("timestamp".into(), timestamps.iter().map(|t| t.timestamp()).collect::<Vec<_>>()),
Column::new("close".into(), closes.clone()),
Column::new("signal".into(), exposures.clone()),
])
.unwrap();
let batch_res = backtest_simple_bool_signal(df, "signal").expect("batch parity run");
let stream_gen = RegimeFeaturePAStrategy::new();
let stream_res = run_streaming_simulation(&bars, stream_gen, BacktestConfig::default())
.expect("streaming parity run");
let b_eq = batch_res
.equity_curve
.column("equity")
.unwrap()
.f64()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or(0.0))
.collect::<Vec<_>>();
let s_eq = stream_res
.equity_curve
.column("equity")
.unwrap()
.f64()
.unwrap()
.into_iter()
.map(|v| v.unwrap_or(0.0))
.collect::<Vec<_>>();
assert_eq!(b_eq.len(), s_eq.len(), "equity curve lengths must match");
for (i, (b, s)) in b_eq.iter().zip(s_eq.iter()).enumerate() {
approx::assert_relative_eq!(
*b,
*s,
epsilon = 1e-8,
max_relative = 1e-8
);
if (b - s).abs() > 1e-7 {
panic!("equity diverged at bar {}: {} vs {}", i, b, s);
}
}
let keys = ["final_equity", "net_pnl", "num_trades"];
for k in keys {
let bv = *batch_res.stats.get(k).unwrap();
let sv = *stream_res.stats.get(k).unwrap();
approx::assert_relative_eq!(bv, sv, epsilon = 1e-6, max_relative = 1e-6);
}
assert_eq!(
batch_res.trades.height(),
stream_res.trades.height(),
"trade counts must match exactly for parity"
);
assert!(
batch_res.trades.height() >= 1,
"parity test strategy must generate >=1 trade on synthetic data"
);
}
}
#[cfg(test)]
mod integration_example_between_epics {
use super::*;
use polars::prelude::*;
use quantwave_core::features::HurstFeatureExtractor;
#[test]
fn ml_features_feed_backtester_with_metadata() {
let n = 60;
let closes: Vec<f64> = (0..n).map(|i| 100.0 + i as f64 * 0.25).collect();
let timestamps: Vec<i64> = (0..n).map(|i| 1_700_000_000i64 + i as i64).collect();
let mut h_ext = HurstFeatureExtractor::new(15);
let mut exposures = Vec::new();
for &c in &closes {
let f = h_ext.next(c);
let regime_ok = true; let exposure = if regime_ok && f.persistence > 0.52 { 1.0 } else { 0.0 };
exposures.push(exposure);
}
let lf = df![
"timestamp" => timestamps,
"close" => closes,
"exposure" => exposures,
]
.unwrap()
.lazy();
let config = BacktestConfig {
signal_col: "exposure".to_string(),
..Default::default()
};
let result = BacktestEngine::new(config).run(lf).unwrap();
println!(
"Integration smoke test: {} trades produced using ML feature (Hurst) driven exposure",
result.trades.height()
);
assert!(result.equity_curve.height() == n);
}
}