use crate::{
order::id::{ClientOrderId, StrategyId},
trade::{AssetFees, Trade, TradeId},
};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use chrono_tz::Tz;
use fnv::FnvHashMap;
use ibapi::orders::{CommissionReport, ExecutionData};
use parking_lot::Mutex;
use rust_decimal::Decimal;
use rustrade_instrument::{
Side, asset::name::AssetNameExchange, instrument::name::InstrumentNameExchange,
};
use smol_str::SmolStr;
use std::{cell::RefCell, sync::Arc};
use tracing::warn;
// Per-thread cache mapping timezone name strings to their parsed `Tz`.
// `parse_ib_timestamp` consults it before parsing a name and only inserts
// names that parsed successfully, so unknown names are re-parsed on every call.
thread_local! {
static TZ_CACHE: RefCell<FnvHashMap<SmolStr, Tz>> = RefCell::new(FnvHashMap::default());
}
/// Holds executions until their commission report is supplied, at which point
/// the pair is turned into a `Trade` (see `complete_with_commission`).
///
/// The state lives behind an `Arc<Mutex<..>>`, so clones of this handle share
/// the same set of pending executions.
#[derive(Debug, Clone)]
pub struct ExecutionBuffer {
/// Shared, mutex-protected buffer state.
inner: Arc<Mutex<ExecutionBufferInner>>,
}
/// Mutex-protected state of an `ExecutionBuffer`.
#[derive(Debug, Default)]
struct ExecutionBufferInner {
/// Executions still waiting for a commission report, keyed by execution id.
pending: FnvHashMap<String, PendingExecution>,
}
/// An execution stored together with the context needed to build a `Trade`
/// once its commission report is available.
#[derive(Debug, Clone)]
struct PendingExecution {
/// Execution data as passed to `ExecutionBuffer::add_execution`.
execution: ExecutionData,
/// Instrument the execution belongs to; becomes `Trade::instrument`.
instrument: InstrumentNameExchange,
/// Client order id; its inner value is used to build the trade's `order_id`.
client_order_id: ClientOrderId,
}
impl ExecutionBuffer {
    /// Creates an empty buffer.
    pub fn new() -> Self {
        let state = ExecutionBufferInner::default();
        Self {
            inner: Arc::new(Mutex::new(state)),
        }
    }

    /// Stores an execution, keyed by its execution id, until the matching
    /// commission report is passed to [`Self::complete_with_commission`].
    ///
    /// An existing entry with the same execution id is replaced. A warning is
    /// logged whenever the number of pending entries is above 1000 and a
    /// multiple of 100.
    pub fn add_execution(
        &self,
        execution: ExecutionData,
        instrument: InstrumentNameExchange,
        client_order_id: ClientOrderId,
    ) {
        let key = execution.execution.execution_id.clone();
        let entry = PendingExecution {
            execution,
            instrument,
            client_order_id,
        };

        let mut guard = self.inner.lock();
        guard.pending.insert(key, entry);

        let pending_count = guard.pending.len();
        let over_threshold = pending_count > 1000;
        if over_threshold && pending_count.is_multiple_of(100) {
            warn!(
                pending_count,
                "ExecutionBuffer has >1000 pending entries; commission reports may be delayed or lost"
            );
        }
    }

    /// Removes the pending execution matching `report.execution_id` and builds
    /// a trade from the pair.
    ///
    /// Returns `None` when no such execution is pending, or when `build_trade`
    /// rejects the execution (the entry has already been removed in that case).
    pub fn complete_with_commission(
        &self,
        report: &CommissionReport,
    ) -> Option<Trade<AssetNameExchange, InstrumentNameExchange>> {
        // The guard is a temporary of this statement, so the lock is released
        // before the trade is built.
        let removed = self.inner.lock().pending.remove(&report.execution_id);
        removed.and_then(|pending| build_trade(pending, report))
    }

    /// Number of executions currently waiting for a commission report.
    pub fn pending_count(&self) -> usize {
        let guard = self.inner.lock();
        guard.pending.len()
    }

