hyperi_rustlib/transport/codec.rs
1// Project: hyperi-rustlib
2// File: src/transport/codec.rs
3// Purpose: Parse-on-demand WorkBatch codec (native JSON + MsgPack, no bridge)
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Parse-on-demand codec
10//!
11//! The spine frames bytes into a [`WorkBatch`](super::WorkBatch) WITHOUT
12//! parsing. A transform/router that needs a field parses on demand here, format-
13//! agnostic.
14//!
15//! ## Native, no JSON bridge
16//!
17//! No `rmp_serde -> serde_json::Value -> sonic_rs` double-parse anywhere on the
18//! parse path. Both arms decode natively:
19//!
20//! - **JSON** -- [`sonic_rs`] (SIMD, AVX2/NEON).
21//! - **MsgPack** -- [`rmpv`] schema-less `Value` decoder. No intermediate
22//! `serde_json::Value`, no JSON re-serialise.
23//!
24//! ## Unified routing-field accessor
25//!
26//! A router keys off ONE field and must not branch on wire format.
27//! [`ParsedPayload`] exposes a format-agnostic accessor:
28//!
29//! - [`ParsedPayload::field_str`] -- the common case: a top-level string field.
30//! - [`ParsedPayload::field`] -- a [`FieldRef`] over the scalar routing cases
31//! (string/int/float/bool/null); everything else collapses to
32//! [`FieldRef::Other`] because routers do not key off containers.
33//!
34//! **Scope:** top-level object-key lookup only. No deep JSON-path (YAGNI -- keys
35//! live at the top level).
36//!
37//! See `docs/MIGRATIONS.md` and `docs/SELF-REGULATION.md`. Block contract in
38//! [`WorkBatch`](crate::transport::WorkBatch).
39
40use super::types::PayloadFormat;
41use bytes::Bytes;
42use sonic_rs::JsonValueTrait as _;
43use thiserror::Error;
44
45/// A parse failure, tagged by the format that failed.
46#[derive(Debug, Error)]
47#[non_exhaustive]
48pub enum CodecError {
49 /// JSON parse failed (sonic_rs SIMD parser).
50 #[error("json parse error: {0}")]
51 Json(#[from] sonic_rs::Error),
52
53 /// MsgPack parse failed (native rmpv decoder).
54 #[error("msgpack parse error: {0}")]
55 MsgPack(#[from] rmpv::decode::Error),
56
57 /// MsgPack serialise failed (native rmpv encoder).
58 ///
59 /// An in-memory `Vec` write effectively never fails, but the encoder is
60 /// fallible so we surface it rather than panic. JSON serialise reuses
61 /// [`CodecError::Json`].
62 #[error("msgpack encode error: {0}")]
63 Encode(#[from] rmpv::encode::Error),
64
65 /// Trailing bytes remain after a complete MsgPack value was decoded.
66 ///
67 /// A single-record payload must encode exactly ONE value. Trailing bytes
68 /// (the `usize`) mean corruption, framing error, or concatenated values --
69 /// MsgPack stream framing is a separate, deferred feature, not supported here.
70 #[error("msgpack trailing bytes: {0} byte(s) remain after value")]
71 TrailingBytes(usize),
72
73 /// Payload nests deeper than `MAX_PARSE_DEPTH` (`parse_guard` module).
74 /// Rejected BEFORE the recursive parser runs so a hostile deeply-nested
75 /// payload cannot exhaust the worker stack.
76 #[error("payload nesting exceeds the maximum parse depth")]
77 TooDeep,
78}
79
80/// A parsed payload, retaining its native value representation.
81///
82/// JSON stays a [`sonic_rs::Value`] (SIMD parse not thrown away), MsgPack stays
83/// an [`rmpv::Value`] (no JSON bridge). For a routing field, prefer
84/// [`ParsedPayload::field_str`] / [`ParsedPayload::field`] over matching the
85/// variant -- that is the point of the unified accessor.
86#[derive(Debug, Clone)]
87#[non_exhaustive]
88pub enum ParsedPayload {
89 /// JSON value parsed by sonic_rs (SIMD).
90 Json(sonic_rs::Value),
91 /// MsgPack value parsed natively by rmpv (no serde_json bridge).
92 MsgPack(rmpv::Value),
93}
94
/// Format-blind, borrowed view of a single routing field.
///
/// The unified accessor hands this back so a router never has to know whether
/// the record arrived as JSON or MsgPack. Only the scalar shapes a router
/// actually keys on get their own variant. Objects, arrays, binary and ext
/// values all map to [`FieldRef::Other`], since routing never branches on a
/// container.
///
/// [`FieldRef::Str`] borrows straight from the parsed value (no copy); every
/// other variant is a plain `Copy` scalar.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FieldRef<'a> {
    /// String value, borrowed from the parsed payload.
    Str(&'a str),
    /// Integer value (JSON and MsgPack integers that fit in `i64`).
    Int(i64),
    /// Floating-point value.
    Float(f64),
    /// Boolean value.
    Bool(bool),
    /// Explicit JSON `null` / MsgPack `nil`.
    Null,
    /// Field exists but is not a routing scalar (object, array, binary, ext, ...).
    Other,
}
120
121/// Parse a framed payload into a native [`ParsedPayload`].
122///
123/// - [`PayloadFormat::Json`] -> sonic_rs (SIMD).
124/// - [`PayloadFormat::MsgPack`] -> rmpv (native, no JSON bridge).
125/// - [`PayloadFormat::Auto`] -> [`PayloadFormat::detect`] then dispatch. An empty
126/// blob detects as JSON (matching `detect`'s contract) and surfaces a
127/// [`CodecError::Json`] -- empty input is not valid JSON.
128///
129/// # Errors
130///
131/// Returns [`CodecError::Json`] or [`CodecError::MsgPack`] when the bytes are
132/// malformed for the (detected or declared) format.
133pub fn parse(payload: &Bytes, format: PayloadFormat) -> Result<ParsedPayload, CodecError> {
134 let effective = match format {
135 PayloadFormat::Auto => PayloadFormat::detect(payload),
136 other => other,
137 };
138
139 match effective {
140 // detect() never yields Auto, but treat a residual Auto as JSON.
141 PayloadFormat::Json | PayloadFormat::Auto => {
142 // Cheap iterative depth pre-scan before the recursive SIMD parser:
143 // reject pathological nesting that would otherwise blow the stack.
144 if !crate::parse_guard::json_depth_within(payload, crate::parse_guard::MAX_PARSE_DEPTH)
145 {
146 return Err(CodecError::TooDeep);
147 }
148 let value: sonic_rs::Value = sonic_rs::from_slice(payload)?;
149 Ok(ParsedPayload::Json(value))
150 }
151 PayloadFormat::MsgPack => {
152 // `&mut &[u8]` is the io::Read cursor; advances as it decodes. SINGLE
153 // native decode -- no rmp_serde, no serde_json, no re-encode.
154 //
155 // Bound nesting depth: a malicious/corrupt payload can encode deep
156 // nesting that drives recursive decode into worker-thread stack
157 // exhaustion. rmpv defaults to 1024; tighten to the shared parse-path
158 // bound on this untrusted path.
159 let mut cursor: &[u8] = payload.as_ref();
160 let value = rmpv::decode::read_value_with_max_depth(
161 &mut cursor,
162 crate::parse_guard::MAX_PARSE_DEPTH,
163 )?;
164 // One value per record. Leftover bytes mean corruption, framing
165 // misalignment, or concatenated values -- reject. MsgPack-stream
166 // framing is a separate deferred feature; do NOT silently accept.
167 let remaining = cursor.len();
168 if remaining > 0 {
169 return Err(CodecError::TrailingBytes(remaining));
170 }
171 Ok(ParsedPayload::MsgPack(value))
172 }
173 }
174}
175
176/// Serialise a JSON value to bytes via [`sonic_rs`] (SIMD), no bridge.
177///
178/// The inverse of the JSON arm of [`parse`]. Reuses sonic_rs end-to-end so a
179/// transform that mutates a parsed JSON value re-emits it without ever touching
180/// `serde_json`.
181///
182/// # Errors
183///
184/// Returns [`CodecError::Json`] if sonic_rs fails to serialise the value.
185pub fn to_json_bytes(value: &sonic_rs::Value) -> Result<Bytes, CodecError> {
186 let buf = sonic_rs::to_vec(value)?;
187 Ok(Bytes::from(buf))
188}
189
190/// Serialise a MsgPack value to bytes via NATIVE [`rmpv::encode::write_value`].
191///
192/// The inverse of the MsgPack arm of [`parse`]. This is the native rmpv encoder
193/// -- NOT `rmp_serde`, NOT a JSON bridge. A transform that mutates a parsed
194/// `rmpv::Value` re-emits MsgPack wire bytes with a single native encode, no
195/// intermediate `serde_json::Value`, no re-parse.
196///
197/// # Errors
198///
199/// Returns [`CodecError::Encode`] if the encoder fails to write the value. For
200/// an in-memory `Vec` writer this is effectively unreachable, but the encoder
201/// is fallible so the error is surfaced rather than unwrapped.
202pub fn to_msgpack_bytes(value: &rmpv::Value) -> Result<Bytes, CodecError> {
203 // write_value writes into any `io::Write`; a Vec<u8> is one and never
204 // returns a short write, so the only failure path is the encoder's own.
205 let mut buf: Vec<u8> = Vec::new();
206 rmpv::encode::write_value(&mut buf, value)?;
207 Ok(Bytes::from(buf))
208}
209
210impl ParsedPayload {
211 /// Whether the payload was decoded from JSON.
212 #[must_use]
213 pub fn is_json(&self) -> bool {
214 matches!(self, Self::Json(_))
215 }
216
217 /// Whether the payload was decoded from MsgPack.
218 #[must_use]
219 pub fn is_msgpack(&self) -> bool {
220 matches!(self, Self::MsgPack(_))
221 }
222
223 /// Read a top-level string field, format-agnostic.
224 ///
225 /// The common routing case: a router keys off one string field and does not
226 /// care about wire format. Returns `None` if the value is not a top-level
227 /// object, the key is absent, or the field is not a string. Borrows from the
228 /// parsed value (zero-copy).
229 ///
230 /// Top-level lookup only -- see the module docs.
231 #[must_use]
232 pub fn field_str(&self, name: &str) -> Option<&str> {
233 match self {
234 Self::Json(v) => v.get(name).and_then(|f| f.as_str()),
235 Self::MsgPack(v) => msgpack_field(v, name).and_then(rmpv::Value::as_str),
236 }
237 }
238
239 /// Read a top-level field as a format-agnostic [`FieldRef`].
240 ///
241 /// Returns `None` only when the value is not a top-level object or the key
242 /// is absent. A present-but-non-scalar field yields [`FieldRef::Other`]
243 /// (routers never key off containers). Borrows from the parsed value.
244 ///
245 /// Top-level lookup only -- see the module docs.
246 #[must_use]
247 pub fn field(&self, name: &str) -> Option<FieldRef<'_>> {
248 match self {
249 Self::Json(v) => v.get(name).map(json_field_ref),
250 Self::MsgPack(v) => msgpack_field(v, name).map(msgpack_field_ref),
251 }
252 }
253
254 /// Serialise back to the payload's OWN wire format.
255 ///
256 /// Same format in, same format out -- no cross-format conversion, no bridge.
257 ///
258 /// ## Pass-through contract -- DO NOT round-trip untouched records
259 ///
260 /// `to_bytes` is ONLY for a record a transform actually mutated. A record
261 /// the transform did NOT change must re-use its original `Record.payload`
262 /// directly on egress ("serde is the enemy / zero re-representation").
263 /// Calling `to_bytes` on an unmodified record pays a parse + re-serialise for
264 /// nothing AND can alter the wire bytes (key order, number formatting,
265 /// whitespace) even though the value is identical.
266 ///
267 /// No `to_bytes_as` cross-format egress: `sonic_rs::Value` and `rmpv::Value`
268 /// have no native conversion, so bridging would reintroduce the exact double-
269 /// representation this spine exists to avoid (YAGNI).
270 ///
271 /// # Errors
272 ///
273 /// Returns [`CodecError::Json`] (JSON serialise) or [`CodecError::Encode`]
274 /// (MsgPack serialise) on encoder failure.
275 pub fn to_bytes(&self) -> Result<Bytes, CodecError> {
276 match self {
277 Self::Json(v) => to_json_bytes(v),
278 Self::MsgPack(v) => to_msgpack_bytes(v),
279 }
280 }
281}
282
283/// Classify a sonic_rs JSON value into a [`FieldRef`] (borrows from `v`).
284///
285/// Order matters: probe the scalar accessors in turn. `as_i64` is tried before
286/// `as_f64` so integers stay [`FieldRef::Int`]; a JSON number with a fractional
287/// part falls through to [`FieldRef::Float`].
288fn json_field_ref(v: &sonic_rs::Value) -> FieldRef<'_> {
289 if let Some(s) = v.as_str() {
290 FieldRef::Str(s)
291 } else if v.is_null() {
292 FieldRef::Null
293 } else if let Some(b) = v.as_bool() {
294 FieldRef::Bool(b)
295 } else if let Some(i) = v.as_i64() {
296 FieldRef::Int(i)
297 } else if let Some(f) = v.as_f64() {
298 FieldRef::Float(f)
299 } else {
300 FieldRef::Other
301 }
302}
303
304/// Find a top-level value for `name` in an rmpv MsgPack value.
305///
306/// Only a [`rmpv::Value::Map`] has named fields. The map is a `Vec<(Value,
307/// Value)>`, so this is a linear scan -- routing maps are small (a handful of
308/// keys), so a linear scan beats building an index. Only string keys match.
309fn msgpack_field<'a>(v: &'a rmpv::Value, name: &str) -> Option<&'a rmpv::Value> {
310 match v {
311 rmpv::Value::Map(pairs) => pairs
312 .iter()
313 .find(|(k, _)| k.as_str() == Some(name))
314 .map(|(_, val)| val),
315 _ => None,
316 }
317}
318
319/// Classify an rmpv MsgPack value into a [`FieldRef`].
320///
321/// MsgPack integers split into signed/unsigned at the wire level; both fold to
322/// `i64` here when they fit. An unsigned value above `i64::MAX` cannot fit `i64`
323/// and is surfaced as [`FieldRef::Float`] via `as_f64` (lossy but it keeps a
324/// numeric field numeric for routing) rather than dropped to `Other`.
325fn msgpack_field_ref(v: &rmpv::Value) -> FieldRef<'_> {
326 match v {
327 rmpv::Value::String(s) => s.as_str().map_or(FieldRef::Other, FieldRef::Str),
328 rmpv::Value::Nil => FieldRef::Null,
329 rmpv::Value::Boolean(b) => FieldRef::Bool(*b),
330 rmpv::Value::Integer(_) => v
331 .as_i64()
332 .map(FieldRef::Int)
333 .or_else(|| v.as_f64().map(FieldRef::Float))
334 .unwrap_or(FieldRef::Other),
335 rmpv::Value::F32(f) => FieldRef::Float(f64::from(*f)),
336 rmpv::Value::F64(f) => FieldRef::Float(*f),
337 // Map / Array / Binary / Ext: routers do not key off containers.
338 _ => FieldRef::Other,
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Helpers: build real MsgPack blobs by hand (no serde encode) -------
    //
    // We hand-roll the MsgPack bytes so the tests exercise the NATIVE rmpv
    // decoder against the real wire format, not a serde round-trip.

    /// fixstr: 0xa0 | len, then the UTF-8 bytes (len < 32).
    fn fixstr(s: &str) -> Vec<u8> {
        let bytes = s.as_bytes();
        assert!(bytes.len() < 32, "fixstr helper only handles len < 32");
        let len = u8::try_from(bytes.len()).expect("len < 32 fits u8");
        let mut out = vec![0xa0 | len];
        out.extend_from_slice(bytes);
        out
    }

    /// fixmap header: 0x80 | n (n < 16 entries).
    fn fixmap_header(n: u8) -> u8 {
        assert!(n < 16, "fixmap helper only handles < 16 entries");
        0x80 | n
    }

    /// Build a logical record `{"_table": "events", "org_id": 42, "live":
    /// true, "ratio": <f64>, "missing": nil}` as a MsgPack fixmap.
    fn sample_msgpack() -> Bytes {
        let mut buf = vec![fixmap_header(5)];
        // "_table": "events"
        buf.extend(fixstr("_table"));
        buf.extend(fixstr("events"));
        // "org_id": 42 (positive fixint -- the byte is its own value)
        buf.extend(fixstr("org_id"));
        buf.push(42);
        // "live": true (0xc3)
        buf.extend(fixstr("live"));
        buf.push(0xc3);
        // "ratio": 1.5 (float64 0xcb + 8 bytes big-endian)
        buf.extend(fixstr("ratio"));
        buf.push(0xcb);
        buf.extend_from_slice(&1.5f64.to_be_bytes());
        // "missing": nil (0xc0)
        buf.extend(fixstr("missing"));
        buf.push(0xc0);
        Bytes::from(buf)
    }

    /// The same logical record as JSON.
    fn sample_json() -> Bytes {
        Bytes::from_static(
            br#"{"_table":"events","org_id":42,"live":true,"ratio":1.5,"missing":null}"#,
        )
    }

    /// A nesting depth guaranteed to exceed the shared parse-depth bound.
    ///
    /// Derived from the constant rather than hard-coded, so the depth tests
    /// keep exercising the rejection path if the bound is ever retuned.
    fn over_limit_depth() -> usize {
        crate::parse_guard::MAX_PARSE_DEPTH + 6
    }

    // ---- parse(): JSON -----------------------------------------------------

    #[test]
    fn parse_json_object() {
        let parsed = parse(&sample_json(), PayloadFormat::Json).unwrap();
        assert!(parsed.is_json());
        assert!(!parsed.is_msgpack());
        assert_eq!(parsed.field_str("_table"), Some("events"));
    }

    #[test]
    fn parse_json_array_is_ok() {
        // A top-level array is valid JSON; it simply has no named fields.
        let parsed = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
        assert!(parsed.is_json());
        assert_eq!(parsed.field_str("anything"), None);
    }

    #[test]
    fn parse_json_rejects_excessive_nesting() {
        // Nesting past the shared bound must be rejected by the iterative
        // pre-scan (TooDeep) before the recursive SIMD parser ever runs.
        let depth = over_limit_depth();
        let mut buf = "[".repeat(depth).into_bytes();
        buf.extend_from_slice("]".repeat(depth).as_bytes());
        let result = parse(&Bytes::from(buf), PayloadFormat::Json);
        assert!(
            matches!(result, Err(CodecError::TooDeep)),
            "deeply nested JSON must be rejected by the depth pre-scan, got {result:?}"
        );
    }

    #[test]
    fn parse_json_allows_reasonable_nesting() {
        // Eight levels of nesting is well under the bound (no false positive).
        let parsed = parse(&Bytes::from_static(b"[[[[[[[[1]]]]]]]]"), PayloadFormat::Json);
        assert!(parsed.is_ok(), "got {parsed:?}");
    }

    #[test]
    fn parse_msgpack_rejects_excessive_nesting() {
        // Nesting past the shared decode-depth bound guards worker-thread
        // stacks against a deeply nested hostile or corrupt payload. rmpv
        // reports the overflow as its own decode error, so it surfaces as
        // CodecError::MsgPack. 0x91 = fixarray of 1 element; 0xc0 = nil leaf.
        let mut buf = vec![0x91u8; over_limit_depth()];
        buf.push(0xc0);
        let result = parse(&Bytes::from(buf), PayloadFormat::MsgPack);
        assert!(
            matches!(result, Err(CodecError::MsgPack(_))),
            "deeply nested msgpack must be rejected by the depth bound, got {result:?}"
        );
    }

    #[test]
    fn parse_msgpack_allows_reasonable_nesting() {
        // Nesting well under the bound parses fine (no false positive).
        let mut buf = vec![0x91u8; 8];
        buf.push(0xc0);
        assert!(parse(&Bytes::from(buf), PayloadFormat::MsgPack).is_ok());
    }

    // ---- parse(): MsgPack (native rmpv, hand-rolled bytes) -----------------

    #[test]
    fn parse_msgpack_map() {
        let parsed = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert!(parsed.is_msgpack());
        assert!(!parsed.is_json());
        assert_eq!(parsed.field_str("_table"), Some("events"));
    }

    #[test]
    fn parse_minimal_fixmap() {
        // {"k": "v"} -- the smallest interesting map.
        let mut buf = vec![fixmap_header(1)];
        buf.extend(fixstr("k"));
        buf.extend(fixstr("v"));
        let parsed = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap();
        assert_eq!(parsed.field_str("k"), Some("v"));
    }

    // ---- Auto detection dispatch ------------------------------------------

    #[test]
    fn parse_auto_dispatches_to_json() {
        let parsed = parse(&sample_json(), PayloadFormat::Auto).unwrap();
        assert!(parsed.is_json(), "object byte '{{' must detect as JSON");
        assert_eq!(parsed.field_str("_table"), Some("events"));
    }

    #[test]
    fn parse_auto_dispatches_to_msgpack() {
        let parsed = parse(&sample_msgpack(), PayloadFormat::Auto).unwrap();
        assert!(
            parsed.is_msgpack(),
            "fixmap byte 0x85 must detect as MsgPack"
        );
        assert_eq!(parsed.field_str("_table"), Some("events"));
    }

    // ---- Unified accessor: SAME field value from BOTH formats --------------

    #[test]
    fn field_str_identical_across_formats() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        // The whole point: same logical record, same routing field, regardless
        // of wire format.
        assert_eq!(j.field_str("_table"), m.field_str("_table"));
        assert_eq!(j.field_str("_table"), Some("events"));
    }

    #[test]
    fn field_str_returns_none_for_non_string() {
        // org_id is an int -- field_str only returns strings.
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field_str("org_id"), None);
        assert_eq!(m.field_str("org_id"), None);
    }

    #[test]
    fn field_str_returns_none_for_missing_key() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field_str("nope"), None);
        assert_eq!(m.field_str("nope"), None);
    }

    #[test]
    fn field_str_value_is_present_via_field_too() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        assert_eq!(j.field("_table"), Some(FieldRef::Str("events")));
    }

    #[test]
    fn field_int_identical_across_formats() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field("org_id"), Some(FieldRef::Int(42)));
        assert_eq!(m.field("org_id"), Some(FieldRef::Int(42)));
    }

    #[test]
    fn field_bool_identical_across_formats() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field("live"), Some(FieldRef::Bool(true)));
        assert_eq!(m.field("live"), Some(FieldRef::Bool(true)));
    }

    #[test]
    fn field_float_identical_across_formats() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field("ratio"), Some(FieldRef::Float(1.5)));
        assert_eq!(m.field("ratio"), Some(FieldRef::Float(1.5)));
    }

    #[test]
    fn field_null_identical_across_formats() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field("missing"), Some(FieldRef::Null));
        assert_eq!(m.field("missing"), Some(FieldRef::Null));
    }

    #[test]
    fn field_missing_key_is_none_for_both() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field("nope"), None);
        assert_eq!(m.field("nope"), None);
    }

    #[test]
    fn field_nested_object_is_other() {
        // A field whose value is a container collapses to Other, not None.
        let j = parse(
            &Bytes::from_static(br#"{"k":{"nested":1}}"#),
            PayloadFormat::Json,
        )
        .unwrap();
        assert_eq!(j.field("k"), Some(FieldRef::Other));
        // ...but it is not a routing scalar, so field_str is None.
        assert_eq!(j.field_str("k"), None);

        // MsgPack: {"k": [1]} -- an array value also collapses to Other.
        // fixmap(1) "k" -> fixarray(1) [positive fixint 1]
        let mut buf = vec![fixmap_header(1)];
        buf.extend(fixstr("k"));
        buf.push(0x91); // fixarray with 1 element
        buf.push(0x01); // positive fixint 1
        let m = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap();
        assert_eq!(m.field("k"), Some(FieldRef::Other));
    }

    #[test]
    fn field_on_non_object_top_level_is_none() {
        // A top-level JSON array has no named fields.
        let j = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
        assert_eq!(j.field("0"), None);

        // A top-level MsgPack array (fixarray) likewise.
        // fixarray(2) [1, 2]
        let m = parse(&Bytes::from(vec![0x92, 0x01, 0x02]), PayloadFormat::MsgPack).unwrap();
        assert_eq!(m.field("0"), None);
    }

    // ---- Error cases -------------------------------------------------------

    #[test]
    fn malformed_json_errors() {
        let err = parse(&Bytes::from_static(b"{not valid json"), PayloadFormat::Json).unwrap_err();
        assert!(matches!(err, CodecError::Json(_)), "got {err:?}");
        assert!(!err.to_string().is_empty());
    }

    #[test]
    fn empty_blob_auto_errors_as_json() {
        // detect() maps empty -> Json; empty is not valid JSON.
        let err = parse(&Bytes::new(), PayloadFormat::Auto).unwrap_err();
        assert!(matches!(err, CodecError::Json(_)), "got {err:?}");
    }

    #[test]
    fn malformed_msgpack_errors() {
        // 0x81 declares a fixmap with one entry but supplies no key/value.
        let err = parse(&Bytes::from_static(&[0x81]), PayloadFormat::MsgPack).unwrap_err();
        assert!(matches!(err, CodecError::MsgPack(_)), "got {err:?}");
        assert!(!err.to_string().is_empty());
    }

    #[test]
    fn msgpack_truncated_float_errors() {
        // 0xcb declares a float64 but supplies only 3 of the 8 payload bytes.
        let mut buf = vec![fixmap_header(1)];
        buf.extend(fixstr("ratio"));
        buf.push(0xcb);
        buf.extend_from_slice(&[0x00, 0x01, 0x02]); // short
        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap_err();
        assert!(matches!(err, CodecError::MsgPack(_)), "got {err:?}");
    }

    // ---- Serialise-out (round-trips through real bytes) --------------------
    //
    // The contract is "parse -> (mutate) -> serialise -> parse again preserves
    // the logical value". We assert on re-parsed VALUES (via the unified
    // accessor), NOT on raw bytes: re-serialise may reorder keys or reformat
    // numbers, so a byte-for-byte equality assertion would be wrong.

    /// Compare every routing field of the canonical sample across two payloads.
    fn assert_sample_fields_eq(a: &ParsedPayload, b: &ParsedPayload) {
        assert_eq!(a.field("_table"), b.field("_table"));
        assert_eq!(a.field("org_id"), b.field("org_id"));
        assert_eq!(a.field("live"), b.field("live"));
        assert_eq!(a.field("ratio"), b.field("ratio"));
        assert_eq!(a.field("missing"), b.field("missing"));
    }

    #[test]
    fn json_to_bytes_round_trips() {
        // parse JSON -> to_bytes -> parse again -> values equal.
        let original = parse(&sample_json(), PayloadFormat::Json).unwrap();
        assert!(original.is_json());

        let bytes = original.to_bytes().unwrap();
        assert!(!bytes.is_empty());

        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
        assert!(reparsed.is_json(), "JSON must round-trip as JSON");
        assert_sample_fields_eq(&original, &reparsed);
    }

    #[test]
    fn msgpack_to_bytes_round_trips_via_native_bytes() {
        // parse MsgPack (hand-rolled bytes) -> to_bytes (native rmpv encode)
        // -> parse again -> values equal. No serde, no JSON bridge anywhere.
        let original = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert!(original.is_msgpack());

        let bytes = original.to_bytes().unwrap();
        assert!(!bytes.is_empty());
        // First byte must be a MsgPack map marker (fixmap 0x80..=0x8f; for 5
        // entries -> 0x85), proving native MsgPack came out, not JSON.
        assert_eq!(bytes[0], fixmap_header(5), "expected fixmap(5) wire marker");

        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
        assert!(reparsed.is_msgpack(), "MsgPack must round-trip as MsgPack");
        assert_sample_fields_eq(&original, &reparsed);
    }

    #[test]
    fn to_json_bytes_reparses_to_same_value() {
        // Free function: serialise a sonic_rs::Value, re-parse, compare.
        let ParsedPayload::Json(value) = parse(&sample_json(), PayloadFormat::Json).unwrap() else {
            panic!("expected JSON");
        };
        let bytes = to_json_bytes(&value).unwrap();
        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
        assert_eq!(reparsed.field("_table"), Some(FieldRef::Str("events")));
        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
        assert_eq!(reparsed.field("ratio"), Some(FieldRef::Float(1.5)));
    }

    #[test]
    fn to_msgpack_bytes_reparses_to_same_value() {
        // Free function: serialise an rmpv::Value via native write_value,
        // re-parse, compare.
        let ParsedPayload::MsgPack(value) =
            parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap()
        else {
            panic!("expected MsgPack");
        };
        let bytes = to_msgpack_bytes(&value).unwrap();
        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
        assert_eq!(reparsed.field("_table"), Some(FieldRef::Str("events")));
        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
        assert_eq!(reparsed.field("live"), Some(FieldRef::Bool(true)));
        assert_eq!(reparsed.field("ratio"), Some(FieldRef::Float(1.5)));
        assert_eq!(reparsed.field("missing"), Some(FieldRef::Null));
    }

    #[test]
    fn to_bytes_preserves_a_mutated_json_field() {
        // The realistic case: a transform CHANGED a field, then re-serialises.
        // Only then is to_bytes the right tool (unmodified records pass through
        // the original Bytes -- see the to_bytes doc).
        let ParsedPayload::Json(mut value) = parse(&sample_json(), PayloadFormat::Json).unwrap()
        else {
            panic!("expected JSON");
        };
        // Mutate _table in place via sonic_rs's object insert (overwrites the
        // existing key) -- the value model is what gets re-serialised.
        value.insert("_table", sonic_rs::Value::from("audit"));
        let bytes = to_json_bytes(&value).unwrap();
        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
        assert_eq!(reparsed.field_str("_table"), Some("audit"));
        // Untouched siblings survive the round-trip.
        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
    }

    #[test]
    fn json_to_bytes_handles_top_level_array() {
        // Egress is not object-only: a top-level array must round-trip too.
        let parsed = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
        let bytes = parsed.to_bytes().unwrap();
        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
        assert!(reparsed.is_json());
        // No named fields either way; the value re-parses without error.
        assert_eq!(reparsed.field_str("anything"), None);
    }

    #[test]
    fn msgpack_to_bytes_handles_top_level_scalar() {
        // A bare MsgPack integer (positive fixint 42) round-trips as itself,
        // not wrapped in a map.
        let parsed = parse(&Bytes::from(vec![42u8]), PayloadFormat::MsgPack).unwrap();
        let bytes = parsed.to_bytes().unwrap();
        assert_eq!(
            bytes.as_ref(),
            &[42u8],
            "fixint must re-emit byte-identical"
        );
        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
        assert!(reparsed.is_msgpack());
    }

    #[test]
    fn double_round_trip_is_stable() {
        // parse -> to_bytes -> parse -> to_bytes: the SECOND serialise must
        // equal the first (the value model is the fixed point, even if it
        // differs from the original hand-rolled bytes).
        let first = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        let b1 = first.to_bytes().unwrap();
        let second = parse(&b1, PayloadFormat::MsgPack).unwrap();
        let b2 = second.to_bytes().unwrap();
        assert_eq!(b1, b2, "re-serialising a re-parsed value must be stable");
    }

    // ---- Trailing-bytes hardening -----------------------------------------

    #[test]
    fn msgpack_rejects_trailing_bytes() {
        // A valid fixmap {"k": "v"} followed by a stray nil byte (0xc0) must
        // be rejected with CodecError::TrailingBytes(1), not silently accepted.
        let mut buf = vec![fixmap_header(1)];
        buf.extend(fixstr("k"));
        buf.extend(fixstr("v"));
        buf.push(0xc0); // stray nil -- trailing garbage
        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack)
            .expect_err("trailing byte must be rejected");
        match err {
            CodecError::TrailingBytes(n) => assert_eq!(n, 1, "expected 1 trailing byte"),
            other => panic!("expected TrailingBytes(1), got {other:?}"),
        }
    }

    #[test]
    fn msgpack_rejects_concatenated_values() {
        // Two valid MsgPack values back-to-back: fixint 1 then fixint 2.
        // parse() decodes ONE value; the second byte is trailing and must error.
        let buf = vec![0x01u8, 0x02u8]; // positive fixint 1, positive fixint 2
        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack)
            .expect_err("concatenated values must be rejected");
        match err {
            CodecError::TrailingBytes(n) => assert_eq!(n, 1, "expected 1 trailing byte"),
            other => panic!("expected TrailingBytes(1), got {other:?}"),
        }
    }

    #[test]
    fn msgpack_clean_single_value_still_parses_ok() {
        // A single valid fixmap with no trailing bytes must still parse Ok.
        // Regression guard: the trailing-bytes check must not break the happy path.
        let mut buf = vec![fixmap_header(1)];
        buf.extend(fixstr("k"));
        buf.extend(fixstr("v"));
        let parsed = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap();
        assert_eq!(parsed.field_str("k"), Some("v"));
    }

    #[test]
    fn json_rejects_trailing_garbage() {
        // The MsgPack path rejects trailing bytes (CodecError::TrailingBytes).
        // The JSON path delegates to sonic_rs, which rejects trailing
        // NON-whitespace content after a complete value (serde_json semantics).
        // A valid object followed by stray garbage must error.
        let mut buf = br#"{"_table":"events"}"#.to_vec();
        buf.extend_from_slice(b"garbage");
        let err = parse(&Bytes::from(buf), PayloadFormat::Json)
            .expect_err("trailing non-whitespace garbage must be rejected");
        assert!(
            matches!(err, CodecError::Json(_)),
            "expected CodecError::Json for trailing garbage, got {err:?}"
        );
    }

    #[test]
    fn json_accepts_trailing_whitespace() {
        // sonic_rs (like serde_json) tolerates trailing whitespace after a
        // complete value -- a pretty-printer's trailing newline must not be a
        // parse error. Asserted alongside the trailing-garbage rejection so the
        // JSON trailing-byte contract is pinned both ways.
        let mut buf = br#"{"_table":"events"}"#.to_vec();
        buf.extend_from_slice(b" \t\r\n");
        let parsed = parse(&Bytes::from(buf), PayloadFormat::Json)
            .expect("trailing whitespace must be accepted");
        assert_eq!(parsed.field_str("_table"), Some("events"));
    }

    #[test]
    fn json_parsed_as_msgpack_errors() {
        // Force the wrong decoder: JSON bytes through the MsgPack path. '{' is
        // 0x7b, which rmpv reads as a positive fixint -- a single-byte value --
        // leaving every remaining byte of the JSON payload as trailing bytes.
        let json = sample_json();
        let err = parse(&json, PayloadFormat::MsgPack)
            .expect_err("JSON fed to the MsgPack path must be rejected as trailing bytes");
        match err {
            CodecError::TrailingBytes(n) => assert_eq!(
                n,
                json.len() - 1,
                "everything after the leading fixint must be counted as trailing"
            ),
            other => panic!("expected TrailingBytes, got {other:?}"),
        }
    }
}