Skip to main content

Crate of_runtime

Crate of_runtime 

Source
Expand description

§of_runtime

of_runtime is the orchestration layer for live, replay, and external-ingest workflows. It wires adapter events into book state and analytics, applies quality-aware signal logic, and exposes snapshots plus health/metrics payloads.

§Runtime Responsibilities

  1. Connect and supervise a MarketDataAdapter.
  2. Process normalized RawEvent streams.
  3. Materialize book snapshots from normalized book updates.
  4. Update analytics using of_core.
  5. Evaluate signal modules from of_signals.
  6. Gate risk-sensitive output using DataQualityFlags.
  7. Optionally persist event streams via of_persist.

§New In 0.3.0

0.3.0 is an additive operational hardening release for of_runtime:

  • opt-in backpressure with Engine::with_max_events_per_poll and OF_RUNTIME_MAX_EVENTS_PER_POLL
  • opt-in adapter circuit breaking with Engine::with_circuit_breaker, OF_RUNTIME_CIRCUIT_BREAKER_FAILURES, and OF_RUNTIME_CIRCUIT_BREAKER_COOLDOWN_MS
  • additive aggregate health fields in health_json() and metrics_json()
  • end-to-end persist -> readback -> replay parity coverage for analytics, signals, and materialized book state

§New In 0.2.0

Relative to the 0.1.x line, of_runtime now adds or hardens:

  • real Engine::book_snapshot support
  • additive derived analytics, session candle, and interval candle snapshots
  • config compatibility reporting with ConfigLoadReport
  • stronger live supervision and richer health/metrics payloads
  • cleaner internal modularization without changing the public API

§Main Types

§Public API Inventory

Public types:

Public top-level functions:

Public ConfigLoadReport method:

Public Engine<A, S> methods:

§EngineConfig Field Reference

EngineConfig is the runtime control plane.

  • instance_id: logical engine name used in audit/metrics output.
  • enable_persistence: enables JSONL persistence through RollingStore.
  • data_root: persistence root directory when persistence is enabled.
  • audit_log_path: audit log file path used by runtime audit output.
  • audit_max_bytes: max bytes before audit rotation.
  • audit_max_files: max rotated audit files retained.
  • audit_redact_tokens: case-insensitive token list scrubbed from audit text.
  • data_retention_max_bytes: persisted-data byte cap; 0 disables the limit.
  • data_retention_max_age_secs: persisted-data age cap in seconds; 0 disables the limit.
  • adapter: provider configuration forwarded to of_adapters.
  • signal_threshold: default threshold used by build_default_engine.

§Lifecycle Contract

The runtime has a simple state machine:

  1. Build an Engine with Engine::new or build_default_engine.
  2. Optionally attach persistence with Engine::with_persistence.
  3. Call Engine::start before subscribe, poll, or external-ingest operations.
  4. Use either adapter polling or external ingest.
  5. Read snapshots and health/metrics as needed.
  6. Call Engine::stop when done.

Operational rules:

§External Ingest Contract

Use external ingest when a separate bridge, broker API, or custom parser already owns transport.

The runtime still applies its own sequence and stale checks on top of caller-supplied flags.

§Snapshot Semantics

All snapshot getters are additive and side-effect free.

Return behavior:

  • None means the runtime has not yet observed enough data to build that snapshot for the requested symbol.
  • Returned snapshots are clones of runtime state; callers can keep them without affecting engine internals.

§Health and Metrics Reference

  • Engine::health_seq is the cheap monotonic change counter for external polling loops.
  • Engine::health_json is the user-facing operational snapshot and includes connectivity, degradation, quality flags, supervision metadata, and tracked symbol counts.
  • Engine::metrics_json is the counter-oriented metrics payload and includes processed event counts, quality flag detail, and subsystem counts.
  • Engine::current_quality_flags_bits exposes the current runtime quality bitset directly for low-allocation callers.
  • Engine::last_events exposes the last processed raw event batch for inspection/testing.
  • Engine::with_max_events_per_poll optionally enables a per-poll drain limit for hosts that need explicit backpressure. The same limit can be set for default engines with OF_RUNTIME_MAX_EVENTS_PER_POLL.

