Skip to main content

hyperi_rustlib/worker/engine/
parse.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/parse.rs
3// Purpose:   SIMD-accelerated payload parsing for the batch processing engine
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Parse phase: convert raw bytes into a `sonic_rs::Value` using SIMD
10//! acceleration. This is the most CPU-intensive phase (~1-5 µs per message).
11//!
12//! - JSON: `sonic_rs::from_slice` (AVX2/NEON SIMD, 2-4x faster than serde_json)
13//! - MsgPack: `rmpv` native decode -> `sonic_rs::Value` via a direct value walker
14//!   (no `rmp_serde -> serde_json` bridge; MsgPack messages are a small minority
15//!   in practice)
16//! - Auto: byte-sniff via [`PayloadFormat::detect`], then dispatch
17
18use super::types::PayloadFormat;
19
20/// Error produced when a single message fails to parse.
21#[derive(Debug)]
22pub enum ParseError {
23    /// Payload was empty -- nothing to parse.
24    Empty,
25    /// JSON parse error from sonic_rs.
26    Json(sonic_rs::Error),
27    /// MsgPack decode error.
28    MsgPack(String),
29    /// Format not supported (feature gate not enabled).
30    UnsupportedFormat(&'static str),
31}
32
33impl std::fmt::Display for ParseError {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::Empty => write!(f, "empty payload"),
37            Self::Json(e) => write!(f, "json parse error: {e}"),
38            Self::MsgPack(msg) => write!(f, "msgpack decode error: {msg}"),
39            Self::UnsupportedFormat(msg) => write!(f, "unsupported format: {msg}"),
40        }
41    }
42}
43
44impl std::error::Error for ParseError {
45    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
46        match self {
47            Self::Json(e) => Some(e),
48            _ => None,
49        }
50    }
51}
52
53/// Parse raw bytes into a `sonic_rs::Value` using SIMD acceleration.
54///
55/// # Format dispatch
56///
57/// | Format | Engine |
58/// |--------|--------|
59/// | `Json` | `sonic_rs::from_slice` (SIMD) |
60/// | `Auto` | byte-sniff -> `Json` or `MsgPack` |
61/// | `MsgPack` | `rmpv` native decode -> `sonic_rs::Value` walker (requires `worker-msgpack` feature) |
62///
63/// # Errors
64///
65/// Returns [`ParseError`] for empty payloads, malformed JSON/MsgPack, or when
66/// the `worker-msgpack` feature is not enabled and a MsgPack payload is given.
67pub fn parse_payload(payload: &[u8], format: PayloadFormat) -> Result<sonic_rs::Value, ParseError> {
68    if payload.is_empty() {
69        return Err(ParseError::Empty);
70    }
71
72    let effective = match format {
73        PayloadFormat::Auto => PayloadFormat::detect(payload),
74        other => other,
75    };
76
77    match effective {
78        // Auto resolves to Json or MsgPack; treat residual Auto as Json.
79        PayloadFormat::Json | PayloadFormat::Auto => {
80            sonic_rs::from_slice(payload).map_err(ParseError::Json)
81        }
82        PayloadFormat::MsgPack => {
83            #[cfg(feature = "worker-msgpack")]
84            {
85                // Native MsgPack: `rmpv::decode::read_value` is a SINGLE native
86                // decode (the same schema-less decoder `codec::parse` uses), then
87                // `rmpv_to_sonic` walks the value straight into a `sonic_rs::Value`.
88                // There is NO `rmp_serde -> serde_json` bridge and no JSON
89                // re-serialise -- the engine's `ParsedMessage` keeps its
90                // `sonic_rs::Value` shape so `field`/`value`/`extract_known` and
91                // the transform-closure contract are untouched.
92                let mut cursor: &[u8] = payload;
93                let value = rmpv::decode::read_value(&mut cursor)
94                    .map_err(|e| ParseError::MsgPack(e.to_string()))?;
95                Ok(rmpv_to_sonic(&value))
96            }
97            #[cfg(not(feature = "worker-msgpack"))]
98            {
99                Err(ParseError::UnsupportedFormat(
100                    "msgpack requires the worker-msgpack feature",
101                ))
102            }
103        }
104    }
105}
106
/// Convert a native `rmpv::Value` into a `sonic_rs::Value` with a direct value
/// walker -- NO `serde_json` intermediate and NO JSON re-serialise.
///
/// This is the bridge-free MsgPack path. The engine retains a
/// `sonic_rs::Value` without ever passing through `rmp_serde -> serde_json`,
/// so `ParsedMessage`, `extract_known`, pre-route, the interner and the
/// transform-closure contract are unchanged.
///
/// ## Mapping
///
/// - Nil -> JSON null; Boolean -> bool.
/// - F32 / F64 -> JSON number, with F32 widened to `f64`. A non-finite value
///   (NaN / +/-inf) has no JSON representation and becomes null.
/// - Integers: a value that fits in `i64` becomes an `i64`, and a positive
///   value above `i64::MAX` stays an unsigned `u64`. A well-formed `rmpv`
///   integer always fits one of those. The final fallback surfaces it as a
///   (lossy) `f64` only to keep the walker total.
/// - String: valid UTF-8 -> JSON string; otherwise the lossy form. A MsgPack
///   `str` is meant to be UTF-8, so this only bites on malformed input.
/// - Binary -> its lossy-UTF-8 string; Ext -> JSON null. Routing never keys
///   off these, so neither aborts the parse (the original bytes still
///   round-trip via `Record.payload`).
/// - Array / Map: recurse. Non-string map keys are stringified by
///   `msgpack_key_to_string`, so the object stays addressable (JSON object
///   keys must be strings).
#[cfg(feature = "worker-msgpack")]
fn rmpv_to_sonic(value: &rmpv::Value) -> sonic_rs::Value {
    use rmpv::Value as M;
    use sonic_rs::Value as S;

    // A non-finite float (NaN / +/-inf) has no JSON representation, so it folds
    // to null -- the same total-on-bad-input stance the rest of the walker takes.
    let from_f64 = |f: f64| S::new_f64(f).unwrap_or_else(S::new_null);

    match value {
        // Nil -> JSON null. Ext carries an application-defined type tag + bytes
        // with no JSON analogue and routers never key off it, so it folds to
        // null too.
        M::Nil | M::Ext(_, _) => S::new_null(),
        M::Boolean(b) => S::new_bool(*b),
        M::Integer(i) => {
            if let Some(n) = i.as_i64() {
                S::new_i64(n)
            } else if let Some(n) = i.as_u64() {
                S::new_u64(n)
            } else {
                // Unreachable for a well-formed rmpv integer. Stay total and
                // numeric (lossy f64) as documented, falling back to null
                // only if even that is unavailable.
                i.as_f64().map_or_else(S::new_null, from_f64)
            }
        }
        M::F32(f) => from_f64(f64::from(*f)),
        M::F64(f) => from_f64(*f),
        M::String(s) => match s.as_str() {
            Some(text) => S::from(text),
            None => S::from(String::from_utf8_lossy(s.as_bytes())),
        },
        M::Binary(bytes) => S::from(String::from_utf8_lossy(bytes)),
        M::Array(items) => {
            let mut arr = sonic_rs::Array::new();
            for item in items {
                arr.push(rmpv_to_sonic(item));
            }
            S::from(arr)
        }
        M::Map(pairs) => {
            let mut obj = sonic_rs::Object::new();
            for (k, v) in pairs {
                let key = msgpack_key_to_string(k);
                obj.insert(&key, rmpv_to_sonic(v));
            }
            S::from(obj)
        }
    }
}
177
/// Render an `rmpv` map key as the `String` used for the JSON object key
/// (JSON object keys must be strings).
///
/// - `str` keys pass through unchanged, or are lossily repaired if they are
///   not valid UTF-8.
/// - `nil` becomes `"null"`.
/// - Booleans and integers use their ordinary `to_string` text.
/// - Any other key uses `rmpv::Value`'s `Display` output, so the entry stays
///   addressable instead of being dropped.
#[cfg(feature = "worker-msgpack")]
fn msgpack_key_to_string(key: &rmpv::Value) -> String {
    match key {
        rmpv::Value::Nil => String::from("null"),
        rmpv::Value::Boolean(flag) => flag.to_string(),
        rmpv::Value::Integer(int) => int.to_string(),
        rmpv::Value::String(utf8) => utf8.as_str().map_or_else(
            || String::from_utf8_lossy(utf8.as_bytes()).into_owned(),
            str::to_owned,
        ),
        fallback => fallback.to_string(),
    }
}
195
#[cfg(test)]
mod tests {
    use sonic_rs::JsonValueTrait as _;

