// hyperi_rustlib/worker/engine/parse.rs
use super::types::PayloadFormat;

20#[derive(Debug)]
22pub enum ParseError {
23 Empty,
25 Json(sonic_rs::Error),
27 MsgPack(String),
29 UnsupportedFormat(&'static str),
31 TooDeep,
35}
37impl std::fmt::Display for ParseError {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 Self::Empty => write!(f, "empty payload"),
41 Self::Json(e) => write!(f, "json parse error: {e}"),
42 Self::MsgPack(msg) => write!(f, "msgpack decode error: {msg}"),
43 Self::UnsupportedFormat(msg) => write!(f, "unsupported format: {msg}"),
44 Self::TooDeep => write!(
45 f,
46 "payload nesting exceeds the maximum parse depth of {}",
47 crate::parse_guard::MAX_PARSE_DEPTH
48 ),
49 }
50 }
51}
53impl std::error::Error for ParseError {
54 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
55 match self {
56 Self::Json(e) => Some(e),
57 _ => None,
58 }
59 }
60}
62pub fn parse_payload(payload: &[u8], format: PayloadFormat) -> Result<sonic_rs::Value, ParseError> {
77 if payload.is_empty() {
78 return Err(ParseError::Empty);
79 }
80
81 let effective = match format {
82 PayloadFormat::Auto => PayloadFormat::detect(payload),
83 other => other,
84 };
85
86 match effective {
87 PayloadFormat::Json | PayloadFormat::Auto => {
89 if !crate::parse_guard::json_depth_within(payload, crate::parse_guard::MAX_PARSE_DEPTH)
93 {
94 return Err(ParseError::TooDeep);
95 }
96 sonic_rs::from_slice(payload).map_err(ParseError::Json)
97 }
98 PayloadFormat::MsgPack => {
99 #[cfg(feature = "worker-msgpack")]
100 {
101 let mut cursor: &[u8] = payload;
106 let value = rmpv::decode::read_value_with_max_depth(
109 &mut cursor,
110 crate::parse_guard::MAX_PARSE_DEPTH,
111 )
112 .map_err(|e| ParseError::MsgPack(e.to_string()))?;
113 Ok(rmpv_to_sonic(&value))
114 }
115 #[cfg(not(feature = "worker-msgpack"))]
116 {
117 Err(ParseError::UnsupportedFormat(
118 "msgpack requires the worker-msgpack feature",
119 ))
120 }
121 }
122 }
123}
/// Recursively converts a decoded MessagePack value into a [`sonic_rs::Value`].
///
/// Mapping:
/// - nil and extension values become JSON null;
/// - booleans map to JSON booleans;
/// - integers map to a signed number when they fit in `i64`, otherwise to an
///   unsigned number (null if neither conversion succeeds);
/// - floats map to a JSON number, or null when `sonic_rs::Value::new_f64`
///   returns `None` (presumably for non-finite values — confirm against the
///   sonic_rs documentation);
/// - strings and binary blobs become JSON strings, using lossy UTF-8
///   conversion for bytes that are not valid UTF-8;
/// - arrays and maps are converted element by element, with map keys rendered
///   through `msgpack_key_to_string`.
#[cfg(feature = "worker-msgpack")]
fn rmpv_to_sonic(value: &rmpv::Value) -> sonic_rs::Value {
    use rmpv::Value as M;
    use sonic_rs::Value as S;

    let float_or_null = |x: f64| match S::new_f64(x) {
        Some(number) => number,
        None => S::new_null(),
    };

    match value {
        M::Array(items) => {
            let mut out = sonic_rs::Array::new();
            for element in items {
                out.push(rmpv_to_sonic(element));
            }
            S::from(out)
        }
        M::Map(pairs) => {
            let mut out = sonic_rs::Object::new();
            for (raw_key, raw_val) in pairs {
                out.insert(&msgpack_key_to_string(raw_key), rmpv_to_sonic(raw_val));
            }
            S::from(out)
        }
        M::String(s) => {
            if let Some(text) = s.as_str() {
                S::from(text)
            } else {
                S::from(String::from_utf8_lossy(s.as_bytes()))
            }
        }
        M::Binary(bytes) => S::from(String::from_utf8_lossy(bytes)),
        M::Integer(int) => int
            .as_i64()
            .map(S::new_i64)
            .or_else(|| int.as_u64().map(S::new_u64))
            .unwrap_or_else(S::new_null),
        M::F64(x) => float_or_null(*x),
        M::F32(x) => float_or_null(f64::from(*x)),
        M::Boolean(flag) => S::new_bool(*flag),
        M::Nil | M::Ext(_, _) => S::new_null(),
    }
}
/// Renders a MessagePack map key as the string used for the JSON object key.
///
/// String keys are used as-is (lossily converted when not valid UTF-8),
/// integers and booleans use their `Display` form, nil becomes `"null"`, and
/// any other key type falls back to `rmpv::Value`'s `Display` output.
#[cfg(feature = "worker-msgpack")]
fn msgpack_key_to_string(key: &rmpv::Value) -> String {
    use rmpv::Value as M;
    match key {
        M::Nil => String::from("null"),
        M::Boolean(flag) => flag.to_string(),
        M::Integer(int) => int.to_string(),
        M::String(s) => s.as_str().map_or_else(
            || String::from_utf8_lossy(s.as_bytes()).into_owned(),
            str::to_owned,
        ),
        other => other.to_string(),
    }
}
#[cfg(test)]
mod tests {
    use sonic_rs::JsonValueTrait as _;

