Skip to main content

px_core/
ws_decoder.rs

1//! Shared WebSocket frame decoding helpers.
2//!
3//! Every exchange's WS handler needs to (a) handle both single-object and
4//! array-of-objects frames, (b) skip the slow `serde_json::Value` +
5//! `from_value(value.clone())` double-parse pattern that existed before.
6//! `decode_frame` centralises both.
7//!
8//! When the `simd-json` feature is enabled, large payloads route through
9//! simd-json's SIMD tokenizer (~15-20% faster on mid-to-large WS frames);
10//! small payloads stay on serde_json, where the SIMD startup cost would
11//! otherwise dominate the parse.
12
13use serde::de::DeserializeOwned;
14
/// A parsed WebSocket frame — either a single object or an array-of-objects.
///
/// Many exchanges send batched updates as a top-level JSON array. Rather
/// than parse twice (once as `Value` to peek, once as the typed struct),
/// `decode_frame` dispatches based on the first non-whitespace byte.
///
/// The common traits are derived so frames can be logged, compared in
/// tests, and cloned whenever `T` itself supports those operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WsFrame<T> {
    /// The frame's top-level JSON value decoded as one `T`.
    Single(T),
    /// The frame's top-level JSON value was an array, decoded as `Vec<T>`.
    Array(Vec<T>),
}

impl<T> WsFrame<T> {
    /// Call `f` on each contained `T`, consuming the frame.
    ///
    /// `Single` invokes `f` exactly once; `Array` invokes it once per
    /// element in array order (and not at all for an empty array).
    pub fn for_each<F: FnMut(T)>(self, mut f: F) {
        match self {
            Self::Single(item) => f(item),
            Self::Array(items) => items.into_iter().for_each(f),
        }
    }
}
34
/// Payload size, in bytes, at which decoding switches from serde_json to
/// simd-json: `decode_frame` and `decode_value` take the simd-json branch
/// when `text.len() >= SIMD_CROSSOVER_BYTES`.
///
/// Below this payload size, simd-json's startup cost exceeds the tokenizer
/// speedup — `serde_json::from_str` is measurably faster on tiny frames
/// (single-level price-change updates, subscription acks). Above it,
/// simd-json wins steadily.
///
/// Crossover calibrated on the `ws_hot_path` bench: at ~250 bytes (1 book
/// level) serde is ~40% faster; at ~1200 bytes (16 levels) simd-json is
/// ~10% faster; at ~4.5 KB (64 levels) simd-json is ~20% faster. 512 bytes
/// sits comfortably above the worst-case small-frame size.
///
/// Only defined when the `simd-json` feature is enabled.
#[cfg(feature = "simd-json")]
pub const SIMD_CROSSOVER_BYTES: usize = 512;
46
47/// Decode `text` into a `WsFrame<T>` with a single pass of whatever JSON
48/// parser is fastest for its size.
49///
50/// - Small frames: `serde_json::from_str` on the `&str` directly — no alloc.
51/// - Large frames (≥ `SIMD_CROSSOVER_BYTES`, `simd-json` feature on): copy
52///   to a `Vec<u8>` once, then `simd_json::serde::from_slice` with
53///   SIMD-accelerated tokenisation.
54///
55/// Returns `None` on parse failure; callers typically log and drop such
56/// frames. Dispatch rule between single and array: first non-whitespace
57/// byte is `[` → array; else single object. Matches polymarket / kalshi /
58/// opinion WS behaviour (both forms are observed in the wild).
59pub fn decode_frame<T: DeserializeOwned>(text: &str) -> Option<WsFrame<T>> {
60    #[cfg(feature = "simd-json")]
61    if text.len() >= SIMD_CROSSOVER_BYTES {
62        let mut bytes = text.as_bytes().to_vec();
63        let head = bytes.iter().find(|&&b| !b.is_ascii_whitespace()).copied()?;
64        return if head == b'[' {
65            simd_json::serde::from_slice::<Vec<T>>(&mut bytes)
66                .ok()
67                .map(WsFrame::Array)
68        } else {
69            simd_json::serde::from_slice::<T>(&mut bytes)
70                .ok()
71                .map(WsFrame::Single)
72        };
73    }
74
75    let trimmed = text.trim_start();
76    if trimmed.starts_with('[') {
77        serde_json::from_str::<Vec<T>>(text)
78            .ok()
79            .map(WsFrame::Array)
80    } else {
81        serde_json::from_str::<T>(text).ok().map(WsFrame::Single)
82    }
83}
84
85/// Parse `text` into a `serde_json::Value` using the same size-based simd
86/// switching as `decode_frame`. For exchanges (kalshi, opinion) that
87/// dispatch on a field inside a loosely typed Value rather than
88/// deserialising into a bespoke `RawWsMessage` struct.
89pub fn decode_value(text: &str) -> Option<serde_json::Value> {
90    #[cfg(feature = "simd-json")]
91    if text.len() >= SIMD_CROSSOVER_BYTES {
92        let mut bytes = text.as_bytes().to_vec();
93        return simd_json::serde::from_slice::<serde_json::Value>(&mut bytes).ok();
94    }
95    serde_json::from_str::<serde_json::Value>(text).ok()
96}
97
/// Reusable simd-json scratch space for high-throughput WS decoders.
///
/// Intended usage is one instance per WS connection. `parse_value` takes
/// `&mut self`, so a scratch can only serve one parse at a time; the
/// original design note is that frames are processed serially on a single
/// task, which is why no synchronisation is built in.
///
/// Every `parse_value` call hands the same `simd_json::Buffers` back to
/// simd-json, with the aim of avoiding a fresh scratch allocation per frame
/// (NOTE(review): the returned value tree may still allocate — confirm
/// against the simd-json docs before relying on "zero allocation").
///
/// ```ignore
/// let mut scratch = TapeScratch::new();
/// while let Some(frame) = ws.next().await {
///     let mut bytes = frame.into_bytes();
///     let v = scratch.parse_value(&mut bytes)?;
///     // walk v to extract fields — BorrowedValue points into `bytes`
/// }
/// ```
#[cfg(feature = "simd-json")]
pub struct TapeScratch {
    /// Parser scratch buffers passed to simd-json on every `parse_value` call.
    buffers: simd_json::Buffers,
}
117
#[cfg(feature = "simd-json")]
impl TapeScratch {
    /// Create a scratch with a 16 KiB capacity hint, sized for typical WS
    /// frames (a few KB). Larger frames are expected to grow the buffers
    /// on demand.
    pub fn new() -> Self {
        const DEFAULT_CAPACITY: usize = 16 * 1024;
        Self::with_capacity(DEFAULT_CAPACITY)
    }

