pub mod account;
pub mod contract;
pub mod execution;
pub mod order;
use crate::{
AccountEventKind, AccountSnapshot, InstrumentAccountSnapshot, Snapshot, UnindexedAccountEvent,
UnindexedAccountSnapshot,
balance::AssetBalance,
client::{BracketOrderClient, ExecutionClient},
error::{ApiError, ConnectivityError, OrderError, UnindexedClientError},
order::{
Order, OrderKey, OrderKind, TimeInForce,
bracket::{
BracketOrderRequest as UnifiedBracketOrderRequest,
BracketOrderResult as UnifiedBracketOrderResult,
},
id::{ClientOrderId, OrderId, StrategyId},
request::{
OrderRequestCancel, OrderRequestOpen, OrderResponseCancel, UnindexedOrderResponseCancel,
},
state::{Cancelled, Expired, Filled, Open, OrderState, UnindexedOrderState},
},
trade::{AssetFees, Trade, TradeId},
};
use account::BalanceAggregator;
use chrono::{DateTime, Utc};
use execution::{ExecutionBuffer, parse_decimal_or_warn, parse_ib_side};
use futures::stream::BoxStream;
use ibapi::{
accounts::{AccountSummaryResult, types::AccountGroup},
client::blocking::Client,
};
pub use order::{BracketOrderRequest, BracketOrderResult};
use order::{
OrderContext, OrderIdMap, PendingCancels, build_ib_bracket_with_oca, build_ib_order,
side_to_action, time_in_force_to_ib,
};
use parking_lot::Mutex;
use rust_decimal::Decimal;
use rustrade_instrument::{
Side, asset::name::AssetNameExchange, exchange::ExchangeId, ibkr::ContractRegistry,
instrument::name::InstrumentNameExchange,
};
use serde::{Deserialize, Serialize};
use smol_str::format_smolstr;
use std::{
collections::HashSet,
panic::{AssertUnwindSafe, catch_unwind},
sync::Arc,
};
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IbkrConfig {
pub host: String,
pub port: u16,
pub client_id: i32,
pub account: String,
#[serde(default)]
pub contracts: Vec<ContractConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContractConfig {
pub name: String,
pub symbol: String,
pub security_type: String,
pub exchange: String,
pub currency: String,
#[serde(default)]
pub last_trade_date: Option<String>,
#[serde(default)]
pub strike: Option<f64>,
#[serde(default)]
pub right: Option<String>,
}
impl ContractConfig {
fn to_contract(&self) -> ibapi::contracts::Contract {
match self.security_type.as_str() {
"STK" => contract::stock_contract(&self.symbol, &self.exchange, &self.currency),
"FUT" => contract::futures_contract(
&self.symbol,
self.last_trade_date.as_deref().unwrap_or(""),
&self.exchange,
&self.currency,
),
"OPT" => contract::option_contract(
&self.symbol,
self.last_trade_date.as_deref().unwrap_or(""),
self.strike.unwrap_or(0.0),
self.right.as_deref().unwrap_or("C"),
&self.exchange,
&self.currency,
),
"CASH" => contract::forex_contract(&self.symbol, &self.currency),
other => {
warn!(security_type = %other, symbol = %self.symbol, "Unknown security_type, defaulting to STK");
contract::stock_contract(&self.symbol, &self.exchange, &self.currency)
}
}
}
}
static ACCOUNT_GROUP_ALL: std::sync::LazyLock<AccountGroup> =
std::sync::LazyLock::new(|| AccountGroup("All".to_string()));
const POSITION_STREAM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
#[derive(Clone)]
pub struct IbkrClient {
config: Arc<IbkrConfig>,
client: Arc<Client>,
contracts: ContractRegistry,
order_ids: OrderIdMap,
pending_cancels: PendingCancels,
execution_buffer: ExecutionBuffer,
next_order_id: Arc<Mutex<i32>>,
}
impl std::fmt::Debug for IbkrClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IbkrClient")
.field("config", &self.config)
.field("contracts_count", &self.contracts.len())
.field("pending_orders", &self.order_ids.len())
.finish()
}
}
impl IbkrClient {
pub fn connect_sync(config: IbkrConfig) -> Result<Self, UnindexedClientError> {
let url = format!("{}:{}", config.host, config.port);
info!(url = %url, client_id = config.client_id, "Connecting to IB");
let client = Client::connect(&url, config.client_id).map_err(|e| {
UnindexedClientError::Connectivity(ConnectivityError::Socket(e.to_string()))
})?;
let next_id = client.next_order_id();
let contracts = ContractRegistry::new();
for contract_config in &config.contracts {
let contract = contract_config.to_contract();
let name = InstrumentNameExchange::from(contract_config.name.as_str());
match client.contract_details(&contract) {
Ok(details) => {
if let Some(detail) = details.into_iter().next() {
contracts.register(name.clone(), detail.contract.clone());
debug!(name = %name, con_id = detail.contract.contract_id, "Registered contract");
}
}
Err(e) => {
warn!(name = %name, error = %e, "Failed to resolve contract");
}
}
}
info!(
contracts = contracts.len(),
next_order_id = next_id,
"Connected to IB"
);
Ok(Self {
config: Arc::new(config),
client: Arc::new(client),
contracts,
order_ids: OrderIdMap::new(),
pending_cancels: PendingCancels::new(),
execution_buffer: ExecutionBuffer::new(),
next_order_id: Arc::new(Mutex::new(next_id)),
})
}
fn allocate_order_id(&self) -> i32 {
self.allocate_order_id_range(1)
}
#[allow(clippy::expect_used)] fn allocate_order_id_range(&self, count: u32) -> i32 {
let count_i32: i32 = count.try_into().expect("count exceeds i32::MAX");
let mut id = self.next_order_id.lock();
let base = *id;
*id = id
.checked_add(count_i32)
.expect("order ID overflow: i32::MAX exceeded");
base
}
pub fn register_contract(
&self,
name: InstrumentNameExchange,
contract: ibapi::contracts::Contract,
) {
self.contracts.register(name, contract);
}
pub fn contract_registry(&self) -> &ContractRegistry {
&self.contracts
}
pub fn pending_execution_count(&self) -> usize {
self.execution_buffer.pending_count()
}
pub fn clear_stale_executions(&self, max_age: std::time::Duration) -> usize {
self.execution_buffer.clear_stale(max_age)
}
pub fn clear_stale_order_ids(&self, max_age: std::time::Duration) -> usize {
self.order_ids.clear_stale(max_age)
}
pub fn clear_stale_pending_cancels(&self, max_age: std::time::Duration) -> usize {
self.pending_cancels.clear_stale(max_age)
}
pub fn disconnect(&self) {
debug!("Disconnecting IbkrClient");
self.client.disconnect();
}
pub async fn open_bracket_order(&self, request: BracketOrderRequest) -> BracketOrderResult {
let instrument = request.instrument.clone();
let contract = match self.contracts.get_contract(&instrument) {
Some(c) => c,
None => {
return make_all_inactive_bracket(
&request,
OrderError::Rejected(ApiError::InstrumentInvalid(
instrument,
"contract not registered".to_string(),
)),
);
}
};
let quantity: f64 = match request.quantity.try_into() {
Ok(q) => q,
Err(_) => {
return make_all_inactive_bracket(
&request,
OrderError::Rejected(ApiError::OrderRejected(format!(
"quantity {} exceeds f64 range",
request.quantity
))),
);
}
};
let entry_price: f64 = match request.entry_price.try_into() {
Ok(p) => p,
Err(_) => {
return make_all_inactive_bracket(
&request,
OrderError::Rejected(ApiError::OrderRejected(format!(
"entry_price {} exceeds f64 range",
request.entry_price
))),
);
}
};
let tp_price: f64 = match request.take_profit_price.try_into() {
Ok(p) => p,
Err(_) => {
return make_all_inactive_bracket(
&request,
OrderError::Rejected(ApiError::OrderRejected(format!(
"take_profit_price {} exceeds f64 range",
request.take_profit_price
))),
);
}
};
let sl_price: f64 = match request.stop_loss_price.try_into() {
Ok(p) => p,
Err(_) => {
return make_all_inactive_bracket(
&request,
OrderError::Rejected(ApiError::OrderRejected(format!(
"stop_loss_price {} exceeds f64 range",
request.stop_loss_price
))),
);
}
};
let ib_tif = match time_in_force_to_ib(&request.time_in_force) {
Ok(tif) => tif,
Err(e) => {
return make_all_inactive_bracket(
&request,
OrderError::Rejected(ApiError::OrderRejected(format!(
"TIF not supported by IB: {e}"
))),
);
}
};
let parent_ib_id = self.allocate_order_id_range(3);
let tp_ib_id = parent_ib_id + 1;
let sl_ib_id = parent_ib_id + 2;
let action = side_to_action(request.side);
let ib_orders = build_ib_bracket_with_oca(
parent_ib_id,
action,
quantity,
entry_price,
tp_price,
sl_price,
ib_tif,
);
let parent_cid = request.parent_cid.clone();
let (tp_cid, sl_cid) = derive_child_cids(&parent_cid);
let exit_side = match request.side {
Side::Buy => Side::Sell,
Side::Sell => Side::Buy,
};
let parent_ctx = OrderContext {
instrument: instrument.clone(),
side: request.side,
price: Some(request.entry_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
};
let tp_ctx = OrderContext {
instrument: instrument.clone(),
side: exit_side,
price: Some(request.take_profit_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
};
let sl_ctx = OrderContext {
instrument: instrument.clone(),
side: exit_side,
price: None, quantity: request.quantity,
kind: OrderKind::Stop {
trigger_price: request.stop_loss_price,
},
time_in_force: request.time_in_force,
};
self.order_ids
.register(parent_cid.clone(), parent_ib_id, parent_ctx);
self.order_ids.register(tp_cid.clone(), tp_ib_id, tp_ctx);
self.order_ids.register(sl_cid.clone(), sl_ib_id, sl_ctx);
let client = self.client.clone();
let result = tokio::task::spawn_blocking(move || {
use ibapi::orders::PlaceOrder;
let parent_sub = match client.place_order(parent_ib_id, &contract, &ib_orders[0]) {
Ok(s) => s,
Err(e) => return Err(format!("parent order failed: {e}")),
};
let tp_sub = match client.place_order(tp_ib_id, &contract, &ib_orders[1]) {
Ok(s) => s,
Err(e) => {
let _ = client.cancel_order(parent_ib_id, "");
return Err(format!("take_profit order failed: {e}"));
}
};
let sl_sub = match client.place_order(sl_ib_id, &contract, &ib_orders[2]) {
Ok(s) => s,
Err(e) => {
let _ = client.cancel_order(parent_ib_id, "");
let _ = client.cancel_order(tp_ib_id, "");
return Err(format!("stop_loss order failed: {e}"));
}
};
let mut parent_status = None;
let mut tp_status = None;
let mut sl_status = None;
for event in parent_sub {
if let PlaceOrder::OrderStatus(status) = event {
match status.status.as_str() {
"Submitted" | "PreSubmitted" | "PendingSubmit" => {
parent_status = Some(Ok(status.filled));
break;
}
"Cancelled" | "Inactive" => {
parent_status = Some(Err(status.status));
break;
}
other => {
parent_status = Some(Err(format!("unexpected parent status: {other}")));
break;
}
}
}
}
for event in tp_sub {
if let PlaceOrder::OrderStatus(status) = event {
match status.status.as_str() {
"Submitted" | "PreSubmitted" | "PendingSubmit" => {
tp_status = Some(Ok(status.filled));
break;
}
"Cancelled" | "Inactive" => {
tp_status = Some(Err(status.status));
break;
}
other => {
tp_status =
Some(Err(format!("unexpected take_profit status: {other}")));
break;
}
}
}
}
for event in sl_sub {
if let PlaceOrder::OrderStatus(status) = event {
match status.status.as_str() {
"Submitted" | "PreSubmitted" | "PendingSubmit" => {
sl_status = Some(Ok(status.filled));
break;
}
"Cancelled" | "Inactive" => {
sl_status = Some(Err(status.status));
break;
}
other => {
sl_status = Some(Err(format!("unexpected stop_loss status: {other}")));
break;
}
}
}
}
match (parent_status, tp_status, sl_status) {
(Some(Ok(parent)), Some(Ok(tp)), Some(Ok(sl))) => Ok((parent, tp, sl)),
(parent, tp, sl) => {
let _ = client.cancel_order(parent_ib_id, "");
let _ = client.cancel_order(tp_ib_id, "");
let _ = client.cancel_order(sl_ib_id, "");
Err(format!(
"bracket order rejected: parent={parent:?}, tp={tp:?}, sl={sl:?}"
))
}
}
})
.await;
match result {
Ok(Ok((parent_filled, tp_filled, sl_filled))) => {
let now = Utc::now();
let parent_filled_dec = parse_decimal_or_warn(parent_filled, "parent.filled");
let tp_filled_dec = parse_decimal_or_warn(tp_filled, "tp.filled");
let sl_filled_dec = parse_decimal_or_warn(sl_filled, "sl.filled");
BracketOrderResult {
parent: Order {
key: OrderKey {
exchange: ExchangeId::Ibkr,
instrument: instrument.clone(),
strategy: request.strategy.clone(),
cid: parent_cid,
},
side: request.side,
price: Some(request.entry_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
state: OrderState::active(Open::new(
OrderId::new(format_smolstr!("{}", parent_ib_id)),
now,
parent_filled_dec,
)),
},
take_profit: Order {
key: OrderKey {
exchange: ExchangeId::Ibkr,
instrument: instrument.clone(),
strategy: request.strategy.clone(),
cid: tp_cid,
},
side: exit_side,
price: Some(request.take_profit_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
state: OrderState::active(Open::new(
OrderId::new(format_smolstr!("{}", tp_ib_id)),
now,
tp_filled_dec,
)),
},
stop_loss: Order {
key: OrderKey {
exchange: ExchangeId::Ibkr,
instrument: instrument.clone(),
strategy: request.strategy.clone(),
cid: sl_cid,
},
side: exit_side,
price: None, quantity: request.quantity,
kind: OrderKind::Stop {
trigger_price: request.stop_loss_price,
},
time_in_force: request.time_in_force,
state: OrderState::active(Open::new(
OrderId::new(format_smolstr!("{}", sl_ib_id)),
now,
sl_filled_dec,
)),
},
}
}
Ok(Err(err_msg)) => {
self.order_ids.remove_by_ib_id(parent_ib_id);
self.order_ids.remove_by_ib_id(tp_ib_id);
self.order_ids.remove_by_ib_id(sl_ib_id);
make_all_inactive_bracket(
&request,
OrderError::Rejected(ApiError::OrderRejected(err_msg)),
)
}
Err(join_err) => {
self.order_ids.remove_by_ib_id(parent_ib_id);
self.order_ids.remove_by_ib_id(tp_ib_id);
self.order_ids.remove_by_ib_id(sl_ib_id);
make_all_inactive_bracket(
&request,
OrderError::Rejected(ApiError::OrderRejected(format!(
"task join error: {join_err}"
))),
)
}
}
}
}
fn derive_child_cids(parent_cid: &ClientOrderId) -> (ClientOrderId, ClientOrderId) {
(
ClientOrderId::new(format_smolstr!("{}_tp", parent_cid.0)),
ClientOrderId::new(format_smolstr!("{}_sl", parent_cid.0)),
)
}
fn make_all_inactive_bracket(
request: &BracketOrderRequest,
error: OrderError<AssetNameExchange, InstrumentNameExchange>,
) -> BracketOrderResult {
let exit_side = match request.side {
Side::Buy => Side::Sell,
Side::Sell => Side::Buy,
};
let parent_cid = request.parent_cid.clone();
let (tp_cid, sl_cid) = derive_child_cids(&parent_cid);
BracketOrderResult {
parent: Order {
key: OrderKey {
exchange: ExchangeId::Ibkr,
instrument: request.instrument.clone(),
strategy: request.strategy.clone(),
cid: parent_cid,
},
side: request.side,
price: Some(request.entry_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
state: OrderState::inactive(error.clone()),
},
take_profit: Order {
key: OrderKey {
exchange: ExchangeId::Ibkr,
instrument: request.instrument.clone(),
strategy: request.strategy.clone(),
cid: tp_cid,
},
side: exit_side,
price: Some(request.take_profit_price),
quantity: request.quantity,
kind: OrderKind::Limit,
time_in_force: request.time_in_force,
state: OrderState::inactive(error.clone()),
},
stop_loss: Order {
key: OrderKey {
exchange: ExchangeId::Ibkr,
instrument: request.instrument.clone(),
strategy: request.strategy.clone(),
cid: sl_cid,
},
side: exit_side,
price: None, quantity: request.quantity,
kind: OrderKind::Stop {
trigger_price: request.stop_loss_price,
},
time_in_force: request.time_in_force,
state: OrderState::inactive(error),
},
}
}
impl ExecutionClient for IbkrClient {
const EXCHANGE: ExchangeId = ExchangeId::Ibkr;
type Config = IbkrConfig;
type AccountStream = BoxStream<'static, UnindexedAccountEvent>;
#[track_caller]
fn new(config: Self::Config) -> Self {
#[allow(clippy::expect_used)] Self::connect_sync(config).expect("failed to connect to IB")
}
async fn account_snapshot(
&self,
assets: &[AssetNameExchange],
instruments: &[InstrumentNameExchange],
) -> Result<UnindexedAccountSnapshot, UnindexedClientError> {
let client = self.client.clone();
let contracts = self.contracts.clone();
let instruments_filter: Option<HashSet<_>> = if instruments.is_empty() {
None
} else {
Some(instruments.iter().cloned().collect())
};
let balances_future = self.fetch_balances(assets);
let positions_future = tokio::task::spawn_blocking(move || {
use ibapi::accounts::PositionUpdate;
let positions_sub = client
.positions()
.map_err(|e| UnindexedClientError::Internal(format!("positions: {e}")))?;
let mut snapshots = Vec::new();
let mut seen = HashSet::new();
while let Some(pos_update) = positions_sub.next_timeout(POSITION_STREAM_TIMEOUT) {
let PositionUpdate::Position(pos) = pos_update else {
trace!(?pos_update, "Ignoring non-Position variant");
continue;
};
let Some(instrument) = contracts.get_name_by_con_id(pos.contract.contract_id)
else {
continue;
};
if instruments_filter
.as_ref()
.is_some_and(|f| !f.contains(&instrument))
{
continue;
}
if seen.contains(&instrument) {
debug!(
instrument = %instrument,
"Duplicate position for instrument (multi-account?), skipping"
);
continue;
}
seen.insert(instrument.clone());
snapshots.push(InstrumentAccountSnapshot {
instrument,
orders: Vec::new(),
position: None,
});
}
Ok::<_, UnindexedClientError>(snapshots)
});
let (balances_result, positions_result) = tokio::join!(balances_future, positions_future);
let balances = balances_result?;
let instrument_snapshots = positions_result
.map_err(|e| UnindexedClientError::TaskFailed(format!("task join: {e}")))??;
Ok(AccountSnapshot {
exchange: ExchangeId::Ibkr,
balances,
instruments: instrument_snapshots,
})
}
async fn account_stream(
&self,
_assets: &[AssetNameExchange],
_instruments: &[InstrumentNameExchange],
) -> Result<Self::AccountStream, UnindexedClientError> {
let client = self.client.clone();
let order_sub = tokio::task::spawn_blocking(move || client.order_update_stream())
.await
.map_err(|e| UnindexedClientError::TaskFailed(format!("task join: {e}")))?
.map_err(|e| UnindexedClientError::Internal(format!("order updates: {e}")))?;
let contracts_clone = self.contracts.clone();
let order_ids_clone = self.order_ids.clone();
let pending_cancels_clone = self.pending_cancels.clone();
let exec_buffer_clone = self.execution_buffer.clone();
let (tx, rx) = mpsc::unbounded_channel();
std::thread::Builder::new()
.name("ibkr-order-stream".to_string())
.spawn(move || {
let result = catch_unwind(AssertUnwindSafe(|| {
use ibapi::orders::OrderUpdate;
for update in order_sub {
let event = match update {
OrderUpdate::OrderStatus(status) => {
let ib_id = status.order_id;
let is_terminal =
matches!(status.status.as_str(), "Cancelled" | "Inactive");
let lookup_result = if is_terminal {
order_ids_clone.remove_and_get_context(ib_id)
} else {
order_ids_clone.get_client_id_and_context(ib_id)
};
if let Some((client_id, ctx)) = lookup_result {
let order = make_order_from_status(
&status,
client_id,
&ctx,
&pending_cancels_clone,
);
Some(UnindexedAccountEvent {
exchange: ExchangeId::Ibkr,
kind: AccountEventKind::OrderSnapshot(Snapshot::new(order)),
})
} else {
debug!(ib_order_id = ib_id, "OrderStatus for unknown order ID");
None
}
}
OrderUpdate::ExecutionData(exec) => {
let order_id = exec.execution.order_id;
let con_id = exec.contract.contract_id;
let Some(client_id) = order_ids_clone.get_client_id(order_id)
else {
debug!(
ib_order_id = order_id,
con_id, "ExecutionData for unknown order ID, dropping"
);
continue;
};
let Some(instrument) = contracts_clone.get_name_by_con_id(con_id)
else {
debug!(
ib_order_id = order_id,
con_id, "ExecutionData for unknown contract ID, dropping"
);
continue;
};
exec_buffer_clone.add_execution(exec, instrument, client_id);
None
}
OrderUpdate::CommissionReport(report) => exec_buffer_clone
.complete_with_commission(&report)
.map(|trade| UnindexedAccountEvent {
exchange: ExchangeId::Ibkr,
kind: AccountEventKind::Trade(trade),
}),
_ => None,
};
if let Some(e) = event
&& tx.send(e).is_err()
{
break;
}
}
}));
if let Err(panic_info) = result {
let msg = panic_info
.downcast_ref::<&str>()
.map(|s| s.to_string())
.or_else(|| panic_info.downcast_ref::<String>().cloned())
.unwrap_or_else(|| "unknown panic".to_string());
error!("Order stream worker panicked: {msg}");
}
})
.map_err(|e| UnindexedClientError::TaskFailed(format!("thread spawn: {e}")))?;
Ok(Box::pin(
tokio_stream::wrappers::UnboundedReceiverStream::new(rx),
))
}
async fn cancel_order(
&self,
request: OrderRequestCancel<ExchangeId, &InstrumentNameExchange>,
) -> Option<UnindexedOrderResponseCancel> {
let key = OrderKey {
exchange: request.key.exchange,
instrument: request.key.instrument.clone(),
strategy: request.key.strategy.clone(),
cid: request.key.cid.clone(),
};
let ib_order_id = match self.order_ids.get_ib_id(&request.key.cid) {
Some(id) => id,
None => {
return Some(OrderResponseCancel {
key,
state: Err(crate::error::OrderError::Rejected(ApiError::OrderRejected(
"order ID not found in map".to_string(),
))),
});
}
};
let client = self.client.clone();
let result =
tokio::task::spawn_blocking(move || client.cancel_order(ib_order_id, "")).await;
match result {
Ok(Ok(_sub)) => {
self.pending_cancels.insert(ib_order_id);
Some(OrderResponseCancel {
key,
state: Ok(Cancelled::new(
OrderId::new(format_smolstr!("{}", ib_order_id)),
Utc::now(),
Decimal::ZERO,
)),
})
}
Ok(Err(e)) => {
error!(order_id = ib_order_id, error = %e, "Failed to cancel order");
Some(OrderResponseCancel {
key,
state: Err(crate::error::OrderError::Rejected(ApiError::OrderRejected(
e.to_string(),
))),
})
}
Err(e) => {
error!(order_id = ib_order_id, error = %e, "Task join error");
Some(OrderResponseCancel {
key,
state: Err(crate::error::OrderError::Rejected(ApiError::OrderRejected(
e.to_string(),
))),
})
}
}
}
async fn open_order(
&self,
request: OrderRequestOpen<ExchangeId, &InstrumentNameExchange>,
) -> Option<Order<ExchangeId, InstrumentNameExchange, UnindexedOrderState>> {
let key = OrderKey {
exchange: ExchangeId::Ibkr,
instrument: request.key.instrument.clone(),
strategy: request.key.strategy.clone(),
cid: request.key.cid.clone(),
};
let contract = match self.contracts.get_contract(request.key.instrument) {
Some(c) => c,
None => {
return Some(Order {
key,
side: request.state.side,
price: request.state.price,
quantity: request.state.quantity,
kind: request.state.kind,
time_in_force: request.state.time_in_force,
state: OrderState::inactive(OrderError::Rejected(ApiError::InstrumentInvalid(
request.key.instrument.clone(),
"contract not registered".to_string(),
))),
});
}
};
let quantity: f64 = match request.state.quantity.try_into() {
Ok(q) => q,
Err(_) => {
return Some(Order {
key,
side: request.state.side,
price: request.state.price,
quantity: request.state.quantity,
kind: request.state.kind,
time_in_force: request.state.time_in_force,
state: OrderState::inactive(OrderError::Rejected(ApiError::OrderRejected(
format!("quantity {} exceeds f64 range", request.state.quantity),
))),
});
}
};
let ib_order = match build_ib_order(
request.state.side,
quantity,
&request.state.kind,
request.state.price,
&request.state.time_in_force,
) {
Ok(o) => o,
Err(e) => {
return Some(Order {
key,
side: request.state.side,
price: request.state.price,
quantity: request.state.quantity,
kind: request.state.kind,
time_in_force: request.state.time_in_force,
state: OrderState::inactive(OrderError::Rejected(ApiError::OrderRejected(
e.to_string(),
))),
});
}
};
let ib_order_id = self.allocate_order_id();
let context = OrderContext {
instrument: request.key.instrument.clone(),
side: request.state.side,
price: request.state.price,
quantity: request.state.quantity,
kind: request.state.kind,
time_in_force: request.state.time_in_force,
};
self.order_ids
.register(request.key.cid.clone(), ib_order_id, context);
let client = self.client.clone();
let side = request.state.side;
let price = request.state.price;
let req_quantity = request.state.quantity;
let kind = request.state.kind;
let tif = request.state.time_in_force;
let result = tokio::task::spawn_blocking(move || {
use ibapi::orders::PlaceOrder;
let sub = match client.place_order(ib_order_id, &contract, &ib_order) {
Ok(s) => s,
Err(e) => return Err(e.to_string()),
};
for event in sub {
if let PlaceOrder::OrderStatus(status) = event {
match status.status.as_str() {
"Submitted" | "PreSubmitted" | "PendingSubmit" => {
let filled = parse_decimal_or_warn(status.filled, "status.filled");
return Ok(Some((ib_order_id, filled)));
}
"Cancelled" | "Inactive" => {
return Err(status.status);
}
_ => continue,
}
}
}
Ok(None)
})
.await;
match result {
Ok(Ok(Some((order_id, filled)))) => {
Some(Order {
key,
side,
price,
quantity: req_quantity,
kind,
time_in_force: tif,
state: OrderState::active(Open::new(
OrderId::new(format_smolstr!("{}", order_id)),
Utc::now(),
filled,
)),
})
}
Ok(Ok(None)) => {
warn!(
ib_order_id,
"Order subscription ended without terminal status, returning Open"
);
Some(Order {
key,
side,
price,
quantity: req_quantity,
kind,
time_in_force: tif,
state: OrderState::active(Open::new(
OrderId::new(format_smolstr!("{}", ib_order_id)),
Utc::now(),
Decimal::ZERO,
)),
})
}
Ok(Err(status)) => {
self.order_ids.remove_by_ib_id(ib_order_id);
Some(Order {
key,
side,
price,
quantity: req_quantity,
kind,
time_in_force: tif,
state: OrderState::inactive(OrderError::Rejected(ApiError::OrderRejected(
status,
))),
})
}
Err(e) => {
self.order_ids.remove_by_ib_id(ib_order_id);
Some(Order {
key,
side,
price,
quantity: req_quantity,
kind,
time_in_force: tif,
state: OrderState::inactive(OrderError::Rejected(ApiError::OrderRejected(
e.to_string(),
))),
})
}
}
}
async fn fetch_balances(
&self,
assets: &[AssetNameExchange],
) -> Result<Vec<AssetBalance<AssetNameExchange>>, UnindexedClientError> {
let client = self.client.clone();
let assets_filter: Option<HashSet<AssetNameExchange>> = if assets.is_empty() {
None
} else {
Some(assets.iter().cloned().collect())
};
tokio::task::spawn_blocking(move || {
let sub = client
.account_summary(&ACCOUNT_GROUP_ALL, &["TotalCashValue", "AvailableFunds"])
.map_err(|e| UnindexedClientError::Internal(format!("account_summary: {e}")))?;
let mut aggregator = BalanceAggregator::new();
for summary in sub {
match summary {
AccountSummaryResult::Summary(s) => aggregator.process(&s),
AccountSummaryResult::End => break,
}
}
let mut balances = aggregator.to_balances();
if let Some(ref filter) = assets_filter {
balances.retain(|b| filter.contains(&b.asset));
}
Ok(balances)
})
.await
.map_err(|e| UnindexedClientError::TaskFailed(format!("task join: {e}")))?
}
async fn fetch_open_orders(
&self,
instruments: &[InstrumentNameExchange],
) -> Result<Vec<Order<ExchangeId, InstrumentNameExchange, Open>>, UnindexedClientError> {
let client = self.client.clone();
let contracts = self.contracts.clone();
let order_ids = self.order_ids.clone();
let instruments_filter: Option<HashSet<_>> = if instruments.is_empty() {
None
} else {
Some(instruments.iter().cloned().collect())
};
tokio::task::spawn_blocking(move || {
use ibapi::orders::Orders;
let sub = client
.all_open_orders()
.map_err(|e| UnindexedClientError::Internal(format!("open_orders: {e}")))?;
let mut orders = Vec::new();
for order_item in sub {
let order_data = match order_item {
Orders::OrderData(data) => data,
_ => continue,
};
let instrument = match contracts.get_name_by_con_id(order_data.contract.contract_id)
{
Some(i) => i,
None => continue,
};
if instruments_filter
.as_ref()
.is_some_and(|f| !f.contains(&instrument))
{
continue;
}
let client_id = order_ids
.get_client_id(order_data.order_id)
.unwrap_or_else(|| {
ClientOrderId::new(format_smolstr!("{}", order_data.order_id))
});
let side = match order_data.order.action {
ibapi::orders::Action::Buy => Side::Buy,
ibapi::orders::Action::Sell
| ibapi::orders::Action::SellShort
| ibapi::orders::Action::SellLong => Side::Sell,
};
let price = order_data
.order
.limit_price
.map(|p| parse_decimal_or_warn(p, "limit_price"));
let kind = if order_data.order.order_type == "LMT" {
OrderKind::Limit
} else {
OrderKind::Market
};
orders.push(Order {
key: OrderKey {
exchange: ExchangeId::Ibkr,
instrument,
strategy: StrategyId::unknown(),
cid: client_id,
},
side,
price,
quantity: parse_decimal_or_warn(
order_data.order.total_quantity,
"total_quantity",
),
kind,
time_in_force: TimeInForce::GoodUntilCancelled { post_only: false },
state: Open::new(
OrderId::new(format_smolstr!("{}", order_data.order_id)),
Utc::now(),
Decimal::ZERO, ),
});
}
Ok(orders)
})
.await
.map_err(|e| UnindexedClientError::TaskFailed(format!("task join: {e}")))?
}
async fn fetch_trades(
&self,
time_since: DateTime<Utc>,
instruments: &[InstrumentNameExchange],
) -> Result<Vec<Trade<AssetNameExchange, InstrumentNameExchange>>, UnindexedClientError> {
let client = self.client.clone();
let contracts = self.contracts.clone();
let order_ids = self.order_ids.clone();
let instruments_filter: Option<HashSet<_>> = if instruments.is_empty() {
None
} else {
Some(instruments.iter().cloned().collect())
};
tokio::task::spawn_blocking(move || {
use ibapi::orders::Executions;
let exec_filter = ibapi::orders::ExecutionFilter::default();
let sub = client
.executions(exec_filter)
.map_err(|e| UnindexedClientError::Internal(format!("executions: {e}")))?;
let mut trades = Vec::new();
for exec_item in sub {
let exec_data = match exec_item {
Executions::ExecutionData(data) => data,
_ => continue,
};
let instrument = match contracts.get_name_by_con_id(exec_data.contract.contract_id)
{
Some(i) => i,
None => continue,
};
if instruments_filter
.as_ref()
.is_some_and(|f| !f.contains(&instrument))
{
continue;
}
let exec = &exec_data.execution;
let exec_time = match execution::parse_ib_timestamp(&exec.time) {
Some(t) => t,
None => {
warn!(
exec_id = %exec.execution_id,
time = %exec.time,
"Unparseable timestamp in execution, skipping"
);
continue;
}
};
if exec_time < time_since {
continue;
}
let side = match parse_ib_side(&exec.side) {
Some(s) => s,
None => {
warn!(
side = %exec.side,
exec_id = %exec.execution_id,
"Unknown IB side string, skipping trade"
);
continue;
}
};
let client_id = order_ids
.get_client_id(exec.order_id)
.unwrap_or_else(|| ClientOrderId::new(format_smolstr!("{}", exec.order_id)));
trades.push(Trade {
id: TradeId::new(&exec.execution_id),
order_id: OrderId::new(&client_id.0),
instrument,
strategy: StrategyId::unknown(),
time_exchange: exec_time,
side,
price: parse_decimal_or_warn(exec.price, "exec.price"),
quantity: parse_decimal_or_warn(exec.shares, "exec.shares"),
fees: AssetFees::new(AssetNameExchange::from("UNKNOWN"), Decimal::ZERO, None),
});
}
Ok(trades)
})
.await
.map_err(|e| UnindexedClientError::TaskFailed(format!("task join: {e}")))?
}
}
fn make_order_from_status(
status: &ibapi::orders::OrderStatus,
client_id: ClientOrderId,
ctx: &OrderContext,
pending_cancels: &PendingCancels,
) -> Order<ExchangeId, InstrumentNameExchange, OrderState<AssetNameExchange, InstrumentNameExchange>>
{
let ib_id = status.order_id;
let order_id = OrderId::new(format_smolstr!("{}", ib_id));
let filled_qty = parse_decimal_or_warn(status.filled, "status.filled");
let state = match status.status.as_str() {
"Inactive" => {
OrderState::inactive(OrderError::Rejected(ApiError::OrderRejected(
"IB status: Inactive (order blocked by validation/margin/exchange)".into(),
)))
}
"Cancelled" => {
let was_user_cancel = pending_cancels.remove(ib_id);
if was_user_cancel {
OrderState::inactive(Cancelled::new(order_id, Utc::now(), filled_qty))
} else if matches!(ctx.time_in_force, TimeInForce::GoodUntilEndOfDay) {
OrderState::inactive(Expired::new(order_id, Utc::now(), filled_qty))
} else {
OrderState::inactive(Cancelled::new(order_id, Utc::now(), filled_qty))
}
}
"Filled" => OrderState::fully_filled(Filled::new(order_id, Utc::now(), filled_qty, None)),
_ => {
OrderState::active(Open::new(order_id, Utc::now(), filled_qty))
}
};
Order {
key: OrderKey {
exchange: ExchangeId::Ibkr,
instrument: ctx.instrument.clone(),
strategy: StrategyId::unknown(),
cid: client_id,
},
side: ctx.side,
price: ctx.price,
quantity: ctx.quantity,
kind: ctx.kind,
time_in_force: ctx.time_in_force,
state,
}
}
pub use execution::parse_ib_timestamp;
impl BracketOrderClient for IbkrClient {
async fn open_bracket_order(
&self,
request: UnifiedBracketOrderRequest<ExchangeId, &InstrumentNameExchange>,
) -> UnifiedBracketOrderResult {
let ibkr_request = BracketOrderRequest {
instrument: request.key.instrument.clone(),
strategy: request.key.strategy.clone(),
parent_cid: request.key.cid.clone(),
side: request.state.side,
quantity: request.state.quantity,
entry_price: request.state.entry_price,
take_profit_price: request.state.take_profit_price,
stop_loss_price: request.state.stop_loss_price,
time_in_force: request.state.time_in_force,
};
let result = self.open_bracket_order(ibkr_request).await;
UnifiedBracketOrderResult::with_all_legs(
result.parent,
result.take_profit,
result.stop_loss,
)
}
}