px_core/ws_decoder.rs
1//! Shared WebSocket frame decoding helpers.
2//!
//! Every exchange's WS handler needs to (a) handle both single-object and
//! array-of-objects frames and (b) avoid the slow `serde_json::Value` +
//! `from_value(value.clone())` double-parse pattern used previously.
//! `decode_frame` centralises both.
7//!
//! When the `simd-json` feature is enabled, large payloads route through
//! simd-json's SIMD tokenizer (~10-20% faster on mid-to-large WS frames per
//! the `ws_hot_path` bench); small payloads stay on serde_json, where
//! simd-json's startup cost would otherwise dominate the parse.
12
13use serde::de::DeserializeOwned;
14
15/// A parsed WebSocket frame — either a single object or an array-of-objects.
16///
17/// Many exchanges send batched updates as a top-level JSON array. Rather
18/// than parse twice (once as `Value` to peek, once as the typed struct),
19/// `decode_frame` dispatches based on the first non-whitespace byte.
20pub enum WsFrame<T> {
21 Single(T),
22 Array(Vec<T>),
23}
24
25impl<T> WsFrame<T> {
26 /// Call `f` on each contained `T`, consuming the frame.
27 pub fn for_each<F: FnMut(T)>(self, mut f: F) {
28 match self {
29 Self::Single(item) => f(item),
30 Self::Array(items) => items.into_iter().for_each(f),
31 }
32 }
33}
34
/// Text length (in bytes) at which the decoders switch from `serde_json` to
/// simd-json: payloads shorter than this use `serde_json`, payloads of this
/// length or longer use simd-json.
///
/// On tiny frames (single-level price-change updates, subscription acks)
/// simd-json's startup cost outweighs its faster tokenizer, so
/// `serde_json::from_str` wins; past the crossover simd-json wins steadily.
///
/// Calibrated on the `ws_hot_path` bench:
/// - ~250 bytes (1 book level): serde_json ~40% faster;
/// - ~1200 bytes (16 levels): simd-json ~10% faster;
/// - ~4.5 KB (64 levels): simd-json ~20% faster.
///
/// 512 bytes sits comfortably above the worst-case small-frame size.
#[cfg(feature = "simd-json")]
pub const SIMD_CROSSOVER_BYTES: usize = 512;
46
47/// Decode `text` into a `WsFrame<T>` with a single pass of whatever JSON
48/// parser is fastest for its size.
49///
50/// - Small frames: `serde_json::from_str` on the `&str` directly — no alloc.
51/// - Large frames (≥ `SIMD_CROSSOVER_BYTES`, `simd-json` feature on): copy
52/// to a `Vec<u8>` once, then `simd_json::serde::from_slice` with
53/// SIMD-accelerated tokenisation.
54///
55/// Returns `None` on parse failure; callers typically log and drop such
56/// frames. Dispatch rule between single and array: first non-whitespace
57/// byte is `[` → array; else single object. Matches polymarket / kalshi /
58/// opinion WS behaviour (both forms are observed in the wild).
59pub fn decode_frame<T: DeserializeOwned>(text: &str) -> Option<WsFrame<T>> {
60 #[cfg(feature = "simd-json")]
61 if text.len() >= SIMD_CROSSOVER_BYTES {
62 let mut bytes = text.as_bytes().to_vec();
63 let head = bytes.iter().find(|&&b| !b.is_ascii_whitespace()).copied()?;
64 return if head == b'[' {
65 simd_json::serde::from_slice::<Vec<T>>(&mut bytes)
66 .ok()
67 .map(WsFrame::Array)
68 } else {
69 simd_json::serde::from_slice::<T>(&mut bytes)
70 .ok()
71 .map(WsFrame::Single)
72 };
73 }
74
75 let trimmed = text.trim_start();
76 if trimmed.starts_with('[') {
77 serde_json::from_str::<Vec<T>>(text)
78 .ok()
79 .map(WsFrame::Array)
80 } else {
81 serde_json::from_str::<T>(text).ok().map(WsFrame::Single)
82 }
83}
84
85/// Parse `text` into a `serde_json::Value` using the same size-based simd
86/// switching as `decode_frame`. For exchanges (kalshi, opinion) that
87/// dispatch on a field inside a loosely typed Value rather than
88/// deserialising into a bespoke `RawWsMessage` struct.
89pub fn decode_value(text: &str) -> Option<serde_json::Value> {
90 #[cfg(feature = "simd-json")]
91 if text.len() >= SIMD_CROSSOVER_BYTES {
92 let mut bytes = text.as_bytes().to_vec();
93 return simd_json::serde::from_slice::<serde_json::Value>(&mut bytes).ok();
94 }
95 serde_json::from_str::<serde_json::Value>(text).ok()
96}
97
/// Reusable simd-json scratch space for high-throughput WS decoders.
///
/// Hold one per WS connection. `parse_value` takes `&mut self`, so a scratch
/// serves one parse at a time; when WS frames are processed serially on a
/// single task (the intended usage), no extra synchronisation is needed.
/// Every call to `parse_value` reuses the same internal `simd_json::Buffers`
/// instead of creating fresh tokenizer buffers per frame.
///
/// ```ignore
/// let mut scratch = TapeScratch::new();
/// while let Some(frame) = ws.next().await {
///     let mut bytes = frame.into_bytes();
///     let v = scratch.parse_value(&mut bytes)?;
///     // walk v to extract fields — BorrowedValue points into `bytes`
/// }
/// ```
#[cfg(feature = "simd-json")]
pub struct TapeScratch {
    // Tokenizer scratch passed to `simd_json::to_borrowed_value_with_buffers`
    // on every `parse_value` call.
    buffers: simd_json::Buffers,
}
117
#[cfg(feature = "simd-json")]
impl TapeScratch {
    /// Create a scratch pre-sized for typical WS frames (16 KiB). It grows
    /// automatically when a larger frame arrives.
    pub fn new() -> Self {
        const DEFAULT_CAPACITY: usize = 16 * 1024;
        Self::with_capacity(DEFAULT_CAPACITY)
    }

