#![doc = include_str!("../README.md")]
use std::error::Error;
use std::fmt;
use of_core::{BookUpdate, SymbolId, TradePrint};
#[derive(Debug, Clone)]
pub struct SubscribeReq {
pub symbol: SymbolId,
pub depth_levels: u16,
}
#[derive(Debug, Clone, Default)]
pub struct AdapterHealth {
pub connected: bool,
pub degraded: bool,
pub last_error: Option<String>,
pub protocol_info: Option<String>,
}
#[derive(Debug, Clone)]
pub enum RawEvent {
Book(BookUpdate),
Trade(TradePrint),
}
#[derive(Debug, Clone)]
pub enum AdapterError {
Disconnected,
NotConfigured(&'static str),
FeatureDisabled(&'static str),
Other(String),
}
impl fmt::Display for AdapterError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AdapterError::Disconnected => write!(f, "adapter disconnected"),
AdapterError::NotConfigured(msg) => write!(f, "adapter misconfigured: {msg}"),
AdapterError::FeatureDisabled(msg) => write!(f, "adapter feature disabled: {msg}"),
AdapterError::Other(msg) => write!(f, "adapter error: {msg}"),
}
}
}
impl Error for AdapterError {}
pub type AdapterResult<T> = Result<T, AdapterError>;
pub trait MarketDataAdapter: Send {
fn connect(&mut self) -> AdapterResult<()>;
fn subscribe(&mut self, req: SubscribeReq) -> AdapterResult<()>;
fn unsubscribe(&mut self, symbol: SymbolId) -> AdapterResult<()>;
fn poll(&mut self, out: &mut Vec<RawEvent>) -> AdapterResult<usize>;
fn health(&self) -> AdapterHealth;
}
impl MarketDataAdapter for Box<dyn MarketDataAdapter> {
fn connect(&mut self) -> AdapterResult<()> {
self.as_mut().connect()
}
fn subscribe(&mut self, req: SubscribeReq) -> AdapterResult<()> {
self.as_mut().subscribe(req)
}
fn unsubscribe(&mut self, symbol: SymbolId) -> AdapterResult<()> {
self.as_mut().unsubscribe(symbol)
}
fn poll(&mut self, out: &mut Vec<RawEvent>) -> AdapterResult<usize> {
self.as_mut().poll(out)
}
fn health(&self) -> AdapterHealth {
self.as_ref().health()
}
}
#[derive(Debug, Clone)]
pub enum ProviderKind {
Mock,
Rithmic,
Cqg,
Binance,
}
#[derive(Debug, Clone)]
pub struct AdapterConfig {
pub provider: ProviderKind,
pub credentials: Option<CredentialsRef>,
pub endpoint: Option<String>,
pub app_name: Option<String>,
}
impl Default for AdapterConfig {
fn default() -> Self {
Self {
provider: ProviderKind::Mock,
credentials: None,
endpoint: None,
app_name: None,
}
}
}
#[derive(Debug, Clone)]
pub struct CredentialsRef {
pub key_id_env: String,
pub secret_env: String,
}
pub fn create_adapter(cfg: &AdapterConfig) -> AdapterResult<Box<dyn MarketDataAdapter>> {
match cfg.provider {
ProviderKind::Mock => Ok(Box::new(MockAdapter::default())),
ProviderKind::Rithmic => create_rithmic_adapter(cfg),
ProviderKind::Cqg => create_cqg_adapter(cfg),
ProviderKind::Binance => create_binance_adapter(cfg),
}
}
fn create_rithmic_adapter(cfg: &AdapterConfig) -> AdapterResult<Box<dyn MarketDataAdapter>> {
#[cfg(feature = "rithmic")]
{
let adapter = rithmic::RithmicAdapter::from_config(cfg)?;
return Ok(Box::new(adapter));
}
#[cfg(not(feature = "rithmic"))]
{
let _ = cfg;
Err(AdapterError::FeatureDisabled(
"compile with --features rithmic to enable",
))
}
}
fn create_cqg_adapter(cfg: &AdapterConfig) -> AdapterResult<Box<dyn MarketDataAdapter>> {
#[cfg(feature = "cqg")]
{
let adapter = cqg::CqgAdapter::from_config(cfg)?;
return Ok(Box::new(adapter));
}
#[cfg(not(feature = "cqg"))]
{
let _ = cfg;
Err(AdapterError::FeatureDisabled(
"compile with --features cqg to enable",
))
}
}
fn create_binance_adapter(cfg: &AdapterConfig) -> AdapterResult<Box<dyn MarketDataAdapter>> {
#[cfg(feature = "binance")]
{
let adapter = binance::BinanceAdapter::from_config(cfg)?;
return Ok(Box::new(adapter));
}
#[cfg(not(feature = "binance"))]
{
let _ = cfg;
Err(AdapterError::FeatureDisabled(
"compile with --features binance to enable",
))
}
}
#[derive(Debug, Default)]
pub struct MockAdapter {
pub connected: bool,
pub subscribed: Vec<SubscribeReq>,
queue: Vec<RawEvent>,
}
impl MockAdapter {
pub fn push_event(&mut self, event: RawEvent) {
self.queue.push(event);
}
}
impl MarketDataAdapter for MockAdapter {
fn connect(&mut self) -> AdapterResult<()> {
self.connected = true;
Ok(())
}
fn subscribe(&mut self, req: SubscribeReq) -> AdapterResult<()> {
if !self.connected {
return Err(AdapterError::Disconnected);
}
self.subscribed.push(req);
Ok(())
}
fn poll(&mut self, out: &mut Vec<RawEvent>) -> AdapterResult<usize> {
if !self.connected {
return Err(AdapterError::Disconnected);
}
let n = self.queue.len();
out.extend(self.queue.drain(..));
Ok(n)
}
fn unsubscribe(&mut self, symbol: SymbolId) -> AdapterResult<()> {
if !self.connected {
return Err(AdapterError::Disconnected);
}
self.subscribed.retain(|s| s.symbol != symbol);
Ok(())
}
fn health(&self) -> AdapterHealth {
AdapterHealth {
connected: self.connected,
degraded: false,
last_error: None,
protocol_info: Some("mock_adapter".to_string()),
}
}
}
#[cfg(feature = "rithmic")]
pub mod rithmic;
#[cfg(feature = "cqg")]
pub mod cqg;
#[cfg(feature = "binance")]
pub mod binance;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn factory_returns_mock_by_default() {
let cfg = AdapterConfig::default();
let mut adapter = create_adapter(&cfg).expect("adapter should be created");
adapter.connect().expect("connect should work");
assert!(adapter.health().connected);
}
#[cfg(not(feature = "rithmic"))]
#[test]
fn factory_rejects_disabled_provider_features() {
let cfg = AdapterConfig {
provider: ProviderKind::Rithmic,
..AdapterConfig::default()
};
match create_adapter(&cfg) {
Err(AdapterError::FeatureDisabled(_)) => {}
Err(other) => panic!("unexpected error variant: {other}"),
Ok(_) => panic!("expected feature-disabled error"),
}
}
}