pub mod depth;
pub mod greeks;
pub mod historical;
pub mod options;
pub mod quotes;
pub mod subscription;
pub mod trades;
use crate::{
error::DataError,
event::{DataKind, MarketEvent},
};
use chrono::Utc;
use depth::DepthAggregator;
use greeks::GreeksAggregator;
use ibapi::{client::blocking::Client, contracts::SecurityType};
use quotes::QuoteAggregator;
use rust_decimal::Decimal;
use rustrade_instrument::{exchange::ExchangeId, ibkr::ContractRegistry};
use serde::{Deserialize, Serialize};
use std::{
panic::{AssertUnwindSafe, catch_unwind},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use subscription::{IbkrSubscription, IbkrSubscriptionKind};
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IbkrStreamConfig {
pub host: String,
pub port: u16,
pub client_id: i32,
}
impl Default for IbkrStreamConfig {
fn default() -> Self {
Self {
host: "127.0.0.1".to_string(),
port: 4002,
client_id: 100,
}
}
}
#[derive(Debug)]
pub struct IbkrMarketStream<K> {
rx: mpsc::UnboundedReceiver<Result<MarketEvent<K, DataKind>, DataError>>,
client: Arc<Client>,
}
impl<K> futures::Stream for IbkrMarketStream<K> {
type Item = Result<MarketEvent<K, DataKind>, DataError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.rx).poll_recv(cx)
}
}
impl<K> IbkrMarketStream<K>
where
K: Clone + Send + 'static,
{
pub fn init(
config: IbkrStreamConfig,
contracts: Arc<ContractRegistry>,
subscriptions: Vec<IbkrSubscription<K>>,
) -> Result<Self, DataError> {
let url = format!("{}:{}", config.host, config.port);
info!(%url, client_id = config.client_id, "Connecting to IB for market data");
let client = Client::connect(&url, config.client_id)
.map_err(|e| DataError::Socket(format!("IB connect: {e}")))?;
let client = Arc::new(client);
let (tx, rx) = mpsc::unbounded_channel();
let mut active_subscriptions = 0;
for sub in subscriptions {
let contract = match contracts.get_contract(&sub.instrument) {
Some(c) => c,
None => {
warn!(
instrument = %sub.instrument,
"Contract not found in registry, skipping subscription"
);
continue;
}
};
let spawn_result = match sub.kind {
IbkrSubscriptionKind::Quotes => Self::run_quotes_subscription(
client.clone(),
contract,
sub.key.clone(),
tx.clone(),
),
IbkrSubscriptionKind::Depth { rows } => Self::run_depth_subscription(
client.clone(),
contract,
sub.key.clone(),
rows,
tx.clone(),
),
IbkrSubscriptionKind::Trades => Self::run_trades_subscription(
client.clone(),
contract,
sub.key.clone(),
tx.clone(),
),
IbkrSubscriptionKind::OptionGreeks => Self::run_option_greeks_subscription(
client.clone(),
contract,
sub.key.clone(),
tx.clone(),
),
};
match spawn_result {
Ok(()) => active_subscriptions += 1,
Err(e) => {
warn!(instrument = %sub.instrument, error = %e, "Failed to spawn subscription worker");
}
}
}
if active_subscriptions == 0 {
return Err(DataError::Socket(
"No subscriptions activated (check logs for per-subscription errors)".to_string(),
));
}
Ok(Self { rx, client })
}
pub fn disconnect(&self) {
debug!("Disconnecting IbkrMarketStream");
self.client.disconnect();
}
fn run_quotes_subscription(
client: Arc<Client>,
contract: ibapi::contracts::Contract,
key: K,
tx: mpsc::UnboundedSender<Result<MarketEvent<K, DataKind>, DataError>>,
) -> Result<(), DataError> {
let symbol = contract.symbol.to_string();
let symbol_clone = symbol.clone();
let tx_panic = tx.clone();
std::thread::Builder::new()
.name(format!("ibkr-quotes-{symbol}"))
.spawn(move || {
let result = catch_unwind(AssertUnwindSafe(|| {
debug!(symbol = %symbol, "Starting quotes subscription");
let sub = match client.market_data(&contract).subscribe() {
Ok(s) => s,
Err(e) => {
error!(symbol = %symbol, error = %e, "Failed to subscribe to quotes");
let _ = tx.send(Err(DataError::Socket(format!(
"quotes subscription {symbol}: {e}"
))));
return;
}
};
let mut aggregator = QuoteAggregator::new();
for tick in sub {
let now = Utc::now();
if let Some(l1) = aggregator.update(&tick, now) {
let event = MarketEvent {
time_exchange: l1.last_update_time,
time_received: now,
exchange: ExchangeId::Ibkr,
instrument: key.clone(),
kind: DataKind::OrderBookL1(l1),
};
if tx.send(Ok(event)).is_err() {
break;
}
}
}
debug!(symbol = %symbol, "Quotes subscription ended");
}));
if let Err(panic_info) = result {
let msg = panic_info
.downcast_ref::<&str>()
.map(|s| s.to_string())
.or_else(|| panic_info.downcast_ref::<String>().cloned())
.unwrap_or_else(|| "unknown panic".to_string());
error!(symbol = %symbol_clone, "Quotes worker panicked: {msg}");
let _ = tx_panic.send(Err(DataError::Socket(format!(
"quotes subscription {symbol_clone} panicked: {msg}"
))));
}
})
.map_err(|e| DataError::Socket(format!("Failed to spawn quotes thread: {e}")))?;
Ok(())
}
fn run_depth_subscription(
client: Arc<Client>,
contract: ibapi::contracts::Contract,
key: K,
rows: i32,
tx: mpsc::UnboundedSender<Result<MarketEvent<K, DataKind>, DataError>>,
) -> Result<(), DataError> {
let symbol = contract.symbol.to_string();
let symbol_clone = symbol.clone();
let tx_panic = tx.clone();
std::thread::Builder::new()
.name(format!("ibkr-depth-{symbol}"))
.spawn(move || {
let result = catch_unwind(AssertUnwindSafe(|| {
debug!(symbol = %symbol, rows, "Starting depth subscription");
let sub = match client.market_depth(&contract, rows, false) {
Ok(s) => s,
Err(e) => {
error!(symbol = %symbol, error = %e, "Failed to subscribe to depth");
let _ = tx.send(Err(DataError::Socket(format!(
"depth subscription {symbol}: {e}"
))));
return;
}
};
let mut aggregator = DepthAggregator::new();
for depth in sub {
if let Some(book_event) = aggregator.update(&depth) {
let now = Utc::now();
let event = MarketEvent {
time_exchange: now,
time_received: now,
exchange: ExchangeId::Ibkr,
instrument: key.clone(),
kind: DataKind::OrderBook(book_event),
};
if tx.send(Ok(event)).is_err() {
break;
}
}
}
debug!(symbol = %symbol, "Depth subscription ended");
}));
if let Err(panic_info) = result {
let msg = panic_info
.downcast_ref::<&str>()
.map(|s| s.to_string())
.or_else(|| panic_info.downcast_ref::<String>().cloned())
.unwrap_or_else(|| "unknown panic".to_string());
error!(symbol = %symbol_clone, "Depth worker panicked: {msg}");
let _ = tx_panic.send(Err(DataError::Socket(format!(
"depth subscription {symbol_clone} panicked: {msg}"
))));
}
})
.map_err(|e| DataError::Socket(format!("Failed to spawn depth thread: {e}")))?;
Ok(())
}
fn run_trades_subscription(
client: Arc<Client>,
contract: ibapi::contracts::Contract,
key: K,
tx: mpsc::UnboundedSender<Result<MarketEvent<K, DataKind>, DataError>>,
) -> Result<(), DataError> {
let symbol = contract.symbol.to_string();
let symbol_clone = symbol.clone();
let tx_panic = tx.clone();
std::thread::Builder::new()
.name(format!("ibkr-trades-{symbol}"))
.spawn(move || {
let result = catch_unwind(AssertUnwindSafe(|| {
debug!(symbol = %symbol, "Starting trades subscription");
let sub = match client.tick_by_tick_all_last(&contract, 0, false) {
Ok(s) => s,
Err(e) => {
error!(symbol = %symbol, error = %e, "Failed to subscribe to trades");
let _ = tx.send(Err(DataError::Socket(format!(
"trades subscription {symbol}: {e}"
))));
return;
}
};
for trade in sub {
let now = Utc::now();
let public_trade = match trades::from_ib_trade(&trade) {
Some(t) => t,
None => continue, };
let time_exchange = trades::parse_trade_time(&trade, now);
let event = MarketEvent {
time_exchange,
time_received: now,
exchange: ExchangeId::Ibkr,
instrument: key.clone(),
kind: DataKind::Trade(public_trade),
};
if tx.send(Ok(event)).is_err() {
break;
}
}
debug!(symbol = %symbol, "Trades subscription ended");
}));
if let Err(panic_info) = result {
let msg = panic_info
.downcast_ref::<&str>()
.map(|s| s.to_string())
.or_else(|| panic_info.downcast_ref::<String>().cloned())
.unwrap_or_else(|| "unknown panic".to_string());
error!(symbol = %symbol_clone, "Trades worker panicked: {msg}");
let _ = tx_panic.send(Err(DataError::Socket(format!(
"trades subscription {symbol_clone} panicked: {msg}"
))));
}
})
.map_err(|e| DataError::Socket(format!("Failed to spawn trades thread: {e}")))?;
Ok(())
}
fn run_option_greeks_subscription(
client: Arc<Client>,
contract: ibapi::contracts::Contract,
key: K,
tx: mpsc::UnboundedSender<Result<MarketEvent<K, DataKind>, DataError>>,
) -> Result<(), DataError> {
if contract.security_type != SecurityType::Option {
return Err(DataError::Socket(format!(
"option Greeks subscription requires SecurityType::Option, got {:?} for {}",
contract.security_type, contract.symbol
)));
}
let symbol = contract.symbol.to_string();
let symbol_clone = symbol.clone();
let tx_panic = tx.clone();
std::thread::Builder::new()
.name(format!("ibkr-greeks-{symbol}"))
.spawn(move || {
let result = catch_unwind(AssertUnwindSafe(|| {
debug!(symbol = %symbol, "Starting option Greeks subscription");
let sub = match client.market_data(&contract).subscribe() {
Ok(s) => s,
Err(e) => {
error!(symbol = %symbol, error = %e, "Failed to subscribe to option Greeks");
let _ = tx.send(Err(DataError::Socket(format!(
"option Greeks subscription {symbol}: {e}"
))));
return;
}
};
let aggregator = GreeksAggregator::new();
for tick in sub {
if let Some(greeks) = aggregator.update(&tick) {
let now = Utc::now();
let event = MarketEvent {
time_exchange: now,
time_received: now,
exchange: ExchangeId::Ibkr,
instrument: key.clone(),
kind: DataKind::OptionGreeks(greeks),
};
if tx.send(Ok(event)).is_err() {
break;
}
}
}
debug!(symbol = %symbol, "Option Greeks subscription ended");
}));
if let Err(panic_info) = result {
let msg = panic_info
.downcast_ref::<&str>()
.map(|s| s.to_string())
.or_else(|| panic_info.downcast_ref::<String>().cloned())
.unwrap_or_else(|| "unknown panic".to_string());
error!(symbol = %symbol_clone, "Option Greeks worker panicked: {msg}");
let _ = tx_panic.send(Err(DataError::Socket(format!(
"option Greeks subscription {symbol_clone} panicked: {msg}"
))));
}
})
.map_err(|e| DataError::Socket(format!("Failed to spawn option Greeks thread: {e}")))?;
Ok(())
}
}
impl<K> Drop for IbkrMarketStream<K> {
fn drop(&mut self) {
debug!("Dropping IbkrMarketStream, disconnecting client");
self.client.disconnect();
}
}
pub(crate) fn decimal_from_f64(value: f64) -> Option<Decimal> {
if !value.is_finite() {
return None;
}
Decimal::try_from(value).ok()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn decimal_from_f64_handles_special_values() {
assert!(decimal_from_f64(100.0).is_some());
assert!(decimal_from_f64(-50.5).is_some());
assert!(decimal_from_f64(0.0).is_some());
assert!(decimal_from_f64(f64::NAN).is_none());
assert!(decimal_from_f64(f64::INFINITY).is_none());
assert!(decimal_from_f64(f64::NEG_INFINITY).is_none());
assert!(
decimal_from_f64(f64::MAX).is_none(),
"f64::MAX should not convert to Decimal"
);
}
}