Skip to main content

hyperi_rustlib/worker/engine/
parse.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/parse.rs
3// Purpose:   SIMD-accelerated payload parsing for the batch processing engine
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Parse phase: convert raw bytes into a `sonic_rs::Value` using SIMD
10//! acceleration. This is the most CPU-intensive phase (~1-5 µs per message).
11//!
12//! - JSON: `sonic_rs::from_slice` (AVX2/NEON SIMD, 2-4× faster than serde_json)
13//! - MsgPack: `rmp_serde` → `serde_json::Value` → JSON bytes → `sonic_rs::Value`
14//!   (slower, but MsgPack messages are a small minority in practice)
15//! - Auto: byte-sniff via [`PayloadFormat::detect`], then dispatch
16
17use super::types::PayloadFormat;
18
19/// Error produced when a single message fails to parse.
20#[derive(Debug)]
21pub enum ParseError {
22    /// Payload was empty -- nothing to parse.
23    Empty,
24    /// JSON parse error from sonic_rs.
25    Json(sonic_rs::Error),
26    /// MsgPack decode error.
27    MsgPack(String),
28    /// Format not supported (feature gate not enabled).
29    UnsupportedFormat(&'static str),
30}
31
32impl std::fmt::Display for ParseError {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        match self {
35            Self::Empty => write!(f, "empty payload"),
36            Self::Json(e) => write!(f, "json parse error: {e}"),
37            Self::MsgPack(msg) => write!(f, "msgpack decode error: {msg}"),
38            Self::UnsupportedFormat(msg) => write!(f, "unsupported format: {msg}"),
39        }
40    }
41}
42
43impl std::error::Error for ParseError {
44    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
45        match self {
46            Self::Json(e) => Some(e),
47            _ => None,
48        }
49    }
50}
51
52/// Parse raw bytes into a `sonic_rs::Value` using SIMD acceleration.
53///
54/// # Format dispatch
55///
56/// | Format | Engine |
57/// |--------|--------|
58/// | `Json` | `sonic_rs::from_slice` (SIMD) |
59/// | `Auto` | byte-sniff → `Json` or `MsgPack` |
60/// | `MsgPack` | `rmp_serde` → JSON bridge → `sonic_rs` (requires `worker-msgpack` feature) |
61///
62/// # Errors
63///
64/// Returns [`ParseError`] for empty payloads, malformed JSON/MsgPack, or when
65/// the `worker-msgpack` feature is not enabled and a MsgPack payload is given.
66pub fn parse_payload(payload: &[u8], format: PayloadFormat) -> Result<sonic_rs::Value, ParseError> {
67    if payload.is_empty() {
68        return Err(ParseError::Empty);
69    }
70
71    let effective = match format {
72        PayloadFormat::Auto => PayloadFormat::detect(payload),
73        other => other,
74    };
75
76    match effective {
77        // Auto resolves to Json or MsgPack; treat residual Auto as Json.
78        PayloadFormat::Json | PayloadFormat::Auto => {
79            sonic_rs::from_slice(payload).map_err(ParseError::Json)
80        }
81        PayloadFormat::MsgPack => {
82            #[cfg(feature = "worker-msgpack")]
83            {
84                // MsgPack → serde_json::Value → JSON bytes → sonic_rs::Value.
85                // serde_json is used as the intermediate representation because
86                // it supports both msgpack deserialization (via rmp_serde) and
87                // JSON serialization for the sonic_rs bridge.
88                let json_value: serde_json::Value = rmp_serde::from_slice(payload)
89                    .map_err(|e| ParseError::MsgPack(e.to_string()))?;
90                let json_bytes = serde_json::to_vec(&json_value)
91                    .map_err(|e| ParseError::MsgPack(e.to_string()))?;
92                sonic_rs::from_slice(&json_bytes).map_err(ParseError::Json)
93            }
94            #[cfg(not(feature = "worker-msgpack"))]
95            {
96                Err(ParseError::UnsupportedFormat(
97                    "msgpack requires the worker-msgpack feature",
98                ))
99            }
100        }
101    }
102}
103
#[cfg(test)]
mod tests {
    use sonic_rs::JsonValueTrait as _;

    use super::*;

    /// Parse `bytes` with the explicit JSON format, panicking on failure.
    fn must_parse_json(bytes: &[u8]) -> sonic_rs::Value {
        parse_payload(bytes, PayloadFormat::Json).unwrap()
    }

    #[test]
    fn parse_valid_json() {
        let doc = must_parse_json(br#"{"host": "web1", "status": 200}"#);

        let host = doc.get("host").and_then(|field| field.as_str());
        let status = doc.get("status").and_then(|field| field.as_u64());

        assert_eq!(host, Some("web1"));
        assert_eq!(status, Some(200));
    }

    #[test]
    fn parse_auto_detects_json() {
        let doc = parse_payload(br#"{"_table": "events"}"#, PayloadFormat::Auto).unwrap();
        let table = doc.get("_table").and_then(|field| field.as_str());
        assert_eq!(table, Some("events"));
    }

    #[test]
    fn parse_invalid_json_returns_error() {
        let outcome = parse_payload(b"this is not json {", PayloadFormat::Json);
        assert!(
            matches!(outcome, Err(ParseError::Json(_))),
            "expected Json error, got {outcome:?}"
        );
    }

    #[test]
    fn parse_empty_payload_returns_empty_error() {
        let outcome = parse_payload(b"", PayloadFormat::Json);
        assert!(
            matches!(outcome, Err(ParseError::Empty)),
            "expected Empty error, got {outcome:?}"
        );
    }

    #[test]
    fn parse_empty_payload_auto_returns_empty_error() {
        // The emptiness check runs before format detection.
        let outcome = parse_payload(b"", PayloadFormat::Auto);
        assert!(matches!(outcome, Err(ParseError::Empty)));
    }

    #[test]
    fn parse_nested_json() {
        let doc =
            must_parse_json(br#"{"meta": {"source": "kafka", "version": 3}, "data": [1, 2, 3]}"#);

        let meta = doc.get("meta");
        assert!(meta.is_some());
        assert!(doc.get("data").is_some());

        // Drill into the nested object.
        let source = meta.unwrap().get("source").and_then(|field| field.as_str());
        assert_eq!(source, Some("kafka"));
    }

    #[test]
    fn parse_json_with_unicode() {
        // The payload carries a JSON `\u00e9` escape (literal backslash-u).
        let doc = must_parse_json(br#"{"name": "caf\u00e9"}"#);
        assert!(doc.get("name").is_some());
    }

    #[test]
    fn parse_error_display_empty() {
        assert_eq!(ParseError::Empty.to_string(), "empty payload");
    }

    #[test]
    fn parse_error_display_msgpack_unsupported() {
        // Feature disabled: a MsgPack request must report UnsupportedFormat.
        #[cfg(not(feature = "worker-msgpack"))]
        {
            // Minimal fixmap: 0x81 = map with one entry, key "key", value 1.
            let fixmap: &[u8] = &[0x81, 0xa3, b'k', b'e', b'y', 0x01];
            let outcome = parse_payload(fixmap, PayloadFormat::MsgPack);
            assert!(
                matches!(outcome, Err(ParseError::UnsupportedFormat(_))),
                "expected UnsupportedFormat, got {outcome:?}"
            );
        }
        // Feature enabled: the variant must still be constructible and its
        // message must surface through Display.
        #[cfg(feature = "worker-msgpack")]
        {
            let rendered = ParseError::UnsupportedFormat("test").to_string();
            assert!(rendered.contains("test"));
        }
    }
}