of_runtime
of_runtime is the orchestration layer for live, replay, and external-ingest workflows.
It wires adapter events into book state and analytics, applies quality-aware signal logic, and exposes snapshots plus health/metrics payloads.
Runtime Responsibilities
- Connect and supervise a MarketDataAdapter.
- Process normalized RawEvent streams.
- Materialize book snapshots from normalized book updates.
- Update analytics using of_core.
- Evaluate signal modules from of_signals.
- Gate risk-sensitive output using DataQualityFlags.
- Optionally persist event streams via of_persist.
New In 0.3.0
0.3.0 is an additive operational hardening release for of_runtime:
- opt-in backpressure with [Engine::with_max_events_per_poll] and OF_RUNTIME_MAX_EVENTS_PER_POLL
- opt-in adapter circuit breaking with [Engine::with_circuit_breaker], OF_RUNTIME_CIRCUIT_BREAKER_FAILURES, and OF_RUNTIME_CIRCUIT_BREAKER_COOLDOWN_MS
- additive aggregate health fields in health_json() and metrics_json()
- end-to-end persist -> readback -> replay parity coverage for analytics, signals, and materialized book state
New In 0.2.0
Relative to the 0.1.x line, of_runtime now adds or hardens:
- real [Engine::book_snapshot] support
- additive derived analytics, session candle, and interval candle snapshots
- config compatibility reporting with [ConfigLoadReport]
- stronger live supervision and richer health/metrics payloads
- cleaner internal modularization without changing the public API
Main Types
- [EngineConfig] - runtime, adapter, audit, and retention config.
- [Engine<A, S>] - generic runtime over adapter and signal module.
- [DefaultEngine] - boxed adapter + default delta signal.
- [RuntimeError] - lifecycle/config/adapter/io errors.
- [ExternalFeedPolicy] - stale and sequence policy for non-adapter ingest mode.
- Snapshot accessors:
  - [Engine::book_snapshot]
  - [Engine::analytics_snapshot]
  - [Engine::derived_analytics_snapshot]
  - [Engine::session_candle_snapshot]
  - [Engine::interval_candle_snapshot]
  - [Engine::signal_snapshot]
Public API Inventory
Public types:
- [EngineConfig]
- [RuntimeError]
- [ExternalFeedPolicy]
- [Engine<A, S>]
- [DefaultEngine]
- [ConfigCompatibilityMode]
- [ConfigLoadReport]
Public top-level functions:
- [build_default_engine]
- [load_engine_config_from_path]
- [load_engine_config_report_from_path]
- [validate_startup_config]
Public ConfigLoadReport method:
- [ConfigLoadReport::used_legacy_fallback]
Public Engine<A, S> methods:
- [Engine::new]
- [Engine::with_persistence]
- [Engine::start]
- [Engine::stop]
- [Engine::subscribe]
- [Engine::unsubscribe]
- [Engine::reset_symbol_session]
- [Engine::configure_external_feed]
- [Engine::set_external_reconnecting]
- [Engine::external_health_tick]
- [Engine::ingest_trade]
- [Engine::ingest_book]
- [Engine::poll_once]
- [Engine::analytics_snapshot]
- [Engine::derived_analytics_snapshot]
- [Engine::session_candle_snapshot]
- [Engine::interval_candle_snapshot]
- [Engine::book_snapshot]
- [Engine::signal_snapshot]
- [Engine::metrics_json]
- [Engine::health_seq]
- [Engine::health_json]
- [Engine::last_events]
- [Engine::current_quality_flags_bits]
- [Engine::with_max_events_per_poll]
- [Engine::with_circuit_breaker]
EngineConfig Field Reference
[EngineConfig] is the runtime control plane.
- instance_id: logical engine name used in audit/metrics output.
- enable_persistence: enables JSONL persistence through RollingStore.
- data_root: persistence root directory when persistence is enabled.
- audit_log_path: audit log file path used by runtime audit output.
- audit_max_bytes: max bytes before audit rotation.
- audit_max_files: max rotated audit files retained.
- audit_redact_tokens: case-insensitive token list scrubbed from audit text.
- data_retention_max_bytes: persisted-data byte cap; 0 disables the limit.
- data_retention_max_age_secs: persisted-data age cap in seconds; 0 disables the limit.
- adapter: provider configuration forwarded to [of_adapters].
- signal_threshold: default threshold used by [build_default_engine].
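The "0 disables the limit" rule for the two retention fields can be sketched as follows. This is a minimal illustration, not the crate's RetentionPolicy: the RetentionLimits type, the should_prune method, and the strict "greater than" comparison are all assumptions made for the example.

```rust
/// Hypothetical mirror of the two retention fields in EngineConfig.
/// This is NOT the crate's RetentionPolicy; it only illustrates the
/// "0 disables the limit" convention described above.
#[derive(Debug, Clone, Copy)]
pub struct RetentionLimits {
    /// Byte cap for persisted data; 0 disables the cap.
    pub max_bytes: u64,
    /// Age cap in seconds for persisted data; 0 disables the cap.
    pub max_age_secs: u64,
}

impl RetentionLimits {
    /// Returns true when either enabled cap is exceeded.
    /// Assumption: a cap is exceeded only when the value is strictly greater.
    pub fn should_prune(&self, total_bytes: u64, oldest_age_secs: u64) -> bool {
        let over_bytes = self.max_bytes != 0 && total_bytes > self.max_bytes;
        let over_age = self.max_age_secs != 0 && oldest_age_secs > self.max_age_secs;
        over_bytes || over_age
    }
}
```

With both fields set to 0, should_prune never fires regardless of how much data is stored.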
Lifecycle Contract
The runtime has a simple state machine:
- Build an [Engine] with [Engine::new] or [build_default_engine].
- Optionally attach persistence with [Engine::with_persistence].
- Call [Engine::start] before subscribe, poll, or external-ingest operations.
- Use either adapter polling or external ingest.
- Read snapshots and health/metrics as needed.
- Call [Engine::stop] when done.
Operational rules:
- [Engine::start] validates config and connects the adapter.
- [Engine::subscribe] and [Engine::unsubscribe] require a started engine.
- [Engine::poll_once] is for adapter-driven mode.
- [Engine::ingest_trade] and [Engine::ingest_book] are for externally-fed mode.
- Snapshot getters return Option<_> and remain None until enough symbol data has been observed.
- [Engine::reset_symbol_session] clears session analytics for one symbol without dropping the subscription itself.
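The "start before anything else" rule can be pictured as a small guard around a lifecycle state. The sketch below is a standalone illustration; Lifecycle, LifecycleError, and LifecycleGuard are hypothetical names, and whether the real engine allows a restart after stop is an assumption here.

```rust
/// Hypothetical lifecycle states; the engine's internal representation may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Lifecycle {
    Built,
    Started,
    Stopped,
}

/// Hypothetical error type for this sketch (not RuntimeError).
#[derive(Debug, PartialEq, Eq)]
pub enum LifecycleError {
    NotStarted,
    AlreadyStarted,
}

pub struct LifecycleGuard {
    state: Lifecycle,
}

impl LifecycleGuard {
    pub fn new() -> Self {
        Self { state: Lifecycle::Built }
    }

    /// Moves to Started. Assumption: starting again after stop is allowed.
    pub fn start(&mut self) -> Result<(), LifecycleError> {
        if self.state == Lifecycle::Started {
            return Err(LifecycleError::AlreadyStarted);
        }
        self.state = Lifecycle::Started;
        Ok(())
    }

    /// Check used before subscribe, poll, or ingest style operations.
    pub fn require_started(&self) -> Result<(), LifecycleError> {
        if self.state == Lifecycle::Started {
            Ok(())
        } else {
            Err(LifecycleError::NotStarted)
        }
    }

    pub fn stop(&mut self) {
        self.state = Lifecycle::Stopped;
    }
}
```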
External Ingest Contract
Use external ingest when a separate bridge, broker API, or custom parser already owns transport.
- [Engine::configure_external_feed] enables stale/sequence supervision rules.
- [Engine::set_external_reconnecting] lets a bridge tell the runtime it is currently degraded/reconnecting.
- [Engine::external_health_tick] advances stale-feed supervision when no data has arrived recently.
- [Engine::ingest_trade] and [Engine::ingest_book] accept caller-supplied DataQualityFlags so upstream bridges can forward their own quality judgments.
The runtime still applies its own sequence and stale checks on top of caller-supplied flags.
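One way to picture "runtime checks on top of caller-supplied flags" is a supervisor that ORs its own findings into whatever the bridge passed in. The sketch below is an assumption-laden illustration: FeedSupervisor, the bit values, and the "next sequence must be previous + 1" rule are hypothetical and do not describe the real ExternalFeedPolicy or DataQualityFlags layout.

```rust
// Hypothetical bit values; the real DataQualityFlags layout is not shown in this document.
pub const STALE_FEED: u32 = 1 << 0;
pub const SEQUENCE_GAP: u32 = 1 << 1;

/// Hypothetical supervisor for an externally-fed stream.
pub struct FeedSupervisor {
    stale_after_ns: u64,
    last_ingest_ns: Option<u64>,
    last_seq: Option<u64>,
}

impl FeedSupervisor {
    pub fn new(stale_after_ns: u64) -> Self {
        Self { stale_after_ns, last_ingest_ns: None, last_seq: None }
    }

    /// Called per ingested event. Returns the caller's flags plus any
    /// flags this supervisor detects itself.
    pub fn on_ingest(&mut self, now_ns: u64, seq: u64, caller_flags: u32) -> u32 {
        let mut flags = caller_flags;
        if let Some(prev) = self.last_seq {
            // Assumption: sequences are expected to increase by exactly one.
            if prev.checked_add(1) != Some(seq) {
                flags |= SEQUENCE_GAP;
            }
        }
        self.last_seq = Some(seq);
        self.last_ingest_ns = Some(now_ns);
        flags
    }

    /// Called periodically when no data arrives; reports staleness.
    pub fn health_tick(&self, now_ns: u64) -> u32 {
        match self.last_ingest_ns {
            Some(last) if now_ns.saturating_sub(last) > self.stale_after_ns => STALE_FEED,
            _ => 0,
        }
    }
}
```

Caller flags are never cleared by the supervisor in this sketch; it only adds to them.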
Snapshot Semantics
All snapshot getters are additive and side-effect free.
- [Engine::analytics_snapshot] returns the base analytics payload for one symbol.
- [Engine::derived_analytics_snapshot] returns the additive totals view for one symbol.
- [Engine::session_candle_snapshot] returns one session candle for one symbol.
- [Engine::interval_candle_snapshot] computes one rolling-window candle for one symbol using the provided window_ns.
- [Engine::book_snapshot] returns the materialized book when book updates have been seen.
- [Engine::signal_snapshot] returns the latest evaluated signal state for one symbol.
Return behavior:
- None means the runtime has not yet observed enough data to build that snapshot for the requested symbol.
- Returned snapshots are clones of runtime state; callers can keep them without affecting engine internals.
Health and Metrics Reference
- [Engine::health_seq] is the cheap monotonic change counter for external polling loops.
- [Engine::health_json] is the user-facing operational snapshot and includes connectivity, degradation, quality flags, supervision metadata, and tracked symbol counts.
- [Engine::metrics_json] is the counter-oriented metrics payload and includes processed event counts, quality flag detail, and subsystem counts.
- [Engine::current_quality_flags_bits] exposes the current runtime quality bitset directly for low-allocation callers.
- [Engine::last_events] exposes the last processed raw event batch for inspection/testing.
- [Engine::with_max_events_per_poll] optionally enables a per-poll drain limit for hosts that need explicit backpressure. The same limit can be set for default engines with OF_RUNTIME_MAX_EVENTS_PER_POLL.
Compatibility rule:
- field names in JSON payloads are treated as stable once published
- new metrics/health fields are added additively rather than replacing old fields
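A host that polls health can use the change counter to avoid re-reading the JSON payload on every tick. The sketch below shows that pattern in isolation; HealthWatcher is a hypothetical helper, and the u64 counter type is an assumption about what health_seq returns.

```rust
/// Hypothetical helper that fetches the (comparatively expensive) health
/// payload only when the cheap sequence counter has changed.
pub struct HealthWatcher {
    last_seq: Option<u64>,
}

impl HealthWatcher {
    pub fn new() -> Self {
        Self { last_seq: None }
    }

    /// `current_seq` stands in for a health_seq reading and `fetch_json`
    /// for a health_json call. Returns Some(payload) only on a change.
    pub fn poll<F>(&mut self, current_seq: u64, fetch_json: F) -> Option<String>
    where
        F: FnOnce() -> String,
    {
        if self.last_seq == Some(current_seq) {
            return None; // nothing changed, skip the fetch entirely
        }
        self.last_seq = Some(current_seq);
        Some(fetch_json())
    }
}
```

The first call always fetches, since no sequence value has been seen yet.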
Config Loading and Validation Reference
- [load_engine_config_from_path] returns only the parsed [EngineConfig].
- [load_engine_config_report_from_path] also returns file format, compatibility mode, and optional warning text.
- [validate_startup_config] checks required env vars, retention settings, adapter endpoint rules, and signal/audit sanity before going live.
- [ConfigCompatibilityMode::Strict] means typed TOML/JSON parsing succeeded directly.
- [ConfigCompatibilityMode::LegacyFallback] means an older flat-key config was accepted through the compatibility loader.
- [ConfigLoadReport::used_legacy_fallback] is the simplest check for surfacing upgrade guidance in hosts or CLIs.
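A CLI might turn the compatibility mode into a user-facing upgrade hint along these lines. The types below are hypothetical stand-ins (CompatMode, LoadReport) that only mirror the shape described above; the real ConfigLoadReport fields and signatures are not shown in this document and may differ.

```rust
/// Hypothetical stand-in for ConfigCompatibilityMode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatMode {
    Strict,
    LegacyFallback,
}

/// Hypothetical stand-in for ConfigLoadReport (field names are assumptions).
pub struct LoadReport {
    pub mode: CompatMode,
    pub warning: Option<String>,
}

impl LoadReport {
    pub fn used_legacy_fallback(&self) -> bool {
        self.mode == CompatMode::LegacyFallback
    }
}

/// Builds an upgrade notice only when the legacy path was needed.
pub fn upgrade_notice(report: &LoadReport, path: &str) -> Option<String> {
    if !report.used_legacy_fallback() {
        return None;
    }
    let detail = report.warning.as_deref().unwrap_or("no further detail");
    Some(format!(
        "config {} loaded through the legacy fallback ({}); consider migrating to the typed format",
        path, detail
    ))
}
```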
Persistence Integration
When persistence is enabled, the runtime writes normalized book and trade streams through RollingStore.
- Persistence is optional and does not change runtime snapshot semantics.
- Retention limits are enforced through RetentionPolicy.
- The runtime persists normalized events, not provider-native wire payloads.
- Readback and replay are handled by of_persist and examples/replay_cli.
End-to-End Example (Adapter Polling)
use MockAdapter;
use ;
use ;
use DeltaMomentumSignal;
let adapter = default;
let signal = new;
let mut engine = new;
engine.start?;
engine.subscribe?;
let _processed = engine.poll_once?;
engine.stop;
# Ok::
External Ingest Example (Broker Bridge)
use MockAdapter;
use ;
use ;
use DeltaMomentumSignal;
let mut engine = new;
engine.start?;
engine.configure_external_feed?;
engine.ingest_trade?;
let health = engine.health_json;
assert!;
# Ok::
Snapshot Contracts
- [Engine::book_snapshot] returns the materialized book state for a symbol when book updates have been observed.
- Book snapshots contain bids and asks arrays, each ordered by level.
- [Engine::analytics_snapshot] and [Engine::signal_snapshot] retain their current payload semantics.
- [Engine::derived_analytics_snapshot] exposes additive session metrics without changing the original analytics snapshot contract.
- [Engine::session_candle_snapshot] exposes additive candle-style session state with open, high, low, close, trade_count, and first/last exchange timestamps.
- [Engine::interval_candle_snapshot] exposes a rolling-window candle view for a caller-supplied window_ns with open, high, low, close, trade_count, total_volume, vwap, and first/last exchange timestamps.
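To make the rolling-window candle fields concrete, here is a minimal sketch of how such a candle could be derived from a list of trades. Everything in it is an assumption for illustration: the Trade and Candle types, the inclusive window [now_ns - window_ns, now_ns], and the requirement that trades are sorted by timestamp. It does not describe how the runtime computes its snapshot internally.

```rust
/// Hypothetical trade record for this sketch.
#[derive(Debug, Clone, Copy)]
pub struct Trade {
    pub ts_ns: u64,
    pub price: f64,
    pub size: f64,
}

/// Hypothetical candle carrying the fields listed above.
#[derive(Debug, Clone, PartialEq)]
pub struct Candle {
    pub open: f64,
    pub high: f64,
    pub low: f64,
    pub close: f64,
    pub trade_count: u64,
    pub total_volume: f64,
    pub vwap: f64,
    pub first_ts_ns: u64,
    pub last_ts_ns: u64,
}

/// Builds a candle from trades inside [now_ns - window_ns, now_ns].
/// Assumes `trades` is sorted by ascending timestamp.
/// Returns None when no trade falls inside the window.
pub fn interval_candle(trades: &[Trade], now_ns: u64, window_ns: u64) -> Option<Candle> {
    let start = now_ns.saturating_sub(window_ns);
    let mut in_window = trades
        .iter()
        .filter(|t| t.ts_ns >= start && t.ts_ns <= now_ns);

    let first = in_window.next()?;
    let mut candle = Candle {
        open: first.price,
        high: first.price,
        low: first.price,
        close: first.price,
        trade_count: 1,
        total_volume: first.size,
        vwap: 0.0,
        first_ts_ns: first.ts_ns,
        last_ts_ns: first.ts_ns,
    };
    // Running sum of price * size, used for the volume-weighted average price.
    let mut notional = first.price * first.size;

    for t in in_window {
        candle.high = candle.high.max(t.price);
        candle.low = candle.low.min(t.price);
        candle.close = t.price;
        candle.trade_count += 1;
        candle.total_volume += t.size;
        candle.last_ts_ns = t.ts_ns;
        notional += t.price * t.size;
    }

    // Fall back to the close when the window carries no volume.
    candle.vwap = if candle.total_volume > 0.0 {
        notional / candle.total_volume
    } else {
        candle.close
    };
    Some(candle)
}
```

Returning None for an empty window matches the "remain None until enough data has been observed" behavior described for snapshot getters.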
Health and Metrics Contracts
- [Engine::metrics_json] exposes runtime counters and adapter status.
- [Engine::health_json] exposes connectivity/degradation/reconnect state and active quality flags.
- [Engine::health_seq] increments on meaningful health transitions for cheap external polling.
- Existing JSON field names remain stable; new observability fields are added additively.
- [Engine::health_json] also includes quality_flags_detail, tracked_symbols, processed_events, and external supervision fields such as external_last_ingest_ns.
- [Engine::metrics_json] also includes health_seq, per-subsystem symbol counts, quality_flags_detail, external sequence-cache counts, aggregate adapter health, circuit-breaker state, max_events_per_poll, and backpressure_dropped_events for live diagnostics.
Backpressure Policy
Backpressure is disabled by default to preserve existing poll behavior. When
with_max_events_per_poll(Some(n)) or OF_RUNTIME_MAX_EVENTS_PER_POLL=n is
set, a poll that drains more than n adapter events processes the first n
events, drops the remainder from that drain, sets the ADAPTER_DEGRADED
quality flag, and returns a backpressure error. The C ABI maps that condition to
OF_ERR_BACKPRESSURE.
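The policy above (process the first n, drop the rest, flag degradation, return an error) can be sketched in isolation. Poller, BackpressureError, and the ADAPTER_DEGRADED bit value below are hypothetical; the real engine's error variant and flag layout are not shown in this document, and this sketch never clears the flag once set.

```rust
// Hypothetical bit value for the degraded flag.
pub const ADAPTER_DEGRADED: u32 = 1 << 0;

/// Hypothetical error carrying how much of the drain was handled.
#[derive(Debug, PartialEq, Eq)]
pub struct BackpressureError {
    pub processed: usize,
    pub dropped: usize,
}

/// Hypothetical poller with an optional per-poll drain limit.
pub struct Poller {
    max_events_per_poll: Option<usize>,
    pub quality_flags: u32,
    pub dropped_total: u64,
}

impl Poller {
    pub fn new(max_events_per_poll: Option<usize>) -> Self {
        Self { max_events_per_poll, quality_flags: 0, dropped_total: 0 }
    }

    /// Handles one drained batch. With no limit set, every event is processed.
    pub fn poll_once<E>(
        &mut self,
        drained: Vec<E>,
        mut handle: impl FnMut(E),
    ) -> Result<usize, BackpressureError> {
        let limit = self.max_events_per_poll.unwrap_or(usize::MAX);
        let total = drained.len();
        let mut processed = 0;

        // Process at most `limit` events; the rest of the batch is dropped.
        for event in drained.into_iter().take(limit) {
            handle(event);
            processed += 1;
        }

        if total > limit {
            let dropped = total - limit;
            self.quality_flags |= ADAPTER_DEGRADED;
            self.dropped_total += dropped as u64;
            return Err(BackpressureError { processed, dropped });
        }
        Ok(processed)
    }
}
```

Note that the error is returned after the first n events have already been handled, so callers should treat it as "partially processed" rather than "nothing happened".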
Circuit Breaker Policy
Adapter circuit breaking is disabled by default to preserve existing polling
semantics. Direct Rust callers can opt in with
with_circuit_breaker(failure_threshold, cooldown_ms). Default engines can opt
in with OF_RUNTIME_CIRCUIT_BREAKER_FAILURES; the cooldown defaults to 1000 ms
and can be overridden with OF_RUNTIME_CIRCUIT_BREAKER_COOLDOWN_MS.
When enabled, consecutive adapter poll failures open the circuit for the
configured cooldown window. Polls attempted during that window return a
circuit_open adapter error, mark the runtime degraded, and expose
circuit_breaker_*, adapter_total_count, adapter_healthy_count, and
runtime_health_status fields through health and metrics JSON.
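The open-on-consecutive-failures behavior can be sketched as a small standalone breaker. CircuitBreaker and PollError below are hypothetical names, and details such as resetting the failure count when the circuit opens, or allowing a normal poll as soon as the cooldown elapses, are assumptions rather than documented engine behavior.

```rust
/// Hypothetical error for this sketch (not RuntimeError).
#[derive(Debug, PartialEq, Eq)]
pub enum PollError {
    /// The circuit is open; the adapter was not called.
    CircuitOpen,
    /// The adapter was called and failed.
    Adapter,
}

/// Hypothetical breaker keyed on consecutive poll failures.
pub struct CircuitBreaker {
    failure_threshold: u32,
    cooldown_ms: u64,
    consecutive_failures: u32,
    open_until_ms: Option<u64>,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, cooldown_ms: u64) -> Self {
        Self {
            failure_threshold,
            cooldown_ms,
            consecutive_failures: 0,
            open_until_ms: None,
        }
    }

    /// Runs `poll` unless the circuit is open at `now_ms`.
    pub fn call<T>(
        &mut self,
        now_ms: u64,
        poll: impl FnOnce() -> Result<T, ()>,
    ) -> Result<T, PollError> {
        if let Some(until) = self.open_until_ms {
            if now_ms < until {
                return Err(PollError::CircuitOpen);
            }
            // Cooldown elapsed: close the circuit and try the adapter again.
            self.open_until_ms = None;
        }

        match poll() {
            Ok(value) => {
                self.consecutive_failures = 0;
                Ok(value)
            }
            Err(()) => {
                self.consecutive_failures += 1;
                if self.consecutive_failures >= self.failure_threshold {
                    self.open_until_ms = Some(now_ms.saturating_add(self.cooldown_ms));
                    self.consecutive_failures = 0;
                }
                Err(PollError::Adapter)
            }
        }
    }
}
```

Passing the clock in as now_ms keeps the sketch deterministic, which is convenient for replay-style tests.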
Config Loading
Use [load_engine_config_from_path] to load TOML or JSON config files, [load_engine_config_report_from_path]
when you also want compatibility diagnostics, and [validate_startup_config] to fail fast on
missing credentials or invalid startup settings before going live.
Preferred config files use typed TOML/JSON shapes with nested adapter and adapter.credentials
sections. Legacy flat config shapes are still accepted through a compatibility fallback so older
deployments continue to load without source changes. ConfigLoadReport tells you whether strict
parsing succeeded or the legacy fallback path was required.
Operational Guidance
- Call [Engine::start] before subscribe/poll/ingest operations.
- Use configure_external_feed + external_health_tick when ingesting from non-adapter bridges.
- For deterministic simulation, pair MockAdapter with replayed events and fixed timestamps.
- Prefer [load_engine_config_report_from_path] in user-facing CLIs or services so you can warn when a config only loaded through compatibility fallback.
Real-World Use Cases
1. Live runtime with adapter-managed transport
Use the engine as the orchestration layer when the adapter owns connectivity and you want signal generation, health, persistence, and snapshots in one place.
2. Broker bridge or gateway ingestion
If another process already owns the transport, use configure_external_feed,
ingest_trade, and ingest_book to keep quality supervision and analytics
inside the runtime.
3. Deterministic replay and regression testing
Pair MockAdapter or external ingest with fixed event sequences and compare
snapshots, health transitions, and signal outputs.
Detailed Example: Bridge-Driven Runtime
use MockAdapter;
use ;
use ;
use CompositeSignal;
let symbol = SymbolId ;
let mut engine = new;
engine.start?;
engine.configure_external_feed?;
engine.ingest_book?;
engine.ingest_trade?;
if let Some = engine.book_snapshot
if let Some = engine.derived_analytics_snapshot
# Ok::
Strategy Construction Pattern With of_runtime
A practical runtime-backed strategy flow is:
- subscribe one or more symbols
- ingest or poll normalized events
- read analytics_snapshot and derived_analytics_snapshot for context
- use signal_snapshot as the strategy-facing decision surface
- block action whenever current_quality_flags_bits() or signal_snapshot.quality_flags indicates degradation
- persist the session so the same sequence can be replayed later
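The quality-gating step in this flow can be sketched as a pure decision function. The types below (Side, SignalView, Decision) are hypothetical, and treating any nonzero flag bit as degradation is an assumption; a real strategy may want to tolerate some flags and block only on others.

```rust
/// Hypothetical signal direction for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
    Buy,
    Sell,
    Flat,
}

/// Hypothetical view of a signal snapshot: a direction plus the
/// quality flags that were active when it was evaluated.
pub struct SignalView {
    pub side: Side,
    pub quality_flags: u32,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Decision {
    Act(Side),
    Hold,
}

/// Acts only when a signal exists, it is not flat, and neither the
/// runtime-level flags nor the signal-level flags report any issue.
/// Assumption: any nonzero bit counts as degradation.
pub fn decide(runtime_flags: u32, signal: Option<&SignalView>) -> Decision {
    match signal {
        Some(sig)
            if runtime_flags == 0 && sig.quality_flags == 0 && sig.side != Side::Flat =>
        {
            Decision::Act(sig.side)
        }
        _ => Decision::Hold,
    }
}
```

Keeping the gate as a pure function of flags and signal state makes it easy to replay a persisted session and confirm that the same decisions come out.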