of_adapters 0.2.0

Provider adapters and market-data abstraction for the Orderflow engine
Documentation

of_adapters

of_adapters defines the provider boundary between venue-specific feeds and the normalized Orderflow runtime. It standardizes lifecycle, subscription, polling, and health reporting while keeping provider protocol details isolated.

Core API

  • Trait: [MarketDataAdapter]
  • Factory: [create_adapter]
  • Events: [RawEvent] (Book, Trade)
  • Config: [AdapterConfig], [ProviderKind], [CredentialsRef]
  • Health: [AdapterHealth]

New In 0.2.0

Relative to the 0.1.x line, the adapter layer now has materially stronger live-path supervision:

  • reconnect with backoff
  • subscription replay after reconnect
  • richer AdapterHealth::protocol_info
  • stronger degraded and timeout handling in the live providers

The trait surface stayed stable; the hardening happened behind the same public adapter interface.

Public API Inventory

Public types:

  • [SubscribeReq]
  • [AdapterHealth]
  • [RawEvent]
  • [AdapterError]
  • [AdapterResult<T>]
  • [ProviderKind]
  • [AdapterConfig]
  • [CredentialsRef]
  • [MockAdapter]

Public functions and methods:

  • [create_adapter]
  • [MockAdapter::push_event]

[MarketDataAdapter] trait methods:

  • connect()
  • subscribe(SubscribeReq)
  • unsubscribe(SymbolId)
  • poll(&mut Vec<RawEvent>)
  • health() -> AdapterHealth

Provider Strategy

The crate is built around a feature-gated provider model:

  • Always available: Mock provider
  • Optional: Rithmic, CQG, Binance (enable via Cargo features)

This keeps the default build deterministic while allowing production adapters where needed.

Current provider notes:

  • Rithmic:
    • mock mode emits deterministic book and trade events for end-to-end testing
    • live ws:// / wss:// mode now performs websocket reachability validation before reporting connected
    • live mode tracks heartbeat/message activity, schedules reconnect with backoff, and replays subscriptions after reconnect
    • live mode accepts normalized JSON book / trade / heartbeat payloads from bridge processes
    • health metadata includes mode, endpoint, app name, uptime, reconnect attempt, subscription count, and activity ages
  • CQG:
    • reconnect/resubscribe and sequencing logic are implemented
  • Binance:
    • live websocket transport parses trade and depth events
    • live mode schedules reconnect with backoff on disconnect or market-data timeout
    • reconnect replays active subscriptions automatically
    • health metadata includes reconnect attempt, subscription count, and message/data ages

Trait Contract

[MarketDataAdapter] is intentionally small, but each method has a specific contract:

  • connect() establishes transport/session state and should be idempotent where practical.
  • subscribe(SubscribeReq) starts or updates delivery for one symbol and depth.
  • unsubscribe(SymbolId) stops delivery for that symbol.
  • poll(&mut Vec<RawEvent>) appends zero or more normalized events into the caller-owned buffer and returns the number appended.
  • health() returns the latest transport/supervision state without mutating adapter state.

Normalization rules:

  • adapters emit only [RawEvent::Book] and [RawEvent::Trade]
  • provider-native protocol details stay inside the adapter implementation
  • all emitted symbols, sequences, timestamps, and sides should already be normalized for runtime consumption

AdapterConfig Reference

  • provider: selects Mock, Rithmic, Cqg, or Binance.
  • credentials: optional env-var references for providers that need authenticated bootstrap.
  • endpoint: websocket or provider endpoint URI for live adapters.
  • app_name: optional client or bridge identifier used in health metadata where supported.

[CredentialsRef] contains env-var names, not secret values:

  • key_id_env: env var that stores the provider user/key id
  • secret_env: env var that stores the secret/password/token

Health Semantics

[AdapterHealth] is the bridge between provider supervision and runtime quality decisions.

  • connected = true means the transport/session is considered up
  • degraded = true means the adapter is reconnecting, stale, or otherwise unhealthy enough for runtime quality gating
  • last_error is the latest human-readable adapter failure if known
  • protocol_info is provider-specific diagnostic text intended for logging and dashboards

Factory Behavior

[create_adapter] is feature-gated.

  • ProviderKind::Mock is always available.
  • live providers require their Cargo feature to be enabled.
  • requesting a provider without its feature returns [AdapterError::FeatureDisabled].
  • missing endpoint or credential references return [AdapterError::NotConfigured].

Choosing an Adapter

  • Use [MockAdapter] for deterministic tests, replay, and CI.
  • Use Rithmic or CQG when an authenticated futures feed is needed.
  • Use Binance when a public crypto depth/trade feed is sufficient.
  • Keep provider-specific auth, transport, and reconnect tuning out of runtime code and inside the adapter layer.

Create an Adapter

use of_adapters::{create_adapter, AdapterConfig, ProviderKind};

let cfg = AdapterConfig {
    provider: ProviderKind::Mock,
    ..Default::default()
};

let mut adapter = create_adapter(&cfg).expect("adapter");
adapter.connect().expect("connect");
assert!(adapter.health().connected);

Mock Adapter for Tests and Replays