    use super::*;

    #[test]
    fn parse_valid_json() {
        let payload = br#"{"host": "web1", "status": 200}"#;
        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
        assert_eq!(value.get("host").and_then(|v| v.as_str()), Some("web1"));
        assert_eq!(value.get("status").and_then(|v| v.as_u64()), Some(200));
    }

    #[test]
    fn parse_auto_detects_json() {
        let payload = br#"{"_table": "events"}"#;
        let value = parse_payload(payload, PayloadFormat::Auto).unwrap();
        assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
    }

    #[test]
    fn parse_invalid_json_returns_error() {
        let payload = b"this is not json {";
        let result = parse_payload(payload, PayloadFormat::Json);
        assert!(
            matches!(result, Err(ParseError::Json(_))),
            "expected Json error, got {result:?}"
        );
    }

    #[test]
    fn parse_empty_payload_returns_empty_error() {
        let result = parse_payload(b"", PayloadFormat::Json);
        assert!(
            matches!(result, Err(ParseError::Empty)),
            "expected Empty error, got {result:?}"
        );
    }

    #[test]
    fn parse_empty_payload_auto_returns_empty_error() {
        let result = parse_payload(b"", PayloadFormat::Auto);
        assert!(matches!(result, Err(ParseError::Empty)));
    }

