Skip to main content

hyperi_rustlib/transport/
codec.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/codec.rs
3// Purpose:   Parse-on-demand WorkBatch codec (native JSON + MsgPack, no bridge)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Parse-on-demand codec
10//!
11//! The spine frames bytes into a [`WorkBatch`](super::WorkBatch) WITHOUT
12//! parsing. A transform/router that needs a field parses on demand here, format-
13//! agnostic.
14//!
15//! ## Native, no JSON bridge
16//!
17//! No `rmp_serde -> serde_json::Value -> sonic_rs` double-parse anywhere on the
18//! parse path. Both arms decode natively:
19//!
20//! - **JSON** -- [`sonic_rs`] (SIMD, AVX2/NEON).
21//! - **MsgPack** -- [`rmpv`] schema-less `Value` decoder. No intermediate
22//!   `serde_json::Value`, no JSON re-serialise.
23//!
24//! ## Unified routing-field accessor
25//!
26//! A router keys off ONE field and must not branch on wire format.
27//! [`ParsedPayload`] exposes a format-agnostic accessor:
28//!
29//! - [`ParsedPayload::field_str`] -- the common case: a top-level string field.
30//! - [`ParsedPayload::field`] -- a [`FieldRef`] over the scalar routing cases
31//!   (string/int/float/bool/null); everything else collapses to
32//!   [`FieldRef::Other`] because routers do not key off containers.
33//!
34//! **Scope:** top-level object-key lookup only. No deep JSON-path (YAGNI -- keys
35//! live at the top level).
36//!
37//! See `docs/MIGRATIONS.md` and `docs/SELF-REGULATION.md`. Block contract in
38//! [`WorkBatch`](crate::transport::WorkBatch).
39
40use super::types::PayloadFormat;
41use bytes::Bytes;
42use sonic_rs::JsonValueTrait as _;
43use thiserror::Error;
44
45/// A parse failure, tagged by the format that failed.
46#[derive(Debug, Error)]
47#[non_exhaustive]
48pub enum CodecError {
49    /// JSON parse failed (sonic_rs SIMD parser).
50    #[error("json parse error: {0}")]
51    Json(#[from] sonic_rs::Error),
52
53    /// MsgPack parse failed (native rmpv decoder).
54    #[error("msgpack parse error: {0}")]
55    MsgPack(#[from] rmpv::decode::Error),
56
57    /// MsgPack serialise failed (native rmpv encoder).
58    ///
59    /// An in-memory `Vec` write effectively never fails, but the encoder is
60    /// fallible so we surface it rather than panic. JSON serialise reuses
61    /// [`CodecError::Json`].
62    #[error("msgpack encode error: {0}")]
63    Encode(#[from] rmpv::encode::Error),
64
65    /// Trailing bytes remain after a complete MsgPack value was decoded.
66    ///
67    /// A single-record payload must encode exactly ONE value. Trailing bytes
68    /// (the `usize`) mean corruption, framing error, or concatenated values --
69    /// MsgPack stream framing is a separate, deferred feature, not supported here.
70    #[error("msgpack trailing bytes: {0} byte(s) remain after value")]
71    TrailingBytes(usize),
72
73    /// Payload nests deeper than `MAX_PARSE_DEPTH` (`parse_guard` module).
74    /// Rejected BEFORE the recursive parser runs so a hostile deeply-nested
75    /// payload cannot exhaust the worker stack.
76    #[error("payload nesting exceeds the maximum parse depth")]
77    TooDeep,
78}
79
80/// A parsed payload, retaining its native value representation.
81///
82/// JSON stays a [`sonic_rs::Value`] (SIMD parse not thrown away), MsgPack stays
83/// an [`rmpv::Value`] (no JSON bridge). For a routing field, prefer
84/// [`ParsedPayload::field_str`] / [`ParsedPayload::field`] over matching the
85/// variant -- that is the point of the unified accessor.
86#[derive(Debug, Clone)]
87#[non_exhaustive]
88pub enum ParsedPayload {
89    /// JSON value parsed by sonic_rs (SIMD).
90    Json(sonic_rs::Value),
91    /// MsgPack value parsed natively by rmpv (no serde_json bridge).
92    MsgPack(rmpv::Value),
93}
94
/// One routing field, viewed without regard to the record's wire format.
///
/// The unified accessor hands this back so a router never has to know
/// whether the record arrived as JSON or MsgPack. Only the scalar shapes a
/// router actually keys off get their own variant. Containers and opaque
/// data (objects, arrays, binary, ext) all map to [`FieldRef::Other`],
/// because routing never branches on a container.
///
/// `Str` is a zero-copy borrow into the parsed value. Every other variant
/// is a plain `Copy` scalar.
#[derive(Debug, Clone, Copy, PartialEq)]
#[non_exhaustive]
pub enum FieldRef<'a> {
    /// String value, borrowed from the parsed payload.
    Str(&'a str),
    /// Integer value that fits in `i64` (JSON integers and both MsgPack
    /// signed/unsigned encodings fold here).
    Int(i64),
    /// Floating-point value.
    Float(f64),
    /// Boolean value.
    Bool(bool),
    /// Explicit JSON `null` / MsgPack `nil`.
    Null,
    /// Key is present, but its value is not a routing scalar.
    Other,
}
120
121/// Parse a framed payload into a native [`ParsedPayload`].
122///
123/// - [`PayloadFormat::Json`] -> sonic_rs (SIMD).
124/// - [`PayloadFormat::MsgPack`] -> rmpv (native, no JSON bridge).
125/// - [`PayloadFormat::Auto`] -> [`PayloadFormat::detect`] then dispatch. An empty
126///   blob detects as JSON (matching `detect`'s contract) and surfaces a
127///   [`CodecError::Json`] -- empty input is not valid JSON.
128///
129/// # Errors
130///
131/// Returns [`CodecError::Json`] or [`CodecError::MsgPack`] when the bytes are
132/// malformed for the (detected or declared) format.
133pub fn parse(payload: &Bytes, format: PayloadFormat) -> Result<ParsedPayload, CodecError> {
134    let effective = match format {
135        PayloadFormat::Auto => PayloadFormat::detect(payload),
136        other => other,
137    };
138
139    match effective {
140        // detect() never yields Auto, but treat a residual Auto as JSON.
141        PayloadFormat::Json | PayloadFormat::Auto => {
142            // Cheap iterative depth pre-scan before the recursive SIMD parser:
143            // reject pathological nesting that would otherwise blow the stack.
144            if !crate::parse_guard::json_depth_within(payload, crate::parse_guard::MAX_PARSE_DEPTH)
145            {
146                return Err(CodecError::TooDeep);
147            }
148            let value: sonic_rs::Value = sonic_rs::from_slice(payload)?;
149            Ok(ParsedPayload::Json(value))
150        }
151        PayloadFormat::MsgPack => {
152            // `&mut &[u8]` is the io::Read cursor; advances as it decodes. SINGLE
153            // native decode -- no rmp_serde, no serde_json, no re-encode.
154            //
155            // Bound nesting depth: a malicious/corrupt payload can encode deep
156            // nesting that drives recursive decode into worker-thread stack
157            // exhaustion. rmpv defaults to 1024; tighten to the shared parse-path
158            // bound on this untrusted path.
159            let mut cursor: &[u8] = payload.as_ref();
160            let value = rmpv::decode::read_value_with_max_depth(
161                &mut cursor,
162                crate::parse_guard::MAX_PARSE_DEPTH,
163            )?;
164            // One value per record. Leftover bytes mean corruption, framing
165            // misalignment, or concatenated values -- reject. MsgPack-stream
166            // framing is a separate deferred feature; do NOT silently accept.
167            let remaining = cursor.len();
168            if remaining > 0 {
169                return Err(CodecError::TrailingBytes(remaining));
170            }
171            Ok(ParsedPayload::MsgPack(value))
172        }
173    }
174}
175
176/// Serialise a JSON value to bytes via [`sonic_rs`] (SIMD), no bridge.
177///
178/// The inverse of the JSON arm of [`parse`]. Reuses sonic_rs end-to-end so a
179/// transform that mutates a parsed JSON value re-emits it without ever touching
180/// `serde_json`.
181///
182/// # Errors
183///
184/// Returns [`CodecError::Json`] if sonic_rs fails to serialise the value.
185pub fn to_json_bytes(value: &sonic_rs::Value) -> Result<Bytes, CodecError> {
186    let buf = sonic_rs::to_vec(value)?;
187    Ok(Bytes::from(buf))
188}
189
190/// Serialise a MsgPack value to bytes via NATIVE [`rmpv::encode::write_value`].
191///
192/// The inverse of the MsgPack arm of [`parse`]. This is the native rmpv encoder
193/// -- NOT `rmp_serde`, NOT a JSON bridge. A transform that mutates a parsed
194/// `rmpv::Value` re-emits MsgPack wire bytes with a single native encode, no
195/// intermediate `serde_json::Value`, no re-parse.
196///
197/// # Errors
198///
199/// Returns [`CodecError::Encode`] if the encoder fails to write the value. For
200/// an in-memory `Vec` writer this is effectively unreachable, but the encoder
201/// is fallible so the error is surfaced rather than unwrapped.
202pub fn to_msgpack_bytes(value: &rmpv::Value) -> Result<Bytes, CodecError> {
203    // write_value writes into any `io::Write`; a Vec<u8> is one and never
204    // returns a short write, so the only failure path is the encoder's own.
205    let mut buf: Vec<u8> = Vec::new();
206    rmpv::encode::write_value(&mut buf, value)?;
207    Ok(Bytes::from(buf))
208}
209
210impl ParsedPayload {
211    /// Whether the payload was decoded from JSON.
212    #[must_use]
213    pub fn is_json(&self) -> bool {
214        matches!(self, Self::Json(_))
215    }
216
217    /// Whether the payload was decoded from MsgPack.
218    #[must_use]
219    pub fn is_msgpack(&self) -> bool {
220        matches!(self, Self::MsgPack(_))
221    }
222
223    /// Read a top-level string field, format-agnostic.
224    ///
225    /// The common routing case: a router keys off one string field and does not
226    /// care about wire format. Returns `None` if the value is not a top-level
227    /// object, the key is absent, or the field is not a string. Borrows from the
228    /// parsed value (zero-copy).
229    ///
230    /// Top-level lookup only -- see the module docs.
231    #[must_use]
232    pub fn field_str(&self, name: &str) -> Option<&str> {
233        match self {
234            Self::Json(v) => v.get(name).and_then(|f| f.as_str()),
235            Self::MsgPack(v) => msgpack_field(v, name).and_then(rmpv::Value::as_str),
236        }
237    }
238
239    /// Read a top-level field as a format-agnostic [`FieldRef`].
240    ///
241    /// Returns `None` only when the value is not a top-level object or the key
242    /// is absent. A present-but-non-scalar field yields [`FieldRef::Other`]
243    /// (routers never key off containers). Borrows from the parsed value.
244    ///
245    /// Top-level lookup only -- see the module docs.
246    #[must_use]
247    pub fn field(&self, name: &str) -> Option<FieldRef<'_>> {
248        match self {
249            Self::Json(v) => v.get(name).map(json_field_ref),
250            Self::MsgPack(v) => msgpack_field(v, name).map(msgpack_field_ref),
251        }
252    }
253
254    /// Serialise back to the payload's OWN wire format.
255    ///
256    /// Same format in, same format out -- no cross-format conversion, no bridge.
257    ///
258    /// ## Pass-through contract -- DO NOT round-trip untouched records
259    ///
260    /// `to_bytes` is ONLY for a record a transform actually mutated. A record
261    /// the transform did NOT change must re-use its original `Record.payload`
262    /// directly on egress ("serde is the enemy / zero re-representation").
263    /// Calling `to_bytes` on an unmodified record pays a parse + re-serialise for
264    /// nothing AND can alter the wire bytes (key order, number formatting,
265    /// whitespace) even though the value is identical.
266    ///
267    /// No `to_bytes_as` cross-format egress: `sonic_rs::Value` and `rmpv::Value`
268    /// have no native conversion, so bridging would reintroduce the exact double-
269    /// representation this spine exists to avoid (YAGNI).
270    ///
271    /// # Errors
272    ///
273    /// Returns [`CodecError::Json`] (JSON serialise) or [`CodecError::Encode`]
274    /// (MsgPack serialise) on encoder failure.
275    pub fn to_bytes(&self) -> Result<Bytes, CodecError> {
276        match self {
277            Self::Json(v) => to_json_bytes(v),
278            Self::MsgPack(v) => to_msgpack_bytes(v),
279        }
280    }
281}
282
283/// Classify a sonic_rs JSON value into a [`FieldRef`] (borrows from `v`).
284///
285/// Order matters: probe the scalar accessors in turn. `as_i64` is tried before
286/// `as_f64` so integers stay [`FieldRef::Int`]; a JSON number with a fractional
287/// part falls through to [`FieldRef::Float`].
288fn json_field_ref(v: &sonic_rs::Value) -> FieldRef<'_> {
289    if let Some(s) = v.as_str() {
290        FieldRef::Str(s)
291    } else if v.is_null() {
292        FieldRef::Null
293    } else if let Some(b) = v.as_bool() {
294        FieldRef::Bool(b)
295    } else if let Some(i) = v.as_i64() {
296        FieldRef::Int(i)
297    } else if let Some(f) = v.as_f64() {
298        FieldRef::Float(f)
299    } else {
300        FieldRef::Other
301    }
302}
303
304/// Find a top-level value for `name` in an rmpv MsgPack value.
305///
306/// Only a [`rmpv::Value::Map`] has named fields. The map is a `Vec<(Value,
307/// Value)>`, so this is a linear scan -- routing maps are small (a handful of
308/// keys), so a linear scan beats building an index. Only string keys match.
309fn msgpack_field<'a>(v: &'a rmpv::Value, name: &str) -> Option<&'a rmpv::Value> {
310    match v {
311        rmpv::Value::Map(pairs) => pairs
312            .iter()
313            .find(|(k, _)| k.as_str() == Some(name))
314            .map(|(_, val)| val),
315        _ => None,
316    }
317}
318
319/// Classify an rmpv MsgPack value into a [`FieldRef`].
320///
321/// MsgPack integers split into signed/unsigned at the wire level; both fold to
322/// `i64` here when they fit. An unsigned value above `i64::MAX` cannot fit `i64`
323/// and is surfaced as [`FieldRef::Float`] via `as_f64` (lossy but it keeps a
324/// numeric field numeric for routing) rather than dropped to `Other`.
325fn msgpack_field_ref(v: &rmpv::Value) -> FieldRef<'_> {
326    match v {
327        rmpv::Value::String(s) => s.as_str().map_or(FieldRef::Other, FieldRef::Str),
328        rmpv::Value::Nil => FieldRef::Null,
329        rmpv::Value::Boolean(b) => FieldRef::Bool(*b),
330        rmpv::Value::Integer(_) => v
331            .as_i64()
332            .map(FieldRef::Int)
333            .or_else(|| v.as_f64().map(FieldRef::Float))
334            .unwrap_or(FieldRef::Other),
335        rmpv::Value::F32(f) => FieldRef::Float(f64::from(*f)),
336        rmpv::Value::F64(f) => FieldRef::Float(*f),
337        // Map / Array / Binary / Ext: routers do not key off containers.
338        _ => FieldRef::Other,
339    }
340}
341
342#[cfg(test)]
343mod tests {
344    use super::*;
345
346    // ---- Helpers: build real MsgPack blobs by hand (no serde encode) -------
347    //
348    // We hand-roll the MsgPack bytes so the test exercises the NATIVE rmpv
349    // decoder against the real wire format, not a serde round-trip.
350
351    /// fixstr: 0xa0 | len, then the UTF-8 bytes (len < 32).
352    fn fixstr(s: &str) -> Vec<u8> {
353        let bytes = s.as_bytes();
354        assert!(bytes.len() < 32, "fixstr helper only handles len < 32");
355        let len = u8::try_from(bytes.len()).expect("len < 32 fits u8");
356        let mut out = vec![0xa0 | len];
357        out.extend_from_slice(bytes);
358        out
359    }
360
361    /// fixmap header: 0x80 | n (n < 16 entries).
362    fn fixmap_header(n: u8) -> u8 {
363        assert!(n < 16, "fixmap helper only handles < 16 entries");
364        0x80 | n
365    }
366
367    /// Build a logical record `{"_table": "events", "org_id": 42, "live":
368    /// true, "ratio": <f64>, "missing": nil}` as a MsgPack fixmap.
369    fn sample_msgpack() -> Bytes {
370        let mut buf = vec![fixmap_header(5)];
371        // "_table": "events"
372        buf.extend(fixstr("_table"));
373        buf.extend(fixstr("events"));
374        // "org_id": 42  (positive fixint -- the byte is its own value)
375        buf.extend(fixstr("org_id"));
376        buf.push(42);
377        // "live": true (0xc3)
378        buf.extend(fixstr("live"));
379        buf.push(0xc3);
380        // "ratio": 1.5 (float64 0xcb + 8 bytes big-endian)
381        buf.extend(fixstr("ratio"));
382        buf.push(0xcb);
383        buf.extend_from_slice(&1.5f64.to_be_bytes());
384        // "missing": nil (0xc0)
385        buf.extend(fixstr("missing"));
386        buf.push(0xc0);
387        Bytes::from(buf)
388    }
389
390    /// The same logical record as JSON.
391    fn sample_json() -> Bytes {
392        Bytes::from_static(
393            br#"{"_table":"events","org_id":42,"live":true,"ratio":1.5,"missing":null}"#,
394        )
395    }
396
397    // ---- parse(): JSON -----------------------------------------------------
398
399    #[test]
400    fn parse_json_object() {
401        let parsed = parse(&sample_json(), PayloadFormat::Json).unwrap();
402        assert!(parsed.is_json());
403        assert!(!parsed.is_msgpack());
404        assert_eq!(parsed.field_str("_table"), Some("events"));
405    }
406
407    #[test]
408    fn parse_json_array_is_ok() {
409        // A top-level array is valid JSON; it simply has no named fields.
410        let parsed = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
411        assert!(parsed.is_json());
412        assert_eq!(parsed.field_str("anything"), None);
413    }
414
415    #[test]
416    fn parse_msgpack_rejects_excessive_nesting() {
417        // 70 nested single-element arrays exceed the 64-level decode-depth bound
418        // that guards worker-thread stacks against a deeply-nested hostile or
419        // corrupt payload. 0x91 = fixarray of 1 element; 0xc0 = nil leaf.
420        let mut buf = vec![0x91u8; 70];
421        buf.push(0xc0);
422        let result = parse(&Bytes::from(buf), PayloadFormat::MsgPack);
423        assert!(
424            matches!(result, Err(CodecError::MsgPack(_))),
425            "deeply nested msgpack must be rejected by the depth bound, got {result:?}"
426        );
427    }
428
429    #[test]
430    fn parse_msgpack_allows_reasonable_nesting() {
431        // Nesting well under the bound parses fine (no false positive).
432        let mut buf = vec![0x91u8; 8];
433        buf.push(0xc0);
434        assert!(parse(&Bytes::from(buf), PayloadFormat::MsgPack).is_ok());
435    }
436
437    // ---- parse(): MsgPack (native rmpv, hand-rolled bytes) -----------------
438
439    #[test]
440    fn parse_msgpack_map() {
441        let parsed = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
442        assert!(parsed.is_msgpack());
443        assert!(!parsed.is_json());
444        assert_eq!(parsed.field_str("_table"), Some("events"));
445    }
446
447    #[test]
448    fn parse_minimal_fixmap() {
449        // {"k": "v"} -- the smallest interesting map.
450        let mut buf = vec![fixmap_header(1)];
451        buf.extend(fixstr("k"));
452        buf.extend(fixstr("v"));
453        let parsed = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap();
454        assert_eq!(parsed.field_str("k"), Some("v"));
455    }
456
457    // ---- Auto detection dispatch ------------------------------------------
458
459    #[test]
460    fn parse_auto_dispatches_to_json() {
461        let parsed = parse(&sample_json(), PayloadFormat::Auto).unwrap();
462        assert!(parsed.is_json(), "object byte '{{' must detect as JSON");
463        assert_eq!(parsed.field_str("_table"), Some("events"));
464    }
465
466    #[test]
467    fn parse_auto_dispatches_to_msgpack() {
468        let parsed = parse(&sample_msgpack(), PayloadFormat::Auto).unwrap();
469        assert!(
470            parsed.is_msgpack(),
471            "fixmap byte 0x85 must detect as MsgPack"
472        );
473        assert_eq!(parsed.field_str("_table"), Some("events"));
474    }
475
476    // ---- Unified accessor: SAME field value from BOTH formats --------------
477
478    #[test]
479    fn field_str_identical_across_formats() {
480        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
481        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
482        // The whole point: same logical record, same routing field, regardless
483        // of wire format.
484        assert_eq!(j.field_str("_table"), m.field_str("_table"));
485        assert_eq!(j.field_str("_table"), Some("events"));
486    }
487
488    #[test]
489    fn field_str_returns_none_for_non_string() {
490        // org_id is an int -- field_str only returns strings.
491        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
492        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
493        assert_eq!(j.field_str("org_id"), None);
494        assert_eq!(m.field_str("org_id"), None);
495    }
496
497    #[test]
498    fn field_str_returns_none_for_missing_key() {
499        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
500        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
501        assert_eq!(j.field_str("nope"), None);
502        assert_eq!(m.field_str("nope"), None);
503    }
504
505    #[test]
506    fn field_str_value_is_present_via_field_too() {
507        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
508        assert_eq!(j.field("_table"), Some(FieldRef::Str("events")));
509    }
510
511    #[test]
512    fn field_int_identical_across_formats() {
513        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
514        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
515        assert_eq!(j.field("org_id"), Some(FieldRef::Int(42)));
516        assert_eq!(m.field("org_id"), Some(FieldRef::Int(42)));
517    }
518
519    #[test]
520    fn field_bool_identical_across_formats() {
521        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
522        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
523        assert_eq!(j.field("live"), Some(FieldRef::Bool(true)));
524        assert_eq!(m.field("live"), Some(FieldRef::Bool(true)));
525    }
526
527    #[test]
528    fn field_float_identical_across_formats() {
529        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
530        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
531        assert_eq!(j.field("ratio"), Some(FieldRef::Float(1.5)));
532        assert_eq!(m.field("ratio"), Some(FieldRef::Float(1.5)));
533    }
534
535    #[test]
536    fn field_null_identical_across_formats() {
537        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
538        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
539        assert_eq!(j.field("missing"), Some(FieldRef::Null));
540        assert_eq!(m.field("missing"), Some(FieldRef::Null));
541    }
542
543    #[test]
544    fn field_missing_key_is_none_for_both() {
545        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
546        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
547        assert_eq!(j.field("nope"), None);
548        assert_eq!(m.field("nope"), None);
549    }
550
551    #[test]
552    fn field_nested_object_is_other() {
553        // A field whose value is a container collapses to Other, not None.
554        let j = parse(
555            &Bytes::from_static(br#"{"k":{"nested":1}}"#),
556            PayloadFormat::Json,
557        )
558        .unwrap();
559        assert_eq!(j.field("k"), Some(FieldRef::Other));
560        // ...but it is not a routing scalar, so field_str is None.
561        assert_eq!(j.field_str("k"), None);
562
563        // MsgPack: {"k": [1]} -- an array value also collapses to Other.
564        // fixmap(1) "k" -> fixarray(1) [positive fixint 1]
565        let mut buf = vec![fixmap_header(1)];
566        buf.extend(fixstr("k"));
567        buf.push(0x91); // fixarray with 1 element
568        buf.push(0x01); // positive fixint 1
569        let m = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap();
570        assert_eq!(m.field("k"), Some(FieldRef::Other));
571    }
572
573    #[test]
574    fn field_on_non_object_top_level_is_none() {
575        // A top-level JSON array has no named fields.
576        let j = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
577        assert_eq!(j.field("0"), None);
578
579        // A top-level MsgPack array (fixarray) likewise.
580        // fixarray(2) [1, 2]
581        let m = parse(&Bytes::from(vec![0x92, 0x01, 0x02]), PayloadFormat::MsgPack).unwrap();
582        assert_eq!(m.field("0"), None);
583    }
584
585    // ---- Error cases -------------------------------------------------------
586
587    #[test]
588    fn malformed_json_errors() {
589        let err = parse(&Bytes::from_static(b"{not valid json"), PayloadFormat::Json).unwrap_err();
590        assert!(matches!(err, CodecError::Json(_)), "got {err:?}");
591        assert!(!err.to_string().is_empty());
592    }
593
594    #[test]
595    fn empty_blob_auto_errors_as_json() {
596        // detect() maps empty -> Json; empty is not valid JSON.
597        let err = parse(&Bytes::new(), PayloadFormat::Auto).unwrap_err();
598        assert!(matches!(err, CodecError::Json(_)), "got {err:?}");
599    }
600
601    #[test]
602    fn malformed_msgpack_errors() {
603        // 0x81 declares a fixmap with one entry but supplies no key/value.
604        let err = parse(&Bytes::from_static(&[0x81]), PayloadFormat::MsgPack).unwrap_err();
605        assert!(matches!(err, CodecError::MsgPack(_)), "got {err:?}");
606        assert!(!err.to_string().is_empty());
607    }
608
609    #[test]
610    fn msgpack_truncated_float_errors() {
611        // 0xcb declares a float64 but supplies only 3 of the 8 payload bytes.
612        let mut buf = vec![fixmap_header(1)];
613        buf.extend(fixstr("ratio"));
614        buf.push(0xcb);
615        buf.extend_from_slice(&[0x00, 0x01, 0x02]); // short
616        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap_err();
617        assert!(matches!(err, CodecError::MsgPack(_)), "got {err:?}");
618    }
619
620    // ---- Task 0.3b: serialise-out (round-trips through real bytes) ---------
621    //
622    // The contract is "parse -> (mutate) -> serialise -> parse again preserves
623    // the logical value". We assert on re-parsed VALUES (via the unified
624    // accessor), NOT on raw bytes: re-serialise may reorder keys or reformat
625    // numbers, so a byte-for-byte equality assertion would be wrong.
626
627    /// Compare every routing field of the canonical sample across two payloads.
628    fn assert_sample_fields_eq(a: &ParsedPayload, b: &ParsedPayload) {
629        assert_eq!(a.field("_table"), b.field("_table"));
630        assert_eq!(a.field("org_id"), b.field("org_id"));
631        assert_eq!(a.field("live"), b.field("live"));
632        assert_eq!(a.field("ratio"), b.field("ratio"));
633        assert_eq!(a.field("missing"), b.field("missing"));
634    }
635
636    #[test]
637    fn json_to_bytes_round_trips() {
638        // parse JSON -> to_bytes -> parse again -> values equal.
639        let original = parse(&sample_json(), PayloadFormat::Json).unwrap();
640        assert!(original.is_json());
641
642        let bytes = original.to_bytes().unwrap();
643        assert!(!bytes.is_empty());
644
645        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
646        assert!(reparsed.is_json(), "JSON must round-trip as JSON");
647        assert_sample_fields_eq(&original, &reparsed);
648    }
649
650    #[test]
651    fn msgpack_to_bytes_round_trips_via_native_bytes() {
652        // parse MsgPack (hand-rolled bytes) -> to_bytes (native rmpv encode)
653        // -> parse again -> values equal. No serde, no JSON bridge anywhere.
654        let original = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
655        assert!(original.is_msgpack());
656
657        let bytes = original.to_bytes().unwrap();
658        assert!(!bytes.is_empty());
659        // First byte must be a MsgPack map marker (fixmap 0x80..=0x8f for 5
660        // entries -> 0x85), proving native MsgPack came out, not JSON.
661        assert_eq!(bytes[0], fixmap_header(5), "expected fixmap(5) wire marker");
662
663        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
664        assert!(reparsed.is_msgpack(), "MsgPack must round-trip as MsgPack");
665        assert_sample_fields_eq(&original, &reparsed);
666    }
667
668    #[test]
669    fn to_json_bytes_reparses_to_same_value() {
670        // Free function: serialise a sonic_rs::Value, re-parse, compare.
671        let ParsedPayload::Json(value) = parse(&sample_json(), PayloadFormat::Json).unwrap() else {
672            panic!("expected JSON");
673        };
674        let bytes = to_json_bytes(&value).unwrap();
675        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
676        assert_eq!(reparsed.field("_table"), Some(FieldRef::Str("events")));
677        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
678        assert_eq!(reparsed.field("ratio"), Some(FieldRef::Float(1.5)));
679    }
680
681    #[test]
682    fn to_msgpack_bytes_reparses_to_same_value() {
683        // Free function: serialise an rmpv::Value via native write_value,
684        // re-parse, compare.
685        let ParsedPayload::MsgPack(value) =
686            parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap()
687        else {
688            panic!("expected MsgPack");
689        };
690        let bytes = to_msgpack_bytes(&value).unwrap();
691        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
692        assert_eq!(reparsed.field("_table"), Some(FieldRef::Str("events")));
693        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
694        assert_eq!(reparsed.field("live"), Some(FieldRef::Bool(true)));
695        assert_eq!(reparsed.field("ratio"), Some(FieldRef::Float(1.5)));
696        assert_eq!(reparsed.field("missing"), Some(FieldRef::Null));
697    }
698
699    #[test]
700    fn to_bytes_preserves_a_mutated_json_field() {
701        // The realistic case: a transform CHANGED a field, then re-serialises.
702        // Only then is to_bytes the right tool (unmodified records pass through
703        // the original Bytes -- see the to_bytes doc).
704        let ParsedPayload::Json(mut value) = parse(&sample_json(), PayloadFormat::Json).unwrap()
705        else {
706            panic!("expected JSON");
707        };
708        // Mutate _table in place via sonic_rs's object insert (overwrites the
709        // existing key) -- the value model is what gets re-serialised.
710        value.insert("_table", sonic_rs::Value::from("audit"));
711        let bytes = to_json_bytes(&value).unwrap();
712        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
713        assert_eq!(reparsed.field_str("_table"), Some("audit"));
714        // Untouched siblings survive the round-trip.
715        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
716    }
717
718    #[test]
719    fn json_to_bytes_handles_top_level_array() {
720        // Egress is not object-only: a top-level array must round-trip too.
721        let parsed = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
722        let bytes = parsed.to_bytes().unwrap();
723        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
724        assert!(reparsed.is_json());
725        // No named fields either way; the value re-parses without error.
726        assert_eq!(reparsed.field_str("anything"), None);
727    }
728
729    #[test]
730    fn msgpack_to_bytes_handles_top_level_scalar() {
731        // A bare MsgPack integer (positive fixint 42) round-trips as itself,
732        // not wrapped in a map.
733        let parsed = parse(&Bytes::from(vec![42u8]), PayloadFormat::MsgPack).unwrap();
734        let bytes = parsed.to_bytes().unwrap();
735        assert_eq!(
736            bytes.as_ref(),
737            &[42u8],
738            "fixint must re-emit byte-identical"
739        );
740        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
741        assert!(reparsed.is_msgpack());
742    }
743
744    #[test]
745    fn double_round_trip_is_stable() {
746        // parse -> to_bytes -> parse -> to_bytes: the SECOND serialise must
747        // equal the first (the value model is the fixed point, even if it
748        // differs from the original hand-rolled bytes).
749        let first = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
750        let b1 = first.to_bytes().unwrap();
751        let second = parse(&b1, PayloadFormat::MsgPack).unwrap();
752        let b2 = second.to_bytes().unwrap();
753        assert_eq!(b1, b2, "re-serialising a re-parsed value must be stable");
754    }
755
756    // ---- Phase 5: trailing-bytes hardening ------------------------------------
757
758    #[test]
759    fn msgpack_rejects_trailing_bytes() {
760        // A valid fixmap {"k": "v"} followed by a stray nil byte (0xc0).
761        // Before the fix this returns Ok (silently ignoring 0xc0).
762        // After the fix it must return CodecError::TrailingBytes(1).
763        let mut buf = vec![fixmap_header(1)];
764        buf.extend(fixstr("k"));
765        buf.extend(fixstr("v"));
766        buf.push(0xc0); // stray nil -- trailing garbage
767        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack)
768            .expect_err("trailing byte must be rejected");
769        match err {
770            CodecError::TrailingBytes(n) => assert_eq!(n, 1, "expected 1 trailing byte"),
771            other => panic!("expected TrailingBytes(1), got {other:?}"),
772        }
773    }
774
775    #[test]
776    fn msgpack_rejects_concatenated_values() {
777        // Two valid MsgPack values back-to-back: fixint 1 then fixint 2.
778        // parse() decodes ONE value; the second byte is trailing and must error.
779        let buf = vec![0x01u8, 0x02u8]; // positive fixint 1, positive fixint 2
780        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack)
781            .expect_err("concatenated values must be rejected");
782        match err {
783            CodecError::TrailingBytes(n) => assert_eq!(n, 1, "expected 1 trailing byte"),
784            other => panic!("expected TrailingBytes(1), got {other:?}"),
785        }
786    }
787
788    #[test]
789    fn msgpack_clean_single_value_still_parses_ok() {
790        // A single valid fixmap with no trailing bytes -- must still parse Ok.
791        // Regression guard: the fix must not break the happy path.
792        let mut buf = vec![fixmap_header(1)];
793        buf.extend(fixstr("k"));
794        buf.extend(fixstr("v"));
795        let parsed = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap();
796        assert_eq!(parsed.field_str("k"), Some("v"));
797    }
798
799    #[test]
800    fn json_rejects_trailing_garbage() {
801        // The MsgPack path rejects trailing bytes (CodecError::TrailingBytes).
802        // The JSON path delegates to sonic_rs, which rejects trailing
803        // NON-whitespace content after a complete value (serde_json semantics).
804        // A valid object followed by stray garbage must error.
805        let mut buf = br#"{"_table":"events"}"#.to_vec();
806        buf.extend_from_slice(b"garbage");
807        let err = parse(&Bytes::from(buf), PayloadFormat::Json)
808            .expect_err("trailing non-whitespace garbage must be rejected");
809        assert!(
810            matches!(err, CodecError::Json(_)),
811            "expected CodecError::Json for trailing garbage, got {err:?}"
812        );
813    }
814
815    #[test]
816    fn json_accepts_trailing_whitespace() {
817        // sonic_rs (like serde_json) tolerates trailing whitespace after a
818        // complete value -- a pretty-printer's trailing newline must not be a
819        // parse error. Asserted alongside the trailing-garbage rejection so the
820        // JSON trailing-byte contract is pinned both ways.
821        let mut buf = br#"{"_table":"events"}"#.to_vec();
822        buf.extend_from_slice(b" \t\r\n");
823        let parsed = parse(&Bytes::from(buf), PayloadFormat::Json)
824            .expect("trailing whitespace must be accepted");
825        assert_eq!(parsed.field_str("_table"), Some("events"));
826    }
827
828    #[test]
829    fn json_parsed_as_msgpack_errors() {
830        // Force the wrong decoder: JSON bytes through the MsgPack path. '{' is
831        // 0x7b, which rmpv reads as a positive fixint -- a single-byte value
832        // -- leaving the remaining 69 bytes of the JSON payload as trailing
833        // bytes. After Phase 5 hardening this MUST error with TrailingBytes.
834        let err = parse(&sample_json(), PayloadFormat::MsgPack)
835            .expect_err("JSON fed to MsgPack path must error after trailing-bytes hardening");
836        assert!(
837            matches!(err, CodecError::TrailingBytes(_)),
838            "expected TrailingBytes, got {err:?}"
839        );
840    }
841}