use anyhow::Context;
use nautilus_common::messages::execution::{
GenerateFillReports, GenerateOrderStatusReport, GenerateOrderStatusReports,
GeneratePositionStatusReports, QueryAccount, QueryOrder,
};
use nautilus_core::time::AtomicTime;
use nautilus_live::ExecutionEventEmitter;
use nautilus_model::{
enums::{OrderStatus, OrderType, TimeInForce},
identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
instruments::Instrument,
orders::Order,
reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
types::{Currency, Quantity},
};
use rust_decimal::Decimal;
use super::{
PolymarketExecutionClient,
parse::{
parse_balance_allowance, parse_order_status_report, snap_filled_qty_to_quantity,
sum_filled_quantity, weighted_average_price,
},
reconciliation::{
FillContext, apply_fill_filters, build_fill_reports_from_trades, build_position_reports,
},
};
use crate::{
common::{consts::DUST_SNAP_THRESHOLD_DEC, enums::SignatureType},
http::{
clob::PolymarketClobHttpClient,
query::{GetBalanceAllowanceParams, GetTradesParams},
},
};
impl PolymarketExecutionClient {
pub(super) fn fill_context(&self) -> FillContext<'_> {
let user_address = self
.secrets
.funder
.as_deref()
.unwrap_or(&self.secrets.address);
FillContext {
account_id: self.core.account_id,
user_address,
api_key: self.secrets.credential.api_key().as_str(),
pusd: get_pusd_currency(),
clock: self.clock,
}
}
pub(super) async fn recover_terminal_status_from_trades(
&self,
venue_order_id: VenueOrderId,
instrument_id: InstrumentId,
client_order_id: Option<ClientOrderId>,
size_prec: u8,
) -> anyhow::Result<Option<OrderStatusReport>> {
let ts_init = self.clock.get_time_ns();
let ctx = self.fill_context();
let trades = self
.http_client
.get_trades(GetTradesParams::default())
.await
.context("failed to fetch trades for order recovery")?;
let (mut order_fills, _) = build_fill_reports_from_trades(
&trades,
&ctx,
&self.shared_token_instruments,
Some(instrument_id),
ts_init,
);
order_fills.retain(|f| f.venue_order_id == venue_order_id);
self.fill_tracker.snap_fill_reports(&mut order_fills);
let resolved_client_order_id =
client_order_id.or_else(|| self.core.cache().client_order_id(&venue_order_id).copied());
let cached = resolved_client_order_id.and_then(|cid| self.core.cache().order_owned(&cid));
let cached_quantity = cached.as_ref().map(Order::quantity);
let cached_order_type = cached.as_ref().map_or(OrderType::Limit, Order::order_type);
let cached_tif = cached
.as_ref()
.map_or(TimeInForce::Gtc, Order::time_in_force);
let cached_price = cached.as_ref().and_then(Order::price);
let cached_side = cached.as_ref().map(Order::order_side);
if order_fills.is_empty() {
let Some(cached) = cached.as_ref() else {
log::info!(
"Order {venue_order_id} not active at venue, no trades found, and no cached order; nothing to recover"
);
return Ok(None);
};
log::info!(
"Order {venue_order_id} not active at venue and no trades found; recovering as Canceled"
);
let mut report = OrderStatusReport::new(
self.core.account_id,
instrument_id,
resolved_client_order_id,
venue_order_id,
cached.order_side(),
cached.order_type(),
cached.time_in_force(),
OrderStatus::Canceled,
cached.quantity(),
cached.filled_qty(),
ts_init,
ts_init,
ts_init,
None,
);
report.price = cached_price;
report.cancel_reason = Some("ORDER_NOT_FOUND_AT_VENUE".to_string());
return Ok(Some(report));
}
let Some(quantity) = cached_quantity else {
log::info!(
"Order {venue_order_id} has trades but no cached order; deferring to engine"
);
return Ok(None);
};
let total_filled_dec = sum_filled_quantity(&order_fills);
let avg_px = weighted_average_price(&order_fills, total_filled_dec);
let raw_filled_qty = Quantity::from_decimal_dp(total_filled_dec, size_prec)
.unwrap_or_else(|_| Quantity::zero(size_prec));
let order_side = cached_side.unwrap_or(order_fills[0].order_side);
let ts_event = order_fills
.iter()
.map(|f| f.ts_event)
.max()
.unwrap_or(ts_init);
let dust_diff = (quantity.as_decimal() - raw_filled_qty.as_decimal()).abs();
let order_status = if raw_filled_qty >= quantity || dust_diff < DUST_SNAP_THRESHOLD_DEC {
OrderStatus::Filled
} else {
OrderStatus::Canceled
};
let filled_qty = snap_filled_qty_to_quantity(quantity, raw_filled_qty, order_status);
log::info!(
"Recovered {} status for {venue_order_id} from {} trade(s) (filled_qty={filled_qty}, quantity={quantity})",
if order_status == OrderStatus::Filled {
"Filled"
} else {
"Canceled (partially filled)"
},
order_fills.len(),
);
let mut report = OrderStatusReport::new(
self.core.account_id,
instrument_id,
resolved_client_order_id,
venue_order_id,
order_side,
cached_order_type,
cached_tif,
order_status,
quantity,
filled_qty,
ts_event,
ts_event,
ts_init,
None,
);
report.price = cached_price;
report.avg_px = avg_px;
Ok(Some(report))
}
pub(super) fn query_account_command(&self, _cmd: QueryAccount) {
let http_client = self.http_client.clone();
let emitter = self.emitter.clone();
let clock = self.clock;
let signature_type = self.config.signature_type;
self.spawn_task("query_account", async move {
fetch_and_emit_account_state(&http_client, &emitter, clock, signature_type).await
});
}
pub(super) fn query_order_command(&self, cmd: &QueryOrder) {
log::debug!("Querying order: client_order_id={}", cmd.client_order_id);
let venue_order_id = match &cmd.venue_order_id {
Some(id) => id.to_string(),
None => {
log::warn!("query_order requires venue_order_id for Polymarket");
return;
}
};
let instrument_id = cmd.instrument_id;
let client_order_id = cmd.client_order_id;
let account_id = self.core.account_id;
let cache = self.core.cache();
let (price_prec, size_prec) = match cache.instrument(&instrument_id) {
Some(i) => (i.price_precision(), i.size_precision()),
None => (4, 6),
};
let http_client = self.http_client.clone();
let emitter = self.emitter.clone();
let clock = self.clock;
self.spawn_task("query_order", async move {
match http_client.get_order_optional(&venue_order_id).await {
Ok(Some(order)) => {
let report = parse_order_status_report(
&order,
instrument_id,
account_id,
Some(client_order_id),
price_prec,
size_prec,
clock.get_time_ns(),
);
emitter.send_order_status_report(report);
}
Ok(None) => {
log::warn!("Order {venue_order_id} not found (empty response)");
}
Err(e) => {
log::warn!("Failed to query order {venue_order_id}: {e}");
}
}
Ok(())
});
}
pub(super) async fn generate_order_status_report_impl(
&self,
cmd: &GenerateOrderStatusReport,
) -> anyhow::Result<Option<OrderStatusReport>> {
let venue_order_id = match cmd.venue_order_id {
Some(id) => id,
None => {
log::warn!("generate_order_status_report requires venue_order_id");
return Ok(None);
}
};
let instrument_id = match cmd.instrument_id {
Some(id) => id,
None => {
log::warn!("generate_order_status_report requires instrument_id");
return Ok(None);
}
};
let order = self
.http_client
.get_order_optional(venue_order_id.as_str())
.await
.context("failed to fetch order")?;
let instrument = self.core.cache().instrument(&instrument_id).cloned();
let (price_prec, size_prec) = match &instrument {
Some(i) => (i.price_precision(), i.size_precision()),
None => (4, 6),
};
if let Some(order) = order {
let report = parse_order_status_report(
&order,
instrument_id,
self.core.account_id,
cmd.client_order_id,
price_prec,
size_prec,
self.clock.get_time_ns(),
);
return Ok(Some(report));
}
self.recover_terminal_status_from_trades(
venue_order_id,
instrument_id,
cmd.client_order_id,
size_prec,
)
.await
}
pub(super) async fn generate_order_status_reports_impl(
&self,
cmd: &GenerateOrderStatusReports,
) -> anyhow::Result<Vec<OrderStatusReport>> {
let params = crate::http::query::GetOrdersParams::default();
let orders = self
.http_client
.get_orders(params)
.await
.context("failed to fetch orders")?;
let (reports, _) = super::reconciliation::build_order_reports_from_orders(
&orders,
&self.shared_token_instruments,
self.core.account_id,
cmd.instrument_id,
self.clock.get_time_ns(),
);
let reports = if cmd.open_only {
reports
.into_iter()
.filter(|r| r.order_status.is_open())
.collect()
} else {
reports
};
log::debug!("Generated {} order status reports", reports.len());
Ok(reports)
}
pub(super) async fn generate_fill_reports_impl(
&self,
cmd: GenerateFillReports,
) -> anyhow::Result<Vec<FillReport>> {
let trades = self
.http_client
.get_trades(GetTradesParams::default())
.await
.context("failed to fetch trades")?;
let ctx = self.fill_context();
let (mut reports, _) = build_fill_reports_from_trades(
&trades,
&ctx,
&self.shared_token_instruments,
cmd.instrument_id,
self.clock.get_time_ns(),
);
self.fill_tracker.snap_fill_reports(&mut reports);
let reports = apply_fill_filters(reports, cmd.venue_order_id, cmd.start, cmd.end);
log::debug!("Generated {} fill reports", reports.len());
Ok(reports)
}
pub(super) async fn generate_position_status_reports_impl(
&self,
cmd: &GeneratePositionStatusReports,
) -> anyhow::Result<Vec<PositionStatusReport>> {
let ctx = self.fill_context();
let positions = self
.data_api_client
.get_positions(ctx.user_address)
.await
.context("failed to fetch positions from Data API")?;
let ts_now = self.clock.get_time_ns();
let mut reports = build_position_reports(&positions, self.core.account_id, ts_now);
if let Some(ref filter_id) = cmd.instrument_id {
reports.retain(|r| &r.instrument_id == filter_id);
}
log::debug!("Generated {} position status reports", reports.len());
Ok(reports)
}
pub(super) async fn generate_mass_status_impl(
&self,
lookback_mins: Option<u64>,
) -> anyhow::Result<Option<ExecutionMassStatus>> {
let ctx = self.fill_context();
super::reconciliation::generate_mass_status(
&self.http_client,
&self.data_api_client,
&self.shared_token_instruments,
&self.fill_tracker,
&ctx,
self.core.client_id,
self.core.venue,
lookback_mins,
)
.await
}
}
pub(crate) fn get_pusd_currency() -> Currency {
Currency::pUSD()
}
pub(super) async fn fetch_and_emit_account_state(
http_client: &PolymarketClobHttpClient,
emitter: &ExecutionEventEmitter,
clock: &'static AtomicTime,
signature_type: SignatureType,
) -> anyhow::Result<()> {
let params = GetBalanceAllowanceParams {
asset_type: Some(crate::http::query::AssetType::Collateral),
signature_type: Some(signature_type),
..Default::default()
};
let balance_allowance = http_client
.get_balance_allowance(params)
.await
.context("failed to fetch balance allowance")?;
let pusd = get_pusd_currency();
let account_balance = parse_balance_allowance(balance_allowance.balance, pusd)
.context("failed to parse balance allowance")?;
let ts_event = clock.get_time_ns();
log::info!(
"Account state updated: balance={} pUSD",
account_balance.total
);
emitter.emit_account_state(vec![account_balance], vec![], true, ts_event);
Ok(())
}
pub(super) async fn fetch_collateral_balance_pusd(
http_client: &PolymarketClobHttpClient,
signature_type: SignatureType,
) -> anyhow::Result<Decimal> {
let params = GetBalanceAllowanceParams {
asset_type: Some(crate::http::query::AssetType::Collateral),
signature_type: Some(signature_type),
..Default::default()
};
let balance_allowance = http_client
.get_balance_allowance(params)
.await
.context("failed to fetch balance allowance")?;
let usdc_scale = Decimal::from(1_000_000u32);
Ok(balance_allowance.balance / usdc_scale)
}