of_adapters 0.1.0

Provider adapters and market-data abstraction for the Orderflow engine
Documentation
use std::collections::{HashMap, VecDeque};
use std::time::Instant;

use of_core::{Side, SymbolId, TradePrint};

use crate::{
    AdapterConfig, AdapterError, AdapterHealth, AdapterResult, MarketDataAdapter, RawEvent,
    SubscribeReq,
};

#[derive(Debug, Clone)]
pub struct RithmicConfig {
    endpoint: String,
    user: String,
    pass: String,
    app_name: String,
}

impl RithmicConfig {
    pub fn from_adapter_config(cfg: &AdapterConfig) -> AdapterResult<Self> {
        let endpoint = cfg
            .endpoint
            .clone()
            .ok_or(AdapterError::NotConfigured("missing rithmic endpoint"))?;
        if !endpoint.starts_with("wss://")
            && !endpoint.starts_with("ws://")
            && !endpoint.starts_with("mock://")
        {
            return Err(AdapterError::NotConfigured(
                "rithmic endpoint must use wss://, ws://, or mock://",
            ));
        }

        let creds = cfg.credentials.as_ref().ok_or(AdapterError::NotConfigured(
            "missing rithmic credentials reference",
        ))?;
        let user = read_env(&creds.key_id_env)?;
        let pass = read_env(&creds.secret_env)?;
        let app_name = cfg
            .app_name
            .clone()
            .unwrap_or_else(|| "orderflow".to_string());

        Ok(Self {
            endpoint,
            user,
            pass,
            app_name,
        })
    }
}

fn read_env(name: &str) -> AdapterResult<String> {
    if name.trim().is_empty() {
        return Err(AdapterError::NotConfigured("empty env reference"));
    }
    let v =
        std::env::var(name).map_err(|_| AdapterError::NotConfigured("required rithmic env var missing"))?;
    if v.trim().is_empty() {
        return Err(AdapterError::NotConfigured("required rithmic env var empty"));
    }
    Ok(v)
}

#[derive(Debug)]
pub struct RithmicAdapter {
    cfg: RithmicConfig,
    connected: bool,
    degraded: bool,
    last_error: Option<String>,
    requested_depth: HashMap<SymbolId, u16>,
    queue: VecDeque<RawEvent>,
    sequence: u64,
    connected_at: Option<Instant>,
}

impl RithmicAdapter {
    pub fn from_config(cfg: &AdapterConfig) -> AdapterResult<Self> {
        let cfg = RithmicConfig::from_adapter_config(cfg)?;
        Ok(Self {
            cfg,
            connected: false,
            degraded: false,
            last_error: None,
            requested_depth: HashMap::new(),
            queue: VecDeque::new(),
            sequence: 0,
            connected_at: None,
        })
    }

    fn is_mock_mode(&self) -> bool {
        self.cfg.endpoint.starts_with("mock://")
    }

    fn synth_trade(&mut self, symbol: &SymbolId) {
        self.sequence = self.sequence.saturating_add(1);
        self.queue.push_back(RawEvent::Trade(TradePrint {
            symbol: symbol.clone(),
            price: 500_000 + (self.sequence % 16) as i64,
            size: 1 + (self.sequence % 4) as i64,
            aggressor_side: if self.sequence % 2 == 0 {
                Side::Ask
            } else {
                Side::Bid
            },
            sequence: self.sequence,
            ts_exchange_ns: self.sequence,
            ts_recv_ns: self.sequence,
        }));
    }
}

impl MarketDataAdapter for RithmicAdapter {
    fn connect(&mut self) -> AdapterResult<()> {
        let _ = (&self.cfg.user, &self.cfg.pass, &self.cfg.app_name);
        self.connected = true;
        self.degraded = false;
        self.last_error = None;
        self.connected_at = Some(Instant::now());
        Ok(())
    }

