use nautilus_common::{
factories::OrderEventFactory,
messages::{ExecutionEvent, ExecutionReport},
};
use nautilus_core::{UUID4, UnixNanos, time::AtomicTime};
use nautilus_model::{
enums::{AccountType, LiquiditySide},
events::{
AccountState, OrderAcceptedBatch, OrderCancelRejected, OrderCanceledBatch, OrderEventAny,
OrderModifyRejected, OrderRejected, OrderSubmittedBatch,
},
identifiers::{
AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
VenueOrderId,
},
orders::OrderAny,
reports::{FillReport, OrderStatusReport, PositionStatusReport},
types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
};
#[derive(Debug, Clone)]
pub struct ExecutionEventEmitter {
clock: &'static AtomicTime,
factory: OrderEventFactory,
sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
}
impl ExecutionEventEmitter {
#[must_use]
pub fn new(
clock: &'static AtomicTime,
trader_id: TraderId,
account_id: AccountId,
account_type: AccountType,
base_currency: Option<Currency>,
) -> Self {
Self {
clock,
factory: OrderEventFactory::new(trader_id, account_id, account_type, base_currency),
sender: None,
}
}
fn ts_init(&self) -> UnixNanos {
self.clock.get_time_ns()
}
pub fn set_sender(&mut self, sender: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>) {
self.sender = Some(sender);
}
#[must_use]
pub fn is_initialized(&self) -> bool {
self.sender.is_some()
}
#[must_use]
pub fn trader_id(&self) -> TraderId {
self.factory.trader_id()
}
#[must_use]
pub fn account_id(&self) -> AccountId {
self.factory.account_id()
}
pub fn emit_account_state(
&self,
balances: Vec<AccountBalance>,
margins: Vec<MarginBalance>,
reported: bool,
ts_event: UnixNanos,
) {
let state = self.factory.generate_account_state(
balances,
margins,
reported,
ts_event,
self.ts_init(),
);
self.send_account_state(state);
}
pub fn emit_order_denied(&self, order: &OrderAny, reason: &str) {
let event = self
.factory
.generate_order_denied(order, reason, self.ts_init());
self.send_order_event(event);
}
pub fn emit_order_submitted(&self, order: &OrderAny) {
let event = self.factory.generate_order_submitted(order, self.ts_init());
self.send_order_event(event);
}
pub fn emit_order_rejected(
&self,
order: &OrderAny,
reason: &str,
ts_event: UnixNanos,
due_post_only: bool,
) {
let event = self.factory.generate_order_rejected(
order,
reason,
ts_event,
self.ts_init(),
due_post_only,
);
self.send_order_event(event);
}
pub fn emit_order_accepted(
&self,
order: &OrderAny,
venue_order_id: VenueOrderId,
ts_event: UnixNanos,
) {
let event =
self.factory
.generate_order_accepted(order, venue_order_id, ts_event, self.ts_init());
self.send_order_event(event);
}
pub fn emit_order_modify_rejected(
&self,
order: &OrderAny,
venue_order_id: Option<VenueOrderId>,
reason: &str,
ts_event: UnixNanos,
) {
let event = self.factory.generate_order_modify_rejected(
order,
venue_order_id,
reason,
ts_event,
self.ts_init(),
);
self.send_order_event(event);
}
pub fn emit_order_cancel_rejected(
&self,
order: &OrderAny,
venue_order_id: Option<VenueOrderId>,
reason: &str,
ts_event: UnixNanos,
) {
let event = self.factory.generate_order_cancel_rejected(
order,
venue_order_id,
reason,
ts_event,
self.ts_init(),
);
self.send_order_event(event);
}
#[expect(clippy::too_many_arguments)]
pub fn emit_order_updated(
&self,
order: &OrderAny,
venue_order_id: VenueOrderId,
quantity: Quantity,
price: Option<Price>,
trigger_price: Option<Price>,
protection_price: Option<Price>,
ts_event: UnixNanos,
) {
let event = self.factory.generate_order_updated(
order,
venue_order_id,
quantity,
price,
trigger_price,
protection_price,
ts_event,
self.ts_init(),
);
self.send_order_event(event);
}
pub fn emit_order_canceled(
&self,
order: &OrderAny,
venue_order_id: Option<VenueOrderId>,
ts_event: UnixNanos,
) {
let event =
self.factory
.generate_order_canceled(order, venue_order_id, ts_event, self.ts_init());
self.send_order_event(event);
}
pub fn emit_order_triggered(
&self,
order: &OrderAny,
venue_order_id: Option<VenueOrderId>,
ts_event: UnixNanos,
) {
let event =
self.factory
.generate_order_triggered(order, venue_order_id, ts_event, self.ts_init());
self.send_order_event(event);
}
pub fn emit_order_expired(
&self,
order: &OrderAny,
venue_order_id: Option<VenueOrderId>,
ts_event: UnixNanos,
) {
let event =
self.factory
.generate_order_expired(order, venue_order_id, ts_event, self.ts_init());
self.send_order_event(event);
}
#[expect(clippy::too_many_arguments)]
pub fn emit_order_filled(
&self,
order: &OrderAny,
venue_order_id: VenueOrderId,
venue_position_id: Option<PositionId>,
trade_id: TradeId,
last_qty: Quantity,
last_px: Price,
quote_currency: Currency,
commission: Option<Money>,
liquidity_side: LiquiditySide,
ts_event: UnixNanos,
) {
let event = self.factory.generate_order_filled(
order,
venue_order_id,
venue_position_id,
trade_id,
last_qty,
last_px,
quote_currency,
commission,
liquidity_side,
ts_event,
self.ts_init(),
);
self.send_order_event(event);
}
pub fn emit_order_rejected_event(
&self,
strategy_id: StrategyId,
instrument_id: InstrumentId,
client_order_id: ClientOrderId,
reason: &str,
ts_event: UnixNanos,
due_post_only: bool,
) {
let event = OrderRejected::new(
self.factory.trader_id(),
strategy_id,
instrument_id,
client_order_id,
self.factory.account_id(),
reason.into(),
UUID4::new(),
ts_event,
self.ts_init(),
false,
due_post_only,
);
self.send_order_event(OrderEventAny::Rejected(event));
}
pub fn emit_order_modify_rejected_event(
&self,
strategy_id: StrategyId,
instrument_id: InstrumentId,
client_order_id: ClientOrderId,
venue_order_id: Option<VenueOrderId>,
reason: &str,
ts_event: UnixNanos,
) {
let event = OrderModifyRejected::new(
self.factory.trader_id(),
strategy_id,
instrument_id,
client_order_id,
reason.into(),
UUID4::new(),
ts_event,
self.ts_init(),
false,
venue_order_id,
Some(self.factory.account_id()),
);
self.send_order_event(OrderEventAny::ModifyRejected(event));
}
pub fn emit_order_cancel_rejected_event(
&self,
strategy_id: StrategyId,
instrument_id: InstrumentId,
client_order_id: ClientOrderId,
venue_order_id: Option<VenueOrderId>,
reason: &str,
ts_event: UnixNanos,
) {
let event = OrderCancelRejected::new(
self.factory.trader_id(),
strategy_id,
instrument_id,
client_order_id,
reason.into(),
UUID4::new(),
ts_event,
self.ts_init(),
false,
venue_order_id,
Some(self.factory.account_id()),
);
self.send_order_event(OrderEventAny::CancelRejected(event));
}
pub fn send_order_event(&self, event: OrderEventAny) {
if let Some(sender) = &self.sender {
if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
log::warn!("Failed to send order event: {e}");
}
} else {
log::warn!("Cannot send order event: sender not initialized");
}
}
pub fn send_order_submitted_batch(&self, batch: OrderSubmittedBatch) {
if let Some(sender) = &self.sender {
if let Err(e) = sender.send(ExecutionEvent::OrderSubmittedBatch(batch)) {
log::warn!("Failed to send order submitted batch: {e}");
}
} else {
log::warn!("Cannot send order submitted batch: sender not initialized");
}
}
pub fn send_order_accepted_batch(&self, batch: OrderAcceptedBatch) {
if let Some(sender) = &self.sender {
if let Err(e) = sender.send(ExecutionEvent::OrderAcceptedBatch(batch)) {
log::warn!("Failed to send order accepted batch: {e}");
}
} else {
log::warn!("Cannot send order accepted batch: sender not initialized");
}
}
pub fn send_order_canceled_batch(&self, batch: OrderCanceledBatch) {
if let Some(sender) = &self.sender {
if let Err(e) = sender.send(ExecutionEvent::OrderCanceledBatch(batch)) {
log::warn!("Failed to send order canceled batch: {e}");
}
} else {
log::warn!("Cannot send order canceled batch: sender not initialized");
}
}
pub fn send_account_state(&self, state: AccountState) {
if let Some(sender) = &self.sender {
if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
log::warn!("Failed to send account state: {e}");
}
} else {
log::warn!("Cannot send account state: sender not initialized");
}
}
pub fn send_execution_report(&self, report: ExecutionReport) {
if let Some(sender) = &self.sender {
if let Err(e) = sender.send(ExecutionEvent::Report(report)) {
log::warn!("Failed to send execution report: {e}");
}
} else {
log::warn!("Cannot send execution report: sender not initialized");
}
}
pub fn send_order_status_report(&self, report: OrderStatusReport) {
self.send_execution_report(ExecutionReport::Order(Box::new(report)));
}
pub fn send_fill_report(&self, report: FillReport) {
self.send_execution_report(ExecutionReport::Fill(Box::new(report)));
}
pub fn send_order_with_fills(&self, report: OrderStatusReport, fills: Vec<FillReport>) {
self.send_execution_report(ExecutionReport::OrderWithFills(Box::new(report), fills));
}
pub fn send_position_report(&self, report: PositionStatusReport) {
self.send_execution_report(ExecutionReport::Position(Box::new(report)));
}
}