hyperi_rustlib/transport/codec.rs
1// Project: hyperi-rustlib
2// File: src/transport/codec.rs
3// Purpose: Parse-on-demand WorkBatch codec (native JSON + MsgPack, no bridge)
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Parse-on-demand codec (Task 0.3a)
10//!
11//! The data-plane spine frames bytes into a [`WorkBatch`](super::WorkBatch)
12//! WITHOUT parsing (Task 0.2). A transform / router that needs to read a field
13//! parses on demand -- and it should not care whether the record arrived as
14//! JSON or MsgPack. This module is that parse step.
15//!
16//! ## Native, no JSON bridge
17//!
18//! There is no `rmp_serde -> serde_json::Value -> serde_json::to_vec ->
19//! sonic_rs` bridge anywhere on the parse path -- that double-parse-and-
20//! re-serialise was killed in Phase 0.7c (the engine `parse.rs` MsgPack path
21//! now walks `rmpv` straight into a `sonic_rs::Value`). Both this codec and
22//! the engine decode natively:
23//!
24//! - **JSON** is parsed once with [`sonic_rs`] (SIMD, AVX2/NEON).
25//! - **MsgPack** is parsed once with [`rmpv`] -- the schema-less `Value` decoder
26//! from the same `3Hren/msgpack-rust` workspace as `rmp-serde`. No
27//! intermediate `serde_json::Value`, no JSON re-serialise.
28//!
29//! ## Unified routing-field accessor
30//!
31//! A router keys off ONE field (`_table`, `org_id`, ...) and must not branch on
32//! wire format. [`ParsedPayload`] exposes a format-agnostic accessor:
33//!
34//! - [`ParsedPayload::field_str`] -- the common case: a top-level string field.
35//! - [`ParsedPayload::field`] -- a [`FieldRef`] covering the scalar routing
36//! cases (string / int / float / bool / null), with everything else
37//! ([`FieldRef::Other`]) deliberately collapsed because routers do not key
38//! off nested containers.
39//!
40//! **Scope:** top-level object-key lookup only. No deep JSON-path -- routing
41//! keys live at the top level, and a deep-path query is a separate concern that
42//! YAGNI keeps out of the hot routing path.
43//!
44//! See `docs/MIGRATIONS.md` (codec consolidation: native rmpv, JSON bridge
45//! removed) and `docs/SELF-REGULATION.md` for where this codec sits in the
46//! `WorkBatch` data-plane spine. The block contract is in
47//! [`WorkBatch`](crate::transport::WorkBatch).
48
49use super::types::PayloadFormat;
50use bytes::Bytes;
51use sonic_rs::JsonValueTrait as _;
52use thiserror::Error;
53
54/// A parse failure, tagged by the format that failed.
55#[derive(Debug, Error)]
56pub enum CodecError {
57 /// JSON parse failed (sonic_rs SIMD parser).
58 #[error("json parse error: {0}")]
59 Json(#[from] sonic_rs::Error),
60
61 /// MsgPack parse failed (native rmpv decoder).
62 #[error("msgpack parse error: {0}")]
63 MsgPack(#[from] rmpv::decode::Error),
64
65 /// MsgPack serialise failed (native rmpv encoder).
66 ///
67 /// `rmpv::encode::Error` is `rmp::encode::ValueWriteError` -- an I/O write
68 /// failure from the underlying writer. Serialising into an in-memory `Vec`
69 /// effectively never fails, but the encoder is fallible so we surface it
70 /// rather than panic. JSON serialise reuses [`CodecError::Json`]
71 /// (`sonic_rs::Error` already covers both parse and serialise).
72 #[error("msgpack encode error: {0}")]
73 Encode(#[from] rmpv::encode::Error),
74
75 /// Trailing bytes remain after a complete MsgPack value was decoded.
76 ///
77 /// A single-record payload must encode exactly ONE value with no leftover
78 /// bytes. Trailing bytes indicate corruption, a framing error, or two
79 /// values concatenated (MsgPack stream framing is a separate, deferred
80 /// feature -- it is NOT supported here).
81 ///
82 /// The `usize` is the number of bytes that remained unconsumed.
83 #[error("msgpack trailing bytes: {0} byte(s) remain after value")]
84 TrailingBytes(usize),
85}
86
87/// A parsed payload, retaining its native value representation.
88///
89/// JSON stays a [`sonic_rs::Value`] (so the SIMD parse is not thrown away) and
90/// MsgPack stays an [`rmpv::Value`] (native, no JSON bridge). A consumer that
91/// only needs a routing field should reach for [`ParsedPayload::field_str`] /
92/// [`ParsedPayload::field`] rather than matching the variant -- that is the
93/// whole point of the unified accessor.
94#[derive(Debug, Clone)]
95pub enum ParsedPayload {
96 /// JSON value parsed by sonic_rs (SIMD).
97 Json(sonic_rs::Value),
98 /// MsgPack value parsed natively by rmpv (no serde_json bridge).
99 MsgPack(rmpv::Value),
100}
101
/// Format-agnostic, borrowed view of a single routing field.
///
/// This is what the unified accessor hands back, so a router never needs to
/// know whether the record was JSON or MsgPack. Only the scalar shapes a
/// router can key off get their own variant. Objects, arrays, binary and ext
/// values all collapse into [`FieldRef::Other`].
///
/// `Str` borrows from the parsed value without copying. Every other variant
/// is a plain `Copy` scalar.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FieldRef<'a> {
    /// String field, borrowed from the parsed value.
    Str(&'a str),
    /// Integer field that fits in `i64`. The accessor reports a MsgPack
    /// unsigned integer above `i64::MAX` as [`FieldRef::Float`] instead.
    Int(i64),
    /// Floating-point field.
    Float(f64),
    /// Boolean field.
    Bool(bool),
    /// Explicit JSON `null` / MsgPack `nil`.
    Null,
    /// Field is present, but is not a routing scalar (container, binary, ext, ...).
    Other,
}
126
127/// Parse a framed payload into a native [`ParsedPayload`].
128///
129/// - [`PayloadFormat::Json`] -> sonic_rs (SIMD).
130/// - [`PayloadFormat::MsgPack`] -> rmpv (native, no JSON bridge).
131/// - [`PayloadFormat::Auto`] -> [`PayloadFormat::detect`] then dispatch. An empty
132/// blob detects as JSON (matching `detect`'s contract) and surfaces a
133/// [`CodecError::Json`] -- empty input is not valid JSON.
134///
135/// # Errors
136///
137/// Returns [`CodecError::Json`] or [`CodecError::MsgPack`] when the bytes are
138/// malformed for the (detected or declared) format.
139pub fn parse(payload: &Bytes, format: PayloadFormat) -> Result<ParsedPayload, CodecError> {
140 let effective = match format {
141 PayloadFormat::Auto => PayloadFormat::detect(payload),
142 other => other,
143 };
144
145 match effective {
146 // detect() never yields Auto, but treat a residual Auto as JSON.
147 PayloadFormat::Json | PayloadFormat::Auto => {
148 let value: sonic_rs::Value = sonic_rs::from_slice(payload)?;
149 Ok(ParsedPayload::Json(value))
150 }
151 PayloadFormat::MsgPack => {
152 // rmpv::decode::read_value reads from any `io::Read`; a byte slice
153 // is one. `&mut &[u8]` advances the cursor as it decodes. This is a
154 // SINGLE native decode -- no rmp_serde, no serde_json, no re-encode.
155 let mut cursor: &[u8] = payload.as_ref();
156 let value = rmpv::decode::read_value(&mut cursor)?;
157 // A single-record payload encodes exactly ONE value. Any bytes still
158 // in `cursor` after the value was decoded indicate corruption,
159 // framing misalignment, or concatenated values. Reject them.
160 // MsgPack-stream framing (multiple values per payload) is a separate
161 // deferred feature -- do NOT silently accept trailing bytes here.
162 let remaining = cursor.len();
163 if remaining > 0 {
164 return Err(CodecError::TrailingBytes(remaining));
165 }
166 Ok(ParsedPayload::MsgPack(value))
167 }
168 }
169}
170
171/// Serialise a JSON value to bytes via [`sonic_rs`] (SIMD), no bridge.
172///
173/// The inverse of the JSON arm of [`parse`]. Reuses sonic_rs end-to-end so a
174/// transform that mutates a parsed JSON value re-emits it without ever touching
175/// `serde_json`.
176///
177/// # Errors
178///
179/// Returns [`CodecError::Json`] if sonic_rs fails to serialise the value.
180pub fn to_json_bytes(value: &sonic_rs::Value) -> Result<Bytes, CodecError> {
181 let buf = sonic_rs::to_vec(value)?;
182 Ok(Bytes::from(buf))
183}
184
185/// Serialise a MsgPack value to bytes via NATIVE [`rmpv::encode::write_value`].
186///
187/// The inverse of the MsgPack arm of [`parse`]. This is the native rmpv encoder
188/// -- NOT `rmp_serde`, NOT a JSON bridge. A transform that mutates a parsed
189/// `rmpv::Value` re-emits MsgPack wire bytes with a single native encode, no
190/// intermediate `serde_json::Value`, no re-parse.
191///
192/// # Errors
193///
194/// Returns [`CodecError::Encode`] if the encoder fails to write the value. For
195/// an in-memory `Vec` writer this is effectively unreachable, but the encoder
196/// is fallible so the error is surfaced rather than unwrapped.
197pub fn to_msgpack_bytes(value: &rmpv::Value) -> Result<Bytes, CodecError> {
198 // write_value writes into any `io::Write`; a Vec<u8> is one and never
199 // returns a short write, so the only failure path is the encoder's own.
200 let mut buf: Vec<u8> = Vec::new();
201 rmpv::encode::write_value(&mut buf, value)?;
202 Ok(Bytes::from(buf))
203}
204
205impl ParsedPayload {
206 /// Whether the payload was decoded from JSON.
207 #[must_use]
208 pub fn is_json(&self) -> bool {
209 matches!(self, Self::Json(_))
210 }
211
212 /// Whether the payload was decoded from MsgPack.
213 #[must_use]
214 pub fn is_msgpack(&self) -> bool {
215 matches!(self, Self::MsgPack(_))
216 }
217
218 /// Read a top-level string field, format-agnostic.
219 ///
220 /// The common routing case: a router keys off one string field and does not
221 /// care about wire format. Returns `None` if the value is not a top-level
222 /// object, the key is absent, or the field is not a string. Borrows from the
223 /// parsed value (zero-copy).
224 ///
225 /// Top-level lookup only -- see the module docs.
226 #[must_use]
227 pub fn field_str(&self, name: &str) -> Option<&str> {
228 match self {
229 Self::Json(v) => v.get(name).and_then(|f| f.as_str()),
230 Self::MsgPack(v) => msgpack_field(v, name).and_then(rmpv::Value::as_str),
231 }
232 }
233
234 /// Read a top-level field as a format-agnostic [`FieldRef`].
235 ///
236 /// Returns `None` only when the value is not a top-level object or the key
237 /// is absent. A present-but-non-scalar field yields [`FieldRef::Other`]
238 /// (routers never key off containers). Borrows from the parsed value.
239 ///
240 /// Top-level lookup only -- see the module docs.
241 #[must_use]
242 pub fn field(&self, name: &str) -> Option<FieldRef<'_>> {
243 match self {
244 Self::Json(v) => v.get(name).map(json_field_ref),
245 Self::MsgPack(v) => msgpack_field(v, name).map(msgpack_field_ref),
246 }
247 }
248
249 /// Serialise back to the payload's OWN wire format (Task 0.3b).
250 ///
251 /// `Json` -> JSON bytes (via [`to_json_bytes`]), `MsgPack` -> MsgPack bytes
252 /// (via [`to_msgpack_bytes`]). Same format in, same format out -- no
253 /// cross-format conversion, no bridge.
254 ///
255 /// ## Pass-through contract -- DO NOT round-trip untouched records
256 ///
257 /// This is the egress face of a *parse-on-demand* spine. The governing
258 /// principle is "serde is the enemy / zero re-representation": a record that
259 /// a transform did NOT change must re-use its original `Record.payload`
260 /// (the `Bytes` it arrived as) directly on egress. `to_bytes` is ONLY for a
261 /// record a transform actually mutated.
262 ///
263 /// Calling `to_bytes` on an unmodified record is a correctness *and*
264 /// performance bug: it pays a full parse + re-serialise for nothing AND can
265 /// alter the wire bytes (key order, number formatting, whitespace) even
266 /// though the logical value is identical. Reuse the original `Bytes`; only
267 /// reach for `to_bytes` once the value has been edited.
268 ///
269 /// There is deliberately NO `to_bytes_as` cross-format egress. JSON and
270 /// MsgPack have distinct value models (`sonic_rs::Value` vs `rmpv::Value`)
271 /// with no native conversion between them; bridging would mean either a
272 /// hand-rolled recursive value walker or a `serde_json` hop -- the exact
273 /// double-representation this spine exists to avoid. Cross-format egress, if
274 /// a consumer ever needs it, is a separate, explicit concern (YAGNI).
275 ///
276 /// # Errors
277 ///
278 /// Returns [`CodecError::Json`] (JSON serialise) or [`CodecError::Encode`]
279 /// (MsgPack serialise) on encoder failure.
280 pub fn to_bytes(&self) -> Result<Bytes, CodecError> {
281 match self {
282 Self::Json(v) => to_json_bytes(v),
283 Self::MsgPack(v) => to_msgpack_bytes(v),
284 }
285 }
286}
287
288/// Classify a sonic_rs JSON value into a [`FieldRef`] (borrows from `v`).
289///
290/// Order matters: probe the scalar accessors in turn. `as_i64` is tried before
291/// `as_f64` so integers stay [`FieldRef::Int`]; a JSON number with a fractional
292/// part falls through to [`FieldRef::Float`].
293fn json_field_ref(v: &sonic_rs::Value) -> FieldRef<'_> {
294 if let Some(s) = v.as_str() {
295 FieldRef::Str(s)
296 } else if v.is_null() {
297 FieldRef::Null
298 } else if let Some(b) = v.as_bool() {
299 FieldRef::Bool(b)
300 } else if let Some(i) = v.as_i64() {
301 FieldRef::Int(i)
302 } else if let Some(f) = v.as_f64() {
303 FieldRef::Float(f)
304 } else {
305 FieldRef::Other
306 }
307}
308
309/// Find a top-level value for `name` in an rmpv MsgPack value.
310///
311/// Only a [`rmpv::Value::Map`] has named fields. The map is a `Vec<(Value,
312/// Value)>`, so this is a linear scan -- routing maps are small (a handful of
313/// keys), so a linear scan beats building an index. Only string keys match.
314fn msgpack_field<'a>(v: &'a rmpv::Value, name: &str) -> Option<&'a rmpv::Value> {
315 match v {
316 rmpv::Value::Map(pairs) => pairs
317 .iter()
318 .find(|(k, _)| k.as_str() == Some(name))
319 .map(|(_, val)| val),
320 _ => None,
321 }
322}
323
324/// Classify an rmpv MsgPack value into a [`FieldRef`].
325///
326/// MsgPack integers split into signed/unsigned at the wire level; both fold to
327/// `i64` here when they fit. An unsigned value above `i64::MAX` cannot fit `i64`
328/// and is surfaced as [`FieldRef::Float`] via `as_f64` (lossy but it keeps a
329/// numeric field numeric for routing) rather than dropped to `Other`.
330fn msgpack_field_ref(v: &rmpv::Value) -> FieldRef<'_> {
331 match v {
332 rmpv::Value::String(s) => s.as_str().map_or(FieldRef::Other, FieldRef::Str),
333 rmpv::Value::Nil => FieldRef::Null,
334 rmpv::Value::Boolean(b) => FieldRef::Bool(*b),
335 rmpv::Value::Integer(_) => v
336 .as_i64()
337 .map(FieldRef::Int)
338 .or_else(|| v.as_f64().map(FieldRef::Float))
339 .unwrap_or(FieldRef::Other),
340 rmpv::Value::F32(f) => FieldRef::Float(f64::from(*f)),
341 rmpv::Value::F64(f) => FieldRef::Float(*f),
342 // Map / Array / Binary / Ext: routers do not key off containers.
343 _ => FieldRef::Other,
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Helpers: build real MsgPack blobs by hand (no serde encode) -------
    //
    // The MsgPack bytes are hand-rolled so every test exercises the NATIVE
    // rmpv decoder against the real wire format, not a serde round-trip.

    /// fixstr: 0xa0 | len, then the UTF-8 bytes (len < 32).
    fn fixstr(s: &str) -> Vec<u8> {
        let bytes = s.as_bytes();
        assert!(bytes.len() < 32, "fixstr helper only handles len < 32");
        let len = u8::try_from(bytes.len()).expect("len < 32 fits u8");
        let mut out = vec![0xa0 | len];
        out.extend_from_slice(bytes);
        out
    }

    /// fixmap header: 0x80 | n (n < 16 entries).
    fn fixmap_header(n: u8) -> u8 {
        assert!(n < 16, "fixmap helper only handles < 16 entries");
        0x80 | n
    }

    /// A fixmap(1) holding `key` -> `value`, where `value` is raw,
    /// already-encoded MsgPack bytes.
    fn single_entry_map(key: &str, value: &[u8]) -> Bytes {
        let mut buf = vec![fixmap_header(1)];
        buf.extend(fixstr(key));
        buf.extend_from_slice(value);
        Bytes::from(buf)
    }

    /// Build a logical record `{"_table": "events", "org_id": 42, "live":
    /// true, "ratio": <f64>, "missing": nil}` as a MsgPack fixmap.
    fn sample_msgpack() -> Bytes {
        let mut buf = vec![fixmap_header(5)];
        // "_table": "events"
        buf.extend(fixstr("_table"));
        buf.extend(fixstr("events"));
        // "org_id": 42 (positive fixint -- the byte is its own value)
        buf.extend(fixstr("org_id"));
        buf.push(42);
        // "live": true (0xc3)
        buf.extend(fixstr("live"));
        buf.push(0xc3);
        // "ratio": 1.5 (float64 0xcb + 8 bytes big-endian)
        buf.extend(fixstr("ratio"));
        buf.push(0xcb);
        buf.extend_from_slice(&1.5f64.to_be_bytes());
        // "missing": nil (0xc0)
        buf.extend(fixstr("missing"));
        buf.push(0xc0);
        Bytes::from(buf)
    }

    /// The same logical record as JSON.
    fn sample_json() -> Bytes {
        Bytes::from_static(
            br#"{"_table":"events","org_id":42,"live":true,"ratio":1.5,"missing":null}"#,
        )
    }

    // ---- parse(): JSON -----------------------------------------------------

    #[test]
    fn parse_json_object() {
        let parsed = parse(&sample_json(), PayloadFormat::Json).unwrap();
        assert!(parsed.is_json());
        assert!(!parsed.is_msgpack());
        assert_eq!(parsed.field_str("_table"), Some("events"));
    }

    #[test]
    fn parse_json_array_is_ok() {
        // A top-level array is valid JSON; it simply has no named fields.
        let parsed = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
        assert!(parsed.is_json());
        assert_eq!(parsed.field_str("anything"), None);
    }

    // ---- parse(): MsgPack (native rmpv, hand-rolled bytes) -----------------

    #[test]
    fn parse_msgpack_map() {
        let parsed = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert!(parsed.is_msgpack());
        assert!(!parsed.is_json());
        assert_eq!(parsed.field_str("_table"), Some("events"));
    }

    #[test]
    fn parse_minimal_fixmap() {
        // {"k": "v"} -- the smallest interesting map.
        let parsed = parse(&single_entry_map("k", &fixstr("v")), PayloadFormat::MsgPack).unwrap();
        assert_eq!(parsed.field_str("k"), Some("v"));
    }

    // ---- Auto detection dispatch ------------------------------------------

    #[test]
    fn parse_auto_dispatches_to_json() {
        let parsed = parse(&sample_json(), PayloadFormat::Auto).unwrap();
        assert!(parsed.is_json(), "object byte '{{' must detect as JSON");
        assert_eq!(parsed.field_str("_table"), Some("events"));
    }

    #[test]
    fn parse_auto_dispatches_to_msgpack() {
        let parsed = parse(&sample_msgpack(), PayloadFormat::Auto).unwrap();
        assert!(
            parsed.is_msgpack(),
            "fixmap byte 0x85 must detect as MsgPack"
        );
        assert_eq!(parsed.field_str("_table"), Some("events"));
    }

    // ---- Unified accessor: SAME field value from BOTH formats --------------

    #[test]
    fn field_str_identical_across_formats() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        // The whole point: same logical record, same routing field, whatever
        // the wire format.
        assert_eq!(j.field_str("_table"), m.field_str("_table"));
        assert_eq!(j.field_str("_table"), Some("events"));
    }

    #[test]
    fn field_str_returns_none_for_non_string() {
        // org_id is an int -- field_str only returns strings.
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field_str("org_id"), None);
        assert_eq!(m.field_str("org_id"), None);
    }

    #[test]
    fn field_str_returns_none_for_missing_key() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field_str("nope"), None);
        assert_eq!(m.field_str("nope"), None);
    }

    #[test]
    fn field_str_value_is_present_via_field_too() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        assert_eq!(j.field("_table"), Some(FieldRef::Str("events")));
    }

    #[test]
    fn field_int_identical_across_formats() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field("org_id"), Some(FieldRef::Int(42)));
        assert_eq!(m.field("org_id"), Some(FieldRef::Int(42)));
    }

    #[test]
    fn field_bool_identical_across_formats() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field("live"), Some(FieldRef::Bool(true)));
        assert_eq!(m.field("live"), Some(FieldRef::Bool(true)));
    }

    #[test]
    fn field_float_identical_across_formats() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field("ratio"), Some(FieldRef::Float(1.5)));
        assert_eq!(m.field("ratio"), Some(FieldRef::Float(1.5)));
    }

    #[test]
    fn field_null_identical_across_formats() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field("missing"), Some(FieldRef::Null));
        assert_eq!(m.field("missing"), Some(FieldRef::Null));
    }

    #[test]
    fn field_missing_key_is_none_for_both() {
        let j = parse(&sample_json(), PayloadFormat::Json).unwrap();
        let m = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert_eq!(j.field("nope"), None);
        assert_eq!(m.field("nope"), None);
    }

    #[test]
    fn field_nested_object_is_other() {
        // A field whose value is a container collapses to Other, not None.
        let j = parse(
            &Bytes::from_static(br#"{"k":{"nested":1}}"#),
            PayloadFormat::Json,
        )
        .unwrap();
        assert_eq!(j.field("k"), Some(FieldRef::Other));
        // ...but it is not a routing scalar, so field_str is None.
        assert_eq!(j.field_str("k"), None);

        // MsgPack: {"k": [1]} -- an array value also collapses to Other.
        // fixarray with 1 element (0x91), then positive fixint 1.
        let m = parse(&single_entry_map("k", &[0x91, 0x01]), PayloadFormat::MsgPack).unwrap();
        assert_eq!(m.field("k"), Some(FieldRef::Other));
    }

    #[test]
    fn field_on_non_object_top_level_is_none() {
        // A top-level JSON array has no named fields.
        let j = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
        assert_eq!(j.field("0"), None);

        // A top-level MsgPack array (fixarray) likewise.
        // fixarray(2) [1, 2]
        let m = parse(&Bytes::from(vec![0x92, 0x01, 0x02]), PayloadFormat::MsgPack).unwrap();
        assert_eq!(m.field("0"), None);
    }

    // ---- MsgPack scalar / key edge cases -----------------------------------

    #[test]
    fn msgpack_negative_ints_fold_to_int() {
        // Negative fixint 0xff == -1.
        let m = parse(&single_entry_map("n", &[0xff]), PayloadFormat::MsgPack).unwrap();
        assert_eq!(m.field("n"), Some(FieldRef::Int(-1)));
        // int8 (0xd0) 0x9c == -100.
        let m = parse(&single_entry_map("n", &[0xd0, 0x9c]), PayloadFormat::MsgPack).unwrap();
        assert_eq!(m.field("n"), Some(FieldRef::Int(-100)));
    }

    #[test]
    fn msgpack_uint64_above_i64_max_is_float() {
        // uint64 (0xcf + 8 bytes BE) u64::MAX cannot fit i64, so the accessor
        // keeps it numeric as a Float rather than dropping it to Other.
        let mut value = vec![0xcf];
        value.extend_from_slice(&u64::MAX.to_be_bytes());
        let m = parse(&single_entry_map("big", &value), PayloadFormat::MsgPack).unwrap();
        let field = m.field("big");
        assert!(
            matches!(field, Some(FieldRef::Float(f)) if f > 9.2e18),
            "expected a Float beyond i64::MAX, got {field:?}"
        );
    }

    #[test]
    fn msgpack_f32_widens_to_float() {
        // float32 (0xca + 4 bytes BE); 2.5 is exact in both widths.
        let mut value = vec![0xca];
        value.extend_from_slice(&2.5f32.to_be_bytes());
        let m = parse(&single_entry_map("f", &value), PayloadFormat::MsgPack).unwrap();
        assert_eq!(m.field("f"), Some(FieldRef::Float(2.5)));
    }

    #[test]
    fn msgpack_invalid_utf8_string_is_not_a_routing_string() {
        // fixstr(2) carrying bytes that are not valid UTF-8: present, but not
        // usable as a routing string.
        let m = parse(&single_entry_map("k", &[0xa2, 0xff, 0xfe]), PayloadFormat::MsgPack)
            .unwrap();
        assert_eq!(m.field("k"), Some(FieldRef::Other));
        assert_eq!(m.field_str("k"), None);
    }

    #[test]
    fn msgpack_non_string_key_never_matches() {
        // {1: "v"} -- an integer key is not a named field, even if its
        // textual form equals the lookup name.
        let mut buf = vec![fixmap_header(1), 0x01];
        buf.extend(fixstr("v"));
        let m = parse(&Bytes::from(buf), PayloadFormat::MsgPack).unwrap();
        assert_eq!(m.field("1"), None);
        assert_eq!(m.field_str("1"), None);
    }

    // ---- Error cases -------------------------------------------------------

    #[test]
    fn malformed_json_errors() {
        let err = parse(&Bytes::from_static(b"{not valid json"), PayloadFormat::Json).unwrap_err();
        assert!(matches!(err, CodecError::Json(_)), "got {err:?}");
        assert!(!err.to_string().is_empty());
    }

    #[test]
    fn empty_blob_auto_errors_as_json() {
        // detect() maps empty -> Json; empty is not valid JSON.
        let err = parse(&Bytes::new(), PayloadFormat::Auto).unwrap_err();
        assert!(matches!(err, CodecError::Json(_)), "got {err:?}");
    }

    #[test]
    fn msgpack_empty_payload_errors() {
        // Declared MsgPack with zero bytes: there is no marker to decode.
        let err = parse(&Bytes::new(), PayloadFormat::MsgPack).unwrap_err();
        assert!(matches!(err, CodecError::MsgPack(_)), "got {err:?}");
    }

    #[test]
    fn malformed_msgpack_errors() {
        // 0x81 declares a fixmap with one entry but supplies no key/value.
        let err = parse(&Bytes::from_static(&[0x81]), PayloadFormat::MsgPack).unwrap_err();
        assert!(matches!(err, CodecError::MsgPack(_)), "got {err:?}");
        assert!(!err.to_string().is_empty());
    }

    #[test]
    fn msgpack_truncated_float_errors() {
        // 0xcb declares a float64 but supplies only 3 of the 8 payload bytes.
        let err = parse(
            &single_entry_map("ratio", &[0xcb, 0x00, 0x01, 0x02]),
            PayloadFormat::MsgPack,
        )
        .unwrap_err();
        assert!(matches!(err, CodecError::MsgPack(_)), "got {err:?}");
    }

    // ---- Task 0.3b: serialise-out (round-trips through real bytes) ---------
    //
    // The contract: parse -> (mutate) -> serialise -> parse again preserves
    // the logical value. Assertions compare re-parsed VALUES (via the unified
    // accessor), NOT raw bytes. Re-serialising may reorder keys or reformat
    // numbers, so byte-for-byte equality would be the wrong check.

    /// Compare every routing field of the canonical sample across two payloads.
    fn assert_sample_fields_eq(a: &ParsedPayload, b: &ParsedPayload) {
        for key in ["_table", "org_id", "live", "ratio", "missing"] {
            assert_eq!(a.field(key), b.field(key), "field {key:?} differs");
        }
    }

    #[test]
    fn json_to_bytes_round_trips() {
        // parse JSON -> to_bytes -> parse again -> values equal.
        let original = parse(&sample_json(), PayloadFormat::Json).unwrap();
        assert!(original.is_json());

        let bytes = original.to_bytes().unwrap();
        assert!(!bytes.is_empty());

        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
        assert!(reparsed.is_json(), "JSON must round-trip as JSON");
        assert_sample_fields_eq(&original, &reparsed);
    }

    #[test]
    fn msgpack_to_bytes_round_trips_via_native_bytes() {
        // parse MsgPack (hand-rolled bytes) -> to_bytes (native rmpv encode)
        // -> parse again -> values equal. No serde, no JSON bridge anywhere.
        let original = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        assert!(original.is_msgpack());

        let bytes = original.to_bytes().unwrap();
        assert!(!bytes.is_empty());
        // The first byte must be a MsgPack map marker. A fixmap is 0x80..=0x8f,
        // and 5 entries gives 0x85. This proves native MsgPack came out, not JSON.
        assert_eq!(bytes[0], fixmap_header(5), "expected fixmap(5) wire marker");

        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
        assert!(reparsed.is_msgpack(), "MsgPack must round-trip as MsgPack");
        assert_sample_fields_eq(&original, &reparsed);
    }

    #[test]
    fn to_json_bytes_reparses_to_same_value() {
        // Free function: serialise a sonic_rs::Value, re-parse, compare.
        let ParsedPayload::Json(value) = parse(&sample_json(), PayloadFormat::Json).unwrap() else {
            panic!("expected JSON");
        };
        let bytes = to_json_bytes(&value).unwrap();
        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
        assert_eq!(reparsed.field("_table"), Some(FieldRef::Str("events")));
        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
        assert_eq!(reparsed.field("ratio"), Some(FieldRef::Float(1.5)));
    }

    #[test]
    fn to_msgpack_bytes_reparses_to_same_value() {
        // Free function: serialise an rmpv::Value via native write_value,
        // re-parse, compare.
        let ParsedPayload::MsgPack(value) =
            parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap()
        else {
            panic!("expected MsgPack");
        };
        let bytes = to_msgpack_bytes(&value).unwrap();
        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
        assert_eq!(reparsed.field("_table"), Some(FieldRef::Str("events")));
        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
        assert_eq!(reparsed.field("live"), Some(FieldRef::Bool(true)));
        assert_eq!(reparsed.field("ratio"), Some(FieldRef::Float(1.5)));
        assert_eq!(reparsed.field("missing"), Some(FieldRef::Null));
    }

    #[test]
    fn to_bytes_preserves_a_mutated_json_field() {
        // The realistic case: a transform CHANGED a field, then re-serialises.
        // Only then is to_bytes the right tool. Unmodified records pass
        // through as their original Bytes (see the to_bytes doc).
        let ParsedPayload::Json(mut value) = parse(&sample_json(), PayloadFormat::Json).unwrap()
        else {
            panic!("expected JSON");
        };
        // Overwrite _table in place with sonic_rs's object insert. The edited
        // value model is what gets re-serialised.
        value.insert("_table", sonic_rs::Value::from("audit"));
        let bytes = to_json_bytes(&value).unwrap();
        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
        assert_eq!(reparsed.field_str("_table"), Some("audit"));
        // Untouched siblings survive the round-trip.
        assert_eq!(reparsed.field("org_id"), Some(FieldRef::Int(42)));
    }

    #[test]
    fn json_to_bytes_handles_top_level_array() {
        // Egress is not object-only: a top-level array must round-trip too.
        let parsed = parse(&Bytes::from_static(b"[1,2,3]"), PayloadFormat::Json).unwrap();
        let bytes = parsed.to_bytes().unwrap();
        let reparsed = parse(&bytes, PayloadFormat::Json).unwrap();
        assert!(reparsed.is_json());
        // No named fields either way; the value re-parses without error.
        assert_eq!(reparsed.field_str("anything"), None);
    }

    #[test]
    fn msgpack_to_bytes_handles_top_level_scalar() {
        // A bare MsgPack integer (positive fixint 42) round-trips as itself,
        // not wrapped in a map.
        let parsed = parse(&Bytes::from(vec![42u8]), PayloadFormat::MsgPack).unwrap();
        let bytes = parsed.to_bytes().unwrap();
        assert_eq!(
            bytes.as_ref(),
            &[42u8],
            "fixint must re-emit byte-identical"
        );
        let reparsed = parse(&bytes, PayloadFormat::MsgPack).unwrap();
        assert!(reparsed.is_msgpack());
    }

    #[test]
    fn double_round_trip_is_stable() {
        // parse -> to_bytes -> parse -> to_bytes: the SECOND serialise must
        // equal the first. The value model is the fixed point, even where it
        // differs from the original hand-rolled bytes.
        let first = parse(&sample_msgpack(), PayloadFormat::MsgPack).unwrap();
        let b1 = first.to_bytes().unwrap();
        let second = parse(&b1, PayloadFormat::MsgPack).unwrap();
        let b2 = second.to_bytes().unwrap();
        assert_eq!(b1, b2, "re-serialising a re-parsed value must be stable");
    }

    // ---- Phase 5: trailing-bytes hardening ---------------------------------

    #[test]
    fn msgpack_rejects_trailing_bytes() {
        // A valid fixmap {"k": "v"} followed by a stray nil byte (0xc0). The
        // stray byte must be reported, not silently ignored.
        let mut buf = vec![fixmap_header(1)];
        buf.extend(fixstr("k"));
        buf.extend(fixstr("v"));
        buf.push(0xc0); // stray nil -- trailing garbage
        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack)
            .expect_err("trailing byte must be rejected");
        assert_eq!(
            err.to_string(),
            "msgpack trailing bytes: 1 byte(s) remain after value"
        );
        match err {
            CodecError::TrailingBytes(n) => assert_eq!(n, 1, "expected 1 trailing byte"),
            other => panic!("expected TrailingBytes(1), got {other:?}"),
        }
    }

    #[test]
    fn msgpack_rejects_concatenated_values() {
        // Two valid MsgPack values back-to-back: fixint 1 then fixint 2.
        // parse() decodes ONE value; the second byte is trailing and must error.
        let buf = vec![0x01u8, 0x02u8]; // positive fixint 1, positive fixint 2
        let err = parse(&Bytes::from(buf), PayloadFormat::MsgPack)
            .expect_err("concatenated values must be rejected");
        match err {
            CodecError::TrailingBytes(n) => assert_eq!(n, 1, "expected 1 trailing byte"),
            other => panic!("expected TrailingBytes(1), got {other:?}"),
        }
    }

    #[test]
    fn msgpack_clean_single_value_still_parses_ok() {
        // A single valid fixmap with no trailing bytes must still parse Ok.
        // Regression guard for the happy path.
        let parsed = parse(&single_entry_map("k", &fixstr("v")), PayloadFormat::MsgPack).unwrap();
        assert_eq!(parsed.field_str("k"), Some("v"));
    }

    #[test]
    fn json_rejects_trailing_garbage() {
        // The MsgPack path rejects trailing bytes (CodecError::TrailingBytes).
        // The JSON path delegates to sonic_rs, which rejects trailing
        // NON-whitespace content after a complete value (serde_json semantics).
        // A valid object followed by stray garbage must error.
        let mut buf = br#"{"_table":"events"}"#.to_vec();
        buf.extend_from_slice(b"garbage");
        let err = parse(&Bytes::from(buf), PayloadFormat::Json)
            .expect_err("trailing non-whitespace garbage must be rejected");
        assert!(
            matches!(err, CodecError::Json(_)),
            "expected CodecError::Json for trailing garbage, got {err:?}"
        );
    }

    #[test]
    fn json_accepts_trailing_whitespace() {
        // sonic_rs (like serde_json) tolerates trailing whitespace after a
        // complete value, so a pretty-printer's trailing newline must not be a
        // parse error. This test sits next to the trailing-garbage rejection
        // so the JSON trailing-byte contract is pinned in both directions.
        let mut buf = br#"{"_table":"events"}"#.to_vec();
        buf.extend_from_slice(b" \t\r\n");
        let parsed = parse(&Bytes::from(buf), PayloadFormat::Json)
            .expect("trailing whitespace must be accepted");
        assert_eq!(parsed.field_str("_table"), Some("events"));
    }

    #[test]
    fn json_parsed_as_msgpack_errors() {
        // Force the wrong decoder: JSON bytes through the MsgPack path. '{' is
        // 0x7b, which rmpv reads as a positive fixint, i.e. a complete
        // single-byte value. Every other byte of the JSON payload is therefore
        // trailing, and parse() must report exactly that many.
        let json = sample_json();
        let expected_trailing = json.len() - 1;
        let err = parse(&json, PayloadFormat::MsgPack)
            .expect_err("JSON fed to MsgPack path must error after trailing-bytes hardening");
        assert!(
            matches!(err, CodecError::TrailingBytes(n) if n == expected_trailing),
            "expected TrailingBytes({expected_trailing}), got {err:?}"
        );
    }
}