    fn subscribe(&mut self, req: SubscribeReq) -> AdapterResult<()> {
        if !self.connected {
            return Err(AdapterError::Disconnected);
        }
        if req.depth_levels == 0 {
            self.requested_depth.remove(&req.symbol);
            return Ok(());
        }

        self.requested_depth
            .insert(req.symbol.clone(), req.depth_levels);
        if self.is_mock_mode() {
            self.synth_trade(&req.symbol);
        }
        Ok(())
    }

    fn unsubscribe(&mut self, symbol: SymbolId) -> AdapterResult<()> {
        if !self.connected {
            return Err(AdapterError::Disconnected);
        }
        self.requested_depth.remove(&symbol);
        Ok(())
    }

    fn poll(&mut self, out: &mut Vec<RawEvent>) -> AdapterResult<usize> {
        if !self.connected {
            return Err(AdapterError::Disconnected);
        }
        if self.is_mock_mode() {
            let symbols: Vec<SymbolId> = self.requested_depth.keys().cloned().collect();
            for symbol in symbols {
                self.synth_trade(&symbol);
            }
        }

        let n = self.queue.len();
        out.extend(self.queue.drain(..));
        Ok(n)
    }

    fn health(&self) -> AdapterHealth {
        AdapterHealth {
            connected: self.connected,
            degraded: self.degraded,
            last_error: self.last_error.clone(),
            protocol_info: Some("provider=rithmic;wire=scaffold_v1".to_string()),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{AdapterConfig, CredentialsRef, ProviderKind};

    fn cfg(endpoint: &str) -> AdapterConfig {
        std::env::set_var("RITHMIC_TEST_USER", "demo_user");
        std::env::set_var("RITHMIC_TEST_PASS", "demo_pass");
        AdapterConfig {
            provider: ProviderKind::Rithmic,
            credentials: Some(CredentialsRef {
                key_id_env: "RITHMIC_TEST_USER".to_string(),
                secret_env: "RITHMIC_TEST_PASS".to_string(),
            }),
            endpoint: Some(endpoint.to_string()),
            app_name: Some("orderflow-tests".to_string()),
        }
    }

    #[test]
    fn connects_subscribes_and_polls() {
        let mut adapter = RithmicAdapter::from_config(&cfg("mock://rithmic")).expect("cfg");
        adapter.connect().expect("connect");
        adapter
            .subscribe(SubscribeReq {
                symbol: SymbolId {
                    venue: "CME".to_string(),
                    symbol: "ESM6".to_string(),
                },
                depth_levels: 10,
            })
            .expect("sub");
        let mut out = Vec::new();
        let n = adapter.poll(&mut out).expect("poll");
        assert!(n > 0);
    }

    #[test]
    fn explicit_unsubscribe_removes_symbol() {
        let mut adapter = RithmicAdapter::from_config(&cfg("mock://rithmic")).expect("cfg");
        adapter.connect().expect("connect");
        let symbol = SymbolId {
            venue: "CME".to_string(),
            symbol: "NQM6".to_string(),
        };
        adapter
            .subscribe(SubscribeReq {
                symbol: symbol.clone(),
                depth_levels: 10,
            })
            .expect("sub");
        adapter.unsubscribe(symbol.clone()).expect("unsub");
        assert!(!adapter.requested_depth.contains_key(&symbol));
    }

    #[test]
    fn zero_depth_aliases_unsubscribe() {
        let mut adapter = RithmicAdapter::from_config(&cfg("mock://rithmic")).expect("cfg");
        adapter.connect().expect("connect");
        let symbol = SymbolId {
            venue: "CME".to_string(),
            symbol: "RTYM6".to_string(),
        };
        adapter
            .subscribe(SubscribeReq {
                symbol: symbol.clone(),
                depth_levels: 10,
            })
            .expect("sub");
        adapter
            .subscribe(SubscribeReq {
                symbol: symbol.clone(),
                depth_levels: 0,
            })
            .expect("zero");
        assert!(!adapter.requested_depth.contains_key(&symbol));
    }
}