use crate::broker::{
Account, BrokerClient, BrokerError, HealthStatus, OrderFilter, Position, PositionSide,
};
use crate::{OrderRequest, OrderResponse, OrderSide, OrderStatus, OrderType, Symbol, TimeInForce};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use std::collections::HashMap;
use futures::stream::{Stream, StreamExt};
use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use parking_lot::RwLock;
use reqwest::{Client, Method, StatusCode};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::num::NonZeroU32;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{debug, error, info, warn};
use url::Url;
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct IBKRConfig {
pub host: String,
pub port: u16,
pub client_id: i32,
pub account: String,
pub paper_trading: bool,
pub timeout: Duration,
pub streaming: bool,
pub level2_depth: bool,
pub options_enabled: bool,
pub algo_orders: bool,
}
impl Default for IBKRConfig {
fn default() -> Self {
Self {
host: "127.0.0.1".to_string(),
port: 7497, client_id: 1,
account: String::new(),
paper_trading: true,
timeout: Duration::from_secs(30),
streaming: true,
level2_depth: false,
options_enabled: true,
algo_orders: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IBKROrderClass {
Simple,
Bracket,
OCA,
OCO,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlgoStrategy {
VWAP {
start_time: String,
end_time: String,
},
TWAP {
start_time: String,
end_time: String,
},
PercentOfVolume {
participation_rate: f64,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptionContract {
pub underlying: String,
pub strike: Decimal,
pub expiry: String, pub right: OptionRight,
pub multiplier: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OptionRight {
Call,
Put,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptionGreeks {
pub delta: f64,
pub gamma: f64,
pub theta: f64,
pub vega: f64,
pub rho: f64,
pub implied_volatility: f64,
}
#[derive(Debug, Clone)]
pub struct BracketOrder {
pub entry: OrderRequest,
pub stop_loss: OrderRequest,
pub take_profit: OrderRequest,
}
#[derive(Debug, Clone)]
pub enum TrailingStop {
Percentage(f64),
Dollar(Decimal),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MarketTick {
pub symbol: String,
pub timestamp: DateTime<Utc>,
pub last_price: Decimal,
pub bid: Decimal,
pub ask: Decimal,
pub volume: i64,
pub bid_size: i64,
pub ask_size: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MarketDepth {
pub symbol: String,
pub timestamp: DateTime<Utc>,
pub bids: Vec<(Decimal, i64)>, pub asks: Vec<(Decimal, i64)>,
}
#[derive(Debug, Clone)]
pub struct RiskCheckResult {
pub passed: bool,
pub margin_required: Decimal,
pub buying_power_used: Decimal,
pub warnings: Vec<String>,
}
pub struct IBKRBroker {
client: Client,
config: IBKRConfig,
base_url: String,
rate_limiter: DefaultDirectRateLimiter,
positions_cache: Arc<DashMap<String, Position>>,
account_cache: Arc<RwLock<Option<Account>>>,
connection_status: Arc<RwLock<ConnectionStatus>>,
market_data_tx: Arc<RwLock<Option<broadcast::Sender<MarketTick>>>>,
depth_data_tx: Arc<RwLock<Option<broadcast::Sender<MarketDepth>>>>,
option_chains: Arc<DashMap<String, Vec<OptionContract>>>,
greeks_cache: Arc<DashMap<String, OptionGreeks>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionStatus {
Connected,
Disconnected,
Reconnecting,
}
impl IBKRBroker {
pub fn new(config: IBKRConfig) -> Self {
let base_url = format!("http://{}:{}/v1", config.host, config.port);
let client = Client::builder()
.timeout(config.timeout)
.build()
.expect("Failed to create HTTP client");
let quota = Quota::per_second(NonZeroU32::new(50).unwrap());
let rate_limiter = RateLimiter::direct(quota);
Self {
client,
config,
base_url,
rate_limiter,
positions_cache: Arc::new(DashMap::new()),
account_cache: Arc::new(RwLock::new(None)),
connection_status: Arc::new(RwLock::new(ConnectionStatus::Disconnected)),
market_data_tx: Arc::new(RwLock::new(None)),
depth_data_tx: Arc::new(RwLock::new(None)),
option_chains: Arc::new(DashMap::new()),
greeks_cache: Arc::new(DashMap::new()),
}
}
pub async fn connect(&self) -> Result<(), BrokerError> {
let status: IBKRStatus = self
.request(Method::GET, "/iserver/auth/status", None::<()>)
.await?;
if status.authenticated {
info!("Connected to IBKR TWS/Gateway");
*self.connection_status.write() = ConnectionStatus::Connected;
self.refresh_account().await?;
Ok(())
} else {
error!("IBKR authentication failed");
Err(BrokerError::Auth("Not authenticated with TWS/Gateway".to_string()))
}
}
pub async fn start_streaming(&self, symbols: Vec<String>) -> Result<(), BrokerError> {
if !self.config.streaming {
return Ok(());
}
let (tx, _) = broadcast::channel(1000);
*self.market_data_tx.write() = Some(tx.clone());
for symbol in symbols {
let conid = self.get_contract_id(&symbol).await?;
self.subscribe_market_data(conid, tx.clone()).await?;
}
Ok(())
}
async fn subscribe_market_data(
&self,
conid: i64,
tx: broadcast::Sender<MarketTick>,
) -> Result<(), BrokerError> {
let req = IBKRMarketDataRequest {
conid,
fields: vec![
"31".to_string(), "84".to_string(), "86".to_string(), "87".to_string(), "88".to_string(), "85".to_string(), ],
};
let _: serde_json::Value = self
.request(Method::POST, "/iserver/marketdata/snapshot", Some(req))
.await?;
Ok(())
}
pub fn market_data_stream(&self) -> Option<broadcast::Receiver<MarketTick>> {
self.market_data_tx.read().as_ref().map(|tx| tx.subscribe())
}
pub async fn start_depth_streaming(&self, symbols: Vec<String>) -> Result<(), BrokerError> {
if !self.config.level2_depth {
return Ok(());
}
let (tx, _) = broadcast::channel(1000);
*self.depth_data_tx.write() = Some(tx.clone());
for symbol in symbols {
let conid = self.get_contract_id(&symbol).await?;
self.subscribe_depth(conid, tx.clone()).await?;
}
Ok(())
}
async fn subscribe_depth(
&self,
conid: i64,
_tx: broadcast::Sender<MarketDepth>,
) -> Result<(), BrokerError> {
let _: serde_json::Value = self
.request(
Method::POST,
&format!("/iserver/marketdata/depth?conid={}", conid),
None::<()>,
)
.await?;
Ok(())
}
pub fn depth_stream(&self) -> Option<broadcast::Receiver<MarketDepth>> {
self.depth_data_tx.read().as_ref().map(|tx| tx.subscribe())
}
pub async fn get_historical_data(
&self,
symbol: &str,
period: &str,
bar_size: &str,
) -> Result<Vec<HistoricalBar>, BrokerError> {
let conid = self.get_contract_id(symbol).await?;
let bars: IBKRHistoricalResponse = self
.request(
Method::GET,
&format!(
"/iserver/marketdata/history?conid={}&period={}&bar={}",
conid, period, bar_size
),
None::<()>,
)
.await?;
Ok(bars.data.into_iter().map(|b| b.into()).collect())
}
pub async fn get_option_chain(
&self,
underlying: &str,
) -> Result<Vec<OptionContract>, BrokerError> {
if !self.config.options_enabled {
return Err(BrokerError::InvalidOrder(
"Options trading not enabled".to_string(),
));
}
if let Some(chain) = self.option_chains.get(underlying) {
return Ok(chain.clone());
}
let conid = self.get_contract_id(underlying).await?;
let chain: IBKROptionChain = self
.request(
Method::GET,
&format!("/iserver/secdef/info?conid={}§ype=OPT", conid),
None::<()>,
)
.await?;
let contracts: Vec<OptionContract> = chain.strikes.into_iter().flat_map(|strike| {
vec![
OptionContract {
underlying: underlying.to_string(),
strike: strike.strike,
expiry: strike.expiry.clone(),
right: OptionRight::Call,
multiplier: 100,
},
OptionContract {
underlying: underlying.to_string(),
strike: strike.strike,
expiry: strike.expiry,
right: OptionRight::Put,
multiplier: 100,
},
]
}).collect();
self.option_chains.insert(underlying.to_string(), contracts.clone());
Ok(contracts)
}
pub async fn get_option_greeks(
&self,
contract: &OptionContract,
) -> Result<OptionGreeks, BrokerError> {
let cache_key = format!("{}_{}_{:?}", contract.underlying, contract.expiry, contract.right);
if let Some(greeks) = self.greeks_cache.get(&cache_key) {
return Ok(greeks.clone());
}
let conid = self.get_option_contract_id(contract).await?;
let greeks: IBKRGreeksResponse = self
.request(
Method::GET,
&format!("/iserver/marketdata/snapshot?conids={}&fields=7283,7284,7285,7286,7287,7633", conid),
None::<()>,
)
.await?;
let option_greeks: OptionGreeks = greeks.into();
self.greeks_cache.insert(cache_key, option_greeks.clone());
Ok(option_greeks)
}
pub async fn place_option_order(
&self,
contract: OptionContract,
quantity: i64,
side: OrderSide,
price: Option<Decimal>,
) -> Result<OrderResponse, BrokerError> {
let conid = self.get_option_contract_id(&contract).await?;
let order = IBKROrderRequest {
acct_id: self.config.account.clone(),
order_type: if price.is_some() { "LMT" } else { "MKT" }.to_string(),
side: match side {
OrderSide::Buy => "BUY".to_string(),
OrderSide::Sell => "SELL".to_string(),
},
tif: "DAY".to_string(),
quantity: quantity.to_string(),
price: price.map(|p| p.to_string()),
stop_price: None,
};
let response: Vec<IBKROrderResponseItem> = self
.request(
Method::POST,
&format!("/iserver/account/{}/orders", self.config.account),
Some(serde_json::json!({
"conid": conid,
"orders": [order]
})),
)
.await?;
let resp = response.first().ok_or_else(|| {
BrokerError::Other(anyhow::anyhow!("No order response from IBKR"))
})?;
Ok(OrderResponse {
order_id: resp.order_id.clone(),
client_order_id: Uuid::new_v4().to_string(),
status: OrderStatus::Accepted,
filled_qty: 0,
filled_avg_price: None,
submitted_at: Utc::now(),
filled_at: None,
})
}
pub async fn place_bracket_order(
&self,
bracket: BracketOrder,
) -> Result<Vec<OrderResponse>, BrokerError> {
let conid = self.get_contract_id(&bracket.entry.symbol.to_string()).await?;
let parent = self.convert_order(&bracket.entry);
let stop_loss = self.convert_order(&bracket.stop_loss);
let take_profit = self.convert_order(&bracket.take_profit);
let bracket_req = serde_json::json!({
"conid": conid,
"orders": [
{
"orderType": parent.order_type,
"side": parent.side,
"quantity": parent.quantity,
"price": parent.price,
"tif": parent.tif,
"outsideRth": false,
"attachedOrders": [
{
"orderType": stop_loss.order_type,
"side": stop_loss.side,
"quantity": stop_loss.quantity,
"stopPrice": stop_loss.stop_price,
"tif": "GTC"
},
{
"orderType": take_profit.order_type,
"side": take_profit.side,
"quantity": take_profit.quantity,
"price": take_profit.price,
"tif": "GTC"
}
]
}
]
});
let responses: Vec<IBKROrderResponseItem> = self
.request(
Method::POST,
&format!("/iserver/account/{}/orders", self.config.account),
Some(bracket_req),
)
.await?;
Ok(responses
.into_iter()
.map(|r| OrderResponse {
order_id: r.order_id,
client_order_id: Uuid::new_v4().to_string(),
status: OrderStatus::Accepted,
filled_qty: 0,
filled_avg_price: None,
submitted_at: Utc::now(),
filled_at: None,
})
.collect())
}
pub async fn place_trailing_stop(
&self,
symbol: &str,
quantity: i64,
side: OrderSide,
trail: TrailingStop,
) -> Result<OrderResponse, BrokerError> {
let conid = self.get_contract_id(symbol).await?;
let (trail_amount, trail_unit) = match trail {
TrailingStop::Percentage(pct) => (pct.to_string(), "%"),
TrailingStop::Dollar(amt) => (amt.to_string(), "$"),
};
let order = serde_json::json!({
"conid": conid,
"orderType": "TRAIL",
"side": match side {
OrderSide::Buy => "BUY",
OrderSide::Sell => "SELL",
},
"quantity": quantity,
"tif": "GTC",
"trailingAmount": trail_amount,
"trailingType": trail_unit,
});
let responses: Vec<IBKROrderResponseItem> = self
.request(
Method::POST,
&format!("/iserver/account/{}/orders", self.config.account),
Some(serde_json::json!({ "orders": [order] })),
)
.await?;
let resp = responses.first().ok_or_else(|| {
BrokerError::Other(anyhow::anyhow!("No order response from IBKR"))
})?;
Ok(OrderResponse {
order_id: resp.order_id.clone(),
client_order_id: Uuid::new_v4().to_string(),
status: OrderStatus::Accepted,
filled_qty: 0,
filled_avg_price: None,
submitted_at: Utc::now(),
filled_at: None,
})
}
pub async fn place_algo_order(
&self,
symbol: &str,
quantity: i64,
side: OrderSide,
strategy: AlgoStrategy,
) -> Result<OrderResponse, BrokerError> {
if !self.config.algo_orders {
return Err(BrokerError::InvalidOrder(
"Algorithmic orders not enabled".to_string(),
));
}
let conid = self.get_contract_id(symbol).await?;
let (algo_strategy, algo_params) = match strategy {
AlgoStrategy::VWAP { start_time, end_time } => (
"Vwap",
serde_json::json!({
"startTime": start_time,
"endTime": end_time,
"allowPastEndTime": true,
}),
),
AlgoStrategy::TWAP { start_time, end_time } => (
"Twap",
serde_json::json!({
"startTime": start_time,
"endTime": end_time,
"allowPastEndTime": true,
}),
),
AlgoStrategy::PercentOfVolume { participation_rate } => (
"PctVol",
serde_json::json!({
"pctVol": participation_rate,
}),
),
};
let order = serde_json::json!({
"conid": conid,
"orderType": "LMT",
"side": match side {
OrderSide::Buy => "BUY",
OrderSide::Sell => "SELL",
},
"quantity": quantity,
"tif": "DAY",
"strategy": algo_strategy,
"strategyParameters": algo_params,
});
let responses: Vec<IBKROrderResponseItem> = self
.request(
Method::POST,
&format!("/iserver/account/{}/orders", self.config.account),
Some(serde_json::json!({ "orders": [order] })),
)
.await?;
let resp = responses.first().ok_or_else(|| {
BrokerError::Other(anyhow::anyhow!("No order response from IBKR"))
})?;
Ok(OrderResponse {
order_id: resp.order_id.clone(),
client_order_id: Uuid::new_v4().to_string(),
status: OrderStatus::Accepted,
filled_qty: 0,
filled_avg_price: None,
submitted_at: Utc::now(),
filled_at: None,
})
}
pub async fn pre_trade_risk_check(
&self,
order: &OrderRequest,
) -> Result<RiskCheckResult, BrokerError> {
let account = self.get_account().await?;
let conid = self.get_contract_id(&order.symbol.to_string()).await?;
let margin: IBKRMarginResponse = self
.request(
Method::POST,
"/iserver/account/margin",
Some(serde_json::json!({
"conid": conid,
"quantity": order.quantity,
"side": match order.side {
OrderSide::Buy => "BUY",
OrderSide::Sell => "SELL",
},
})),
)
.await?;
let mut warnings = Vec::new();
if margin.initial_margin > account.buying_power {
warnings.push("Insufficient buying power for order".to_string());
}
if account.daytrade_count >= 3 && account.equity < Decimal::from(25000) {
warnings.push("Pattern day trader: account equity below $25,000".to_string());
}
if margin.maintenance_margin > account.maintenance_margin {
warnings.push("Order would exceed maintenance margin requirements".to_string());
}
Ok(RiskCheckResult {
passed: warnings.is_empty(),
margin_required: margin.initial_margin,
buying_power_used: margin.initial_margin,
warnings,
})
}
pub async fn calculate_buying_power(
&self,
asset_class: &str,
) -> Result<Decimal, BrokerError> {
let account = self.get_account().await?;
let multiplier = match asset_class {
"STK" => Decimal::from(4), "OPT" => Decimal::ONE, "FUT" => Decimal::from(10), "FX" => Decimal::from(50), _ => Decimal::from(2),
};
Ok(account.buying_power * multiplier)
}
pub async fn is_pattern_day_trader(&self) -> Result<bool, BrokerError> {
let account = self.get_account().await?;
Ok(account.daytrade_count >= 3 && account.equity < Decimal::from(25000))
}
async fn get_contract_id(&self, symbol: &str) -> Result<i64, BrokerError> {
#[derive(Deserialize)]
struct ContractSearchResult {
conid: i64,
}
let results: Vec<ContractSearchResult> = self
.request(
Method::GET,
&format!("/iserver/secdef/search?symbol={}", symbol),
None::<()>,
)
.await?;
results
.first()
.map(|r| r.conid)
.ok_or_else(|| BrokerError::InvalidOrder(format!("Symbol not found: {}", symbol)))
}
async fn get_option_contract_id(&self, contract: &OptionContract) -> Result<i64, BrokerError> {
let right = match contract.right {
OptionRight::Call => "C",
OptionRight::Put => "P",
};
let local_symbol = format!(
"{}{}{}{}",
contract.underlying,
contract.expiry,
right,
contract.strike
);
let results: Vec<ContractSearchResult> = self
.request(
Method::GET,
&format!("/iserver/secdef/search?symbol={}", local_symbol),
None::<()>,
)
.await?;
results
.first()
.map(|r| r.conid)
.ok_or_else(|| BrokerError::InvalidOrder(format!("Option contract not found")))
}
async fn refresh_account(&self) -> Result<(), BrokerError> {
let accounts: Vec<String> = self
.request(Method::GET, "/portfolio/accounts", None::<()>)
.await?;
let account_id = if self.config.account.is_empty() {
accounts.first().cloned().unwrap_or_default()
} else {
self.config.account.clone()
};
let summary: IBKRAccountSummary = self
.request(
Method::GET,
&format!("/portfolio/{}/summary", account_id),
None::<()>,
)
.await?;
let account = Account {
account_id: account_id.clone(),
cash: summary.total_cash_value,
portfolio_value: summary.net_liquidation,
buying_power: summary.buying_power,
equity: summary.equity_with_loan_value,
last_equity: summary.previous_day_equity,
multiplier: "1".to_string(),
currency: summary.currency,
shorting_enabled: true,
long_market_value: summary.gross_position_value,
short_market_value: Decimal::ZERO,
initial_margin: summary.init_margin_req,
maintenance_margin: summary.maint_margin_req,
day_trading_buying_power: summary.day_trades_remaining.into(),
daytrade_count: summary.day_trades_remaining,
};
*self.account_cache.write() = Some(account);
Ok(())
}
async fn request<T: serde::de::DeserializeOwned>(
&self,
method: Method,
path: &str,
body: Option<impl Serialize>,
) -> Result<T, BrokerError> {
self.rate_limiter.until_ready().await;
let url = format!("{}{}", self.base_url, path);
let mut req = self.client.request(method.clone(), &url);
if let Some(body) = body {
req = req.json(&body);
}
debug!("IBKR API request: {} {}", method, path);
let response = req.send().await?;
match response.status() {
StatusCode::OK | StatusCode::CREATED => {
let result = response.json().await?;
Ok(result)
}
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
Err(BrokerError::Auth("IBKR authentication failed".to_string()))
}
StatusCode::TOO_MANY_REQUESTS => Err(BrokerError::RateLimit),
StatusCode::SERVICE_UNAVAILABLE => {
Err(BrokerError::Unavailable("IBKR service unavailable".to_string()))
}
status => {
let error_text = response.text().await.unwrap_or_default();
error!("IBKR API error {}: {}", status, error_text);
Err(BrokerError::Other(anyhow::anyhow!("HTTP {}: {}", status, error_text)))
}
}
}
fn convert_order(&self, order: &OrderRequest) -> IBKROrderRequest {
let order_type = match order.order_type {
OrderType::Market => "MKT",
OrderType::Limit => "LMT",
OrderType::StopLoss => "STP",
OrderType::StopLimit => "STP LMT",
};
let side = match order.side {
OrderSide::Buy => "BUY",
OrderSide::Sell => "SELL",
};
let tif = match order.time_in_force {
TimeInForce::Day => "DAY",
TimeInForce::GTC => "GTC",
TimeInForce::IOC => "IOC",
TimeInForce::FOK => "FOK",
};
IBKROrderRequest {
acct_id: self.config.account.clone(),
order_type: order_type.to_string(),
side: side.to_string(),
tif: tif.to_string(),
quantity: order.quantity.to_string(),
price: order.limit_price.map(|p| p.to_string()),
stop_price: order.stop_price.map(|p| p.to_string()),
}
}
}
#[async_trait]
impl BrokerClient for IBKRBroker {
async fn get_account(&self) -> Result<Account, BrokerError> {
if let Some(account) = self.account_cache.read().as_ref() {
return Ok(account.clone());
}
self.refresh_account().await?;
self.account_cache
.read()
.as_ref()
.cloned()
.ok_or_else(|| BrokerError::Other(anyhow::anyhow!("Failed to load account")))
}
async fn get_positions(&self) -> Result<Vec<Position>, BrokerError> {
let account_id = self.get_account().await?.account_id;
let positions: Vec<IBKRPosition> = self
.request(
Method::GET,
&format!("/portfolio/{}/positions", account_id),
None::<()>,
)
.await?;
Ok(positions
.into_iter()
.map(|p| Position {
symbol: Symbol::new(p.ticker.as_str()).expect("Invalid symbol from IBKR"),
qty: p.position,
side: if p.position > 0 {
PositionSide::Long
} else {
PositionSide::Short
},
avg_entry_price: p.avg_price,
market_value: p.market_value,
cost_basis: p.avg_price * Decimal::from(p.position.abs()),
unrealized_pl: p.unrealized_pnl,
unrealized_plpc: if p.market_value != Decimal::ZERO {
(p.unrealized_pnl / p.market_value.abs()) * Decimal::from(100)
} else {
Decimal::ZERO
},
current_price: p.market_price,
lastday_price: p.market_price,
change_today: Decimal::ZERO,
})
.collect())
}
async fn place_order(&self, order: OrderRequest) -> Result<OrderResponse, BrokerError> {
let risk_check = self.pre_trade_risk_check(&order).await?;
if !risk_check.passed {
return Err(BrokerError::InvalidOrder(format!(
"Risk check failed: {:?}",
risk_check.warnings
)));
}
let conid = self.get_contract_id(&order.symbol.to_string()).await?;
let ibkr_order = self.convert_order(&order);
let response: Vec<IBKROrderResponseItem> = self
.request(
Method::POST,
&format!("/iserver/account/{}/orders", self.config.account),
Some(serde_json::json!({
"conid": conid,
"orders": [ibkr_order]
})),
)
.await?;
let resp = response.first().ok_or_else(|| {
BrokerError::Other(anyhow::anyhow!("No order response from IBKR"))
})?;
Ok(OrderResponse {
order_id: resp.order_id.clone(),
client_order_id: Uuid::new_v4().to_string(),
status: OrderStatus::Accepted,
filled_qty: 0,
filled_avg_price: None,
submitted_at: Utc::now(),
filled_at: None,
})
}
async fn cancel_order(&self, order_id: &str) -> Result<(), BrokerError> {
let _: serde_json::Value = self
.request(
Method::DELETE,
&format!("/iserver/account/{}/order/{}", self.config.account, order_id),
None::<()>,
)
.await?;
Ok(())
}
async fn get_order(&self, order_id: &str) -> Result<OrderResponse, BrokerError> {
let order: IBKROrderStatus = self
.request(
Method::GET,
&format!("/iserver/account/order/status/{}", order_id),
None::<()>,
)
.await?;
let status = match order.status.as_str() {
"Submitted" => OrderStatus::Accepted,
"Filled" => OrderStatus::Filled,
"Cancelled" => OrderStatus::Cancelled,
"PendingSubmit" => OrderStatus::Pending,
_ => OrderStatus::Pending,
};
Ok(OrderResponse {
order_id: order_id.to_string(),
client_order_id: order.order_ref.unwrap_or_default(),
status,
filled_qty: order.filled_quantity.try_into().unwrap_or(0),
filled_avg_price: Some(order.avg_price),
submitted_at: Utc::now(),
filled_at: if status == OrderStatus::Filled {
Some(Utc::now())
} else {
None
},
})
}
async fn list_orders(&self, _filter: OrderFilter) -> Result<Vec<OrderResponse>, BrokerError> {
let orders: IBKROrdersResponse = self
.request(
Method::GET,
&format!("/iserver/account/{}/orders", self.config.account),
None::<()>,
)
.await?;
Ok(orders
.orders
.into_iter()
.filter_map(|o| {
Some(OrderResponse {
order_id: o.order_id?,
client_order_id: o.order_ref.unwrap_or_default(),
status: OrderStatus::Pending,
filled_qty: o.filled_quantity.and_then(|q| q.try_into().ok()).unwrap_or(0),
filled_avg_price: o.avg_price,
submitted_at: Utc::now(),
filled_at: None,
})
})
.collect())
}
async fn health_check(&self) -> Result<HealthStatus, BrokerError> {
match *self.connection_status.read() {
ConnectionStatus::Connected => Ok(HealthStatus::Healthy),
ConnectionStatus::Reconnecting => Ok(HealthStatus::Degraded),
ConnectionStatus::Disconnected => Ok(HealthStatus::Unhealthy),
}
}
}
#[derive(Debug, Serialize, Deserialize)]
struct IBKRStatus {
authenticated: bool,
competing: bool,
connected: bool,
}
#[derive(Debug, Serialize)]
struct IBKROrderRequest {
acct_id: String,
order_type: String,
side: String,
tif: String,
quantity: String,
#[serde(skip_serializing_if = "Option::is_none")]
price: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
stop_price: Option<String>,
}
#[derive(Debug, Deserialize)]
struct IBKRAccountSummary {
#[serde(rename = "totalcashvalue")]
total_cash_value: Decimal,
#[serde(rename = "netliquidation")]
net_liquidation: Decimal,
#[serde(rename = "buyingpower")]
buying_power: Decimal,
#[serde(rename = "equitywithloanvalue")]
equity_with_loan_value: Decimal,
#[serde(rename = "previousdayequitywithloanvalue")]
previous_day_equity: Decimal,
#[serde(rename = "grosspositionvalue")]
gross_position_value: Decimal,
#[serde(rename = "initmarginreq")]
init_margin_req: Decimal,
#[serde(rename = "maintmarginreq")]
maint_margin_req: Decimal,
#[serde(rename = "daytradesremaining")]
day_trades_remaining: i32,
currency: String,
}
#[derive(Debug, Deserialize)]
struct IBKRPosition {
ticker: String,
position: i64,
#[serde(rename = "mktPrice")]
market_price: Decimal,
#[serde(rename = "mktValue")]
market_value: Decimal,
#[serde(rename = "avgPrice")]
avg_price: Decimal,
#[serde(rename = "unrealizedPnL")]
unrealized_pnl: Decimal,
}
#[derive(Debug, Deserialize)]
struct IBKROrderStatus {
#[serde(rename = "orderId")]
order_id: String,
status: String,
#[serde(rename = "filledQuantity")]
filled_quantity: i64,
#[serde(rename = "remainingQuantity")]
remaining_quantity: i64,
#[serde(rename = "avgPrice")]
avg_price: Decimal,
#[serde(rename = "limitPrice")]
limit_price: Option<Decimal>,
symbol: Option<String>,
#[serde(rename = "orderRef")]
order_ref: Option<String>,
}
#[derive(Debug, Deserialize)]
struct IBKROrdersResponse {
orders: Vec<IBKROrderItem>,
}
#[derive(Debug, Deserialize)]
struct IBKROrderItem {
#[serde(rename = "orderId")]
order_id: Option<String>,
#[serde(rename = "orderRef")]
order_ref: Option<String>,
ticker: Option<String>,
#[serde(rename = "totalSize")]
total_size: Option<i64>,
#[serde(rename = "filledQuantity")]
filled_quantity: Option<i64>,
price: Option<Decimal>,
#[serde(rename = "avgPrice")]
avg_price: Option<Decimal>,
}
#[derive(Debug, Serialize)]
struct IBKRMarketDataRequest {
conid: i64,
fields: Vec<String>,
}
#[derive(Debug, Deserialize)]
struct IBKROptionChain {
strikes: Vec<IBKRStrike>,
}
#[derive(Debug, Deserialize)]
struct IBKRStrike {
strike: Decimal,
expiry: String,
}
#[derive(Debug, Deserialize)]
struct IBKRGreeksResponse {
delta: Option<f64>,
gamma: Option<f64>,
theta: Option<f64>,
vega: Option<f64>,
rho: Option<f64>,
#[serde(rename = "impliedVol")]
implied_vol: Option<f64>,
}
impl From<IBKRGreeksResponse> for OptionGreeks {
fn from(r: IBKRGreeksResponse) -> Self {
Self {
delta: r.delta.unwrap_or(0.0),
gamma: r.gamma.unwrap_or(0.0),
theta: r.theta.unwrap_or(0.0),
vega: r.vega.unwrap_or(0.0),
rho: r.rho.unwrap_or(0.0),
implied_volatility: r.implied_vol.unwrap_or(0.0),
}
}
}
#[derive(Debug, Deserialize)]
struct IBKROrderResponseItem {
order_id: String,
}
#[derive(Debug, Deserialize)]
struct IBKRMarginResponse {
#[serde(rename = "initialMargin")]
initial_margin: Decimal,
#[serde(rename = "maintenanceMargin")]
maintenance_margin: Decimal,
}
#[derive(Debug, Deserialize)]
struct IBKRHistoricalResponse {
data: Vec<IBKRHistoricalBar>,
}
#[derive(Debug, Deserialize)]
struct IBKRHistoricalBar {
t: i64, o: f64, h: f64, l: f64, c: f64, v: i64, }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistoricalBar {
pub timestamp: DateTime<Utc>,
pub open: Decimal,
pub high: Decimal,
pub low: Decimal,
pub close: Decimal,
pub volume: i64,
}
impl From<IBKRHistoricalBar> for HistoricalBar {
fn from(b: IBKRHistoricalBar) -> Self {
use chrono::TimeZone;
Self {
timestamp: Utc.timestamp_opt(b.t, 0).unwrap(),
open: Decimal::from_f64_retain(b.o).unwrap(),
high: Decimal::from_f64_retain(b.h).unwrap(),
low: Decimal::from_f64_retain(b.l).unwrap(),
close: Decimal::from_f64_retain(b.c).unwrap(),
volume: b.v,
}
}
}
#[derive(Debug, Deserialize)]
struct ContractSearchResult {
conid: i64,
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_ibkr_broker_creation() {
let _config = IBKRConfig::default();
let broker = IBKRBroker::new(config);
assert_eq!(*broker.connection_status.read(), ConnectionStatus::Disconnected);
}
#[tokio::test]
async fn test_health_check() {
let _config = IBKRConfig::default();
let broker = IBKRBroker::new(config);
let health = broker.health_check().await.unwrap();
assert_eq!(health, HealthStatus::Unhealthy);
}
#[tokio::test]
async fn test_bracket_order_structure() {
let entry = OrderRequest {
symbol: Symbol::new("AAPL").unwrap(),
quantity: 100,
side: OrderSide::Buy,
order_type: OrderType::Limit,
time_in_force: TimeInForce::Day,
limit_price: Some(Decimal::from(150)),
stop_price: None,
};
let stop_loss = OrderRequest {
symbol: Symbol::new("AAPL").unwrap(),
quantity: 100,
side: OrderSide::Sell,
order_type: OrderType::StopLoss,
time_in_force: TimeInForce::GTC,
limit_price: None,
stop_price: Some(Decimal::from(145)),
};
let take_profit = OrderRequest {
symbol: Symbol::new("AAPL").unwrap(),
quantity: 100,
side: OrderSide::Sell,
order_type: OrderType::Limit,
time_in_force: TimeInForce::GTC,
limit_price: Some(Decimal::from(160)),
stop_price: None,
};
let bracket = BracketOrder {
entry,
stop_loss,
take_profit,
};
assert_eq!(bracket.entry.quantity, 100);
assert_eq!(bracket.stop_loss.stop_price.unwrap(), Decimal::from(145));
assert_eq!(bracket.take_profit.limit_price.unwrap(), Decimal::from(160));
}
#[tokio::test]
async fn test_option_contract() {
let contract = OptionContract {
underlying: "AAPL".to_string(),
strike: Decimal::from(150),
expiry: "20250117".to_string(),
right: OptionRight::Call,
multiplier: 100,
};
assert_eq!(contract.underlying, "AAPL");
assert_eq!(contract.strike, Decimal::from(150));
assert!(matches!(contract.right, OptionRight::Call));
}
#[tokio::test]
async fn test_trailing_stop_types() {
let pct_trail = TrailingStop::Percentage(5.0);
let dollar_trail = TrailingStop::Dollar(Decimal::from(10));
match pct_trail {
TrailingStop::Percentage(p) => assert_eq!(p, 5.0),
_ => panic!("Wrong trailing stop type"),
}
match dollar_trail {
TrailingStop::Dollar(d) => assert_eq!(d, Decimal::from(10)),
_ => panic!("Wrong trailing stop type"),
}
}
}