#![allow(clippy::unwrap_used, clippy::expect_used)]
use futures_util::StreamExt;
use ibapi::contracts::Contract;
use rustrade_data::exchange::ibkr::{
IbkrMarketStream, IbkrStreamConfig,
subscription::{IbkrSubscription, IbkrSubscriptionKind},
};
use rustrade_instrument::ibkr::ContractRegistry;
use std::sync::Arc;
use tracing::{info, warn};
/// Example entry point: connects to a local IB Gateway/TWS API endpoint,
/// subscribes to quotes for AAPL, and logs every item yielded by the stream.
///
/// If the connection attempt fails, a warning with a setup hint is logged and
/// the program returns without panicking. Errors yielded by the stream are
/// logged and the loop keeps polling until the stream itself ends.
#[tokio::main]
async fn main() {
    init_logging();

    let config = IbkrStreamConfig {
        host: "127.0.0.1".to_string(),
        port: 4002,
        client_id: 100,
    };

    // Register the AAPL contract under the same key the subscription below
    // uses. The contract is moved straight into the registry: nothing else in
    // this function needs it afterwards, so no clone is required.
    let registry = ContractRegistry::new();
    registry.register("AAPL".into(), Contract::stock("AAPL").build());
    let registry = Arc::new(registry);

    let subscriptions = vec![IbkrSubscription {
        instrument: "AAPL".into(),
        key: "AAPL".to_string(),
        kind: IbkrSubscriptionKind::Quotes,
    }];

    info!("Connecting to IB Gateway...");
    let mut stream = match IbkrMarketStream::init(config, registry, subscriptions) {
        Ok(s) => s,
        Err(e) => {
            // A failed connection is an expected condition for an example
            // (gateway not running), so report it and exit quietly.
            warn!("Failed to connect: {e}");
            warn!("Make sure TWS/Gateway is running with API enabled on port 4002");
            return;
        }
    };

    info!("Connected! Streaming market data for AAPL...");
    info!("Press Ctrl+C to stop");

    // Drain the stream until it yields `None`; per-item errors are logged and
    // do not terminate the loop.
    while let Some(result) = stream.next().await {
        match result {
            Ok(event) => {
                info!("{event:?}");
            }
            Err(e) => {
                warn!("Stream error: {e}");
            }
        }
    }

    info!("Stream ended");
}
/// Sets up the `tracing_subscriber` formatter used by this example.
///
/// The filter is built from the environment via `from_env_lossy`, falling
/// back to the `INFO` level as the default directive. ANSI output is enabled
/// only when the binary is compiled with debug assertions.
fn init_logging() {
    use tracing_subscriber::filter::{EnvFilter, LevelFilter};

    // Environment-driven filter with INFO as the default directive.
    let env_filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::INFO.into())
        .from_env_lossy();

    // Colored output in debug builds only.
    let use_ansi = cfg!(debug_assertions);

    tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_ansi(use_ansi)
        .init();
}