Skip to main content

tensogram_core/
framing.rs

1// (C) Copyright 2026- ECMWF and individual contributors.
2//
3// This software is licensed under the terms of the Apache Licence Version 2.0
4// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
5// In applying this licence, ECMWF does not waive the privileges and immunities
6// granted to it by virtue of its status as an intergovernmental organisation nor
7// does it submit to any jurisdiction.
8
9use std::collections::BTreeMap;
10
11use crate::error::{Result, TensogramError};
12use crate::metadata::{self, RESERVED_KEY};
13use crate::types::{DataObjectDescriptor, GlobalMetadata, HashFrame, IndexFrame};
14use crate::wire::{
15    DATA_OBJECT_FOOTER_SIZE, DataObjectFlags, FRAME_END, FRAME_HEADER_SIZE, FrameHeader, FrameType,
16    MAGIC, MessageFlags, POSTAMBLE_SIZE, PREAMBLE_SIZE, Postamble, Preamble,
17};
18
19// ── Frame-level primitives ───────────────────────────────────────────────────
20
21/// Write a complete frame: frame_header + payload + ENDF.
22/// Optionally pads to 8-byte alignment after ENDF.
23fn write_frame(
24    out: &mut Vec<u8>,
25    frame_type: FrameType,
26    version: u16,
27    flags: u16,
28    payload: &[u8],
29    align: bool,
30) {
31    let total_length = (FRAME_HEADER_SIZE + payload.len() + FRAME_END.len()) as u64;
32
33    let fh = FrameHeader {
34        frame_type,
35        version,
36        flags,
37        total_length,
38    };
39    fh.write_to(out);
40    out.extend_from_slice(payload);
41    out.extend_from_slice(FRAME_END);
42
43    if align {
44        let pad = (8 - (out.len() % 8)) % 8;
45        out.extend(std::iter::repeat_n(0u8, pad));
46    }
47}
48
49/// Read one frame from a buffer. Returns (FrameHeader, payload_slice, total_bytes_consumed).
50/// `total_bytes_consumed` includes any padding to the next 8-byte boundary.
51fn read_frame(buf: &[u8]) -> Result<(FrameHeader, &[u8], usize)> {
52    let fh = FrameHeader::read_from(buf)?;
53    let frame_total = usize::try_from(fh.total_length).map_err(|_| {
54        TensogramError::Framing(format!(
55            "frame total_length {} overflows usize",
56            fh.total_length
57        ))
58    })?;
59
60    // Minimum frame: header(16) + ENDF(4) = 20 bytes
61    let min_frame_size = FRAME_HEADER_SIZE + FRAME_END.len();
62    if frame_total < min_frame_size {
63        return Err(TensogramError::Framing(format!(
64            "frame total_length {} is smaller than minimum {min_frame_size}",
65            frame_total
66        )));
67    }
68
69    if frame_total > buf.len() {
70        return Err(TensogramError::Framing(format!(
71            "frame total_length {} exceeds buffer: {}",
72            frame_total,
73            buf.len()
74        )));
75    }
76
77    // Validate ENDF marker
78    let endf_start = frame_total - FRAME_END.len();
79    if &buf[endf_start..frame_total] != FRAME_END {
80        return Err(TensogramError::Framing(format!(
81            "missing ENDF marker at offset {endf_start}"
82        )));
83    }
84
85    let payload = &buf[FRAME_HEADER_SIZE..endf_start];
86
87    // Skip padding after ENDF to next 8-byte boundary
88    let mut consumed = frame_total;
89    let aligned = (consumed + 7) & !7;
90    if aligned <= buf.len() {
91        consumed = aligned;
92    }
93
94    Ok((fh, payload, consumed))
95}
96
97// ── Data Object Frame encode/decode ──────────────────────────────────────────
98
99/// Encode a data object frame with CBOR descriptor after payload (default).
100///
101/// Layout: FrameHeader(16) | payload_bytes | cbor_bytes | cbor_offset(8) + ENDF(4)
102///
103/// The cbor_offset in the footer is the byte offset from frame start to the
104/// start of the CBOR descriptor.
105pub fn encode_data_object_frame(
106    descriptor: &DataObjectDescriptor,
107    payload: &[u8],
108    cbor_before: bool,
109) -> Result<Vec<u8>> {
110    let cbor_bytes = metadata::object_descriptor_to_cbor(descriptor)?;
111    let flags = if cbor_before {
112        0
113    } else {
114        DataObjectFlags::CBOR_AFTER_PAYLOAD
115    };
116
117    // Calculate the total frame length:
118    // frame_header(16) + cbor_bytes + payload + cbor_offset(8) + ENDF(4)
119    let frame_body_len = cbor_bytes.len() + payload.len() + DATA_OBJECT_FOOTER_SIZE;
120    let total_length = (FRAME_HEADER_SIZE + frame_body_len) as u64;
121
122    let mut out = Vec::with_capacity(total_length as usize);
123
124    // Frame header
125    let fh = FrameHeader {
126        frame_type: FrameType::DataObject,
127        version: 1,
128        flags,
129        total_length,
130    };
131    fh.write_to(&mut out);
132
133    if cbor_before {
134        // CBOR descriptor first, then payload
135        let cbor_offset = FRAME_HEADER_SIZE as u64;
136        out.extend_from_slice(&cbor_bytes);
137        out.extend_from_slice(payload);
138        out.extend_from_slice(&cbor_offset.to_be_bytes());
139    } else {
140        // Payload first, then CBOR descriptor (default)
141        let cbor_offset = (FRAME_HEADER_SIZE + payload.len()) as u64;
142        out.extend_from_slice(payload);
143        out.extend_from_slice(&cbor_bytes);
144        out.extend_from_slice(&cbor_offset.to_be_bytes());
145    }
146
147    out.extend_from_slice(FRAME_END);
148
149    debug_assert_eq!(out.len(), total_length as usize);
150    Ok(out)
151}
152
153/// Decode a data object frame, returning the descriptor and payload slice.
154///
155/// `buf` must start at the frame header.
156pub fn decode_data_object_frame(buf: &[u8]) -> Result<(DataObjectDescriptor, &[u8], usize)> {
157    let fh = FrameHeader::read_from(buf)?;
158    if fh.frame_type != FrameType::DataObject {
159        return Err(TensogramError::Framing(format!(
160            "expected DataObject frame, got {:?}",
161            fh.frame_type
162        )));
163    }
164
165    let frame_total = usize::try_from(fh.total_length).map_err(|_| {
166        TensogramError::Framing(format!(
167            "data object frame total_length {} overflows usize",
168            fh.total_length
169        ))
170    })?;
171    // Minimum: frame_header(16) + cbor_offset(8) + ENDF(4) = 28
172    let min_frame_size = FRAME_HEADER_SIZE + DATA_OBJECT_FOOTER_SIZE;
173    if frame_total < min_frame_size {
174        return Err(TensogramError::Framing(format!(
175            "data object frame too small: {} < {}",
176            frame_total, min_frame_size
177        )));
178    }
179    if frame_total > buf.len() {
180        return Err(TensogramError::Framing(format!(
181            "data object frame total_length {} exceeds buffer: {}",
182            frame_total,
183            buf.len()
184        )));
185    }
186
187    // Validate ENDF marker
188    let endf_start = frame_total - FRAME_END.len();
189    if &buf[endf_start..frame_total] != FRAME_END {
190        return Err(TensogramError::Framing(
191            "missing ENDF marker in data object frame".to_string(),
192        ));
193    }
194
195    // Read cbor_offset from the data object footer (8 bytes before ENDF)
196    if endf_start < 8 {
197        return Err(TensogramError::Framing(format!(
198            "data object frame too small for cbor_offset: endf_start={endf_start} < 8"
199        )));
200    }
201    let cbor_offset_pos = endf_start - 8;
202    // cbor_offset_pos is guaranteed >= 0 (checked endf_start >= 8 above),
203    // and cbor_offset_pos + 8 <= endf_start <= frame_total <= buf.len(),
204    // so read_u64_be is safe.
205    let cbor_offset_raw = crate::wire::read_u64_be(buf, cbor_offset_pos);
206    let cbor_offset = usize::try_from(cbor_offset_raw).map_err(|_| {
207        TensogramError::Framing(format!("cbor_offset {cbor_offset_raw} overflows usize"))
208    })?;
209
210    // Validate cbor_offset points within the frame body
211    if cbor_offset < FRAME_HEADER_SIZE || cbor_offset > cbor_offset_pos {
212        return Err(TensogramError::Framing(format!(
213            "cbor_offset {cbor_offset} out of valid range [{FRAME_HEADER_SIZE}, {cbor_offset_pos}]"
214        )));
215    }
216
217    let cbor_after = fh.flags & DataObjectFlags::CBOR_AFTER_PAYLOAD != 0;
218
219    let (descriptor, payload_slice) = if cbor_after {
220        // Layout: header(16) | payload | cbor | cbor_offset(8) | ENDF(4)
221        let payload_start = FRAME_HEADER_SIZE;
222        let cbor_start = cbor_offset;
223        let cbor_end = cbor_offset_pos;
224        let cbor_slice = &buf[cbor_start..cbor_end];
225        let desc = metadata::cbor_to_object_descriptor(cbor_slice)?;
226        (desc, &buf[payload_start..cbor_start])
227    } else {
228        // Layout: header(16) | cbor | payload | cbor_offset(8) | ENDF(4)
229        // Use Cursor to measure exact consumed CBOR bytes on the wire.
230        // Re-serialization would produce different lengths for non-canonical CBOR.
231        let cbor_start = cbor_offset;
232        let region = &buf[cbor_start..cbor_offset_pos];
233        let mut cursor = std::io::Cursor::new(region);
234        let cbor_value: ciborium::Value = ciborium::from_reader(&mut cursor).map_err(|e| {
235            TensogramError::Metadata(format!("failed to parse object descriptor CBOR: {e}"))
236        })?;
237        let cbor_len = usize::try_from(cursor.position()).map_err(|_| {
238            TensogramError::Metadata("CBOR descriptor length overflows usize".to_string())
239        })?;
240        let payload_start = cbor_start + cbor_len;
241        // Deserialize directly from parsed Value — avoids a second CBOR parse
242        let desc: DataObjectDescriptor = cbor_value.deserialized().map_err(|e| {
243            TensogramError::Metadata(format!("failed to deserialize descriptor: {e}"))
244        })?;
245        (desc, &buf[payload_start..cbor_offset_pos])
246    };
247
248    // Bytes consumed, including padding
249    let mut consumed = frame_total;
250    let aligned = (consumed + 7) & !7;
251    if aligned <= buf.len() {
252        consumed = aligned;
253    }
254
255    Ok((descriptor, payload_slice, consumed))
256}
257
258// ── Message-level encode (buffered mode) ─────────────────────────────────────
259
260/// Encoded data object: descriptor + encoded payload bytes.
261pub struct EncodedObject {
262    pub descriptor: DataObjectDescriptor,
263    pub encoded_payload: Vec<u8>,
264}
265
266/// Build hash frame CBOR if any objects carry hashes.
267fn build_hash_frame_cbor(objects: &[EncodedObject]) -> Result<Option<Vec<u8>>> {
268    let has_hashes = objects.iter().any(|o| o.descriptor.hash.is_some());
269    if !has_hashes {
270        return Ok(None);
271    }
272
273    let hash_type = objects
274        .iter()
275        .find_map(|o| o.descriptor.hash.as_ref())
276        .map(|h| h.hash_type.clone())
277        .unwrap_or_default();
278    let hashes: Vec<String> = objects
279        .iter()
280        .map(|o| {
281            o.descriptor
282                .hash
283                .as_ref()
284                .map(|h| h.value.clone())
285                .unwrap_or_default()
286        })
287        .collect();
288    let hf = HashFrame {
289        object_count: objects.len() as u64,
290        hash_type,
291        hashes,
292    };
293    Ok(Some(metadata::hash_frame_to_cbor(&hf)?))
294}
295
296/// Two-pass index construction: compute data object offsets accounting for
297/// the index frame's own size, which depends on the offsets.
298fn build_index_frame(
299    header_size_no_index: usize,
300    object_frames: &[Vec<u8>],
301) -> Result<Option<Vec<u8>>> {
302    if object_frames.is_empty() {
303        return Ok(None);
304    }
305
306    let frame_lengths: Vec<u64> = object_frames.iter().map(|f| f.len() as u64).collect();
307
308    // First estimate: use dummy offsets of 0 to estimate CBOR size
309    let dummy_idx = IndexFrame {
310        object_count: object_frames.len() as u64,
311        offsets: vec![0u64; object_frames.len()],
312        lengths: frame_lengths.clone(),
313    };
314    let dummy_cbor = metadata::index_to_cbor(&dummy_idx)?;
315    let dummy_frame_size = aligned_frame_total_size(dummy_cbor.len());
316    let data_cursor = header_size_no_index + dummy_frame_size;
317
318    // Compute object offsets with the estimated index size
319    let offsets = compute_object_offsets(data_cursor, object_frames);
320
321    // Build real index with actual offsets
322    let real_idx = IndexFrame {
323        object_count: object_frames.len() as u64,
324        offsets,
325        lengths: frame_lengths.clone(),
326    };
327    let real_cbor = metadata::index_to_cbor(&real_idx)?;
328
329    // If offset values changed CBOR size, recompute once more
330    let final_cbor = if real_cbor.len() != dummy_cbor.len() {
331        let real_frame_size = aligned_frame_total_size(real_cbor.len());
332        let new_data_cursor = header_size_no_index + real_frame_size;
333        let new_offsets = compute_object_offsets(new_data_cursor, object_frames);
334        let final_idx = IndexFrame {
335            object_count: object_frames.len() as u64,
336            offsets: new_offsets,
337            lengths: frame_lengths,
338        };
339        let third_cbor = metadata::index_to_cbor(&final_idx)?;
340        // Guard: a third size change means offsets crossed another CBOR
341        // integer encoding tier, invalidating the layout.
342        if aligned_frame_total_size(third_cbor.len()) != real_frame_size {
343            return Err(TensogramError::Framing(
344                "index CBOR size changed unexpectedly on third pass".to_string(),
345            ));
346        }
347        third_cbor
348    } else {
349        real_cbor
350    };
351
352    let mut idx_frame = Vec::new();
353    write_frame(
354        &mut idx_frame,
355        FrameType::HeaderIndex,
356        1,
357        0,
358        &final_cbor,
359        true,
360    );
361    Ok(Some(idx_frame))
362}
363
/// Compute the absolute offset of each object frame, in order.
///
/// The first frame is placed at `start` exactly. Each subsequent frame starts
/// at the end of the previous one, rounded up to the next multiple of 8.
fn compute_object_offsets(start: usize, object_frames: &[Vec<u8>]) -> Vec<u64> {
    object_frames
        .iter()
        .scan(start, |next_offset, frame| {
            let offset = *next_offset;
            // Next frame begins at the 8-byte boundary after this one ends.
            *next_offset = (offset + frame.len() + 7) & !7;
            Some(offset as u64)
        })
        .collect()
}
375
376/// Compute message flags from the presence of optional frames.
377fn compute_message_flags(has_index: bool, has_hashes: bool) -> MessageFlags {
378    let mut flags = MessageFlags::default();
379    flags.set(MessageFlags::HEADER_METADATA);
380    if has_index {
381        flags.set(MessageFlags::HEADER_INDEX);
382    }
383    if has_hashes {
384        flags.set(MessageFlags::HEADER_HASHES);
385    }
386    flags
387}
388
389/// Assemble the final message buffer from pre-computed components.
390fn assemble_message(
391    flags: MessageFlags,
392    meta_cbor: &[u8],
393    index_frame_bytes: Option<&[u8]>,
394    hash_cbor: Option<&[u8]>,
395    object_frames: &[Vec<u8>],
396) -> Vec<u8> {
397    let mut out = Vec::new();
398
399    // Preamble placeholder (patched after we know total_length)
400    let preamble_pos = out.len();
401    out.extend_from_slice(&[0u8; PREAMBLE_SIZE]);
402
403    // Header metadata frame
404    write_frame(&mut out, FrameType::HeaderMetadata, 1, 0, meta_cbor, true);
405
406    // Header index frame (between metadata and hash, per spec ordering)
407    if let Some(idx_bytes) = index_frame_bytes {
408        out.extend_from_slice(idx_bytes);
409    }
410
411    // Header hash frame
412    if let Some(h_cbor) = hash_cbor {
413        write_frame(&mut out, FrameType::HeaderHash, 1, 0, h_cbor, true);
414    }
415
416    // Data object frames with inter-frame alignment
417    for (i, frame) in object_frames.iter().enumerate() {
418        out.extend_from_slice(frame);
419        if i + 1 < object_frames.len() {
420            let pad = (8 - (out.len() % 8)) % 8;
421            out.extend(std::iter::repeat_n(0u8, pad));
422        }
423    }
424
425    // Postamble (no footer frames in buffered mode)
426    let postamble_offset = out.len();
427    let postamble = Postamble {
428        first_footer_offset: postamble_offset as u64,
429    };
430    postamble.write_to(&mut out);
431
432    let total_length = out.len() as u64;
433
434    // Patch the preamble with the real values
435    let preamble = Preamble {
436        version: 2,
437        flags,
438        reserved: 0,
439        total_length,
440    };
441    let mut preamble_bytes = Vec::new();
442    preamble.write_to(&mut preamble_bytes);
443    out[preamble_pos..preamble_pos + PREAMBLE_SIZE].copy_from_slice(&preamble_bytes);
444
445    out
446}
447
448/// Encode a complete message in buffered mode.
449///
450/// All objects are known upfront. Header contains metadata + index + hashes.
451/// Footer has only the postamble (no footer frames).
452///
453/// Strategy: build the message in two passes.
454/// Pass 1: serialize all pieces, compute sizes/offsets.
455/// Pass 2: assemble into final buffer.
456pub fn encode_message(global_meta: &GlobalMetadata, objects: &[EncodedObject]) -> Result<Vec<u8>> {
457    // Serialize metadata CBOR
458    let meta_cbor = metadata::global_metadata_to_cbor(global_meta)?;
459
460    // Pre-encode all data object frames
461    let mut object_frames: Vec<Vec<u8>> = Vec::with_capacity(objects.len());
462    for obj in objects {
463        let frame = encode_data_object_frame(&obj.descriptor, &obj.encoded_payload, false)?;
464        object_frames.push(frame);
465    }
466
467    // Build hash frame CBOR (if any objects have hashes)
468    let hash_cbor = build_hash_frame_cbor(objects)?;
469
470    // Measure header size without index to feed the two-pass index builder
471    let mut header_no_index = Vec::new();
472    header_no_index.extend_from_slice(&[0u8; PREAMBLE_SIZE]);
473    write_frame(
474        &mut header_no_index,
475        FrameType::HeaderMetadata,
476        1,
477        0,
478        &meta_cbor,
479        true,
480    );
481    if let Some(ref h_cbor) = hash_cbor {
482        write_frame(
483            &mut header_no_index,
484            FrameType::HeaderHash,
485            1,
486            0,
487            h_cbor,
488            true,
489        );
490    }
491
492    // Two-pass index construction
493    let index_frame_bytes = build_index_frame(header_no_index.len(), &object_frames)?;
494
495    // Compute flags and assemble
496    let flags = compute_message_flags(index_frame_bytes.is_some(), hash_cbor.is_some());
497
498    Ok(assemble_message(
499        flags,
500        &meta_cbor,
501        index_frame_bytes.as_deref(),
502        hash_cbor.as_deref(),
503        &object_frames,
504    ))
505}
506
507// ── Message-level decode ─────────────────────────────────────────────────────
508
509/// A decoded message with all components.
510#[derive(Debug)]
511pub struct DecodedMessage<'a> {
512    pub preamble: Preamble,
513    pub global_metadata: GlobalMetadata,
514    pub index: Option<IndexFrame>,
515    pub hash_frame: Option<HashFrame>,
516    /// (descriptor, payload_slice, frame_offset_in_message)
517    pub objects: Vec<(DataObjectDescriptor, &'a [u8], usize)>,
518    /// Per-object preceder metadata, parallel to `objects`.
519    /// `Some(map)` if a PrecederMetadata frame preceded that object.
520    /// After decode, these entries are merged into `global_metadata.base`
521    /// (preceder wins over footer entries for the same object index).
522    pub preceder_payloads: Vec<Option<BTreeMap<String, ciborium::Value>>>,
523}
524
525/// Decode phase tracks expected frame ordering.
526/// Valid order: Headers → DataObjects → Footers.
527#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
528enum DecodePhase {
529    Headers = 0,
530    DataObjects = 1,
531    Footers = 2,
532}
533
534/// Returns the decode phase a frame type belongs to.
535fn frame_phase(ft: FrameType) -> DecodePhase {
536    match ft {
537        FrameType::HeaderMetadata | FrameType::HeaderIndex | FrameType::HeaderHash => {
538            DecodePhase::Headers
539        }
540        // PrecederMetadata lives alongside DataObject frames — it must appear
541        // immediately before the DataObject it describes, within the data phase.
542        FrameType::DataObject | FrameType::PrecederMetadata => DecodePhase::DataObjects,
543        FrameType::FooterHash | FrameType::FooterIndex | FrameType::FooterMetadata => {
544            DecodePhase::Footers
545        }
546    }
547}
548
549/// Decode a complete message from a buffer.
550///
551/// Validates that frames appear in the expected order:
552/// header frames first, then data objects, then footer frames.
553pub fn decode_message(buf: &[u8]) -> Result<DecodedMessage<'_>> {
554    let preamble = Preamble::read_from(buf)?;
555
556    // Validate total_length if non-zero
557    if preamble.total_length > 0 {
558        let total_len = usize::try_from(preamble.total_length).map_err(|_| {
559            TensogramError::Framing(format!(
560                "total_length {} overflows usize",
561                preamble.total_length
562            ))
563        })?;
564        if total_len > buf.len() {
565            return Err(TensogramError::Framing(format!(
566                "total_length {} exceeds buffer size {}",
567                preamble.total_length,
568                buf.len()
569            )));
570        }
571
572        // Validate postamble
573        let pa_offset = total_len - POSTAMBLE_SIZE;
574        let _postamble = Postamble::read_from(&buf[pa_offset..])?;
575    }
576
577    let mut pos = PREAMBLE_SIZE;
578    let msg_end = if preamble.total_length > 0 {
579        // Safe: validated above that total_length fits in usize
580        preamble.total_length as usize - POSTAMBLE_SIZE
581    } else {
582        buf.len().checked_sub(POSTAMBLE_SIZE).ok_or_else(|| {
583            TensogramError::Framing(format!(
584                "buffer too short for postamble: {} < {POSTAMBLE_SIZE}",
585                buf.len()
586            ))
587        })?
588    };
589
590    let mut global_metadata: Option<GlobalMetadata> = None;
591    let mut index: Option<IndexFrame> = None;
592    let mut hash_frame: Option<HashFrame> = None;
593    let mut objects: Vec<(DataObjectDescriptor, &[u8], usize)> = Vec::new();
594    let mut preceder_payloads: Vec<Option<BTreeMap<String, ciborium::Value>>> = Vec::new();
595    let mut current_phase = DecodePhase::Headers;
596
597    // Tracks a PrecederMetadata payload waiting for its DataObject.
598    // Two consecutive preceders (without an intervening DataObject) are invalid.
599    let mut pending_preceder: Option<BTreeMap<String, ciborium::Value>> = None;
600
601    // Parse frames sequentially
602    while pos < msg_end {
603        // Check if we've reached a frame header or the postamble
604        if pos + 2 > buf.len() {
605            break;
606        }
607
608        // Skip padding bytes (zeros between ENDF and next FR)
609        if &buf[pos..pos + 2] != b"FR" {
610            pos += 1;
611            continue;
612        }
613
614        // Peek at frame type
615        if pos + FRAME_HEADER_SIZE > buf.len() {
616            break;
617        }
618        let fh = FrameHeader::read_from(&buf[pos..])?;
619
620        // Validate frame ordering
621        let phase = frame_phase(fh.frame_type);
622        if phase < current_phase {
623            return Err(TensogramError::Framing(format!(
624                "unexpected {:?} frame after {:?} phase — frames must appear in order: headers, data objects, footers",
625                fh.frame_type, current_phase
626            )));
627        }
628
629        // A pending preceder must be followed by a DataObject, not a footer
630        // or another preceder.
631        if pending_preceder.is_some() && fh.frame_type != FrameType::DataObject {
632            return Err(TensogramError::Framing(format!(
633                "PrecederMetadata must be followed by a DataObject frame, got {:?}",
634                fh.frame_type
635            )));
636        }
637
638        current_phase = phase;
639        let frame_start = pos;
640
641        match fh.frame_type {
642            FrameType::HeaderMetadata | FrameType::FooterMetadata => {
643                let (_, payload, consumed) = read_frame(&buf[pos..])?;
644                let meta = metadata::cbor_to_global_metadata(payload)?;
645                global_metadata = Some(meta);
646                pos += consumed;
647            }
648            FrameType::HeaderIndex | FrameType::FooterIndex => {
649                let (_, payload, consumed) = read_frame(&buf[pos..])?;
650                let idx = metadata::cbor_to_index(payload)?;
651                index = Some(idx);
652                pos += consumed;
653            }
654            FrameType::HeaderHash | FrameType::FooterHash => {
655                let (_, payload, consumed) = read_frame(&buf[pos..])?;
656                let hf = metadata::cbor_to_hash_frame(payload)?;
657                hash_frame = Some(hf);
658                pos += consumed;
659            }
660            FrameType::PrecederMetadata => {
661                let (_, payload, consumed) = read_frame(&buf[pos..])?;
662                let preceder_meta = metadata::cbor_to_global_metadata(payload)?;
663                // Preceder base must have exactly one entry
664                let n = preceder_meta.base.len();
665                if n != 1 {
666                    return Err(TensogramError::Metadata(format!(
667                        "PrecederMetadata base must have exactly 1 entry, got {n}"
668                    )));
669                }
670                // Safe: we just checked len == 1
671                let mut entry = preceder_meta.base.into_iter().next().unwrap_or_default();
672                // Strip _reserved_ from preceder — it's library-managed and
673                // would collide with the encoder's auto-populated _reserved_.tensor.
674                // The decoder is permissive: strip rather than reject, since the
675                // data may come from a non-standard producer.
676                entry.remove(RESERVED_KEY);
677                pending_preceder = Some(entry);
678                pos += consumed;
679            }
680            FrameType::DataObject => {
681                let (desc, payload, consumed) = decode_data_object_frame(&buf[pos..])?;
682                objects.push((desc, payload, frame_start));
683                // Consume the pending preceder (if any) for this object
684                preceder_payloads.push(pending_preceder.take());
685                pos += consumed;
686            }
687        }
688    }
689
690    // A preceder at the end of the stream with no following DataObject is invalid
691    if pending_preceder.is_some() {
692        return Err(TensogramError::Framing(
693            "dangling PrecederMetadata: no DataObject frame followed".to_string(),
694        ));
695    }
696
697    let mut global_metadata = global_metadata.ok_or_else(|| {
698        TensogramError::Metadata("no metadata frame found in message".to_string())
699    })?;
700
701    // Merge preceder payloads into global_metadata.base (preceder wins).
702    // Key-level merge: preceder keys override footer keys on conflict,
703    // but footer-only keys (e.g. _reserved_.tensor) are preserved when
704    // absent from the preceder.
705    let obj_count = objects.len();
706    if global_metadata.base.len() > obj_count {
707        return Err(TensogramError::Metadata(format!(
708            "metadata base has {} entries but message contains {} objects",
709            global_metadata.base.len(),
710            obj_count
711        )));
712    }
713    if global_metadata.base.len() < obj_count {
714        global_metadata.base.resize_with(obj_count, BTreeMap::new);
715    }
716    for (i, preceder) in preceder_payloads.iter().enumerate() {
717        if let Some(prec_map) = preceder {
718            for (k, v) in prec_map {
719                global_metadata.base[i].insert(k.clone(), v.clone());
720            }
721        }
722    }
723
724    Ok(DecodedMessage {
725        preamble,
726        global_metadata,
727        index,
728        hash_frame,
729        objects,
730        preceder_payloads,
731    })
732}
733
734/// Decode only global metadata from a message, skipping data frames.
735pub fn decode_metadata_only(buf: &[u8]) -> Result<GlobalMetadata> {
736    let preamble = Preamble::read_from(buf)?;
737
738    let mut pos = PREAMBLE_SIZE;
739    let msg_end = if preamble.total_length > 0 {
740        // Safe: on 64-bit usize == u64; on 32-bit the message would already
741        // fail to fit in memory, so this truncation is acceptable.
742        let total_len = usize::try_from(preamble.total_length).map_err(|_| {
743            TensogramError::Framing(format!(
744                "total_length {} overflows usize",
745                preamble.total_length
746            ))
747        })?;
748        total_len.checked_sub(POSTAMBLE_SIZE).ok_or_else(|| {
749            TensogramError::Framing(format!(
750                "total_length {} too small for postamble",
751                preamble.total_length
752            ))
753        })?
754    } else {
755        buf.len().checked_sub(POSTAMBLE_SIZE).ok_or_else(|| {
756            TensogramError::Framing(format!(
757                "buffer too short for postamble: {} < {POSTAMBLE_SIZE}",
758                buf.len()
759            ))
760        })?
761    };
762
763    while pos < msg_end {
764        if pos + 2 > buf.len() {
765            break;
766        }
767        if &buf[pos..pos + 2] != b"FR" {
768            pos += 1;
769            continue;
770        }
771        if pos + FRAME_HEADER_SIZE > buf.len() {
772            break;
773        }
774        let fh = FrameHeader::read_from(&buf[pos..])?;
775        match fh.frame_type {
776            FrameType::HeaderMetadata | FrameType::FooterMetadata => {
777                let (_, payload, _) = read_frame(&buf[pos..])?;
778                return metadata::cbor_to_global_metadata(payload);
779            }
780            _ => {
781                // Skip this frame
782                let frame_total = usize::try_from(fh.total_length).map_err(|_| {
783                    TensogramError::Framing(format!(
784                        "frame total_length {} overflows usize",
785                        fh.total_length
786                    ))
787                })?;
788                pos += frame_total;
789                pos = (pos + 7) & !7; // align
790            }
791        }
792    }
793
794    Err(TensogramError::Metadata(
795        "no metadata frame found in message".to_string(),
796    ))
797}
798
799// ── Scan ─────────────────────────────────────────────────────────────────────
800
801/// Scan a multi-message buffer for message boundaries.
802/// Returns (offset, length) of each message found.
803#[tracing::instrument(skip(buf), fields(buf_len = buf.len()))]
804pub fn scan(buf: &[u8]) -> Vec<(usize, usize)> {
805    let mut messages = Vec::new();
806    let mut pos = 0;
807
808    while pos + PREAMBLE_SIZE + POSTAMBLE_SIZE <= buf.len() {
809        if &buf[pos..pos + MAGIC.len()] == MAGIC {
810            // Try to read preamble
811            if let Ok(preamble) = Preamble::read_from(&buf[pos..]) {
812                if preamble.total_length > 0 {
813                    let Ok(total) = usize::try_from(preamble.total_length) else {
814                        pos += 1;
815                        continue;
816                    };
817                    if pos + total <= buf.len() {
818                        // Validate end magic
819                        let end_magic_offset = pos + total - 8;
820                        if &buf[end_magic_offset..end_magic_offset + 8] == crate::wire::END_MAGIC {
821                            messages.push((pos, total));
822                            pos += total;
823                            continue;
824                        }
825                    }
826                } else {
827                    // Streaming mode: scan forward to find end magic
828                    // Look for the next 39277777 pattern
829                    let mut end_pos = pos + PREAMBLE_SIZE;
830                    let mut found = false;
831                    while end_pos + 8 <= buf.len() {
832                        if &buf[end_pos..end_pos + 8] == crate::wire::END_MAGIC {
833                            let msg_len = end_pos + 8 - pos;
834                            messages.push((pos, msg_len));
835                            pos = end_pos + 8;
836                            found = true;
837                            break;
838                        }
839                        end_pos += 1;
840                    }
841                    if found {
842                        continue;
843                    }
844                }
845            }
846        }
847        pos += 1;
848    }
849
850    messages
851}
852
853// ── File-based scan ──────────────────────────────────────────────────────────
854
855/// Scan a file for message boundaries without loading the entire file into memory.
856///
857/// Reads preamble-sized chunks and seeks forward, avoiding full-file reads
858/// for large files. Returns the same `(offset, length)` pairs as `scan()`.
859pub fn scan_file(file: &mut (impl std::io::Read + std::io::Seek)) -> Result<Vec<(usize, usize)>> {
860    use std::io::SeekFrom;
861
862    let file_len_u64 = file.seek(SeekFrom::End(0))?;
863    let file_len = usize::try_from(file_len_u64).map_err(|_| {
864        TensogramError::Framing(format!("file size {file_len_u64} overflows usize"))
865    })?;
866    file.seek(SeekFrom::Start(0))?;
867
868    let mut messages = Vec::new();
869    let mut pos: usize = 0;
870
871    let mut preamble_buf = [0u8; PREAMBLE_SIZE];
872
873    while pos + PREAMBLE_SIZE + POSTAMBLE_SIZE <= file_len {
874        file.seek(SeekFrom::Start(pos as u64))?;
875        if file.read_exact(&mut preamble_buf).is_err() {
876            break;
877        }
878
879        if &preamble_buf[..MAGIC.len()] == MAGIC
880            && let Ok(preamble) = Preamble::read_from(&preamble_buf)
881        {
882            if preamble.total_length > 0 {
883                let Ok(total) = usize::try_from(preamble.total_length) else {
884                    pos += 1;
885                    continue;
886                };
887                if pos + total <= file_len {
888                    // Read end magic to validate
889                    let end_magic_offset = pos + total - 8;
890                    file.seek(SeekFrom::Start(end_magic_offset as u64))?;
891                    let mut end_buf = [0u8; 8];
892                    if file.read_exact(&mut end_buf).is_ok() && &end_buf == crate::wire::END_MAGIC {
893                        messages.push((pos, total));
894                        pos += total;
895                        continue;
896                    }
897                }
898            } else {
899                // Streaming mode: scan forward for END_MAGIC
900                // Read in chunks to find the terminator
901                let mut search_pos = pos + PREAMBLE_SIZE;
902                let mut found = false;
903                let chunk_size = 4096;
904                let mut chunk = vec![0u8; chunk_size];
905
906                while search_pos + 8 <= file_len {
907                    file.seek(SeekFrom::Start(search_pos as u64))?;
908                    let to_read = (file_len - search_pos).min(chunk_size);
909                    let buf = &mut chunk[..to_read];
910                    if file.read_exact(buf).is_err() {
911                        break;
912                    }
913
914                    // Search for END_MAGIC in this chunk
915                    for i in 0..to_read.saturating_sub(7) {
916                        if &buf[i..i + 8] == crate::wire::END_MAGIC {
917                            let end_pos = search_pos + i;
918                            let msg_len = end_pos + 8 - pos;
919                            messages.push((pos, msg_len));
920                            pos = end_pos + 8;
921                            found = true;
922                            break;
923                        }
924                    }
925                    if found {
926                        break;
927                    }
928                    // Overlap by 7 bytes to catch END_MAGIC spanning chunks
929                    search_pos += to_read.saturating_sub(7);
930                }
931                if found {
932                    continue;
933                }
934            }
935        }
936        pos += 1;
937    }
938
939    Ok(messages)
940}
941
942// ── Helpers ──────────────────────────────────────────────────────────────────
943
944/// Total frame size: header + payload + ENDF
945fn frame_total_size(payload_len: usize) -> usize {
946    FRAME_HEADER_SIZE + payload_len + FRAME_END.len()
947}
948
949/// Frame total size aligned to 8 bytes
950fn aligned_frame_total_size(payload_len: usize) -> usize {
951    let raw = frame_total_size(payload_len);
952    (raw + 7) & !7
953}
954
// Unit tests for this module: data-object frame round trips, whole-message
// encode/decode, `scan`/`scan_file` boundary detection, frame-ordering
// validation and PrecederMetadata handling.
955#[cfg(test)]
956mod tests {
957    use super::*;
958    use crate::dtype::Dtype;
959    use crate::types::ByteOrder;
960    use std::collections::BTreeMap;
961
    // Minimal GlobalMetadata: version 2, every other field at its Default value.
962    fn make_global_meta() -> GlobalMetadata {
963        GlobalMetadata {
964            version: 2,
965            ..Default::default()
966        }
967    }
968
    // Descriptor for an uncompressed, unfiltered, little-endian float32
    // "ntensor" of the given shape. Strides are element strides in row-major
    // order: the last axis has stride 1 and each earlier axis multiplies in
    // the extent of the axis after it.
969    fn make_descriptor(shape: Vec<u64>) -> DataObjectDescriptor {
970        let strides = {
971            let mut s = vec![1u64; shape.len()];
972            for i in (0..shape.len().saturating_sub(1)).rev() {
973                s[i] = s[i + 1] * shape[i + 1];
974            }
975            s
976        };
977        DataObjectDescriptor {
978            obj_type: "ntensor".to_string(),
979            ndim: shape.len() as u64,
980            shape,
981            strides,
982            dtype: Dtype::Float32,
983            byte_order: ByteOrder::Little,
984            encoding: "none".to_string(),
985            filter: "none".to_string(),
986            compression: "none".to_string(),
987            params: BTreeMap::new(),
988            hash: None,
989        }
990    }
991
992    #[test]
993    fn test_data_object_frame_round_trip_cbor_after() {
994        let desc = make_descriptor(vec![4]);
995        let payload = vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
996
        // NOTE(review): judging by the test names, the bool presumably selects
        // whether the CBOR descriptor is placed before (true) or after (false)
        // the payload — confirm against encode_data_object_frame.
997        let frame = encode_data_object_frame(&desc, &payload, false).unwrap();
998
999        let (decoded_desc, decoded_payload, consumed) = decode_data_object_frame(&frame).unwrap();
1000        assert_eq!(decoded_desc.shape, vec![4]);
1001        assert_eq!(decoded_desc.dtype, Dtype::Float32);
1002        assert_eq!(decoded_payload, &payload[..]);
        // `>=` rather than `==`: `consumed` may presumably include alignment
        // padding beyond the frame bytes — confirm.
1003        assert!(consumed >= frame.len());
1004    }
1005
1006    #[test]
1007    fn test_data_object_frame_round_trip_cbor_before() {
1008        let desc = make_descriptor(vec![2, 3]);
1009        let payload = vec![0xABu8; 24]; // 2*3*4 = 24 bytes for float32
1010
1011        let frame = encode_data_object_frame(&desc, &payload, true).unwrap();
1012
1013        let (decoded_desc, decoded_payload, _) = decode_data_object_frame(&frame).unwrap();
1014        assert_eq!(decoded_desc.shape, vec![2, 3]);
1015        assert_eq!(decoded_payload, &payload[..]);
1016    }
1017
1018    #[test]
1019    fn test_empty_message_round_trip() {
1020        let meta = make_global_meta();
1021        let msg = encode_message(&meta, &[]).unwrap();
1022
1023        // Check magic and end magic
1024        assert_eq!(&msg[0..8], MAGIC);
1025        assert_eq!(&msg[msg.len() - 8..], crate::wire::END_MAGIC);
1026
1027        let decoded = decode_message(&msg).unwrap();
1028        assert_eq!(decoded.global_metadata.version, 2);
1029        assert_eq!(decoded.objects.len(), 0);
1030        assert!(decoded.index.is_none()); // no objects = no index
1031    }
1032
1033    #[test]
1034    fn test_single_object_message_round_trip() {
1035        let meta = make_global_meta();
1036        let desc = make_descriptor(vec![4]);
1037        let payload = vec![42u8; 16]; // 4 * float32
1038
1039        let objects = vec![EncodedObject {
1040            descriptor: desc,
1041            encoded_payload: payload.clone(),
1042        }];
1043
1044        let msg = encode_message(&meta, &objects).unwrap();
1045        let decoded = decode_message(&msg).unwrap();
1046
1047        assert_eq!(decoded.global_metadata.version, 2);
1048        assert_eq!(decoded.objects.len(), 1);
1049        assert_eq!(decoded.objects[0].0.shape, vec![4]);
1050        assert_eq!(decoded.objects[0].1, &payload[..]);
1051        assert!(decoded.index.is_some());
1052        assert_eq!(decoded.index.as_ref().unwrap().object_count, 1);
1053    }
1054
1055    #[test]
1056    fn test_multi_object_message_round_trip() {
1057        let meta = make_global_meta();
1058        let desc0 = make_descriptor(vec![4]);
1059        let desc1 = make_descriptor(vec![2, 3]);
1060        let payload0 = vec![10u8; 16];
1061        let payload1 = vec![20u8; 24];
1062
1063        let objects = vec![
1064            EncodedObject {
1065                descriptor: desc0,
1066                encoded_payload: payload0.clone(),
1067            },
1068            EncodedObject {
1069                descriptor: desc1,
1070                encoded_payload: payload1.clone(),
1071            },
1072        ];
1073
1074        let msg = encode_message(&meta, &objects).unwrap();
1075        let decoded = decode_message(&msg).unwrap();
1076
1077        assert_eq!(decoded.objects.len(), 2);
1078        assert_eq!(decoded.objects[0].0.shape, vec![4]);
1079        assert_eq!(decoded.objects[0].1, &payload0[..]);
1080        assert_eq!(decoded.objects[1].0.shape, vec![2, 3]);
1081        assert_eq!(decoded.objects[1].1, &payload1[..]);
1082
1083        let idx = decoded.index.as_ref().unwrap();
1084        assert_eq!(idx.object_count, 2);
1085        assert_eq!(idx.offsets.len(), 2);
1086    }
1087
    // Two messages written back to back must be reported as two adjacent
    // (offset, length) pairs.
1088    #[test]
1089    fn test_scan_multi_message() {
1090        let meta = make_global_meta();
1091        let msg1 = encode_message(
1092            &meta,
1093            &[EncodedObject {
1094                descriptor: make_descriptor(vec![4]),
1095                encoded_payload: vec![1u8; 16],
1096            }],
1097        )
1098        .unwrap();
1099        let msg2 = encode_message(
1100            &meta,
1101            &[EncodedObject {
1102                descriptor: make_descriptor(vec![2]),
1103                encoded_payload: vec![2u8; 8],
1104            }],
1105        )
1106        .unwrap();
1107
1108        let mut buf = msg1.clone();
1109        buf.extend_from_slice(&msg2);
1110
1111        let offsets = scan(&buf);
1112        assert_eq!(offsets.len(), 2);
1113        assert_eq!(offsets[0], (0, msg1.len()));
1114        assert_eq!(offsets[1], (msg1.len(), msg2.len()));
1115    }
1116
    // 10 bytes of 0xFF before and 5 bytes of 0xAA after the message must be
    // skipped; only the message itself is reported.
1117    #[test]
1118    fn test_scan_with_garbage() {
1119        let meta = make_global_meta();
1120        let msg = encode_message(
1121            &meta,
1122            &[EncodedObject {
1123                descriptor: make_descriptor(vec![4]),
1124                encoded_payload: vec![1u8; 16],
1125            }],
1126        )
1127        .unwrap();
1128
1129        let mut buf = vec![0xFF; 10];
1130        buf.extend_from_slice(&msg);
1131        buf.extend_from_slice(&[0xAA; 5]);
1132
1133        let offsets = scan(&buf);
1134        assert_eq!(offsets.len(), 1);
1135        assert_eq!(offsets[0], (10, msg.len()));
1136    }
1137
1138    #[test]
1139    fn test_decode_metadata_only() {
1140        let mut meta = make_global_meta();
1141        meta.extra.insert(
1142            "test_key".to_string(),
1143            ciborium::Value::Text("test_value".to_string()),
1144        );
1145
1146        let msg = encode_message(
1147            &meta,
1148            &[EncodedObject {
1149                descriptor: make_descriptor(vec![4]),
1150                encoded_payload: vec![0u8; 16],
1151            }],
1152        )
1153        .unwrap();
1154
1155        let decoded_meta = decode_metadata_only(&msg).unwrap();
1156        assert_eq!(decoded_meta.version, 2);
1157        // _extra_ is the CBOR key name (via serde rename)
1158        assert!(decoded_meta.extra.contains_key("test_key"));
1159    }
1160
1161    // ── Phase 2: Frame ordering validation tests ─────────────────────────
1162
1163    /// Helper: build a raw message from manually ordered frames.
    //
    // - DataObject entries ignore their `content` and always encode the same
    //   4-element float32 descriptor with a 16-byte zero payload.
    // - Any other frame type is written with `content` as its payload, or
    //   with the CBOR of `make_global_meta()` when `content` is empty.
    // - Every frame is zero-padded to an 8-byte boundary.
    //
    // NOTE(review): the postamble's first_footer_offset always points at the
    // postamble itself and the preamble flags only ever set HEADER_METADATA,
    // even when `frames` contains footer frames. The footer-related tests
    // presumably rely on decode_message not cross-checking these — confirm.
1164    fn build_raw_message(frames: &[(&[u8], FrameType)]) -> Vec<u8> {
1165        let meta = make_global_meta();
1166        let meta_cbor = crate::metadata::global_metadata_to_cbor(&meta).unwrap();
1167        let desc = make_descriptor(vec![4]);
1168        let payload = vec![0u8; 16];
1169
1170        let mut out = Vec::new();
1171        // Preamble placeholder
1172        out.extend_from_slice(&[0u8; PREAMBLE_SIZE]);
1173
1174        for (content, frame_type) in frames {
1175            match frame_type {
1176                FrameType::DataObject => {
1177                    let frame = encode_data_object_frame(&desc, &payload, false).unwrap();
1178                    out.extend_from_slice(&frame);
1179                    let pad = (8 - (out.len() % 8)) % 8;
1180                    out.extend(std::iter::repeat_n(0u8, pad));
1181                }
1182                _ => {
1183                    let data = if content.is_empty() {
1184                        &meta_cbor
1185                    } else {
1186                        *content
1187                    };
                    // Frame version 1, flags 0, aligned to 8 bytes.
1188                    write_frame(&mut out, *frame_type, 1, 0, data, true);
1189                }
1190            }
1191        }
1192
1193        // Postamble
1194        let postamble_offset = out.len();
1195        let postamble = Postamble {
1196            first_footer_offset: postamble_offset as u64,
1197        };
1198        postamble.write_to(&mut out);
1199
        // Now that the total length is known, overwrite the placeholder.
1200        let total_length = out.len() as u64;
1201        let mut flags = MessageFlags::default();
1202        flags.set(MessageFlags::HEADER_METADATA);
1203        let preamble = Preamble {
1204            version: 2,
1205            flags,
1206            reserved: 0,
1207            total_length,
1208        };
1209        let mut preamble_bytes = Vec::new();
1210        preamble.write_to(&mut preamble_bytes);
1211        out[0..PREAMBLE_SIZE].copy_from_slice(&preamble_bytes);
1212
1213        out
1214    }
1215
1216    #[test]
1217    fn test_decode_rejects_header_after_data_object() {
1218        // DataObject before HeaderMetadata — should fail
1219        let msg = build_raw_message(&[
1220            (&[], FrameType::DataObject),
1221            (&[], FrameType::HeaderMetadata),
1222        ]);
1223        let result = decode_message(&msg);
1224        assert!(
1225            result.is_err(),
1226            "header frame after data object should be rejected"
1227        );
1228        let err = result.unwrap_err().to_string();
1229        assert!(
1230            err.contains("order") || err.contains("unexpected"),
1231            "error should mention ordering: {err}"
1232        );
1233    }
1234
1235    #[test]
1236    fn test_decode_rejects_data_object_after_footer() {
1237        // HeaderMetadata, FooterHash, then DataObject — should fail
1238        let meta = make_global_meta();
1239        let meta_cbor = crate::metadata::global_metadata_to_cbor(&meta).unwrap();
1240        let hf = HashFrame {
1241            object_count: 0,
1242            hash_type: "xxh3".to_string(),
1243            hashes: vec![],
1244        };
1245        let hash_cbor = crate::metadata::hash_frame_to_cbor(&hf).unwrap();
1246
1247        let msg = build_raw_message(&[
1248            (&meta_cbor, FrameType::HeaderMetadata),
1249            (&hash_cbor, FrameType::FooterHash),
1250            (&[], FrameType::DataObject),
1251        ]);
1252        let result = decode_message(&msg);
1253        assert!(
1254            result.is_err(),
1255            "data object after footer should be rejected"
1256        );
1257    }
1258
1259    #[test]
1260    fn test_decode_accepts_valid_frame_order() {
1261        // HeaderMetadata → DataObject — canonical order
1262        let msg = build_raw_message(&[
1263            (&[], FrameType::HeaderMetadata),
1264            (&[], FrameType::DataObject),
1265        ]);
1266        let result = decode_message(&msg);
1267        assert!(
1268            result.is_ok(),
1269            "valid frame order should be accepted: {:?}",
1270            result.err()
1271        );
1272    }
1273
1274    // ── Phase 3: Streaming scan_file tests ─────────────────────────────
1275
1276    #[test]
1277    fn test_scan_file_matches_scan_buffer() {
1278        let meta = make_global_meta();
1279        let msg1 = encode_message(
1280            &meta,
1281            &[EncodedObject {
1282                descriptor: make_descriptor(vec![4]),
1283                encoded_payload: vec![1u8; 16],
1284            }],
1285        )
1286        .unwrap();
1287        let msg2 = encode_message(
1288            &meta,
1289            &[EncodedObject {
1290                descriptor: make_descriptor(vec![2]),
1291                encoded_payload: vec![2u8; 8],
1292            }],
1293        )
1294        .unwrap();
1295
1296        let mut buf = msg1.clone();
1297        buf.extend_from_slice(&msg2);
1298
1299        // Compare scan() (buffer) vs scan_file() (seeking)
1300        let buffer_offsets = scan(&buf);
1301
1302        let mut cursor = std::io::Cursor::new(&buf);
1303        let file_offsets = scan_file(&mut cursor).unwrap();
1304
1305        assert_eq!(buffer_offsets, file_offsets);
1306    }
1307
1308    #[test]
1309    fn test_scan_file_with_garbage() {
1310        let meta = make_global_meta();
1311        let msg = encode_message(
1312            &meta,
1313            &[EncodedObject {
1314                descriptor: make_descriptor(vec![4]),
1315                encoded_payload: vec![1u8; 16],
1316            }],
1317        )
1318        .unwrap();
1319
1320        let mut buf = vec![0xFF; 10];
1321        buf.extend_from_slice(&msg);
1322        buf.extend_from_slice(&[0xAA; 5]);
1323
1324        let mut cursor = std::io::Cursor::new(&buf);
1325        let offsets = scan_file(&mut cursor).unwrap();
1326        assert_eq!(offsets.len(), 1);
1327        assert_eq!(offsets[0], (10, msg.len()));
1328    }
1329
1330    #[test]
1331    fn test_scan_file_empty() {
1332        let buf: Vec<u8> = Vec::new();
1333        let mut cursor = std::io::Cursor::new(&buf);
1334        let offsets = scan_file(&mut cursor).unwrap();
1335        assert!(offsets.is_empty());
1336    }
1337
1338    #[test]
1339    fn test_decode_accepts_footer_after_data_objects() {
1340        // HeaderMetadata → DataObject → FooterMetadata — valid streaming layout
1341        let meta = make_global_meta();
1342        let meta_cbor = crate::metadata::global_metadata_to_cbor(&meta).unwrap();
1343
1344        let msg = build_raw_message(&[
1345            (&meta_cbor, FrameType::HeaderMetadata),
1346            (&[], FrameType::DataObject),
1347            (&meta_cbor, FrameType::FooterMetadata),
1348        ]);
1349        let result = decode_message(&msg);
1350        assert!(
1351            result.is_ok(),
1352            "footer after data objects should be accepted: {:?}",
1353            result.err()
1354        );
1355    }
1356
1357    // ── Phase 4: PrecederMetadata frame tests ────────────────────────────
1358
1359    /// Helper: build a preceder metadata CBOR blob with a single base entry.
1360    fn make_preceder_cbor(entries: std::collections::BTreeMap<String, ciborium::Value>) -> Vec<u8> {
1361        let meta = GlobalMetadata {
1362            version: 2,
1363            base: vec![entries],
1364            ..Default::default()
1365        };
1366        crate::metadata::global_metadata_to_cbor(&meta).unwrap()
1367    }
1368
1369    #[test]
1370    fn test_decode_preceder_before_data_object() {
1371        let mut entries = BTreeMap::new();
1372        entries.insert(
1373            "mars".to_string(),
1374            ciborium::Value::Map(vec![(
1375                ciborium::Value::Text("param".to_string()),
1376                ciborium::Value::Text("2t".to_string()),
1377            )]),
1378        );
1379        let preceder_cbor = make_preceder_cbor(entries);
1380
1381        let msg = build_raw_message(&[
1382            (&[], FrameType::HeaderMetadata),
1383            (&preceder_cbor, FrameType::PrecederMetadata),
1384            (&[], FrameType::DataObject),
1385        ]);
1386        let decoded = decode_message(&msg).unwrap();
1387
1388        assert_eq!(decoded.objects.len(), 1);
1389        assert_eq!(decoded.preceder_payloads.len(), 1);
1390        assert!(decoded.preceder_payloads[0].is_some());
1391
1392        // Verify preceder merged into global_metadata.base
1393        assert_eq!(decoded.global_metadata.base.len(), 1);
1394        assert!(decoded.global_metadata.base[0].contains_key("mars"));
1395    }
1396
1397    #[test]
1398    fn test_decode_consecutive_preceders_rejected() {
1399        let entries = BTreeMap::new();
1400        let preceder_cbor = make_preceder_cbor(entries);
1401
1402        let msg = build_raw_message(&[
1403            (&[], FrameType::HeaderMetadata),
1404            (&preceder_cbor, FrameType::PrecederMetadata),
1405            (&preceder_cbor, FrameType::PrecederMetadata),
1406            (&[], FrameType::DataObject),
1407        ]);
1408        let result = decode_message(&msg);
1409        assert!(result.is_err(), "consecutive preceders should be rejected");
1410        let err = result.unwrap_err().to_string();
1411        assert!(
1412            err.contains("PrecederMetadata") && err.contains("DataObject"),
1413            "error should explain preceder must precede DataObject: {err}"
1414        );
1415    }
1416
1417    #[test]
1418    fn test_decode_dangling_preceder_rejected() {
1419        let entries = BTreeMap::new();
1420        let preceder_cbor = make_preceder_cbor(entries);
1421
1422        // Preceder at end of message without a following DataObject
1423        let msg = build_raw_message(&[
1424            (&[], FrameType::HeaderMetadata),
1425            (&[], FrameType::DataObject),
1426            (&preceder_cbor, FrameType::PrecederMetadata),
1427        ]);
1428        let result = decode_message(&msg);
1429        assert!(result.is_err(), "dangling preceder should be rejected");
1430        let err = result.unwrap_err().to_string();
1431        assert!(
1432            err.contains("dangling"),
1433            "error should mention dangling: {err}"
1434        );
1435    }
1436
1437    #[test]
1438    fn test_decode_preceder_with_multiple_base_entries_rejected() {
1439        // Preceder with 2 base entries — should be rejected (must have exactly 1)
1440        let meta = GlobalMetadata {
1441            version: 2,
1442            base: vec![BTreeMap::new(), BTreeMap::new()],
1443            ..Default::default()
1444        };
1445        let bad_cbor = crate::metadata::global_metadata_to_cbor(&meta).unwrap();
1446
1447        let msg = build_raw_message(&[
1448            (&[], FrameType::HeaderMetadata),
1449            (&bad_cbor, FrameType::PrecederMetadata),
1450            (&[], FrameType::DataObject),
1451        ]);
1452        let result = decode_message(&msg);
1453        assert!(
1454            result.is_err(),
1455            "preceder with 2 payload entries should be rejected"
1456        );
1457        let err = result.unwrap_err().to_string();
1458        assert!(
1459            err.contains("exactly 1"),
1460            "error should mention 'exactly 1': {err}"
1461        );
1462    }
1463
1464    #[test]
1465    fn test_decode_preceder_with_zero_base_entries_rejected() {
1466        // Preceder with 0 base entries — should be rejected
1467        let meta = GlobalMetadata {
1468            version: 2,
1469            base: vec![],
1470            ..Default::default()
1471        };
1472        let bad_cbor = crate::metadata::global_metadata_to_cbor(&meta).unwrap();
1473
1474        let msg = build_raw_message(&[
1475            (&[], FrameType::HeaderMetadata),
1476            (&bad_cbor, FrameType::PrecederMetadata),
1477            (&[], FrameType::DataObject),
1478        ]);
1479        let result = decode_message(&msg);
1480        assert!(
1481            result.is_err(),
1482            "preceder with 0 payload entries should be rejected"
1483        );
1484        let err = result.unwrap_err().to_string();
1485        assert!(
1486            err.contains("exactly 1") && err.contains("got 0"),
1487            "error should mention 'exactly 1' and 'got 0': {err}"
1488        );
1489    }
1490
1491    #[test]
1492    fn test_decode_preceder_followed_by_footer_rejected() {
1493        let entries = BTreeMap::new();
1494        let preceder_cbor = make_preceder_cbor(entries);
1495        let meta = make_global_meta();
1496        let meta_cbor = crate::metadata::global_metadata_to_cbor(&meta).unwrap();
1497
        // The preceder is followed by a footer instead of a DataObject.
1498        let msg = build_raw_message(&[
1499            (&meta_cbor, FrameType::HeaderMetadata),
1500            (&preceder_cbor, FrameType::PrecederMetadata),
1501            (&meta_cbor, FrameType::FooterMetadata),
1502        ]);
1503        let result = decode_message(&msg);
1504        assert!(
1505            result.is_err(),
1506            "preceder followed by footer should be rejected"
1507        );
1508    }
1509
1510    #[test]
1511    fn test_decode_mixed_preceder_and_no_preceder() {
1512        // Object 0: has preceder, Object 1: no preceder
1513        let mut entries = BTreeMap::new();
1514        entries.insert(
1515            "note".to_string(),
1516            ciborium::Value::Text("from preceder".to_string()),
1517        );
1518        let preceder_cbor = make_preceder_cbor(entries);
1519
1520        let msg = build_raw_message(&[
1521            (&[], FrameType::HeaderMetadata),
1522            (&preceder_cbor, FrameType::PrecederMetadata),
1523            (&[], FrameType::DataObject),
1524            (&[], FrameType::DataObject),
1525        ]);
1526        let decoded = decode_message(&msg).unwrap();
1527
1528        assert_eq!(decoded.objects.len(), 2);
1529        assert_eq!(decoded.preceder_payloads.len(), 2);
1530        assert!(decoded.preceder_payloads[0].is_some());
1531        assert!(decoded.preceder_payloads[1].is_none());
1532
1533        // base[0] should have preceder entry, base[1] should be empty
1534        assert!(decoded.global_metadata.base[0].contains_key("note"));
1535        assert!(!decoded.global_metadata.base[1].contains_key("note"));
1536    }
1537
1538    #[test]
1539    fn test_decode_preceder_wins_over_footer_payload() {
1540        // Build a message where both footer metadata and preceder provide
1541        // payload[0] — preceder should win.
1542        let mut prec_entries = BTreeMap::new();
1543        prec_entries.insert(
1544            "source".to_string(),
1545            ciborium::Value::Text("preceder".to_string()),
1546        );
1547        let preceder_cbor = make_preceder_cbor(prec_entries);
1548
1549        // Footer metadata with different base[0]
1550        let mut footer_base = BTreeMap::new();
1551        footer_base.insert(
1552            "source".to_string(),
1553            ciborium::Value::Text("footer".to_string()),
1554        );
1555        let footer_meta = GlobalMetadata {
1556            version: 2,
1557            base: vec![footer_base],
1558            ..Default::default()
1559        };
1560        let footer_cbor = crate::metadata::global_metadata_to_cbor(&footer_meta).unwrap();
1561
1562        let msg = build_raw_message(&[
1563            (&[], FrameType::HeaderMetadata),
1564            (&preceder_cbor, FrameType::PrecederMetadata),
1565            (&[], FrameType::DataObject),
1566            (&footer_cbor, FrameType::FooterMetadata),
1567        ]);
1568        let decoded = decode_message(&msg).unwrap();
1569
1570        // Footer metadata is parsed last, so global_metadata would have
1571        // footer base. But after merging, preceder wins.
1572        let source = decoded.global_metadata.base[0]
1573            .get("source")
1574            .and_then(|v| match v {
1575                ciborium::Value::Text(s) => Some(s.as_str()),
1576                _ => None,
1577            });
1578        assert_eq!(source, Some("preceder"), "preceder should win over footer");
1579    }
1580
1581    #[test]
1582    fn test_decode_rejects_base_count_exceeding_objects() {
1583        // Footer metadata with 3 base entries but only 1 data object
1584        // should be rejected (base.len > obj_count).
        // NOTE(review): despite its name, `footer_cbor` is written below as a
        // HeaderMetadata frame, not a FooterMetadata frame.
1585        let footer_meta = GlobalMetadata {
1586            version: 2,
1587            base: vec![BTreeMap::new(), BTreeMap::new(), BTreeMap::new()],
1588            ..Default::default()
1589        };
1590        let footer_cbor = crate::metadata::global_metadata_to_cbor(&footer_meta).unwrap();
1591
1592        let msg = build_raw_message(&[
1593            (&footer_cbor, FrameType::HeaderMetadata),
1594            (&[], FrameType::DataObject),
1595        ]);
1596        let result = decode_message(&msg);
1597        assert!(
1598            result.is_err(),
1599            "base with more entries than objects should be rejected"
1600        );
1601        let err = result.unwrap_err().to_string();
        // NOTE(review): single-digit substring checks are weak; almost any
        // message containing these digits anywhere would satisfy them.
1602        assert!(
1603            err.contains("3") && err.contains("1"),
1604            "error should mention counts: {err}"
1605        );
1606    }
1607}