    /// Create a scratch whose `simd_json::Buffers` is constructed with
    /// `cap` as its capacity argument.
    pub fn with_capacity(cap: usize) -> Self {
        let buffers = simd_json::Buffers::new(cap);
        Self { buffers }
    }

    /// Parse `bytes` in place and return a `BorrowedValue` tied to the
    /// lifetime of `bytes`, so string fields can reference the input rather
    /// than being copied out.
    ///
    /// `bytes` is handed to simd-json mutably and may be rewritten during
    /// parsing — callers must own the buffer and must not expect it to
    /// still hold the original text afterwards.
    ///
    /// # Errors
    ///
    /// Returns the `simd_json::Error` produced by the parser when `bytes`
    /// is not valid JSON.
    pub fn parse_value<'a>(
        &mut self,
        bytes: &'a mut [u8],
    ) -> Result<simd_json::BorrowedValue<'a>, simd_json::Error> {
        let Self { buffers } = self;
        simd_json::to_borrowed_value_with_buffers(bytes, buffers)
    }
}
141
#[cfg(feature = "simd-json")]
impl Default for TapeScratch {
    /// Same as `TapeScratch::new()`: a scratch with the default capacity.
    fn default() -> Self {
        TapeScratch::new()
    }
}
148
#[cfg(test)]
mod tests {
    //! Unit tests for the frame decoders. Fixtures meant to exercise the
    //! simd-json branch assert their own size so they cannot silently fall
    //! back to the serde_json path if the crossover is ever raised.

    use super::*;
    use serde::Deserialize;

