betex 0.5.1

Betfair / Prediction Market Exchange
Documentation
use crate::{
    book::protocol::response::{Response, TradeSummary},
    book::{BookEvent, BookEventEnvelope},
    disruptor::Envelope,
    engine::core::EngineEvent,
    types::{CorrelationId, FillPrice, FillQuantity, Money, OrderId},
};

use disrupt_rs::EventHandler;
use smallvec::SmallVec;
use std::sync::atomic::{AtomicU64, Ordering};

static RESPONSE_CORRELATION_COUNTER: AtomicU64 = AtomicU64::new(1);

#[derive(Default)]
pub struct ResponseCallbackHandler {
    current_tx_id: Option<u64>,
    buffer: ResponseBuffer,
}

impl ResponseCallbackHandler {
    fn next_correlation_id() -> CorrelationId {
        CorrelationId(RESPONSE_CORRELATION_COUNTER.fetch_add(1, Ordering::Relaxed))
    }

    fn on_tx_event(&mut self, envelope: &Envelope<BookEventEnvelope>) {
        let tx_id = envelope.tx_id;
        if self.current_tx_id != Some(tx_id) {
            self.current_tx_id = Some(tx_id);
            self.buffer = ResponseBuffer::default();
        }
        self.buffer.apply_event(&envelope.payload.event);

        let is_last = envelope.tx_ix.saturating_add(1) == envelope.tx_len;
        if !is_last {
            return;
        }

        let Some(cb) = envelope.response_cb.as_ref() else {
            self.current_tx_id = None;
            self.buffer = ResponseBuffer::default();
            return;
        };

        let buffer = std::mem::take(&mut self.buffer);
        self.current_tx_id = None;

        let response = Response::Ok {
            correlation_id: Self::next_correlation_id(),
            order_id: buffer.order_id,
            trades: buffer.trades,
            liability_delta: Money(0),
            last_event_seq: envelope.seq,
        };
        let _ = cb.tx.send(Ok(response));
    }
}

impl EventHandler<EngineEvent> for ResponseCallbackHandler {
    fn on_event(
        &mut self,
        event: &EngineEvent,
        _sequence: disrupt_rs::Sequence,
        end_of_batch: bool,
    ) {
        let _ = end_of_batch;
        self.on_tx_event(event);
    }
}

#[derive(Default)]
struct ResponseBuffer {
    order_id: Option<OrderId>,
    order_id_priority: u8,
    trades: SmallVec<[TradeSummary; 4]>,
}

impl ResponseBuffer {
    fn apply_event(&mut self, event: &BookEvent) {
        match event {
            BookEvent::OrderAccepted { order_id, .. }
            | BookEvent::BinaryOrderAccepted { order_id, .. } => {
                self.maybe_set_order_id(*order_id, 2);
            }
            BookEvent::OrderUpdated { order_id, .. }
            | BookEvent::BinaryOrderUpdated { order_id, .. }
            | BookEvent::OrderCancelled { order_id, .. }
            | BookEvent::OrderLapsed { order_id, .. }
            | BookEvent::OrderVoided { order_id, .. } => {
                self.maybe_set_order_id(*order_id, 1);
            }
            BookEvent::TradeMatched {
                trade_id,
                maker,
                taker,
                price,
                stake,
                ..
            } => {
                self.trades.push(TradeSummary {
                    trade_id: *trade_id,
                    maker_order_id: maker.id,
                    maker_side: maker.side,
                    taker_order_id: taker.id,
                    taker_side: taker.side,
                    price: FillPrice::Odds(*price),
                    quantity: FillQuantity::Stake(*stake),
                });
            }
            BookEvent::BinaryTradeMatched {
                trade_id,
                maker,
                taker,
                price_ticks,
                qty_shares,
            } => {
                self.trades.push(TradeSummary {
                    trade_id: *trade_id,
                    maker_order_id: maker.id,
                    maker_side: maker.side,
                    taker_order_id: taker.id,
                    taker_side: taker.side,
                    price: FillPrice::Ticks(*price_ticks),
                    quantity: FillQuantity::Shares(*qty_shares),
                });
            }
            _ => {}
        }
    }

    fn maybe_set_order_id(&mut self, order_id: OrderId, priority: u8) {
        if self.order_id_priority < priority {
            self.order_id = Some(order_id);
            self.order_id_priority = priority;
        }
    }
}