[MockAdapter] is useful for deterministic tests and simulation pipelines:

use of_adapters::{MarketDataAdapter, MockAdapter, RawEvent, SubscribeReq};
use of_core::{Side, SymbolId, TradePrint};

let symbol = SymbolId {
    venue: "SIM".to_string(),
    symbol: "TEST".to_string(),
};

let mut adapter = MockAdapter::default();
adapter.connect().expect("connect");
adapter.subscribe(SubscribeReq {
    symbol: symbol.clone(),
    depth_levels: 10,
}).expect("subscribe");

adapter.push_event(RawEvent::Trade(TradePrint {
    symbol,
    price: 100,
    size: 1,
    aggressor_side: Side::Ask,
    sequence: 1,
    ts_exchange_ns: 1,
    ts_recv_ns: 2,
}));

let mut out = Vec::new();
let n = adapter.poll(&mut out).expect("poll");
assert_eq!(n, 1);
assert_eq!(out.len(), 1);

Error Handling

All adapter operations return [AdapterResult<T>] with [AdapterError], covering:

  • disconnected state
  • missing configuration
  • provider feature not enabled at build time
  • provider-specific operational errors

Subscription Semantics

  • [SubscribeReq::depth_levels] is advisory and provider-dependent; mock and depth-aware providers use it directly.
  • repeated subscribe calls for the same symbol should be treated as update-or-refresh, not as a duplicate stream request.
  • unsubscribe should remove future delivery for that symbol, but does not retroactively clear already-polled events from the caller buffer.

Real-World Use Cases

1. Bridge a proprietary broker or exchange feed

If you already have SDK access for a venue that Orderflow does not support, implement [MarketDataAdapter] and normalize that SDK’s messages into [RawEvent].

2. Simulate live behavior in tests

Use [MockAdapter] to feed deterministic BookUpdate and TradePrint events into of_runtime without network dependencies.

3. Build a gateway process

Run your provider-specific adapter in one process and expose normalized output to downstream strategy or runtime layers. This keeps transport complexity out of the strategy code.

How To Create Your Own Adapter

The minimum implementation steps are:

  1. define a struct that owns connection state, subscriptions, and any pending event queue
  2. implement [MarketDataAdapter] for that struct
  3. normalize provider payloads into [RawEvent::Book] and [RawEvent::Trade]
  4. expose meaningful health through [AdapterHealth]
  5. add factory/config wiring if you want the adapter selectable via [create_adapter]

Important design rules:

  • keep emitted prices and sizes integer-normalized
  • preserve provider sequence numbers whenever available
  • preserve both exchange and local receive timestamps
  • keep reconnect/backoff logic in the adapter, not in strategy code
  • emit only normalized events from the public surface

Detailed Example: Custom Adapter Skeleton

use of_adapters::{
    AdapterHealth, AdapterResult, MarketDataAdapter, RawEvent, SubscribeReq,
};
use of_core::{BookAction, BookUpdate, Side, SymbolId, TradePrint};

#[derive(Default)]
struct DemoAdapter {
    connected: bool,
    subscribed: Vec<SymbolId>,
    queue: Vec<RawEvent>,
}

impl DemoAdapter {
    fn on_trade_message(&mut self, symbol: &SymbolId, price: i64, size: i64, side: Side, seq: u64) {
        self.queue.push(RawEvent::Trade(TradePrint {
            symbol: symbol.clone(),
            price,
            size,
            aggressor_side: side,
            sequence: seq,
            ts_exchange_ns: 10,
            ts_recv_ns: 20,
        }));
    }

    fn on_book_message(&mut self, symbol: &SymbolId, level: u16, price: i64, size: i64, seq: u64) {
        self.queue.push(RawEvent::Book(BookUpdate {
            symbol: symbol.clone(),
            side: Side::Bid,
            level,
            price,
            size,
            action: BookAction::Upsert,
            sequence: seq,
            ts_exchange_ns: 10,
            ts_recv_ns: 20,
        }));
    }
}

impl MarketDataAdapter for DemoAdapter {
    fn connect(&mut self) -> AdapterResult<()> {
        self.connected = true;
        Ok(())
    }

    fn subscribe(&mut self, req: SubscribeReq) -> AdapterResult<()> {
        self.subscribed.push(req.symbol);
        Ok(())
    }

    fn unsubscribe(&mut self, symbol: SymbolId) -> AdapterResult<()> {
        self.subscribed.retain(|s| s != &symbol);
        Ok(())
    }

    fn poll(&mut self, out: &mut Vec<RawEvent>) -> AdapterResult<usize> {
        let n = self.queue.len();
        out.extend(self.queue.drain(..));
        Ok(n)
    }

    fn health(&self) -> AdapterHealth {
        AdapterHealth {
            connected: self.connected,
            degraded: false,
            last_error: None,
            protocol_info: Some(format!("subs={}", self.subscribed.len())),
        }
    }
}

Adapter Authoring Checklist

  • normalize every provider event into BookUpdate or TradePrint
  • treat poll() as the public emission point
  • make health() useful for operators, not just for tests
  • decide how reconnect/backoff affects degraded
  • ensure unsubscribe removes future delivery for the symbol