#[cfg(test)]
mod tests {
use std::fs;
use std::path::PathBuf;
use of_adapters::{
AdapterConfig, AdapterError, AdapterHealth, AdapterResult, CredentialsRef,
MarketDataAdapter, MockAdapter, ProviderKind, RawEvent, SubscribeReq,
};
use of_core::{BookAction, BookUpdate, DataQualityFlags, Side, SignalState, SymbolId, TradePrint};
use of_persist::{RollingStore, StoredEvent};
use of_signals::DeltaMomentumSignal;
use super::*;
#[test]
fn engine_processes_trade_and_updates_snapshots() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut adapter = MockAdapter::default();
adapter.push_event(RawEvent::Trade(TradePrint {
symbol: symbol.clone(),
price: 505000,
size: 10,
aggressor_side: Side::Ask,
sequence: 1,
ts_exchange_ns: 1,
ts_recv_ns: 2,
}));
let mut engine = Engine::new(
EngineConfig::default(),
adapter,
DeltaMomentumSignal::new(5),
);
engine.start().expect("start failed");
engine.subscribe(symbol.clone(), 10).expect("sub failed");
engine.poll_once(DataQualityFlags::NONE).expect("poll failed");
let analytics = engine.analytics_snapshot(&symbol).expect("analytics missing");
assert_eq!(analytics.delta, 10);
let signal = engine.signal_snapshot(&symbol).expect("signal missing");
assert_eq!(signal.state, SignalState::LongBias);
}
#[test]
fn engine_ingests_external_events_and_updates_snapshots() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut engine = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::new(5),
);
engine.start().expect("start failed");
engine.subscribe(symbol.clone(), 10).expect("sub failed");
engine
.ingest_book(
BookUpdate {
symbol: symbol.clone(),
side: Side::Bid,
level: 0,
price: 504900,
size: 20,
action: BookAction::Upsert,
sequence: 1,
ts_exchange_ns: 10,
ts_recv_ns: 11,
},
DataQualityFlags::NONE,
)
.expect("book ingest failed");
engine
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: 505000,
size: 7,
aggressor_side: Side::Ask,
sequence: 2,
ts_exchange_ns: 12,
ts_recv_ns: 13,
},
DataQualityFlags::ADAPTER_DEGRADED,
)
.expect("trade ingest failed");
let analytics = engine.analytics_snapshot(&symbol).expect("analytics missing");
assert_eq!(analytics.delta, 7);
let signal = engine.signal_snapshot(&symbol).expect("signal missing");
assert_eq!(signal.state, SignalState::Blocked);
assert_eq!(signal.quality_flags, DataQualityFlags::ADAPTER_DEGRADED.bits());
assert_eq!(signal.reason, "blocked_by_quality_gate");
assert_eq!(engine.last_events().len(), 1);
}
#[test]
fn engine_materializes_book_snapshot_from_updates() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut engine = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::new(5),
);
engine.start().expect("start failed");
engine.subscribe(symbol.clone(), 10).expect("sub failed");
engine
.ingest_book(
BookUpdate {
symbol: symbol.clone(),
side: Side::Bid,
level: 0,
price: 504900,
size: 20,
action: BookAction::Upsert,
sequence: 1,
ts_exchange_ns: 10,
ts_recv_ns: 11,
},
DataQualityFlags::NONE,
)
.expect("bid ingest failed");
engine
.ingest_book(
BookUpdate {
symbol: symbol.clone(),
side: Side::Ask,
level: 1,
price: 505100,
size: 12,
action: BookAction::Upsert,
sequence: 2,
ts_exchange_ns: 12,
ts_recv_ns: 13,
},
DataQualityFlags::NONE,
)
.expect("ask ingest failed");
engine
.ingest_book(
BookUpdate {
symbol: symbol.clone(),
side: Side::Bid,
level: 0,
price: 0,
size: 0,
action: BookAction::Delete,
sequence: 3,
ts_exchange_ns: 14,
ts_recv_ns: 15,
},
DataQualityFlags::NONE,
)
.expect("delete ingest failed");
let snapshot = engine.book_snapshot(&symbol).expect("book snapshot missing");
assert!(snapshot.bids.is_empty());
assert_eq!(snapshot.asks.len(), 1);
assert_eq!(snapshot.asks[0].level, 1);
assert_eq!(snapshot.asks[0].price, 505100);
assert_eq!(snapshot.asks[0].size, 12);
assert_eq!(snapshot.last_sequence, 3);
assert_eq!(snapshot.ts_exchange_ns, 14);
assert_eq!(snapshot.ts_recv_ns, 15);
assert!(engine.metrics_json().contains("\"symbols\":1"));
}
#[test]
fn engine_exposes_derived_analytics_snapshot() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut engine = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::new(5),
);
engine.start().expect("start failed");
engine.subscribe(symbol.clone(), 10).expect("sub failed");
engine
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: 505000,
size: 10,
aggressor_side: Side::Ask,
sequence: 1,
ts_exchange_ns: 1,
ts_recv_ns: 2,
},
DataQualityFlags::NONE,
)
.expect("trade 1");
engine
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: 504900,
size: 5,
aggressor_side: Side::Bid,
sequence: 2,
ts_exchange_ns: 3,
ts_recv_ns: 4,
},
DataQualityFlags::NONE,
)
.expect("trade 2");
let derived = engine
.derived_analytics_snapshot(&symbol)
.expect("derived analytics missing");
assert_eq!(derived.total_volume, 15);
assert_eq!(derived.trade_count, 2);
assert_eq!(derived.average_trade_size, 7);
assert_eq!(derived.vwap, 504966);
assert_eq!(derived.imbalance_bps, 3333);
}
#[test]
fn engine_exposes_session_candle_snapshot() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut engine = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::new(5),
);
engine.start().expect("start failed");
engine.subscribe(symbol.clone(), 10).expect("sub failed");
engine
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: 505000,
size: 10,
aggressor_side: Side::Ask,
sequence: 1,
ts_exchange_ns: 10,
ts_recv_ns: 11,
},
DataQualityFlags::NONE,
)
.expect("trade 1");
engine
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: 504900,
size: 5,
aggressor_side: Side::Bid,
sequence: 2,
ts_exchange_ns: 20,
ts_recv_ns: 21,
},
DataQualityFlags::NONE,
)
.expect("trade 2");
let candle = engine
.session_candle_snapshot(&symbol)
.expect("session candle missing");
assert_eq!(candle.open, 505000);
assert_eq!(candle.high, 505000);
assert_eq!(candle.low, 504900);
assert_eq!(candle.close, 504900);
assert_eq!(candle.trade_count, 2);
assert_eq!(candle.first_ts_exchange_ns, 10);
assert_eq!(candle.last_ts_exchange_ns, 20);
}
#[test]
fn engine_exposes_interval_candle_snapshot() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut engine = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::new(5),
);
engine.start().expect("start failed");
engine.subscribe(symbol.clone(), 10).expect("sub failed");
for (price, size, side, seq, ts) in [
(505000, 10, Side::Ask, 1u64, 10u64),
(504900, 5, Side::Bid, 2u64, 40u64),
(505100, 8, Side::Ask, 3u64, 100u64),
] {
engine
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price,
size,
aggressor_side: side,
sequence: seq,
ts_exchange_ns: ts,
ts_recv_ns: ts + 1,
},
DataQualityFlags::NONE,
)
.expect("trade ingest");
}
let candle = engine
.interval_candle_snapshot(&symbol, 70)
.expect("interval candle missing");
assert_eq!(candle.window_ns, 70);
assert_eq!(candle.open, 504900);
assert_eq!(candle.high, 505100);
assert_eq!(candle.low, 504900);
assert_eq!(candle.close, 505100);
assert_eq!(candle.trade_count, 2);
assert_eq!(candle.total_volume, 13);
assert_eq!(candle.vwap, 505023);
assert_eq!(candle.first_ts_exchange_ns, 40);
assert_eq!(candle.last_ts_exchange_ns, 100);
}
#[test]
fn external_supervisor_sets_sequence_and_order_flags() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut engine = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::new(5),
);
engine.start().expect("start failed");
engine.subscribe(symbol.clone(), 10).expect("sub failed");
engine
.configure_external_feed(ExternalFeedPolicy {
stale_after_ms: 0,
enforce_sequence: true,
})
.expect("configure external feed");
engine
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: 505000,
size: 1,
aggressor_side: Side::Ask,
sequence: 1,
ts_exchange_ns: 1,
ts_recv_ns: 1,
},
DataQualityFlags::NONE,
)
.expect("ingest seq1");
let s1 = engine.signal_snapshot(&symbol).expect("signal 1");
assert_eq!(s1.quality_flags, 0);
engine
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: 505001,
size: 1,
aggressor_side: Side::Ask,
sequence: 3,
ts_exchange_ns: 2,
ts_recv_ns: 2,
},
DataQualityFlags::NONE,
)
.expect("ingest seq3");
let s2 = engine.signal_snapshot(&symbol).expect("signal 2");
assert!(s2.quality_flags & DataQualityFlags::SEQUENCE_GAP.bits() != 0);
engine
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: 505002,
size: 1,
aggressor_side: Side::Ask,
sequence: 2,
ts_exchange_ns: 3,
ts_recv_ns: 3,
},
DataQualityFlags::NONE,
)
.expect("ingest seq2");
let s3 = engine.signal_snapshot(&symbol).expect("signal 3");
assert!(s3.quality_flags & DataQualityFlags::OUT_OF_ORDER.bits() != 0);
}
#[test]
fn external_supervisor_reconnecting_and_stale_flags_affect_health() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut engine = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::new(5),
);
engine.start().expect("start failed");
engine.subscribe(symbol.clone(), 10).expect("sub failed");
engine
.configure_external_feed(ExternalFeedPolicy {
stale_after_ms: 1,
enforce_sequence: true,
})
.expect("configure external feed");
engine
.set_external_reconnecting(true)
.expect("set reconnecting true");
let degraded = engine.health_json();
assert!(degraded.contains(&format!(
"\"quality_flags\":{}",
DataQualityFlags::ADAPTER_DEGRADED.bits()
)));
assert!(degraded.contains("\"quality_flags_detail\":[\"ADAPTER_DEGRADED\"]"));
assert!(degraded.contains("\"external_feed_reconnecting\":true"));
engine
.set_external_reconnecting(false)
.expect("set reconnecting false");
engine
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: 505000,
size: 1,
aggressor_side: Side::Ask,
sequence: 1,
ts_exchange_ns: 1,
ts_recv_ns: 1,
},
DataQualityFlags::NONE,
)
.expect("ingest");
std::thread::sleep(std::time::Duration::from_millis(3));
engine.external_health_tick().expect("health tick");
let stale = engine.health_json();
assert!(stale.contains(&format!(
"\"quality_flags\":{}",
DataQualityFlags::STALE_FEED.bits()
)));
assert!(stale.contains("\"quality_flags_detail\":[\"STALE_FEED\"]"));
assert!(stale.contains("\"external_last_ingest_ns\":"));
}
#[test]
fn default_builder_wires_mock_provider() {
let cfg = EngineConfig {
adapter: AdapterConfig {
provider: ProviderKind::Mock,
..AdapterConfig::default()
},
..EngineConfig::default()
};
let mut engine = build_default_engine(cfg).expect("build should work");
engine.start().expect("start should work");
let metrics = engine.metrics_json();
assert!(metrics.contains("\"started\":true"));
assert!(metrics.contains("\"adapter_protocol_info\""));
assert!(metrics.contains("\"health_seq\":"));
assert!(metrics.contains("\"quality_flags_detail\":"));
}
#[test]
fn health_and_metrics_include_additive_observability_fields() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut engine = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::new(5),
);
engine.start().expect("start failed");
engine.subscribe(symbol.clone(), 10).expect("sub failed");
engine
.configure_external_feed(ExternalFeedPolicy {
stale_after_ms: 15_000,
enforce_sequence: true,
})
.expect("configure external feed");
engine
.ingest_trade(
TradePrint {
symbol,
price: 505000,
size: 2,
aggressor_side: Side::Ask,
sequence: 1,
ts_exchange_ns: 11,
ts_recv_ns: 12,
},
DataQualityFlags::NONE,
)
.expect("ingest trade");
let health = engine.health_json();
assert!(health.contains("\"tracked_symbols\":1"));
assert!(health.contains("\"processed_events\":1"));
assert!(health.contains("\"external_feed_enabled\":true"));
assert!(health.contains("\"external_sequence_enforced\":true"));
assert!(health.contains("\"quality_flags_detail\":[]"));
let metrics = engine.metrics_json();
assert!(metrics.contains("\"book_symbols\":0"));
assert!(metrics.contains("\"analytics_symbols\":1"));
assert!(metrics.contains("\"signal_symbols\":1"));
assert!(metrics.contains("\"external_trade_sequence_symbols\":1"));
assert!(metrics.contains("\"external_book_sequence_symbols\":0"));
assert!(metrics.contains("\"external_last_ingest_ns\":"));
}
#[test]
fn parses_toml_file_config() {
let path = write_temp_file(
"runtime_cfg.toml",
r#"
instance_id = "from_toml"
enable_persistence = true
signal_threshold = 250
provider = "mock"
data_root = "data_local"
audit_log_path = "audit/local.log"
"#,
);
let cfg = load_engine_config_from_path(path.to_str().expect("valid path"))
.expect("toml parse should work");
assert_eq!(cfg.instance_id, "from_toml");
assert!(cfg.enable_persistence);
assert_eq!(cfg.signal_threshold, 250);
assert_eq!(cfg.data_root, "data_local");
assert_eq!(cfg.audit_log_path, "audit/local.log");
assert!(matches!(cfg.adapter.provider, ProviderKind::Mock));
}
#[test]
fn validates_non_mock_requires_env_refs() {
let cfg = EngineConfig {
adapter: AdapterConfig {
provider: ProviderKind::Cqg,
endpoint: Some("cqg://example".to_string()),
credentials: Some(CredentialsRef {
key_id_env: "OF_TEST_MISSING_KEY".to_string(),
secret_env: "OF_TEST_MISSING_SECRET".to_string(),
}),
..AdapterConfig::default()
},
..EngineConfig::default()
};
let err = validate_startup_config(&cfg).expect_err("missing env vars should fail");
assert!(format!("{err}").contains("missing required env var"));
}
#[test]
fn parses_binance_provider_without_credentials() {
let path = write_temp_file(
"runtime_cfg_binance.toml",
r#"
instance_id = "from_toml_binance"
provider = "binance"
endpoint = "mock://binance"
"#,
);
let cfg = load_engine_config_from_path(path.to_str().expect("valid path"))
.expect("toml parse should work");
assert!(matches!(cfg.adapter.provider, ProviderKind::Binance));
validate_startup_config(&cfg).expect("binance should not require creds");
}
#[test]
fn parses_nested_toml_config_strictly() {
let path = write_temp_file(
"runtime_cfg_nested.toml",
r#"
instance_id = "strict_toml"
enable_persistence = true
signal_threshold = 300
data_root = "strict_data"
audit_log_path = "audit/strict.log"
audit_redact_tokens = ["secret", "token"]
[adapter]
provider = "cqg"
endpoint = "wss://demoapi.cqg.com/feed"
app_name = "strict-runtime"
[adapter.credentials]
key_id_env = "OF_STRICT_KEY"
secret_env = "OF_STRICT_SECRET"
"#,
);
let cfg = load_engine_config_from_path(path.to_str().expect("valid path"))
.expect("strict toml parse should work");
assert_eq!(cfg.instance_id, "strict_toml");
assert!(cfg.enable_persistence);
assert_eq!(cfg.signal_threshold, 300);
assert_eq!(cfg.audit_redact_tokens, vec!["secret", "token"]);
assert!(matches!(cfg.adapter.provider, ProviderKind::Cqg));
assert_eq!(
cfg.adapter.endpoint.as_deref(),
Some("wss://demoapi.cqg.com/feed")
);
assert_eq!(cfg.adapter.app_name.as_deref(), Some("strict-runtime"));
let creds = cfg.adapter.credentials.expect("credentials");
assert_eq!(creds.key_id_env, "OF_STRICT_KEY");
assert_eq!(creds.secret_env, "OF_STRICT_SECRET");
let report = load_engine_config_report_from_path(path.to_str().expect("valid path"))
.expect("strict report should work");
assert_eq!(report.format, "toml");
assert_eq!(report.compatibility_mode, ConfigCompatibilityMode::Strict);
assert!(!report.used_legacy_fallback());
assert!(report.warning.is_none());
}
#[test]
fn parses_nested_json_config_strictly() {
let path = write_temp_file(
"runtime_cfg_nested.json",
r#"{
"instance_id": "strict_json",
"signal_threshold": 175,
"audit_redact_tokens": ["secret", "password"],
"adapter": {
"provider": "binance",
"endpoint": "wss://stream.binance.com:9443/ws",
"app_name": "strict-json-runtime"
}
}"#,
);
let cfg = load_engine_config_from_path(path.to_str().expect("valid path"))
.expect("strict json parse should work");
assert_eq!(cfg.instance_id, "strict_json");
assert_eq!(cfg.signal_threshold, 175);
assert_eq!(cfg.audit_redact_tokens, vec!["secret", "password"]);
assert!(matches!(cfg.adapter.provider, ProviderKind::Binance));
assert_eq!(
cfg.adapter.endpoint.as_deref(),
Some("wss://stream.binance.com:9443/ws")
);
assert_eq!(cfg.adapter.app_name.as_deref(), Some("strict-json-runtime"));
let report = load_engine_config_report_from_path(path.to_str().expect("valid path"))
.expect("strict report should work");
assert_eq!(report.format, "json");
assert_eq!(report.compatibility_mode, ConfigCompatibilityMode::Strict);
assert!(!report.used_legacy_fallback());
assert!(report.warning.is_none());
}
#[test]
fn legacy_fallback_still_accepts_flat_json_like_shape() {
let path = write_temp_file(
"runtime_cfg_legacy.json",
"{\n\"instance_id\": \"legacy_json\",\n\"provider\": \"mock\",\n\"signal_threshold\": \"250\",\n\"audit_redact_tokens\": \"secret,token\"\n}\n",
);
let cfg = load_engine_config_from_path(path.to_str().expect("valid path"))
.expect("legacy fallback should still work");
assert_eq!(cfg.instance_id, "legacy_json");
assert!(matches!(cfg.adapter.provider, ProviderKind::Mock));
assert_eq!(cfg.signal_threshold, 250);
assert_eq!(cfg.audit_redact_tokens, vec!["secret", "token"]);
let report = load_engine_config_report_from_path(path.to_str().expect("valid path"))
.expect("legacy report should still work");
assert_eq!(report.format, "json");
assert_eq!(report.compatibility_mode, ConfigCompatibilityMode::LegacyFallback);
assert!(report.used_legacy_fallback());
assert!(report.warning.as_deref().unwrap_or("").contains("legacy json fallback"));
}
#[test]
fn audit_log_rotates_and_redacts() {
let base = temp_dir("audit_rotate");
let audit_path = base.join("audit.log");
let data_root = base.join("data");
let mut engine = build_default_engine(EngineConfig {
instance_id: "audit-test".to_string(),
enable_persistence: false,
data_root: data_root.to_string_lossy().to_string(),
audit_log_path: audit_path.to_string_lossy().to_string(),
audit_max_bytes: 180,
audit_max_files: 2,
audit_redact_tokens: vec!["token".to_string()],
data_retention_max_bytes: 1024,
data_retention_max_age_secs: 60,
adapter: AdapterConfig::default(),
signal_threshold: 100,
})
.expect("engine build should work");
engine.start().expect("start should work");
for i in 0..6 {
engine
.subscribe(
SymbolId {
venue: "CME".to_string(),
symbol: format!("ES_token_{i}"),
},
10,
)
.expect("subscribe should work");
}
engine.stop();
let current = fs::read_to_string(&audit_path).expect("current audit must exist");
assert!(current.contains("[REDACTED]"));
assert!(rotated_path(&audit_path, 1).exists());
}
#[test]
fn reset_symbol_session_clears_analytics() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut adapter = MockAdapter::default();
adapter.push_event(RawEvent::Trade(TradePrint {
symbol: symbol.clone(),
price: 505000,
size: 10,
aggressor_side: Side::Ask,
sequence: 1,
ts_exchange_ns: 1,
ts_recv_ns: 2,
}));
let mut engine = Engine::new(
EngineConfig::default(),
adapter,
DeltaMomentumSignal::new(5),
);
engine.start().expect("start");
engine.subscribe(symbol.clone(), 10).expect("subscribe");
engine.poll_once(DataQualityFlags::NONE).expect("poll");
let pre = engine.analytics_snapshot(&symbol).expect("pre");
assert!(pre.cumulative_delta > 0);
engine
.reset_symbol_session(symbol.clone())
.expect("reset session");
let post = engine.analytics_snapshot(&symbol).expect("post");
assert_eq!(post.delta, 0);
assert_eq!(post.cumulative_delta, 0);
assert_eq!(post.point_of_control, 0);
}
#[test]
fn poll_backpressure_limit_marks_health_and_counts_drops() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut adapter = MockAdapter::default();
for sequence in 1..=3 {
adapter.push_event(RawEvent::Trade(TradePrint {
symbol: symbol.clone(),
price: 505000,
size: 1,
aggressor_side: Side::Ask,
sequence,
ts_exchange_ns: sequence,
ts_recv_ns: sequence + 1,
}));
}
let mut engine = Engine::new(
EngineConfig::default(),
adapter,
DeltaMomentumSignal::new(5),
)
.with_max_events_per_poll(Some(2));
engine.start().expect("start");
engine.subscribe(symbol.clone(), 10).expect("subscribe");
let err = engine
.poll_once(DataQualityFlags::NONE)
.expect_err("poll should report backpressure");
assert!(err.is_backpressure());
assert_eq!(engine.last_events().len(), 2);
let analytics = engine.analytics_snapshot(&symbol).expect("analytics");
assert_eq!(analytics.buy_volume, 2);
assert_eq!(engine.current_quality_flags_bits(), DataQualityFlags::ADAPTER_DEGRADED.bits());
let metrics = engine.metrics_json();
assert!(metrics.contains("\"max_events_per_poll\":2"));
assert!(metrics.contains("\"backpressure_dropped_events\":1"));
}
#[test]
fn circuit_breaker_opens_after_repeated_adapter_failures() {
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let mut engine = Engine::new(
EngineConfig::default(),
FailingAdapter::default(),
DeltaMomentumSignal::new(5),
)
.with_circuit_breaker(1, 60_000);
engine.start().expect("start");
engine.subscribe(symbol, 10).expect("subscribe");
let first = engine
.poll_once(DataQualityFlags::NONE)
.expect_err("first poll should fail from adapter");
assert!(!first.is_circuit_open());
let second = engine
.poll_once(DataQualityFlags::NONE)
.expect_err("second poll should hit open circuit");
assert!(second.is_circuit_open());
assert_eq!(engine.current_quality_flags_bits(), DataQualityFlags::ADAPTER_DEGRADED.bits());
let health = engine.health_json();
assert!(health.contains("\"adapter_total_count\":1"));
assert!(health.contains("\"adapter_healthy_count\":0"));
assert!(health.contains("\"runtime_health_status\":\"degraded\""));
assert!(health.contains("\"circuit_breaker_enabled\":true"));
assert!(health.contains("\"circuit_breaker_open\":true"));
assert!(health.contains("\"circuit_breaker_consecutive_failures\":1"));
assert!(health.contains("\"circuit_breaker_opened_count\":1"));
let metrics = engine.metrics_json();
assert!(metrics.contains("\"adapter_total_count\":1"));
assert!(metrics.contains("\"adapter_healthy_count\":0"));
assert!(metrics.contains("\"circuit_breaker_open\":true"));
}
#[test]
fn persisted_events_replay_to_matching_runtime_state() {
let root = temp_dir("e2e_persist_replay");
let symbol = SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
};
let persistence = RollingStore::new(&root).expect("store");
let mut live = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::new(5),
)
.with_persistence(Some(persistence.clone()));
live.start().expect("live start");
live.subscribe(symbol.clone(), 10).expect("live subscribe");
live.ingest_book(
BookUpdate {
symbol: symbol.clone(),
side: Side::Bid,
level: 0,
price: 504900,
size: 20,
action: BookAction::Upsert,
sequence: 1,
ts_exchange_ns: 10,
ts_recv_ns: 11,
},
DataQualityFlags::NONE,
)
.expect("live book");
for (sequence, price, size, side) in [
(2, 505000, 10, Side::Ask),
(3, 504900, 4, Side::Bid),
(4, 505100, 7, Side::Ask),
] {
live.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price,
size,
aggressor_side: side,
sequence,
ts_exchange_ns: sequence * 10,
ts_recv_ns: sequence * 10 + 1,
},
DataQualityFlags::NONE,
)
.expect("live trade");
}
let replay_events = persistence
.read_events(&symbol.venue, &symbol.symbol)
.expect("read persisted events");
assert_eq!(replay_events.len(), 4);
let mut replay = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::new(5),
);
replay.start().expect("replay start");
replay.subscribe(symbol.clone(), 10).expect("replay subscribe");
for event in replay_events {
match event {
StoredEvent::Book(book) => replay
.ingest_book(
BookUpdate {
symbol: symbol.clone(),
side: book.side,
level: book.level,
price: book.price,
size: book.size,
action: book.action,
sequence: book.sequence,
ts_exchange_ns: 0,
ts_recv_ns: 0,
},
DataQualityFlags::NONE,
)
.expect("replay book"),
StoredEvent::Trade(trade) => replay
.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: trade.price,
size: trade.size,
aggressor_side: trade.aggressor_side,
sequence: trade.sequence,
ts_exchange_ns: 0,
ts_recv_ns: 0,
},
DataQualityFlags::NONE,
)
.expect("replay trade"),
}
}
let live_analytics = live.analytics_snapshot(&symbol).expect("live analytics");
let replay_analytics = replay
.analytics_snapshot(&symbol)
.expect("replay analytics");
assert_eq!(replay_analytics.delta, live_analytics.delta);
assert_eq!(
replay_analytics.cumulative_delta,
live_analytics.cumulative_delta
);
assert_eq!(replay_analytics.buy_volume, live_analytics.buy_volume);
assert_eq!(replay_analytics.sell_volume, live_analytics.sell_volume);
assert_eq!(replay_analytics.last_price, live_analytics.last_price);
assert_eq!(
replay_analytics.point_of_control,
live_analytics.point_of_control
);
let live_signal = live.signal_snapshot(&symbol).expect("live signal");
let replay_signal = replay.signal_snapshot(&symbol).expect("replay signal");
assert_eq!(replay_signal.state, live_signal.state);
assert_eq!(replay_signal.confidence_bps, live_signal.confidence_bps);
let live_book = live.book_snapshot(&symbol).expect("live book snapshot");
let replay_book = replay.book_snapshot(&symbol).expect("replay book snapshot");
assert_eq!(replay_book.bids, live_book.bids);
assert_eq!(replay_book.asks, live_book.asks);
}
fn write_temp_file(name: &str, content: &str) -> PathBuf {
let mut path = std::env::temp_dir();
let nonce = format!(
"{}_{}_{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock ok")
.as_nanos(),
name
);
path.push(nonce);
fs::write(&path, content).expect("temp file write should work");
path
}
fn temp_dir(name: &str) -> PathBuf {
let mut path = std::env::temp_dir();
path.push(format!(
"{}_{}_{}",
std::process::id(),
name,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock ok")
.as_nanos()
));
fs::create_dir_all(&path).expect("temp dir create should work");
path
}
#[derive(Debug, Default)]
struct FailingAdapter {
connected: bool,
}
impl MarketDataAdapter for FailingAdapter {
fn connect(&mut self) -> AdapterResult<()> {
self.connected = true;
Ok(())
}
fn subscribe(&mut self, _req: SubscribeReq) -> AdapterResult<()> {
Ok(())
}
fn unsubscribe(&mut self, _symbol: SymbolId) -> AdapterResult<()> {
Ok(())
}
fn poll(&mut self, _out: &mut Vec<RawEvent>) -> AdapterResult<usize> {
Err(AdapterError::Other("forced poll failure".to_string()))
}
fn health(&self) -> AdapterHealth {
AdapterHealth {
connected: self.connected,
degraded: false,
last_error: None,
protocol_info: Some("failing_adapter".to_string()),
}
}
}
}