Skip to main content

polyfill_rs/
ws_hot_path.rs

//! Zero-allocation-ish WebSocket hot-path processing.
//!
//! This module is focused on the "decode + apply" path for WS `book` events:
//! after warmup, processing a message should not perform heap allocations.
//!
//! Important: with the current tokio-tungstenite transport, the *network layer*
//! may still allocate when producing `Message::Text(String)`. This module aims to
//! make the *processing* layer allocation-free so we can enforce it with tests.
9
10use crate::book::OrderBookManager;
11use crate::errors::{PolyfillError, Result};
12use crate::types::{decimal_to_price, decimal_to_qty, Side};
13use rust_decimal::Decimal;
14use simd_json::prelude::*;
15use std::str::FromStr;
16
/// Counters describing the outcome of processing one WS payload.
///
/// The all-zero value (`Default`) is what is reported for payloads that contain
/// no `book` events.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct WsBookApplyStats {
    /// Number of `book` events handled.
    pub book_messages: usize,
    /// Total number of price levels applied across those events.
    pub book_levels_applied: usize,
}
23
24/// In-place WS `book` message processor built on `simd-json`'s tape API.
25///
26/// This avoids building a DOM (which allocates for arrays/objects) by decoding into a
27/// reusable tape, then traversing it to extract the fields needed for order book updates.
28pub struct WsBookUpdateProcessor {
29    buffers: simd_json::Buffers,
30    tape: Option<simd_json::Tape<'static>>,
31}
32
33impl WsBookUpdateProcessor {
34    /// Create a new processor.
35    ///
36    /// `input_len_hint` should be set to the typical WS message size to reduce warmup reallocs.
37    pub fn new(input_len_hint: usize) -> Self {
38        Self {
39            buffers: simd_json::Buffers::new(input_len_hint),
40            // Store an empty tape with a `'static` lifetime so we can reuse its allocation.
41            tape: Some(simd_json::Tape::null().reset()),
42        }
43    }
44
45    /// Process a WS payload in-place (bytes will be mutated by the JSON parser).
46    pub fn process_bytes(
47        &mut self,
48        bytes: &mut [u8],
49        books: &OrderBookManager,
50    ) -> Result<WsBookApplyStats> {
51        let mut tape = self
52            .tape
53            .take()
54            .expect("WsBookUpdateProcessor tape must be present")
55            .reset();
56
57        simd_json::fill_tape(bytes, &mut self.buffers, &mut tape).map_err(|e| {
58            PolyfillError::parse("Failed to parse WebSocket JSON", Some(Box::new(e)))
59        })?;
60
61        let root = tape.as_value();
62        let stats = process_root_value(root, books)?;
63
64        // Reset the tape to detach lifetimes and keep capacity for reuse.
65        self.tape = Some(tape.reset());
66        Ok(stats)
67    }
68
69    /// Convenience: process an owned text message without allocating an additional buffer.
70    pub fn process_text(
71        &mut self,
72        text: String,
73        books: &OrderBookManager,
74    ) -> Result<WsBookApplyStats> {
75        let mut bytes = text.into_bytes();
76        self.process_bytes(bytes.as_mut_slice(), books)
77    }
78}
79
80fn process_root_value<'tape, 'input>(
81    value: simd_json::tape::Value<'tape, 'input>,
82    books: &OrderBookManager,
83) -> Result<WsBookApplyStats> {
84    if let Some(obj) = value.as_object() {
85        return process_stream_object(obj, books);
86    }
87
88    let Some(arr) = value.as_array() else {
89        return Ok(WsBookApplyStats::default());
90    };
91
92    let mut total = WsBookApplyStats::default();
93    for elem in arr.iter() {
94        let Some(obj) = elem.as_object() else {
95            continue;
96        };
97        let stats = process_stream_object(obj, books)?;
98        total.book_messages += stats.book_messages;
99        total.book_levels_applied += stats.book_levels_applied;
100    }
101
102    Ok(total)
103}
104
105fn process_stream_object<'tape, 'input>(
106    obj: simd_json::tape::Object<'tape, 'input>,
107    books: &OrderBookManager,
108) -> Result<WsBookApplyStats> {
109    let Some(event_type) = obj.get("event_type").and_then(|v| v.into_string()) else {
110        return Ok(WsBookApplyStats::default());
111    };
112
113    if event_type != "book" {
114        return Ok(WsBookApplyStats::default());
115    }
116
117    let asset_id = obj
118        .get("asset_id")
119        .and_then(|v| v.into_string())
120        .ok_or_else(|| PolyfillError::parse("Missing asset_id", None))?;
121
122    let timestamp_value = obj
123        .get("timestamp")
124        .ok_or_else(|| PolyfillError::parse("Missing timestamp", None))?;
125    let timestamp = parse_u64(timestamp_value)
126        .ok_or_else(|| PolyfillError::parse("Invalid timestamp", None))?;
127
128    let bids = obj.get("bids").and_then(|v| v.as_array());
129    let asks = obj.get("asks").and_then(|v| v.as_array());
130
131    let levels_applied = books.with_book_mut(asset_id, |book| {
132        if !book.begin_ws_book_update(asset_id, timestamp)? {
133            return Ok(0);
134        }
135
136        let mut applied = 0usize;
137        if let Some(bids) = bids {
138            applied += apply_levels(book, Side::BUY, bids)?;
139        }
140        if let Some(asks) = asks {
141            applied += apply_levels(book, Side::SELL, asks)?;
142        }
143
144        book.finish_ws_book_update();
145        Ok(applied)
146    })?;
147
148    Ok(WsBookApplyStats {
149        book_messages: 1,
150        book_levels_applied: levels_applied,
151    })
152}
153
154fn parse_u64<'tape, 'input>(value: simd_json::tape::Value<'tape, 'input>) -> Option<u64> {
155    value
156        .as_u64()
157        .or_else(|| value.into_string().and_then(|s| s.parse::<u64>().ok()))
158}
159
160fn apply_levels<'tape, 'input>(
161    book: &mut crate::book::OrderBook,
162    side: Side,
163    levels: simd_json::tape::Array<'tape, 'input>,
164) -> Result<usize> {
165    let mut applied = 0usize;
166    for level in levels.iter() {
167        let Some(obj) = level.as_object() else {
168            continue;
169        };
170
171        let price_str = obj
172            .get("price")
173            .and_then(|v| v.into_string())
174            .ok_or_else(|| PolyfillError::parse("Missing price", None))?;
175        let size_str = obj
176            .get("size")
177            .and_then(|v| v.into_string())
178            .ok_or_else(|| PolyfillError::parse("Missing size", None))?;
179
180        let price_decimal =
181            Decimal::from_str(price_str).map_err(|_| PolyfillError::validation("Invalid price"))?;
182        let size_decimal =
183            Decimal::from_str(size_str).map_err(|_| PolyfillError::validation("Invalid size"))?;
184
185        let price_ticks = decimal_to_price(price_decimal)
186            .map_err(|_| PolyfillError::validation("Invalid price"))?;
187        let size_units =
188            decimal_to_qty(size_decimal).map_err(|_| PolyfillError::validation("Invalid size"))?;
189
190        book.apply_ws_book_level_fast(side, price_ticks, size_units)?;
191        applied += 1;
192    }
193
194    Ok(applied)
195}