use std::str::FromStr;
use nautilus_common::enums::LogColor;
use nautilus_core::{UUID4, UnixNanos};
use nautilus_model::{
enums::{LiquiditySide, OrderStatus, OrderType},
events::{
OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
OrderTriggered, OrderUpdated,
},
identifiers::{AccountId, PositionId},
instruments::{Instrument, InstrumentAny},
orders::{Order, OrderAny, TRIGGERABLE_ORDER_TYPES},
reports::{FillReport, OrderStatusReport},
types::{Money, Price, Quantity},
};
use rust_decimal::Decimal;
use ustr::Ustr;
use super::{
ids::create_inferred_reconciliation_trade_id, positions::is_within_single_unit_tolerance,
};
fn reconciliation_position_id(
report: &OrderStatusReport,
instrument: &InstrumentAny,
) -> PositionId {
report
.venue_position_id
.unwrap_or_else(|| PositionId::new(format!("{}-EXTERNAL", instrument.id())))
}
pub fn generate_external_order_status_events(
order: &OrderAny,
report: &OrderStatusReport,
account_id: &AccountId,
instrument: &InstrumentAny,
ts_now: UnixNanos,
) -> Vec<OrderEventAny> {
let accepted = OrderEventAny::Accepted(OrderAccepted::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
report.venue_order_id,
*account_id,
UUID4::new(),
report.ts_accepted,
ts_now,
true, ));
match report.order_status {
OrderStatus::Accepted | OrderStatus::Triggered => vec![accepted],
OrderStatus::PartiallyFilled | OrderStatus::Filled => {
let mut events = vec![accepted];
if !report.filled_qty.is_zero()
&& let Some(filled) =
create_inferred_fill(order, report, account_id, instrument, ts_now, None)
{
events.push(filled);
}
events
}
OrderStatus::Canceled => {
let canceled = OrderEventAny::Canceled(OrderCanceled::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
UUID4::new(),
report.ts_last,
ts_now,
true, Some(report.venue_order_id),
Some(*account_id),
));
vec![accepted, canceled]
}
OrderStatus::Expired => {
let expired = OrderEventAny::Expired(OrderExpired::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
UUID4::new(),
report.ts_last,
ts_now,
true, Some(report.venue_order_id),
Some(*account_id),
));
vec![accepted, expired]
}
OrderStatus::Rejected => {
vec![OrderEventAny::Rejected(OrderRejected::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
*account_id,
Ustr::from(report.cancel_reason.as_deref().unwrap_or("UNKNOWN")),
UUID4::new(),
report.ts_last,
ts_now,
true, false,
))]
}
_ => {
log::warn!(
"Unhandled order status {} for external order {}",
report.order_status,
order.client_order_id()
);
Vec::new()
}
}
}
pub fn create_inferred_fill(
order: &OrderAny,
report: &OrderStatusReport,
account_id: &AccountId,
instrument: &InstrumentAny,
ts_now: UnixNanos,
commission: Option<Money>,
) -> Option<OrderEventAny> {
let liquidity_side = match order.order_type() {
OrderType::Market | OrderType::StopMarket | OrderType::TrailingStopMarket => {
LiquiditySide::Taker
}
_ if report.post_only => LiquiditySide::Maker,
_ => LiquiditySide::NoLiquiditySide,
};
let last_px = if let Some(avg_px) = report.avg_px {
match Price::from_decimal_dp(avg_px, instrument.price_precision()) {
Ok(px) => px,
Err(e) => {
log::warn!("Failed to create price from avg_px for inferred fill: {e}");
return None;
}
}
} else if let Some(price) = report.price {
price
} else {
log::warn!(
"Cannot create inferred fill for {}: no avg_px or price available",
order.client_order_id()
);
return None;
};
let position_id = reconciliation_position_id(report, instrument);
let trade_id = create_inferred_reconciliation_trade_id(
*account_id,
order.instrument_id(),
order.client_order_id(),
Some(report.venue_order_id),
report.order_side,
order.order_type(),
report.filled_qty,
report.filled_qty,
last_px,
position_id,
report.ts_last,
);
log::info!(
"Generated inferred fill for {} ({}) qty={} px={}",
order.client_order_id(),
report.venue_order_id,
report.filled_qty,
last_px,
);
Some(OrderEventAny::Filled(OrderFilled::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
report.venue_order_id,
*account_id,
trade_id,
report.order_side,
order.order_type(),
report.filled_qty,
last_px,
instrument.quote_currency(),
liquidity_side,
UUID4::new(),
report.ts_last,
ts_now,
true, report.venue_position_id,
commission,
)))
}
#[must_use]
pub fn create_reconciliation_accepted(
order: &OrderAny,
report: &OrderStatusReport,
ts_now: UnixNanos,
) -> OrderEventAny {
OrderEventAny::Accepted(OrderAccepted::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
order.venue_order_id().unwrap_or(report.venue_order_id),
order
.account_id()
.expect("Order should have account_id for reconciliation"),
UUID4::new(),
report.ts_accepted,
ts_now,
true, ))
}
#[must_use]
pub fn create_reconciliation_rejected(
order: &OrderAny,
reason: Option<&str>,
ts_now: UnixNanos,
) -> Option<OrderEventAny> {
let account_id = order.account_id()?;
let reason = reason.unwrap_or("UNKNOWN");
Some(OrderEventAny::Rejected(OrderRejected::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
account_id,
Ustr::from(reason),
UUID4::new(),
ts_now,
ts_now,
true, false, )))
}
#[must_use]
pub fn create_reconciliation_triggered(
order: &OrderAny,
report: &OrderStatusReport,
ts_now: UnixNanos,
) -> OrderEventAny {
OrderEventAny::Triggered(OrderTriggered::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
UUID4::new(),
report.ts_triggered.unwrap_or(ts_now),
ts_now,
true, order.venue_order_id(),
order.account_id(),
))
}
#[must_use]
pub fn create_reconciliation_canceled(
order: &OrderAny,
report: &OrderStatusReport,
ts_now: UnixNanos,
) -> OrderEventAny {
OrderEventAny::Canceled(OrderCanceled::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
UUID4::new(),
report.ts_last,
ts_now,
true, order.venue_order_id(),
order.account_id(),
))
}
#[must_use]
pub fn create_reconciliation_expired(
order: &OrderAny,
report: &OrderStatusReport,
ts_now: UnixNanos,
) -> OrderEventAny {
OrderEventAny::Expired(OrderExpired::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
UUID4::new(),
report.ts_last,
ts_now,
true, order.venue_order_id(),
order.account_id(),
))
}
#[must_use]
pub fn create_reconciliation_updated(
order: &OrderAny,
report: &OrderStatusReport,
ts_now: UnixNanos,
) -> OrderEventAny {
let trigger_price = match order.order_type() {
OrderType::StopMarket
| OrderType::StopLimit
| OrderType::MarketIfTouched
| OrderType::LimitIfTouched
| OrderType::TrailingStopMarket
| OrderType::TrailingStopLimit => report.trigger_price,
_ => None,
};
OrderEventAny::Updated(OrderUpdated::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
report.quantity,
UUID4::new(),
report.ts_last,
ts_now,
true, order.venue_order_id(),
order.account_id(),
report.price,
trigger_price,
None, order.is_quote_quantity(),
))
}
pub fn should_reconciliation_update(order: &OrderAny, report: &OrderStatusReport) -> bool {
if report.quantity != order.quantity() && report.quantity >= order.filled_qty() {
return true;
}
match order.order_type() {
OrderType::Limit => report.price != order.price(),
OrderType::StopMarket | OrderType::TrailingStopMarket => {
report.trigger_price != order.trigger_price()
}
OrderType::StopLimit | OrderType::TrailingStopLimit => {
report.trigger_price != order.trigger_price() || report.price != order.price()
}
_ => false,
}
}
#[must_use]
pub fn reconcile_order_report(
order: &OrderAny,
report: &OrderStatusReport,
instrument: Option<&InstrumentAny>,
ts_now: UnixNanos,
) -> Option<OrderEventAny> {
if order.status() == report.order_status && order.filled_qty() == report.filled_qty {
if should_reconciliation_update(order, report) {
log::info!(
"Order {} has been updated at venue: qty={}->{}, price={:?}->{:?}",
order.client_order_id(),
order.quantity(),
report.quantity,
order.price(),
report.price
);
return Some(create_reconciliation_updated(order, report, ts_now));
}
return None; }
match report.order_status {
OrderStatus::Accepted => {
if order.status() == OrderStatus::Accepted
&& should_reconciliation_update(order, report)
{
return Some(create_reconciliation_updated(order, report, ts_now));
}
Some(create_reconciliation_accepted(order, report, ts_now))
}
OrderStatus::Rejected => {
create_reconciliation_rejected(order, report.cancel_reason.as_deref(), ts_now)
}
OrderStatus::Triggered => {
if TRIGGERABLE_ORDER_TYPES.contains(&order.order_type()) {
Some(create_reconciliation_triggered(order, report, ts_now))
} else {
log::debug!(
"Skipping OrderTriggered for {} order {}: market-style stops have no TRIGGERED state",
order.order_type(),
order.client_order_id(),
);
None
}
}
OrderStatus::Canceled => Some(create_reconciliation_canceled(order, report, ts_now)),
OrderStatus::Expired => Some(create_reconciliation_expired(order, report, ts_now)),
OrderStatus::PartiallyFilled | OrderStatus::Filled => {
reconcile_fill_quantity_mismatch(order, report, instrument, ts_now)
}
OrderStatus::PendingUpdate | OrderStatus::PendingCancel => {
log::debug!(
"Order {} in pending state: {:?}",
order.client_order_id(),
report.order_status
);
None
}
OrderStatus::Initialized
| OrderStatus::Submitted
| OrderStatus::Denied
| OrderStatus::Emulated
| OrderStatus::Released => {
log::warn!(
"Unexpected order status in venue report for {}: {:?}",
order.client_order_id(),
report.order_status
);
None
}
}
}
#[must_use]
pub fn generate_reconciliation_order_events(
order: &OrderAny,
report: &OrderStatusReport,
instrument: Option<&InstrumentAny>,
ts_now: UnixNanos,
) -> Vec<OrderEventAny> {
if should_accept_before_reconciliation(order, report) {
let accepted = create_reconciliation_accepted(order, report, ts_now);
let mut accepted_order = order.clone();
if let Err(e) = accepted_order.apply(accepted.clone()) {
log::warn!(
"Failed to pre-apply reconciliation acceptance for {}: {e}",
order.client_order_id(),
);
return reconcile_order_report(order, report, instrument, ts_now)
.into_iter()
.collect();
}
let mut events = vec![accepted];
if let Some(event) = reconcile_order_report(&accepted_order, report, instrument, ts_now) {
events.push(event);
}
return events;
}
reconcile_order_report(order, report, instrument, ts_now)
.into_iter()
.collect()
}
fn should_accept_before_reconciliation(order: &OrderAny, report: &OrderStatusReport) -> bool {
order.status() == OrderStatus::Submitted && report.order_status != OrderStatus::Rejected
}
fn reconcile_fill_quantity_mismatch(
order: &OrderAny,
report: &OrderStatusReport,
instrument: Option<&InstrumentAny>,
ts_now: UnixNanos,
) -> Option<OrderEventAny> {
let order_filled_qty = order.filled_qty();
let report_filled_qty = report.filled_qty;
if report_filled_qty < order_filled_qty {
log::error!(
"Fill qty mismatch for {}: cached={}, venue={} (venue < cached)",
order.client_order_id(),
order_filled_qty,
report_filled_qty
);
return None;
}
if report_filled_qty > order_filled_qty {
if order.is_closed() {
let precision = order_filled_qty.precision.max(report_filled_qty.precision);
if is_within_single_unit_tolerance(
report_filled_qty.as_decimal(),
order_filled_qty.as_decimal(),
precision,
) {
return None;
}
log::debug!(
"{} {} already closed but reported difference in filled_qty: \
report={}, cached={}, skipping inferred fill generation for closed order",
order.instrument_id(),
order.client_order_id(),
report_filled_qty,
order_filled_qty,
);
return None;
}
let Some(instrument) = instrument else {
log::warn!(
"Cannot generate inferred fill for {}: instrument not available",
order.client_order_id()
);
return None;
};
let account_id = order.account_id()?;
return create_incremental_inferred_fill(
order,
report,
&account_id,
instrument,
ts_now,
None,
);
}
if order.status() != report.order_status {
if should_reconciliation_update(order, report) {
log::info!(
"Status mismatch with matching fill qty for {}: local={:?}, venue={:?}, \
filled_qty={}, updating quantity {}->{}",
order.client_order_id(),
order.status(),
report.order_status,
report.filled_qty,
order.quantity(),
report.quantity,
);
return Some(create_reconciliation_updated(order, report, ts_now));
}
log::warn!(
"Status mismatch with matching fill qty for {}: local={:?}, venue={:?}, filled_qty={}",
order.client_order_id(),
order.status(),
report.order_status,
report.filled_qty
);
}
None
}
pub fn create_incremental_inferred_fill(
order: &OrderAny,
report: &OrderStatusReport,
account_id: &AccountId,
instrument: &InstrumentAny,
ts_now: UnixNanos,
commission: Option<Money>,
) -> Option<OrderEventAny> {
let order_filled_qty = order.filled_qty();
debug_assert!(
report.filled_qty >= order_filled_qty,
"incremental inferred fill requires report.filled_qty ({}) >= order.filled_qty ({}) for {}",
report.filled_qty,
order_filled_qty,
order.client_order_id(),
);
let last_qty = report.filled_qty - order_filled_qty;
if last_qty <= Quantity::zero(instrument.size_precision()) {
return None;
}
let liquidity_side = match order.order_type() {
OrderType::Market
| OrderType::StopMarket
| OrderType::MarketToLimit
| OrderType::TrailingStopMarket => LiquiditySide::Taker,
_ if order.is_post_only() => LiquiditySide::Maker,
_ => LiquiditySide::NoLiquiditySide,
};
let last_px = calculate_incremental_fill_price(order, report, instrument)?;
let venue_order_id = order.venue_order_id().unwrap_or(report.venue_order_id);
let position_id = reconciliation_position_id(report, instrument);
let trade_id = create_inferred_reconciliation_trade_id(
*account_id,
order.instrument_id(),
order.client_order_id(),
Some(venue_order_id),
order.order_side(),
order.order_type(),
report.filled_qty,
last_qty,
last_px,
position_id,
report.ts_last,
);
log::info!(
color = LogColor::Blue as u8;
"Generated inferred fill for {}: qty={}, px={}",
order.client_order_id(),
last_qty,
last_px,
);
Some(OrderEventAny::Filled(OrderFilled::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
venue_order_id,
*account_id,
trade_id,
order.order_side(),
order.order_type(),
last_qty,
last_px,
instrument.quote_currency(),
liquidity_side,
UUID4::new(),
report.ts_last,
ts_now,
true, None, commission,
)))
}
pub fn create_inferred_fill_for_qty(
order: &OrderAny,
report: &OrderStatusReport,
account_id: &AccountId,
instrument: &InstrumentAny,
fill_qty: Quantity,
ts_now: UnixNanos,
commission: Option<Money>,
) -> Option<OrderEventAny> {
if fill_qty.is_zero() {
return None;
}
let liquidity_side = match order.order_type() {
OrderType::Market
| OrderType::StopMarket
| OrderType::MarketToLimit
| OrderType::TrailingStopMarket => LiquiditySide::Taker,
_ if order.is_post_only() => LiquiditySide::Maker,
_ => LiquiditySide::NoLiquiditySide,
};
let last_px = if let Some(avg_px) = report.avg_px {
Price::from_decimal_dp(avg_px, instrument.price_precision()).ok()?
} else if let Some(price) = report.price {
price
} else if let Some(price) = order.price() {
price
} else {
log::warn!(
"Cannot determine fill price for {}: no avg_px or price available",
order.client_order_id()
);
return None;
};
let venue_order_id = order.venue_order_id().unwrap_or(report.venue_order_id);
let position_id = reconciliation_position_id(report, instrument);
let trade_id = create_inferred_reconciliation_trade_id(
*account_id,
order.instrument_id(),
order.client_order_id(),
Some(venue_order_id),
order.order_side(),
order.order_type(),
report.filled_qty,
fill_qty,
last_px,
position_id,
report.ts_last,
);
log::info!(
color = LogColor::Blue as u8;
"Generated inferred fill for {}: qty={}, px={}",
order.client_order_id(),
fill_qty,
last_px,
);
Some(OrderEventAny::Filled(OrderFilled::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
venue_order_id,
*account_id,
trade_id,
order.order_side(),
order.order_type(),
fill_qty,
last_px,
instrument.quote_currency(),
liquidity_side,
UUID4::new(),
report.ts_last,
ts_now,
true, None, commission,
)))
}
fn calculate_incremental_fill_price(
order: &OrderAny,
report: &OrderStatusReport,
instrument: &InstrumentAny,
) -> Option<Price> {
let order_filled_qty = order.filled_qty();
debug_assert!(
report.filled_qty >= order_filled_qty,
"incremental fill price requires report.filled_qty ({}) >= order.filled_qty ({}) for {}",
report.filled_qty,
order_filled_qty,
order.client_order_id(),
);
if order_filled_qty.is_zero() {
if let Some(avg_px) = report.avg_px {
return Price::from_decimal_dp(avg_px, instrument.price_precision()).ok();
}
if let Some(price) = report.price {
return Some(price);
}
if let Some(price) = order.price() {
return Some(price);
}
log::warn!(
"Cannot determine fill price for {}: no avg_px, report price, or order price",
order.client_order_id()
);
return None;
}
if let Some(report_avg_px) = report.avg_px {
let Some(order_avg_px) = order.avg_px() else {
return Price::from_decimal_dp(report_avg_px, instrument.price_precision()).ok();
};
let report_filled_qty = report.filled_qty;
let last_qty = report_filled_qty - order_filled_qty;
let report_notional = report_avg_px * report_filled_qty.as_decimal();
let order_notional = Decimal::from_str(&order_avg_px.to_string()).unwrap_or_default()
* order_filled_qty.as_decimal();
let last_notional = report_notional - order_notional;
let last_px_decimal = last_notional / last_qty.as_decimal();
return Price::from_decimal_dp(last_px_decimal, instrument.price_precision()).ok();
}
if let Some(price) = report.price {
return Some(price);
}
order.price()
}
pub fn reconcile_fill_report(
order: &OrderAny,
report: &FillReport,
instrument: &InstrumentAny,
ts_now: UnixNanos,
allow_overfills: bool,
) -> Option<OrderEventAny> {
debug_assert!(
!report.last_qty.is_zero(),
"fill report last_qty must be non-zero for {}",
order.client_order_id(),
);
if order.trade_ids().iter().any(|id| **id == report.trade_id) {
log::debug!(
"Duplicate fill detected: trade_id {} already exists for order {}",
report.trade_id,
order.client_order_id()
);
return None;
}
let potential_filled_qty = order.filled_qty() + report.last_qty;
if potential_filled_qty > order.quantity() {
if !allow_overfills {
log::warn!(
"Rejecting fill that would cause overfill for {}: order.quantity={}, order.filled_qty={}, fill.last_qty={}, would result in filled_qty={}",
order.client_order_id(),
order.quantity(),
order.filled_qty(),
report.last_qty,
potential_filled_qty
);
return None;
}
log::warn!(
"Allowing overfill during reconciliation for {}: order.quantity={}, order.filled_qty={}, fill.last_qty={}, will result in filled_qty={}",
order.client_order_id(),
order.quantity(),
order.filled_qty(),
report.last_qty,
potential_filled_qty
);
}
let account_id = order.account_id().unwrap_or(report.account_id);
let venue_order_id = order.venue_order_id().unwrap_or(report.venue_order_id);
log::info!(
color = LogColor::Blue as u8;
"Reconciling fill for {}: qty={}, px={}, trade_id={}",
order.client_order_id(),
report.last_qty,
report.last_px,
report.trade_id,
);
Some(OrderEventAny::Filled(OrderFilled::new(
order.trader_id(),
order.strategy_id(),
order.instrument_id(),
order.client_order_id(),
venue_order_id,
account_id,
report.trade_id,
order.order_side(),
order.order_type(),
report.last_qty,
report.last_px,
instrument.quote_currency(),
report.liquidity_side,
UUID4::new(),
report.ts_event,
ts_now,
true, report.venue_position_id,
Some(report.commission),
)))
}