    /// Minimal typed message used as the decode target in every test.
    #[derive(Debug, Deserialize, PartialEq)]
    struct Msg {
        event: String,
        seq: u64,
    }

    #[test]
    fn single_object() {
        let text = r#"{"event":"book","seq":42}"#;
        match decode_frame::<Msg>(text).unwrap() {
            WsFrame::Single(m) => assert_eq!(
                m,
                Msg {
                    event: "book".into(),
                    seq: 42
                }
            ),
            WsFrame::Array(_) => panic!("expected single"),
        }
    }

    #[test]
    fn array_of_objects() {
        let text = r#"[{"event":"book","seq":1},{"event":"trade","seq":2}]"#;
        match decode_frame::<Msg>(text).unwrap() {
            WsFrame::Array(items) => assert_eq!(items.len(), 2),
            WsFrame::Single(_) => panic!("expected array"),
        }
    }

    #[test]
    fn whitespace_prefix() {
        let text = "   \n  [{\"event\":\"book\",\"seq\":1}]";
        assert!(matches!(decode_frame::<Msg>(text), Some(WsFrame::Array(_))));
    }

    #[test]
    fn malformed_returns_none() {
        assert!(decode_frame::<Msg>("{not json").is_none());
        assert!(decode_frame::<Msg>("").is_none());
    }

    #[test]
    fn oversized_malformed_returns_none() {
        // Whitespace-only payload: no head byte to dispatch on.
        let blank = " ".repeat(2048);
        assert!(decode_frame::<Msg>(&blank).is_none());

        // Unterminated array, large enough to take the SIMD branch when enabled.
        let truncated = format!("[{}", r#"{"event":"tick","seq":1},"#.repeat(100));
        assert!(decode_frame::<Msg>(&truncated).is_none());
        assert!(decode_value(&truncated).is_none());
    }

    #[test]
    fn for_each_visits_every_item_in_order() {
        let mut seqs = Vec::new();
        decode_frame::<Msg>(r#"{"event":"book","seq":42}"#)
            .unwrap()
            .for_each(|m| seqs.push(m.seq));
        decode_frame::<Msg>(r#"[{"event":"book","seq":1},{"event":"trade","seq":2}]"#)
            .unwrap()
            .for_each(|m| seqs.push(m.seq));
        assert_eq!(seqs, [42, 1, 2]);
    }

    #[test]
    fn large_frame_uses_simd() {
        // Build a frame safely above the crossover so the SIMD branch runs.
        let mut inner = String::new();
        for i in 0..100 {
            if i > 0 {
                inner.push(',');
            }
            inner.push_str(&format!(r#"{{"event":"tick","seq":{i}}}"#));
        }
        let text = format!("[{inner}]");
        #[cfg(feature = "simd-json")]
        assert!(
            text.len() >= SIMD_CROSSOVER_BYTES,
            "fixture too small to reach the SIMD branch"
        );
        match decode_frame::<Msg>(&text).unwrap() {
            WsFrame::Array(items) => {
                assert_eq!(items.len(), 100);
                assert_eq!(items[0].seq, 0);
                assert_eq!(items[99].seq, 99);
            }
            WsFrame::Single(_) => panic!("expected array"),
        }
    }

    #[test]
    fn decode_value_handles_both_sizes() {
        // small path
        let small = r#"{"msgType":"ping","seq":1}"#;
        let v = decode_value(small).unwrap();
        assert_eq!(v.get("msgType").and_then(|v| v.as_str()), Some("ping"));

        // large path (SIMD branch when feature is enabled)
        let mut fields = String::new();
        for i in 0..200 {
            if i > 0 {
                fields.push(',');
            }
            fields.push_str(&format!(r#""k{i}":"value_{i}""#));
        }
        let large = format!("{{{fields}}}");
        #[cfg(feature = "simd-json")]
        assert!(
            large.len() >= SIMD_CROSSOVER_BYTES,
            "fixture too small to reach the SIMD branch"
        );
        let v = decode_value(&large).unwrap();
        assert_eq!(v.get("k0").and_then(|v| v.as_str()), Some("value_0"));
        assert_eq!(v.get("k199").and_then(|v| v.as_str()), Some("value_199"));
    }
}