    use super::*;

    #[test]
    fn parse_valid_json() {
        let payload = br#"{"host": "web1", "status": 200}"#;
        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
        assert_eq!(value.get("host").and_then(|v| v.as_str()), Some("web1"));
        assert_eq!(value.get("status").and_then(|v| v.as_u64()), Some(200));
    }

    #[test]
    fn parse_auto_detects_json() {
        let payload = br#"{"_table": "events"}"#;
        let value = parse_payload(payload, PayloadFormat::Auto).unwrap();
        assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
    }

    #[test]
    fn parse_invalid_json_returns_error() {
        let payload = b"this is not json {";
        let result = parse_payload(payload, PayloadFormat::Json);
        assert!(
            matches!(result, Err(ParseError::Json(_))),
            "expected Json error, got {result:?}"
        );
    }

    #[test]
    fn parse_empty_payload_returns_empty_error() {
        let result = parse_payload(b"", PayloadFormat::Json);
        assert!(
            matches!(result, Err(ParseError::Empty)),
            "expected Empty error, got {result:?}"
        );
    }

    #[test]
    fn parse_empty_payload_auto_returns_empty_error() {
        let result = parse_payload(b"", PayloadFormat::Auto);
        assert!(matches!(result, Err(ParseError::Empty)));
    }

    #[test]
    fn parse_nested_json() {
        let payload = br#"{"meta": {"source": "kafka", "version": 3}, "data": [1, 2, 3]}"#;
        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
        assert!(value.get("meta").is_some());
        assert!(value.get("data").is_some());
        let meta = value.get("meta").unwrap();
        assert_eq!(meta.get("source").and_then(|v| v.as_str()), Some("kafka"));
    }

    #[test]
    fn parse_json_with_unicode() {
        // The payload carries the JSON escape `\u00e9`, not a literal `é`.
        let payload = "{\"name\": \"caf\\u00e9\"}".as_bytes();
        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
        // Check the decoded text, not just that the key exists, so a parser
        // that mishandles the escape fails this test.
        assert_eq!(
            value.get("name").and_then(|v| v.as_str()),
            Some("caf\u{e9}")
        );
    }

    #[test]
    fn parse_error_display_empty() {
        let e = ParseError::Empty;
        assert_eq!(e.to_string(), "empty payload");
    }