Compatibility rule:

  • field names in JSON payloads are treated as stable once published
  • new metrics/health fields are added additively rather than replacing old fields

§Config Loading and Validation Reference

§Persistence Integration

When persistence is enabled, the runtime writes normalized book and trade streams through RollingStore.

  • Persistence is optional and does not change runtime snapshot semantics.
  • Retention limits are enforced through RetentionPolicy.
  • The runtime persists normalized events, not provider-native wire payloads.
  • Readback and replay are handled by of_persist and examples/replay_cli.

§End-to-End Example (Adapter Polling)

use of_adapters::MockAdapter;
use of_core::{DataQualityFlags, SymbolId};
use of_runtime::{Engine, EngineConfig};
use of_signals::DeltaMomentumSignal;

let adapter = MockAdapter::default();
let signal = DeltaMomentumSignal::new(100);
let mut engine = Engine::new(EngineConfig::default(), adapter, signal);

engine.start()?;
engine.subscribe(SymbolId {
    venue: "SIM".to_string(),
    symbol: "ESM6".to_string(),
}, 10)?;

let _processed = engine.poll_once(DataQualityFlags::NONE)?;
engine.stop();

§External Ingest Example (Broker Bridge)

use of_adapters::MockAdapter;
use of_core::{DataQualityFlags, Side, SymbolId, TradePrint};
use of_runtime::{Engine, EngineConfig, ExternalFeedPolicy};
use of_signals::DeltaMomentumSignal;

let mut engine = Engine::new(
    EngineConfig::default(),
    MockAdapter::default(),
    DeltaMomentumSignal::default(),
);

engine.start()?;
engine.configure_external_feed(ExternalFeedPolicy {
    stale_after_ms: 15_000,
    enforce_sequence: true,
})?;

engine.ingest_trade(TradePrint {
    symbol: SymbolId { venue: "BINANCE".into(), symbol: "BTCUSDT".into() },
    price: 62_500_00,
    size: 100,
    aggressor_side: Side::Ask,
    sequence: 1,
    ts_exchange_ns: 1,
    ts_recv_ns: 2,
}, DataQualityFlags::NONE)?;

let health = engine.health_json();
assert!(health.contains("\"started\":true"));

§Snapshot Contracts

§Health and Metrics Contracts

  • Engine::metrics_json exposes runtime counters and adapter status.
  • Engine::health_json exposes connectivity/degradation/reconnect state and active quality flags.
  • Engine::health_seq increments on meaningful health transitions for cheap external polling.
  • Existing JSON field names remain stable; new observability fields are added additively.
  • Engine::health_json also includes quality_flags_detail, tracked_symbols, processed_events, and external supervision fields such as external_last_ingest_ns.
  • Engine::metrics_json also includes health_seq, per-subsystem symbol counts, quality_flags_detail, external sequence-cache counts, aggregate adapter health, circuit-breaker state, max_events_per_poll, and backpressure_dropped_events for live diagnostics.

§Backpressure Policy

Backpressure is disabled by default to preserve existing poll behavior. When with_max_events_per_poll(Some(n)) or OF_RUNTIME_MAX_EVENTS_PER_POLL=n is set, a poll that drains more than n adapter events processes the first n events, drops the remainder from that drain, sets the ADAPTER_DEGRADED quality flag, and returns a backpressure error. The C ABI maps that condition to OF_ERR_BACKPRESSURE.

§Circuit Breaker Policy

Adapter circuit breaking is disabled by default to preserve existing polling semantics. Direct Rust callers can opt in with with_circuit_breaker(failure_threshold, cooldown_ms). Default engines can opt in with OF_RUNTIME_CIRCUIT_BREAKER_FAILURES; the cooldown defaults to 1000 ms and can be overridden with OF_RUNTIME_CIRCUIT_BREAKER_COOLDOWN_MS.

When enabled, consecutive adapter poll failures open the circuit for the configured cooldown window. Polls attempted during that window return a circuit_open adapter error, mark the runtime degraded, and expose circuit_breaker_*, adapter_total_count, adapter_healthy_count, and runtime_health_status fields through health and metrics JSON.

§Config Loading

Use load_engine_config_from_path to load TOML config files, load_engine_config_report_from_path when you also want compatibility diagnostics, and validate_startup_config to fail fast on missing credentials or invalid startup settings before going live.

