1use crate::broker::{
19 Account, BrokerClient, BrokerError, HealthStatus, OrderFilter, Position, PositionSide,
20};
21use crate::{OrderRequest, OrderResponse, OrderSide, OrderStatus, OrderType, Symbol, TimeInForce};
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use dashmap::DashMap;
25use std::collections::HashMap;
26use futures::stream::{Stream, StreamExt};
27use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
28use parking_lot::RwLock;
29use reqwest::{Client, Method, StatusCode};
30use rust_decimal::Decimal;
31use serde::{Deserialize, Serialize};
32use std::num::NonZeroU32;
33use std::pin::Pin;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::sync::{broadcast, mpsc};
37use tokio_tungstenite::{connect_async, tungstenite::Message};
38use tracing::{debug, error, info, warn};
39use url::Url;
40use uuid::Uuid;
41
/// Settings that control how [`IBKRBroker`] reaches the gateway and which
/// optional features it exposes.
#[derive(Debug, Clone)]
pub struct IBKRConfig {
    /// Host interpolated into the REST base URL (`http://{host}:{port}/v1`).
    pub host: String,
    /// Port interpolated into the REST base URL.
    pub port: u16,
    /// Client identifier. Not read anywhere in the visible broker code.
    pub client_id: i32,
    /// Account id used when building order endpoints; when empty, the account
    /// refresh falls back to the first id returned by `/portfolio/accounts`.
    pub account: String,
    /// Paper-trading flag. Not read anywhere in the visible broker code.
    pub paper_trading: bool,
    /// Timeout applied to the underlying HTTP client.
    pub timeout: Duration,
    /// When `false`, `start_streaming` returns immediately without subscribing.
    pub streaming: bool,
    /// When `false`, `start_depth_streaming` returns immediately without subscribing.
    pub level2_depth: bool,
    /// When `false`, `get_option_chain` fails with `BrokerError::InvalidOrder`.
    pub options_enabled: bool,
    /// When `false`, `place_algo_order` fails with `BrokerError::InvalidOrder`.
    pub algo_orders: bool,
}
66
67impl Default for IBKRConfig {
68 fn default() -> Self {
69 Self {
70 host: "127.0.0.1".to_string(),
71 port: 7497, client_id: 1,
73 account: String::new(),
74 paper_trading: true,
75 timeout: Duration::from_secs(30),
76 streaming: true,
77 level2_depth: false,
78 options_enabled: true,
79 algo_orders: true,
80 }
81 }
82}
83
/// Order grouping classes.
///
/// Not referenced by any other code visible in this file.
/// NOTE(review): `OCA`/`OCO` presumably mean one-cancels-all / one-cancels-other — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IBKROrderClass {
    /// A stand-alone order.
    Simple,
    /// An entry order with attached child orders (see [`BracketOrder`]).
    Bracket,
    OCA,
    OCO,
}
96
/// Execution algorithm selected for `IBKRBroker::place_algo_order`.
///
/// Each variant is translated there into a `strategy` name plus a
/// `strategyParameters` JSON object.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlgoStrategy {
    /// Sent as strategy `"Vwap"` with `startTime` / `endTime` parameters.
    VWAP {
        /// Forwarded verbatim as `startTime`; the expected format is not
        /// validated here.
        start_time: String,
        /// Forwarded verbatim as `endTime`.
        end_time: String,
    },
    /// Sent as strategy `"Twap"` with `startTime` / `endTime` parameters.
    TWAP {
        /// Forwarded verbatim as `startTime`.
        start_time: String,
        /// Forwarded verbatim as `endTime`.
        end_time: String,
    },
    /// Sent as strategy `"PctVol"`.
    PercentOfVolume {
        /// Forwarded as `pctVol`.
        /// NOTE(review): unit (fraction vs. percent) is not established here — confirm.
        participation_rate: f64,
    },
}
115
/// Identifies a single option contract on an underlying symbol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptionContract {
    /// Symbol of the underlying instrument.
    pub underlying: String,
    /// Strike price.
    pub strike: Decimal,
    /// Expiry, concatenated as-is into the contract search symbol.
    /// NOTE(review): the tests use `YYYYMMDD` (e.g. "20250117") — confirm this is required.
    pub expiry: String,
    /// Call or put.
    pub right: OptionRight,
    /// Contract multiplier; chains built by `get_option_chain` always use 100.
    pub multiplier: i32,
}
125
/// Whether an option is a call or a put.
///
/// Encoded as `"C"` / `"P"` when building the contract search symbol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OptionRight {
    Call,
    Put,
}
131
/// Option sensitivities returned by `IBKRBroker::get_option_greeks`.
///
/// When built from an `IBKRGreeksResponse`, any value missing from the
/// response is reported as `0.0`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptionGreeks {
    pub delta: f64,
    pub gamma: f64,
    pub theta: f64,
    pub vega: f64,
    pub rho: f64,
    pub implied_volatility: f64,
}
142
/// Three related orders submitted together by `IBKRBroker::place_bracket_order`.
#[derive(Debug, Clone)]
pub struct BracketOrder {
    /// Parent order; its symbol is the one resolved to a contract id.
    pub entry: OrderRequest,
    /// Attached child order whose `stop_price` is sent as `stopPrice`.
    pub stop_loss: OrderRequest,
    /// Attached child order whose `limit_price` is sent as `price`.
    pub take_profit: OrderRequest,
}
153
/// Trailing distance for `IBKRBroker::place_trailing_stop`.
#[derive(Debug, Clone)]
pub enum TrailingStop {
    /// Sent with `trailingType` `"%"`.
    Percentage(f64),
    /// Sent with `trailingType` `"$"`.
    Dollar(Decimal),
}
162
/// One top-of-book market data update, the item type of the market data
/// broadcast channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MarketTick {
    pub symbol: String,
    pub timestamp: DateTime<Utc>,
    pub last_price: Decimal,
    pub bid: Decimal,
    pub ask: Decimal,
    pub volume: i64,
    pub bid_size: i64,
    pub ask_size: i64,
}
175
/// One order book update, the item type of the depth broadcast channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MarketDepth {
    pub symbol: String,
    pub timestamp: DateTime<Utc>,
    /// NOTE(review): tuples are presumably `(price, size)` — confirm against the producer.
    pub bids: Vec<(Decimal, i64)>,
    /// NOTE(review): tuples are presumably `(price, size)` — confirm against the producer.
    pub asks: Vec<(Decimal, i64)>,
}
184
/// Outcome of `IBKRBroker::pre_trade_risk_check`.
#[derive(Debug, Clone)]
pub struct RiskCheckResult {
    /// `true` exactly when `warnings` is empty.
    pub passed: bool,
    /// Initial margin reported by the margin endpoint for the order.
    pub margin_required: Decimal,
    /// Currently populated with the same initial margin value as `margin_required`.
    pub buying_power_used: Decimal,
    /// Human-readable reasons the check failed.
    pub warnings: Vec<String>,
}
193
/// Broker client that talks to an IBKR gateway over its REST interface.
pub struct IBKRBroker {
    /// HTTP client built with the configured timeout.
    client: Client,
    /// Settings supplied at construction.
    config: IBKRConfig,
    /// `http://{host}:{port}/v1`; every request path is appended to it.
    base_url: String,
    /// Awaited before every outgoing request.
    rate_limiter: DefaultDirectRateLimiter,
    /// Position cache; never written by the visible code.
    positions_cache: Arc<DashMap<String, Position>>,
    /// Last account snapshot loaded by `refresh_account`.
    account_cache: Arc<RwLock<Option<Account>>>,
    /// Connection state reported by `health_check`.
    connection_status: Arc<RwLock<ConnectionStatus>>,
    /// Sender side of the tick broadcast; `None` until `start_streaming` runs.
    market_data_tx: Arc<RwLock<Option<broadcast::Sender<MarketTick>>>>,
    /// Sender side of the depth broadcast; `None` until `start_depth_streaming` runs.
    depth_data_tx: Arc<RwLock<Option<broadcast::Sender<MarketDepth>>>>,
    /// Option chains keyed by underlying symbol.
    option_chains: Arc<DashMap<String, Vec<OptionContract>>>,
    /// Greeks keyed by a string derived from the option contract.
    greeks_cache: Arc<DashMap<String, OptionGreeks>>,
}
208
/// Connection state tracked by the broker.
///
/// `health_check` maps these to `Healthy`, `Unhealthy` and `Degraded`
/// respectively.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionStatus {
    Connected,
    Disconnected,
    /// Never assigned by the visible code.
    Reconnecting,
}
215
216impl IBKRBroker {
217 pub fn new(config: IBKRConfig) -> Self {
219 let base_url = format!("http://{}:{}/v1", config.host, config.port);
220
221 let client = Client::builder()
222 .timeout(config.timeout)
223 .build()
224 .expect("Failed to create HTTP client");
225
226 let quota = Quota::per_second(NonZeroU32::new(50).unwrap());
228 let rate_limiter = RateLimiter::direct(quota);
229
230 Self {
231 client,
232 config,
233 base_url,
234 rate_limiter,
235 positions_cache: Arc::new(DashMap::new()),
236 account_cache: Arc::new(RwLock::new(None)),
237 connection_status: Arc::new(RwLock::new(ConnectionStatus::Disconnected)),
238 market_data_tx: Arc::new(RwLock::new(None)),
239 depth_data_tx: Arc::new(RwLock::new(None)),
240 option_chains: Arc::new(DashMap::new()),
241 greeks_cache: Arc::new(DashMap::new()),
242 }
243 }
244
245 pub async fn connect(&self) -> Result<(), BrokerError> {
247 let status: IBKRStatus = self
249 .request(Method::GET, "/iserver/auth/status", None::<()>)
250 .await?;
251
252 if status.authenticated {
253 info!("Connected to IBKR TWS/Gateway");
254 *self.connection_status.write() = ConnectionStatus::Connected;
255
256 self.refresh_account().await?;
258 Ok(())
259 } else {
260 error!("IBKR authentication failed");
261 Err(BrokerError::Auth("Not authenticated with TWS/Gateway".to_string()))
262 }
263 }
264
265 pub async fn start_streaming(&self, symbols: Vec<String>) -> Result<(), BrokerError> {
269 if !self.config.streaming {
270 return Ok(());
271 }
272
273 let (tx, _) = broadcast::channel(1000);
275 *self.market_data_tx.write() = Some(tx.clone());
276
277 for symbol in symbols {
279 let conid = self.get_contract_id(&symbol).await?;
280 self.subscribe_market_data(conid, tx.clone()).await?;
281 }
282
283 Ok(())
284 }
285
286 async fn subscribe_market_data(
288 &self,
289 conid: i64,
290 tx: broadcast::Sender<MarketTick>,
291 ) -> Result<(), BrokerError> {
292 let req = IBKRMarketDataRequest {
293 conid,
294 fields: vec![
295 "31".to_string(), "84".to_string(), "86".to_string(), "87".to_string(), "88".to_string(), "85".to_string(), ],
302 };
303
304 let _: serde_json::Value = self
305 .request(Method::POST, "/iserver/marketdata/snapshot", Some(req))
306 .await?;
307
308 Ok(())
309 }
310
311 pub fn market_data_stream(&self) -> Option<broadcast::Receiver<MarketTick>> {
313 self.market_data_tx.read().as_ref().map(|tx| tx.subscribe())
314 }
315
316 pub async fn start_depth_streaming(&self, symbols: Vec<String>) -> Result<(), BrokerError> {
318 if !self.config.level2_depth {
319 return Ok(());
320 }
321
322 let (tx, _) = broadcast::channel(1000);
323 *self.depth_data_tx.write() = Some(tx.clone());
324
325 for symbol in symbols {
326 let conid = self.get_contract_id(&symbol).await?;
327 self.subscribe_depth(conid, tx.clone()).await?;
328 }
329
330 Ok(())
331 }
332
333 async fn subscribe_depth(
334 &self,
335 conid: i64,
336 _tx: broadcast::Sender<MarketDepth>,
337 ) -> Result<(), BrokerError> {
338 let _: serde_json::Value = self
339 .request(
340 Method::POST,
341 &format!("/iserver/marketdata/depth?conid={}", conid),
342 None::<()>,
343 )
344 .await?;
345
346 Ok(())
347 }
348
349 pub fn depth_stream(&self) -> Option<broadcast::Receiver<MarketDepth>> {
351 self.depth_data_tx.read().as_ref().map(|tx| tx.subscribe())
352 }
353
354 pub async fn get_historical_data(
356 &self,
357 symbol: &str,
358 period: &str,
359 bar_size: &str,
360 ) -> Result<Vec<HistoricalBar>, BrokerError> {
361 let conid = self.get_contract_id(symbol).await?;
362
363 let bars: IBKRHistoricalResponse = self
364 .request(
365 Method::GET,
366 &format!(
367 "/iserver/marketdata/history?conid={}&period={}&bar={}",
368 conid, period, bar_size
369 ),
370 None::<()>,
371 )
372 .await?;
373
374 Ok(bars.data.into_iter().map(|b| b.into()).collect())
375 }
376
377 pub async fn get_option_chain(
381 &self,
382 underlying: &str,
383 ) -> Result<Vec<OptionContract>, BrokerError> {
384 if !self.config.options_enabled {
385 return Err(BrokerError::InvalidOrder(
386 "Options trading not enabled".to_string(),
387 ));
388 }
389
390 if let Some(chain) = self.option_chains.get(underlying) {
392 return Ok(chain.clone());
393 }
394
395 let conid = self.get_contract_id(underlying).await?;
396
397 let chain: IBKROptionChain = self
398 .request(
399 Method::GET,
400 &format!("/iserver/secdef/info?conid={}§ype=OPT", conid),
401 None::<()>,
402 )
403 .await?;
404
405 let contracts: Vec<OptionContract> = chain.strikes.into_iter().flat_map(|strike| {
406 vec![
407 OptionContract {
408 underlying: underlying.to_string(),
409 strike: strike.strike,
410 expiry: strike.expiry.clone(),
411 right: OptionRight::Call,
412 multiplier: 100,
413 },
414 OptionContract {
415 underlying: underlying.to_string(),
416 strike: strike.strike,
417 expiry: strike.expiry,
418 right: OptionRight::Put,
419 multiplier: 100,
420 },
421 ]
422 }).collect();
423
424 self.option_chains.insert(underlying.to_string(), contracts.clone());
425 Ok(contracts)
426 }
427
428 pub async fn get_option_greeks(
430 &self,
431 contract: &OptionContract,
432 ) -> Result<OptionGreeks, BrokerError> {
433 let cache_key = format!("{}_{}_{:?}", contract.underlying, contract.expiry, contract.right);
434
435 if let Some(greeks) = self.greeks_cache.get(&cache_key) {
436 return Ok(greeks.clone());
437 }
438
439 let conid = self.get_option_contract_id(contract).await?;
440
441 let greeks: IBKRGreeksResponse = self
442 .request(
443 Method::GET,
444 &format!("/iserver/marketdata/snapshot?conids={}&fields=7283,7284,7285,7286,7287,7633", conid),
445 None::<()>,
446 )
447 .await?;
448
449 let option_greeks: OptionGreeks = greeks.into();
450 self.greeks_cache.insert(cache_key, option_greeks.clone());
451 Ok(option_greeks)
452 }
453
454 pub async fn place_option_order(
456 &self,
457 contract: OptionContract,
458 quantity: i64,
459 side: OrderSide,
460 price: Option<Decimal>,
461 ) -> Result<OrderResponse, BrokerError> {
462 let conid = self.get_option_contract_id(&contract).await?;
463
464 let order = IBKROrderRequest {
465 acct_id: self.config.account.clone(),
466 order_type: if price.is_some() { "LMT" } else { "MKT" }.to_string(),
467 side: match side {
468 OrderSide::Buy => "BUY".to_string(),
469 OrderSide::Sell => "SELL".to_string(),
470 },
471 tif: "DAY".to_string(),
472 quantity: quantity.to_string(),
473 price: price.map(|p| p.to_string()),
474 stop_price: None,
475 };
476
477 let response: Vec<IBKROrderResponseItem> = self
478 .request(
479 Method::POST,
480 &format!("/iserver/account/{}/orders", self.config.account),
481 Some(serde_json::json!({
482 "conid": conid,
483 "orders": [order]
484 })),
485 )
486 .await?;
487
488 let resp = response.first().ok_or_else(|| {
489 BrokerError::Other(anyhow::anyhow!("No order response from IBKR"))
490 })?;
491
492 Ok(OrderResponse {
493 order_id: resp.order_id.clone(),
494 client_order_id: Uuid::new_v4().to_string(),
495 status: OrderStatus::Accepted,
496 filled_qty: 0,
497 filled_avg_price: None,
498 submitted_at: Utc::now(),
499 filled_at: None,
500 })
501 }
502
503 pub async fn place_bracket_order(
507 &self,
508 bracket: BracketOrder,
509 ) -> Result<Vec<OrderResponse>, BrokerError> {
510 let conid = self.get_contract_id(&bracket.entry.symbol.to_string()).await?;
511
512 let parent = self.convert_order(&bracket.entry);
514
515 let stop_loss = self.convert_order(&bracket.stop_loss);
517
518 let take_profit = self.convert_order(&bracket.take_profit);
520
521 let bracket_req = serde_json::json!({
522 "conid": conid,
523 "orders": [
524 {
525 "orderType": parent.order_type,
526 "side": parent.side,
527 "quantity": parent.quantity,
528 "price": parent.price,
529 "tif": parent.tif,
530 "outsideRth": false,
531 "attachedOrders": [
532 {
533 "orderType": stop_loss.order_type,
534 "side": stop_loss.side,
535 "quantity": stop_loss.quantity,
536 "stopPrice": stop_loss.stop_price,
537 "tif": "GTC"
538 },
539 {
540 "orderType": take_profit.order_type,
541 "side": take_profit.side,
542 "quantity": take_profit.quantity,
543 "price": take_profit.price,
544 "tif": "GTC"
545 }
546 ]
547 }
548 ]
549 });
550
551 let responses: Vec<IBKROrderResponseItem> = self
552 .request(
553 Method::POST,
554 &format!("/iserver/account/{}/orders", self.config.account),
555 Some(bracket_req),
556 )
557 .await?;
558
559 Ok(responses
560 .into_iter()
561 .map(|r| OrderResponse {
562 order_id: r.order_id,
563 client_order_id: Uuid::new_v4().to_string(),
564 status: OrderStatus::Accepted,
565 filled_qty: 0,
566 filled_avg_price: None,
567 submitted_at: Utc::now(),
568 filled_at: None,
569 })
570 .collect())
571 }
572
573 pub async fn place_trailing_stop(
577 &self,
578 symbol: &str,
579 quantity: i64,
580 side: OrderSide,
581 trail: TrailingStop,
582 ) -> Result<OrderResponse, BrokerError> {
583 let conid = self.get_contract_id(symbol).await?;
584
585 let (trail_amount, trail_unit) = match trail {
586 TrailingStop::Percentage(pct) => (pct.to_string(), "%"),
587 TrailingStop::Dollar(amt) => (amt.to_string(), "$"),
588 };
589
590 let order = serde_json::json!({
591 "conid": conid,
592 "orderType": "TRAIL",
593 "side": match side {
594 OrderSide::Buy => "BUY",
595 OrderSide::Sell => "SELL",
596 },
597 "quantity": quantity,
598 "tif": "GTC",
599 "trailingAmount": trail_amount,
600 "trailingType": trail_unit,
601 });
602
603 let responses: Vec<IBKROrderResponseItem> = self
604 .request(
605 Method::POST,
606 &format!("/iserver/account/{}/orders", self.config.account),
607 Some(serde_json::json!({ "orders": [order] })),
608 )
609 .await?;
610
611 let resp = responses.first().ok_or_else(|| {
612 BrokerError::Other(anyhow::anyhow!("No order response from IBKR"))
613 })?;
614
615 Ok(OrderResponse {
616 order_id: resp.order_id.clone(),
617 client_order_id: Uuid::new_v4().to_string(),
618 status: OrderStatus::Accepted,
619 filled_qty: 0,
620 filled_avg_price: None,
621 submitted_at: Utc::now(),
622 filled_at: None,
623 })
624 }
625
626 pub async fn place_algo_order(
630 &self,
631 symbol: &str,
632 quantity: i64,
633 side: OrderSide,
634 strategy: AlgoStrategy,
635 ) -> Result<OrderResponse, BrokerError> {
636 if !self.config.algo_orders {
637 return Err(BrokerError::InvalidOrder(
638 "Algorithmic orders not enabled".to_string(),
639 ));
640 }
641
642 let conid = self.get_contract_id(symbol).await?;
643
644 let (algo_strategy, algo_params) = match strategy {
645 AlgoStrategy::VWAP { start_time, end_time } => (
646 "Vwap",
647 serde_json::json!({
648 "startTime": start_time,
649 "endTime": end_time,
650 "allowPastEndTime": true,
651 }),
652 ),
653 AlgoStrategy::TWAP { start_time, end_time } => (
654 "Twap",
655 serde_json::json!({
656 "startTime": start_time,
657 "endTime": end_time,
658 "allowPastEndTime": true,
659 }),
660 ),
661 AlgoStrategy::PercentOfVolume { participation_rate } => (
662 "PctVol",
663 serde_json::json!({
664 "pctVol": participation_rate,
665 }),
666 ),
667 };
668
669 let order = serde_json::json!({
670 "conid": conid,
671 "orderType": "LMT",
672 "side": match side {
673 OrderSide::Buy => "BUY",
674 OrderSide::Sell => "SELL",
675 },
676 "quantity": quantity,
677 "tif": "DAY",
678 "strategy": algo_strategy,
679 "strategyParameters": algo_params,
680 });
681
682 let responses: Vec<IBKROrderResponseItem> = self
683 .request(
684 Method::POST,
685 &format!("/iserver/account/{}/orders", self.config.account),
686 Some(serde_json::json!({ "orders": [order] })),
687 )
688 .await?;
689
690 let resp = responses.first().ok_or_else(|| {
691 BrokerError::Other(anyhow::anyhow!("No order response from IBKR"))
692 })?;
693
694 Ok(OrderResponse {
695 order_id: resp.order_id.clone(),
696 client_order_id: Uuid::new_v4().to_string(),
697 status: OrderStatus::Accepted,
698 filled_qty: 0,
699 filled_avg_price: None,
700 submitted_at: Utc::now(),
701 filled_at: None,
702 })
703 }
704
705 pub async fn pre_trade_risk_check(
709 &self,
710 order: &OrderRequest,
711 ) -> Result<RiskCheckResult, BrokerError> {
712 let account = self.get_account().await?;
713 let conid = self.get_contract_id(&order.symbol.to_string()).await?;
714
715 let margin: IBKRMarginResponse = self
717 .request(
718 Method::POST,
719 "/iserver/account/margin",
720 Some(serde_json::json!({
721 "conid": conid,
722 "quantity": order.quantity,
723 "side": match order.side {
724 OrderSide::Buy => "BUY",
725 OrderSide::Sell => "SELL",
726 },
727 })),
728 )
729 .await?;
730
731 let mut warnings = Vec::new();
732
733 if margin.initial_margin > account.buying_power {
735 warnings.push("Insufficient buying power for order".to_string());
736 }
737
738 if account.daytrade_count >= 3 && account.equity < Decimal::from(25000) {
740 warnings.push("Pattern day trader: account equity below $25,000".to_string());
741 }
742
743 if margin.maintenance_margin > account.maintenance_margin {
745 warnings.push("Order would exceed maintenance margin requirements".to_string());
746 }
747
748 Ok(RiskCheckResult {
749 passed: warnings.is_empty(),
750 margin_required: margin.initial_margin,
751 buying_power_used: margin.initial_margin,
752 warnings,
753 })
754 }
755
756 pub async fn calculate_buying_power(
758 &self,
759 asset_class: &str,
760 ) -> Result<Decimal, BrokerError> {
761 let account = self.get_account().await?;
762
763 let multiplier = match asset_class {
765 "STK" => Decimal::from(4), "OPT" => Decimal::ONE, "FUT" => Decimal::from(10), "FX" => Decimal::from(50), _ => Decimal::from(2),
770 };
771
772 Ok(account.buying_power * multiplier)
773 }
774
775 pub async fn is_pattern_day_trader(&self) -> Result<bool, BrokerError> {
777 let account = self.get_account().await?;
778 Ok(account.daytrade_count >= 3 && account.equity < Decimal::from(25000))
779 }
780
781 async fn get_contract_id(&self, symbol: &str) -> Result<i64, BrokerError> {
785 #[derive(Deserialize)]
786 struct ContractSearchResult {
787 conid: i64,
788 }
789
790 let results: Vec<ContractSearchResult> = self
791 .request(
792 Method::GET,
793 &format!("/iserver/secdef/search?symbol={}", symbol),
794 None::<()>,
795 )
796 .await?;
797
798 results
799 .first()
800 .map(|r| r.conid)
801 .ok_or_else(|| BrokerError::InvalidOrder(format!("Symbol not found: {}", symbol)))
802 }
803
804 async fn get_option_contract_id(&self, contract: &OptionContract) -> Result<i64, BrokerError> {
806 let right = match contract.right {
807 OptionRight::Call => "C",
808 OptionRight::Put => "P",
809 };
810
811 let local_symbol = format!(
812 "{}{}{}{}",
813 contract.underlying,
814 contract.expiry,
815 right,
816 contract.strike
817 );
818
819 let results: Vec<ContractSearchResult> = self
820 .request(
821 Method::GET,
822 &format!("/iserver/secdef/search?symbol={}", local_symbol),
823 None::<()>,
824 )
825 .await?;
826
827 results
828 .first()
829 .map(|r| r.conid)
830 .ok_or_else(|| BrokerError::InvalidOrder(format!("Option contract not found")))
831 }
832
833 async fn refresh_account(&self) -> Result<(), BrokerError> {
835 let accounts: Vec<String> = self
836 .request(Method::GET, "/portfolio/accounts", None::<()>)
837 .await?;
838
839 let account_id = if self.config.account.is_empty() {
840 accounts.first().cloned().unwrap_or_default()
841 } else {
842 self.config.account.clone()
843 };
844
845 let summary: IBKRAccountSummary = self
846 .request(
847 Method::GET,
848 &format!("/portfolio/{}/summary", account_id),
849 None::<()>,
850 )
851 .await?;
852
853 let account = Account {
854 account_id: account_id.clone(),
855 cash: summary.total_cash_value,
856 portfolio_value: summary.net_liquidation,
857 buying_power: summary.buying_power,
858 equity: summary.equity_with_loan_value,
859 last_equity: summary.previous_day_equity,
860 multiplier: "1".to_string(),
861 currency: summary.currency,
862 shorting_enabled: true,
863 long_market_value: summary.gross_position_value,
864 short_market_value: Decimal::ZERO,
865 initial_margin: summary.init_margin_req,
866 maintenance_margin: summary.maint_margin_req,
867 day_trading_buying_power: summary.day_trades_remaining.into(),
868 daytrade_count: summary.day_trades_remaining,
869 };
870
871 *self.account_cache.write() = Some(account);
872 Ok(())
873 }
874
875 async fn request<T: serde::de::DeserializeOwned>(
877 &self,
878 method: Method,
879 path: &str,
880 body: Option<impl Serialize>,
881 ) -> Result<T, BrokerError> {
882 self.rate_limiter.until_ready().await;
883
884 let url = format!("{}{}", self.base_url, path);
885 let mut req = self.client.request(method.clone(), &url);
886
887 if let Some(body) = body {
888 req = req.json(&body);
889 }
890
891 debug!("IBKR API request: {} {}", method, path);
892
893 let response = req.send().await?;
894
895 match response.status() {
896 StatusCode::OK | StatusCode::CREATED => {
897 let result = response.json().await?;
898 Ok(result)
899 }
900 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
901 Err(BrokerError::Auth("IBKR authentication failed".to_string()))
902 }
903 StatusCode::TOO_MANY_REQUESTS => Err(BrokerError::RateLimit),
904 StatusCode::SERVICE_UNAVAILABLE => {
905 Err(BrokerError::Unavailable("IBKR service unavailable".to_string()))
906 }
907 status => {
908 let error_text = response.text().await.unwrap_or_default();
909 error!("IBKR API error {}: {}", status, error_text);
910 Err(BrokerError::Other(anyhow::anyhow!("HTTP {}: {}", status, error_text)))
911 }
912 }
913 }
914
915 fn convert_order(&self, order: &OrderRequest) -> IBKROrderRequest {
917 let order_type = match order.order_type {
918 OrderType::Market => "MKT",
919 OrderType::Limit => "LMT",
920 OrderType::StopLoss => "STP",
921 OrderType::StopLimit => "STP LMT",
922 };
923
924 let side = match order.side {
925 OrderSide::Buy => "BUY",
926 OrderSide::Sell => "SELL",
927 };
928
929 let tif = match order.time_in_force {
930 TimeInForce::Day => "DAY",
931 TimeInForce::GTC => "GTC",
932 TimeInForce::IOC => "IOC",
933 TimeInForce::FOK => "FOK",
934 };
935
936 IBKROrderRequest {
937 acct_id: self.config.account.clone(),
938 order_type: order_type.to_string(),
939 side: side.to_string(),
940 tif: tif.to_string(),
941 quantity: order.quantity.to_string(),
942 price: order.limit_price.map(|p| p.to_string()),
943 stop_price: order.stop_price.map(|p| p.to_string()),
944 }
945 }
946}
947
948#[async_trait]
949impl BrokerClient for IBKRBroker {
950 async fn get_account(&self) -> Result<Account, BrokerError> {
951 if let Some(account) = self.account_cache.read().as_ref() {
952 return Ok(account.clone());
953 }
954
955 self.refresh_account().await?;
956 self.account_cache
957 .read()
958 .as_ref()
959 .cloned()
960 .ok_or_else(|| BrokerError::Other(anyhow::anyhow!("Failed to load account")))
961 }
962
963 async fn get_positions(&self) -> Result<Vec<Position>, BrokerError> {
964 let account_id = self.get_account().await?.account_id;
965 let positions: Vec<IBKRPosition> = self
966 .request(
967 Method::GET,
968 &format!("/portfolio/{}/positions", account_id),
969 None::<()>,
970 )
971 .await?;
972
973 Ok(positions
974 .into_iter()
975 .map(|p| Position {
976 symbol: Symbol::new(p.ticker.as_str()).expect("Invalid symbol from IBKR"),
977 qty: p.position,
978 side: if p.position > 0 {
979 PositionSide::Long
980 } else {
981 PositionSide::Short
982 },
983 avg_entry_price: p.avg_price,
984 market_value: p.market_value,
985 cost_basis: p.avg_price * Decimal::from(p.position.abs()),
986 unrealized_pl: p.unrealized_pnl,
987 unrealized_plpc: if p.market_value != Decimal::ZERO {
988 (p.unrealized_pnl / p.market_value.abs()) * Decimal::from(100)
989 } else {
990 Decimal::ZERO
991 },
992 current_price: p.market_price,
993 lastday_price: p.market_price,
994 change_today: Decimal::ZERO,
995 })
996 .collect())
997 }
998
999 async fn place_order(&self, order: OrderRequest) -> Result<OrderResponse, BrokerError> {
1000 let risk_check = self.pre_trade_risk_check(&order).await?;
1002 if !risk_check.passed {
1003 return Err(BrokerError::InvalidOrder(format!(
1004 "Risk check failed: {:?}",
1005 risk_check.warnings
1006 )));
1007 }
1008
1009 let conid = self.get_contract_id(&order.symbol.to_string()).await?;
1010 let ibkr_order = self.convert_order(&order);
1011
1012 let response: Vec<IBKROrderResponseItem> = self
1013 .request(
1014 Method::POST,
1015 &format!("/iserver/account/{}/orders", self.config.account),
1016 Some(serde_json::json!({
1017 "conid": conid,
1018 "orders": [ibkr_order]
1019 })),
1020 )
1021 .await?;
1022
1023 let resp = response.first().ok_or_else(|| {
1024 BrokerError::Other(anyhow::anyhow!("No order response from IBKR"))
1025 })?;
1026
1027 Ok(OrderResponse {
1028 order_id: resp.order_id.clone(),
1029 client_order_id: Uuid::new_v4().to_string(),
1030 status: OrderStatus::Accepted,
1031 filled_qty: 0,
1032 filled_avg_price: None,
1033 submitted_at: Utc::now(),
1034 filled_at: None,
1035 })
1036 }
1037
1038 async fn cancel_order(&self, order_id: &str) -> Result<(), BrokerError> {
1039 let _: serde_json::Value = self
1040 .request(
1041 Method::DELETE,
1042 &format!("/iserver/account/{}/order/{}", self.config.account, order_id),
1043 None::<()>,
1044 )
1045 .await?;
1046
1047 Ok(())
1048 }
1049
1050 async fn get_order(&self, order_id: &str) -> Result<OrderResponse, BrokerError> {
1051 let order: IBKROrderStatus = self
1052 .request(
1053 Method::GET,
1054 &format!("/iserver/account/order/status/{}", order_id),
1055 None::<()>,
1056 )
1057 .await?;
1058
1059 let status = match order.status.as_str() {
1060 "Submitted" => OrderStatus::Accepted,
1061 "Filled" => OrderStatus::Filled,
1062 "Cancelled" => OrderStatus::Cancelled,
1063 "PendingSubmit" => OrderStatus::Pending,
1064 _ => OrderStatus::Pending,
1065 };
1066
1067 Ok(OrderResponse {
1068 order_id: order_id.to_string(),
1069 client_order_id: order.order_ref.unwrap_or_default(),
1070 status,
1071 filled_qty: order.filled_quantity.try_into().unwrap_or(0),
1072 filled_avg_price: Some(order.avg_price),
1073 submitted_at: Utc::now(),
1074 filled_at: if status == OrderStatus::Filled {
1075 Some(Utc::now())
1076 } else {
1077 None
1078 },
1079 })
1080 }
1081
1082 async fn list_orders(&self, _filter: OrderFilter) -> Result<Vec<OrderResponse>, BrokerError> {
1083 let orders: IBKROrdersResponse = self
1084 .request(
1085 Method::GET,
1086 &format!("/iserver/account/{}/orders", self.config.account),
1087 None::<()>,
1088 )
1089 .await?;
1090
1091 Ok(orders
1092 .orders
1093 .into_iter()
1094 .filter_map(|o| {
1095 Some(OrderResponse {
1096 order_id: o.order_id?,
1097 client_order_id: o.order_ref.unwrap_or_default(),
1098 status: OrderStatus::Pending,
1099 filled_qty: o.filled_quantity.and_then(|q| q.try_into().ok()).unwrap_or(0),
1100 filled_avg_price: o.avg_price,
1101 submitted_at: Utc::now(),
1102 filled_at: None,
1103 })
1104 })
1105 .collect())
1106 }
1107
1108 async fn health_check(&self) -> Result<HealthStatus, BrokerError> {
1109 match *self.connection_status.read() {
1110 ConnectionStatus::Connected => Ok(HealthStatus::Healthy),
1111 ConnectionStatus::Reconnecting => Ok(HealthStatus::Degraded),
1112 ConnectionStatus::Disconnected => Ok(HealthStatus::Unhealthy),
1113 }
1114 }
1115}
1116
/// Response body of `/iserver/auth/status`.
#[derive(Debug, Serialize, Deserialize)]
struct IBKRStatus {
    /// The only field `connect` inspects.
    authenticated: bool,
    /// Deserialized but not read by the visible code.
    competing: bool,
    /// Deserialized but not read by the visible code.
    connected: bool,
}
1125
/// Single order as sent inside the `orders` array of an order request.
///
/// No serde renames are applied, so fields serialize under their Rust names
/// (`acct_id`, `order_type`, ...).
/// NOTE(review): confirm the gateway accepts snake_case keys here.
#[derive(Debug, Serialize)]
struct IBKROrderRequest {
    acct_id: String,
    order_type: String,
    side: String,
    tif: String,
    quantity: String,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    price: Option<String>,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    stop_price: Option<String>,
}
1138
/// Response body of `/portfolio/{account}/summary`.
///
/// Each field is read from the all-lowercase key named in its `rename`.
#[derive(Debug, Deserialize)]
struct IBKRAccountSummary {
    #[serde(rename = "totalcashvalue")]
    total_cash_value: Decimal,
    #[serde(rename = "netliquidation")]
    net_liquidation: Decimal,
    #[serde(rename = "buyingpower")]
    buying_power: Decimal,
    #[serde(rename = "equitywithloanvalue")]
    equity_with_loan_value: Decimal,
    #[serde(rename = "previousdayequitywithloanvalue")]
    previous_day_equity: Decimal,
    #[serde(rename = "grosspositionvalue")]
    gross_position_value: Decimal,
    #[serde(rename = "initmarginreq")]
    init_margin_req: Decimal,
    #[serde(rename = "maintmarginreq")]
    maint_margin_req: Decimal,
    #[serde(rename = "daytradesremaining")]
    day_trades_remaining: i32,
    currency: String,
}
1161
/// One element of the `/portfolio/{account}/positions` response.
#[derive(Debug, Deserialize)]
struct IBKRPosition {
    ticker: String,
    /// Signed quantity; `get_positions` treats values above zero as long.
    position: i64,
    #[serde(rename = "mktPrice")]
    market_price: Decimal,
    #[serde(rename = "mktValue")]
    market_value: Decimal,
    #[serde(rename = "avgPrice")]
    avg_price: Decimal,
    #[serde(rename = "unrealizedPnL")]
    unrealized_pnl: Decimal,
}
1175
/// Response body of `/iserver/account/order/status/{id}`.
///
/// `get_order` reads `status`, `filled_quantity`, `avg_price` and
/// `order_ref`; the remaining fields are deserialized but unused there.
#[derive(Debug, Deserialize)]
struct IBKROrderStatus {
    #[serde(rename = "orderId")]
    order_id: String,
    status: String,
    #[serde(rename = "filledQuantity")]
    filled_quantity: i64,
    #[serde(rename = "remainingQuantity")]
    remaining_quantity: i64,
    #[serde(rename = "avgPrice")]
    avg_price: Decimal,
    #[serde(rename = "limitPrice")]
    limit_price: Option<Decimal>,
    symbol: Option<String>,
    #[serde(rename = "orderRef")]
    order_ref: Option<String>,
}
1193
/// Response body of the order listing endpoint (`GET .../orders`).
#[derive(Debug, Deserialize)]
struct IBKROrdersResponse {
    orders: Vec<IBKROrderItem>,
}
1198
/// One entry of [`IBKROrdersResponse`]; every field is optional.
///
/// `list_orders` reads `order_id`, `order_ref`, `filled_quantity` and
/// `avg_price`, and skips entries whose `order_id` is absent.
#[derive(Debug, Deserialize)]
struct IBKROrderItem {
    #[serde(rename = "orderId")]
    order_id: Option<String>,
    #[serde(rename = "orderRef")]
    order_ref: Option<String>,
    ticker: Option<String>,
    #[serde(rename = "totalSize")]
    total_size: Option<i64>,
    #[serde(rename = "filledQuantity")]
    filled_quantity: Option<i64>,
    price: Option<Decimal>,
    #[serde(rename = "avgPrice")]
    avg_price: Option<Decimal>,
}
1214
/// JSON body posted to `/iserver/marketdata/snapshot`.
#[derive(Debug, Serialize)]
struct IBKRMarketDataRequest {
    /// Contract id to request data for.
    conid: i64,
    /// Field codes to request, as strings.
    fields: Vec<String>,
}
1220
/// Response body of the option chain lookup (`/iserver/secdef/info`).
#[derive(Debug, Deserialize)]
struct IBKROptionChain {
    strikes: Vec<IBKRStrike>,
}
1225
/// One strike/expiry pair of an [`IBKROptionChain`]; expanded into a call and
/// a put contract by `get_option_chain`.
#[derive(Debug, Deserialize)]
struct IBKRStrike {
    strike: Decimal,
    expiry: String,
}
1231
/// Greeks as deserialized from the snapshot response; every value may be
/// absent.
#[derive(Debug, Deserialize)]
struct IBKRGreeksResponse {
    delta: Option<f64>,
    gamma: Option<f64>,
    theta: Option<f64>,
    vega: Option<f64>,
    rho: Option<f64>,
    #[serde(rename = "impliedVol")]
    implied_vol: Option<f64>,
}
1242
1243impl From<IBKRGreeksResponse> for OptionGreeks {
1244 fn from(r: IBKRGreeksResponse) -> Self {
1245 Self {
1246 delta: r.delta.unwrap_or(0.0),
1247 gamma: r.gamma.unwrap_or(0.0),
1248 theta: r.theta.unwrap_or(0.0),
1249 vega: r.vega.unwrap_or(0.0),
1250 rho: r.rho.unwrap_or(0.0),
1251 implied_volatility: r.implied_vol.unwrap_or(0.0),
1252 }
1253 }
1254}
1255
/// One element of the array returned when submitting orders; only the order
/// id is read.
#[derive(Debug, Deserialize)]
struct IBKROrderResponseItem {
    order_id: String,
}
1260
/// Response body of `/iserver/account/margin`, consumed by the pre-trade
/// risk check.
#[derive(Debug, Deserialize)]
struct IBKRMarginResponse {
    #[serde(rename = "initialMargin")]
    initial_margin: Decimal,
    #[serde(rename = "maintenanceMargin")]
    maintenance_margin: Decimal,
}
1268
/// Response body of `/iserver/marketdata/history`.
#[derive(Debug, Deserialize)]
struct IBKRHistoricalResponse {
    data: Vec<IBKRHistoricalBar>,
}
1273
/// One bar of the history response, using the gateway's single-letter keys.
#[derive(Debug, Deserialize)]
struct IBKRHistoricalBar {
    /// Bar time; converted as whole seconds since the Unix epoch.
    /// NOTE(review): confirm the gateway does not send milliseconds here.
    t: i64,
    /// Open price.
    o: f64,
    /// High price.
    h: f64,
    /// Low price.
    l: f64,
    /// Close price.
    c: f64,
    /// Volume.
    v: i64,
}
1283
/// One OHLCV bar returned by `IBKRBroker::get_historical_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistoricalBar {
    pub timestamp: DateTime<Utc>,
    pub open: Decimal,
    pub high: Decimal,
    pub low: Decimal,
    pub close: Decimal,
    pub volume: i64,
}
1293
1294impl From<IBKRHistoricalBar> for HistoricalBar {
1295 fn from(b: IBKRHistoricalBar) -> Self {
1296 use chrono::TimeZone;
1297 Self {
1298 timestamp: Utc.timestamp_opt(b.t, 0).unwrap(),
1299 open: Decimal::from_f64_retain(b.o).unwrap(),
1300 high: Decimal::from_f64_retain(b.h).unwrap(),
1301 low: Decimal::from_f64_retain(b.l).unwrap(),
1302 close: Decimal::from_f64_retain(b.c).unwrap(),
1303 volume: b.v,
1304 }
1305 }
1306}
1307
/// One element of the `/iserver/secdef/search` response; only the contract id
/// is read.
#[derive(Debug, Deserialize)]
struct ContractSearchResult {
    conid: i64,
}
1312
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed broker starts out disconnected.
    #[tokio::test]
    async fn test_ibkr_broker_creation() {
        // The binding must be named `config`: it is passed to `new` below.
        let config = IBKRConfig::default();
        let broker = IBKRBroker::new(config);
        assert_eq!(*broker.connection_status.read(), ConnectionStatus::Disconnected);
    }

    /// A disconnected broker reports itself as unhealthy.
    #[tokio::test]
    async fn test_health_check() {
        let config = IBKRConfig::default();
        let broker = IBKRBroker::new(config);
        let health = broker.health_check().await.unwrap();
        assert_eq!(health, HealthStatus::Unhealthy);
    }

    /// A bracket order keeps its three legs intact.
    #[tokio::test]
    async fn test_bracket_order_structure() {
        let entry = OrderRequest {
            symbol: Symbol::new("AAPL").unwrap(),
            quantity: 100,
            side: OrderSide::Buy,
            order_type: OrderType::Limit,
            time_in_force: TimeInForce::Day,
            limit_price: Some(Decimal::from(150)),
            stop_price: None,
        };

        let stop_loss = OrderRequest {
            symbol: Symbol::new("AAPL").unwrap(),
            quantity: 100,
            side: OrderSide::Sell,
            order_type: OrderType::StopLoss,
            time_in_force: TimeInForce::GTC,
            limit_price: None,
            stop_price: Some(Decimal::from(145)),
        };

        let take_profit = OrderRequest {
            symbol: Symbol::new("AAPL").unwrap(),
            quantity: 100,
            side: OrderSide::Sell,
            order_type: OrderType::Limit,
            time_in_force: TimeInForce::GTC,
            limit_price: Some(Decimal::from(160)),
            stop_price: None,
        };

        let bracket = BracketOrder {
            entry,
            stop_loss,
            take_profit,
        };

        assert_eq!(bracket.entry.quantity, 100);
        assert_eq!(bracket.stop_loss.stop_price.unwrap(), Decimal::from(145));
        assert_eq!(bracket.take_profit.limit_price.unwrap(), Decimal::from(160));
    }

    /// Option contract fields round-trip through construction.
    #[tokio::test]
    async fn test_option_contract() {
        let contract = OptionContract {
            underlying: "AAPL".to_string(),
            strike: Decimal::from(150),
            expiry: "20250117".to_string(),
            right: OptionRight::Call,
            multiplier: 100,
        };

        assert_eq!(contract.underlying, "AAPL");
        assert_eq!(contract.strike, Decimal::from(150));
        assert!(matches!(contract.right, OptionRight::Call));
    }

    /// Both trailing stop variants carry their payload.
    #[tokio::test]
    async fn test_trailing_stop_types() {
        let pct_trail = TrailingStop::Percentage(5.0);
        let dollar_trail = TrailingStop::Dollar(Decimal::from(10));

        match pct_trail {
            TrailingStop::Percentage(p) => assert_eq!(p, 5.0),
            _ => panic!("Wrong trailing stop type"),
        }

        match dollar_trail {
            TrailingStop::Dollar(d) => assert_eq!(d, Decimal::from(10)),
            _ => panic!("Wrong trailing stop type"),
        }
    }
}