betex 0.8.0

Betfair / Prediction Market Exchange
Documentation
use crate::{
    book::protocol::command::Side,
    book::protocol::response::{Response, TradeSummary},
    book::{BookEvent, BookEventEnvelope},
    disruptor::Envelope,
    engine::core::EngineEvent,
    types::{FillPrice, FillQuantity, Money, OrderId, TradeId},
};

use disrupt_rs::EventHandler;
use smallvec::SmallVec;
use std::collections::HashMap;
use tracing::{trace, warn};

#[derive(Default)]
pub struct ResponseCallbackHandler {
    current_tx_id: Option<u64>,
    buffer: ResponseBuffer,
}

impl ResponseCallbackHandler {
    fn on_tx_event(&mut self, envelope: &Envelope<BookEventEnvelope>) {
        let tx_id = envelope.tx_id;
        if self.current_tx_id != Some(tx_id) {
            self.current_tx_id = Some(tx_id);
            self.buffer = ResponseBuffer::default();
        }
        self.buffer.apply_event(&envelope.payload.event);

        let is_last = envelope.tx_ix.saturating_add(1) == envelope.tx_len;
        if !is_last {
            return;
        }

        let Some(cb) = envelope.response_cb.as_ref() else {
            self.current_tx_id = None;
            self.buffer = ResponseBuffer::default();
            return;
        };

        let mut buffer = std::mem::take(&mut self.buffer);
        buffer.finalize_trades();
        self.current_tx_id = None;
        let tx_start_seq = envelope.seq.saturating_sub(u64::from(envelope.tx_ix));

        let response = Response::Ok {
            order_id: buffer.order_id,
            trades: buffer.trades,
            liability_delta: Money(0),
            tx_start_seq: Some(tx_start_seq),
            tx_len: envelope.tx_len,
        };
        if cb.tx.send(Ok(response)).is_err() {
            trace!(
                tx_id = tx_id,
                "response callback receiver dropped before send"
            );
        }
    }
}

impl EventHandler<EngineEvent> for ResponseCallbackHandler {
    fn on_event(&mut self, event: &EngineEvent, _: disrupt_rs::Sequence, _: bool) {
        self.on_tx_event(event);
    }
}

#[derive(Default)]
struct ResponseBuffer {
    order_id: Option<OrderId>,
    order_id_priority: u8,
    trades: SmallVec<[TradeSummary; 4]>,
    pending_trades: HashMap<TradeId, PendingTradeView>,
}

#[derive(Debug, Clone)]
struct PendingTradeView {
    order_id: OrderId,
    side: Side,
    price: FillPrice,
    quantity: FillQuantity,
}

impl ResponseBuffer {
    fn apply_event(&mut self, event: &BookEvent) {
        match event {
            BookEvent::OrderAccepted { order_id, .. }
            | BookEvent::BinaryOrderAccepted { order_id, .. } => {
                self.maybe_set_order_id(*order_id, 2);
            }
            BookEvent::OrderCancelled { order_id, .. }
            | BookEvent::OrderLapsed { order_id, .. }
            | BookEvent::OrderVoided { order_id, .. } => {
                self.maybe_set_order_id(*order_id, 1);
            }
            BookEvent::TradeMatched {
                trade_id,
                order_id,
                side,
                price,
                stake: _,
                counter_party,
                matched_delta,
                ..
            } => {
                let current = PendingTradeView {
                    order_id: *order_id,
                    side: *side,
                    price: FillPrice::Odds(*price),
                    quantity: FillQuantity::Stake(*matched_delta),
                };
                if let Some(first) = self.pending_trades.remove(trade_id) {
                    if first.order_id != *counter_party {
                        warn!(trade_id = trade_id.0, "trade view pairing mismatch");
                        return;
                    }
                    self.trades.push(TradeSummary {
                        trade_id: *trade_id,
                        maker_order_id: first.order_id,
                        maker_side: first.side,
                        taker_order_id: current.order_id,
                        taker_side: current.side,
                        price: current.price.clone(),
                        quantity: current.quantity.clone(),
                    });
                } else {
                    self.pending_trades.insert(*trade_id, current);
                }
            }
            BookEvent::BinaryTradeMatched {
                trade_id,
                maker,
                taker,
                price_ticks,
                qty_shares,
            } => {
                self.trades.push(TradeSummary {
                    trade_id: *trade_id,
                    maker_order_id: maker.id,
                    maker_side: maker.side,
                    taker_order_id: taker.id,
                    taker_side: taker.side,
                    price: FillPrice::Ticks(*price_ticks),
                    quantity: FillQuantity::Shares(*qty_shares),
                });
            }
            _ => {}
        }
    }

    fn maybe_set_order_id(&mut self, order_id: OrderId, priority: u8) {
        if self.order_id_priority < priority {
            self.order_id = Some(order_id);
            self.order_id_priority = priority;
        }
    }

    fn finalize_trades(&mut self) {
        if !self.pending_trades.is_empty() {
            warn!(pending = self.pending_trades.len(), "incomplete trade view pairs in tx");
            self.pending_trades.clear();
        }
    }
}