use crate::{error::DataError, subscription::candle::Candle};
use chrono::DateTime;
use ibapi::{
client::blocking::Client,
contracts::Contract,
market_data::{
TradingHours,
historical::{BarSize, Duration, WhatToShow},
},
};
use std::sync::Arc;
use time::OffsetDateTime;
use tracing::{debug, info};
pub use ibapi::market_data::historical::ToDuration;
#[derive(Debug)]
pub struct IbkrHistoricalData {
client: Arc<Client>,
}
impl IbkrHistoricalData {
pub fn connect(url: &str, client_id: i32) -> Result<Self, DataError> {
info!(%url, client_id, "Connecting to IB for historical data");
let client = Client::connect(url, client_id)
.map_err(|e| DataError::Socket(format!("IB connect: {e}")))?;
Ok(Self {
client: Arc::new(client),
})
}
pub fn from_client(client: Arc<Client>) -> Self {
Self { client }
}
pub async fn fetch_candles(
&self,
request: HistoricalRequest,
) -> Result<Vec<Candle>, DataError> {
let client = self.client.clone();
let symbol = request.contract.symbol.clone();
debug!(
symbol = %symbol,
bar_size = ?request.bar_size,
duration = ?request.duration,
"Fetching historical data"
);
let candles = tokio::task::spawn_blocking(move || {
let trading_hours = if request.regular_trading_hours_only {
TradingHours::Regular
} else {
TradingHours::Extended
};
let historical_data = client
.historical_data(
&request.contract,
request.end_date,
request.duration,
request.bar_size,
request.what_to_show,
trading_hours,
)
.map_err(|e| DataError::Socket(format!("historical data: {e}")))?;
let mut candles = Vec::with_capacity(historical_data.bars.len());
for bar in &historical_data.bars {
candles.push(bar_to_candle(bar)?);
}
Ok::<_, DataError>(candles)
})
.await
.map_err(|e| {
if e.is_panic() {
DataError::Socket(format!("historical_data task panicked: {e}"))
} else {
DataError::Socket(format!("historical_data task cancelled: {e}"))
}
})??;
debug!(symbol = %symbol, count = candles.len(), "Received historical bars");
Ok(candles)
}
}
#[derive(Debug, Clone)]
pub struct HistoricalRequest {
pub contract: Contract,
pub end_date: Option<OffsetDateTime>,
pub duration: Duration,
pub bar_size: BarSize,
pub what_to_show: WhatToShow,
pub regular_trading_hours_only: bool,
}
impl HistoricalRequest {
pub fn daily_trades(contract: Contract, days: i32) -> Self {
Self {
contract,
end_date: None,
duration: days.days(),
bar_size: BarSize::Day,
what_to_show: WhatToShow::Trades,
regular_trading_hours_only: true,
}
}
}
fn bar_to_candle(bar: &ibapi::market_data::historical::Bar) -> Result<Candle, DataError> {
let close_time = DateTime::from_timestamp(bar.date.unix_timestamp(), bar.date.nanosecond())
.ok_or_else(|| {
DataError::Socket(format!(
"IB timestamp {} out of DateTime<Utc> range",
bar.date.unix_timestamp()
))
})?;
Ok(Candle {
close_time,
open: bar.open,
high: bar.high,
low: bar.low,
close: bar.close,
volume: bar.volume,
#[allow(clippy::cast_sign_loss)] trade_count: bar.count.max(0) as u64,
})
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use chrono::Datelike;
use time::macros::datetime;
#[test]
fn bar_to_candle_converts_all_fields() {
let bar = ibapi::market_data::historical::Bar {
date: datetime!(2024-01-15 16:00 UTC),
open: 150.0,
high: 155.0,
low: 149.0,
close: 153.5,
volume: 1_000_000.0,
wap: 152.0,
count: 50_000,
};
let candle = bar_to_candle(&bar).unwrap();
assert_eq!(candle.open, 150.0);
assert_eq!(candle.high, 155.0);
assert_eq!(candle.low, 149.0);
assert_eq!(candle.close, 153.5);
assert_eq!(candle.volume, 1_000_000.0);
assert_eq!(candle.trade_count, 50_000);
assert_eq!(candle.close_time.year(), 2024);
assert_eq!(candle.close_time.month(), 1); assert_eq!(candle.close_time.day(), 15);
assert_eq!(candle.close_time.timestamp(), bar.date.unix_timestamp());
}
#[test]
fn bar_to_candle_handles_negative_count() {
let bar = ibapi::market_data::historical::Bar {
date: datetime!(2024-01-15 16:00 UTC),
open: 100.0,
high: 100.0,
low: 100.0,
close: 100.0,
volume: 0.0,
wap: 0.0,
count: -1, };
let candle = bar_to_candle(&bar).unwrap();
assert_eq!(candle.trade_count, 0);
}
#[test]
fn historical_request_daily_trades_builder() {
let contract = Contract::stock("AAPL").build();
let request = HistoricalRequest::daily_trades(contract, 30);
assert_eq!(request.contract.symbol.as_str(), "AAPL");
assert!(request.end_date.is_none());
assert!(matches!(request.bar_size, BarSize::Day));
assert!(matches!(request.what_to_show, WhatToShow::Trades));
assert!(request.regular_trading_hours_only);
}
}