use crate::{
book::protocol::command::Side,
book::protocol::response::{Response, TradeSummary},
book::{BookEvent, BookEventEnvelope, TradeRole},
disruptor::Envelope,
engine::core::EngineEvent,
types::{FillPrice, FillQuantity, Money, OrderId},
};
use disrupt_rs::EventHandler;
use smallvec::SmallVec;
use std::collections::HashMap;
use tracing::{trace, warn};
type TradePairKey = (OrderId, OrderId);
#[derive(Default)]
pub struct ResponseCallbackHandler {
current_tx_id: Option<u64>,
buffer: ResponseBuffer,
}
impl ResponseCallbackHandler {
fn on_tx_event(&mut self, envelope: &Envelope<BookEventEnvelope>) {
let tx_id = envelope.tx_id;
if self.current_tx_id != Some(tx_id) {
self.current_tx_id = Some(tx_id);
self.buffer = ResponseBuffer::default();
}
self.buffer.apply_event(&envelope.payload.event);
let is_last = envelope.tx_ix.saturating_add(1) == envelope.tx_len;
if !is_last {
return;
}
let Some(cb) = envelope.response_cb.as_ref() else {
self.current_tx_id = None;
self.buffer = ResponseBuffer::default();
return;
};
let mut buffer = std::mem::take(&mut self.buffer);
buffer.finalize_trades();
self.current_tx_id = None;
let tx_start_seq = envelope.seq.saturating_sub(u64::from(envelope.tx_ix));
let response = Response::Ok {
order_id: buffer.order_id,
trades: buffer.trades,
liability_delta: Money(0),
tx_start_seq: Some(tx_start_seq),
tx_len: envelope.tx_len,
};
if cb.tx.send(Ok(response)).is_err() {
trace!(
tx_id = tx_id,
"response callback receiver dropped before send"
);
}
}
}
impl EventHandler<EngineEvent> for ResponseCallbackHandler {
fn on_event(&mut self, event: &EngineEvent, _: disrupt_rs::Sequence, _: bool) {
self.on_tx_event(event);
}
}
#[derive(Default)]
struct ResponseBuffer {
order_id: Option<OrderId>,
order_id_priority: u8,
trades: SmallVec<[TradeSummary; 4]>,
pending_trades: HashMap<TradePairKey, PendingTradeView>,
}
#[derive(Debug, Clone)]
struct PendingTradeView {
order_id: OrderId,
role: TradeRole,
side: Side,
price: FillPrice,
quantity: FillQuantity,
}
impl ResponseBuffer {
fn trade_pair_key(order_id: OrderId, counter_party: OrderId) -> TradePairKey {
if order_id <= counter_party {
(order_id, counter_party)
} else {
(counter_party, order_id)
}
}
fn apply_event(&mut self, event: &BookEvent) {
match event {
BookEvent::OrderAccepted { order_id, .. }
| BookEvent::BinaryOrderAccepted { order_id, .. } => {
self.maybe_set_order_id(*order_id, 2);
}
BookEvent::OrderCancelled {
cancelled_order, ..
} => {
self.maybe_set_order_id(cancelled_order.order_id, 1);
}
BookEvent::OrderCancelledBatched {
cancelled_orders, ..
} => {
for cancelled_order in cancelled_orders {
self.maybe_set_order_id(cancelled_order.order_id, 1);
}
}
BookEvent::TradeMatched {
order_id,
role,
side,
price,
counter_party,
matched_delta,
..
} => {
let current = PendingTradeView {
order_id: *order_id,
role: *role,
side: *side,
price: FillPrice::Odds(*price),
quantity: FillQuantity::Stake(*matched_delta),
};
let key = Self::trade_pair_key(*order_id, *counter_party);
if let Some(first) = self.pending_trades.remove(&key) {
if first.order_id != *counter_party {
warn!(
order_id = order_id.0,
counter_party = counter_party.0,
"trade view pairing mismatch"
);
return;
}
let (maker_view, taker_view) = match (first.role, current.role) {
(TradeRole::Maker, TradeRole::Taker) => (&first, ¤t),
(TradeRole::Taker, TradeRole::Maker) => (¤t, &first),
_ => {
warn!(
order_id = order_id.0,
counter_party = counter_party.0,
"trade pair does not contain one maker and one taker"
);
return;
}
};
self.trades.push(TradeSummary {
maker_order_id: maker_view.order_id,
maker_side: maker_view.side,
taker_order_id: taker_view.order_id,
taker_side: taker_view.side,
price: taker_view.price,
quantity: taker_view.quantity,
});
} else {
self.pending_trades.insert(key, current);
}
}
BookEvent::BinaryTradeMatched {
order_id,
role,
side,
price_ticks,
counter_party,
matched_delta_shares,
..
} => {
let current = PendingTradeView {
order_id: *order_id,
role: *role,
side: *side,
price: FillPrice::Ticks(*price_ticks),
quantity: FillQuantity::Shares(*matched_delta_shares),
};
let key = Self::trade_pair_key(*order_id, *counter_party);
if let Some(first) = self.pending_trades.remove(&key) {
if first.order_id != *counter_party {
warn!(
order_id = order_id.0,
counter_party = counter_party.0,
"binary trade view pairing mismatch"
);
return;
}
let (maker_view, taker_view) = match (first.role, current.role) {
(TradeRole::Maker, TradeRole::Taker) => (&first, ¤t),
(TradeRole::Taker, TradeRole::Maker) => (¤t, &first),
_ => {
warn!(
order_id = order_id.0,
counter_party = counter_party.0,
"binary trade pair does not contain one maker and one taker"
);
return;
}
};
self.trades.push(TradeSummary {
maker_order_id: maker_view.order_id,
maker_side: maker_view.side,
taker_order_id: taker_view.order_id,
taker_side: taker_view.side,
price: taker_view.price,
quantity: taker_view.quantity,
});
} else {
self.pending_trades.insert(key, current);
}
}
_ => {}
}
}
fn maybe_set_order_id(&mut self, order_id: OrderId, priority: u8) {
if self.order_id_priority < priority {
self.order_id = Some(order_id);
self.order_id_priority = priority;
}
}
fn finalize_trades(&mut self) {
if !self.pending_trades.is_empty() {
warn!(
pending = self.pending_trades.len(),
"incomplete trade view pairs in tx"
);
self.pending_trades.clear();
}
}
}