betex 0.35.0

Betfair / Prediction Market Exchange
Documentation
use crate::{
    book::protocol::command::Side,
    book::protocol::response::{Response, TradeSummary},
    book::{BookEvent, BookEventEnvelope, TradeRole},
    disruptor::Envelope,
    engine::core::EngineEvent,
    types::{FillPrice, FillQuantity, Money, OrderId},
};

use disrupt_rs::EventHandler;
use smallvec::SmallVec;
use std::collections::HashMap;
use tracing::{trace, warn};

type TradePairKey = (OrderId, OrderId);

#[derive(Default)]
pub struct ResponseCallbackHandler {
    current_tx_id: Option<u64>,
    buffer: ResponseBuffer,
}

impl ResponseCallbackHandler {
    fn on_tx_event(&mut self, envelope: &Envelope<BookEventEnvelope>) {
        let tx_id = envelope.tx_id;
        if self.current_tx_id != Some(tx_id) {
            self.current_tx_id = Some(tx_id);
            self.buffer = ResponseBuffer::default();
        }
        self.buffer.apply_event(&envelope.payload.event);

        let is_last = envelope.tx_ix.saturating_add(1) == envelope.tx_len;
        if !is_last {
            return;
        }

        let Some(cb) = envelope.response_cb.as_ref() else {
            self.current_tx_id = None;
            self.buffer = ResponseBuffer::default();
            return;
        };

        let mut buffer = std::mem::take(&mut self.buffer);
        buffer.finalize_trades();
        self.current_tx_id = None;
        let tx_start_seq = envelope.seq.saturating_sub(u64::from(envelope.tx_ix));

        let response = Response::Ok {
            order_id: buffer.order_id,
            trades: buffer.trades,
            liability_delta: Money(0),
            tx_start_seq: Some(tx_start_seq),
            tx_len: envelope.tx_len,
        };
        if cb.tx.send(Ok(response)).is_err() {
            trace!(
                tx_id = tx_id,
                "response callback receiver dropped before send"
            );
        }
    }
}

impl EventHandler<EngineEvent> for ResponseCallbackHandler {
    fn on_event(&mut self, event: &EngineEvent, _: disrupt_rs::Sequence, _: bool) {
        self.on_tx_event(event);
    }
}

#[derive(Default)]
struct ResponseBuffer {
    order_id: Option<OrderId>,
    order_id_priority: u8,
    trades: SmallVec<[TradeSummary; 4]>,
    pending_trades: HashMap<TradePairKey, PendingTradeView>,
}

#[derive(Debug, Clone)]
struct PendingTradeView {
    order_id: OrderId,
    role: TradeRole,
    side: Side,
    price: FillPrice,
    quantity: FillQuantity,
}

impl ResponseBuffer {
    fn trade_pair_key(order_id: OrderId, counter_party: OrderId) -> TradePairKey {
        if order_id <= counter_party {
            (order_id, counter_party)
        } else {
            (counter_party, order_id)
        }
    }

    fn apply_event(&mut self, event: &BookEvent) {
        match event {
            BookEvent::OrderAccepted { order_id, .. }
            | BookEvent::BinaryOrderAccepted { order_id, .. } => {
                self.maybe_set_order_id(*order_id, 2);
            }
            BookEvent::OrderCancelled {
                cancelled_order, ..
            } => {
                self.maybe_set_order_id(cancelled_order.order_id, 1);
            }
            BookEvent::OrderCancelledBatched {
                cancelled_orders, ..
            } => {
                for cancelled_order in cancelled_orders {
                    self.maybe_set_order_id(cancelled_order.order_id, 1);
                }
            }
            BookEvent::TradeMatched {
                order_id,
                role,
                side,
                price,
                counter_party,
                matched_delta,
                ..
            } => {
                let current = PendingTradeView {
                    order_id: *order_id,
                    role: *role,
                    side: *side,
                    price: FillPrice::Odds(*price),
                    quantity: FillQuantity::Stake(*matched_delta),
                };
                let key = Self::trade_pair_key(*order_id, *counter_party);
                if let Some(first) = self.pending_trades.remove(&key) {
                    if first.order_id != *counter_party {
                        warn!(
                            order_id = order_id.0,
                            counter_party = counter_party.0,
                            "trade view pairing mismatch"
                        );
                        return;
                    }
                    let (maker_view, taker_view) = match (first.role, current.role) {
                        (TradeRole::Maker, TradeRole::Taker) => (&first, &current),
                        (TradeRole::Taker, TradeRole::Maker) => (&current, &first),
                        _ => {
                            warn!(
                                order_id = order_id.0,
                                counter_party = counter_party.0,
                                "trade pair does not contain one maker and one taker"
                            );
                            return;
                        }
                    };
                    self.trades.push(TradeSummary {
                        maker_order_id: maker_view.order_id,
                        maker_side: maker_view.side,
                        taker_order_id: taker_view.order_id,
                        taker_side: taker_view.side,
                        price: taker_view.price,
                        quantity: taker_view.quantity,
                    });
                } else {
                    self.pending_trades.insert(key, current);
                }
            }
            BookEvent::BinaryTradeMatched {
                order_id,
                role,
                side,
                price_ticks,
                counter_party,
                matched_delta_shares,
                ..
            } => {
                let current = PendingTradeView {
                    order_id: *order_id,
                    role: *role,
                    side: *side,
                    price: FillPrice::Ticks(*price_ticks),
                    quantity: FillQuantity::Shares(*matched_delta_shares),
                };
                let key = Self::trade_pair_key(*order_id, *counter_party);
                if let Some(first) = self.pending_trades.remove(&key) {
                    if first.order_id != *counter_party {
                        warn!(
                            order_id = order_id.0,
                            counter_party = counter_party.0,
                            "binary trade view pairing mismatch"
                        );
                        return;
                    }
                    let (maker_view, taker_view) = match (first.role, current.role) {
                        (TradeRole::Maker, TradeRole::Taker) => (&first, &current),
                        (TradeRole::Taker, TradeRole::Maker) => (&current, &first),
                        _ => {
                            warn!(
                                order_id = order_id.0,
                                counter_party = counter_party.0,
                                "binary trade pair does not contain one maker and one taker"
                            );
                            return;
                        }
                    };
                    self.trades.push(TradeSummary {
                        maker_order_id: maker_view.order_id,
                        maker_side: maker_view.side,
                        taker_order_id: taker_view.order_id,
                        taker_side: taker_view.side,
                        price: taker_view.price,
                        quantity: taker_view.quantity,
                    });
                } else {
                    self.pending_trades.insert(key, current);
                }
            }
            _ => {}
        }
    }

    fn maybe_set_order_id(&mut self, order_id: OrderId, priority: u8) {
        if self.order_id_priority < priority {
            self.order_id = Some(order_id);
            self.order_id_priority = priority;
        }
    }

    fn finalize_trades(&mut self) {
        if !self.pending_trades.is_empty() {
            warn!(
                pending = self.pending_trades.len(),
                "incomplete trade view pairs in tx"
            );
            self.pending_trades.clear();
        }
    }
}