Preferred config files use typed TOML/JSON shapes with nested adapter and adapter.credentials sections. Legacy flat config shapes are still accepted through a compatibility fallback so older deployments continue to load without source changes. ConfigLoadReport tells you whether strict parsing succeeded or the legacy fallback path was required.

§Operational Guidance

  • Call Engine::start before subscribe/poll/ingest operations.
  • Use configure_external_feed + external_health_tick when ingesting from non-adapter bridges.
  • For deterministic simulation, pair MockAdapter with replayed events and fixed timestamps.
  • Prefer load_engine_config_report_from_path in user-facing CLIs or services so you can warn when a config only loaded through compatibility fallback.

§Real-World Use Cases

§1. Live runtime with adapter-managed transport

Use the engine as the orchestration layer when the adapter owns connectivity and you want signal generation, health, persistence, and snapshots in one place.

§2. Broker bridge or gateway ingestion

If another process already owns the transport, use configure_external_feed, ingest_trade, and ingest_book to keep quality supervision and analytics inside the runtime.

§3. Deterministic replay and regression testing

Pair MockAdapter or external ingest with fixed event sequences and compare snapshots, health transitions, and signal outputs.

§Detailed Example: Bridge-Driven Runtime

use of_adapters::MockAdapter;
use of_core::{BookAction, BookUpdate, DataQualityFlags, Side, SymbolId, TradePrint};
use of_runtime::{Engine, EngineConfig, ExternalFeedPolicy};
use of_signals::CompositeSignal;

let symbol = SymbolId {
    venue: "BINANCE".into(),
    symbol: "BTCUSDT".into(),
};

let mut engine = Engine::new(
    EngineConfig::default(),
    MockAdapter::default(),
    CompositeSignal::default(),
);

engine.start()?;
engine.configure_external_feed(ExternalFeedPolicy {
    stale_after_ms: 5_000,
    enforce_sequence: true,
})?;

engine.ingest_book(
    BookUpdate {
        symbol: symbol.clone(),
        side: Side::Bid,
        level: 0,
        price: 6_250_000,
        size: 10,
        action: BookAction::Upsert,
        sequence: 1,
        ts_exchange_ns: 1_000,
        ts_recv_ns: 1_200,
    },
    DataQualityFlags::NONE,
)?;

engine.ingest_trade(
    TradePrint {
        symbol: symbol.clone(),
        price: 6_250_100,
        size: 2,
        aggressor_side: Side::Ask,
        sequence: 2,
        ts_exchange_ns: 2_000,
        ts_recv_ns: 2_100,
    },
    DataQualityFlags::NONE,
)?;

if let Some(book) = engine.book_snapshot(&symbol) {
    println!("best bid levels={}", book.bids.len());
}
if let Some(derived) = engine.derived_analytics_snapshot(&symbol) {
    println!("trade_count={} vwap={}", derived.trade_count, derived.vwap);
}

§Strategy Construction Pattern With of_runtime

A practical runtime-backed strategy flow is:

  1. subscribe one or more symbols
  2. ingest or poll normalized events
  3. read analytics_snapshot and derived_analytics_snapshot for context
  4. use signal_snapshot as the strategy-facing decision surface
  5. block action whenever current_quality_flags_bits() or signal_snapshot.quality_flags indicates degradation
  6. persist the session so the same sequence can be replayed later

Structs§

ConfigLoadReport
Detailed result for config-file loading.
Engine
Runtime engine over a market-data adapter and signal module.
EngineConfig
Runtime engine configuration.
ExternalFeedPolicy
Policy controlling quality constraints for externally-ingested feeds.

Enums§

ConfigCompatibilityMode
Indicates how a runtime config file was accepted.
RuntimeError
Runtime errors surfaced by engine lifecycle and processing.

Functions§

build_default_engine
Builds the default runtime engine using configured provider and signal module.
load_engine_config_from_path
Loads engine config from .toml or .json-like config file.
load_engine_config_report_from_path
Loads engine config and reports whether legacy compatibility fallback was required.
validate_startup_config
Validates startup configuration and environment prerequisites.

Type Aliases§

DefaultEngine
Default engine type used by C ABI and high-level bindings.