use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr, sync::LazyLock};
use indexmap::{IndexMap, IndexSet};
use nautilus_common::{
cache::Cache,
clients::ExecutionClient,
clock::Clock,
enums::{LogColor, LogLevel},
log_info,
messages::{
ExecutionReport,
execution::{
QueryOrder, TradingCommand,
report::{GenerateOrderStatusReports, GeneratePositionStatusReports},
},
},
};
use nautilus_core::{
UUID4, UnixNanos,
datetime::{
NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, mins_to_nanos, mins_to_secs,
nanos_to_millis,
},
};
use nautilus_execution::{
engine::ExecutionEngine,
reconciliation::{
calculate_reconciliation_price, create_inferred_fill_for_qty,
create_position_reconciliation_venue_order_id, create_reconciliation_rejected,
create_reconciliation_triggered, generate_external_order_status_events,
process_mass_status_for_reconciliation, reconcile_order_report,
should_reconciliation_update,
},
};
use nautilus_model::{
enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
events::{OrderCanceled, OrderEventAny, OrderFilled, OrderInitialized},
identifiers::{
AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
VenueOrderId,
},
instruments::{Instrument, InstrumentAny},
orders::{Order, OrderAny, TRIGGERABLE_ORDER_TYPES},
position::Position,
reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
types::{Price, Quantity},
};
use rust_decimal::{Decimal, prelude::ToPrimitive};
use ustr::Ustr;
use crate::config::LiveExecEngineConfig;
// Interned tag string "VENUE".
// NOTE(review): not referenced in the visible part of this file; presumably applied to
// orders that originated at the venue — confirm against its users further down.
static TAG_VENUE: LazyLock<Ustr> = LazyLock::new(|| Ustr::from("VENUE"));
// Interned tag string "RECONCILIATION"; mass-status reconciliation checks cached closed
// orders for this tag and skips them when present.
static TAG_RECONCILIATION: LazyLock<Ustr> = LazyLock::new(|| Ustr::from("RECONCILIATION"));
/// Identifying details of an external order handled during mass-status reconciliation.
///
/// Values of this type are collected into [`ReconciliationResult::external_orders`].
#[derive(Debug, Clone)]
pub struct ExternalOrderMetadata {
/// Client order ID of the external order.
pub client_order_id: ClientOrderId,
/// Venue-assigned order ID of the external order.
pub venue_order_id: VenueOrderId,
/// Instrument the external order is for.
pub instrument_id: InstrumentId,
/// Strategy the external order is associated with.
pub strategy_id: StrategyId,
/// Initialization timestamp of the order.
pub ts_init: UnixNanos,
}
/// Outcome of [`ExecutionManager::reconcile_execution_mass_status`].
#[derive(Debug, Default)]
pub struct ReconciliationResult {
/// Order events generated while reconciling order, fill and position reports.
pub events: Vec<OrderEventAny>,
/// Metadata for the external orders produced during the reconciliation pass.
pub external_orders: Vec<ExternalOrderMetadata>,
}
/// Outcome of [`ExecutionManager::check_inflight_orders`].
#[derive(Debug, Default)]
pub struct InflightCheckResult {
/// Events generated for in-flight orders whose retry budget was exhausted.
pub events: Vec<OrderEventAny>,
/// Order queries to send for in-flight orders that still have retries left.
pub queries: Vec<TradingCommand>,
}
/// Configuration for [`ExecutionManager`].
///
/// Several fields are not consumed in the part of the file visible here; their
/// descriptions are hedged accordingly.
#[derive(Debug, Clone)]
pub struct ExecutionManagerConfig {
/// Trader ID; set through [`ExecutionManagerConfig::with_trader_id`].
pub trader_id: TraderId,
/// Reconciliation flag (presumably enables reconciliation as a whole — confirm at call sites).
pub reconciliation: bool,
/// Reconciliation lookback window in minutes, if any.
pub lookback_mins: Option<u64>,
/// When non-empty, only these instruments are reconciled; when empty, all instruments are.
pub reconciliation_instrument_ids: IndexSet<InstrumentId>,
/// When `true`, a mass-status order report that carries a client order ID but matches no
/// cached order is not handled as an external order.
pub filter_unclaimed_external: bool,
/// When `true`, position reports in a mass status are not reconciled.
pub filter_position_reports: bool,
/// Client order IDs that reconciliation and in-flight checks skip.
pub filtered_client_order_ids: IndexSet<ClientOrderId>,
/// NOTE(review): not consumed in the visible code; presumably controls generation of
/// orders to align positions — confirm.
pub generate_missing_orders: bool,
/// In-flight check interval in milliseconds (not consumed in the visible code).
pub inflight_check_interval_ms: u32,
/// Age in milliseconds after which an in-flight order is checked, and the minimum gap
/// between successive checks of the same order.
pub inflight_threshold_ms: u64,
/// Number of in-flight checks after which the order is resolved locally.
pub inflight_max_retries: u32,
/// Open-order check interval in seconds (not consumed in the visible code).
pub open_check_interval_secs: Option<f64>,
/// Lookback in minutes used to derive the start time of open-order report requests.
pub open_check_lookback_mins: Option<u64>,
/// Minimum time in nanoseconds since the last activity on an order before the open-order
/// check acts on it.
pub open_check_threshold_ns: u64,
/// Number of consecutive "missing at venue" observations after which an order is rejected.
pub open_check_missing_retries: u32,
/// Passed through to the order status report request; when `false`, cached orders absent
/// from the venue reports are additionally treated as missing.
pub open_check_open_only: bool,
/// Not consumed in the visible code.
pub max_single_order_queries_per_cycle: u32,
/// Not consumed in the visible code.
pub single_order_query_delay_ms: u32,
/// Position check interval in seconds (not consumed in the visible code).
pub position_check_interval_secs: Option<f64>,
/// Position check lookback in minutes (not consumed in the visible code).
pub position_check_lookback_mins: u64,
/// Minimum time in nanoseconds since the last position activity before a discrepancy is acted on.
pub position_check_threshold_ns: u64,
/// Maximum number of position reconciliation attempts per instrument.
pub position_check_retries: u32,
/// Buffer in minutes passed to the cache when purging closed orders; `None` disables the purge.
pub purge_closed_orders_buffer_mins: Option<u32>,
/// Buffer in minutes passed to the cache when purging closed positions; `None` disables the purge.
pub purge_closed_positions_buffer_mins: Option<u32>,
/// Lookback in minutes passed to the cache when purging account events; `None` disables the purge.
pub purge_account_events_lookback_mins: Option<u32>,
/// Not consumed in the visible code (presumably extends purges to the backing database — confirm).
pub purge_from_database: bool,
}
impl Default for ExecutionManagerConfig {
/// Returns the default configuration: reconciliation enabled, no instrument or client
/// order ID filters, periodic open-order/position checks and all purges left unset.
fn default() -> Self {
Self {
trader_id: TraderId::default(),
reconciliation: true,
lookback_mins: Some(60),
reconciliation_instrument_ids: IndexSet::new(),
filter_unclaimed_external: false,
filter_position_reports: false,
filtered_client_order_ids: IndexSet::new(),
generate_missing_orders: true,
inflight_check_interval_ms: 2_000,
inflight_threshold_ms: 5_000,
inflight_max_retries: 5,
open_check_interval_secs: None,
open_check_lookback_mins: Some(60),
// 5 seconds, expressed in nanoseconds.
open_check_threshold_ns: 5_000_000_000,
open_check_missing_retries: 5,
open_check_open_only: true,
max_single_order_queries_per_cycle: 5,
single_order_query_delay_ms: 100,
position_check_interval_secs: None,
position_check_lookback_mins: 60,
// 60 seconds, expressed in nanoseconds.
position_check_threshold_ns: 60_000_000_000,
position_check_retries: 3,
purge_closed_orders_buffer_mins: None,
purge_closed_positions_buffer_mins: None,
purge_account_events_lookback_mins: None,
purge_from_database: false,
}
}
}
impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
    /// Builds a manager configuration from the live execution engine configuration.
    ///
    /// Optional ID lists become (possibly empty) sets, millisecond thresholds for the
    /// open-order and position checks are converted to nanoseconds, and the trader ID is
    /// left at its default (see [`ExecutionManagerConfig::with_trader_id`]).
    fn from(config: &LiveExecEngineConfig) -> Self {
        let instrument_ids = config
            .reconciliation_instrument_ids
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(InstrumentId::from)
            .collect::<IndexSet<InstrumentId>>();

        let excluded_client_order_ids = config
            .filtered_client_order_ids
            .clone()
            .unwrap_or_default()
            .into_iter()
            .map(|raw| ClientOrderId::from(raw.as_str()))
            .collect::<IndexSet<ClientOrderId>>();

        Self {
            trader_id: TraderId::default(),
            reconciliation: config.reconciliation,
            lookback_mins: config.reconciliation_lookback_mins.map(|mins| mins as u64),
            reconciliation_instrument_ids: instrument_ids,
            filter_unclaimed_external: config.filter_unclaimed_external_orders,
            filter_position_reports: config.filter_position_reports,
            filtered_client_order_ids: excluded_client_order_ids,
            generate_missing_orders: config.generate_missing_orders,
            inflight_check_interval_ms: config.inflight_check_interval_ms,
            inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
            inflight_max_retries: config.inflight_check_retries,
            open_check_interval_secs: config.open_check_interval_secs,
            open_check_lookback_mins: config.open_check_lookback_mins.map(|mins| mins as u64),
            // Milliseconds -> nanoseconds.
            open_check_threshold_ns: (config.open_check_threshold_ms as u64)
                * NANOSECONDS_IN_MILLISECOND,
            open_check_missing_retries: config.open_check_missing_retries,
            open_check_open_only: config.open_check_open_only,
            max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
            single_order_query_delay_ms: config.single_order_query_delay_ms,
            position_check_interval_secs: config.position_check_interval_secs,
            position_check_lookback_mins: config.position_check_lookback_mins as u64,
            // Milliseconds -> nanoseconds.
            position_check_threshold_ns: (config.position_check_threshold_ms as u64)
                * NANOSECONDS_IN_MILLISECOND,
            position_check_retries: config.position_check_retries,
            purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
            purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
            purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
            purge_from_database: config.purge_from_database,
        }
    }
}
impl ExecutionManagerConfig {
    /// Returns this configuration with `trader_id` replaced, leaving all other fields as-is.
    #[must_use]
    pub fn with_trader_id(self, trader_id: TraderId) -> Self {
        Self { trader_id, ..self }
    }
}
/// Per-order bookkeeping for the in-flight order check.
#[derive(Debug, Clone)]
struct InflightCheck {
// Duplicates the map key under which this entry is stored; not read in the visible code.
#[allow(dead_code)]
pub client_order_id: ClientOrderId,
/// Clock time at which the order was registered as in-flight.
pub ts_submitted: UnixNanos,
/// Number of in-flight checks performed so far for this order.
pub retry_count: u32,
/// Clock time of the most recent check, if one has happened.
pub last_query_ts: Option<UnixNanos>,
}
/// Tracks in-flight orders, local activity and retry state, and reconciles cached
/// orders and positions against venue reports.
#[derive(Clone)]
pub struct ExecutionManager {
/// Clock used for all timestamps taken by the manager.
clock: Rc<RefCell<dyn Clock>>,
/// Shared cache used to look up orders, instruments and positions.
cache: Rc<RefCell<Cache>>,
/// Manager configuration.
config: ExecutionManagerConfig,
/// In-flight check state per registered client order ID.
inflight_checks: IndexMap<ClientOrderId, InflightCheck>,
/// Strategy that claimed external orders for each instrument.
external_order_claims: IndexMap<InstrumentId, StrategyId>,
/// NOTE(review): not touched in the visible code; presumably maps processed trade IDs to
/// their order — confirm.
processed_fills: IndexMap<TradeId, ClientOrderId>,
/// Retry counters per order, shared by the in-flight and missing-order checks.
recon_check_retries: IndexMap<ClientOrderId, u32>,
/// Clock time of the last in-flight check per order.
ts_last_query: IndexMap<ClientOrderId, UnixNanos>,
/// Clock time of the last recorded local activity per order.
order_local_activity_ns: IndexMap<ClientOrderId, UnixNanos>,
/// Timestamp of the last recorded position activity per instrument.
position_local_activity_ns: IndexMap<InstrumentId, UnixNanos>,
/// Position reconciliation attempt counters per instrument.
position_recon_retries: IndexMap<InstrumentId, u32>,
/// Trade IDs recently marked as processed, with the clock time they were marked.
recent_fills_cache: IndexMap<TradeId, UnixNanos>,
}
impl Debug for ExecutionManager {
    /// Formats a diagnostic summary of the manager.
    ///
    /// Only the configuration and a subset of the tracking maps are printed; the clock,
    /// cache and remaining maps are omitted, so the output is terminated with `..` via
    /// `finish_non_exhaustive` to signal that fields were left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(ExecutionManager))
            .field("config", &self.config)
            .field("inflight_checks", &self.inflight_checks)
            .field("external_order_claims", &self.external_order_claims)
            .field("processed_fills", &self.processed_fills)
            .field("recon_check_retries", &self.recon_check_retries)
            .finish_non_exhaustive()
    }
}
impl ExecutionManager {
/// Creates a manager over the given clock, cache and configuration with all tracking
/// maps empty.
pub fn new(
    clock: Rc<RefCell<dyn Clock>>,
    cache: Rc<RefCell<Cache>>,
    config: ExecutionManagerConfig,
) -> Self {
    Self {
        // Tracking state starts out empty.
        recent_fills_cache: IndexMap::default(),
        position_recon_retries: IndexMap::default(),
        position_local_activity_ns: IndexMap::default(),
        order_local_activity_ns: IndexMap::default(),
        ts_last_query: IndexMap::default(),
        recon_check_retries: IndexMap::default(),
        processed_fills: IndexMap::default(),
        external_order_claims: IndexMap::default(),
        inflight_checks: IndexMap::default(),
        // Injected collaborators.
        config,
        cache,
        clock,
    }
}
/// Returns the current timestamp from the manager's clock.
#[must_use]
pub fn generate_timestamp_ns(&self) -> UnixNanos {
    let clock = self.clock.borrow();
    clock.timestamp_ns()
}
/// Reconciles a venue `ExecutionMassStatus` against the cached state.
///
/// The pass runs in this order:
/// 1. Fill/order reports are passed through `adjust_mass_status_fills` and order reports
///    through `deduplicate_order_reports` (both defined outside this view).
/// 2. Each order report is matched to a cached order (by client order ID, then by venue
///    order ID) and reconciled together with its fills, or handled as an external order.
/// 3. Fills whose venue order ID has no order report ("orphan" fills) are applied to a
///    cached order when one can be found.
/// 4. All events so far are sorted by `ts_event` and processed by `exec_engine`.
/// 5. Unless `filter_position_reports` is set, position reports are reconciled; the
///    resulting events are processed immediately and appended after the sorted events.
///
/// Returns every generated event plus metadata for the external orders encountered.
pub async fn reconcile_execution_mass_status(
&mut self,
mass_status: ExecutionMassStatus,
exec_engine: Rc<RefCell<ExecutionEngine>>,
) -> ReconciliationResult {
let venue = mass_status.venue;
let order_count = mass_status.order_reports().len();
let fill_count: usize = mass_status.fill_reports().values().map(|v| v.len()).sum();
let position_count = mass_status.position_reports().len();
log_info!(
"Reconciling ExecutionMassStatus for {venue}",
color = LogColor::Blue
);
log_info!(
"Received {order_count} order(s), {fill_count} fill(s), {position_count} position(s)",
color = LogColor::Blue
);
// Work on the adjusted copies of the reports from here on.
let (adjusted_order_reports, adjusted_fill_reports) =
self.adjust_mass_status_fills(&mass_status);
let mut events = Vec::new();
let mut external_orders = Vec::new();
// Counters used only for the summary logging at the end.
let mut orders_reconciled = 0usize;
let mut external_orders_created = 0usize;
let mut open_orders_initialized = 0usize;
let mut orders_skipped_no_instrument = 0usize;
let mut orders_skipped_duplicate = 0usize;
let mut fills_applied = 0usize;
let fill_reports = &adjusted_fill_reports;
// Duplicate trade IDs are only reported here; the fills themselves are left in place.
let mut seen_trade_ids: IndexSet<TradeId> = IndexSet::new();
for fills in fill_reports.values() {
for fill in fills {
if !seen_trade_ids.insert(fill.trade_id) {
log::warn!("Duplicate trade_id {} in mass status", fill.trade_id);
}
}
}
let order_reports = self.deduplicate_order_reports(adjusted_order_reports.values());
let mut orders_skipped_filtered = 0usize;
for report in order_reports.values() {
// Config filters: filtered client order IDs and the instrument allow-list.
if self.should_skip_order_report(report) {
orders_skipped_filtered += 1;
continue;
}
if let Some(client_order_id) = &report.client_order_id {
// Cached order already matches the report: only refresh the venue order ID index.
if let Some(cached_order) = self.get_order(client_order_id)
&& self.is_exact_order_match(&cached_order, report)
{
log::debug!("Skipping order {client_order_id}: already in sync with venue");
orders_skipped_duplicate += 1;
if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
client_order_id,
&report.venue_order_id,
false,
) {
log::warn!("Failed to add venue order ID index: {e}");
}
continue;
}
// Closed orders carrying the RECONCILIATION tag are not reconciled again.
if let Some(cached_order) = self.get_order(client_order_id)
&& cached_order.is_closed()
&& cached_order
.tags()
.is_some_and(|tags| tags.contains(&*TAG_RECONCILIATION))
{
log::debug!(
"Skipping closed reconciliation order {client_order_id}: \
synthetic position adjustment from previous session",
);
orders_skipped_duplicate += 1;
continue;
}
// Case 1: the order is cached under the reported client order ID.
if let Some(order) = self.get_order(client_order_id) {
let instrument = self.get_instrument(&report.instrument_id);
log::info!(
color = LogColor::Blue as u8;
"Reconciling {} {} {} [{}] -> [{}]",
client_order_id,
report.venue_order_id,
report.instrument_id,
order.status(),
report.order_status,
);
let order_fills: Vec<&FillReport> = fill_reports
.get(&report.venue_order_id)
.map(|f| f.iter().collect())
.unwrap_or_default();
let order_events = self.reconcile_order_with_fills(
&order,
report,
&order_fills,
instrument.as_ref(),
);
if !order_events.is_empty() {
orders_reconciled += 1;
fills_applied += order_events
.iter()
.filter(|e| matches!(e, OrderEventAny::Filled(_)))
.count();
events.extend(order_events);
}
if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
client_order_id,
&report.venue_order_id,
false,
) {
log::warn!("Failed to add venue order ID index: {e}");
}
// Case 2: not cached under that client order ID, but known by venue order ID.
} else if let Some(order) = self.get_order_by_venue_order_id(&report.venue_order_id)
{
let instrument = self.get_instrument(&report.instrument_id);
log::info!(
color = LogColor::Blue as u8;
"Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
order.client_order_id(),
report.venue_order_id,
report.instrument_id,
order.status(),
report.order_status,
);
let order_fills: Vec<&FillReport> = fill_reports
.get(&report.venue_order_id)
.map(|f| f.iter().collect())
.unwrap_or_default();
let order_events = self.reconcile_order_with_fills(
&order,
report,
&order_fills,
instrument.as_ref(),
);
if !order_events.is_empty() {
orders_reconciled += 1;
fills_applied += order_events
.iter()
.filter(|e| matches!(e, OrderEventAny::Filled(_)))
.count();
events.extend(order_events);
}
if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
&order.client_order_id(),
&report.venue_order_id,
false,
) {
log::warn!("Failed to add venue order ID index: {e}");
}
// Case 3: unknown order; handled as external unless unclaimed externals are filtered.
} else if !self.config.filter_unclaimed_external {
if let Some(instrument) = self.get_instrument(&report.instrument_id) {
let order_fills: Vec<&FillReport> = fill_reports
.get(&report.venue_order_id)
.map(|f| f.iter().collect())
.unwrap_or_default();
// The final argument occupies the position that receives `is_synthetic` in the
// equivalent call for reports without a client order ID.
let (external_events, metadata) = self.handle_external_order(
report,
&mass_status.account_id,
&instrument,
&order_fills,
false, );
if !external_events.is_empty() {
external_orders_created += 1;
fills_applied += external_events
.iter()
.filter(|e| matches!(e, OrderEventAny::Filled(_)))
.count();
if report.order_status.is_open() {
open_orders_initialized += 1;
}
events.extend(external_events);
if let Some(m) = metadata {
external_orders.push(m);
}
}
} else {
orders_skipped_no_instrument += 1;
}
}
// Report without a client order ID: try the venue order ID index first.
} else if let Some(order) = self.get_order_by_venue_order_id(&report.venue_order_id) {
let instrument = self.get_instrument(&report.instrument_id);
log::info!(
color = LogColor::Blue as u8;
"Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
order.client_order_id(),
report.venue_order_id,
report.instrument_id,
order.status(),
report.order_status,
);
let order_fills: Vec<&FillReport> = fill_reports
.get(&report.venue_order_id)
.map(|f| f.iter().collect())
.unwrap_or_default();
let order_events = self.reconcile_order_with_fills(
&order,
report,
&order_fills,
instrument.as_ref(),
);
if !order_events.is_empty() {
orders_reconciled += 1;
fills_applied += order_events
.iter()
.filter(|e| matches!(e, OrderEventAny::Filled(_)))
.count();
events.extend(order_events);
}
if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
&order.client_order_id(),
&report.venue_order_id,
false,
) {
log::warn!("Failed to add venue order ID index: {e}");
}
// No client order ID and no cached match: external order. This path does not consult
// `filter_unclaimed_external`.
} else if let Some(instrument) = self.get_instrument(&report.instrument_id) {
// Venue order IDs starting with "S-" are flagged as synthetic.
let is_synthetic = report.venue_order_id.as_str().starts_with("S-");
let order_fills: Vec<&FillReport> = fill_reports
.get(&report.venue_order_id)
.map(|f| f.iter().collect())
.unwrap_or_default();
let (external_events, metadata) = self.handle_external_order(
report,
&mass_status.account_id,
&instrument,
&order_fills,
is_synthetic,
);
if !external_events.is_empty() {
external_orders_created += 1;
fills_applied += external_events
.iter()
.filter(|e| matches!(e, OrderEventAny::Filled(_)))
.count();
if report.order_status.is_open() {
open_orders_initialized += 1;
}
events.extend(external_events);
if let Some(m) = metadata {
external_orders.push(m);
}
}
} else {
orders_skipped_no_instrument += 1;
}
}
// Orphan fills: fill groups whose venue order ID had no (deduplicated) order report.
let processed_venue_order_ids: IndexSet<VenueOrderId> =
order_reports.keys().copied().collect();
for (venue_order_id, fills) in fill_reports {
if processed_venue_order_ids.contains(venue_order_id) {
continue;
}
// The first fill of the group supplies the instrument and client order ID used for
// filtering and lookup.
let Some(first_fill) = fills.first() else {
continue;
};
if !self.should_reconcile_instrument(&first_fill.instrument_id) {
log::debug!(
"Skipping orphan fills for {}: not in reconciliation_instrument_ids",
first_fill.instrument_id
);
continue;
}
if let Some(client_order_id) = &first_fill.client_order_id
&& self
.config
.filtered_client_order_ids
.contains(client_order_id)
{
log::debug!(
"Skipping orphan fills for {client_order_id}: in filtered_client_order_ids"
);
continue;
}
let order = first_fill
.client_order_id
.as_ref()
.and_then(|id| self.get_order(id))
.or_else(|| self.get_order_by_venue_order_id(venue_order_id));
// Re-check the filter against the resolved order, which may have been found by
// venue order ID rather than by the fill's own client order ID.
if let Some(ref order) = order
&& self
.config
.filtered_client_order_ids
.contains(&order.client_order_id())
{
log::debug!(
"Skipping orphan fills for {}: in filtered_client_order_ids",
order.client_order_id()
);
continue;
}
if let Some(order) = order {
let instrument_id = order.instrument_id();
if let Some(instrument) = self.get_instrument(&instrument_id) {
// Apply the fills in event-time order.
let mut sorted_fills: Vec<&FillReport> = fills.iter().collect();
sorted_fills.sort_by_key(|f| f.ts_event);
for fill in sorted_fills {
if let Some(event) = self.create_order_fill(&order, fill, &instrument) {
fills_applied += 1;
events.push(event);
}
}
}
}
}
// Process order/fill events in event-time order before looking at positions.
events.sort_by_key(|e| e.ts_event());
for event in &events {
exec_engine.borrow_mut().process(event);
}
let mut positions_created = 0usize;
if !self.config.filter_position_reports {
// Built from the original (unadjusted) mass status: instruments having fills or
// filled order reports without a venue position ID.
let instruments_with_unattributed_fills: IndexSet<InstrumentId> = mass_status
.fill_reports()
.values()
.flatten()
.filter(|f| f.venue_position_id.is_none())
.map(|f| f.instrument_id)
.chain(
mass_status
.order_reports()
.values()
.filter(|r| !r.filled_qty.is_zero() && r.venue_position_id.is_none())
.map(|r| r.instrument_id),
)
.collect();
// Venue position IDs referenced by fills or by filled order reports.
let positions_with_fills: IndexSet<PositionId> = mass_status
.fill_reports()
.values()
.flatten()
.filter_map(|f| f.venue_position_id)
.chain(
mass_status
.order_reports()
.values()
.filter(|r| !r.filled_qty.is_zero())
.filter_map(|r| r.venue_position_id),
)
.collect();
for (instrument_id, reports) in mass_status.position_reports() {
if !self.should_reconcile_instrument(&instrument_id) {
log::debug!(
"Skipping position reports for {instrument_id}: not in reconciliation_instrument_ids"
);
continue;
}
for report in reports {
if let Some(position_events) = self.reconcile_position_report(
&report,
&mass_status.account_id,
&instruments_with_unattributed_fills,
&positions_with_fills,
) {
// Position events are processed immediately and appended after the sorted events.
for event in position_events {
exec_engine.borrow_mut().process(&event);
events.push(event);
}
positions_created += 1;
}
}
}
}
// Summary logging.
if orders_skipped_no_instrument > 0 {
log::warn!("{orders_skipped_no_instrument} orders skipped (instrument not in cache)");
}
if orders_skipped_duplicate > 0 {
log::debug!("{orders_skipped_duplicate} orders skipped (already in sync)");
}
if orders_skipped_filtered > 0 {
log::debug!("{orders_skipped_filtered} orders skipped (filtered by config)");
}
log::info!(
color = LogColor::Blue as u8;
"Reconciliation complete for {venue}: reconciled={orders_reconciled}, external={external_orders_created}, open={open_orders_initialized}, fills={fills_applied}, positions={positions_created}, skipped={orders_skipped_duplicate}, filtered={orders_skipped_filtered}",
);
ReconciliationResult {
events,
external_orders,
}
}
pub fn check_inflight_orders(&mut self) -> InflightCheckResult {
let mut result = InflightCheckResult::default();
let current_time = self.clock.borrow().timestamp_ns();
let threshold_ns = self.config.inflight_threshold_ms * NANOSECONDS_IN_MILLISECOND;
let mut to_check = Vec::new();
for (client_order_id, check) in &self.inflight_checks {
if current_time - check.ts_submitted > threshold_ns {
to_check.push(*client_order_id);
}
}
for client_order_id in to_check {
if self
.config
.filtered_client_order_ids
.contains(&client_order_id)
{
continue;
}
if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
if let Some(last_query_ts) = check.last_query_ts
&& current_time - last_query_ts < threshold_ns
{
continue;
}
check.retry_count += 1;
check.last_query_ts = Some(current_time);
self.ts_last_query.insert(client_order_id, current_time);
self.recon_check_retries
.insert(client_order_id, check.retry_count);
if check.retry_count >= self.config.inflight_max_retries {
let ts_now = self.clock.borrow().timestamp_ns();
if let Some(order) = self.get_order(&client_order_id) {
match order.status() {
OrderStatus::Submitted => {
if let Some(event) = create_reconciliation_rejected(
&order,
Some("INFLIGHT_TIMEOUT"),
ts_now,
) {
result.events.push(event);
}
}
OrderStatus::PendingUpdate | OrderStatus::PendingCancel => {
let event = OrderEventAny::Canceled(OrderCanceled::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
UUID4::new(),
ts_now,
ts_now,
true, order.venue_order_id(),
order.account_id(),
));
result.events.push(event);
}
_ => {
}
}
}
self.clear_recon_tracking(&client_order_id, true);
} else if let Some(order) = self.get_order(&client_order_id) {
let client_id = self.cache.borrow().client_id(&client_order_id).copied();
let query = TradingCommand::QueryOrder(QueryOrder::new(
order.trader_id(),
client_id,
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
order.venue_order_id(),
UUID4::new(),
current_time,
None,
));
result.queries.push(query);
}
}
}
result
}
/// Compares cached open and in-flight orders with order status reports from `clients`.
///
/// Reports matching a cached order are reconciled unless the order had local activity
/// within `open_check_threshold_ns`. When `open_check_open_only` is `false`, cached
/// orders (within the lookback window, if configured) that no client reported are passed
/// to `handle_missing_order`. Returns the generated events.
pub async fn check_open_orders(
    &mut self,
    clients: &[&dyn ExecutionClient],
) -> Vec<OrderEventAny> {
    log::debug!("Checking order consistency between cached-state and venues");

    // Snapshot cached open + in-flight orders, restricted to reconciled instruments.
    let filtered_orders: Vec<OrderAny> = {
        let cache = self.cache.borrow();
        let mut cached = cache.orders_open(None, None, None, None, None);
        cached.extend(cache.orders_inflight(None, None, None, None, None));
        cached
            .iter()
            .filter(|o| self.should_reconcile_instrument(&o.instrument_id()))
            .map(|o| (*o).clone())
            .collect()
    };
    let order_count = filtered_orders.len();
    let plural = if order_count == 1 { "" } else { "s" };
    log::debug!("Found {} order{} open in cache", order_count, plural);

    let ts_request = self.clock.borrow().timestamp_ns();
    let start = self.config.open_check_lookback_mins.map(|mins| {
        UnixNanos::from(ts_request.as_u64().saturating_sub(mins_to_nanos(mins)))
    });

    // Gather reports from every client; a failing client is logged and skipped.
    let mut all_reports = Vec::new();
    let mut venue_reported_ids = IndexSet::new();
    for client in clients {
        let mut cmd = GenerateOrderStatusReports::new(
            UUID4::new(),
            ts_request,
            self.config.open_check_open_only,
            None,
            start,
            None,
            None,
            None,
        );
        cmd.log_receipt_level = LogLevel::Debug;
        let reports = match client.generate_order_status_reports(&cmd).await {
            Ok(reports) => reports,
            Err(e) => {
                log::error!(
                    "Failed to query order reports from {}: {e}",
                    client.client_id()
                );
                continue;
            }
        };
        for report in reports {
            if let Some(reported_id) = report.client_order_id {
                venue_reported_ids.insert(reported_id);
            }
            all_reports.push(report);
        }
    }

    // Re-read the clock: the queries above may have taken a while.
    let ts_now = self.clock.borrow().timestamp_ns();
    let mut events = Vec::new();
    for report in all_reports {
        let Some(client_order_id) = &report.client_order_id else {
            continue;
        };
        let Some(order) = self.get_order(client_order_id) else {
            continue;
        };
        if let Some(&last_activity) = self.order_local_activity_ns.get(client_order_id) {
            let since_activity = ts_now - last_activity;
            if since_activity < self.config.open_check_threshold_ns {
                let elapsed_ms = nanos_to_millis(since_activity.as_u64());
                let threshold_ms = nanos_to_millis(self.config.open_check_threshold_ns);
                log::info!(
                    "Deferring reconciliation for {client_order_id}: recent local activity ({elapsed_ms}ms < threshold={threshold_ms}ms)",
                );
                continue;
            }
        }
        let instrument = self.get_instrument(&report.instrument_id);
        events.extend(self.reconcile_order_report(&order, &report, instrument.as_ref()));
    }

    if self.config.open_check_open_only {
        return events;
    }

    // Cached orders (within the lookback window) that no venue report mentioned.
    let cached_ids: IndexSet<ClientOrderId> = filtered_orders
        .iter()
        .filter(|o| match start {
            Some(cutoff) => o.ts_last() >= cutoff,
            None => true,
        })
        .map(|o| o.client_order_id())
        .collect();
    let missing_at_venue: Vec<ClientOrderId> = cached_ids
        .difference(&venue_reported_ids)
        .copied()
        .collect();
    for client_order_id in missing_at_venue {
        events.extend(self.handle_missing_order(client_order_id));
    }
    events
}
/// Compares cached open positions with position status reports from `clients`.
///
/// Each instrument with a cached open position is checked once via
/// `check_position_discrepancy` against the last venue report stored for that
/// instrument. Afterwards, retry counters are dropped for instruments that have neither
/// a cached open position nor a non-zero venue position. Returns the generated events.
pub async fn check_positions_consistency(
    &mut self,
    clients: &[&dyn ExecutionClient],
) -> Vec<OrderEventAny> {
    log::debug!("Checking position consistency between cached-state and venues");

    // Snapshot cached open positions, restricted to reconciled instruments.
    let open_positions = {
        let cache = self.cache.borrow();
        let positions = cache.positions_open(None, None, None, None, None);
        let selected: Vec<_> = positions
            .iter()
            .filter(|p| self.should_reconcile_instrument(&p.instrument_id))
            .map(|p| (*p).clone())
            .collect();
        selected
    };
    let position_count = open_positions.len();
    let plural = if position_count == 1 { "" } else { "s" };
    log::debug!("Found {} position{} to check", position_count, plural);

    // Later reports for the same instrument replace earlier ones.
    let mut venue_positions = IndexMap::new();
    for client in clients {
        let mut cmd = GeneratePositionStatusReports::new(
            UUID4::new(),
            self.clock.borrow().timestamp_ns(),
            None,
            None,
            None,
            None,
            None,
        );
        cmd.log_receipt_level = LogLevel::Debug;
        let reports = match client.generate_position_status_reports(&cmd).await {
            Ok(reports) => reports,
            Err(e) => {
                log::error!(
                    "Failed to query position reports from {}: {e}",
                    client.client_id()
                );
                continue;
            }
        };
        for report in reports {
            venue_positions.insert(report.instrument_id, report);
        }
    }

    let mut events = Vec::new();
    let mut checked_instruments = IndexSet::new();
    for position in &open_positions {
        let instrument_id = position.instrument_id;
        // Only the first cached position per instrument is checked.
        let first_visit = checked_instruments.insert(instrument_id);
        if !first_visit || !self.should_reconcile_instrument(&instrument_id) {
            continue;
        }
        let venue_report = venue_positions.get(&instrument_id);
        events.extend(
            self.check_position_discrepancy(position, venue_report)
                .unwrap_or_default(),
        );
    }

    // Forget retry counters for instruments that are flat both locally and at the venue.
    let mut active_instruments: IndexSet<InstrumentId> =
        open_positions.iter().map(|p| p.instrument_id).collect();
    active_instruments.extend(
        venue_positions
            .iter()
            .filter(|(_, r)| r.signed_decimal_qty != Decimal::ZERO)
            .map(|(id, _)| *id),
    );
    self.position_recon_retries
        .retain(|iid, _| active_instruments.contains(iid));

    events
}
/// Starts (or restarts) in-flight tracking for `client_order_id`.
///
/// The submission time is taken from the clock, the retry counter is reset to zero, and
/// any stored last-query time and local-activity time for the order are dropped.
pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
    let check = InflightCheck {
        client_order_id,
        ts_submitted: self.clock.borrow().timestamp_ns(),
        retry_count: 0,
        last_query_ts: None,
    };
    self.inflight_checks.insert(client_order_id, check);
    self.recon_check_retries.insert(client_order_id, 0);
    self.order_local_activity_ns.shift_remove(&client_order_id);
    self.ts_last_query.shift_remove(&client_order_id);
}
/// Stamps `client_order_id` with the current clock time as its latest local activity.
pub fn record_local_activity(&mut self, client_order_id: ClientOrderId) {
    let stamped_at = self.clock.borrow().timestamp_ns();
    self.order_local_activity_ns
        .insert(client_order_id, stamped_at);
}
/// Removes the in-flight check, retry counter and local-activity entry for an order.
///
/// The last-query timestamp is removed as well only when `drop_last_query` is `true`.
pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
    if drop_last_query {
        self.ts_last_query.shift_remove(client_order_id);
    }
    self.order_local_activity_ns.shift_remove(client_order_id);
    self.recon_check_retries.shift_remove(client_order_id);
    self.inflight_checks.shift_remove(client_order_id);
}
/// Records `strategy_id` as the claimant of external orders for `instrument_id`,
/// replacing any earlier claim for that instrument.
pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
    let _previous_claim = self
        .external_order_claims
        .insert(instrument_id, strategy_id);
}
/// Stores `ts_event` as the latest position activity time for `instrument_id`.
pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
    let activity = &mut self.position_local_activity_ns;
    activity.insert(instrument_id, ts_event);
}
pub fn observe_execution_report(&mut self, report: &ExecutionReport) {
match report {
ExecutionReport::Order(order_report) => {
if let Some(client_order_id) = &order_report.client_order_id {
if !matches!(
order_report.order_status,
OrderStatus::PendingUpdate | OrderStatus::PendingCancel
) {
self.clear_recon_tracking(client_order_id, true);
}
self.record_local_activity(*client_order_id);
}
}
ExecutionReport::Fill(fill_report) => {
let client_order_id = fill_report.client_order_id.or_else(|| {
self.cache
.borrow()
.client_order_id(&fill_report.venue_order_id)
.copied()
});
if let Some(coid) = client_order_id {
self.record_local_activity(coid);
}
self.record_position_activity(fill_report.instrument_id, fill_report.ts_event);
}
ExecutionReport::OrderWithFills(order_report, fills) => {
if let Some(client_order_id) = &order_report.client_order_id
&& !matches!(
order_report.order_status,
OrderStatus::PendingUpdate | OrderStatus::PendingCancel
)
{
self.clear_recon_tracking(client_order_id, true);
self.record_local_activity(*client_order_id);
}
for fill_report in fills {
self.record_position_activity(fill_report.instrument_id, fill_report.ts_event);
}
}
ExecutionReport::Position(position_report) => {
self.record_position_activity(
position_report.instrument_id,
position_report.ts_last,
);
}
ExecutionReport::MassStatus(_) => {
}
}
}
/// Returns `true` if `trade_id` is currently held in the recent-fills cache.
pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
    let recent = &self.recent_fills_cache;
    recent.contains_key(trade_id)
}
/// Adds `trade_id` to the recent-fills cache, stamped with the current clock time.
pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
    let marked_at = self.clock.borrow().timestamp_ns();
    self.recent_fills_cache.insert(trade_id, marked_at);
}
/// Drops recent-fills cache entries older than `ttl_secs` seconds.
///
/// Entries whose age is exactly the TTL are kept.
pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
    let ttl_ns = (ttl_secs * NANOSECONDS_IN_SECOND as f64) as u64;
    let ts_now = self.clock.borrow().timestamp_ns();
    self.recent_fills_cache
        .retain(|_, ts_cached| ts_now - *ts_cached <= ttl_ns);
}
/// Asks the cache to purge closed orders, using the configured buffer.
///
/// Does nothing when `purge_closed_orders_buffer_mins` is unset.
pub fn purge_closed_orders(&mut self) {
    if let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins {
        let buffer_secs = mins_to_secs(buffer_mins as u64);
        let ts_now = self.clock.borrow().timestamp_ns();
        let mut cache = self.cache.borrow_mut();
        cache.purge_closed_orders(ts_now, buffer_secs);
    }
}
/// Asks the cache to purge closed positions, using the configured buffer.
///
/// Does nothing when `purge_closed_positions_buffer_mins` is unset.
pub fn purge_closed_positions(&mut self) {
    if let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins {
        let buffer_secs = mins_to_secs(buffer_mins as u64);
        let ts_now = self.clock.borrow().timestamp_ns();
        let mut cache = self.cache.borrow_mut();
        cache.purge_closed_positions(ts_now, buffer_secs);
    }
}
/// Asks the cache to purge account events, using the configured lookback.
///
/// Does nothing when `purge_account_events_lookback_mins` is unset.
pub fn purge_account_events(&mut self) {
    if let Some(lookback_mins) = self.config.purge_account_events_lookback_mins {
        let lookback_secs = mins_to_secs(lookback_mins as u64);
        let ts_now = self.clock.borrow().timestamp_ns();
        let mut cache = self.cache.borrow_mut();
        cache.purge_account_events(ts_now, lookback_secs);
    }
}
/// Returns a clone of the cached order with the given client order ID, if any.
fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
    let cache = self.cache.borrow();
    let order = cache.order(client_order_id)?;
    Some(order.clone())
}
/// Resolves a venue order ID to its client order ID through the cache and returns a
/// clone of that order, if both lookups succeed.
fn get_order_by_venue_order_id(&self, venue_order_id: &VenueOrderId) -> Option<OrderAny> {
    let cache = self.cache.borrow();
    let client_order_id = cache.client_order_id(venue_order_id)?;
    let order = cache.order(client_order_id)?;
    Some(order.clone())
}
/// Returns a clone of the cached instrument with the given ID, if any.
fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
    let cache = self.cache.borrow();
    let instrument = cache.instrument(instrument_id)?;
    Some(instrument.clone())
}
/// Returns `true` when an order report must be ignored by reconciliation: either its
/// client order ID is in `filtered_client_order_ids`, or its instrument is excluded by
/// `reconciliation_instrument_ids`.
fn should_skip_order_report(&self, report: &OrderStatusReport) -> bool {
    let filtered_id = report
        .client_order_id
        .as_ref()
        .filter(|id| self.config.filtered_client_order_ids.contains(*id));
    if let Some(client_order_id) = filtered_id {
        log::debug!(
            "Skipping order report {client_order_id}: in filtered_client_order_ids list"
        );
        return true;
    }

    if self.should_reconcile_instrument(&report.instrument_id) {
        return false;
    }
    log::debug!(
        "Skipping order report for {}: not in reconciliation_instrument_ids",
        report.instrument_id
    );
    true
}
/// Returns `true` if `instrument_id` is subject to reconciliation: the configured
/// instrument set is empty (no restriction) or contains the instrument.
fn should_reconcile_instrument(&self, instrument_id: &InstrumentId) -> bool {
    let allowed = &self.config.reconciliation_instrument_ids;
    allowed.is_empty() || allowed.contains(instrument_id)
}
/// Handles a cached order that no venue report mentioned.
///
/// Nothing happens when the order is not in the cache, or when either its last event or
/// its last recorded local activity is younger than `open_check_threshold_ns`. Otherwise
/// the order's retry counter is incremented; once it reaches
/// `open_check_missing_retries` a reconciliation rejection (`NOT_FOUND_AT_VENUE`) is
/// generated and tracking for the order is cleared.
///
/// Returns the generated events (empty when the order is deferred or still retrying).
fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
    let mut events = Vec::new();
    let Some(order) = self.get_order(&client_order_id) else {
        return events;
    };

    let ts_now = self.clock.borrow().timestamp_ns();
    let threshold_ns = self.config.open_check_threshold_ns;

    // Ages are computed with saturating subtraction on the raw nanosecond values: the
    // order's last-event timestamp is not guaranteed to be behind the local clock, and a
    // timestamp in the "future" must count as recent activity rather than underflow.
    let since_last_event_ns = ts_now.as_u64().saturating_sub(order.ts_last().as_u64());
    if since_last_event_ns < threshold_ns {
        return events;
    }
    if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
        && ts_now.as_u64().saturating_sub(last_activity.as_u64()) < threshold_ns
    {
        return events;
    }

    let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
    *retries += 1;
    if *retries >= self.config.open_check_missing_retries {
        log::warn!(
            "Order {client_order_id} not found at venue after {retries} retries, marking as REJECTED"
        );
        let ts_now = self.clock.borrow().timestamp_ns();
        if let Some(rejected) =
            create_reconciliation_rejected(&order, Some("NOT_FOUND_AT_VENUE"), ts_now)
        {
            events.push(rejected);
        }
        self.clear_recon_tracking(&client_order_id, true);
    } else {
        log::debug!(
            "Order {} not found at venue, retry {}/{}",
            client_order_id,
            retries,
            self.config.open_check_missing_retries
        );
    }
    events
}
/// Compares a cached position against the venue's report and, when they
/// disagree, tries to generate synthetic order events that close the gap.
///
/// `venue_report` being `None` is treated as a venue quantity of zero.
///
/// Returns `None` when the quantities agree within a fixed tolerance, when
/// recent local activity for the instrument is within
/// `config.position_check_threshold_ns`, when the retry budget
/// (`config.position_check_retries`) is exhausted, when the instrument is not
/// in the cache, or when no reconciliation fill could be built. Otherwise
/// returns the events produced by `handle_external_order` (possibly empty).
///
/// The per-instrument retry counter is removed on a match or on a non-empty
/// result, and incremented when the attempt yields nothing.
fn check_position_discrepancy(
&mut self,
position: &Position,
venue_report: Option<&PositionStatusReport>,
) -> Option<Vec<OrderEventAny>> {
let cached_signed_qty = position.signed_decimal_qty();
let venue_signed_qty = venue_report.map_or(Decimal::ZERO, |r| r.signed_decimal_qty);
// Quantities closer than 1e-8 are considered equal.
let tolerance = Decimal::from_str("0.00000001").unwrap();
if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
// In agreement: forget any retries accumulated for this instrument.
self.position_recon_retries
.shift_remove(&position.instrument_id);
return None; }
let ts_now = self.clock.borrow().timestamp_ns();
// Hold off while local activity for this instrument is still recent.
if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
&& (ts_now - last_activity) < self.config.position_check_threshold_ns
{
log::debug!(
"Skipping position reconciliation for {}: recent activity within threshold",
position.instrument_id
);
return None;
}
let retries = *self
.position_recon_retries
.get(&position.instrument_id)
.unwrap_or(&0);
// Retry budget already used up: stay silent (the error was logged when it ran out).
if retries >= self.config.position_check_retries {
return None;
}
log::warn!(
"Position discrepancy detected for {}: cached_signed_qty={}, venue_signed_qty={}",
position.instrument_id,
cached_signed_qty,
venue_signed_qty
);
let account_id = position.account_id;
let instrument_id = position.instrument_id;
// Without the instrument no quantities or prices can be built; count the attempt as failed.
let Some(instrument) = self.cache.borrow().instrument(&instrument_id).cloned() else {
log::debug!("Cannot reconcile position for {instrument_id}: instrument not in cache");
let new_retries = retries + 1;
self.position_recon_retries
.insert(instrument_id, new_retries);
if new_retries >= self.config.position_check_retries {
log::error!(
"Position discrepancy for {instrument_id} unresolved after {} attempts \
(cached_qty={cached_signed_qty}, venue_qty={venue_signed_qty}); \
no further reconciliation attempts will be made",
self.config.position_check_retries,
);
}
return None;
};
// A non-positive cached average price is treated as "no price available".
let cached_avg_px = if position.avg_px_open > 0.0 {
Decimal::from_str(&position.avg_px_open.to_string()).ok()
} else {
None
};
let venue_avg_px = venue_report.and_then(|r| r.avg_px_open);
// Opposite signs on both sides: the position flipped direction.
let crosses_zero = (cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
|| (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO);
let result = if crosses_zero {
let venue_ts_last = venue_report.map_or(ts_now, |r| r.ts_last);
self.reconcile_cross_zero_position(
&instrument,
account_id,
instrument_id,
cached_signed_qty,
cached_avg_px,
venue_signed_qty,
venue_avg_px,
ts_now,
venue_ts_last,
)
} else {
// Same side (or one side flat): a single fill for the difference is enough.
let qty_diff = venue_signed_qty - cached_signed_qty;
let order_side = if qty_diff > Decimal::ZERO {
OrderSide::Buy
} else {
OrderSide::Sell
};
let reconciliation_px = calculate_reconciliation_price(
cached_signed_qty,
cached_avg_px,
venue_signed_qty,
venue_avg_px,
);
// Price preference: calculated reconciliation price, then venue average, then cached average.
match reconciliation_px.or(venue_avg_px).or(cached_avg_px) {
Some(fill_px) => {
let fill_qty = qty_diff.abs();
let venue_position_id = venue_report.and_then(|r| r.venue_position_id);
let venue_ts_last = venue_report.map_or(ts_now, |r| r.ts_last);
Quantity::from_decimal_dp(fill_qty, instrument.size_precision())
.ok()
.and_then(|order_qty| {
let fill_price =
Price::from_decimal_dp(fill_px, instrument.price_precision()).ok();
let venue_order_id = create_position_reconciliation_venue_order_id(
account_id,
instrument_id,
order_side,
OrderType::Market,
order_qty,
fill_price,
venue_position_id,
None,
venue_ts_last,
);
// Synthetic, fully filled market order report for the missing quantity.
OrderStatusReport::new(
account_id,
instrument_id,
None,
venue_order_id,
order_side,
OrderType::Market,
TimeInForce::Gtc,
OrderStatus::Filled,
order_qty,
order_qty,
ts_now,
ts_now,
ts_now,
None,
)
.with_avg_px(fill_px.to_f64().unwrap_or(0.0))
.ok()
})
.map(|order_report| {
log::info!(
color = LogColor::Blue as u8;
"Generating synthetic fill for position reconciliation {instrument_id}: side={order_side:?}, qty={}, px={fill_px}", qty_diff.abs(),
);
// Passed with no fill reports and flagged as synthetic.
let (events, _) = self.handle_external_order(
&order_report,
&account_id,
&instrument,
&[],
true,
);
events
})
}
None => None,
}
};
// No events produced counts as a failed attempt against the retry budget.
if result.is_none() || result.as_ref().is_some_and(|e| e.is_empty()) {
let new_retries = retries + 1;
self.position_recon_retries
.insert(instrument_id, new_retries);
if new_retries >= self.config.position_check_retries {
log::error!(
"Position discrepancy for {} unresolved after {} attempts \
(cached_qty={}, venue_qty={}); \
no further reconciliation attempts will be made",
instrument_id,
self.config.position_check_retries,
cached_signed_qty,
venue_signed_qty,
);
}
} else {
// Events were generated: reset the retry counter.
self.position_recon_retries.shift_remove(&instrument_id);
}
result
}
#[expect(clippy::too_many_arguments)]
fn reconcile_cross_zero_position(
&mut self,
instrument: &InstrumentAny,
account_id: AccountId,
instrument_id: InstrumentId,
cached_signed_qty: Decimal,
cached_avg_px: Option<Decimal>,
venue_signed_qty: Decimal,
venue_avg_px: Option<Decimal>,
ts_now: UnixNanos,
venue_ts_last: UnixNanos,
) -> Option<Vec<OrderEventAny>> {
log::info!(
color = LogColor::Blue as u8;
"Position crosses zero for {instrument_id}: cached={cached_signed_qty}, venue={venue_signed_qty}. Splitting into two fills",
);
let mut all_events = Vec::new();
let close_qty = cached_signed_qty.abs();
let close_side = if cached_signed_qty < Decimal::ZERO {
OrderSide::Buy } else {
OrderSide::Sell };
if let Some(close_px) = cached_avg_px {
let close_order_qty =
Quantity::from_decimal_dp(close_qty, instrument.size_precision()).ok()?;
let close_fill_price =
Price::from_decimal_dp(close_px, instrument.price_precision()).ok();
let close_venue_order_id = create_position_reconciliation_venue_order_id(
account_id,
instrument_id,
close_side,
OrderType::Market,
close_order_qty,
close_fill_price,
None,
Some("CLOSE"),
venue_ts_last,
);
let close_report = OrderStatusReport::new(
account_id,
instrument_id,
None,
close_venue_order_id,
close_side,
OrderType::Market,
TimeInForce::Gtc,
OrderStatus::Filled,
close_order_qty,
close_order_qty,
ts_now,
ts_now,
ts_now,
None,
)
.with_avg_px(close_px.to_f64().unwrap_or(0.0))
.ok()?;
log::info!(
color = LogColor::Blue as u8;
"Generating close fill for cross-zero {instrument_id}: side={close_side:?}, qty={close_qty}, px={close_px}",
);
let (close_events, _) =
self.handle_external_order(&close_report, &account_id, instrument, &[], true);
all_events.extend(close_events);
} else {
log::warn!("Cannot close position for {instrument_id}: no cached average price");
return None;
}
let open_qty = venue_signed_qty.abs();
let open_side = if venue_signed_qty > Decimal::ZERO {
OrderSide::Buy } else {
OrderSide::Sell };
if let Some(open_px) = venue_avg_px {
let open_order_qty =
Quantity::from_decimal_dp(open_qty, instrument.size_precision()).ok()?;
let open_fill_price =
Price::from_decimal_dp(open_px, instrument.price_precision()).ok();
let open_venue_order_id = create_position_reconciliation_venue_order_id(
account_id,
instrument_id,
open_side,
OrderType::Market,
open_order_qty,
open_fill_price,
None,
Some("OPEN"),
venue_ts_last,
);
let open_report = OrderStatusReport::new(
account_id,
instrument_id,
None,
open_venue_order_id,
open_side,
OrderType::Market,
TimeInForce::Gtc,
OrderStatus::Filled,
open_order_qty,
open_order_qty,
ts_now,
ts_now,
ts_now,
None,
)
.with_avg_px(open_px.to_f64().unwrap_or(0.0))
.ok()?;
log::info!(
color = LogColor::Blue as u8;
"Generating open fill for cross-zero {instrument_id}: side={open_side:?}, qty={open_qty}, px={open_px}",
);
let (open_events, _) =
self.handle_external_order(&open_report, &account_id, instrument, &[], true);
all_events.extend(open_events);
} else {
log::warn!("Cannot open new position for {instrument_id}: no venue average price");
return Some(all_events);
}
Some(all_events)
}
/// Builds a synthetic filled market order from a venue position report and
/// feeds it through `handle_external_order`, returning the resulting events.
///
/// Returns `None` when the reported quantity is zero, when the report has no
/// `avg_px_open`, or when the quantity or order report cannot be constructed.
fn create_position_from_report(
    &mut self,
    report: &PositionStatusReport,
    account_id: &AccountId,
    instrument: &InstrumentAny,
) -> Option<Vec<OrderEventAny>> {
    let venue_signed_qty = report.signed_decimal_qty;
    if venue_signed_qty == Decimal::ZERO {
        return None;
    }

    let instrument_id = report.instrument_id;
    // A positive venue quantity is a long position, so the synthetic order buys.
    let order_side = match venue_signed_qty > Decimal::ZERO {
        true => OrderSide::Buy,
        false => OrderSide::Sell,
    };
    let qty_abs = venue_signed_qty.abs();
    let venue_avg_px = report.avg_px_open?;
    let ts_now = self.clock.borrow().timestamp_ns();

    let quantity = Quantity::from_decimal_dp(qty_abs, instrument.size_precision()).ok()?;
    let price = Price::from_decimal_dp(venue_avg_px, instrument.price_precision()).ok();
    let venue_order_id = create_position_reconciliation_venue_order_id(
        *account_id,
        instrument_id,
        order_side,
        OrderType::Market,
        quantity,
        price,
        report.venue_position_id,
        None,
        report.ts_last,
    );

    let base_report = OrderStatusReport::new(
        *account_id,
        instrument_id,
        None,
        venue_order_id,
        order_side,
        OrderType::Market,
        TimeInForce::Gtc,
        OrderStatus::Filled,
        quantity,
        quantity,
        ts_now,
        ts_now,
        ts_now,
        None,
    )
    .with_avg_px(venue_avg_px.to_f64().unwrap_or(0.0))
    .ok()?;
    // Carry the venue position ID over to the order report when the venue supplied one.
    let order_report = match report.venue_position_id {
        Some(venue_position_id) => base_report.with_venue_position_id(venue_position_id),
        None => base_report,
    };

    log::info!(
        color = LogColor::Blue as u8;
        "Creating position from venue report for {instrument_id}: side={order_side:?}, qty={qty_abs}, avg_px={venue_avg_px}",
    );
    let (events, _) =
        self.handle_external_order(&order_report, account_id, instrument, &[], true);
    Some(events)
}
/// Dispatches a position report to the hedging or netting reconciliation path.
///
/// Reports that carry a `venue_position_id` go to
/// `reconcile_position_report_hedging`; all others go to
/// `reconcile_position_report_netting`.
fn reconcile_position_report(
    &mut self,
    report: &PositionStatusReport,
    account_id: &AccountId,
    instruments_with_unattributed_fills: &IndexSet<InstrumentId>,
    positions_with_fills: &IndexSet<PositionId>,
) -> Option<Vec<OrderEventAny>> {
    match report.venue_position_id {
        Some(_) => self.reconcile_position_report_hedging(
            report,
            account_id,
            instruments_with_unattributed_fills,
            positions_with_fills,
        ),
        None => self.reconcile_position_report_netting(report, account_id),
    }
}
/// Reconciles a position report that carries a `venue_position_id`.
///
/// Returns `None` without acting when:
/// - the report has no `venue_position_id`,
/// - `positions_with_fills` contains that position ID,
/// - `instruments_with_unattributed_fills` contains the report's instrument,
/// - the cached position quantity equals the reported quantity,
/// - no cached position exists and the reported quantity is zero, or
/// - a discrepancy exists but `config.generate_missing_orders` is disabled.
///
/// Otherwise delegates to `reconcile_hedge_position_discrepancy` (cached
/// position exists) or `reconcile_missing_hedge_position` (no cached position).
fn reconcile_position_report_hedging(
    &mut self,
    report: &PositionStatusReport,
    account_id: &AccountId,
    instruments_with_unattributed_fills: &IndexSet<InstrumentId>,
    positions_with_fills: &IndexSet<PositionId>,
) -> Option<Vec<OrderEventAny>> {
    let venue_position_id = report.venue_position_id?;

    if positions_with_fills.contains(&venue_position_id) {
        log::debug!(
            "Skipping hedge position {venue_position_id} reconciliation: fills already in batch"
        );
        return None;
    }
    if instruments_with_unattributed_fills.contains(&report.instrument_id) {
        log::debug!(
            "Skipping hedge position {venue_position_id} reconciliation: unattributed fills in batch"
        );
        return None;
    }

    log::debug!(
        "Reconciling HEDGE position for {}, venue_position_id={}",
        report.instrument_id,
        venue_position_id
    );

    // Clone the position out so the cache borrow ends before any mutation below.
    let position = {
        let cache = self.cache.borrow();
        cache.position(&venue_position_id).cloned()
    };

    match position {
        Some(position) => {
            let cached_signed_qty = position.signed_decimal_qty();
            let venue_signed_qty = report.signed_decimal_qty;

            // Equality also covers the case where both sides are flat, so no
            // separate "both zero" check is needed.
            if cached_signed_qty == venue_signed_qty {
                log::debug!(
                    "Hedge position {venue_position_id} matches venue: qty={cached_signed_qty}"
                );
                return None;
            }

            if !self.config.generate_missing_orders {
                log::error!(
                    "Cannot reconcile {} {}: position net qty {} != reported net qty {} \
                    and `generate_missing_orders` is disabled",
                    report.instrument_id,
                    venue_position_id,
                    cached_signed_qty,
                    venue_signed_qty
                );
                return None;
            }

            self.reconcile_hedge_position_discrepancy(
                report,
                account_id,
                &position,
                cached_signed_qty,
            )
        }
        None => {
            // Nothing cached and nothing at the venue: already consistent.
            if report.signed_decimal_qty == Decimal::ZERO {
                return None;
            }

            if !self.config.generate_missing_orders {
                log::error!(
                    "Cannot reconcile position: {venue_position_id} not found and `generate_missing_orders` is disabled"
                );
                return None;
            }

            self.reconcile_missing_hedge_position(report, account_id)
        }
    }
}
/// Generates a reconciliation order for a cached hedge position whose quantity
/// differs from the venue report.
///
/// Returns `None` when the instrument is not cached, when the difference cannot
/// be expressed as a `Quantity` or rounds to zero at the instrument's size
/// precision, or when the report has no `venue_position_id`. Otherwise returns
/// the result of `create_position_reconciliation_order`.
fn reconcile_hedge_position_discrepancy(
    &mut self,
    report: &PositionStatusReport,
    account_id: &AccountId,
    position: &Position,
    cached_signed_qty: Decimal,
) -> Option<Vec<OrderEventAny>> {
    let instrument = self.get_instrument(&report.instrument_id)?;
    let venue_signed_qty = report.signed_decimal_qty;

    let gap = (cached_signed_qty - venue_signed_qty).abs();
    let diff_qty = Quantity::from_decimal_dp(gap, instrument.size_precision()).ok()?;
    if diff_qty.is_zero() {
        log::debug!(
            "Difference quantity rounds to zero for {}, skipping",
            instrument.id()
        );
        return None;
    }

    let venue_position_id = report.venue_position_id?;
    log::warn!(
        "Hedge position discrepancy for {} {}: cached={}, venue={}, generating reconciliation order",
        report.instrument_id,
        venue_position_id,
        cached_signed_qty,
        venue_signed_qty
    );

    // A non-positive cached average price is treated as unavailable.
    let current_avg_px = (position.avg_px_open > 0.0)
        .then(|| Decimal::from_str(&position.avg_px_open.to_string()).ok())
        .flatten();

    self.create_position_reconciliation_order(
        report,
        account_id,
        &instrument,
        cached_signed_qty,
        diff_qty,
        current_avg_px,
    )
}
fn reconcile_missing_hedge_position(
&mut self,
report: &PositionStatusReport,
account_id: &AccountId,
) -> Option<Vec<OrderEventAny>> {
let instrument = self.get_instrument(&report.instrument_id)?;
let venue_signed_qty = report.signed_decimal_qty;
let qty = venue_signed_qty.abs();
let diff_qty = Quantity::from_decimal_dp(qty, instrument.size_precision()).ok()?;
if diff_qty.is_zero() {
return None;
}
let venue_position_id = report.venue_position_id?;
log::warn!(
"Missing hedge position for {} {}: venue reports {}, generating reconciliation order",
report.instrument_id,
venue_position_id,
venue_signed_qty
);
self.create_position_reconciliation_order(
report,
account_id,
&instrument,
Decimal::ZERO,
diff_qty,
None,
)
}
/// Reconciles a position report that has no `venue_position_id`.
///
/// The signed quantities of all open cached positions for the instrument and
/// account are summed into a net quantity, together with a quantity-weighted
/// average of their open prices (positions with a non-positive `avg_px_open`
/// are left out of the average). That net quantity is compared against the
/// venue's reported signed quantity.
///
/// Returns `None` when the instrument is not cached, when the quantities agree
/// within tolerance, when `config.generate_missing_orders` is disabled, or when
/// the difference rounds to zero. Otherwise delegates to the cross-zero, the
/// create-from-report, or the plain reconciliation-order path.
fn reconcile_position_report_netting(
    &mut self,
    report: &PositionStatusReport,
    account_id: &AccountId,
) -> Option<Vec<OrderEventAny>> {
    let instrument_id = report.instrument_id;
    log::debug!("Reconciling NET position for {instrument_id}");
    let instrument = self.get_instrument(&instrument_id)?;

    let (cached_signed_qty, cached_avg_px) = {
        let cache = self.cache.borrow();
        let open_positions =
            cache.positions_open(None, Some(&instrument_id), None, Some(account_id), None);

        // With no open positions the loop does not run and the result is (0, None).
        let mut net_qty = Decimal::ZERO;
        let mut priced_value = Decimal::ZERO;
        let mut priced_qty = Decimal::ZERO;
        for pos in open_positions {
            let signed_qty = pos.signed_decimal_qty();
            net_qty += signed_qty;

            let qty = signed_qty.abs();
            if pos.avg_px_open > 0.0 && qty > Decimal::ZERO {
                if let Ok(avg_px) = Decimal::from_str(&pos.avg_px_open.to_string()) {
                    priced_value += avg_px * qty;
                    priced_qty += qty;
                }
            }
        }

        let weighted_avg_px = (priced_qty > Decimal::ZERO).then(|| priced_value / priced_qty);
        (net_qty, weighted_avg_px)
    };

    let venue_signed_qty = report.signed_decimal_qty;
    log::debug!("venue_signed_qty={venue_signed_qty}, cached_signed_qty={cached_signed_qty}");

    let diff = (cached_signed_qty - venue_signed_qty).abs();
    let tolerance = Decimal::from_str("0.00000001").unwrap_or(Decimal::ZERO);
    if diff <= tolerance {
        log::debug!("Position quantities match for {instrument_id}, no reconciliation needed");
        return None;
    }

    if !self.config.generate_missing_orders {
        log::warn!(
            "Discrepancy for {instrument_id} position when `generate_missing_orders` disabled, skipping"
        );
        return None;
    }

    let diff_qty = Quantity::from_decimal_dp(diff, instrument.size_precision()).ok()?;
    if diff_qty.is_zero() {
        log::debug!(
            "Difference quantity rounds to zero for {instrument_id}, skipping order generation"
        );
        return None;
    }

    // Strictly opposite signs imply that neither side is zero.
    let long_to_short = cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO;
    let short_to_long = cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO;
    if long_to_short || short_to_long {
        let ts_now = self.clock.borrow().timestamp_ns();
        return self.reconcile_cross_zero_position(
            &instrument,
            *account_id,
            instrument_id,
            cached_signed_qty,
            cached_avg_px,
            venue_signed_qty,
            report.avg_px_open,
            ts_now,
            report.ts_last,
        );
    }

    // Flat locally: build the whole position from the venue report.
    if cached_signed_qty == Decimal::ZERO {
        return self.create_position_from_report(report, account_id, &instrument);
    }

    self.create_position_reconciliation_order(
        report,
        account_id,
        &instrument,
        cached_signed_qty,
        diff_qty,
        cached_avg_px,
    )
}
/// Creates a synthetic filled market order for `diff_qty` that moves the cached
/// position towards the venue-reported quantity, and returns the events from
/// `handle_external_order`.
///
/// The side is BUY when the venue quantity is above the cached quantity, SELL
/// otherwise. The fill price is the calculated reconciliation price, falling
/// back to the report's `avg_px_open`, then to `current_avg_px`; `None` is
/// returned when none of these is available or the order report cannot be built.
fn create_position_reconciliation_order(
    &mut self,
    report: &PositionStatusReport,
    account_id: &AccountId,
    instrument: &InstrumentAny,
    cached_signed_qty: Decimal,
    diff_qty: Quantity,
    current_avg_px: Option<Decimal>,
) -> Option<Vec<OrderEventAny>> {
    let instrument_id = report.instrument_id;
    let venue_signed_qty = report.signed_decimal_qty;
    let venue_avg_px = report.avg_px_open;

    let order_side = match venue_signed_qty > cached_signed_qty {
        true => OrderSide::Buy,
        false => OrderSide::Sell,
    };

    let fill_px = calculate_reconciliation_price(
        cached_signed_qty,
        current_avg_px,
        venue_signed_qty,
        venue_avg_px,
    )
    .or(venue_avg_px)
    .or(current_avg_px)?;

    let ts_now = self.clock.borrow().timestamp_ns();
    let venue_order_id = create_position_reconciliation_venue_order_id(
        *account_id,
        instrument_id,
        order_side,
        OrderType::Market,
        diff_qty,
        Price::from_decimal_dp(fill_px, instrument.price_precision()).ok(),
        report.venue_position_id,
        None,
        report.ts_last,
    );

    let base_report = OrderStatusReport::new(
        *account_id,
        instrument_id,
        None,
        venue_order_id,
        order_side,
        OrderType::Market,
        TimeInForce::Gtc,
        OrderStatus::Filled,
        diff_qty,
        diff_qty,
        ts_now,
        ts_now,
        ts_now,
        None,
    )
    .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
    .ok()?;
    // Carry the venue position ID over when the venue supplied one.
    let order_report = match report.venue_position_id {
        Some(venue_position_id) => base_report.with_venue_position_id(venue_position_id),
        None => base_report,
    };

    log::info!(
        color = LogColor::Blue as u8;
        "Generating reconciliation order for {instrument_id}: side={order_side:?}, qty={diff_qty}, px={fill_px}",
    );
    let (events, _) =
        self.handle_external_order(&order_report, account_id, instrument, &[], true);
    Some(events)
}
/// Thin wrapper around the free function `reconcile_order_report` that
/// supplies the current clock timestamp as its last argument.
fn reconcile_order_report(
    &self,
    order: &OrderAny,
    report: &OrderStatusReport,
    instrument: Option<&InstrumentAny>,
) -> Option<OrderEventAny> {
    reconcile_order_report(
        order,
        report,
        instrument,
        self.clock.borrow().timestamp_ns(),
    )
}
/// Produces the events needed to bring a cached order in line with a venue
/// report and its fills.
///
/// Fills are applied in ascending `ts_event` order and only when an instrument
/// is supplied. For `Canceled` and `Expired` reports the sequence is: an
/// optional triggered event (when the report has `ts_triggered`, the order is
/// not already `Triggered`, and its type is triggerable), then the fills, then
/// the status event. For every other status the status event comes first and
/// the fills follow.
fn reconcile_order_with_fills(
    &mut self,
    order: &OrderAny,
    report: &OrderStatusReport,
    fills: &[&FillReport],
    instrument: Option<&InstrumentAny>,
) -> Vec<OrderEventAny> {
    let mut ordered_fills: Vec<&FillReport> = fills.to_vec();
    ordered_fills.sort_by_key(|f| f.ts_event);

    let ts_now = self.clock.borrow().timestamp_ns();
    let fills_precede_status = matches!(
        report.order_status,
        OrderStatus::Canceled | OrderStatus::Expired
    );

    let mut events = Vec::new();

    if fills_precede_status {
        let missed_trigger = report.ts_triggered.is_some()
            && order.status() != OrderStatus::Triggered
            && TRIGGERABLE_ORDER_TYPES.contains(&order.order_type());
        if missed_trigger {
            events.push(create_reconciliation_triggered(order, report, ts_now));
        }
    } else if let Some(event) = self.reconcile_order_report(order, report, instrument) {
        events.push(event);
    }

    if let Some(inst) = instrument {
        events.extend(
            ordered_fills
                .iter()
                .copied()
                .filter_map(|fill| self.create_order_fill(order, fill, inst)),
        );
    }

    if fills_precede_status {
        events.extend(self.reconcile_order_report(order, report, instrument));
    }

    events
}
/// Registers an order that exists at the venue (or a synthetic reconciliation
/// order) but not in the local cache, and returns the events that bring it to
/// the reported state along with metadata describing the new order.
///
/// # Parameters
/// - `report`: venue (or synthetic) order status report to materialize.
/// - `account_id`: account passed on to event generation.
/// - `instrument`: instrument used for precision and event generation.
/// - `fills`: fill reports belonging to this order; may be empty.
/// - `is_synthetic`: `true` for orders fabricated by position reconciliation.
///
/// # Returns
/// `(events, Some(metadata))` on success. `(vec![], None)` when:
/// - the order is unclaimed, `config.filter_unclaimed_external` is set and the
///   order is not synthetic,
/// - the report quantity is not positive,
/// - the order cannot be built from its initialized event, or
/// - the order cannot be added to the cache (e.g. it already exists).
///
/// Orders claimed through `external_order_claims` are never dropped by
/// `filter_unclaimed_external`; that switch applies to unclaimed orders only.
fn handle_external_order(
    &mut self,
    report: &OrderStatusReport,
    account_id: &AccountId,
    instrument: &InstrumentAny,
    fills: &[&FillReport],
    is_synthetic: bool,
) -> (Vec<OrderEventAny>, Option<ExternalOrderMetadata>) {
    let (strategy_id, tags) =
        if let Some(claimed_strategy) = self.external_order_claims.get(&report.instrument_id) {
            let order_id = report
                .client_order_id
                .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
            log::info!(
                color = LogColor::Blue as u8;
                "External order {} for {} claimed by strategy {}",
                order_id,
                report.instrument_id,
                claimed_strategy,
            );
            (*claimed_strategy, None)
        } else {
            // The filter only concerns orders nobody claimed; synthetic
            // reconciliation orders always pass through.
            if self.config.filter_unclaimed_external && !is_synthetic {
                return (Vec::new(), None);
            }
            let tag = if is_synthetic {
                *TAG_RECONCILIATION
            } else {
                *TAG_VENUE
            };
            (StrategyId::from("EXTERNAL"), Some(vec![tag]))
        };

    // Fall back to the venue order ID when the report has no client order ID.
    let client_order_id = report
        .client_order_id
        .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));

    if !report.quantity.is_positive() {
        log::error!(
            "Skipping external order {} ({}) for {}: non-positive quantity in report {:?}",
            client_order_id,
            report.venue_order_id,
            report.instrument_id,
            report,
        );
        return (Vec::new(), None);
    }

    let ts_now = self.clock.borrow().timestamp_ns();
    let initialized = OrderInitialized::new(
        self.config.trader_id,
        strategy_id,
        report.instrument_id,
        client_order_id,
        report.order_side,
        report.order_type,
        report.quantity,
        report.time_in_force,
        report.post_only,
        report.reduce_only,
        false,
        true,
        UUID4::new(),
        ts_now,
        ts_now,
        report.price,
        report.trigger_price,
        report.trigger_type,
        report.limit_offset,
        report.trailing_offset,
        Some(report.trailing_offset_type),
        report.expire_time,
        report.display_qty,
        None,
        None,
        Some(report.contingency_type),
        report.order_list_id,
        report.linked_order_ids.clone(),
        report.parent_order_id,
        None,
        None,
        None,
        tags,
    );

    let events = vec![OrderEventAny::Initialized(initialized)];
    let order = match OrderAny::from_events(events) {
        Ok(order) => order,
        Err(e) => {
            log::error!("Failed to create order from report: {e}");
            return (Vec::new(), None);
        }
    };

    // Scope the mutable cache borrow so it is released before events are generated.
    {
        let mut cache = self.cache.borrow_mut();
        if let Err(e) = cache.add_order(order.clone(), None, None, false) {
            match cache.order(&client_order_id) {
                Some(existing) if is_synthetic && existing.is_closed() => {
                    log::debug!(
                        "Skipping synthetic reconciliation order {client_order_id} for {}: \
                        replay deduped (cached status={:?})",
                        report.instrument_id,
                        existing.status(),
                    );
                }
                Some(existing) if is_synthetic => {
                    log::warn!(
                        "Synthetic reconciliation order {client_order_id} for {} exists in \
                        cache in non-terminal state {:?}; fill not regenerated",
                        report.instrument_id,
                        existing.status(),
                    );
                }
                _ => {
                    log::error!("Failed to add external order to cache: {e}");
                }
            }
            return (Vec::new(), None);
        }

        // A failed index insert is logged but does not abort the order.
        if let Err(e) =
            cache.add_venue_order_id(&client_order_id, &report.venue_order_id, false)
        {
            log::warn!("Failed to add venue order ID index: {e}");
        }
    }

    log::info!(
        color = LogColor::Blue as u8;
        "Created external order {} ({}) for {} [{}]",
        client_order_id,
        report.venue_order_id,
        report.instrument_id,
        report.order_status,
    );

    let ts_now = self.clock.borrow().timestamp_ns();
    let mut order_events =
        generate_external_order_status_events(&order, report, account_id, instrument, ts_now);

    if !fills.is_empty() {
        // The order was added to the cache just above; should the lookup
        // miss anyway, use the local copy instead of panicking.
        let cached_order = self
            .get_order(&client_order_id)
            .unwrap_or_else(|| order.clone());
        let mut sorted_fills: Vec<&FillReport> = fills.to_vec();
        sorted_fills.sort_by_key(|f| f.ts_event);

        match report.order_status {
            OrderStatus::Canceled | OrderStatus::Expired => {
                // Take the last generated event off, insert the fills, then
                // put it back so it stays last in the sequence.
                let terminal_event = order_events.pop();
                for fill in sorted_fills {
                    if let Some(fill_event) =
                        self.create_order_fill(&cached_order, fill, instrument)
                    {
                        order_events.push(fill_event);
                    }
                }
                if let Some(event) = terminal_event {
                    order_events.push(event);
                }
            }
            OrderStatus::Filled | OrderStatus::PartiallyFilled => {
                // Drop a trailing generated fill event; the supplied fill
                // reports are used in its place.
                if order_events
                    .last()
                    .is_some_and(|e| matches!(e, OrderEventAny::Filled(_)))
                {
                    order_events.pop();
                }

                let mut real_fill_total = Decimal::ZERO;
                for fill in &sorted_fills {
                    if let Some(fill_event) =
                        self.create_order_fill(&cached_order, fill, instrument)
                    {
                        real_fill_total += fill.last_qty.as_decimal();
                        order_events.push(fill_event);
                    }
                }

                // Cover any filled quantity not explained by the fills with one inferred fill.
                let report_filled = report.filled_qty.as_decimal();
                if real_fill_total < report_filled {
                    let diff_decimal = report_filled - real_fill_total;
                    if let Ok(diff) =
                        Quantity::from_decimal_dp(diff_decimal, instrument.size_precision())
                        && let Some(inferred_fill) = create_inferred_fill_for_qty(
                            &cached_order,
                            report,
                            account_id,
                            instrument,
                            diff,
                            ts_now,
                            None,
                        )
                    {
                        order_events.push(inferred_fill);
                    }
                }
            }
            _ => {}
        }
    }

    let metadata = ExternalOrderMetadata {
        client_order_id,
        venue_order_id: report.venue_order_id,
        instrument_id: report.instrument_id,
        strategy_id,
        ts_init: ts_now,
    };
    (order_events, Some(metadata))
}
fn adjust_mass_status_fills(
&self,
mass_status: &ExecutionMassStatus,
) -> (
IndexMap<VenueOrderId, OrderStatusReport>,
IndexMap<VenueOrderId, Vec<FillReport>>,
) {
let mut final_orders: IndexMap<VenueOrderId, OrderStatusReport> =
mass_status.order_reports();
let mut final_fills: IndexMap<VenueOrderId, Vec<FillReport>> = mass_status.fill_reports();
let mut instruments_to_adjust = Vec::new();
for (instrument_id, position_reports) in mass_status.position_reports() {
if !self.should_reconcile_instrument(&instrument_id) {
log::debug!(
"Skipping fill adjustment for {instrument_id}: not in reconciliation_instrument_ids"
);
continue;
}
let is_hedge_mode = position_reports
.iter()
.any(|r| r.venue_position_id.is_some());
if is_hedge_mode {
log::debug!(
"Skipping fill adjustment for {instrument_id}: hedge mode (has venue_position_id)"
);
continue;
}
if let Some(instrument) = self.get_instrument(&instrument_id) {
instruments_to_adjust.push(instrument);
} else {
log::debug!(
"Skipping fill adjustment for {instrument_id}: instrument not found in cache"
);
}
}
if instruments_to_adjust.is_empty() {
return (final_orders, final_fills);
}
log_info!(
"Adjusting fills for {} instrument(s) with position reports",
instruments_to_adjust.len(),
color = LogColor::Blue
);
for instrument in &instruments_to_adjust {
let instrument_id = instrument.id();
match process_mass_status_for_reconciliation(mass_status, instrument, None) {
Ok(result) => {
final_orders.retain(|_, order| order.instrument_id != instrument_id);
final_fills.retain(|_, fills| {
fills
.first()
.is_none_or(|f| f.instrument_id != instrument_id)
});
for (venue_order_id, order) in result.orders {
final_orders.insert(venue_order_id, order);
}
for (venue_order_id, fills) in result.fills {
final_fills.insert(venue_order_id, fills);
}
}
Err(e) => {
log::warn!("Failed to adjust fills for {instrument_id}: {e}");
}
}
}
log_info!(
"After adjustment: {} order(s), {} fill group(s)",
final_orders.len(),
final_fills.len(),
color = LogColor::Blue
);
(final_orders, final_fills)
}
/// Collapses order status reports to one per `venue_order_id`.
///
/// A later report replaces the one already held for the same venue order ID
/// unless the held report is more advanced according to `is_more_advanced`.
/// The map keeps insertion order of first appearance.
fn deduplicate_order_reports<'a>(
    &self,
    reports: impl Iterator<Item = &'a OrderStatusReport>,
) -> IndexMap<VenueOrderId, &'a OrderStatusReport> {
    let mut best_reports: IndexMap<VenueOrderId, &'a OrderStatusReport> = IndexMap::new();

    for candidate in reports {
        let key = candidate.venue_order_id;
        let superseded = match best_reports.get(&key).copied() {
            Some(held) => self.is_more_advanced(held, candidate),
            None => false,
        };
        if superseded {
            continue;
        }
        best_reports.insert(key, candidate);
    }

    best_reports
}
/// Returns `true` when report `a` is further along than report `b`.
///
/// The larger `filled_qty` decides first; when the filled quantities are not
/// ordered either way, the higher `status_priority` decides.
fn is_more_advanced(&self, a: &OrderStatusReport, b: &OrderStatusReport) -> bool {
    let (filled_a, filled_b) = (a.filled_qty, b.filled_qty);
    if filled_a > filled_b {
        true
    } else if filled_a < filled_b {
        false
    } else {
        let priority_a = Self::status_priority(a.order_status);
        let priority_b = Self::status_priority(b.order_status);
        priority_a > priority_b
    }
}
/// Ranks an order status for report comparison; a higher number means the
/// status is treated as further along (`Filled` ranks highest).
const fn status_priority(status: OrderStatus) -> u8 {
    match status {
        OrderStatus::Filled => 6,
        OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected => 5,
        OrderStatus::PartiallyFilled => 4,
        OrderStatus::Triggered => 3,
        OrderStatus::Accepted | OrderStatus::PendingUpdate | OrderStatus::PendingCancel => 2,
        OrderStatus::Released | OrderStatus::Denied => 1,
        OrderStatus::Initialized | OrderStatus::Submitted | OrderStatus::Emulated => 0,
    }
}
/// Returns `true` when the cached order already agrees with the report: same
/// status, same filled quantity, and `should_reconciliation_update` reports
/// that no update is needed.
fn is_exact_order_match(&self, order: &OrderAny, report: &OrderStatusReport) -> bool {
    if order.status() != report.order_status {
        return false;
    }
    if order.filled_qty() != report.filled_qty {
        return false;
    }
    !should_reconciliation_update(order, report)
}
/// Converts a fill report into an `OrderFilled` event for `order`.
///
/// Returns `None` when the fill's `trade_id` is already present in
/// `processed_fills`. Otherwise the trade ID is recorded against the order's
/// client order ID and the event is built from the order's identifiers, the
/// fill's execution details, the instrument's quote currency and the current
/// clock timestamp.
fn create_order_fill(
    &mut self,
    order: &OrderAny,
    fill: &FillReport,
    instrument: &InstrumentAny,
) -> Option<OrderEventAny> {
    let trade_id = fill.trade_id;
    if self.processed_fills.contains_key(&trade_id) {
        return None;
    }

    let client_order_id = order.client_order_id();
    self.processed_fills.insert(trade_id, client_order_id);

    let filled = OrderFilled::new(
        order.trader_id(),
        order.strategy_id(),
        order.instrument_id(),
        client_order_id,
        fill.venue_order_id,
        fill.account_id,
        trade_id,
        fill.order_side,
        order.order_type(),
        fill.last_qty,
        fill.last_px,
        instrument.quote_currency(),
        fill.liquidity_side,
        fill.report_id,
        fill.ts_event,
        self.clock.borrow().timestamp_ns(),
        false,
        fill.venue_position_id,
        Some(fill.commission),
    );
    Some(OrderEventAny::Filled(filled))
}
}