    #[test]
    fn parse_error_display_msgpack_unsupported() {
        // Without the feature, a MessagePack request must be refused.
        #[cfg(not(feature = "worker-msgpack"))]
        {
            let payload: &[u8] = &[0x81, 0xa3, b'k', b'e', b'y', 0x01];
            let result = parse_payload(payload, PayloadFormat::MsgPack);
            assert!(
                matches!(result, Err(ParseError::UnsupportedFormat(_))),
                "expected UnsupportedFormat, got {result:?}"
            );
        }
        // With the feature enabled the variant is not reachable through
        // `parse_payload`, so only its Display output is checked.
        #[cfg(feature = "worker-msgpack")]
        {
            let e = ParseError::UnsupportedFormat("test");
            assert!(e.to_string().contains("test"));
        }
    }

    /// Tests for the native MessagePack decode path.
    #[cfg(feature = "worker-msgpack")]
    mod msgpack_native {
        use super::*;

        /// Encodes `s` as a MessagePack fixstr (header `0xa0 | len`).
        ///
        /// # Panics
        ///
        /// Panics when `s` is 32 bytes or longer: fixstr keeps the length in
        /// the low five bits of the header, so a larger length would spill
        /// into the type tag and yield a different, invalid encoding.
        fn fixstr(s: &str) -> Vec<u8> {
            let bytes = s.as_bytes();
            let len = u8::try_from(bytes.len())
                .ok()
                .filter(|len| *len < 32)
                .expect("fixstr payload must be shorter than 32 bytes");
            let mut out = vec![0xa0 | len];
            out.extend_from_slice(bytes);
            out
        }

        /// Builds a five-entry fixmap covering string, integer, boolean,
        /// float and nil values.
        fn sample() -> Vec<u8> {
            let mut buf = vec![0x80 | 5]; // fixmap header, 5 entries
            buf.extend(fixstr("_table"));
            buf.extend(fixstr("events"));
            buf.extend(fixstr("org_id"));
            buf.push(42); // positive fixint
            buf.extend(fixstr("live"));
            buf.push(0xc3); // true
            buf.extend(fixstr("ratio"));
            buf.push(0xcb); // float 64, big-endian payload follows
            buf.extend_from_slice(&1.5f64.to_be_bytes());
            buf.extend(fixstr("missing"));
            buf.push(0xc0); // nil
            buf
        }

        #[test]
        fn msgpack_native_decode_extracts_string_field() {
            let value = parse_payload(&sample(), PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
        }

        #[test]
        fn msgpack_native_decode_preserves_scalar_types() {
            let value = parse_payload(&sample(), PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("org_id").and_then(|v| v.as_i64()), Some(42));
            assert_eq!(value.get("live").and_then(|v| v.as_bool()), Some(true));
            assert_eq!(value.get("ratio").and_then(|v| v.as_f64()), Some(1.5));
            assert!(value.get("missing").is_some_and(|v| v.is_null()));
        }

        #[test]
        fn msgpack_auto_detects_and_decodes_natively() {
            let value = parse_payload(&sample(), PayloadFormat::Auto).unwrap();
            assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
        }

        #[test]
        fn msgpack_nested_array_and_map_walk() {
            let mut buf = vec![0x80 | 2]; // fixmap header, 2 entries
            buf.extend(fixstr("items"));
            buf.push(0x90 | 2); // fixarray header, 2 elements
            buf.push(1);
            buf.push(2);
            buf.extend(fixstr("meta"));
            buf.push(0x80 | 1); // nested fixmap, 1 entry
            buf.extend(fixstr("k"));
            buf.extend(fixstr("v"));

            let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
            let items = value.get("items").unwrap();
            assert_eq!(items[0].as_i64(), Some(1));
            assert_eq!(items[1].as_i64(), Some(2));
            assert_eq!(
                value
                    .get("meta")
                    .and_then(|m| m.get("k"))
                    .and_then(|v| v.as_str()),
                Some("v")
            );
        }

        #[test]
        fn malformed_msgpack_returns_msgpack_error() {
            // A one-entry map header with no key or value behind it.
            let result = parse_payload(&[0x81], PayloadFormat::MsgPack);
            assert!(
                matches!(result, Err(ParseError::MsgPack(_))),
                "expected MsgPack error, got {result:?}"
            );
        }
    }
}