    /// Drops pending executions whose execution time is at least `max_age` old
    /// (whole seconds, measured against the current UTC time), as well as any
    /// whose timestamp cannot be parsed.
    ///
    /// Returns the number of entries removed.
    pub fn clear_stale(&self, max_age: std::time::Duration) -> usize {
        let now = Utc::now();
        let mut guard = self.inner.lock();
        let initial_len = guard.pending.len();
        // Durations too large for i64 seconds saturate.
        let limit_secs = i64::try_from(max_age.as_secs()).unwrap_or(i64::MAX);

        guard.pending.retain(|_, entry| {
            parse_ib_timestamp(&entry.execution.execution.time).is_some_and(|exec_time| {
                now.signed_duration_since(exec_time).num_seconds() < limit_secs
            })
        });

        initial_len - guard.pending.len()
    }
}
impl Default for ExecutionBuffer {
fn default() -> Self {
Self::new()
}
}
/// Combines a buffered execution with its commission report into a [`Trade`].
///
/// * Returns `None` (after logging a warning) when the execution's side string
///   is not recognised by [`parse_ib_side`].
/// * Price, quantity and commission are converted with
///   [`parse_decimal_or_warn`], so values that cannot be represented as a
///   `Decimal` become zero.
/// * If the execution time cannot be parsed, a warning is logged and the
///   current UTC time is used instead.
/// * The strategy is set to `StrategyId::unknown()` and `fees_quote` to `None`.
fn build_trade(
    pending: PendingExecution,
    commission: &CommissionReport,
) -> Option<Trade<AssetNameExchange, InstrumentNameExchange>> {
    let exec = &pending.execution.execution;

    let Some(side) = parse_ib_side(&exec.side) else {
        warn!(
            side = %exec.side,
            exec_id = %exec.execution_id,
            "Unknown IB side string, dropping trade"
        );
        return None;
    };

    let price = parse_decimal_or_warn(exec.price, "exec.price");
    let quantity = parse_decimal_or_warn(exec.shares, "exec.shares");
    let commission_amount = parse_decimal_or_warn(commission.commission, "commission");

    // Falling back to wall-clock time keeps the trade, but it must be visible
    // in the logs: the recorded time is then not the exchange's.
    let time_exchange = parse_ib_timestamp(&exec.time).unwrap_or_else(|| {
        warn!(
            time = %exec.time,
            exec_id = %exec.execution_id,
            "Unparseable IB execution time, using current UTC time"
        );
        Utc::now()
    });

    Some(Trade {
        id: TradeId::new(&exec.execution_id),
        order_id: crate::order::id::OrderId::new(&pending.client_order_id.0),
        instrument: pending.instrument,
        strategy: StrategyId::unknown(),
        time_exchange,
        side,
        price,
        quantity,
        fees: AssetFees {
            asset: AssetNameExchange::from(commission.currency.as_str()),
            fees: commission_amount,
            fees_quote: None,
        },
    })
}
/// Maps an IB side string to a [`Side`].
///
/// `"BOT"` and `"BUY"` yield [`Side::Buy`]; `"SLD"`, `"SELL"`, `"SSHORT"` and
/// `"SLONG"` yield [`Side::Sell`]. Matching is exact and case-sensitive; any
/// other string yields `None`.
pub fn parse_ib_side(s: &str) -> Option<Side> {
    const BUY_CODES: [&str; 2] = ["BOT", "BUY"];
    const SELL_CODES: [&str; 4] = ["SLD", "SELL", "SSHORT", "SLONG"];

    if BUY_CODES.contains(&s) {
        Some(Side::Buy)
    } else if SELL_CODES.contains(&s) {
        Some(Side::Sell)
    } else {
        None
    }
}
/// Converts an `f64` into a [`Decimal`].
///
/// When the conversion fails, a warning naming `field_name` and the offending
/// value is logged and [`Decimal::ZERO`] is returned instead.
pub fn parse_decimal_or_warn(value: f64, field_name: &str) -> Decimal {
    match Decimal::try_from(value) {
        Ok(decimal) => decimal,
        Err(e) => {
            warn!(field = %field_name, value = %value, error = %e, "Invalid f64 for Decimal, using zero");
            Decimal::ZERO
        }
    }
}
pub fn parse_ib_timestamp(s: &str) -> Option<DateTime<Utc>> {
let mut parts = s.split_whitespace();
let date_part = parts.next()?;
let time_part = parts.next()?;
let tz_part = parts.next();
let datetime_end = date_part.len() + 1 + time_part.len();
let datetime_str = &s[..datetime_end.min(s.len())];
let naive = NaiveDateTime::parse_from_str(datetime_str, "%Y%m%d %H:%M:%S").ok()?;
if let Some(tz_str) = tz_part {
let tz_opt = TZ_CACHE.with(|cache| {
let mut cache = cache.borrow_mut();
if let Some(tz) = cache.get(tz_str) {
return Some(*tz);
}
if let Ok(tz) = tz_str.parse::<Tz>() {
cache.insert(SmolStr::new(tz_str), tz);
return Some(tz);
}
None
});
if let Some(tz) = tz_opt {
return tz
.from_local_datetime(&naive)
.single()
.map(|dt| dt.with_timezone(&Utc));
}
warn!(timezone = %tz_str, "Unknown timezone in IB timestamp, treating as UTC");
}
Some(naive.and_utc())
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use chrono::{Datelike, Timelike};

    /// A timestamp with a named zone is converted to UTC:
    /// 10:30 US/Eastern on 2025-04-18 is 14:30 UTC.
    #[test]
    fn test_parse_ib_timestamp_with_timezone() {
        let dt = parse_ib_timestamp("20250418 10:30:00 US/Eastern").unwrap();
        assert_eq!((dt.year(), dt.month(), dt.day()), (2025, 4, 18));
        assert_eq!((dt.hour(), dt.minute()), (14, 30));
    }

    /// Without a zone token the time is taken as UTC unchanged.
    #[test]
    fn test_parse_ib_timestamp_no_timezone() {
        let dt = parse_ib_timestamp("20250418 10:30:00").unwrap();
        assert_eq!(dt.hour(), 10);
    }

    /// Strings without both a date and a time token are rejected.
    #[test]
    fn test_parse_ib_timestamp_invalid() {
        for input in ["invalid", ""] {
            assert!(parse_ib_timestamp(input).is_none());
        }
    }

    /// Every recognised side string maps to the expected `Side`; anything
    /// else maps to `None`.
    #[test]
    fn test_parse_ib_side() {
        let cases = [
            ("BOT", Some(Side::Buy)),
            ("BUY", Some(Side::Buy)),
            ("SLD", Some(Side::Sell)),
            ("SELL", Some(Side::Sell)),
            ("SSHORT", Some(Side::Sell)),
            ("SLONG", Some(Side::Sell)),
            ("UNKNOWN", None),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_ib_side(input), expected);
        }
    }
}