#![cfg(feature = "ibkr")]
#![allow(clippy::unwrap_used, clippy::expect_used)]
use ibapi::{
contracts::Contract,
market_data::historical::{BarSize, WhatToShow},
};
use rustrade_data::{
event::DataKind,
exchange::ibkr::{
IbkrMarketStream, IbkrStreamConfig,
historical::{HistoricalRequest, IbkrHistoricalData, ToDuration},
subscription::{IbkrSubscription, IbkrSubscriptionKind},
},
};
use rustrade_instrument::ibkr::ContractRegistry;
use std::{sync::Arc, time::Duration};
use tokio_stream::StreamExt;
use tracing_subscriber::{EnvFilter, fmt};
fn init_logging() {
let _ = fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(tracing::Level::DEBUG.into())
.from_env_lossy(),
)
.try_init();
}
fn test_port() -> u16 {
std::env::var("IBKR_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(4002)
}
fn test_client_id_base() -> i32 {
std::env::var("IBKR_CLIENT_ID")
.ok()
.and_then(|id| id.parse().ok())
.unwrap_or(300)
}
fn aapl_contract() -> Contract {
Contract::stock("AAPL").build()
}
async fn connect_historical(url: &str, client_id: i32) -> Result<IbkrHistoricalData, String> {
let url = url.to_string();
tokio::task::spawn_blocking(move || {
IbkrHistoricalData::connect(&url, client_id).map_err(|e| e.to_string())
})
.await
.map_err(|e| format!("task join: {e}"))?
}
async fn connect_raw_client(
url: &str,
client_id: i32,
) -> Result<ibapi::client::blocking::Client, String> {
let url = url.to_string();
tokio::task::spawn_blocking(move || {
ibapi::client::blocking::Client::connect(&url, client_id).map_err(|e| e.to_string())
})
.await
.map_err(|e| format!("task join: {e}"))?
}
#[tokio::test]
#[ignore]
async fn test_historical_connection() {
init_logging();
let url = format!("127.0.0.1:{}", test_port());
let client_id = test_client_id_base();
let result = connect_historical(&url, client_id).await;
assert!(result.is_ok(), "Failed to connect: {:?}", result.err());
println!("Connected to IB for historical data");
}
#[tokio::test]
#[ignore]
async fn test_historical_daily_bars() {
init_logging();
let url = format!("127.0.0.1:{}", test_port());
let client_id = test_client_id_base() + 1;
let client = connect_historical(&url, client_id)
.await
.expect("connection failed");
let contract = aapl_contract();
let request = HistoricalRequest::daily_trades(contract, 30);
println!("Fetching 30 days of AAPL daily bars...");
let result = client.fetch_candles(request).await;
assert!(result.is_ok(), "fetch_candles failed: {:?}", result.err());
let candles = result.unwrap();
println!("Received {} candles", candles.len());
assert!(!candles.is_empty(), "Expected at least one candle");
for candle in candles.iter().take(5) {
println!(
" {} O:{:.2} H:{:.2} L:{:.2} C:{:.2} V:{:.0} T:{}",
candle.close_time.format("%Y-%m-%d"),
candle.open,
candle.high,
candle.low,
candle.close,
candle.volume,
candle.trade_count
);
}
let first = &candles[0];
assert!(
first.high >= first.low,
"High {:.4} should be >= Low {:.4}",
first.high,
first.low
);
assert!(
first.high >= first.open,
"High {:.4} should be >= Open {:.4}",
first.high,
first.open
);
assert!(
first.high >= first.close,
"High {:.4} should be >= Close {:.4}",
first.high,
first.close
);
assert!(
first.low <= first.open,
"Low {:.4} should be <= Open {:.4}",
first.low,
first.open
);
assert!(
first.low <= first.close,
"Low {:.4} should be <= Close {:.4}",
first.low,
first.close
);
assert!(
first.volume >= 0.0,
"Volume {:.2} should be non-negative",
first.volume
);
}
#[tokio::test]
#[ignore]
async fn test_historical_hourly_bars() {
init_logging();
let url = format!("127.0.0.1:{}", test_port());
let client_id = test_client_id_base() + 2;
let client = connect_historical(&url, client_id)
.await
.expect("connection failed");
let contract = aapl_contract();
let request = HistoricalRequest {
contract,
end_date: None,
duration: 5.days(),
bar_size: BarSize::Hour,
what_to_show: WhatToShow::Trades,
regular_trading_hours_only: true,
};
println!("Fetching 5 days of AAPL hourly bars...");
let result = client.fetch_candles(request).await;
assert!(result.is_ok(), "fetch_candles failed: {:?}", result.err());
let candles = result.unwrap();
println!("Received {} hourly candles", candles.len());
assert!(!candles.is_empty(), "Expected at least one hourly candle");
if candles.len() > 1 {
let first_time = candles[0].close_time;
let second_time = candles[1].close_time;
let diff = second_time - first_time;
println!(
"Time between first two bars: {} seconds",
diff.num_seconds()
);
assert!(
diff.num_seconds() > 0,
"Candles should be chronologically ordered"
);
}
}
#[tokio::test]
#[ignore]
async fn test_historical_minute_bars() {
init_logging();
let url = format!("127.0.0.1:{}", test_port());
let client_id = test_client_id_base() + 3;
let client = connect_historical(&url, client_id)
.await
.expect("connection failed");
let contract = aapl_contract();
let request = HistoricalRequest {
contract,
end_date: None,
duration: 1.days(),
bar_size: BarSize::Min,
what_to_show: WhatToShow::Trades,
regular_trading_hours_only: true,
};
println!("Fetching 1 day of AAPL 1-minute bars...");
let result = client.fetch_candles(request).await;
assert!(result.is_ok(), "fetch_candles failed: {:?}", result.err());
let candles = result.unwrap();
println!("Received {} minute candles", candles.len());
assert!(!candles.is_empty(), "Expected at least one 1-minute candle");
}
#[tokio::test]
#[ignore]
async fn test_historical_midpoint_data() {
init_logging();
let url = format!("127.0.0.1:{}", test_port());
let client_id = test_client_id_base() + 4;
let client = connect_historical(&url, client_id)
.await
.expect("connection failed");
let contract = aapl_contract();
let request = HistoricalRequest {
contract,
end_date: None,
duration: 5.days(),
bar_size: BarSize::Day,
what_to_show: WhatToShow::MidPoint,
regular_trading_hours_only: true,
};
println!("Fetching 5 days of AAPL midpoint data...");
let result = client.fetch_candles(request).await;
assert!(result.is_ok(), "fetch_candles failed: {:?}", result.err());
let candles = result.unwrap();
println!("Received {} midpoint candles", candles.len());
if !candles.is_empty() {
let first = &candles[0];
println!(
" First midpoint: {} C:{:.2}",
first.close_time.format("%Y-%m-%d"),
first.close
);
assert_eq!(
first.trade_count, 0,
"Midpoint data should have no trade count"
);
}
}
#[tokio::test]
#[ignore]
async fn test_historical_from_shared_client() {
init_logging();
let url = format!("127.0.0.1:{}", test_port());
let client_id = test_client_id_base() + 5;
let ib_client = connect_raw_client(&url, client_id)
.await
.expect("connection failed");
let ib_client = Arc::new(ib_client);
let historical = IbkrHistoricalData::from_client(ib_client);
let contract = aapl_contract();
let request = HistoricalRequest::daily_trades(contract, 10);
let result = historical.fetch_candles(request).await;
assert!(
result.is_ok(),
"fetch_candles from shared client failed: {:?}",
result.err()
);
let candles = result.unwrap();
assert!(
!candles.is_empty(),
"Expected at least one candle from shared client"
);
println!("Received {} candles from shared client", candles.len());
}
#[tokio::test]
#[ignore]
async fn test_market_stream_connection() {
init_logging();
let config = IbkrStreamConfig {
host: "127.0.0.1".to_string(),
port: test_port(),
client_id: test_client_id_base() + 10,
};
let registry = ContractRegistry::new();
registry.register("AAPL".into(), aapl_contract());
let registry = Arc::new(registry);
let subscriptions = vec![IbkrSubscription {
instrument: "AAPL".into(),
key: "AAPL".to_string(),
kind: IbkrSubscriptionKind::Quotes,
}];
let result = IbkrMarketStream::init(config, registry, subscriptions);
assert!(
result.is_ok(),
"Failed to initialize market stream: {:?}",
result.err()
);
println!("Market stream initialized successfully");
}
#[tokio::test]
#[ignore]
async fn test_market_stream_quotes() {
init_logging();
let config = IbkrStreamConfig {
host: "127.0.0.1".to_string(),
port: test_port(),
client_id: test_client_id_base() + 11,
};
let registry = ContractRegistry::new();
registry.register("AAPL".into(), aapl_contract());
let registry = Arc::new(registry);
let subscriptions = vec![IbkrSubscription {
instrument: "AAPL".into(),
key: "AAPL".to_string(),
kind: IbkrSubscriptionKind::Quotes,
}];
let mut stream =
IbkrMarketStream::init(config, registry, subscriptions).expect("stream init failed");
println!("Waiting for quote events (10 second timeout)...");
println!("Note: No quotes will arrive outside US market hours (9:30 AM - 4:00 PM ET)");
let timeout_result = tokio::time::timeout(Duration::from_secs(10), async {
let mut quote_count = 0;
while let Some(result) = stream.next().await {
match result {
Ok(event) => {
if let DataKind::OrderBookL1(l1) = &event.kind {
let bid_price = l1.best_bid.as_ref().map(|b| b.price);
let bid_amount = l1.best_bid.as_ref().map(|b| b.amount);
let ask_price = l1.best_ask.as_ref().map(|a| a.price);
let ask_amount = l1.best_ask.as_ref().map(|a| a.amount);
println!(
"Quote: bid={:?} @ {:?}, ask={:?} @ {:?}",
bid_price, bid_amount, ask_price, ask_amount
);
quote_count += 1;
if quote_count >= 5 {
break;
}
}
}
Err(e) => {
println!("Stream error: {:?}", e);
break;
}
}
}
quote_count
})
.await;
match timeout_result {
Ok(count) => println!("Received {} quotes", count),
Err(_) => println!("Timeout (normal outside market hours)"),
}
}
#[tokio::test]
#[ignore]
async fn test_market_stream_depth() {
init_logging();
let config = IbkrStreamConfig {
host: "127.0.0.1".to_string(),
port: test_port(),
client_id: test_client_id_base() + 12,
};
let registry = ContractRegistry::new();
registry.register("AAPL".into(), aapl_contract());
let registry = Arc::new(registry);
let subscriptions = vec![IbkrSubscription {
instrument: "AAPL".into(),
key: "AAPL".to_string(),
kind: IbkrSubscriptionKind::Depth { rows: 5 },
}];
let mut stream =
IbkrMarketStream::init(config, registry, subscriptions).expect("stream init failed");
println!("Waiting for depth events (10 second timeout)...");
println!("Note: Depth may not be available for all instruments or times");
let timeout_result = tokio::time::timeout(Duration::from_secs(10), async {
let mut depth_count = 0;
while let Some(result) = stream.next().await {
match result {
Ok(event) => {
if let DataKind::OrderBook(book_event) = &event.kind {
println!("Depth event: {:?}", book_event);
depth_count += 1;
if depth_count >= 3 {
break;
}
}
}
Err(e) => {
println!("Stream error: {:?}", e);
break;
}
}
}
depth_count
})
.await;
match timeout_result {
Ok(count) => println!("Received {} depth updates", count),
Err(_) => println!("Timeout (depth may not be available)"),
}
}
#[tokio::test]
#[ignore]
async fn test_market_stream_unregistered_contract() {
init_logging();
let config = IbkrStreamConfig {
host: "127.0.0.1".to_string(),
port: test_port(),
client_id: test_client_id_base() + 13,
};
let registry = Arc::new(ContractRegistry::new());
let subscriptions = vec![IbkrSubscription {
instrument: "UNKNOWN_SYMBOL".into(),
key: "UNKNOWN".to_string(),
kind: IbkrSubscriptionKind::Quotes,
}];
let result = IbkrMarketStream::init(config, registry, subscriptions);
assert!(result.is_err(), "Expected error for unregistered contract");
println!(
"Correctly rejected unregistered contract: {:?}",
result.err()
);
}
#[tokio::test]
#[ignore]
async fn test_market_stream_multiple_subscriptions() {
init_logging();
let config = IbkrStreamConfig {
host: "127.0.0.1".to_string(),
port: test_port(),
client_id: test_client_id_base() + 14,
};
let registry = ContractRegistry::new();
registry.register("AAPL".into(), aapl_contract());
registry.register("MSFT".into(), Contract::stock("MSFT").build());
let registry = Arc::new(registry);
let subscriptions = vec![
IbkrSubscription {
instrument: "AAPL".into(),
key: "AAPL".to_string(),
kind: IbkrSubscriptionKind::Quotes,
},
IbkrSubscription {
instrument: "MSFT".into(),
key: "MSFT".to_string(),
kind: IbkrSubscriptionKind::Quotes,
},
];
let result = IbkrMarketStream::init(config, registry, subscriptions);
assert!(
result.is_ok(),
"Failed to initialize multi-subscription stream: {:?}",
result.err()
);
println!("Multi-subscription stream initialized successfully");
let mut stream = result.unwrap();
let timeout_result = tokio::time::timeout(Duration::from_secs(10), async {
let mut aapl_count = 0;
let mut msft_count = 0;
while let Some(result) = stream.next().await {
if let Ok(event) = result {
match event.instrument.as_str() {
"AAPL" => aapl_count += 1,
"MSFT" => msft_count += 1,
_ => {}
}
if aapl_count >= 2 && msft_count >= 2 {
break;
}
}
}
(aapl_count, msft_count)
})
.await;
match timeout_result {
Ok((aapl, msft)) => println!("Received {} AAPL, {} MSFT events", aapl, msft),
Err(_) => println!("Timeout (normal outside market hours)"),
}
}
#[tokio::test]
#[ignore]
async fn test_contract_resolution() {
init_logging();
let url = format!("127.0.0.1:{}", test_port());
let client_id = test_client_id_base() + 20;
let client = connect_raw_client(&url, client_id)
.await
.expect("connection failed");
let contract = aapl_contract();
println!("Resolving AAPL contract details...");
let details = tokio::task::spawn_blocking(move || client.contract_details(&contract))
.await
.expect("task join failed");
assert!(
details.is_ok(),
"contract_details failed: {:?}",
details.err()
);
let details = details.unwrap();
assert!(!details.is_empty(), "Expected at least one contract detail");
let first = &details[0];
println!("Contract ID: {}", first.contract.contract_id);
println!("Symbol: {}", first.contract.symbol);
println!("Exchange: {}", first.contract.exchange);
println!("Currency: {}", first.contract.currency);
let registry = ContractRegistry::new();
registry.register("AAPL".into(), first.contract.clone());
assert_eq!(registry.len(), 1);
assert!(registry.get_contract(&"AAPL".into()).is_some());
assert!(
registry
.get_name_by_con_id(first.contract.contract_id)
.is_some()
);
}