Skip to main content

hyperi_rustlib/transport/
codec.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/codec.rs
3// Purpose:   Parse-on-demand WorkBatch codec (native JSON + MsgPack, no bridge)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Parse-on-demand codec (Task 0.3a)
10//!
11//! The data-plane spine frames bytes into a [`WorkBatch`](super::WorkBatch)
12//! WITHOUT parsing (Task 0.2). A transform / router that needs to read a field
13//! parses on demand -- and it should not care whether the record arrived as
14//! JSON or MsgPack. This module is that parse step.
15//!
16//! ## Native, no JSON bridge
17//!
18//! There is no `rmp_serde -> serde_json::Value -> serde_json::to_vec ->
19//! sonic_rs` bridge anywhere on the parse path -- that double-parse-and-
20//! re-serialise was killed in Phase 0.7c (the engine `parse.rs` MsgPack path
21//! now walks `rmpv` straight into a `sonic_rs::Value`). Both this codec and
22//! the engine decode natively:
23//!
24//! - **JSON** is parsed once with [`sonic_rs`] (SIMD, AVX2/NEON).
25//! - **MsgPack** is parsed once with [`rmpv`] -- the schema-less `Value` decoder
26//!   from the same `3Hren/msgpack-rust` workspace as `rmp-serde`. No
27//!   intermediate `serde_json::Value`, no JSON re-serialise.
28//!
29//! ## Unified routing-field accessor
30//!
31//! A router keys off ONE field (`_table`, `org_id`, ...) and must not branch on
32//! wire format. [`ParsedPayload`] exposes a format-agnostic accessor:
33//!
34//! - [`ParsedPayload::field_str`] -- the common case: a top-level string field.
35//! - [`ParsedPayload::field`] -- a [`FieldRef`] covering the scalar routing
36//!   cases (string / int / float / bool / null), with everything else
37//!   ([`FieldRef::Other`]) deliberately collapsed because routers do not key
38//!   off nested containers.
39//!
40//! **Scope:** top-level object-key lookup only. No deep JSON-path -- routing
41//! keys live at the top level, and a deep-path query is a separate concern that
42//! YAGNI keeps out of the hot routing path.
43//!
44//! See `docs/MIGRATIONS.md` (codec consolidation: native rmpv, JSON bridge
45//! removed) and `docs/SELF-REGULATION.md` for where this codec sits in the
46//! `WorkBatch` data-plane spine. The block contract is in
47//! [`WorkBatch`](crate::transport::WorkBatch).
48
49use super::types::PayloadFormat;
50use bytes::Bytes;
51use sonic_rs::JsonValueTrait as _;
52use thiserror::Error;
53
54/// A parse failure, tagged by the format that failed.
55#[derive(Debug, Error)]
56pub enum CodecError {
57    /// JSON parse failed (sonic_rs SIMD parser).
58    #[error("json parse error: {0}")]
59    Json(#[from] sonic_rs::Error),
60
61    /// MsgPack parse failed (native rmpv decoder).
62    #[error("msgpack parse error: {0}")]
63    MsgPack(#[from] rmpv::decode::Error),
64
65    /// MsgPack serialise failed (native rmpv encoder).
66    ///
67    /// `rmpv::encode::Error` is `rmp::encode::ValueWriteError` -- an I/O write
68    /// failure from the underlying writer. Serialising into an in-memory `Vec`
69    /// effectively never fails, but the encoder is fallible so we surface it
70    /// rather than panic. JSON serialise reuses [`CodecError::Json`]
71    /// (`sonic_rs::Error` already covers both parse and serialise).
72    #[error("msgpack encode error: {0}")]
73    Encode(#[from] rmpv::encode::Error),
74
75    /// Trailing bytes remain after a complete MsgPack value was decoded.
76    ///
77    /// A single-record payload must encode exactly ONE value with no leftover
78    /// bytes. Trailing bytes indicate corruption, a framing error, or two
79    /// values concatenated (MsgPack stream framing is a separate, deferred
80    /// feature -- it is NOT supported here).
81    ///
82    /// The `usize` is the number of bytes that remained unconsumed.
83    #[error("msgpack trailing bytes: {0} byte(s) remain after value")]
84    TrailingBytes(usize),
85}
86
87/// A parsed payload, retaining its native value representation.
88///
89/// JSON stays a [`sonic_rs::Value`] (so the SIMD parse is not thrown away) and
90/// MsgPack stays an [`rmpv::Value`] (native, no JSON bridge). A consumer that
91/// only needs a routing field should reach for [`ParsedPayload::field_str`] /
92/// [`ParsedPayload::field`] rather than matching the variant -- that is the
93/// whole point of the unified accessor.
94#[derive(Debug, Clone)]
95pub enum ParsedPayload {
96    /// JSON value parsed by sonic_rs (SIMD).
97    Json(sonic_rs::Value),
98    /// MsgPack value parsed natively by rmpv (no serde_json bridge).
99    MsgPack(rmpv::Value),
100}
101
/// Format-independent, borrowed view of a single top-level field.
///
/// [`ParsedPayload::field`] returns this type so that a router can inspect a
/// routing key without knowing whether the record arrived as JSON or MsgPack.
/// Only the scalar shapes a router keys off are modelled; anything else
/// (objects, arrays, binary, ext, ...) is reported as [`FieldRef::Other`].
///
/// [`FieldRef::Str`] borrows its text from the parsed value, so no copy is
/// made. The remaining variants carry plain `Copy` scalars or no data at all.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FieldRef<'a> {
    /// Text, borrowed from the parsed value.
    Str(&'a str),
    /// A whole number; JSON integers and MsgPack ints are both widened to `i64`.
    Int(i64),
    /// A floating-point number.
    Float(f64),
    /// `true` or `false`.
    Bool(bool),
    /// The field exists and holds an explicit null (JSON) / nil (MsgPack).
    Null,
    /// The field exists but is not one of the routing scalars above.
    Other,
}
126
127/// Parse a framed payload into a native [`ParsedPayload`].
128///
129/// - [`PayloadFormat::Json`] -> sonic_rs (SIMD).
130/// - [`PayloadFormat::MsgPack`] -> rmpv (native, no JSON bridge).
131/// - [`PayloadFormat::Auto`] -> [`PayloadFormat::detect`] then dispatch. An empty
132///   blob detects as JSON (matching `detect`'s contract) and surfaces a
133///   [`CodecError::Json`] -- empty input is not valid JSON.
134///
135/// # Errors
136///
137/// Returns [`CodecError::Json`] or [`CodecError::MsgPack`] when the bytes are
138/// malformed for the (detected or declared) format.
139pub fn parse(payload: &Bytes, format: PayloadFormat) -> Result<ParsedPayload, CodecError> {
140    let effective = match format {
141        PayloadFormat::Auto => PayloadFormat::detect(payload),
142        other => other,
143    };
144
145    match effective {
146        // detect() never yields Auto, but treat a residual Auto as JSON.
147        PayloadFormat::Json | PayloadFormat::Auto => {
148            let value: sonic_rs::Value = sonic_rs::from_slice(payload)?;
149            Ok(ParsedPayload::Json(value))
150        }
151        PayloadFormat::MsgPack => {
152            // rmpv::decode::read_value reads from any `io::Read`; a byte slice
153            // is one. `&mut &[u8]` advances the cursor as it decodes. This is a
154            // SINGLE native decode -- no rmp_serde, no serde_json, no re-encode.
155            let mut cursor: &[u8] = payload.as_ref();
156            let value = rmpv::decode::read_value(&mut cursor)?;
157            // A single-record payload encodes exactly ONE value. Any bytes still
158            // in `cursor` after the value was decoded indicate corruption,
159            // framing misalignment, or concatenated values. Reject them.
160            // MsgPack-stream framing (multiple values per payload) is a separate
161            // deferred feature -- do NOT silently accept trailing bytes here.
162            let remaining = cursor.len();
163            if remaining > 0 {
164                return Err(CodecError::TrailingBytes(remaining));
165            }
166            Ok(ParsedPayload::MsgPack(value))
167        }
168    }
169}
170
171/// Serialise a JSON value to bytes via [`sonic_rs`] (SIMD), no bridge.
172///
173/// The inverse of the JSON arm of [`parse`]. Reuses sonic_rs end-to-end so a
174/// transform that mutates a parsed JSON value re-emits it without ever touching
175/// `serde_json`.
176///
177/// # Errors
178///
179/// Returns [`CodecError::Json`] if sonic_rs fails to serialise the value.
180pub fn to_json_bytes(value: &sonic_rs::Value) -> Result<Bytes, CodecError> {
181    let buf = sonic_rs::to_vec(value)?;
182    Ok(Bytes::from(buf))
183}
184
185/// Serialise a MsgPack value to bytes via NATIVE [`rmpv::encode::write_value`].
186///
187/// The inverse of the MsgPack arm of [`parse`]. This is the native rmpv encoder
188/// -- NOT `rmp_serde`, NOT a JSON bridge. A transform that mutates a parsed
189/// `rmpv::Value` re-emits MsgPack wire bytes with a single native encode, no
190/// intermediate `serde_json::Value`, no re-parse.
191///
192/// # Errors
193///
194/// Returns [`CodecError::Encode`] if the encoder fails to write the value. For
195/// an in-memory `Vec` writer this is effectively unreachable, but the encoder
196/// is fallible so the error is surfaced rather than unwrapped.
197pub fn to_msgpack_bytes(value: &rmpv::Value) -> Result<Bytes, CodecError> {
198    // write_value writes into any `io::Write`; a Vec<u8> is one and never
199    // returns a short write, so the only failure path is the encoder's own.
200    let mut buf: Vec<u8> = Vec::new();
201    rmpv::encode::write_value(&mut buf, value)?;
202    Ok(Bytes::from(buf))
203}
204
205impl ParsedPayload {
206    /// Whether the payload was decoded from JSON.
207    #[must_use]
208    pub fn is_json(&self) -> bool {
209        matches!(self, Self::Json(_))
210    }
211
212    /// Whether the payload was decoded from MsgPack.
213    #[must_use]
214    pub fn is_msgpack(&self) -> bool {
215        matches!(self, Self::MsgPack(_))
216    }
217
218    /// Read a top-level string field, format-agnostic.
219    ///
220    /// The common routing case: a router keys off one string field and does not
221    /// care about wire format. Returns `None` if the value is not a top-level
222    /// object, the key is absent, or the field is not a string. Borrows from the
223    /// parsed value (zero-copy).
224    ///
225    /// Top-level lookup only -- see the module docs.
226    #[must_use]
227    pub fn field_str(&self, name: &str) -> Option<&str> {
228        match self {
229            Self::Json(v) => v.get(name).and_then(|f| f.as_str()),
230            Self::MsgPack(v) => msgpack_field(v, name).and_then(rmpv::Value::as_str),
231        }
232    }
233
234    /// Read a top-level field as a format-agnostic [`FieldRef`].
235    ///
236    /// Returns `None` only when the value is not a top-level object or the key
237    /// is absent. A present-but-non-scalar field yields [`FieldRef::Other`]
238    /// (routers never key off containers). Borrows from the parsed value.
239    ///
240    /// Top-level lookup only -- see the module docs.
241    #[must_use]
242    pub fn field(&self, name: &str) -> Option<FieldRef<'_>> {
243        match self {
244            Self::Json(v) => v.get(name).map(json_field_ref),
245            Self::MsgPack(v) => msgpack_field(v, name).map(msgpack_field_ref),
246        }
247    }
248
249    /// Serialise back to the payload's OWN wire format (Task 0.3b).
250    ///
251    /// `Json` -> JSON bytes (via [`to_json_bytes`]), `MsgPack` -> MsgPack bytes
252    /// (via [`to_msgpack_bytes`]). Same format in, same format out -- no
253    /// cross-format conversion, no bridge.
254    ///
255    /// ## Pass-through contract -- DO NOT round-trip untouched records
256    ///
257    /// This is the egress face of a *parse-on-demand* spine. The governing
258    /// principle is "serde is the enemy / zero re-representation": a record that
259    /// a transform did NOT change must re-use its original `Record.payload`
260    /// (the `Bytes` it arrived as) directly on egress. `to_bytes` is ONLY for a
261    /// record a transform actually mutated.
262    ///
263    /// Calling `to_bytes` on an unmodified record is a correctness *and*
264    /// performance bug: it pays a full parse + re-serialise for nothing AND can
265    /// alter the wire bytes (key order, number formatting, whitespace) even
266    /// though the logical value is identical. Reuse the original `Bytes`; only
267    /// reach for `to_bytes` once the value has been edited.
268    ///
269    /// There is deliberately NO `to_bytes_as` cross-format egress. JSON and
270    /// MsgPack have distinct value models (`sonic_rs::Value` vs `rmpv::Value`)
271    /// with no native conversion between them; bridging would mean either a
272    /// hand-rolled recursive value walker or a `serde_json` hop -- the exact
273    /// double-representation this spine exists to avoid. Cross-format egress, if
274    /// a consumer ever needs it, is a separate, explicit concern (YAGNI).
275    ///
276    /// # Errors
277    ///
278    /// Returns [`CodecError::Json`] (JSON serialise) or [`CodecError::Encode`]
279    /// (MsgPack serialise) on encoder failure.
280    pub fn to_bytes(&self) -> Result<Bytes, CodecError> {
281        match self {
282            Self::Json(v) => to_json_bytes(v),
283            Self::MsgPack(v) => to_msgpack_bytes(v),
284        }
285    }
286}
287
288/// Classify a sonic_rs JSON value into a [`FieldRef`] (borrows from `v`).
289///
290/// Order matters: probe the scalar accessors in turn. `as_i64` is tried before
291/// `as_f64` so integers stay [`FieldRef::Int`]; a JSON number with a fractional
292/// part falls through to [`FieldRef::Float`].
293fn json_field_ref(v: &sonic_rs::Value) -> FieldRef<'_> {
294    if let Some(s) = v.as_str() {
295        FieldRef::Str(s)
296    } else if v.is_null() {
297        FieldRef::Null
298    } else if let Some(b) = v.as_bool() {
299        FieldRef::Bool(b)
300    } else if let Some(i) = v.as_i64() {
301        FieldRef::Int(i)
302    } else if let Some(f) = v.as_f64() {
303        FieldRef::Float(f)
304    } else {
305        FieldRef::Other
306    }
307}
308
309/// Find a top-level value for `name` in an rmpv MsgPack value.
310///
311/// Only a [`rmpv::Value::Map`] has named fields. The map is a `Vec<(Value,
312/// Value)>`, so this is a linear scan -- routing maps are small (a handful of
313/// keys), so a linear scan beats building an index. Only string keys match.
314fn msgpack_field<'a>(v: &'a rmpv::Value, name: &str) -> Option<&'a rmpv::Value> {
315    match v {
316        rmpv::Value::Map(pairs) => pairs
317            .iter()
318            .find(|(k, _)| k.as_str() == Some(name))
319            .map(|(_, val)| val),
320        _ => None,
321    }
322}
323
324/// Classify an rmpv MsgPack value into a [`FieldRef`].
325///
326/// MsgPack integers split into signed/unsigned at the wire level; both fold to
327/// `i64` here when they fit. An unsigned value above `i64::MAX` cannot fit `i64`
328/// and is surfaced as [`FieldRef::Float`] via `as_f64` (lossy but it keeps a
329/// numeric field numeric for routing) rather than dropped to `Other`.
330fn msgpack_field_ref(v: &rmpv::Value) -> FieldRef<'_> {
331    match v {
332        rmpv::Value::String(s) => s.as_str().map_or(FieldRef::Other, FieldRef::Str),
333        rmpv::Value::Nil => FieldRef::Null,
334        rmpv::Value::Boolean(b) => FieldRef::Bool(*b),
335        rmpv::Value::Integer(_) => v
336            .as_i64()
337            .map(FieldRef::Int)
338            .or_else(|| v.as_f64().map(FieldRef::Float))
339            .unwrap_or(FieldRef::Other),
340        rmpv::Value::F32(f) => FieldRef::Float(f64::from(*f)),
341        rmpv::Value::F64(f) => FieldRef::Float(*f),
342        // Map / Array / Binary / Ext: routers do not key off containers.
343        _ => FieldRef::Other,
344    }
345}
346
347#[cfg(test)]
348mod tests {
349    use super::*;
350
351    // ---- Helpers: build real MsgPack blobs by hand (no serde encode) -------
352    //
353    // We hand-roll the MsgPack bytes so the test exercises the NATIVE rmpv
354    // decoder against the real wire format, not a serde round-trip.
355
356    /// fixstr: 0xa0 | len, then the UTF-8 bytes (len < 32).
357    fn fixstr(s: &str) -> Vec<u8> {
358        let bytes = s.as_bytes();
359        assert!(bytes.len() < 32, "fixstr helper only handles len < 32");
360        let len = u8::try_from(bytes.len()).expect("len < 32 fits u8");
361        let mut out = vec![0xa0 | len];
362        out.extend_from_slice(bytes);
363        out
364    }
365
366    /// fixmap header: 0x80 | n (n < 16 entries).
367    fn fixmap_header(n: u8) -> u8 {
368        assert!(n < 16, "fixmap helper only handles < 16 entries");
369        0x80 | n
370    }
371
372    /// Build a logical record `{"_table": "events", "org_id": 42, "live":
373    /// true, "ratio": <f64>, "missing": nil}` as a MsgPack fixmap.
374    fn sample_msgpack() -> Bytes {
375        let mut buf = vec![fixmap_header(5)];
376        // "_table": "events"
377        buf.extend(fixstr("_table"));
378        buf.extend(fixstr("events"));
379        // "org_id": 42  (positive fixint -- the byte is its own value)
380        buf.extend(fixstr("org_id"));
381        buf.push(42);
382        // "live": true (0xc3)
383        buf.extend(fixstr("live"));
384        buf.push(0xc3);
385        // "ratio": 1.5 (float64 0xcb + 8 bytes big-endian)
386        buf.extend(fixstr("ratio"));
387        buf.push(0xcb);
388        buf.extend_from_slice(&1.5f64.to_be_bytes());
389        // "missing": nil (0xc0)
390        buf.extend(fixstr("missing"));
391        buf.push(0xc0);
392        Bytes::from(buf)
393    }
394
395    /// The same logical record as JSON.
396    fn sample_json() -> Bytes {
397        Bytes::from_static(
398            br#"{"_table":"events","org_id":42,"live":true,"ratio":1.5,"missing":null}"#,
399        )
400    }
401
402    // ---- parse(): JSON -----------------------------------------------------
403
404    #[test]
405    fn parse_json_object() {
406        let parsed = parse(&sample_json(), PayloadFormat::Json).unwrap();
407        assert!(parsed.is_json());
408        assert!(!parsed.is_msgpack());
409        assert_eq!(parsed.field_str("_table"), Some("events"));
410    }
411
412    #[test]
413    fn parse_json_array_is_ok() {
414        // A top-level array is valid JSON; it simply has no named fields.
415        let parsed = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
416        assert!(parsed.is_json());
417        assert_eq!(parsed.field_str("anything"), None);
418    }
419
420    // ---- parse(): MsgPack (native rmpv, hand-rolled bytes) -----------------
421
422    #[test]
423    fn parse_msgpack_map() {
424        let parsed = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
425        assert!(parsed.is_msgpack());
426        assert!(!parsed.is_json());
427        assert_eq!(parsed.field_str("_table"), Some("events"));
428    }
429
430    #[test]
431    fn parse_minimal_fixmap() {
432        // {"k": "v"} -- the smallest interesting map.
433        let mut buf = vec![fixmap_header(1)];
434        buf.extend(fixstr("k"));
435        buf.extend(fixstr("v"));
436        let parsed = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap();
437        assert_eq!(parsed.field_str("k"), Some("v"));
438    }
439
440    // ---- Auto detection dispatch ------------------------------------------
441
442    #[test]
443    fn parse_auto_dispatches_to_json() {
444        let parsed = parse(&sample_json(), PayloadFormat::Auto).unwrap();
445        assert!(parsed.is_json(), "object byte '{{' must detect as JSON");
446        assert_eq!(parsed.field_str("_table"), Some("events"));
447    }
448
449    #[test]
450    fn parse_auto_dispatches_to_msgpack() {
451        let parsed = parse(&sample_msgpack(), PayloadFormat::Auto).unwrap();
452        assert!(
453            parsed.is_msgpack(),
454            "fixmap byte 0x85 must detect as MsgPack"
455        );
456        assert_eq!(parsed.field_str("_table"), Some("events"));
457    }
458
459    // ---- Unified accessor: SAME field value from BOTH formats --------------
460
461    #[test]
462    fn field_str_identical_across_formats() {
463        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
464        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
465        // The whole point: same logical record, same routing field, regardless
466        // of wire format.
467        assert_eq!(j.field_str("_table"), m.field_str("_table"));
468        assert_eq!(j.field_str("_table"), Some("events"));
469    }
470
471    #[test]
472    fn field_str_returns_none_for_non_string() {
473        // org_id is an int -- field_str only returns strings.
474        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
475        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
476        assert_eq!(j.field_str("org_id"), None);
477        assert_eq!(m.field_str("org_id"), None);
478    }
479
480    #[test]
481    fn field_str_returns_none_for_missing_key() {
482        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
483        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
484        assert_eq!(j.field_str("nope"), None);
485        assert_eq!(m.field_str("nope"), None);
486    }
487
488    #[test]
489    fn field_str_value_is_present_via_field_too() {
490        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
491        assert_eq!(j.field("_table"), Some(FieldRef::Str("events")));
492    }
493
494    #[test]
495    fn field_int_identical_across_formats() {
496        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
497        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
498        assert_eq!(j.field("org_id"), Some(FieldRef::Int(42)));
499        assert_eq!(m.field("org_id"), Some(FieldRef::Int(42)));
500    }
501
502    #[test]
503    fn field_bool_identical_across_formats() {
504        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
505        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
506        assert_eq!(j.field("live"), Some(FieldRef::Bool(true)));
507        assert_eq!(m.field("live"), Some(FieldRef::Bool(true)));
508    }
509
510    #[test]
511    fn field_float_identical_across_formats() {
512        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
513        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
514        assert_eq!(j.field("ratio"), Some(FieldRef::Float(1.5)));
515        assert_eq!(m.field("ratio"), Some(FieldRef::Float(1.5)));
516    }
517
518    #[test]
519    fn field_null_identical_across_formats() {
520        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
521        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
522        assert_eq!(j.field("missing"), Some(FieldRef::Null));
523        assert_eq!(m.field("missing"), Some(FieldRef::Null));
524    }
525
526    #[test]
527    fn field_missing_key_is_none_for_both() {
528        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
529        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
530        assert_eq!(j.field("nope"), None);
531        assert_eq!(m.field("nope"), None);
532    }
533
534    #[test]
535    fn field_nested_object_is_other() {
536        // A field whose value is a container collapses to Other, not None.
537        let j = parse(
538            &Bytes::from_static(br#"{"k":{"nested":1}}"#),
539            PayloadFormat::Json,
540        )
541        .unwrap();
542        assert_eq!(j.field("k"), Some(FieldRef::Other));
543        // ...but it is not a routing scalar, so field_str is None.
544        assert_eq!(j.field_str("k"), None);
545
546        // MsgPack: {"k": [1]} -- an array value also collapses to Other.
547        // fixmap(1) "k" -> fixarray(1) [positive fixint 1]
548        let mut buf = vec![fixmap_header(1)];
549        buf.extend(fixstr("k"));
550        buf.push(0x91); // fixarray with 1 element
551        buf.push(0x01); // positive fixint 1
552        let m = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap();
553        assert_eq!(m.field("k"), Some(FieldRef::Other));
554    }
555
556    #[test]
557    fn field_on_non_object_top_level_is_none() {
558        // A top-level JSON array has no named fields.
559        let j = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
560        assert_eq!(j.field("0"), None);
561
562        // A top-level MsgPack array (fixarray) likewise.
563        // fixarray(2) [1, 2]
564        let m = parse(&Bytes::from(vec![0x92, 0x01, 0x02]), PayloadFormat::MsgPack).unwrap();
565        assert_eq!(m.field("0"), None);
566    }
567
568    // ---- Error cases -------------------------------------------------------
569
570    #[test]
571    fn malformed_json_errors() {
572        let err = parse(&Bytes::from_static(b"{not valid json"), PayloadFormat::Json).unwrap_err();
573        assert!(matches!(err, CodecError::Json(_)), "got {err:?}");
574        assert!(!err.to_string().is_empty());
575    }
576
577    #[test]
578    fn empty_blob_auto_errors_as_json() {
579        // detect() maps empty -> Json; empty is not valid JSON.
580        let err = parse(&Bytes::new(), PayloadFormat::Auto).unwrap_err();
581        assert!(matches!(err, CodecError::Json(_)), "got {err:?}");
582    }
583
584    #[test]
585    fn malformed_msgpack_errors() {
586        // 0x81 declares a fixmap with one entry but supplies no key/value.
587        let err = parse(&Bytes::from_static(&[0x81]), PayloadFormat::MsgPack).unwrap_err();
588        assert!(matches!(err, CodecError::MsgPack(_)), "got {err:?}");
589        assert!(!err.to_string().is_empty());
590    }
591
592    #[test]
593    fn msgpack_truncated_float_errors() {
594        // 0xcb declares a float64 but supplies only 3 of the 8 payload bytes.
595        let mut buf = vec![fixmap_header(1)];
596        buf.extend(fixstr("ratio"));
597        buf.push(0xcb);
598        buf.extend_from_slice(&[0x00, 0x01, 0x02]); // short
599        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap_err();
600        assert!(matches!(err, CodecError::MsgPack(_)), "got {err:?}");
601    }
602
603    // ---- Task 0.3b: serialise-out (round-trips through real bytes) ---------
604    //
605    // The contract is "parse -> (mutate) -> serialise -> parse again preserves
606    // the logical value". We assert on re-parsed VALUES (via the unified
607    // accessor), NOT on raw bytes: re-serialise may reorder keys or reformat
608    // numbers, so a byte-for-byte equality assertion would be wrong.
609
610    /// Compare every routing field of the canonical sample across two payloads.
611    fn assert_sample_fields_eq(a: &ParsedPayload, b: &ParsedPayload) {
612        assert_eq!(a.field("_table"), b.field("_table"));
613        assert_eq!(a.field("org_id"), b.field("org_id"));
614        assert_eq!(a.field("live"), b.field("live"));
615        assert_eq!(a.field("ratio"), b.field("ratio"));
616        assert_eq!(a.field("missing"), b.field("missing"));
617    }
618
619    #[test]
620    fn json_to_bytes_round_trips() {
621        // parse JSON -> to_bytes -> parse again -> values equal.
622        let original = parse(&sample_json(), PayloadFormat::Json).unwrap();
623        assert!(original.is_json());
624
625        let bytes = original.to_bytes().unwrap();
626        assert!(!bytes.is_empty());
627
628        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
629        assert!(reparsed.is_json(), "JSON must round-trip as JSON");
630        assert_sample_fields_eq(&original, &reparsed);
631    }
632
633    #[test]
634    fn msgpack_to_bytes_round_trips_via_native_bytes() {
635        // parse MsgPack (hand-rolled bytes) -> to_bytes (native rmpv encode)
636        // -> parse again -> values equal. No serde, no JSON bridge anywhere.
637        let original = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
638        assert!(original.is_msgpack());
639
640        let bytes = original.to_bytes().unwrap();
641        assert!(!bytes.is_empty());
642        // First byte must be a MsgPack map marker (fixmap 0x80..=0x8f for 5
643        // entries -> 0x85), proving native MsgPack came out, not JSON.
644        assert_eq!(bytes[0], fixmap_header(5), "expected fixmap(5) wire marker");
645
646        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
647        assert!(reparsed.is_msgpack(), "MsgPack must round-trip as MsgPack");
648        assert_sample_fields_eq(&original, &reparsed);
649    }
650
651    #[test]
652    fn to_json_bytes_reparses_to_same_value() {
653        // Free function: serialise a sonic_rs::Value, re-parse, compare.
654        let ParsedPayload::Json(value) = parse(&sample_json(), PayloadFormat::Json).unwrap() else {
655            panic!("expected JSON");
656        };
657        let bytes = to_json_bytes(&value).unwrap();
658        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
659        assert_eq!(reparsed.field("_table"), Some(FieldRef::Str("events")));
660        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
661        assert_eq!(reparsed.field("ratio"), Some(FieldRef::Float(1.5)));
662    }
663
664    #[test]
665    fn to_msgpack_bytes_reparses_to_same_value() {
666        // Free function: serialise an rmpv::Value via native write_value,
667        // re-parse, compare.
668        let ParsedPayload::MsgPack(value) =
669            parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap()
670        else {
671            panic!("expected MsgPack");
672        };
673        let bytes = to_msgpack_bytes(&value).unwrap();
674        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
675        assert_eq!(reparsed.field("_table"), Some(FieldRef::Str("events")));
676        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
677        assert_eq!(reparsed.field("live"), Some(FieldRef::Bool(true)));
678        assert_eq!(reparsed.field("ratio"), Some(FieldRef::Float(1.5)));
679        assert_eq!(reparsed.field("missing"), Some(FieldRef::Null));
680    }
681
682    #[test]
683    fn to_bytes_preserves_a_mutated_json_field() {
684        // The realistic case: a transform CHANGED a field, then re-serialises.
685        // Only then is to_bytes the right tool (unmodified records pass through
686        // the original Bytes -- see the to_bytes doc).
687        let ParsedPayload::Json(mut value) = parse(&sample_json(), PayloadFormat::Json).unwrap()
688        else {
689            panic!("expected JSON");
690        };
691        // Mutate _table in place via sonic_rs's object insert (overwrites the
692        // existing key) -- the value model is what gets re-serialised.
693        value.insert("_table", sonic_rs::Value::from("audit"));
694        let bytes = to_json_bytes(&value).unwrap();
695        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
696        assert_eq!(reparsed.field_str("_table"), Some("audit"));
697        // Untouched siblings survive the round-trip.
698        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
699    }
700
701    #[test]
702    fn json_to_bytes_handles_top_level_array() {
703        // Egress is not object-only: a top-level array must round-trip too.
704        let parsed = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
705        let bytes = parsed.to_bytes().unwrap();
706        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
707        assert!(reparsed.is_json());
708        // No named fields either way; the value re-parses without error.
709        assert_eq!(reparsed.field_str("anything"), None);
710    }
711
712    #[test]
713    fn msgpack_to_bytes_handles_top_level_scalar() {
714        // A bare MsgPack integer (positive fixint 42) round-trips as itself,
715        // not wrapped in a map.
716        let parsed = parse(&Bytes::from(vec![42u8]), PayloadFormat::MsgPack).unwrap();
717        let bytes = parsed.to_bytes().unwrap();
718        assert_eq!(
719            bytes.as_ref(),
720            &[42u8],
721            "fixint must re-emit byte-identical"
722        );
723        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
724        assert!(reparsed.is_msgpack());
725    }
726
727    #[test]
728    fn double_round_trip_is_stable() {
729        // parse -> to_bytes -> parse -> to_bytes: the SECOND serialise must
730        // equal the first (the value model is the fixed point, even if it
731        // differs from the original hand-rolled bytes).
732        let first = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
733        let b1 = first.to_bytes().unwrap();
734        let second = parse(&b1, PayloadFormat::MsgPack).unwrap();
735        let b2 = second.to_bytes().unwrap();
736        assert_eq!(b1, b2, "re-serialising a re-parsed value must be stable");
737    }
738
739    // ---- Phase 5: trailing-bytes hardening ------------------------------------
740
741    #[test]
742    fn msgpack_rejects_trailing_bytes() {
743        // A valid fixmap {"k": "v"} followed by a stray nil byte (0xc0).
744        // Before the fix this returns Ok (silently ignoring 0xc0).
745        // After the fix it must return CodecError::TrailingBytes(1).
746        let mut buf = vec![fixmap_header(1)];
747        buf.extend(fixstr("k"));
748        buf.extend(fixstr("v"));
749        buf.push(0xc0); // stray nil -- trailing garbage
750        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack)
751            .expect_err("trailing byte must be rejected");
752        match err {
753            CodecError::TrailingBytes(n) => assert_eq!(n, 1, "expected 1 trailing byte"),
754            other => panic!("expected TrailingBytes(1), got {other:?}"),
755        }
756    }
757
758    #[test]
759    fn msgpack_rejects_concatenated_values() {
760        // Two valid MsgPack values back-to-back: fixint 1 then fixint 2.
761        // parse() decodes ONE value; the second byte is trailing and must error.
762        let buf = vec![0x01u8, 0x02u8]; // positive fixint 1, positive fixint 2
763        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack)
764            .expect_err("concatenated values must be rejected");
765        match err {
766            CodecError::TrailingBytes(n) => assert_eq!(n, 1, "expected 1 trailing byte"),
767            other => panic!("expected TrailingBytes(1), got {other:?}"),
768        }
769    }
770
771    #[test]
772    fn msgpack_clean_single_value_still_parses_ok() {
773        // A single valid fixmap with no trailing bytes -- must still parse Ok.
774        // Regression guard: the fix must not break the happy path.
775        let mut buf = vec![fixmap_header(1)];
776        buf.extend(fixstr("k"));
777        buf.extend(fixstr("v"));
778        let parsed = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap();
779        assert_eq!(parsed.field_str("k"), Some("v"));
780    }
781
782    #[test]
783    fn json_rejects_trailing_garbage() {
784        // The MsgPack path rejects trailing bytes (CodecError::TrailingBytes).
785        // The JSON path delegates to sonic_rs, which rejects trailing
786        // NON-whitespace content after a complete value (serde_json semantics).
787        // A valid object followed by stray garbage must error.
788        let mut buf = br#"{"_table":"events"}"#.to_vec();
789        buf.extend_from_slice(b"garbage");
790        let err = parse(&Bytes::from(buf), PayloadFormat::Json)
791            .expect_err("trailing non-whitespace garbage must be rejected");
792        assert!(
793            matches!(err, CodecError::Json(_)),
794            "expected CodecError::Json for trailing garbage, got {err:?}"
795        );
796    }
797
798    #[test]
799    fn json_accepts_trailing_whitespace() {
800        // sonic_rs (like serde_json) tolerates trailing whitespace after a
801        // complete value -- a pretty-printer's trailing newline must not be a
802        // parse error. Asserted alongside the trailing-garbage rejection so the
803        // JSON trailing-byte contract is pinned both ways.
804        let mut buf = br#"{"_table":"events"}"#.to_vec();
805        buf.extend_from_slice(b" \t\r\n");
806        let parsed = parse(&Bytes::from(buf), PayloadFormat::Json)
807            .expect("trailing whitespace must be accepted");
808        assert_eq!(parsed.field_str("_table"), Some("events"));
809    }
810
811    #[test]
812    fn json_parsed_as_msgpack_errors() {
813        // Force the wrong decoder: JSON bytes through the MsgPack path. '{' is
814        // 0x7b, which rmpv reads as a positive fixint -- a single-byte value
815        // -- leaving the remaining 69 bytes of the JSON payload as trailing
816        // bytes. After Phase 5 hardening this MUST error with TrailingBytes.
817        let err = parse(&sample_json(), PayloadFormat::MsgPack)
818            .expect_err("JSON fed to MsgPack path must error after trailing-bytes hardening");
819        assert!(
820            matches!(err, CodecError::TrailingBytes(_)),
821            "expected TrailingBytes, got {err:?}"
822        );
823    }
824}