Skip to main content

hyperi_rustlib/worker/engine/
parse.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/parse.rs
3// Purpose:   SIMD-accelerated payload parsing for the batch processing engine
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Parse phase: convert raw bytes into a `sonic_rs::Value` using SIMD
10//! acceleration. This is the most CPU-intensive phase (~1-5 µs per message).
11//!
12//! - JSON: `sonic_rs::from_slice` (AVX2/NEON SIMD, 2-4x faster than serde_json)
13//! - MsgPack: `rmpv` native decode -> `sonic_rs::Value` via a direct value walker
14//!   (no `rmp_serde -> serde_json` bridge; MsgPack messages are a small minority
15//!   in practice)
16//! - Auto: byte-sniff via [`PayloadFormat::detect`], then dispatch
17
18use super::types::PayloadFormat;
19
20/// Error produced when a single message fails to parse.
21#[derive(Debug)]
22pub enum ParseError {
23    /// Payload was empty -- nothing to parse.
24    Empty,
25    /// JSON parse error from sonic_rs.
26    Json(sonic_rs::Error),
27    /// MsgPack decode error.
28    MsgPack(String),
29    /// Format not supported (feature gate not enabled).
30    UnsupportedFormat(&'static str),
31    /// Payload nests deeper than `MAX_PARSE_DEPTH` (`parse_guard` module).
32    /// Rejected BEFORE the recursive parser runs so a hostile deeply-nested
33    /// payload cannot exhaust the worker stack (a per-record error, not a crash).
34    TooDeep,
35}
36
37impl std::fmt::Display for ParseError {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            Self::Empty => write!(f, "empty payload"),
41            Self::Json(e) => write!(f, "json parse error: {e}"),
42            Self::MsgPack(msg) => write!(f, "msgpack decode error: {msg}"),
43            Self::UnsupportedFormat(msg) => write!(f, "unsupported format: {msg}"),
44            Self::TooDeep => write!(
45                f,
46                "payload nesting exceeds the maximum parse depth of {}",
47                crate::parse_guard::MAX_PARSE_DEPTH
48            ),
49        }
50    }
51}
52
53impl std::error::Error for ParseError {
54    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
55        match self {
56            Self::Json(e) => Some(e),
57            _ => None,
58        }
59    }
60}
61
62/// Parse raw bytes into a `sonic_rs::Value` using SIMD acceleration.
63///
64/// # Format dispatch
65///
66/// | Format | Engine |
67/// |--------|--------|
68/// | `Json` | `sonic_rs::from_slice` (SIMD) |
69/// | `Auto` | byte-sniff -> `Json` or `MsgPack` |
70/// | `MsgPack` | `rmpv` native decode -> `sonic_rs::Value` walker (requires `worker-msgpack` feature) |
71///
72/// # Errors
73///
74/// Returns [`ParseError`] for empty payloads, malformed JSON/MsgPack, or when
75/// the `worker-msgpack` feature is not enabled and a MsgPack payload is given.
76pub fn parse_payload(payload: &[u8], format: PayloadFormat) -> Result<sonic_rs::Value, ParseError> {
77    if payload.is_empty() {
78        return Err(ParseError::Empty);
79    }
80
81    let effective = match format {
82        PayloadFormat::Auto => PayloadFormat::detect(payload),
83        other => other,
84    };
85
86    match effective {
87        // Auto resolves to Json or MsgPack; treat residual Auto as Json.
88        PayloadFormat::Json | PayloadFormat::Auto => {
89            // Reject pathological nesting with a CHEAP ITERATIVE pre-scan before
90            // the recursive SIMD parser runs -- a hostile deeply-nested payload
91            // would otherwise exhaust the worker stack and abort the process.
92            if !crate::parse_guard::json_depth_within(payload, crate::parse_guard::MAX_PARSE_DEPTH)
93            {
94                return Err(ParseError::TooDeep);
95            }
96            sonic_rs::from_slice(payload).map_err(ParseError::Json)
97        }
98        PayloadFormat::MsgPack => {
99            #[cfg(feature = "worker-msgpack")]
100            {
101                // Native MsgPack: a SINGLE schema-less decode (the same decoder
102                // `codec::parse` uses), then `rmpv_to_sonic` walks the value
103                // straight into a `sonic_rs::Value` -- see that fn for the
104                // bridge-free rationale.
105                let mut cursor: &[u8] = payload;
106                // Bound nesting depth so a hostile deeply-nested MsgPack payload
107                // cannot exhaust the stack here or in the rmpv_to_sonic walk.
108                let value = rmpv::decode::read_value_with_max_depth(
109                    &mut cursor,
110                    crate::parse_guard::MAX_PARSE_DEPTH,
111                )
112                .map_err(|e| ParseError::MsgPack(e.to_string()))?;
113                Ok(rmpv_to_sonic(&value))
114            }
115            #[cfg(not(feature = "worker-msgpack"))]
116            {
117                Err(ParseError::UnsupportedFormat(
118                    "msgpack requires the worker-msgpack feature",
119                ))
120            }
121        }
122    }
123}
124
125/// Convert a native `rmpv::Value` into a `sonic_rs::Value` with a direct value
126/// walker -- NO `rmp_serde -> serde_json` bridge, no JSON re-serialise. The
127/// engine keeps a `sonic_rs::Value`, so `ParsedMessage`, `extract_known`,
128/// pre-route, the interner and the transform-closure contract are unchanged.
129///
130/// ## Mapping
131///
132/// - Nil -> JSON null; Boolean -> bool; F32/F64 -> JSON number.
133/// - Integers fold to `i64` / `u64` (a `u64 > i64::MAX` stays unsigned;
134///   otherwise it is surfaced as `f64`, matching the codec's lossy-but-numeric
135///   policy for the rare oversized case).
136/// - String: valid UTF-8 -> JSON string; otherwise the lossy form (a MsgPack
137///   `str` is meant to be UTF-8, so this only bites on malformed input).
138/// - Binary / Ext: base-relevant routing never keys off these, so a `bin` maps
139///   to its lossy-UTF-8 string and `ext` to JSON null -- neither aborts the
140///   parse (the bytes still round-trip via the original `Record.payload`).
141/// - Array / Map: recurse. Non-string map keys are stringified so the object is
142///   still addressable (JSON object keys must be strings).
143#[cfg(feature = "worker-msgpack")]
144fn rmpv_to_sonic(value: &rmpv::Value) -> sonic_rs::Value {
145    use rmpv::Value as M;
146    use sonic_rs::Value as S;
147
148    // A non-finite float (NaN / +/-inf) has no JSON representation, so it folds
149    // to null -- the same total-on-bad-input stance the rest of the walker takes.
150    let from_f64 = |f: f64| S::new_f64(f).unwrap_or_else(S::new_null);
151
152    match value {
153        // Nil -> JSON null. Ext carries an application-defined type tag + bytes
154        // with no JSON analogue and routers never key off it, so it folds to
155        // null too (the bytes still round-trip via the original Record.payload).
156        M::Nil | M::Ext(_, _) => S::new_null(),
157        M::Boolean(b) => S::new_bool(*b),
158        M::Integer(i) => {
159            if let Some(n) = i.as_i64() {
160                S::new_i64(n)
161            } else if let Some(n) = i.as_u64() {
162                S::new_u64(n)
163            } else {
164                // Cannot happen for a well-formed rmpv integer, but stay total.
165                S::new_null()
166            }
167        }
168        M::F32(f) => from_f64(f64::from(*f)),
169        M::F64(f) => from_f64(*f),
170        M::String(s) => match s.as_str() {
171            Some(text) => S::from(text),
172            None => S::from(String::from_utf8_lossy(s.as_bytes())),
173        },
174        M::Binary(bytes) => S::from(String::from_utf8_lossy(bytes)),
175        M::Array(items) => {
176            let mut arr = sonic_rs::Array::new();
177            for item in items {
178                arr.push(rmpv_to_sonic(item));
179            }
180            S::from(arr)
181        }
182        M::Map(pairs) => {
183            let mut obj = sonic_rs::Object::new();
184            for (k, v) in pairs {
185                let key = msgpack_key_to_string(k);
186                obj.insert(&key, rmpv_to_sonic(v));
187            }
188            S::from(obj)
189        }
190    }
191}
192
193/// Stringify an `rmpv` map key so it can be a JSON object key (which must be a
194/// string). String keys pass through; everything else uses its `Display`-ish
195/// form so the field stays addressable rather than being dropped.
196#[cfg(feature = "worker-msgpack")]
197fn msgpack_key_to_string(key: &rmpv::Value) -> String {
198    use rmpv::Value as M;
199    match key {
200        M::String(s) => match s.as_str() {
201            Some(text) => text.to_string(),
202            None => String::from_utf8_lossy(s.as_bytes()).into_owned(),
203        },
204        M::Integer(i) => i.to_string(),
205        M::Boolean(b) => b.to_string(),
206        M::Nil => "null".to_string(),
207        other => format!("{other}"),
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use sonic_rs::JsonValueTrait as _;
214
215    use super::*;
216
217    #[test]
218    fn parse_valid_json() {
219        let payload = br#"{"host": "web1", "status": 200}"#;
220        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
221        assert_eq!(value.get("host").and_then(|v| v.as_str()), Some("web1"));
222        assert_eq!(value.get("status").and_then(|v| v.as_u64()), Some(200));
223    }
224
225    #[test]
226    fn parse_auto_detects_json() {
227        let payload = br#"{"_table": "events"}"#;
228        let value = parse_payload(payload, PayloadFormat::Auto).unwrap();
229        assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
230    }
231
232    #[test]
233    fn parse_invalid_json_returns_error() {
234        let payload = b"this is not json {";
235        let result = parse_payload(payload, PayloadFormat::Json);
236        assert!(
237            matches!(result, Err(ParseError::Json(_))),
238            "expected Json error, got {result:?}"
239        );
240    }
241
242    #[test]
243    fn parse_empty_payload_returns_empty_error() {
244        let result = parse_payload(b"", PayloadFormat::Json);
245        assert!(
246            matches!(result, Err(ParseError::Empty)),
247            "expected Empty error, got {result:?}"
248        );
249    }
250
251    #[test]
252    fn parse_empty_payload_auto_returns_empty_error() {
253        let result = parse_payload(b"", PayloadFormat::Auto);
254        assert!(matches!(result, Err(ParseError::Empty)));
255    }
256
257    #[test]
258    fn parse_nested_json() {
259        let payload = br#"{"meta": {"source": "kafka", "version": 3}, "data": [1, 2, 3]}"#;
260        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
261        assert!(value.get("meta").is_some());
262        assert!(value.get("data").is_some());
263        // Verify nested field access.
264        let meta = value.get("meta").unwrap();
265        assert_eq!(meta.get("source").and_then(|v| v.as_str()), Some("kafka"));
266    }
267
268    #[test]
269    fn parse_json_with_unicode() {
270        let payload = "{\"name\": \"caf\\u00e9\"}".as_bytes();
271        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
272        assert!(value.get("name").is_some());
273    }
274
275    #[test]
276    fn parse_error_display_empty() {
277        let e = ParseError::Empty;
278        assert_eq!(e.to_string(), "empty payload");
279    }
280
281    #[test]
282    fn parse_error_display_msgpack_unsupported() {
283        // Without the worker-msgpack feature, MsgPack returns UnsupportedFormat.
284        #[cfg(not(feature = "worker-msgpack"))]
285        {
286            // Construct a minimal fixmap: 0x81 = fixmap with 1 entry.
287            let payload: &[u8] = &[0x81, 0xa3, b'k', b'e', b'y', 0x01];
288            let result = parse_payload(payload, PayloadFormat::MsgPack);
289            assert!(
290                matches!(result, Err(ParseError::UnsupportedFormat(_))),
291                "expected UnsupportedFormat, got {result:?}"
292            );
293        }
294        #[cfg(feature = "worker-msgpack")]
295        {
296            // Feature is enabled; just verify the UnsupportedFormat variant
297            // can still be constructed and displayed.
298            let e = ParseError::UnsupportedFormat("test");
299            assert!(e.to_string().contains("test"));
300        }
301    }
302
303    // ---- Native MsgPack path (rmpv, NO rmp_serde -> serde_json bridge) -------
304    //
305    // Hand-roll the MsgPack bytes so the test exercises the NATIVE rmpv decoder
306    // + value walker, not a serde round-trip.
307    #[cfg(feature = "worker-msgpack")]
308    mod msgpack_native {
309        use super::*;
310
311        /// fixstr: 0xa0 | len, then the UTF-8 bytes (len < 32).
312        fn fixstr(s: &str) -> Vec<u8> {
313            let bytes = s.as_bytes();
314            let mut out = vec![0xa0 | u8::try_from(bytes.len()).expect("len < 32")];
315            out.extend_from_slice(bytes);
316            out
317        }
318
319        /// `{"_table":"events","org_id":42,"live":true,"ratio":1.5,"missing":nil}`
320        /// as a MsgPack fixmap -- the same canonical record the codec tests use.
321        fn sample() -> Vec<u8> {
322            let mut buf = vec![0x80 | 5]; // fixmap(5)
323            buf.extend(fixstr("_table"));
324            buf.extend(fixstr("events"));
325            buf.extend(fixstr("org_id"));
326            buf.push(42); // positive fixint
327            buf.extend(fixstr("live"));
328            buf.push(0xc3); // true
329            buf.extend(fixstr("ratio"));
330            buf.push(0xcb); // float64
331            buf.extend_from_slice(&1.5f64.to_be_bytes());
332            buf.extend(fixstr("missing"));
333            buf.push(0xc0); // nil
334            buf
335        }
336
337        #[test]
338        fn msgpack_native_decode_extracts_string_field() {
339            let value = parse_payload(&sample(), PayloadFormat::MsgPack).unwrap();
340            assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
341        }
342
343        #[test]
344        fn msgpack_native_decode_preserves_scalar_types() {
345            let value = parse_payload(&sample(), PayloadFormat::MsgPack).unwrap();
346            assert_eq!(value.get("org_id").and_then(|v| v.as_i64()), Some(42));
347            assert_eq!(value.get("live").and_then(|v| v.as_bool()), Some(true));
348            assert_eq!(value.get("ratio").and_then(|v| v.as_f64()), Some(1.5));
349            assert!(value.get("missing").is_some_and(|v| v.is_null()));
350        }
351
352        #[test]
353        fn msgpack_auto_detects_and_decodes_natively() {
354            // Leading fixmap byte (0x85) must auto-detect as MsgPack and decode.
355            let value = parse_payload(&sample(), PayloadFormat::Auto).unwrap();
356            assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
357        }
358
359        #[test]
360        fn msgpack_nested_array_and_map_walk() {
361            // {"items":[1,2],"meta":{"k":"v"}}
362            let mut buf = vec![0x80 | 2];
363            buf.extend(fixstr("items"));
364            buf.push(0x90 | 2); // fixarray(2)
365            buf.push(1);
366            buf.push(2);
367            buf.extend(fixstr("meta"));
368            buf.push(0x80 | 1); // fixmap(1)
369            buf.extend(fixstr("k"));
370            buf.extend(fixstr("v"));
371
372            let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
373            let items = value.get("items").unwrap();
374            assert_eq!(items[0].as_i64(), Some(1));
375            assert_eq!(items[1].as_i64(), Some(2));
376            assert_eq!(
377                value
378                    .get("meta")
379                    .and_then(|m| m.get("k"))
380                    .and_then(|v| v.as_str()),
381                Some("v")
382            );
383        }
384
385        #[test]
386        fn malformed_msgpack_returns_msgpack_error() {
387            // 0x81 declares a 1-entry fixmap but supplies no key/value.
388            let result = parse_payload(&[0x81], PayloadFormat::MsgPack);
389            assert!(
390                matches!(result, Err(ParseError::MsgPack(_))),
391                "expected MsgPack error, got {result:?}"
392            );
393        }
394    }
395}