    /// Create a scratch, passing `cap` to `simd_json::Buffers::new` as the
    /// initial sizing hint.
    pub fn with_capacity(cap: usize) -> Self {
        let buffers = simd_json::Buffers::new(cap);
        Self { buffers }
    }

    /// Parse `bytes` in place and return a `BorrowedValue` that borrows from it.
    ///
    /// Strings in the result reference the input bytes rather than being
    /// allocated per field. simd-json rewrites the slice while parsing, so
    /// callers must own the bytes. The scratch buffers are reused across calls.
    ///
    /// # Errors
    ///
    /// Returns the simd-json error when `bytes` is not valid JSON.
    pub fn parse_value<'a>(
        &mut self,
        bytes: &'a mut [u8],
    ) -> Result<simd_json::BorrowedValue<'a>, simd_json::Error> {
        let scratch = &mut self.buffers;
        simd_json::to_borrowed_value_with_buffers(bytes, scratch)
    }
}
141
#[cfg(feature = "simd-json")]
impl Default for TapeScratch {
    /// Equivalent to [`TapeScratch::new`].
    fn default() -> Self {
        TapeScratch::new()
    }
}
148
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    #[derive(Debug, Deserialize, PartialEq)]
    struct Msg {
        event: String,
        seq: u64,
    }

    #[test]
    fn single_object() {
        let text = r#"{"event":"book","seq":42}"#;
        match decode_frame::<Msg>(text).unwrap() {
            WsFrame::Single(m) => assert_eq!(
                m,
                Msg {
                    event: "book".into(),
                    seq: 42
                }
            ),
            WsFrame::Array(_) => panic!("expected single"),
        }
    }

    #[test]
    fn array_of_objects() {
        let text = r#"[{"event":"book","seq":1},{"event":"trade","seq":2}]"#;
        match decode_frame::<Msg>(text).unwrap() {
            WsFrame::Array(items) => assert_eq!(items.len(), 2),
            WsFrame::Single(_) => panic!("expected array"),
        }
    }

    #[test]
    fn whitespace_prefix() {
        let text = " \n [{\"event\":\"book\",\"seq\":1}]";
        assert!(matches!(decode_frame::<Msg>(text), Some(WsFrame::Array(_))));
    }

    #[test]
    fn malformed_returns_none() {
        assert!(decode_frame::<Msg>("{not json").is_none());
        assert!(decode_frame::<Msg>("").is_none());
    }

    #[test]
    fn whitespace_only_returns_none() {
        // Small frame: always the serde_json path.
        assert!(decode_frame::<Msg>(" \n\t\r ").is_none());
        // Large frame: the SIMD path when the feature is enabled.
        assert!(decode_frame::<Msg>(&" ".repeat(2048)).is_none());
    }

    #[test]
    fn large_frame_uses_simd() {
        // Build a frame safely above the crossover so the SIMD branch runs.
        let mut inner = String::new();
        for i in 0..100 {
            if i > 0 {
                inner.push(',');
            }
            inner.push_str(&format!(r#"{{"event":"tick","seq":{i}}}"#));
        }
        let text = format!("[{inner}]");
        match decode_frame::<Msg>(&text).unwrap() {
            WsFrame::Array(items) => assert_eq!(items.len(), 100),
            WsFrame::Single(_) => panic!("expected array"),
        }
    }

    #[test]
    fn large_single_object() {
        // A single object above the crossover exercises the SIMD `Single` branch.
        let event = "x".repeat(600);
        let text = format!(r#"{{"event":"{event}","seq":7}}"#);
        assert!(text.len() >= 512);
        match decode_frame::<Msg>(&text).unwrap() {
            WsFrame::Single(m) => assert_eq!(m, Msg { event, seq: 7 }),
            WsFrame::Array(_) => panic!("expected single"),
        }
    }

    #[test]
    fn for_each_visits_every_item_in_order() {
        let mut seen = Vec::new();
        WsFrame::Single(1u64).for_each(|x| seen.push(x));
        WsFrame::Array(vec![2u64, 3]).for_each(|x| seen.push(x));
        WsFrame::<u64>::Array(Vec::new()).for_each(|x| seen.push(x));
        assert_eq!(seen, vec![1, 2, 3]);
    }

    #[test]
    fn decode_value_handles_both_sizes() {
        // small path
        let small = r#"{"msgType":"ping","seq":1}"#;
        let v = decode_value(small).unwrap();
        assert_eq!(v.get("msgType").and_then(|v| v.as_str()), Some("ping"));

        // large path (SIMD branch when feature is enabled)
        let mut fields = String::new();
        for i in 0..200 {
            if i > 0 {
                fields.push(',');
            }
            fields.push_str(&format!(r#""k{i}":"value_{i}""#));
        }
        let large = format!("{{{fields}}}");
        let v = decode_value(&large).unwrap();
        assert_eq!(v.get("k0").and_then(|v| v.as_str()), Some("value_0"));
    }

    #[test]
    fn decode_value_malformed_returns_none() {
        assert!(decode_value("{").is_none());
        // Unterminated object above the crossover.
        let large = format!(r#"{{"k":"{}""#, "x".repeat(1024));
        assert!(decode_value(&large).is_none());
    }

    #[cfg(feature = "simd-json")]
    #[test]
    fn tape_scratch_reuses_buffers_across_frames() {
        let mut scratch = TapeScratch::default();
        for i in 0..3 {
            let mut frame = format!(r#"{{"event":"tick","seq":{i}}}"#).into_bytes();
            assert!(scratch.parse_value(&mut frame).is_ok());
        }
        let mut bad = b"{not json".to_vec();
        assert!(scratch.parse_value(&mut bad).is_err());
    }
}