use indexmap::IndexMap;
use nautilus_core::UnixNanos;
use nautilus_model::{
enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified, TimeInForce},
identifiers::{AccountId, InstrumentId, VenueOrderId},
instruments::{Instrument, InstrumentAny},
reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
types::{Money, Price, Quantity},
};
use rust_decimal::Decimal;
use super::{
ids::{create_synthetic_trade_id, create_synthetic_venue_order_id},
types::{FillAdjustmentResult, FillSnapshot, ReconciliationResult, VenuePositionSnapshot},
};
/// Default relative average-price tolerance (`0.0001`, i.e. 0.01%).
///
/// Used by `process_mass_status_for_reconciliation` when the caller passes `None`.
const DEFAULT_TOLERANCE: Decimal = Decimal::from_parts(1, 0, 0, false, 4);
/// Replays `fills` in order and returns the resulting `(signed_qty, value)` pair.
///
/// `signed_qty` is positive for a net long and negative for a net short position.
/// `value` is the cost basis of the open quantity (open quantity times entry price),
/// so the average price of the position is `value / signed_qty.abs()`.
///
/// Each fill is handled in one of three ways:
/// - it extends the position (or opens one from flat): its notional is added to `value`;
/// - it reduces the position without crossing zero: `value` is scaled down by the
///   fraction of the position that was closed;
/// - it closes the position and opens the opposite side: only the overshoot remains,
///   valued at the price of that fill.
#[must_use]
pub fn simulate_position(fills: &[FillSnapshot]) -> (Decimal, Decimal) {
    let mut position_qty = Decimal::ZERO;
    let mut cost_basis = Decimal::ZERO;

    for fill in fills {
        debug_assert!(
            fill.qty > Decimal::ZERO,
            "fill snapshot qty must be positive, received {}",
            fill.qty,
        );

        let sign = Decimal::from(fill.direction());
        let held = position_qty.abs();
        let extends_position = (sign > Decimal::ZERO && position_qty >= Decimal::ZERO)
            || (sign < Decimal::ZERO && position_qty <= Decimal::ZERO);

        if extends_position {
            // Same direction as the open position (or opening from flat)
            cost_basis += fill.qty * fill.px;
            position_qty += sign * fill.qty;
        } else if fill.qty <= held {
            // Partial or full close: keep the cost basis of the part still open
            cost_basis *= Decimal::ONE - fill.qty / held;
            position_qty += sign * fill.qty;
        } else {
            // Flip: the old position is gone, the overshoot opens the other side
            let overshoot = fill.qty - held;
            position_qty = sign * overshoot;
            cost_basis = overshoot * fill.px;
        }
    }

    let (qty, value) = (position_qty, cost_basis);
    debug_assert!(
        value >= Decimal::ZERO,
        "simulated position value must be non-negative, was {value}",
    );
    debug_assert!(
        !(qty != Decimal::ZERO && value.is_sign_negative()),
        "simulated avg price invariant: qty={qty}, value={value}",
    );
    (qty, value)
}
/// Returns the `ts_event` of every fill at which the running position crosses zero.
///
/// A crossing is recorded when the position was non-zero before the fill and the fill
/// either brings it to exactly zero or changes its sign. A fill that opens a position
/// from flat is not a crossing.
#[must_use]
pub fn detect_zero_crossings(fills: &[FillSnapshot]) -> Vec<u64> {
    let mut position = Decimal::ZERO;

    fills
        .iter()
        .filter_map(|fill| {
            let before = position;
            position += Decimal::from(fill.direction()) * fill.qty;

            let landed_flat = position == Decimal::ZERO;
            let changed_sign = (before > Decimal::ZERO) != (position > Decimal::ZERO);
            let crossed = before != Decimal::ZERO && (landed_flat || changed_sign);

            crossed.then_some(fill.ts_event)
        })
        .collect()
}
/// Returns `true` when a simulated position agrees with the venue-reported position.
///
/// - `simulated_qty` / `venue_qty`: signed quantities; they must be exactly equal.
/// - `simulated_value`: cost basis of the simulated position; the simulated average
///   price is `simulated_value / simulated_qty.abs()`.
/// - `venue_avg_px`: average price reported by the venue.
/// - `tolerance`: maximum allowed relative difference between the two average prices.
///
/// Two flat positions always match. A non-flat position never matches a venue
/// average price of zero, because no relative difference can be computed.
#[must_use]
pub fn check_position_match(
    simulated_qty: Decimal,
    simulated_value: Decimal,
    venue_qty: Decimal,
    venue_avg_px: Decimal,
    tolerance: Decimal,
) -> bool {
    if simulated_qty != venue_qty {
        return false;
    }

    if simulated_qty == Decimal::ZERO {
        return true; // Both flat, there is no average price to compare
    }

    if venue_avg_px == Decimal::ZERO {
        return false;
    }

    // `simulated_qty` is known to be non-zero here, so the division is well defined
    let simulated_avg_px = simulated_value / simulated_qty.abs();

    // Divide by the magnitude of the venue price: with a signed denominator a negative
    // venue price would yield a negative ratio that trivially passes any tolerance.
    let relative_diff = (simulated_avg_px - venue_avg_px).abs() / venue_avg_px.abs();

    relative_diff <= tolerance
}
/// Computes the price of the single fill that would move the current position to the
/// target position (both quantities are signed).
///
/// Returns:
/// - `None` when the quantities already agree;
/// - `current_position_avg_px` when the target is flat;
/// - `None` when the target average price is missing or zero;
/// - the target average price when there is no current position, no current average
///   price, or the position changes side;
/// - otherwise `(target_qty * target_px - current_qty * current_px) / qty_diff`, or
///   `None` when that price is not strictly positive.
pub fn calculate_reconciliation_price(
    current_position_qty: Decimal,
    current_position_avg_px: Option<Decimal>,
    target_position_qty: Decimal,
    target_position_avg_px: Option<Decimal>,
) -> Option<Decimal> {
    let qty_delta = target_position_qty - current_position_qty;
    if qty_delta == Decimal::ZERO {
        return None; // Nothing to reconcile
    }

    if target_position_qty == Decimal::ZERO {
        // Closing out: the fill is priced at the current average
        return current_position_avg_px;
    }

    let target_px = target_position_avg_px.filter(|px| *px != Decimal::ZERO)?;

    let current_px = match current_position_avg_px {
        Some(px) if current_position_qty != Decimal::ZERO => px,
        // Opening from flat, or nothing known about the current price
        _ => return Some(target_px),
    };

    let changes_side =
        (current_position_qty > Decimal::ZERO) != (target_position_qty > Decimal::ZERO);
    if changes_side {
        return Some(target_px);
    }

    let value_delta = target_position_qty * target_px - current_position_qty * current_px;
    let reconciliation_px = value_delta / qty_delta;

    (reconciliation_px > Decimal::ZERO).then_some(reconciliation_px)
}
#[must_use]
#[expect(clippy::missing_panics_doc)] pub fn adjust_fills_for_partial_window(
fills: &[FillSnapshot],
venue_position: &VenuePositionSnapshot,
_instrument: &InstrumentAny,
tolerance: Decimal,
) -> FillAdjustmentResult {
if fills.is_empty() {
return FillAdjustmentResult::NoAdjustment;
}
if venue_position.qty == Decimal::ZERO {
return FillAdjustmentResult::NoAdjustment;
}
let zero_crossings = detect_zero_crossings(fills);
let venue_qty_signed = match venue_position.side {
OrderSide::Buy => venue_position.qty,
OrderSide::Sell => -venue_position.qty,
_ => Decimal::ZERO,
};
if !zero_crossings.is_empty() {
let mut last_flat_crossing_ts = None;
let mut running_qty = Decimal::ZERO;
for fill in fills {
let prev_qty = running_qty;
running_qty += Decimal::from(fill.direction()) * fill.qty;
if prev_qty != Decimal::ZERO && running_qty == Decimal::ZERO {
last_flat_crossing_ts = Some(fill.ts_event);
}
}
let lifecycle_boundary_ts =
last_flat_crossing_ts.unwrap_or(*zero_crossings.last().unwrap());
let current_lifecycle_fills: Vec<FillSnapshot> = fills
.iter()
.filter(|f| f.ts_event > lifecycle_boundary_ts)
.cloned()
.collect();
if current_lifecycle_fills.is_empty() {
return FillAdjustmentResult::NoAdjustment;
}
let (current_qty, current_value) = simulate_position(¤t_lifecycle_fills);
if check_position_match(
current_qty,
current_value,
venue_qty_signed,
venue_position.avg_px,
tolerance,
) {
return FillAdjustmentResult::FilterToCurrentLifecycle {
last_zero_crossing_ts: lifecycle_boundary_ts,
current_lifecycle_fills,
};
}
if let Some(first_fill) = current_lifecycle_fills.first() {
let synthetic_fill = FillSnapshot::new(
first_fill.ts_event.saturating_sub(1), venue_position.side,
venue_position.qty,
venue_position.avg_px,
first_fill.venue_order_id,
);
return FillAdjustmentResult::ReplaceCurrentLifecycle {
synthetic_fill,
first_venue_order_id: first_fill.venue_order_id,
};
}
return FillAdjustmentResult::NoAdjustment;
}
let oldest_lifecycle_fills: Vec<FillSnapshot> =
if let Some(&first_zero_crossing_ts) = zero_crossings.first() {
fills
.iter()
.filter(|f| f.ts_event <= first_zero_crossing_ts)
.cloned()
.collect()
} else {
fills.to_vec()
};
if oldest_lifecycle_fills.is_empty() {
return FillAdjustmentResult::NoAdjustment;
}
let (oldest_qty, oldest_value) = simulate_position(&oldest_lifecycle_fills);
if zero_crossings.is_empty() {
if check_position_match(
oldest_qty,
oldest_value,
venue_qty_signed,
venue_position.avg_px,
tolerance,
) {
return FillAdjustmentResult::NoAdjustment;
}
if let Some(first_fill) = oldest_lifecycle_fills.first() {
let oldest_avg_px = if oldest_qty == Decimal::ZERO {
None
} else {
Some(oldest_value / oldest_qty.abs())
};
let reconciliation_price = calculate_reconciliation_price(
oldest_qty,
oldest_avg_px,
venue_qty_signed,
Some(venue_position.avg_px),
);
if let Some(opening_px) = reconciliation_price {
let opening_qty = if oldest_qty == Decimal::ZERO {
venue_qty_signed
} else {
venue_qty_signed - oldest_qty
};
if opening_qty.abs() > Decimal::ZERO {
let synthetic_side = if opening_qty > Decimal::ZERO {
OrderSide::Buy
} else {
OrderSide::Sell
};
let synthetic_fill = FillSnapshot::new(
first_fill.ts_event.saturating_sub(1),
synthetic_side,
opening_qty.abs(),
opening_px,
first_fill.venue_order_id,
);
return FillAdjustmentResult::AddSyntheticOpening {
synthetic_fill,
existing_fills: oldest_lifecycle_fills,
};
}
}
}
return FillAdjustmentResult::NoAdjustment;
}
if oldest_qty == Decimal::ZERO {
return FillAdjustmentResult::NoAdjustment;
}
if !oldest_lifecycle_fills.is_empty()
&& let Some(&first_zero_crossing_ts) = zero_crossings.first()
{
let current_lifecycle_fills: Vec<FillSnapshot> = fills
.iter()
.filter(|f| f.ts_event > first_zero_crossing_ts)
.cloned()
.collect();
if !current_lifecycle_fills.is_empty()
&& let Some(first_current_fill) = current_lifecycle_fills.first()
{
let synthetic_fill = FillSnapshot::new(
first_current_fill.ts_event.saturating_sub(1),
venue_position.side,
venue_position.qty,
venue_position.avg_px,
first_current_fill.venue_order_id,
);
return FillAdjustmentResult::AddSyntheticOpening {
synthetic_fill,
existing_fills: oldest_lifecycle_fills,
};
}
}
FillAdjustmentResult::NoAdjustment
}
/// Builds a fully filled market-order status report that stands in for `fill`.
///
/// The order quantity and the filled quantity are both the fill quantity, converted
/// with the instrument's size precision. All three timestamps of the report are the
/// fill's `ts_event`, and the report's `avg_px` is set to the fill price.
///
/// # Errors
///
/// Returns an error if the fill quantity cannot be converted to a `Quantity`.
pub fn create_synthetic_order_report(
    fill: &FillSnapshot,
    account_id: AccountId,
    instrument_id: InstrumentId,
    instrument: &InstrumentAny,
    venue_order_id: VenueOrderId,
) -> anyhow::Result<OrderStatusReport> {
    let ts_event = fill.ts_event;
    let quantity = Quantity::from_decimal_dp(fill.qty, instrument.size_precision())?;

    // NOTE(review): the two `None` arguments are presumably the client order ID and
    // the report ID - confirm against `OrderStatusReport::new`.
    let mut synthetic = OrderStatusReport::new(
        account_id,
        instrument_id,
        None,
        venue_order_id,
        fill.side,
        OrderType::Market,
        TimeInForce::Gtc,
        OrderStatus::Filled,
        quantity,
        quantity, // Same value passed twice: the synthetic order is entirely filled
        UnixNanos::from(ts_event),
        UnixNanos::from(ts_event),
        UnixNanos::from(ts_event),
        None,
    );
    synthetic.avg_px = Some(fill.px);

    Ok(synthetic)
}
/// Builds the fill report that stands in for the synthetic `fill`.
///
/// The trade ID comes from `create_synthetic_trade_id`, quantity and price are
/// converted with the instrument's size and price precision, and a zero amount in the
/// instrument's quote currency is passed as the commission with no liquidity side.
///
/// # Errors
///
/// Returns an error if the fill quantity or price cannot be converted to a
/// `Quantity` or `Price`.
pub fn create_synthetic_fill_report(
    fill: &FillSnapshot,
    account_id: AccountId,
    instrument_id: InstrumentId,
    instrument: &InstrumentAny,
    venue_order_id: VenueOrderId,
) -> anyhow::Result<FillReport> {
    let synthetic_trade_id = create_synthetic_trade_id(fill);
    let last_qty = Quantity::from_decimal_dp(fill.qty, instrument.size_precision())?;
    let last_px = Price::from_decimal_dp(fill.px, instrument.price_precision())?;
    let zero_commission = Money::new(0.0, instrument.quote_currency());

    // NOTE(review): the three `None` arguments are optional identifiers of
    // `FillReport::new` that a synthetic fill does not have - confirm which ones.
    let report = FillReport::new(
        account_id,
        instrument_id,
        venue_order_id,
        synthetic_trade_id,
        fill.side,
        last_qty,
        last_px,
        zero_commission,
        LiquiditySide::NoLiquiditySide,
        None,
        None,
        fill.ts_event.into(),
        fill.ts_event.into(),
        None,
    );

    Ok(report)
}
pub fn process_mass_status_for_reconciliation(
mass_status: &ExecutionMassStatus,
instrument: &InstrumentAny,
tolerance: Option<Decimal>,
) -> anyhow::Result<ReconciliationResult> {
let instrument_id = instrument.id();
let account_id = mass_status.account_id;
let tol = tolerance.unwrap_or(DEFAULT_TOLERANCE);
let position_reports = mass_status.position_reports();
let venue_position = match position_reports.get(&instrument_id).and_then(|r| r.first()) {
Some(report) => position_report_to_snapshot(report),
None => {
return Ok(extract_instrument_reports(mass_status, instrument_id));
}
};
let extracted = extract_fills_for_instrument(mass_status, instrument_id);
let fill_snapshots = extracted.snapshots;
let mut order_map = extracted.orders;
let mut fill_map = extracted.fills;
if fill_snapshots.is_empty() {
return Ok(ReconciliationResult {
orders: order_map,
fills: fill_map,
});
}
let result = adjust_fills_for_partial_window(&fill_snapshots, &venue_position, instrument, tol);
match result {
FillAdjustmentResult::NoAdjustment => {}
FillAdjustmentResult::AddSyntheticOpening {
synthetic_fill,
existing_fills: _,
} => {
let venue_order_id = create_synthetic_venue_order_id(&synthetic_fill, instrument_id);
let order = create_synthetic_order_report(
&synthetic_fill,
account_id,
instrument_id,
instrument,
venue_order_id,
)?;
let fill = create_synthetic_fill_report(
&synthetic_fill,
account_id,
instrument_id,
instrument,
venue_order_id,
)?;
order_map.insert(venue_order_id, order);
fill_map.entry(venue_order_id).or_default().insert(0, fill);
}
FillAdjustmentResult::ReplaceCurrentLifecycle {
synthetic_fill,
first_venue_order_id,
} => {
let order = create_synthetic_order_report(
&synthetic_fill,
account_id,
instrument_id,
instrument,
first_venue_order_id,
)?;
let fill = create_synthetic_fill_report(
&synthetic_fill,
account_id,
instrument_id,
instrument,
first_venue_order_id,
)?;
order_map.clear();
fill_map.clear();
order_map.insert(first_venue_order_id, order);
fill_map.insert(first_venue_order_id, vec![fill]);
}
FillAdjustmentResult::FilterToCurrentLifecycle {
last_zero_crossing_ts,
current_lifecycle_fills: _,
} => {
for fills in fill_map.values_mut() {
fills.retain(|f| f.ts_event.as_u64() > last_zero_crossing_ts);
}
fill_map.retain(|_, fills| !fills.is_empty());
let orders_with_fills: ahash::AHashSet<VenueOrderId> =
fill_map.keys().copied().collect();
order_map.retain(|id, order| {
orders_with_fills.contains(id)
|| !matches!(
order.order_status,
OrderStatus::Denied
| OrderStatus::Rejected
| OrderStatus::Canceled
| OrderStatus::Expired
| OrderStatus::Filled
)
});
}
}
Ok(ReconciliationResult {
orders: order_map,
fills: fill_map,
})
}
/// Converts a venue position report into the snapshot used by the fill adjustment.
///
/// Long and flat positions are given the buy side, short positions the sell side.
/// A missing open average price becomes zero.
fn position_report_to_snapshot(report: &PositionStatusReport) -> VenuePositionSnapshot {
    let avg_px = report.avg_px_open.unwrap_or(Decimal::ZERO);
    let side = match report.position_side {
        PositionSideSpecified::Short => OrderSide::Sell,
        PositionSideSpecified::Long | PositionSideSpecified::Flat => OrderSide::Buy,
    };

    VenuePositionSnapshot {
        side,
        qty: report.quantity.into(),
        avg_px,
    }
}
fn extract_instrument_reports(
mass_status: &ExecutionMassStatus,
instrument_id: InstrumentId,
) -> ReconciliationResult {
let mut orders = IndexMap::new();
let mut fills = IndexMap::new();
for (id, order) in mass_status.order_reports() {
if order.instrument_id == instrument_id {
orders.insert(id, order.clone());
}
}
for (id, fill_list) in mass_status.fill_reports() {
let filtered: Vec<_> = fill_list
.iter()
.filter(|f| f.instrument_id == instrument_id)
.cloned()
.collect();
if !filtered.is_empty() {
fills.insert(id, filtered);
}
}
ReconciliationResult { orders, fills }
}
/// Reports of a single instrument, as gathered by `extract_fills_for_instrument`.
struct ExtractedFills {
    /// One snapshot per fill report of the instrument, sorted by `ts_event`.
    snapshots: Vec<FillSnapshot>,
    /// Order reports of the instrument, keyed by venue order ID.
    orders: IndexMap<VenueOrderId, OrderStatusReport>,
    /// Fill reports of the instrument, grouped by venue order ID.
    fills: IndexMap<VenueOrderId, Vec<FillReport>>,
}
/// Gathers the order reports, fill reports and fill snapshots of `instrument_id`.
///
/// The side of each snapshot is the side of the order report with the same venue
/// order ID, or the fill's own `order_side` when the mass status has no such order
/// report. Snapshots are returned sorted by `ts_event`; the sort is stable, so fills
/// with equal timestamps keep their extraction order.
fn extract_fills_for_instrument(
    mass_status: &ExecutionMassStatus,
    instrument_id: InstrumentId,
) -> ExtractedFills {
    // Fetch the order reports once: the accessor hands back an owned map, so calling
    // it for every fill would rebuild that map over and over.
    let order_reports = mass_status.order_reports();

    let mut order_map = IndexMap::new();
    for (id, order) in order_reports.iter() {
        if order.instrument_id == instrument_id {
            order_map.insert(*id, order.clone());
        }
    }

    let mut snapshots = Vec::new();
    let mut fill_map = IndexMap::new();

    for (venue_order_id, fill_reports) in mass_status.fill_reports() {
        // The order side is the same for every fill of this venue order
        let order_side = order_reports
            .get(&venue_order_id)
            .map(|order| order.order_side);

        for fill in fill_reports {
            if fill.instrument_id != instrument_id {
                continue;
            }

            let side = order_side.unwrap_or(fill.order_side);
            snapshots.push(FillSnapshot::new(
                fill.ts_event.as_u64(),
                side,
                fill.last_qty.into(),
                fill.last_px.into(),
                venue_order_id,
            ));
            fill_map
                .entry(venue_order_id)
                .or_insert_with(Vec::new)
                .push(fill.clone());
        }
    }

    snapshots.sort_by_key(|f| f.ts_event);

    ExtractedFills {
        snapshots,
        orders: order_map,
        fills: fill_map,
    }
}
/// Returns `true` when the cached signed quantity agrees with the venue report.
///
/// The quantities agree when both are zero, when they are equal, or, if
/// `size_precision` is given, when `is_within_single_unit_tolerance` accepts them.
/// A match through the tolerance check is logged at debug level, and a discrepancy is
/// logged as a warning.
pub fn check_position_reconciliation(
    report: &PositionStatusReport,
    cached_signed_qty: Decimal,
    size_precision: Option<u8>,
) -> bool {
    let venue_signed_qty = report.signed_decimal_qty;

    let both_flat = venue_signed_qty == Decimal::ZERO && cached_signed_qty == Decimal::ZERO;
    if both_flat {
        return true;
    }

    let within_tolerance = size_precision.is_some_and(|precision| {
        is_within_single_unit_tolerance(cached_signed_qty, venue_signed_qty, precision)
    });
    if within_tolerance {
        log::debug!(
            "Position for {} within tolerance: cached={}, venue={}",
            report.instrument_id,
            cached_signed_qty,
            venue_signed_qty
        );
        return true;
    }

    if cached_signed_qty != venue_signed_qty {
        log::warn!(
            "Position discrepancy for {}: cached={}, venue={}",
            report.instrument_id,
            cached_signed_qty,
            venue_signed_qty
        );
        return false;
    }

    true
}
/// Returns `true` when `value1` and `value2` differ by at most one unit of the last
/// decimal place at the given `precision`, i.e. by at most `10^-precision`.
///
/// With a `precision` of zero the values must be exactly equal. The same holds when
/// `precision` exceeds the largest scale a `Decimal` can represent: such a tolerance
/// is smaller than any representable non-zero difference, so instead of panicking
/// while constructing it the function falls back to exact equality.
#[must_use]
pub fn is_within_single_unit_tolerance(value1: Decimal, value2: Decimal, precision: u8) -> bool {
    if precision == 0 {
        return value1 == value2;
    }

    // `Decimal::new` panics on an out-of-range scale, `try_new` reports it instead
    match Decimal::try_new(1, u32::from(precision)) {
        Ok(tolerance) => (value1 - value2).abs() <= tolerance,
        Err(_) => value1 == value2,
    }
}