use super::options::{OptionChainEntry, OptionGreeks};
use crate::{
books::Level,
error::DataError,
subscription::{book::OrderBookL1, candle::Candle, trade::PublicTrade},
};
use chrono::{DateTime, Utc};
use ibapi::{
client::blocking::Client,
contracts::{Contract, SecurityType},
market_data::{
TradingHours,
historical::{BarSize, Duration, TickBidAsk, TickLast, WhatToShow},
},
};
use rust_decimal::Decimal;
use smol_str::format_smolstr;
use std::{
hash::{Hash, Hasher},
sync::Arc,
};
use time::OffsetDateTime;
use tracing::{debug, info, warn};
pub use ibapi::market_data::historical::ToDuration;
/// Wrapper around a shared blocking IB [`Client`] used for historical-data,
/// option-calculation and option-chain requests.
///
/// The client is held behind an [`Arc`] so it can be shared with other owners
/// (see `from_client`) and cloned into blocking tasks.
#[derive(Debug)]
pub struct IbkrHistoricalData {
// Shared handle to the blocking IB client.
client: Arc<Client>,
}
impl IbkrHistoricalData {
pub fn connect(url: &str, client_id: i32) -> Result<Self, DataError> {
info!(%url, client_id, "Connecting to IB for historical data");
let client = Client::connect(url, client_id)
.map_err(|e| DataError::Socket(format!("IB connect: {e}")))?;
Ok(Self {
client: Arc::new(client),
})
}
pub fn from_client(client: Arc<Client>) -> Self {
Self { client }
}
pub fn disconnect(&self) {
debug!("Disconnecting IbkrHistoricalData");
self.client.disconnect();
}
pub async fn fetch_candles(
&self,
request: HistoricalRequest,
) -> Result<Vec<Candle>, DataError> {
let client = self.client.clone();
let symbol = request.contract.symbol.clone();
debug!(
symbol = %symbol,
bar_size = ?request.bar_size,
duration = ?request.duration,
"Fetching historical data"
);
let candles = tokio::task::spawn_blocking(move || {
let trading_hours = if request.regular_trading_hours_only {
TradingHours::Regular
} else {
TradingHours::Extended
};
let historical_data = client
.historical_data(
&request.contract,
request.end_date,
request.duration,
request.bar_size,
request.what_to_show,
trading_hours,
)
.map_err(|e| DataError::Socket(format!("historical data: {e}")))?;
let mut candles = Vec::with_capacity(historical_data.bars.len());
for bar in &historical_data.bars {
candles.push(bar_to_candle(bar)?);
}
Ok::<_, DataError>(candles)
})
.await
.map_err(|e| {
if e.is_panic() {
DataError::Socket(format!("historical_data task panicked: {e}"))
} else {
DataError::Socket(format!("historical_data task cancelled: {e}"))
}
})??;
debug!(symbol = %symbol, count = candles.len(), "Received historical bars");
Ok(candles)
}
pub async fn fetch_historical_ticks(
&self,
request: HistoricalTickRequest,
) -> Result<Vec<PublicTrade>, DataError> {
if request.start.is_none() && request.end.is_none() {
return Err(DataError::Socket(
"HistoricalTickRequest: at least one of `start` or `end` must be set".into(),
));
}
let client = self.client.clone();
let symbol = request.contract.symbol.clone();
debug!(
symbol = %symbol,
number_of_ticks = request.number_of_ticks,
"Fetching historical trade ticks"
);
let trades = tokio::task::spawn_blocking(move || {
let trading_hours = if request.regular_trading_hours_only {
TradingHours::Regular
} else {
TradingHours::Extended
};
let subscription = client
.historical_ticks_trade(
&request.contract,
request.start,
request.end,
request.number_of_ticks,
trading_hours,
)
.map_err(|e| DataError::Socket(format!("historical_ticks_trade: {e}")))?;
let mut trades =
Vec::with_capacity(usize::try_from(request.number_of_ticks).unwrap_or(0));
for (seq, tick) in subscription.into_iter().enumerate() {
if let Some(trade) = tick_last_to_public_trade(&tick, seq) {
trades.push(trade);
}
}
Ok::<_, DataError>(trades)
})
.await
.map_err(|e| {
if e.is_panic() {
DataError::Socket(format!("historical_ticks_trade task panicked: {e}"))
} else {
DataError::Socket(format!("historical_ticks_trade task cancelled: {e}"))
}
})??;
debug!(symbol = %symbol, count = trades.len(), "Received historical trade ticks");
Ok(trades)
}
pub async fn fetch_historical_bid_ask(
&self,
request: HistoricalTickRequest,
ignore_size: bool,
) -> Result<Vec<OrderBookL1>, DataError> {
if request.start.is_none() && request.end.is_none() {
return Err(DataError::Socket(
"HistoricalTickRequest: at least one of `start` or `end` must be set".into(),
));
}
let client = self.client.clone();
let symbol = request.contract.symbol.clone();
debug!(
symbol = %symbol,
number_of_ticks = request.number_of_ticks,
ignore_size,
"Fetching historical bid/ask ticks"
);
let quotes = tokio::task::spawn_blocking(move || {
let trading_hours = if request.regular_trading_hours_only {
TradingHours::Regular
} else {
TradingHours::Extended
};
let subscription = client
.historical_ticks_bid_ask(
&request.contract,
request.start,
request.end,
request.number_of_ticks,
trading_hours,
ignore_size,
)
.map_err(|e| DataError::Socket(format!("historical_ticks_bid_ask: {e}")))?;
let mut quotes =
Vec::with_capacity(usize::try_from(request.number_of_ticks).unwrap_or(0));
for tick in subscription {
if let Some(l1) = tick_bid_ask_to_order_book_l1(&tick) {
quotes.push(l1);
}
}
Ok::<_, DataError>(quotes)
})
.await
.map_err(|e| {
if e.is_panic() {
DataError::Socket(format!("historical_ticks_bid_ask task panicked: {e}"))
} else {
DataError::Socket(format!("historical_ticks_bid_ask task cancelled: {e}"))
}
})??;
debug!(symbol = %symbol, count = quotes.len(), "Received historical bid/ask ticks");
Ok(quotes)
}
pub async fn calculate_theoretical_greeks(
&self,
contract: &Contract,
volatility: f64,
underlying_price: f64,
) -> Result<OptionGreeks, DataError> {
let client = self.client.clone();
let contract = contract.clone();
let symbol = contract.symbol.to_string();
debug!(
symbol = %symbol,
volatility,
underlying_price,
"Calculating theoretical option Greeks"
);
let greeks = tokio::task::spawn_blocking(move || {
let computation = client
.calculate_option_price(&contract, volatility, underlying_price)
.map_err(|e| DataError::Socket(format!("calculate_option_price: {e}")))?;
Ok::<_, DataError>(OptionGreeks::from_ib(&computation))
})
.await
.map_err(|e| {
if e.is_panic() {
DataError::Socket(format!("calculate_option_price task panicked: {e}"))
} else {
DataError::Socket(format!("calculate_option_price task cancelled: {e}"))
}
})??;
debug!(
symbol = %symbol,
delta = ?greeks.delta,
gamma = ?greeks.gamma,
"Calculated option Greeks"
);
Ok(greeks)
}
pub async fn calculate_implied_volatility(
&self,
contract: &Contract,
option_price: f64,
underlying_price: f64,
) -> Result<OptionGreeks, DataError> {
let client = self.client.clone();
let contract = contract.clone();
let symbol = contract.symbol.to_string();
debug!(
symbol = %symbol,
option_price,
underlying_price,
"Calculating implied volatility"
);
let greeks = tokio::task::spawn_blocking(move || {
let computation = client
.calculate_implied_volatility(&contract, option_price, underlying_price)
.map_err(|e| DataError::Socket(format!("calculate_implied_volatility: {e}")))?;
Ok::<_, DataError>(OptionGreeks::from_ib(&computation))
})
.await
.map_err(|e| {
if e.is_panic() {
DataError::Socket(format!("calculate_implied_volatility task panicked: {e}"))
} else {
DataError::Socket(format!("calculate_implied_volatility task cancelled: {e}"))
}
})??;
debug!(
symbol = %symbol,
iv = ?greeks.implied_volatility,
"Calculated implied volatility"
);
Ok(greeks)
}
pub async fn fetch_option_chain(
&self,
symbol: &str,
exchange: &str,
security_type: SecurityType,
contract_id: i32,
) -> Result<Vec<OptionChainEntry>, DataError> {
let client = self.client.clone();
let symbol = symbol.to_string();
let exchange = exchange.to_string();
debug!(
symbol = %symbol,
exchange = %exchange,
"Fetching option chain"
);
let chains = tokio::task::spawn_blocking(move || {
let subscription = client
.option_chain(&symbol, &exchange, security_type, contract_id)
.map_err(|e| DataError::Socket(format!("option_chain: {e}")))?;
let mut entries = Vec::with_capacity(16);
for chain in subscription {
entries.push(OptionChainEntry::from_ib(&chain));
}
debug!(symbol = %symbol, count = entries.len(), "Received option chain entries");
Ok::<_, DataError>(entries)
})
.await
.map_err(|e| {
if e.is_panic() {
DataError::Socket(format!("option_chain task panicked: {e}"))
} else {
DataError::Socket(format!("option_chain task cancelled: {e}"))
}
})??;
Ok(chains)
}
}
impl Drop for IbkrHistoricalData {
    /// Disconnects the client on drop, but only when this wrapper holds the sole
    /// strong reference to it; otherwise the connection is left for the other owners.
    fn drop(&mut self) {
        let shared_elsewhere = Arc::strong_count(&self.client) != 1;
        if shared_elsewhere {
            return;
        }
        debug!("Dropping IbkrHistoricalData (sole owner), disconnecting client");
        self.client.disconnect();
    }
}
/// Parameters for a historical bar request; consumed by
/// `IbkrHistoricalData::fetch_candles`, which forwards each field to the IB client.
#[derive(Debug, Clone)]
pub struct HistoricalRequest {
// Instrument to query.
pub contract: Contract,
// Optional end of the requested window, forwarded as-is.
// NOTE(review): presumably `None` means "up to now" — confirm against ibapi.
pub end_date: Option<OffsetDateTime>,
// Length of the requested window.
pub duration: Duration,
// Size of each returned bar.
pub bar_size: BarSize,
// Which data series the bars are built from (e.g. `WhatToShow::Trades`).
pub what_to_show: WhatToShow,
// `true` selects `TradingHours::Regular`, `false` selects `TradingHours::Extended`.
pub regular_trading_hours_only: bool,
}
impl HistoricalRequest {
    /// Builds a request for daily `Trades` bars spanning `days` days, with no explicit
    /// end date and restricted to regular trading hours.
    pub fn daily_trades(contract: Contract, days: i32) -> Self {
        let duration = days.days();
        Self {
            what_to_show: WhatToShow::Trades,
            bar_size: BarSize::Day,
            regular_trading_hours_only: true,
            end_date: None,
            duration,
            contract,
        }
    }
}
/// Parameters for a historical tick request (trade or bid/ask ticks).
///
/// The fetch methods on `IbkrHistoricalData` reject a request in which both
/// `start` and `end` are `None`.
#[derive(Debug, Clone)]
pub struct HistoricalTickRequest {
// Instrument to query.
pub contract: Contract,
// Optional start of the window; at least one of `start` / `end` must be set.
pub start: Option<OffsetDateTime>,
// Optional end of the window; at least one of `start` / `end` must be set.
pub end: Option<OffsetDateTime>,
// Number of ticks to request, forwarded unchanged to the IB client.
// The `recent` constructor caps this at 1000.
pub number_of_ticks: i32,
// `true` selects `TradingHours::Regular`, `false` selects `TradingHours::Extended`.
pub regular_trading_hours_only: bool,
}
impl HistoricalTickRequest {
    /// Builds a request for ticks ending at the current UTC time, restricted to
    /// regular trading hours.
    ///
    /// `count` is capped at 1000; values at or below the cap (including zero and
    /// negative values) are passed through unchanged.
    pub fn recent(contract: Contract, count: i32) -> Self {
        let end = Some(OffsetDateTime::now_utc());
        let number_of_ticks = if count > 1000 { 1000 } else { count };
        Self {
            contract,
            start: None,
            end,
            number_of_ticks,
            regular_trading_hours_only: true,
        }
    }
}
/// Converts an IB trade tick into a [`PublicTrade`].
///
/// `seq` is the tick's position in the response stream and is mixed into the
/// generated id so that otherwise identical ticks get distinct ids.
///
/// Returns `None` (after logging a warning) when the price is non-finite or cannot
/// be represented as a [`Decimal`]. The trade side is always `None`.
fn tick_last_to_public_trade(tick: &TickLast, seq: usize) -> Option<PublicTrade> {
    if !tick.price.is_finite() {
        warn!(
            price = tick.price,
            "Historical tick has non-finite price, skipping"
        );
        return None;
    }
    // A finite f64 can still fall outside Decimal's range; log it instead of
    // dropping the tick silently, consistent with the other skip paths.
    let price = match Decimal::try_from(tick.price) {
        Ok(price) => price,
        Err(error) => {
            warn!(
                price = tick.price,
                %error,
                "Historical tick price not representable as Decimal, skipping"
            );
            return None;
        }
    };
    let amount = Decimal::from(tick.size);
    Some(PublicTrade {
        id: generate_tick_id(tick.timestamp, tick.price, tick.size, seq),
        price,
        amount,
        side: None,
    })
}
/// Converts a `time` timestamp into a chrono `DateTime<Utc>`, keeping the
/// sub-second nanoseconds.
///
/// Logs a warning and returns `None` when chrono rejects the value.
fn parse_tick_timestamp(timestamp: OffsetDateTime) -> Option<DateTime<Utc>> {
    let unix_secs = timestamp.unix_timestamp();
    let subsec_nanos = timestamp.nanosecond();
    DateTime::from_timestamp(unix_secs, subsec_nanos).or_else(|| {
        warn!(
            unix_timestamp = unix_secs,
            "Invalid tick timestamp from IB, skipping tick"
        );
        None
    })
}
fn generate_tick_id(
timestamp: OffsetDateTime,
price: f64,
size: i32,
seq: usize,
) -> smol_str::SmolStr {
let mut hasher = fnv::FnvHasher::default();
timestamp.unix_timestamp_nanos().hash(&mut hasher);
price.to_bits().hash(&mut hasher);
size.hash(&mut hasher);
seq.hash(&mut hasher);
format_smolstr!("{:016x}", hasher.finish())
}
/// Converts an IB bid/ask tick into an [`OrderBookL1`] snapshot.
///
/// Returns `None` (after logging a warning) when either price is non-finite or
/// cannot be represented as a [`Decimal`], or when the tick timestamp cannot be
/// converted.
fn tick_bid_ask_to_order_book_l1(tick: &TickBidAsk) -> Option<OrderBookL1> {
    if !tick.price_bid.is_finite() || !tick.price_ask.is_finite() {
        warn!(
            bid = tick.price_bid,
            ask = tick.price_ask,
            "Historical tick has non-finite price, skipping"
        );
        return None;
    }
    // A finite f64 can still fall outside Decimal's range; log it instead of
    // dropping the tick silently, consistent with the other skip paths.
    let to_decimal = |side: &'static str, value: f64| match Decimal::try_from(value) {
        Ok(converted) => Some(converted),
        Err(error) => {
            warn!(
                side,
                price = value,
                %error,
                "Historical tick price not representable as Decimal, skipping"
            );
            None
        }
    };
    let bid_price = to_decimal("bid", tick.price_bid)?;
    let ask_price = to_decimal("ask", tick.price_ask)?;
    let bid_amount = Decimal::from(tick.size_bid);
    let ask_amount = Decimal::from(tick.size_ask);
    let last_update_time = parse_tick_timestamp(tick.timestamp)?;
    Some(OrderBookL1 {
        last_update_time,
        best_bid: Some(Level::new(bid_price, bid_amount)),
        best_ask: Some(Level::new(ask_price, ask_amount)),
    })
}
/// Converts an IB historical bar into a [`Candle`].
///
/// `close_time` is taken directly from `bar.date`, and a negative `bar.count` is
/// reported as a trade count of zero.
/// NOTE(review): confirm whether IB's bar timestamp marks the start or the end of
/// the bar before relying on `close_time`.
///
/// # Errors
/// Returns [`DataError::Socket`] when the timestamp is outside the `DateTime<Utc>`
/// range or when a price or the volume cannot be converted to a [`Decimal`].
fn bar_to_candle(bar: &ibapi::market_data::historical::Bar) -> Result<Candle, DataError> {
    let unix_secs = bar.date.unix_timestamp();
    let Some(close_time) = DateTime::from_timestamp(unix_secs, bar.date.nanosecond()) else {
        return Err(DataError::Socket(format!(
            "IB timestamp {} out of DateTime<Utc> range",
            unix_secs
        )));
    };

    // One conversion routine for every numeric field; `field` only labels the error.
    let to_decimal = |field: &str, value: f64| {
        Decimal::try_from(value).map_err(|e| DataError::Socket(format!("parse {field}: {e}")))
    };
    let open = to_decimal("open", bar.open)?;
    let high = to_decimal("high", bar.high)?;
    let low = to_decimal("low", bar.low)?;
    let close = to_decimal("close", bar.close)?;
    let volume = to_decimal("volume", bar.volume)?;

    // Negative counts fail the conversion and collapse to zero.
    let trade_count = u64::try_from(bar.count).unwrap_or(0);

    Ok(Candle {
        close_time,
        open,
        high,
        low,
        close,
        volume,
        trade_count,
    })
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use chrono::Datelike;
    use ibapi::market_data::historical::{Bar, TickAttributeBidAsk, TickAttributeLast};
    use rust_decimal_macros::dec;
    use time::macros::datetime;

    /// Reference unix timestamp (seconds) shared by the tick tests.
    const BASE_SECS: i64 = 1_700_000_000;

    /// Builds a trade tick at `unix_time`, falling back to the epoch for
    /// out-of-range timestamps.
    fn make_tick_last(unix_time: i64, price: f64, size: i32) -> TickLast {
        let timestamp = time::OffsetDateTime::from_unix_timestamp(unix_time)
            .unwrap_or(time::OffsetDateTime::UNIX_EPOCH);
        TickLast {
            timestamp,
            tick_attribute_last: TickAttributeLast {
                past_limit: false,
                unreported: false,
            },
            price,
            size,
            exchange: String::new(),
            special_conditions: String::new(),
        }
    }

    /// Builds a bid/ask tick at `unix_time`, falling back to the epoch for
    /// out-of-range timestamps.
    fn make_tick_bid_ask(
        unix_time: i64,
        bid_price: f64,
        bid_size: i32,
        ask_price: f64,
        ask_size: i32,
    ) -> TickBidAsk {
        let timestamp = time::OffsetDateTime::from_unix_timestamp(unix_time)
            .unwrap_or(time::OffsetDateTime::UNIX_EPOCH);
        TickBidAsk {
            timestamp,
            tick_attribute_bid_ask: TickAttributeBidAsk {
                bid_past_low: false,
                ask_past_high: false,
            },
            price_bid: bid_price,
            price_ask: ask_price,
            size_bid: bid_size,
            size_ask: ask_size,
        }
    }

    /// Id of `tick` at sequence position `seq`.
    fn id_of(tick: &TickLast, seq: usize) -> smol_str::SmolStr {
        generate_tick_id(tick.timestamp, tick.price, tick.size, seq)
    }

    #[test]
    fn bar_to_candle_converts_all_fields() {
        let source = Bar {
            date: datetime!(2024-01-15 16:00 UTC),
            open: 150.0,
            high: 155.0,
            low: 149.0,
            close: 153.5,
            volume: 1_000_000.0,
            wap: 152.0,
            count: 50_000,
        };
        let converted = bar_to_candle(&source).unwrap();
        assert_eq!(converted.open, dec!(150));
        assert_eq!(converted.high, dec!(155));
        assert_eq!(converted.low, dec!(149));
        assert_eq!(converted.close, dec!(153.5));
        assert_eq!(converted.volume, dec!(1_000_000));
        assert_eq!(converted.trade_count, 50_000);
        assert_eq!(converted.close_time.year(), 2024);
        assert_eq!(converted.close_time.month(), 1);
        assert_eq!(converted.close_time.day(), 15);
        assert_eq!(
            converted.close_time.timestamp(),
            source.date.unix_timestamp()
        );
    }

    #[test]
    fn bar_to_candle_handles_negative_count() {
        let source = Bar {
            date: datetime!(2024-01-15 16:00 UTC),
            open: 100.0,
            high: 100.0,
            low: 100.0,
            close: 100.0,
            volume: 0.0,
            wap: 0.0,
            count: -1,
        };
        let converted = bar_to_candle(&source).unwrap();
        assert_eq!(converted.trade_count, 0);
    }

    #[test]
    fn historical_request_daily_trades_builder() {
        let built = HistoricalRequest::daily_trades(Contract::stock("AAPL").build(), 30);
        assert_eq!(built.contract.symbol.as_str(), "AAPL");
        assert!(built.end_date.is_none());
        assert!(matches!(built.bar_size, BarSize::Day));
        assert!(matches!(built.what_to_show, WhatToShow::Trades));
        assert!(built.regular_trading_hours_only);
    }

    #[test]
    fn historical_tick_request_recent_builder() {
        let built = HistoricalTickRequest::recent(Contract::stock("AAPL").build(), 500);
        assert_eq!(built.contract.symbol.as_str(), "AAPL");
        assert!(built.start.is_none());
        assert!(built.end.is_some());
        assert_eq!(built.number_of_ticks, 500);
        assert!(built.regular_trading_hours_only);
    }

    #[test]
    fn historical_tick_request_recent_clamps_to_1000() {
        let built = HistoricalTickRequest::recent(Contract::stock("AAPL").build(), 5000);
        assert_eq!(built.number_of_ticks, 1000);
    }

    #[test]
    fn tick_last_converts_to_public_trade() {
        let source = make_tick_last(BASE_SECS, 150.25, 100);
        let converted = tick_last_to_public_trade(&source, 0).unwrap();
        assert_eq!(converted.price, dec!(150.25));
        assert_eq!(converted.amount, dec!(100));
        assert!(converted.side.is_none());
        assert!(!converted.id.is_empty());
    }

    #[test]
    fn tick_last_rejects_non_finite_price() {
        let nan_tick = make_tick_last(BASE_SECS, f64::NAN, 100);
        assert!(tick_last_to_public_trade(&nan_tick, 0).is_none());
        let infinite_tick = make_tick_last(BASE_SECS, f64::INFINITY, 100);
        assert!(tick_last_to_public_trade(&infinite_tick, 0).is_none());
    }

    #[test]
    fn tick_last_generates_unique_ids() {
        let baseline = id_of(&make_tick_last(BASE_SECS, 150.25, 100), 0);
        let later = id_of(&make_tick_last(BASE_SECS + 1, 150.25, 100), 0);
        let repriced = id_of(&make_tick_last(BASE_SECS, 150.26, 100), 0);
        assert_ne!(baseline, later);
        assert_ne!(baseline, repriced);
        assert_ne!(later, repriced);
    }

    #[test]
    fn tick_last_same_data_same_seq_same_id() {
        let first = id_of(&make_tick_last(BASE_SECS, 150.25, 100), 0);
        let second = id_of(&make_tick_last(BASE_SECS, 150.25, 100), 0);
        assert_eq!(first, second);
    }

    #[test]
    fn tick_last_same_data_different_seq_different_id() {
        let first = id_of(&make_tick_last(BASE_SECS, 150.25, 100), 0);
        let second = id_of(&make_tick_last(BASE_SECS, 150.25, 100), 1);
        assert_ne!(first, second);
    }

    #[test]
    fn tick_bid_ask_converts_to_order_book_l1() {
        let source = make_tick_bid_ask(BASE_SECS, 150.00, 500, 150.05, 300);
        let snapshot = tick_bid_ask_to_order_book_l1(&source).unwrap();
        let best_bid = snapshot.best_bid.unwrap();
        let best_ask = snapshot.best_ask.unwrap();
        assert_eq!(best_bid.price, dec!(150.00));
        assert_eq!(best_bid.amount, dec!(500));
        assert_eq!(best_ask.price, dec!(150.05));
        assert_eq!(best_ask.amount, dec!(300));
        assert_eq!(snapshot.last_update_time.timestamp(), BASE_SECS);
    }

    #[test]
    fn tick_bid_ask_rejects_non_finite_prices() {
        let nan_bid = make_tick_bid_ask(BASE_SECS, f64::NAN, 500, 150.05, 300);
        assert!(tick_bid_ask_to_order_book_l1(&nan_bid).is_none());
        let infinite_ask = make_tick_bid_ask(BASE_SECS, 150.00, 500, f64::INFINITY, 300);
        assert!(tick_bid_ask_to_order_book_l1(&infinite_ask).is_none());
    }

    #[test]
    fn parse_tick_timestamp_converts_correctly() {
        let input = time::OffsetDateTime::from_unix_timestamp(BASE_SECS).unwrap();
        let parsed = parse_tick_timestamp(input).unwrap();
        assert_eq!(parsed.timestamp(), BASE_SECS);
        assert_eq!(parsed.timestamp_subsec_nanos(), 0);
    }

    #[test]
    fn parse_tick_timestamp_preserves_subsecond_nanos() {
        let input =
            time::OffsetDateTime::from_unix_timestamp_nanos(1_700_000_000_123_456_789).unwrap();
        let parsed = parse_tick_timestamp(input).unwrap();
        assert_eq!(parsed.timestamp(), BASE_SECS);
        assert_eq!(parsed.timestamp_subsec_nanos(), 123_456_789);
    }
}