    #[test]
    fn parse_nested_json() {
        let payload = br#"{"meta": {"source": "kafka", "version": 3}, "data": [1, 2, 3]}"#;
        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
        assert!(value.get("meta").is_some());
        assert!(value.get("data").is_some());
        // Verify nested field access.
        let meta = value.get("meta").unwrap();
        assert_eq!(meta.get("source").and_then(|v| v.as_str()), Some("kafka"));
    }

    #[test]
    fn parse_json_with_unicode() {
        // The `\u00e9` escape must be decoded to `é`, not merely accepted.
        let payload = "{\"name\": \"caf\\u00e9\"}".as_bytes();
        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
        assert_eq!(
            value.get("name").and_then(|v| v.as_str()),
            Some("caf\u{e9}")
        );
    }

    #[test]
    fn parse_error_display_empty() {
        let e = ParseError::Empty;
        assert_eq!(e.to_string(), "empty payload");
    }

    #[test]
    fn parse_error_display_msgpack_unsupported() {
        // Without the worker-msgpack feature, MsgPack returns UnsupportedFormat.
        #[cfg(not(feature = "worker-msgpack"))]
        {
            // Construct a minimal fixmap: 0x81 = fixmap with 1 entry.
            let payload: &[u8] = &[0x81, 0xa3, b'k', b'e', b'y', 0x01];
            let result = parse_payload(payload, PayloadFormat::MsgPack);
            assert!(
                matches!(result, Err(ParseError::UnsupportedFormat(_))),
                "expected UnsupportedFormat, got {result:?}"
            );
        }
        #[cfg(feature = "worker-msgpack")]
        {
            // Feature is enabled; just verify the UnsupportedFormat variant
            // can still be constructed and displayed.
            let e = ParseError::UnsupportedFormat("test");
            assert!(e.to_string().contains("test"));
        }
    }

    // ---- Native MsgPack path (rmpv, NO rmp_serde -> serde_json bridge) -------
    //
    // Hand-roll the MsgPack bytes so the test exercises the NATIVE rmpv decoder
    // + value walker, not a serde round-trip.
    #[cfg(feature = "worker-msgpack")]
    mod msgpack_native {
        use super::*;

        /// fixstr: 0xa0 | len, then the UTF-8 bytes (len < 32).
        fn fixstr(s: &str) -> Vec<u8> {
            let bytes = s.as_bytes();
            let mut out = vec![0xa0 | u8::try_from(bytes.len()).expect("len < 32")];
            out.extend_from_slice(bytes);
            out
        }

        /// `{"_table":"events","org_id":42,"live":true,"ratio":1.5,"missing":nil}`
        /// as a MsgPack fixmap -- the same canonical record the codec tests use.
        fn sample() -> Vec<u8> {
            let mut buf = vec![0x80 | 5]; // fixmap(5)
            buf.extend(fixstr("_table"));
            buf.extend(fixstr("events"));
            buf.extend(fixstr("org_id"));
            buf.push(42); // positive fixint
            buf.extend(fixstr("live"));
            buf.push(0xc3); // true
            buf.extend(fixstr("ratio"));
            buf.push(0xcb); // float64
            buf.extend_from_slice(&1.5f64.to_be_bytes());
            buf.extend(fixstr("missing"));
            buf.push(0xc0); // nil
            buf
        }

