Expand description
§of_runtime
of_runtime is the orchestration layer for live, replay, and external-ingest workflows.
It wires adapter events into book state and analytics, applies quality-aware signal logic, and exposes snapshots plus health/metrics payloads.
§Runtime Responsibilities
- Connect and supervise a
MarketDataAdapter. - Process normalized
RawEventstreams. - Materialize book snapshots from normalized book updates.
- Update analytics using
of_core. - Evaluate signal modules from
of_signals. - Gate risk-sensitive output using
DataQualityFlags. - Optionally persist event streams via
of_persist.
§New In 0.3.0
0.3.0 is an additive operational hardening release for of_runtime:
- opt-in backpressure with
Engine::with_max_events_per_pollandOF_RUNTIME_MAX_EVENTS_PER_POLL - opt-in adapter circuit breaking with
Engine::with_circuit_breaker,OF_RUNTIME_CIRCUIT_BREAKER_FAILURES, andOF_RUNTIME_CIRCUIT_BREAKER_COOLDOWN_MS - additive aggregate health fields in
health_json()andmetrics_json() - end-to-end persist -> readback -> replay parity coverage for analytics, signals, and materialized book state
§New In 0.2.0
Relative to the 0.1.x line, of_runtime now adds or hardens:
- real
Engine::book_snapshotsupport - additive derived analytics, session candle, and interval candle snapshots
- config compatibility reporting with
ConfigLoadReport - stronger live supervision and richer health/metrics payloads
- cleaner internal modularization without changing the public API
§Main Types
EngineConfig- runtime, adapter, audit, and retention config.Engine<A, S>- generic runtime over adapter and signal module.DefaultEngine- boxed adapter + default delta signal.RuntimeError- lifecycle/config/adapter/io errors.ExternalFeedPolicy- stale and sequence policy for non-adapter ingest mode.- Snapshot accessors:
§Public API Inventory
Public types:
EngineConfigRuntimeErrorExternalFeedPolicyEngine<A, S>DefaultEngineConfigCompatibilityModeConfigLoadReport
Public top-level functions:
build_default_engineload_engine_config_from_pathload_engine_config_report_from_pathvalidate_startup_config
Public ConfigLoadReport method:
Public Engine<A, S> methods:
Engine::newEngine::with_persistenceEngine::startEngine::stopEngine::subscribeEngine::unsubscribeEngine::reset_symbol_sessionEngine::configure_external_feedEngine::set_external_reconnectingEngine::external_health_tickEngine::ingest_tradeEngine::ingest_bookEngine::poll_onceEngine::analytics_snapshotEngine::derived_analytics_snapshotEngine::session_candle_snapshotEngine::interval_candle_snapshotEngine::book_snapshotEngine::signal_snapshotEngine::metrics_jsonEngine::health_seqEngine::health_jsonEngine::last_eventsEngine::current_quality_flags_bitsEngine::with_max_events_per_pollEngine::with_circuit_breaker
§EngineConfig Field Reference
EngineConfig is the runtime control plane.
instance_id: logical engine name used in audit/metrics output.enable_persistence: enables JSONL persistence throughRollingStore.data_root: persistence root directory when persistence is enabled.audit_log_path: audit log file path used by runtime audit output.audit_max_bytes: max bytes before audit rotation.audit_max_files: max rotated audit files retained.audit_redact_tokens: case-insensitive token list scrubbed from audit text.data_retention_max_bytes: persisted-data byte cap;0disables the limit.data_retention_max_age_secs: persisted-data age cap in seconds;0disables the limit.adapter: provider configuration forwarded toof_adapters.signal_threshold: default threshold used bybuild_default_engine.
§Lifecycle Contract
The runtime has a simple state machine:
- Build an
EnginewithEngine::neworbuild_default_engine. - Optionally attach persistence with
Engine::with_persistence. - Call
Engine::startbefore subscribe, poll, or external-ingest operations. - Use either adapter polling or external ingest.
- Read snapshots and health/metrics as needed.
- Call
Engine::stopwhen done.
Operational rules:
Engine::startvalidates config and connects the adapter.Engine::subscribeandEngine::unsubscriberequire a started engine.Engine::poll_onceis for adapter-driven mode.Engine::ingest_tradeandEngine::ingest_bookare for externally-fed mode.- Snapshot getters return
Option<_>and remainNoneuntil enough symbol data has been observed. Engine::reset_symbol_sessionclears session analytics for one symbol without dropping the subscription itself.
§External Ingest Contract
Use external ingest when a separate bridge, broker API, or custom parser already owns transport.
Engine::configure_external_feedenables stale/sequence supervision rules.Engine::set_external_reconnectinglets a bridge tell the runtime it is currently degraded/reconnecting.Engine::external_health_tickadvances stale-feed supervision when no data has arrived recently.Engine::ingest_tradeandEngine::ingest_bookaccept caller-suppliedDataQualityFlagsso upstream bridges can forward their own quality judgments.
The runtime still applies its own sequence and stale checks on top of caller-supplied flags.
§Snapshot Semantics
All snapshot getters are additive and side-effect free.
Engine::analytics_snapshotreturns the base analytics payload for one symbol.Engine::derived_analytics_snapshotreturns the additive totals view for one symbol.Engine::session_candle_snapshotreturns one session candle for one symbol.Engine::interval_candle_snapshotcomputes one rolling-window candle for one symbol using the providedwindow_ns.Engine::book_snapshotreturns the materialized book when book updates have been seen.Engine::signal_snapshotreturns the latest evaluated signal state for one symbol.
Return behavior:
Nonemeans the runtime has not yet observed enough data to build that snapshot for the requested symbol.- Returned snapshots are clones of runtime state; callers can keep them without affecting engine internals.
§Health and Metrics Reference
Engine::health_seqis the cheap monotonic change counter for external polling loops.Engine::health_jsonis the user-facing operational snapshot and includes connectivity, degradation, quality flags, supervision metadata, and tracked symbol counts.Engine::metrics_jsonis the counter-oriented metrics payload and includes processed event counts, quality flag detail, and subsystem counts.Engine::current_quality_flags_bitsexposes the current runtime quality bitset directly for low-allocation callers.Engine::last_eventsexposes the last processed raw event batch for inspection/testing.Engine::with_max_events_per_polloptionally enables a per-poll drain limit for hosts that need explicit backpressure. The same limit can be set for default engines withOF_RUNTIME_MAX_EVENTS_PER_POLL.
Compatibility rule:
- field names in JSON payloads are treated as stable once published
- new metrics/health fields are added additively rather than replacing old fields
§Config Loading and Validation Reference
load_engine_config_from_pathreturns only the parsedEngineConfig.load_engine_config_report_from_pathalso returns file format, compatibility mode, and optional warning text.validate_startup_configchecks required env vars, retention settings, adapter endpoint rules, and signal/audit sanity before going live.ConfigCompatibilityMode::Strictmeans typed TOML/JSON parsing succeeded directly.ConfigCompatibilityMode::LegacyFallbackmeans an older flat-key config was accepted through the compatibility loader.ConfigLoadReport::used_legacy_fallbackis the simplest check for surfacing upgrade guidance in hosts or CLIs.
§Persistence Integration
When persistence is enabled, the runtime writes normalized book and trade streams through RollingStore.
- Persistence is optional and does not change runtime snapshot semantics.
- Retention limits are enforced through
RetentionPolicy. - The runtime persists normalized events, not provider-native wire payloads.
- Readback and replay are handled by
of_persistandexamples/replay_cli.
§End-to-End Example (Adapter Polling)
use of_adapters::MockAdapter;
use of_core::{DataQualityFlags, SymbolId};
use of_runtime::{Engine, EngineConfig};
use of_signals::DeltaMomentumSignal;
let adapter = MockAdapter::default();
let signal = DeltaMomentumSignal::new(100);
let mut engine = Engine::new(EngineConfig::default(), adapter, signal);
engine.start()?;
engine.subscribe(SymbolId {
venue: "SIM".to_string(),
symbol: "ESM6".to_string(),
}, 10)?;
let _processed = engine.poll_once(DataQualityFlags::NONE)?;
engine.stop();§External Ingest Example (Broker Bridge)
use of_adapters::MockAdapter;
use of_core::{DataQualityFlags, Side, SymbolId, TradePrint};
use of_runtime::{Engine, EngineConfig, ExternalFeedPolicy};
use of_signals::DeltaMomentumSignal;
let mut engine = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
DeltaMomentumSignal::default(),
);
engine.start()?;
engine.configure_external_feed(ExternalFeedPolicy {
stale_after_ms: 15_000,
enforce_sequence: true,
})?;
engine.ingest_trade(TradePrint {
symbol: SymbolId { venue: "BINANCE".into(), symbol: "BTCUSDT".into() },
price: 62_500_00,
size: 100,
aggressor_side: Side::Ask,
sequence: 1,
ts_exchange_ns: 1,
ts_recv_ns: 2,
}, DataQualityFlags::NONE)?;
let health = engine.health_json();
assert!(health.contains("\"started\":true"));§Snapshot Contracts
Engine::book_snapshotreturns the materialized book state for a symbol when book updates have been observed.- Book snapshots contain
bidsandasksarrays, each ordered bylevel. Engine::analytics_snapshotandEngine::signal_snapshotretain their current payload semantics.Engine::derived_analytics_snapshotexposes additive session metrics without changing the original analytics snapshot contract.Engine::session_candle_snapshotexposes additive candle-style session state withopen,high,low,close,trade_count, and first/last exchange timestamps.Engine::interval_candle_snapshotexposes a rolling-window candle view for a caller-suppliedwindow_nswithopen,high,low,close,trade_count,total_volume,vwap, and first/last exchange timestamps.
§Health and Metrics Contracts
Engine::metrics_jsonexposes runtime counters and adapter status.Engine::health_jsonexposes connectivity/degradation/reconnect state and active quality flags.Engine::health_seqincrements on meaningful health transitions for cheap external polling.- Existing JSON field names remain stable; new observability fields are added additively.
Engine::health_jsonalso includesquality_flags_detail,tracked_symbols,processed_events, and external supervision fields such asexternal_last_ingest_ns.Engine::metrics_jsonalso includeshealth_seq, per-subsystem symbol counts,quality_flags_detail, external sequence-cache counts, aggregate adapter health, circuit-breaker state,max_events_per_poll, andbackpressure_dropped_eventsfor live diagnostics.
§Backpressure Policy
Backpressure is disabled by default to preserve existing poll behavior. When
with_max_events_per_poll(Some(n)) or OF_RUNTIME_MAX_EVENTS_PER_POLL=n is
set, a poll that drains more than n adapter events processes the first n
events, drops the remainder from that drain, sets the ADAPTER_DEGRADED
quality flag, and returns a backpressure error. The C ABI maps that condition to
OF_ERR_BACKPRESSURE.
§Circuit Breaker Policy
Adapter circuit breaking is disabled by default to preserve existing polling
semantics. Direct Rust callers can opt in with
with_circuit_breaker(failure_threshold, cooldown_ms). Default engines can opt
in with OF_RUNTIME_CIRCUIT_BREAKER_FAILURES; the cooldown defaults to 1000 ms
and can be overridden with OF_RUNTIME_CIRCUIT_BREAKER_COOLDOWN_MS.
When enabled, consecutive adapter poll failures open the circuit for the
configured cooldown window. Polls attempted during that window return a
circuit_open adapter error, mark the runtime degraded, and expose
circuit_breaker_*, adapter_total_count, adapter_healthy_count, and
runtime_health_status fields through health and metrics JSON.
§Config Loading
Use load_engine_config_from_path to load TOML config files, load_engine_config_report_from_path
when you also want compatibility diagnostics, and validate_startup_config to fail fast on
missing credentials or invalid startup settings before going live.
Preferred config files use typed TOML/JSON shapes with nested adapter and adapter.credentials
sections. Legacy flat config shapes are still accepted through a compatibility fallback so older
deployments continue to load without source changes. ConfigLoadReport tells you whether strict
parsing succeeded or the legacy fallback path was required.
§Operational Guidance
- Call
Engine::startbefore subscribe/poll/ingest operations. - Use
configure_external_feed+external_health_tickwhen ingesting from non-adapter bridges. - For deterministic simulation, pair
MockAdapterwith replayed events and fixed timestamps. - Prefer
load_engine_config_report_from_pathin user-facing CLIs or services so you can warn when a config only loaded through compatibility fallback.
§Real-World Use Cases
§1. Live runtime with adapter-managed transport
Use the engine as the orchestration layer when the adapter owns connectivity and you want signal generation, health, persistence, and snapshots in one place.
§2. Broker bridge or gateway ingestion
If another process already owns the transport, use configure_external_feed,
ingest_trade, and ingest_book to keep quality supervision and analytics
inside the runtime.
§3. Deterministic replay and regression testing
Pair MockAdapter or external ingest with fixed event sequences and compare
snapshots, health transitions, and signal outputs.
§Detailed Example: Bridge-Driven Runtime
use of_adapters::MockAdapter;
use of_core::{BookAction, BookUpdate, DataQualityFlags, Side, SymbolId, TradePrint};
use of_runtime::{Engine, EngineConfig, ExternalFeedPolicy};
use of_signals::CompositeSignal;
let symbol = SymbolId {
venue: "BINANCE".into(),
symbol: "BTCUSDT".into(),
};
let mut engine = Engine::new(
EngineConfig::default(),
MockAdapter::default(),
CompositeSignal::default(),
);
engine.start()?;
engine.configure_external_feed(ExternalFeedPolicy {
stale_after_ms: 5_000,
enforce_sequence: true,
})?;
engine.ingest_book(
BookUpdate {
symbol: symbol.clone(),
side: Side::Bid,
level: 0,
price: 6_250_000,
size: 10,
action: BookAction::Upsert,
sequence: 1,
ts_exchange_ns: 1_000,
ts_recv_ns: 1_200,
},
DataQualityFlags::NONE,
)?;
engine.ingest_trade(
TradePrint {
symbol: symbol.clone(),
price: 6_250_100,
size: 2,
aggressor_side: Side::Ask,
sequence: 2,
ts_exchange_ns: 2_000,
ts_recv_ns: 2_100,
},
DataQualityFlags::NONE,
)?;
if let Some(book) = engine.book_snapshot(&symbol) {
println!("best bid levels={}", book.bids.len());
}
if let Some(derived) = engine.derived_analytics_snapshot(&symbol) {
println!("trade_count={} vwap={}", derived.trade_count, derived.vwap);
}§Strategy Construction Pattern With of_runtime
A practical runtime-backed strategy flow is:
- subscribe one or more symbols
- ingest or poll normalized events
- read
analytics_snapshotandderived_analytics_snapshotfor context - use
signal_snapshotas the strategy-facing decision surface - block action whenever
current_quality_flags_bits()orsignal_snapshot.quality_flagsindicates degradation - persist the session so the same sequence can be replayed later
Structs§
- Config
Load Report - Detailed result for config-file loading.
- Engine
- Runtime engine over a market-data adapter and signal module.
- Engine
Config - Runtime engine configuration.
- External
Feed Policy - Policy controlling quality constraints for externally-ingested feeds.
Enums§
- Config
Compatibility Mode - Indicates how a runtime config file was accepted.
- Runtime
Error - Runtime errors surfaced by engine lifecycle and processing.
Functions§
- build_
default_ engine - Builds the default runtime engine using configured provider and signal module.
- load_
engine_ config_ from_ path - Loads engine config from
.tomlor.json-like config file. - load_
engine_ config_ report_ from_ path - Loads engine config and reports whether legacy compatibility fallback was required.
- validate_
startup_ config - Validates startup configuration and environment prerequisites.
Type Aliases§
- Default
Engine - Default engine type used by C ABI and high-level bindings.