use crate::{
book::protocol::response::{Response, TradeSummary},
book::{BookEvent, BookEventEnvelope},
disruptor::Envelope,
engine::core::EngineEvent,
types::{CorrelationId, FillPrice, FillQuantity, Money, OrderId},
};
use disrupt_rs::EventHandler;
use smallvec::SmallVec;
use std::sync::atomic::{AtomicU64, Ordering};
static RESPONSE_CORRELATION_COUNTER: AtomicU64 = AtomicU64::new(1);
#[derive(Default)]
pub struct ResponseCallbackHandler {
current_tx_id: Option<u64>,
buffer: ResponseBuffer,
}
impl ResponseCallbackHandler {
fn next_correlation_id() -> CorrelationId {
CorrelationId(RESPONSE_CORRELATION_COUNTER.fetch_add(1, Ordering::Relaxed))
}
fn on_tx_event(&mut self, envelope: &Envelope<BookEventEnvelope>) {
let tx_id = envelope.tx_id;
if self.current_tx_id != Some(tx_id) {
self.current_tx_id = Some(tx_id);
self.buffer = ResponseBuffer::default();
}
self.buffer.apply_event(&envelope.payload.event);
let is_last = envelope.tx_ix.saturating_add(1) == envelope.tx_len;
if !is_last {
return;
}
let Some(cb) = envelope.response_cb.as_ref() else {
self.current_tx_id = None;
self.buffer = ResponseBuffer::default();
return;
};
let buffer = std::mem::take(&mut self.buffer);
self.current_tx_id = None;
let response = Response::Ok {
correlation_id: Self::next_correlation_id(),
order_id: buffer.order_id,
trades: buffer.trades,
liability_delta: Money(0),
last_event_seq: envelope.seq,
};
let _ = cb.tx.send(Ok(response));
}
}
impl EventHandler<EngineEvent> for ResponseCallbackHandler {
fn on_event(
&mut self,
event: &EngineEvent,
_sequence: disrupt_rs::Sequence,
end_of_batch: bool,
) {
let _ = end_of_batch;
self.on_tx_event(event);
}
}
#[derive(Default)]
struct ResponseBuffer {
order_id: Option<OrderId>,
order_id_priority: u8,
trades: SmallVec<[TradeSummary; 4]>,
}
impl ResponseBuffer {
fn apply_event(&mut self, event: &BookEvent) {
match event {
BookEvent::OrderAccepted { order_id, .. }
| BookEvent::BinaryOrderAccepted { order_id, .. } => {
self.maybe_set_order_id(*order_id, 2);
}
BookEvent::OrderCancelled { order_id, .. }
| BookEvent::OrderLapsed { order_id, .. }
| BookEvent::OrderVoided { order_id, .. } => {
self.maybe_set_order_id(*order_id, 1);
}
BookEvent::TradeMatched {
trade_id,
maker,
taker,
price,
stake,
..
} => {
self.trades.push(TradeSummary {
trade_id: *trade_id,
maker_order_id: maker.id,
maker_side: maker.side,
taker_order_id: taker.id,
taker_side: taker.side,
price: FillPrice::Odds(*price),
quantity: FillQuantity::Stake(*stake),
});
}
BookEvent::BinaryTradeMatched {
trade_id,
maker,
taker,
price_ticks,
qty_shares,
} => {
self.trades.push(TradeSummary {
trade_id: *trade_id,
maker_order_id: maker.id,
maker_side: maker.side,
taker_order_id: taker.id,
taker_side: taker.side,
price: FillPrice::Ticks(*price_ticks),
quantity: FillQuantity::Shares(*qty_shares),
});
}
_ => {}
}
}
fn maybe_set_order_id(&mut self, order_id: OrderId, priority: u8) {
if self.order_id_priority < priority {
self.order_id = Some(order_id);
self.order_id_priority = priority;
}
}
}