§of_adapters
of_adapters defines the provider boundary between venue-specific feeds and the normalized Orderflow runtime.
It standardizes lifecycle, subscription, polling, and health reporting while keeping provider protocol details isolated.
§Core API
- Trait: MarketDataAdapter
- Factory: create_adapter
- Events: RawEvent (Book, Trade)
- Config: AdapterConfig, ProviderKind, CredentialsRef
- Health: AdapterHealth
§New In 0.3.0
0.3.0 adds runtime-level aggregate health and opt-in circuit breaking around
adapter polling without changing the MarketDataAdapter trait. Existing
adapters continue to implement the same lifecycle, subscription, polling, and
health methods.
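As a rough picture of what opt-in circuit breaking around polling can look like, here is a minimal sketch. Everything in it is hypothetical: `PollBreaker`, its thresholds, and the count-based cooldown are illustrative choices, not the 0.3.0 runtime API. It only shows that polling can be wrapped from the outside, so the adapter trait itself does not have to change.

```rust
/// Hypothetical breaker state; not a type from of_adapters or of_runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BreakerState {
    Closed,
    /// Open: skip this many poll calls before allowing one trial call.
    Open { remaining_skips: u32 },
}

/// Hypothetical wrapper that stops calling `poll` after repeated failures.
#[derive(Debug)]
pub struct PollBreaker {
    failure_threshold: u32,
    cooldown_polls: u32,
    consecutive_failures: u32,
    state: BreakerState,
}

impl PollBreaker {
    pub fn new(failure_threshold: u32, cooldown_polls: u32) -> Self {
        Self {
            failure_threshold,
            cooldown_polls,
            consecutive_failures: 0,
            state: BreakerState::Closed,
        }
    }

    /// Runs `poll` unless the breaker is open and still cooling down.
    /// Returns `None` when the call was skipped.
    pub fn call<T, E, F>(&mut self, poll: F) -> Option<Result<T, E>>
    where
        F: FnOnce() -> Result<T, E>,
    {
        if let BreakerState::Open { remaining_skips } = self.state {
            if remaining_skips > 0 {
                self.state = BreakerState::Open {
                    remaining_skips: remaining_skips - 1,
                };
                return None;
            }
            // Cooldown elapsed: fall through and allow one trial call.
        }
        let result = poll();
        match &result {
            Ok(_) => {
                self.consecutive_failures = 0;
                self.state = BreakerState::Closed;
            }
            Err(_) => {
                self.consecutive_failures = self.consecutive_failures.saturating_add(1);
                if self.consecutive_failures >= self.failure_threshold {
                    self.state = BreakerState::Open {
                        remaining_skips: self.cooldown_polls,
                    };
                }
            }
        }
        Some(result)
    }

    pub fn is_open(&self) -> bool {
        matches!(self.state, BreakerState::Open { .. })
    }
}
```

A caller would pass a closure such as `|| adapter.poll(&mut buffer)` to `call`. A skipped call (`None`) is a natural place to report degraded aggregate health.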
§New In 0.2.0
Relative to the 0.1.x line, the adapter layer now has materially stronger
live-path supervision:
- reconnect with backoff
- subscription replay after reconnect
- richer AdapterHealth::protocol_info
- stronger degraded and timeout handling in the live providers
The trait surface stayed stable; the hardening happened behind the same public adapter interface.
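"Reconnect with backoff" usually means spacing reconnect attempts further apart after each failure. The sketch below shows one common variant, capped exponential backoff. The function name, the doubling factor, and the cap are assumptions for illustration; the live providers may use a different schedule.

```rust
use std::time::Duration;

/// Hypothetical helper: delay before reconnect attempt number `attempt`
/// (0-based), doubling from `base` and never exceeding `max`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 2^attempt, saturating instead of overflowing for large attempt counts.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    // If the multiplication overflows, fall back to the cap.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With a 100 ms base and a 5 s cap, attempts 0, 1, and 3 wait 100 ms, 200 ms, and 800 ms, and every attempt from 6 onward waits the full 5 s.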
§Public API Inventory
Public types:
- SubscribeReq
- AdapterHealth
- RawEvent
- AdapterError
- AdapterResult<T>
- ProviderKind
- AdapterConfig
- CredentialsRef
- MockAdapter
Public functions and methods:
MarketDataAdapter trait methods:
- connect()
- subscribe(SubscribeReq)
- unsubscribe(SymbolId)
- poll(&mut Vec<RawEvent>)
- health() -> AdapterHealth
§Provider Strategy
The crate is built around a feature-gated provider model:
- Always available: Mock provider
- Optional: Rithmic, CQG, Binance (enable via Cargo features)
This keeps the default build deterministic while allowing production adapters where needed.
Current provider notes:
- Rithmic:
  - mock mode emits deterministic book and trade events for end-to-end testing
  - live ws:// / wss:// mode now performs websocket reachability validation before reporting connected
  - live mode tracks heartbeat/message activity, schedules reconnect with backoff, and replays subscriptions after reconnect
  - live mode accepts normalized JSON book/trade/heartbeat payloads from bridge processes
  - health metadata includes mode, endpoint, app name, uptime, reconnect attempt, subscription count, and activity ages
- CQG:
  - reconnect/resubscribe and sequencing logic are implemented
- Binance:
  - live websocket transport parses trade and depth events
  - live mode schedules reconnect with backoff on disconnect or market-data timeout
  - reconnect replays active subscriptions automatically
  - health metadata includes reconnect attempt, subscription count, and message/data ages
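The "activity ages" and "market-data timeout" mentioned in these notes can be modeled with a very small tracker. This is a sketch under assumptions: `ActivityTracker` is a hypothetical name, timestamps are plain nanosecond counters supplied by the caller, and the real providers may track heartbeat and data activity separately.

```rust
/// Hypothetical activity tracker; timestamps are caller-supplied nanoseconds.
#[derive(Debug, Clone, Copy)]
pub struct ActivityTracker {
    last_activity_ns: u64,
    timeout_ns: u64,
}

impl ActivityTracker {
    /// Starts the clock at connect time so a silent feed can still time out.
    pub fn new(connected_at_ns: u64, timeout_ns: u64) -> Self {
        Self {
            last_activity_ns: connected_at_ns,
            timeout_ns,
        }
    }

    /// Records a received message; out-of-order timestamps never move the clock back.
    pub fn on_message(&mut self, now_ns: u64) {
        self.last_activity_ns = self.last_activity_ns.max(now_ns);
    }

    /// Age of the last activity, suitable for health metadata.
    pub fn age_ns(&self, now_ns: u64) -> u64 {
        now_ns.saturating_sub(self.last_activity_ns)
    }

    /// True once the feed has been silent for longer than the timeout.
    pub fn timed_out(&self, now_ns: u64) -> bool {
        self.age_ns(now_ns) > self.timeout_ns
    }
}
```

An adapter built this way could check `timed_out` inside its polling path, mark itself degraded, and schedule a reconnect when it returns true.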
§Trait Contract
MarketDataAdapter is intentionally small, but each method has a specific contract:
- connect() establishes transport/session state and should be idempotent where practical.
- subscribe(SubscribeReq) starts or updates delivery for one symbol and depth.
- unsubscribe(SymbolId) stops delivery for that symbol.
- poll(&mut Vec<RawEvent>) appends zero or more normalized events into the caller-owned buffer and returns the number appended.
- health() returns the latest transport/supervision state without mutating adapter state.
Normalization rules:
- adapters emit only RawEvent::Book and RawEvent::Trade
- provider-native protocol details stay inside the adapter implementation
- all emitted symbols, sequences, timestamps, and sides should already be normalized for runtime consumption
§AdapterConfig Reference
- provider: selects Mock, Rithmic, Cqg, or Binance.
- credentials: optional env-var references for providers that need authenticated bootstrap.
- endpoint: websocket or provider endpoint URI for live adapters.
- app_name: optional client or bridge identifier used in health metadata where supported.
CredentialsRef contains env-var names, not secret values:
- key_id_env: env var that stores the provider user/key id
- secret_env: env var that stores the secret/password/token
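To make the "names, not values" distinction concrete, here is a minimal sketch of how such references might be resolved at startup. The `CredentialsRef` below is a local stand-in that assumes both fields are plain `String`s. `ResolvedCredentials`, `resolve_with`, and `resolve` are hypothetical helpers, not part of of_adapters.

```rust
use std::env;

/// Local stand-in for the crate type; field types are an assumption.
pub struct CredentialsRef {
    pub key_id_env: String,
    pub secret_env: String,
}

/// Hypothetical holder for resolved values. It deliberately does not derive
/// Debug, so the secret cannot end up in a log line by accident.
pub struct ResolvedCredentials {
    pub key_id: String,
    pub secret: String,
}

/// Resolves both references through `lookup`, which maps a variable name to its value.
pub fn resolve_with<F>(creds: &CredentialsRef, lookup: F) -> Result<ResolvedCredentials, String>
where
    F: Fn(&str) -> Option<String>,
{
    // The error names the missing variable and never includes a value.
    let get = |name: &str| {
        lookup(name).ok_or_else(|| format!("environment variable {name} is not set"))
    };
    Ok(ResolvedCredentials {
        key_id: get(&creds.key_id_env)?,
        secret: get(&creds.secret_env)?,
    })
}

/// Resolves against the real process environment.
pub fn resolve(creds: &CredentialsRef) -> Result<ResolvedCredentials, String> {
    resolve_with(creds, |name| env::var(name).ok())
}
```

Passing the lookup as a closure keeps the resolution logic testable without mutating the process environment.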
§Health Semantics
AdapterHealth is the bridge between provider supervision and runtime quality decisions.
- connected = true means the transport/session is considered up
- degraded = true means the adapter is reconnecting, stale, or otherwise unhealthy enough for runtime quality gating
- last_error is the latest human-readable adapter failure if known
- protocol_info is provider-specific diagnostic text intended for logging and dashboards
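One way a consumer might turn these fields into a quality decision is sketched below. The `AdapterHealth` struct is a local stand-in that mirrors the four fields described above, with `Option<String>` assumed for the two text fields. `FeedQuality`, `classify`, and `log_line` are hypothetical names, and the gating policy is an illustration rather than the runtime's actual rule.

```rust
/// Local stand-in mirroring the documented health fields.
#[derive(Debug, Clone, Default)]
pub struct AdapterHealth {
    pub connected: bool,
    pub degraded: bool,
    pub last_error: Option<String>,
    pub protocol_info: Option<String>,
}

/// Hypothetical three-level quality classification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeedQuality {
    Healthy,
    Degraded,
    Down,
}

/// A disconnected adapter is Down regardless of the degraded flag.
pub fn classify(health: &AdapterHealth) -> FeedQuality {
    if !health.connected {
        FeedQuality::Down
    } else if health.degraded {
        FeedQuality::Degraded
    } else {
        FeedQuality::Healthy
    }
}

/// Formats a single line for logs or dashboards.
pub fn log_line(health: &AdapterHealth) -> String {
    format!(
        "quality={:?} last_error={} protocol_info={}",
        classify(health),
        health.last_error.as_deref().unwrap_or("-"),
        health.protocol_info.as_deref().unwrap_or("-"),
    )
}
```

Here `connected` is checked before `degraded`, so a dropped session is never reported as merely degraded.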
§Factory Behavior
create_adapter is feature-gated.
- ProviderKind::Mock is always available.
- live providers require their Cargo feature to be enabled.
- requesting a provider without its feature returns AdapterError::FeatureDisabled.
- missing endpoint or credential references return AdapterError::NotConfigured.
§Choosing an Adapter
- Use MockAdapter for deterministic tests, replay, and CI.
- Use Rithmic or CQG when an authenticated futures feed is needed.
- Use Binance when a public crypto depth/trade feed is sufficient.
- Keep provider-specific auth, transport, and reconnect tuning out of runtime code and inside the adapter layer.
§Create an Adapter
use of_adapters::{create_adapter, AdapterConfig, ProviderKind};

let cfg = AdapterConfig {
    provider: ProviderKind::Mock,
    ..Default::default()
};
let mut adapter = create_adapter(&cfg).expect("adapter");
adapter.connect().expect("connect");
assert!(adapter.health().connected);
§Mock Adapter for Tests and Replays
MockAdapter is useful for deterministic tests and simulation pipelines:
use of_adapters::{MarketDataAdapter, MockAdapter, RawEvent, SubscribeReq};
use of_core::{Side, SymbolId, TradePrint};

let symbol = SymbolId {
    venue: "SIM".to_string(),
    symbol: "TEST".to_string(),
};
let mut adapter = MockAdapter::default();
adapter.connect().expect("connect");
adapter.subscribe(SubscribeReq {
    symbol: symbol.clone(),
    depth_levels: 10,
}).expect("subscribe");
adapter.push_event(RawEvent::Trade(TradePrint {
    symbol,
    price: 100,
    size: 1,
    aggressor_side: Side::Ask,
    sequence: 1,
    ts_exchange_ns: 1,
    ts_recv_ns: 2,
}));
let mut out = Vec::new();
let n = adapter.poll(&mut out).expect("poll");
assert_eq!(n, 1);
assert_eq!(out.len(), 1);
§Error Handling
All adapter operations return AdapterResult<T> with AdapterError, covering:
- disconnected state
- missing configuration
- provider feature not enabled at build time
- provider-specific operational errors
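A caller usually wants to react differently to each of these categories. The sketch below shows one possible policy. `SketchError` is a stand-in enum: only the `FeatureDisabled` and `NotConfigured` names come from this page, while `Disconnected`, `Provider`, and the payload shapes are assumptions. `Recovery` and `recovery_for` are hypothetical as well.

```rust
/// Stand-in for the four error categories; variant shapes are assumptions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SketchError {
    Disconnected,
    NotConfigured,
    FeatureDisabled,
    Provider(String),
}

/// Hypothetical caller-side reactions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Recovery {
    /// Transient: reconnect and try again.
    Reconnect,
    /// Operator must supply an endpoint or credential references.
    FixConfiguration,
    /// Binary was built without the provider's Cargo feature.
    RebuildWithFeature,
    /// Unknown provider failure: surface it to logs and alerting.
    Escalate,
}

pub fn recovery_for(err: &SketchError) -> Recovery {
    match err {
        SketchError::Disconnected => Recovery::Reconnect,
        SketchError::NotConfigured => Recovery::FixConfiguration,
        SketchError::FeatureDisabled => Recovery::RebuildWithFeature,
        SketchError::Provider(_) => Recovery::Escalate,
    }
}
```

The split matters mainly at startup: configuration and feature errors will not fix themselves, so retrying them in a loop only hides the real problem.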
§Subscription Semantics
- SubscribeReq::depth_levels is advisory and provider-dependent; mock and depth-aware providers use it directly.
- repeated subscribe calls for the same symbol should be treated as update-or-refresh, not as a duplicate stream request.
- unsubscribe should remove future delivery for that symbol, but does not retroactively clear already-polled events from the caller buffer.
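The update-or-refresh rule falls out naturally when subscriptions are stored in a map keyed by symbol. The following sketch assumes a hypothetical `SymbolKey` (a stand-in for `SymbolId`) and a `u16` depth. `SubscriptionTable` and its methods are illustrative names only. The same table can drive subscription replay after a reconnect.

```rust
use std::collections::BTreeMap;

/// Stand-in for a symbol identifier; ordering lets it serve as a map key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct SymbolKey {
    pub venue: String,
    pub symbol: String,
}

/// Hypothetical table of active subscriptions: symbol -> requested depth.
#[derive(Debug, Default)]
pub struct SubscriptionTable {
    active: BTreeMap<SymbolKey, u16>,
}

impl SubscriptionTable {
    /// Inserts or refreshes a subscription. Returns true if the symbol was new.
    pub fn upsert(&mut self, key: SymbolKey, depth_levels: u16) -> bool {
        self.active.insert(key, depth_levels).is_none()
    }

    /// Removes a subscription. Returns true if it existed.
    pub fn remove(&mut self, key: &SymbolKey) -> bool {
        self.active.remove(key).is_some()
    }

    /// Snapshot of everything that should be re-sent after a reconnect.
    pub fn replay_list(&self) -> Vec<(SymbolKey, u16)> {
        self.active.iter().map(|(k, d)| (k.clone(), *d)).collect()
    }

    pub fn len(&self) -> usize {
        self.active.len()
    }
}
```

Because a second `upsert` for the same symbol overwrites the depth instead of adding an entry, a replay after reconnect sends each symbol exactly once.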
§Real-World Use Cases
§1. Bridge a proprietary broker or exchange feed
If you already have SDK access for a venue that Orderflow does not support,
implement MarketDataAdapter and normalize that SDK’s messages into
RawEvent.
§2. Simulate live behavior in tests
Use MockAdapter to feed deterministic BookUpdate and TradePrint events
into of_runtime without network dependencies.
§3. Build a gateway process
Run your provider-specific adapter in one process and expose normalized output to downstream strategy or runtime layers. This keeps transport complexity out of the strategy code.
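Inside a single process, the core of such a gateway can be a loop that polls into a reusable buffer and forwards events over a channel. The sketch below is self-contained and uses stand-ins: `EventSource` mimics only the append-and-count shape of `poll`, `QueueSource` is a toy source, and `pump_once` is a hypothetical helper. A real gateway would wrap a `MarketDataAdapter` and probably a network transport instead of `std::sync::mpsc`.

```rust
use std::sync::mpsc::Sender;

/// Stand-in for the polling side of an adapter: append events, return the count.
pub trait EventSource {
    type Event;
    fn poll(&mut self, out: &mut Vec<Self::Event>) -> Result<usize, String>;
}

/// Toy source that hands out its pending events on the next poll.
pub struct QueueSource {
    pending: Vec<u32>,
}

impl QueueSource {
    pub fn new(pending: Vec<u32>) -> Self {
        Self { pending }
    }
}

impl EventSource for QueueSource {
    type Event = u32;
    fn poll(&mut self, out: &mut Vec<u32>) -> Result<usize, String> {
        let n = self.pending.len();
        out.append(&mut self.pending); // appends; does not clear `out`
        Ok(n)
    }
}

/// Polls once and forwards every event downstream, reusing `scratch` between calls.
pub fn pump_once<S: EventSource>(
    source: &mut S,
    scratch: &mut Vec<S::Event>,
    tx: &Sender<S::Event>,
) -> Result<usize, String> {
    // poll appends, so the caller-owned buffer is cleared before each round.
    scratch.clear();
    let appended = source.poll(scratch)?;
    for event in scratch.drain(..) {
        tx.send(event)
            .map_err(|_| "downstream receiver was dropped".to_string())?;
    }
    Ok(appended)
}
```

Strategy code then sees only the receiving end of the channel and never touches reconnects, credentials, or provider payloads.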
§How To Create Your Own Adapter
The minimum implementation steps are:
- define a struct that owns connection state, subscriptions, and any pending event queue
- implement MarketDataAdapter for that struct
- normalize provider payloads into RawEvent::Book and RawEvent::Trade
- expose meaningful health through AdapterHealth
- add factory/config wiring if you want the adapter selectable via create_adapter
Important design rules:
- keep emitted prices and sizes integer-normalized
- preserve provider sequence numbers whenever available
- preserve both exchange and local receive timestamps
- keep reconnect/backoff logic in the adapter, not in strategy code
- emit only normalized events from the public surface
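For the integer-normalization rule, a frequent case is a provider that sends prices as decimal strings. The sketch below converts such a string to a scaled integer without going through floating point. `decimal_to_scaled` is a hypothetical helper, and the fixed number of decimal places per symbol is an assumption. The crate does not prescribe how the scale is chosen.

```rust
/// Hypothetical helper: parses a decimal string such as "100.25" into an
/// integer scaled by 10^decimals, rejecting input that would lose precision.
pub fn decimal_to_scaled(text: &str, decimals: u32) -> Option<i64> {
    let (negative, digits) = match text.strip_prefix('-') {
        Some(rest) => (true, rest),
        None => (false, text),
    };
    let (int_part, frac_part) = digits.split_once('.').unwrap_or((digits, ""));
    if int_part.is_empty() && frac_part.is_empty() {
        return None; // "", "-", or "."
    }
    if frac_part.len() > decimals as usize {
        return None; // more precision than the scale can represent
    }
    if !int_part.bytes().chain(frac_part.bytes()).all(|b| b.is_ascii_digit()) {
        return None;
    }
    let scale = 10i64.checked_pow(decimals)?;
    let int_val: i64 = if int_part.is_empty() { 0 } else { int_part.parse().ok()? };
    let frac_val: i64 = if frac_part.is_empty() {
        0
    } else {
        // Right-pad the fraction with zeros up to `decimals` digits.
        let pad = decimals - frac_part.len() as u32;
        frac_part.parse::<i64>().ok()?.checked_mul(10i64.checked_pow(pad)?)?
    };
    let magnitude = int_val.checked_mul(scale)?.checked_add(frac_val)?;
    Some(if negative { -magnitude } else { magnitude })
}
```

Returning `None` instead of rounding keeps precision loss visible: an adapter can log the raw payload and mark itself degraded instead of emitting a slightly wrong price.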
§Detailed Example: Custom Adapter Skeleton
use of_adapters::{
    AdapterHealth, AdapterResult, MarketDataAdapter, RawEvent, SubscribeReq,
};
use of_core::{BookAction, BookUpdate, Side, SymbolId, TradePrint};

#[derive(Default)]
struct DemoAdapter {
    connected: bool,
    subscribed: Vec<SymbolId>,
    queue: Vec<RawEvent>,
}

impl DemoAdapter {
    // Called by the provider-specific receive path for each trade message.
    fn on_trade_message(&mut self, symbol: &SymbolId, price: i64, size: i64, side: Side, seq: u64) {
        self.queue.push(RawEvent::Trade(TradePrint {
            symbol: symbol.clone(),
            price,
            size,
            aggressor_side: side,
            sequence: seq,
            // Placeholder timestamps; a real adapter takes the exchange time
            // from the provider message and stamps the receive time locally.
            ts_exchange_ns: 10,
            ts_recv_ns: 20,
        }));
    }

    // Called by the provider-specific receive path for each book message.
    fn on_book_message(&mut self, symbol: &SymbolId, level: u16, price: i64, size: i64, seq: u64) {
        self.queue.push(RawEvent::Book(BookUpdate {
            symbol: symbol.clone(),
            // Demo only: a real adapter maps the side from the provider payload.
            side: Side::Bid,
            level,
            price,
            size,
            action: BookAction::Upsert,
            sequence: seq,
            ts_exchange_ns: 10,
            ts_recv_ns: 20,
        }));
    }
}

impl MarketDataAdapter for DemoAdapter {
    fn connect(&mut self) -> AdapterResult<()> {
        self.connected = true;
        Ok(())
    }

    fn subscribe(&mut self, req: SubscribeReq) -> AdapterResult<()> {
        // A repeated subscribe for the same symbol is a refresh, not a duplicate.
        if !self.subscribed.contains(&req.symbol) {
            self.subscribed.push(req.symbol);
        }
        Ok(())
    }

    fn unsubscribe(&mut self, symbol: SymbolId) -> AdapterResult<()> {
        self.subscribed.retain(|s| s != &symbol);
        Ok(())
    }

    fn poll(&mut self, out: &mut Vec<RawEvent>) -> AdapterResult<usize> {
        // Append to the caller-owned buffer and report how many events were added.
        let n = self.queue.len();
        out.extend(self.queue.drain(..));
        Ok(n)
    }

    fn health(&self) -> AdapterHealth {
        AdapterHealth {
            connected: self.connected,
            degraded: false,
            last_error: None,
            protocol_info: Some(format!("subs={}", self.subscribed.len())),
        }
    }
}
§Adapter Authoring Checklist
- normalize every provider event into BookUpdate or TradePrint
- treat poll() as the public emission point
- make health() useful for operators, not just for tests
- decide how reconnect/backoff affects degraded
- ensure unsubscribe removes future delivery for the symbol
Modules§
- binance: Binance adapter implementation (feature-gated).
- cqg: CQG adapter implementation (feature-gated).
- rithmic: Rithmic adapter implementation (feature-gated).
Structs§
- AdapterConfig: Generic adapter factory configuration.
- AdapterHealth: Adapter connection and quality health snapshot.
- CredentialsRef: Credential environment-variable references for adapter auth bootstrap.
- MockAdapter: Deterministic in-memory adapter for tests, demos, and replay harnesses.
- SubscribeReq: Subscription request forwarded to adapters.
Enums§
- AdapterError: Adapter-level error variants.
- ProviderKind: Provider selection used by adapter factory configuration.
- RawEvent: Raw adapter event stream.
Traits§
- MarketDataAdapter: Common market-data adapter interface used by runtime.
Functions§
- create_adapter: Creates a provider adapter from configuration.
Type Aliases§
- AdapterResult: Result type alias used by adapter interfaces.