        #[test]
        fn msgpack_native_decode_extracts_string_field() {
            let value = parse_payload(&sample(), PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
        }

        #[test]
        fn msgpack_native_decode_preserves_scalar_types() {
            let value = parse_payload(&sample(), PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("org_id").and_then(|v| v.as_i64()), Some(42));
            assert_eq!(value.get("live").and_then(|v| v.as_bool()), Some(true));
            assert_eq!(value.get("ratio").and_then(|v| v.as_f64()), Some(1.5));
            assert!(value.get("missing").is_some_and(|v| v.is_null()));
        }

        #[test]
        fn msgpack_auto_detects_and_decodes_natively() {
            // Leading fixmap byte (0x85) must auto-detect as MsgPack and decode.
            let value = parse_payload(&sample(), PayloadFormat::Auto).unwrap();
            assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
        }

        #[test]
        fn msgpack_nested_array_and_map_walk() {
            // {"items":[1,2],"meta":{"k":"v"}}
            let mut buf = vec![0x80 | 2];
            buf.extend(fixstr("items"));
            buf.push(0x90 | 2); // fixarray(2)
            buf.push(1);
            buf.push(2);
            buf.extend(fixstr("meta"));
            buf.push(0x80 | 1); // fixmap(1)
            buf.extend(fixstr("k"));
            buf.extend(fixstr("v"));

            let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
            let items = value.get("items").unwrap();
            assert_eq!(items[0].as_i64(), Some(1));
            assert_eq!(items[1].as_i64(), Some(2));
            assert_eq!(
                value
                    .get("meta")
                    .and_then(|m| m.get("k"))
                    .and_then(|v| v.as_str()),
                Some("v")
            );
        }

        #[test]
        fn msgpack_non_string_keys_are_stringified() {
            // {1:"a", true:"b", nil:"c"} -- keys must stay addressable as strings.
            let mut buf: Vec<u8> = vec![0x80 | 3]; // fixmap(3)
            buf.push(0x01); // positive fixint key 1
            buf.extend(fixstr("a"));
            buf.push(0xc3); // key true
            buf.extend(fixstr("b"));
            buf.push(0xc0); // key nil
            buf.extend(fixstr("c"));

            let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("1").and_then(|v| v.as_str()), Some("a"));
            assert_eq!(value.get("true").and_then(|v| v.as_str()), Some("b"));
            assert_eq!(value.get("null").and_then(|v| v.as_str()), Some("c"));
        }

        #[test]
        fn msgpack_large_unsigned_and_negative_integers() {
            // {"big": u64::MAX, "neg": -1}
            let mut buf: Vec<u8> = vec![0x80 | 2];
            buf.extend(fixstr("big"));
            buf.push(0xcf); // uint64
            buf.extend_from_slice(&u64::MAX.to_be_bytes());
            buf.extend(fixstr("neg"));
            buf.push(0xff); // negative fixint -1

            let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("big").and_then(|v| v.as_u64()), Some(u64::MAX));
            assert_eq!(value.get("neg").and_then(|v| v.as_i64()), Some(-1));
        }

        #[test]
        fn msgpack_float_widening_and_non_finite_to_null() {
            // {"f32": 1.5f32, "nan": NaN}
            let mut buf: Vec<u8> = vec![0x80 | 2];
            buf.extend(fixstr("f32"));
            buf.push(0xca); // float32
            buf.extend_from_slice(&1.5f32.to_be_bytes());
            buf.extend(fixstr("nan"));
            buf.push(0xcb); // float64
            buf.extend_from_slice(&f64::NAN.to_be_bytes());

            let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("f32").and_then(|v| v.as_f64()), Some(1.5));
            assert!(value.get("nan").is_some_and(|v| v.is_null()));
        }

        #[test]
        fn msgpack_binary_becomes_string_and_ext_becomes_null() {
            // {"bin": bin8(b"hi"), "ext": fixext1(type 5, [0x01])}
            let mut buf: Vec<u8> = vec![0x80 | 2];
            buf.extend(fixstr("bin"));
            buf.extend_from_slice(&[0xc4, 2, b'h', b'i']); // bin8, len 2
            buf.extend(fixstr("ext"));
            buf.extend_from_slice(&[0xd4, 5, 0x01]); // fixext1, type 5, 1 data byte

            let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("bin").and_then(|v| v.as_str()), Some("hi"));
            assert!(value.get("ext").is_some_and(|v| v.is_null()));
        }

        #[test]
        fn malformed_msgpack_returns_msgpack_error() {
            // 0x81 declares a 1-entry fixmap but supplies no key/value.
            let result = parse_payload(&[0x81], PayloadFormat::MsgPack);
            assert!(
                matches!(result, Err(ParseError::MsgPack(_))),
                "expected MsgPack error, got {result:?}"
            );
        }
    }
}