1use std::collections::BTreeMap;
10
11use crate::error::{Result, TensogramError};
12use crate::metadata::{self, RESERVED_KEY};
13use crate::types::{DataObjectDescriptor, GlobalMetadata, HashFrame, IndexFrame};
14use crate::wire::{
15 DATA_OBJECT_FOOTER_SIZE, DataObjectFlags, FRAME_END, FRAME_HEADER_SIZE, FrameHeader, FrameType,
16 MAGIC, MessageFlags, POSTAMBLE_SIZE, PREAMBLE_SIZE, Postamble, Preamble,
17};
18
19fn write_frame(
24 out: &mut Vec<u8>,
25 frame_type: FrameType,
26 version: u16,
27 flags: u16,
28 payload: &[u8],
29 align: bool,
30) {
31 let total_length = (FRAME_HEADER_SIZE + payload.len() + FRAME_END.len()) as u64;
32
33 let fh = FrameHeader {
34 frame_type,
35 version,
36 flags,
37 total_length,
38 };
39 fh.write_to(out);
40 out.extend_from_slice(payload);
41 out.extend_from_slice(FRAME_END);
42
43 if align {
44 let pad = (8 - (out.len() % 8)) % 8;
45 out.extend(std::iter::repeat_n(0u8, pad));
46 }
47}
48
49fn read_frame(buf: &[u8]) -> Result<(FrameHeader, &[u8], usize)> {
52 let fh = FrameHeader::read_from(buf)?;
53 let frame_total = usize::try_from(fh.total_length).map_err(|_| {
54 TensogramError::Framing(format!(
55 "frame total_length {} overflows usize",
56 fh.total_length
57 ))
58 })?;
59
60 let min_frame_size = FRAME_HEADER_SIZE + FRAME_END.len();
62 if frame_total < min_frame_size {
63 return Err(TensogramError::Framing(format!(
64 "frame total_length {} is smaller than minimum {min_frame_size}",
65 frame_total
66 )));
67 }
68
69 if frame_total > buf.len() {
70 return Err(TensogramError::Framing(format!(
71 "frame total_length {} exceeds buffer: {}",
72 frame_total,
73 buf.len()
74 )));
75 }
76
77 let endf_start = frame_total - FRAME_END.len();
79 if &buf[endf_start..frame_total] != FRAME_END {
80 return Err(TensogramError::Framing(format!(
81 "missing ENDF marker at offset {endf_start}"
82 )));
83 }
84
85 let payload = &buf[FRAME_HEADER_SIZE..endf_start];
86
87 let mut consumed = frame_total;
89 let aligned = (consumed + 7) & !7;
90 if aligned <= buf.len() {
91 consumed = aligned;
92 }
93
94 Ok((fh, payload, consumed))
95}
96
97pub fn encode_data_object_frame(
106 descriptor: &DataObjectDescriptor,
107 payload: &[u8],
108 cbor_before: bool,
109) -> Result<Vec<u8>> {
110 let cbor_bytes = metadata::object_descriptor_to_cbor(descriptor)?;
111 let flags = if cbor_before {
112 0
113 } else {
114 DataObjectFlags::CBOR_AFTER_PAYLOAD
115 };
116
117 let frame_body_len = cbor_bytes.len() + payload.len() + DATA_OBJECT_FOOTER_SIZE;
120 let total_length = (FRAME_HEADER_SIZE + frame_body_len) as u64;
121
122 let mut out = Vec::with_capacity(total_length as usize);
123
124 let fh = FrameHeader {
126 frame_type: FrameType::DataObject,
127 version: 1,
128 flags,
129 total_length,
130 };
131 fh.write_to(&mut out);
132
133 if cbor_before {
134 let cbor_offset = FRAME_HEADER_SIZE as u64;
136 out.extend_from_slice(&cbor_bytes);
137 out.extend_from_slice(payload);
138 out.extend_from_slice(&cbor_offset.to_be_bytes());
139 } else {
140 let cbor_offset = (FRAME_HEADER_SIZE + payload.len()) as u64;
142 out.extend_from_slice(payload);
143 out.extend_from_slice(&cbor_bytes);
144 out.extend_from_slice(&cbor_offset.to_be_bytes());
145 }
146
147 out.extend_from_slice(FRAME_END);
148
149 debug_assert_eq!(out.len(), total_length as usize);
150 Ok(out)
151}
152
153pub fn decode_data_object_frame(buf: &[u8]) -> Result<(DataObjectDescriptor, &[u8], usize)> {
157 let fh = FrameHeader::read_from(buf)?;
158 if fh.frame_type != FrameType::DataObject {
159 return Err(TensogramError::Framing(format!(
160 "expected DataObject frame, got {:?}",
161 fh.frame_type
162 )));
163 }
164
165 let frame_total = usize::try_from(fh.total_length).map_err(|_| {
166 TensogramError::Framing(format!(
167 "data object frame total_length {} overflows usize",
168 fh.total_length
169 ))
170 })?;
171 let min_frame_size = FRAME_HEADER_SIZE + DATA_OBJECT_FOOTER_SIZE;
173 if frame_total < min_frame_size {
174 return Err(TensogramError::Framing(format!(
175 "data object frame too small: {} < {}",
176 frame_total, min_frame_size
177 )));
178 }
179 if frame_total > buf.len() {
180 return Err(TensogramError::Framing(format!(
181 "data object frame total_length {} exceeds buffer: {}",
182 frame_total,
183 buf.len()
184 )));
185 }
186
187 let endf_start = frame_total - FRAME_END.len();
189 if &buf[endf_start..frame_total] != FRAME_END {
190 return Err(TensogramError::Framing(
191 "missing ENDF marker in data object frame".to_string(),
192 ));
193 }
194
195 if endf_start < 8 {
197 return Err(TensogramError::Framing(format!(
198 "data object frame too small for cbor_offset: endf_start={endf_start} < 8"
199 )));
200 }
201 let cbor_offset_pos = endf_start - 8;
202 let cbor_offset_raw = crate::wire::read_u64_be(buf, cbor_offset_pos);
206 let cbor_offset = usize::try_from(cbor_offset_raw).map_err(|_| {
207 TensogramError::Framing(format!("cbor_offset {cbor_offset_raw} overflows usize"))
208 })?;
209
210 if cbor_offset < FRAME_HEADER_SIZE || cbor_offset > cbor_offset_pos {
212 return Err(TensogramError::Framing(format!(
213 "cbor_offset {cbor_offset} out of valid range [{FRAME_HEADER_SIZE}, {cbor_offset_pos}]"
214 )));
215 }
216
217 let cbor_after = fh.flags & DataObjectFlags::CBOR_AFTER_PAYLOAD != 0;
218
219 let (descriptor, payload_slice) = if cbor_after {
220 let payload_start = FRAME_HEADER_SIZE;
222 let cbor_start = cbor_offset;
223 let cbor_end = cbor_offset_pos;
224 let cbor_slice = &buf[cbor_start..cbor_end];
225 let desc = metadata::cbor_to_object_descriptor(cbor_slice)?;
226 (desc, &buf[payload_start..cbor_start])
227 } else {
228 let cbor_start = cbor_offset;
232 let region = &buf[cbor_start..cbor_offset_pos];
233 let mut cursor = std::io::Cursor::new(region);
234 let cbor_value: ciborium::Value = ciborium::from_reader(&mut cursor).map_err(|e| {
235 TensogramError::Metadata(format!("failed to parse object descriptor CBOR: {e}"))
236 })?;
237 let cbor_len = usize::try_from(cursor.position()).map_err(|_| {
238 TensogramError::Metadata("CBOR descriptor length overflows usize".to_string())
239 })?;
240 let payload_start = cbor_start + cbor_len;
241 let desc: DataObjectDescriptor = cbor_value.deserialized().map_err(|e| {
243 TensogramError::Metadata(format!("failed to deserialize descriptor: {e}"))
244 })?;
245 (desc, &buf[payload_start..cbor_offset_pos])
246 };
247
248 let mut consumed = frame_total;
250 let aligned = (consumed + 7) & !7;
251 if aligned <= buf.len() {
252 consumed = aligned;
253 }
254
255 Ok((descriptor, payload_slice, consumed))
256}
257
/// One data object to be written by [`encode_message`]: its descriptor and payload bytes.
pub struct EncodedObject {
    /// Descriptor serialized as CBOR into the object's `DataObject` frame.
    pub descriptor: DataObjectDescriptor,
    /// Payload bytes, copied verbatim into the `DataObject` frame
    /// (presumably already encoded/compressed per the descriptor — confirm with callers).
    pub encoded_payload: Vec<u8>,
}
265
266fn build_hash_frame_cbor(objects: &[EncodedObject]) -> Result<Option<Vec<u8>>> {
268 let has_hashes = objects.iter().any(|o| o.descriptor.hash.is_some());
269 if !has_hashes {
270 return Ok(None);
271 }
272
273 let hash_type = objects
274 .iter()
275 .find_map(|o| o.descriptor.hash.as_ref())
276 .map(|h| h.hash_type.clone())
277 .unwrap_or_default();
278 let hashes: Vec<String> = objects
279 .iter()
280 .map(|o| {
281 o.descriptor
282 .hash
283 .as_ref()
284 .map(|h| h.value.clone())
285 .unwrap_or_default()
286 })
287 .collect();
288 let hf = HashFrame {
289 object_count: objects.len() as u64,
290 hash_type,
291 hashes,
292 };
293 Ok(Some(metadata::hash_frame_to_cbor(&hf)?))
294}
295
296fn build_index_frame(
299 header_size_no_index: usize,
300 object_frames: &[Vec<u8>],
301) -> Result<Option<Vec<u8>>> {
302 if object_frames.is_empty() {
303 return Ok(None);
304 }
305
306 let frame_lengths: Vec<u64> = object_frames.iter().map(|f| f.len() as u64).collect();
307
308 let dummy_idx = IndexFrame {
310 object_count: object_frames.len() as u64,
311 offsets: vec![0u64; object_frames.len()],
312 lengths: frame_lengths.clone(),
313 };
314 let dummy_cbor = metadata::index_to_cbor(&dummy_idx)?;
315 let dummy_frame_size = aligned_frame_total_size(dummy_cbor.len());
316 let data_cursor = header_size_no_index + dummy_frame_size;
317
318 let offsets = compute_object_offsets(data_cursor, object_frames);
320
321 let real_idx = IndexFrame {
323 object_count: object_frames.len() as u64,
324 offsets,
325 lengths: frame_lengths.clone(),
326 };
327 let real_cbor = metadata::index_to_cbor(&real_idx)?;
328
329 let final_cbor = if real_cbor.len() != dummy_cbor.len() {
331 let real_frame_size = aligned_frame_total_size(real_cbor.len());
332 let new_data_cursor = header_size_no_index + real_frame_size;
333 let new_offsets = compute_object_offsets(new_data_cursor, object_frames);
334 let final_idx = IndexFrame {
335 object_count: object_frames.len() as u64,
336 offsets: new_offsets,
337 lengths: frame_lengths,
338 };
339 let third_cbor = metadata::index_to_cbor(&final_idx)?;
340 if aligned_frame_total_size(third_cbor.len()) != real_frame_size {
343 return Err(TensogramError::Framing(
344 "index CBOR size changed unexpectedly on third pass".to_string(),
345 ));
346 }
347 third_cbor
348 } else {
349 real_cbor
350 };
351
352 let mut idx_frame = Vec::new();
353 write_frame(
354 &mut idx_frame,
355 FrameType::HeaderIndex,
356 1,
357 0,
358 &final_cbor,
359 true,
360 );
361 Ok(Some(idx_frame))
362}
363
/// Compute the absolute offset of each object frame when they are laid out back to
/// back starting at `start`, each following frame beginning on an 8-byte boundary.
fn compute_object_offsets(start: usize, object_frames: &[Vec<u8>]) -> Vec<u64> {
    object_frames
        .iter()
        .scan(start, |next_start, frame| {
            let here = *next_start;
            // Round the end of this frame up to the next multiple of 8.
            *next_start = (here + frame.len() + 7) & !7;
            Some(here as u64)
        })
        .collect()
}
375
376fn compute_message_flags(has_index: bool, has_hashes: bool) -> MessageFlags {
378 let mut flags = MessageFlags::default();
379 flags.set(MessageFlags::HEADER_METADATA);
380 if has_index {
381 flags.set(MessageFlags::HEADER_INDEX);
382 }
383 if has_hashes {
384 flags.set(MessageFlags::HEADER_HASHES);
385 }
386 flags
387}
388
389fn assemble_message(
391 flags: MessageFlags,
392 meta_cbor: &[u8],
393 index_frame_bytes: Option<&[u8]>,
394 hash_cbor: Option<&[u8]>,
395 object_frames: &[Vec<u8>],
396) -> Vec<u8> {
397 let mut out = Vec::new();
398
399 let preamble_pos = out.len();
401 out.extend_from_slice(&[0u8; PREAMBLE_SIZE]);
402
403 write_frame(&mut out, FrameType::HeaderMetadata, 1, 0, meta_cbor, true);
405
406 if let Some(idx_bytes) = index_frame_bytes {
408 out.extend_from_slice(idx_bytes);
409 }
410
411 if let Some(h_cbor) = hash_cbor {
413 write_frame(&mut out, FrameType::HeaderHash, 1, 0, h_cbor, true);
414 }
415
416 for (i, frame) in object_frames.iter().enumerate() {
418 out.extend_from_slice(frame);
419 if i + 1 < object_frames.len() {
420 let pad = (8 - (out.len() % 8)) % 8;
421 out.extend(std::iter::repeat_n(0u8, pad));
422 }
423 }
424
425 let postamble_offset = out.len();
427 let postamble = Postamble {
428 first_footer_offset: postamble_offset as u64,
429 };
430 postamble.write_to(&mut out);
431
432 let total_length = out.len() as u64;
433
434 let preamble = Preamble {
436 version: 2,
437 flags,
438 reserved: 0,
439 total_length,
440 };
441 let mut preamble_bytes = Vec::new();
442 preamble.write_to(&mut preamble_bytes);
443 out[preamble_pos..preamble_pos + PREAMBLE_SIZE].copy_from_slice(&preamble_bytes);
444
445 out
446}
447
448pub fn encode_message(global_meta: &GlobalMetadata, objects: &[EncodedObject]) -> Result<Vec<u8>> {
457 let meta_cbor = metadata::global_metadata_to_cbor(global_meta)?;
459
460 let mut object_frames: Vec<Vec<u8>> = Vec::with_capacity(objects.len());
462 for obj in objects {
463 let frame = encode_data_object_frame(&obj.descriptor, &obj.encoded_payload, false)?;
464 object_frames.push(frame);
465 }
466
467 let hash_cbor = build_hash_frame_cbor(objects)?;
469
470 let mut header_no_index = Vec::new();
472 header_no_index.extend_from_slice(&[0u8; PREAMBLE_SIZE]);
473 write_frame(
474 &mut header_no_index,
475 FrameType::HeaderMetadata,
476 1,
477 0,
478 &meta_cbor,
479 true,
480 );
481 if let Some(ref h_cbor) = hash_cbor {
482 write_frame(
483 &mut header_no_index,
484 FrameType::HeaderHash,
485 1,
486 0,
487 h_cbor,
488 true,
489 );
490 }
491
492 let index_frame_bytes = build_index_frame(header_no_index.len(), &object_frames)?;
494
495 let flags = compute_message_flags(index_frame_bytes.is_some(), hash_cbor.is_some());
497
498 Ok(assemble_message(
499 flags,
500 &meta_cbor,
501 index_frame_bytes.as_deref(),
502 hash_cbor.as_deref(),
503 &object_frames,
504 ))
505}
506
/// Result of [`decode_message`]; object payloads borrow from the decoded buffer.
#[derive(Debug)]
pub struct DecodedMessage<'a> {
    /// Preamble read from the start of the buffer.
    pub preamble: Preamble,
    /// Global metadata from the last metadata frame read (header or footer).
    /// Its `base` list is padded to one entry per object, and each entry is
    /// overlaid with that object's preceder metadata.
    pub global_metadata: GlobalMetadata,
    /// The last index frame read (header or footer), if any.
    pub index: Option<IndexFrame>,
    /// The last hash frame read (header or footer), if any.
    pub hash_frame: Option<HashFrame>,
    /// One entry per data object, in message order:
    /// (descriptor, payload slice into the input buffer, byte offset of the frame start within the buffer).
    pub objects: Vec<(DataObjectDescriptor, &'a [u8], usize)>,
    /// One entry per data object: the keys from its `PrecederMetadata` frame
    /// (with the reserved key removed), or `None` if it had no preceder frame.
    pub preceder_payloads: Vec<Option<BTreeMap<String, ciborium::Value>>>,
}
524
/// Ordering class of a frame, used by the decoder to enforce frame order.
///
/// Variants are declared in the order they must appear in a message. The derived
/// `Ord` follows declaration order, so a frame whose phase compares lower than the
/// current phase is out of order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum DecodePhase {
    /// `HeaderMetadata`, `HeaderIndex` and `HeaderHash` frames.
    Headers = 0,
    /// `DataObject` frames and the `PrecederMetadata` frames that precede them.
    DataObjects = 1,
    /// `FooterHash`, `FooterIndex` and `FooterMetadata` frames.
    Footers = 2,
}
533
534fn frame_phase(ft: FrameType) -> DecodePhase {
536 match ft {
537 FrameType::HeaderMetadata | FrameType::HeaderIndex | FrameType::HeaderHash => {
538 DecodePhase::Headers
539 }
540 FrameType::DataObject | FrameType::PrecederMetadata => DecodePhase::DataObjects,
543 FrameType::FooterHash | FrameType::FooterIndex | FrameType::FooterMetadata => {
544 DecodePhase::Footers
545 }
546 }
547}
548
549pub fn decode_message(buf: &[u8]) -> Result<DecodedMessage<'_>> {
554 let preamble = Preamble::read_from(buf)?;
555
556 if preamble.total_length > 0 {
558 let total_len = usize::try_from(preamble.total_length).map_err(|_| {
559 TensogramError::Framing(format!(
560 "total_length {} overflows usize",
561 preamble.total_length
562 ))
563 })?;
564 if total_len > buf.len() {
565 return Err(TensogramError::Framing(format!(
566 "total_length {} exceeds buffer size {}",
567 preamble.total_length,
568 buf.len()
569 )));
570 }
571
572 let pa_offset = total_len - POSTAMBLE_SIZE;
574 let _postamble = Postamble::read_from(&buf[pa_offset..])?;
575 }
576
577 let mut pos = PREAMBLE_SIZE;
578 let msg_end = if preamble.total_length > 0 {
579 preamble.total_length as usize - POSTAMBLE_SIZE
581 } else {
582 buf.len().checked_sub(POSTAMBLE_SIZE).ok_or_else(|| {
583 TensogramError::Framing(format!(
584 "buffer too short for postamble: {} < {POSTAMBLE_SIZE}",
585 buf.len()
586 ))
587 })?
588 };
589
590 let mut global_metadata: Option<GlobalMetadata> = None;
591 let mut index: Option<IndexFrame> = None;
592 let mut hash_frame: Option<HashFrame> = None;
593 let mut objects: Vec<(DataObjectDescriptor, &[u8], usize)> = Vec::new();
594 let mut preceder_payloads: Vec<Option<BTreeMap<String, ciborium::Value>>> = Vec::new();
595 let mut current_phase = DecodePhase::Headers;
596
597 let mut pending_preceder: Option<BTreeMap<String, ciborium::Value>> = None;
600
601 while pos < msg_end {
603 if pos + 2 > buf.len() {
605 break;
606 }
607
608 if &buf[pos..pos + 2] != b"FR" {
610 pos += 1;
611 continue;
612 }
613
614 if pos + FRAME_HEADER_SIZE > buf.len() {
616 break;
617 }
618 let fh = FrameHeader::read_from(&buf[pos..])?;
619
620 let phase = frame_phase(fh.frame_type);
622 if phase < current_phase {
623 return Err(TensogramError::Framing(format!(
624 "unexpected {:?} frame after {:?} phase — frames must appear in order: headers, data objects, footers",
625 fh.frame_type, current_phase
626 )));
627 }
628
629 if pending_preceder.is_some() && fh.frame_type != FrameType::DataObject {
632 return Err(TensogramError::Framing(format!(
633 "PrecederMetadata must be followed by a DataObject frame, got {:?}",
634 fh.frame_type
635 )));
636 }
637
638 current_phase = phase;
639 let frame_start = pos;
640
641 match fh.frame_type {
642 FrameType::HeaderMetadata | FrameType::FooterMetadata => {
643 let (_, payload, consumed) = read_frame(&buf[pos..])?;
644 let meta = metadata::cbor_to_global_metadata(payload)?;
645 global_metadata = Some(meta);
646 pos += consumed;
647 }
648 FrameType::HeaderIndex | FrameType::FooterIndex => {
649 let (_, payload, consumed) = read_frame(&buf[pos..])?;
650 let idx = metadata::cbor_to_index(payload)?;
651 index = Some(idx);
652 pos += consumed;
653 }
654 FrameType::HeaderHash | FrameType::FooterHash => {
655 let (_, payload, consumed) = read_frame(&buf[pos..])?;
656 let hf = metadata::cbor_to_hash_frame(payload)?;
657 hash_frame = Some(hf);
658 pos += consumed;
659 }
660 FrameType::PrecederMetadata => {
661 let (_, payload, consumed) = read_frame(&buf[pos..])?;
662 let preceder_meta = metadata::cbor_to_global_metadata(payload)?;
663 let n = preceder_meta.base.len();
665 if n != 1 {
666 return Err(TensogramError::Metadata(format!(
667 "PrecederMetadata base must have exactly 1 entry, got {n}"
668 )));
669 }
670 let mut entry = preceder_meta.base.into_iter().next().unwrap_or_default();
672 entry.remove(RESERVED_KEY);
677 pending_preceder = Some(entry);
678 pos += consumed;
679 }
680 FrameType::DataObject => {
681 let (desc, payload, consumed) = decode_data_object_frame(&buf[pos..])?;
682 objects.push((desc, payload, frame_start));
683 preceder_payloads.push(pending_preceder.take());
685 pos += consumed;
686 }
687 }
688 }
689
690 if pending_preceder.is_some() {
692 return Err(TensogramError::Framing(
693 "dangling PrecederMetadata: no DataObject frame followed".to_string(),
694 ));
695 }
696
697 let mut global_metadata = global_metadata.ok_or_else(|| {
698 TensogramError::Metadata("no metadata frame found in message".to_string())
699 })?;
700
701 let obj_count = objects.len();
706 if global_metadata.base.len() > obj_count {
707 return Err(TensogramError::Metadata(format!(
708 "metadata base has {} entries but message contains {} objects",
709 global_metadata.base.len(),
710 obj_count
711 )));
712 }
713 if global_metadata.base.len() < obj_count {
714 global_metadata.base.resize_with(obj_count, BTreeMap::new);
715 }
716 for (i, preceder) in preceder_payloads.iter().enumerate() {
717 if let Some(prec_map) = preceder {
718 for (k, v) in prec_map {
719 global_metadata.base[i].insert(k.clone(), v.clone());
720 }
721 }
722 }
723
724 Ok(DecodedMessage {
725 preamble,
726 global_metadata,
727 index,
728 hash_frame,
729 objects,
730 preceder_payloads,
731 })
732}
733
734pub fn decode_metadata_only(buf: &[u8]) -> Result<GlobalMetadata> {
736 let preamble = Preamble::read_from(buf)?;
737
738 let mut pos = PREAMBLE_SIZE;
739 let msg_end = if preamble.total_length > 0 {
740 let total_len = usize::try_from(preamble.total_length).map_err(|_| {
743 TensogramError::Framing(format!(
744 "total_length {} overflows usize",
745 preamble.total_length
746 ))
747 })?;
748 total_len.checked_sub(POSTAMBLE_SIZE).ok_or_else(|| {
749 TensogramError::Framing(format!(
750 "total_length {} too small for postamble",
751 preamble.total_length
752 ))
753 })?
754 } else {
755 buf.len().checked_sub(POSTAMBLE_SIZE).ok_or_else(|| {
756 TensogramError::Framing(format!(
757 "buffer too short for postamble: {} < {POSTAMBLE_SIZE}",
758 buf.len()
759 ))
760 })?
761 };
762
763 while pos < msg_end {
764 if pos + 2 > buf.len() {
765 break;
766 }
767 if &buf[pos..pos + 2] != b"FR" {
768 pos += 1;
769 continue;
770 }
771 if pos + FRAME_HEADER_SIZE > buf.len() {
772 break;
773 }
774 let fh = FrameHeader::read_from(&buf[pos..])?;
775 match fh.frame_type {
776 FrameType::HeaderMetadata | FrameType::FooterMetadata => {
777 let (_, payload, _) = read_frame(&buf[pos..])?;
778 return metadata::cbor_to_global_metadata(payload);
779 }
780 _ => {
781 let frame_total = usize::try_from(fh.total_length).map_err(|_| {
783 TensogramError::Framing(format!(
784 "frame total_length {} overflows usize",
785 fh.total_length
786 ))
787 })?;
788 pos += frame_total;
789 pos = (pos + 7) & !7; }
791 }
792 }
793
794 Err(TensogramError::Metadata(
795 "no metadata frame found in message".to_string(),
796 ))
797}
798
799#[tracing::instrument(skip(buf), fields(buf_len = buf.len()))]
804pub fn scan(buf: &[u8]) -> Vec<(usize, usize)> {
805 let mut messages = Vec::new();
806 let mut pos = 0;
807
808 while pos + PREAMBLE_SIZE + POSTAMBLE_SIZE <= buf.len() {
809 if &buf[pos..pos + MAGIC.len()] == MAGIC {
810 if let Ok(preamble) = Preamble::read_from(&buf[pos..]) {
812 if preamble.total_length > 0 {
813 let Ok(total) = usize::try_from(preamble.total_length) else {
814 pos += 1;
815 continue;
816 };
817 if pos + total <= buf.len() {
818 let end_magic_offset = pos + total - 8;
820 if &buf[end_magic_offset..end_magic_offset + 8] == crate::wire::END_MAGIC {
821 messages.push((pos, total));
822 pos += total;
823 continue;
824 }
825 }
826 } else {
827 let mut end_pos = pos + PREAMBLE_SIZE;
830 let mut found = false;
831 while end_pos + 8 <= buf.len() {
832 if &buf[end_pos..end_pos + 8] == crate::wire::END_MAGIC {
833 let msg_len = end_pos + 8 - pos;
834 messages.push((pos, msg_len));
835 pos = end_pos + 8;
836 found = true;
837 break;
838 }
839 end_pos += 1;
840 }
841 if found {
842 continue;
843 }
844 }
845 }
846 }
847 pos += 1;
848 }
849
850 messages
851}
852
853pub fn scan_file(file: &mut (impl std::io::Read + std::io::Seek)) -> Result<Vec<(usize, usize)>> {
860 use std::io::SeekFrom;
861
862 let file_len_u64 = file.seek(SeekFrom::End(0))?;
863 let file_len = usize::try_from(file_len_u64).map_err(|_| {
864 TensogramError::Framing(format!("file size {file_len_u64} overflows usize"))
865 })?;
866 file.seek(SeekFrom::Start(0))?;
867
868 let mut messages = Vec::new();
869 let mut pos: usize = 0;
870
871 let mut preamble_buf = [0u8; PREAMBLE_SIZE];
872
873 while pos + PREAMBLE_SIZE + POSTAMBLE_SIZE <= file_len {
874 file.seek(SeekFrom::Start(pos as u64))?;
875 if file.read_exact(&mut preamble_buf).is_err() {
876 break;
877 }
878
879 if &preamble_buf[..MAGIC.len()] == MAGIC
880 && let Ok(preamble) = Preamble::read_from(&preamble_buf)
881 {
882 if preamble.total_length > 0 {
883 let Ok(total) = usize::try_from(preamble.total_length) else {
884 pos += 1;
885 continue;
886 };
887 if pos + total <= file_len {
888 let end_magic_offset = pos + total - 8;
890 file.seek(SeekFrom::Start(end_magic_offset as u64))?;
891 let mut end_buf = [0u8; 8];
892 if file.read_exact(&mut end_buf).is_ok() && &end_buf == crate::wire::END_MAGIC {
893 messages.push((pos, total));
894 pos += total;
895 continue;
896 }
897 }
898 } else {
899 let mut search_pos = pos + PREAMBLE_SIZE;
902 let mut found = false;
903 let chunk_size = 4096;
904 let mut chunk = vec![0u8; chunk_size];
905
906 while search_pos + 8 <= file_len {
907 file.seek(SeekFrom::Start(search_pos as u64))?;
908 let to_read = (file_len - search_pos).min(chunk_size);
909 let buf = &mut chunk[..to_read];
910 if file.read_exact(buf).is_err() {
911 break;
912 }
913
914 for i in 0..to_read.saturating_sub(7) {
916 if &buf[i..i + 8] == crate::wire::END_MAGIC {
917 let end_pos = search_pos + i;
918 let msg_len = end_pos + 8 - pos;
919 messages.push((pos, msg_len));
920 pos = end_pos + 8;
921 found = true;
922 break;
923 }
924 }
925 if found {
926 break;
927 }
928 search_pos += to_read.saturating_sub(7);
930 }
931 if found {
932 continue;
933 }
934 }
935 }
936 pos += 1;
937 }
938
939 Ok(messages)
940}
941
942fn frame_total_size(payload_len: usize) -> usize {
946 FRAME_HEADER_SIZE + payload_len + FRAME_END.len()
947}
948
949fn aligned_frame_total_size(payload_len: usize) -> usize {
951 let raw = frame_total_size(payload_len);
952 (raw + 7) & !7
953}
954
955#[cfg(test)]
956mod tests {
957 use super::*;
958 use crate::dtype::Dtype;
959 use crate::types::ByteOrder;
960 use std::collections::BTreeMap;
961
962 fn make_global_meta() -> GlobalMetadata {
963 GlobalMetadata {
964 version: 2,
965 ..Default::default()
966 }
967 }
968
969 fn make_descriptor(shape: Vec<u64>) -> DataObjectDescriptor {
970 let strides = {
971 let mut s = vec![1u64; shape.len()];
972 for i in (0..shape.len().saturating_sub(1)).rev() {
973 s[i] = s[i + 1] * shape[i + 1];
974 }
975 s
976 };
977 DataObjectDescriptor {
978 obj_type: "ntensor".to_string(),
979 ndim: shape.len() as u64,
980 shape,
981 strides,
982 dtype: Dtype::Float32,
983 byte_order: ByteOrder::Little,
984 encoding: "none".to_string(),
985 filter: "none".to_string(),
986 compression: "none".to_string(),
987 params: BTreeMap::new(),
988 hash: None,
989 }
990 }
991
992 #[test]
993 fn test_data_object_frame_round_trip_cbor_after() {
994 let desc = make_descriptor(vec![4]);
995 let payload = vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
996
997 let frame = encode_data_object_frame(&desc, &payload, false).unwrap();
998
999 let (decoded_desc, decoded_payload, consumed) = decode_data_object_frame(&frame).unwrap();
1000 assert_eq!(decoded_desc.shape, vec![4]);
1001 assert_eq!(decoded_desc.dtype, Dtype::Float32);
1002 assert_eq!(decoded_payload, &payload[..]);
1003 assert!(consumed >= frame.len());
1004 }
1005
1006 #[test]
1007 fn test_data_object_frame_round_trip_cbor_before() {
1008 let desc = make_descriptor(vec![2, 3]);
1009 let payload = vec![0xABu8; 24]; let frame = encode_data_object_frame(&desc, &payload, true).unwrap();
1012
1013 let (decoded_desc, decoded_payload, _) = decode_data_object_frame(&frame).unwrap();
1014 assert_eq!(decoded_desc.shape, vec![2, 3]);
1015 assert_eq!(decoded_payload, &payload[..]);
1016 }
1017
1018 #[test]
1019 fn test_empty_message_round_trip() {
1020 let meta = make_global_meta();
1021 let msg = encode_message(&meta, &[]).unwrap();
1022
1023 assert_eq!(&msg[0..8], MAGIC);
1025 assert_eq!(&msg[msg.len() - 8..], crate::wire::END_MAGIC);
1026
1027 let decoded = decode_message(&msg).unwrap();
1028 assert_eq!(decoded.global_metadata.version, 2);
1029 assert_eq!(decoded.objects.len(), 0);
1030 assert!(decoded.index.is_none()); }
1032
1033 #[test]
1034 fn test_single_object_message_round_trip() {
1035 let meta = make_global_meta();
1036 let desc = make_descriptor(vec![4]);
1037 let payload = vec![42u8; 16]; let objects = vec![EncodedObject {
1040 descriptor: desc,
1041 encoded_payload: payload.clone(),
1042 }];
1043
1044 let msg = encode_message(&meta, &objects).unwrap();
1045 let decoded = decode_message(&msg).unwrap();
1046
1047 assert_eq!(decoded.global_metadata.version, 2);
1048 assert_eq!(decoded.objects.len(), 1);
1049 assert_eq!(decoded.objects[0].0.shape, vec![4]);
1050 assert_eq!(decoded.objects[0].1, &payload[..]);
1051 assert!(decoded.index.is_some());
1052 assert_eq!(decoded.index.as_ref().unwrap().object_count, 1);
1053 }
1054
1055 #[test]
1056 fn test_multi_object_message_round_trip() {
1057 let meta = make_global_meta();
1058 let desc0 = make_descriptor(vec![4]);
1059 let desc1 = make_descriptor(vec![2, 3]);
1060 let payload0 = vec![10u8; 16];
1061 let payload1 = vec![20u8; 24];
1062
1063 let objects = vec![
1064 EncodedObject {
1065 descriptor: desc0,
1066 encoded_payload: payload0.clone(),
1067 },
1068 EncodedObject {
1069 descriptor: desc1,
1070 encoded_payload: payload1.clone(),
1071 },
1072 ];
1073
1074 let msg = encode_message(&meta, &objects).unwrap();
1075 let decoded = decode_message(&msg).unwrap();
1076
1077 assert_eq!(decoded.objects.len(), 2);
1078 assert_eq!(decoded.objects[0].0.shape, vec![4]);
1079 assert_eq!(decoded.objects[0].1, &payload0[..]);
1080 assert_eq!(decoded.objects[1].0.shape, vec![2, 3]);
1081 assert_eq!(decoded.objects[1].1, &payload1[..]);
1082
1083 let idx = decoded.index.as_ref().unwrap();
1084 assert_eq!(idx.object_count, 2);
1085 assert_eq!(idx.offsets.len(), 2);
1086 }
1087
1088 #[test]
1089 fn test_scan_multi_message() {
1090 let meta = make_global_meta();
1091 let msg1 = encode_message(
1092 &meta,
1093 &[EncodedObject {
1094 descriptor: make_descriptor(vec![4]),
1095 encoded_payload: vec![1u8; 16],
1096 }],
1097 )
1098 .unwrap();
1099 let msg2 = encode_message(
1100 &meta,
1101 &[EncodedObject {
1102 descriptor: make_descriptor(vec![2]),
1103 encoded_payload: vec![2u8; 8],
1104 }],
1105 )
1106 .unwrap();
1107
1108 let mut buf = msg1.clone();
1109 buf.extend_from_slice(&msg2);
1110
1111 let offsets = scan(&buf);
1112 assert_eq!(offsets.len(), 2);
1113 assert_eq!(offsets[0], (0, msg1.len()));
1114 assert_eq!(offsets[1], (msg1.len(), msg2.len()));
1115 }
1116
1117 #[test]
1118 fn test_scan_with_garbage() {
1119 let meta = make_global_meta();
1120 let msg = encode_message(
1121 &meta,
1122 &[EncodedObject {
1123 descriptor: make_descriptor(vec![4]),
1124 encoded_payload: vec![1u8; 16],
1125 }],
1126 )
1127 .unwrap();
1128
1129 let mut buf = vec![0xFF; 10];
1130 buf.extend_from_slice(&msg);
1131 buf.extend_from_slice(&[0xAA; 5]);
1132
1133 let offsets = scan(&buf);
1134 assert_eq!(offsets.len(), 1);
1135 assert_eq!(offsets[0], (10, msg.len()));
1136 }
1137
1138 #[test]
1139 fn test_decode_metadata_only() {
1140 let mut meta = make_global_meta();
1141 meta.extra.insert(
1142 "test_key".to_string(),
1143 ciborium::Value::Text("test_value".to_string()),
1144 );
1145
1146 let msg = encode_message(
1147 &meta,
1148 &[EncodedObject {
1149 descriptor: make_descriptor(vec![4]),
1150 encoded_payload: vec![0u8; 16],
1151 }],
1152 )
1153 .unwrap();
1154
1155 let decoded_meta = decode_metadata_only(&msg).unwrap();
1156 assert_eq!(decoded_meta.version, 2);
1157 assert!(decoded_meta.extra.contains_key("test_key"));
1159 }
1160
1161 fn build_raw_message(frames: &[(&[u8], FrameType)]) -> Vec<u8> {
1165 let meta = make_global_meta();
1166 let meta_cbor = crate::metadata::global_metadata_to_cbor(&meta).unwrap();
1167 let desc = make_descriptor(vec![4]);
1168 let payload = vec![0u8; 16];
1169
1170 let mut out = Vec::new();
1171 out.extend_from_slice(&[0u8; PREAMBLE_SIZE]);
1173
1174 for (content, frame_type) in frames {
1175 match frame_type {
1176 FrameType::DataObject => {
1177 let frame = encode_data_object_frame(&desc, &payload, false).unwrap();
1178 out.extend_from_slice(&frame);
1179 let pad = (8 - (out.len() % 8)) % 8;
1180 out.extend(std::iter::repeat_n(0u8, pad));
1181 }
1182 _ => {
1183 let data = if content.is_empty() {
1184 &meta_cbor
1185 } else {
1186 *content
1187 };
1188 write_frame(&mut out, *frame_type, 1, 0, data, true);
1189 }
1190 }
1191 }
1192
1193 let postamble_offset = out.len();
1195 let postamble = Postamble {
1196 first_footer_offset: postamble_offset as u64,
1197 };
1198 postamble.write_to(&mut out);
1199
1200 let total_length = out.len() as u64;
1201 let mut flags = MessageFlags::default();
1202 flags.set(MessageFlags::HEADER_METADATA);
1203 let preamble = Preamble {
1204 version: 2,
1205 flags,
1206 reserved: 0,
1207 total_length,
1208 };
1209 let mut preamble_bytes = Vec::new();
1210 preamble.write_to(&mut preamble_bytes);
1211 out[0..PREAMBLE_SIZE].copy_from_slice(&preamble_bytes);
1212
1213 out
1214 }
1215
1216 #[test]
1217 fn test_decode_rejects_header_after_data_object() {
1218 let msg = build_raw_message(&[
1220 (&[], FrameType::DataObject),
1221 (&[], FrameType::HeaderMetadata),
1222 ]);
1223 let result = decode_message(&msg);
1224 assert!(
1225 result.is_err(),
1226 "header frame after data object should be rejected"
1227 );
1228 let err = result.unwrap_err().to_string();
1229 assert!(
1230 err.contains("order") || err.contains("unexpected"),
1231 "error should mention ordering: {err}"
1232 );
1233 }
1234
1235 #[test]
1236 fn test_decode_rejects_data_object_after_footer() {
1237 let meta = make_global_meta();
1239 let meta_cbor = crate::metadata::global_metadata_to_cbor(&meta).unwrap();
1240 let hf = HashFrame {
1241 object_count: 0,
1242 hash_type: "xxh3".to_string(),
1243 hashes: vec![],
1244 };
1245 let hash_cbor = crate::metadata::hash_frame_to_cbor(&hf).unwrap();
1246
1247 let msg = build_raw_message(&[
1248 (&meta_cbor, FrameType::HeaderMetadata),
1249 (&hash_cbor, FrameType::FooterHash),
1250 (&[], FrameType::DataObject),
1251 ]);
1252 let result = decode_message(&msg);
1253 assert!(
1254 result.is_err(),
1255 "data object after footer should be rejected"
1256 );
1257 }
1258
1259 #[test]
1260 fn test_decode_accepts_valid_frame_order() {
1261 let msg = build_raw_message(&[
1263 (&[], FrameType::HeaderMetadata),
1264 (&[], FrameType::DataObject),
1265 ]);
1266 let result = decode_message(&msg);
1267 assert!(
1268 result.is_ok(),
1269 "valid frame order should be accepted: {:?}",
1270 result.err()
1271 );
1272 }
1273
/// `scan_file` over a seekable reader must report exactly the same
/// `(offset, length)` pairs as `scan` over the equivalent in-memory buffer.
#[test]
fn test_scan_file_matches_scan_buffer() {
    let meta = make_global_meta();
    let msg1 = encode_message(
        &meta,
        &[EncodedObject {
            descriptor: make_descriptor(vec![4]),
            encoded_payload: vec![1u8; 16],
        }],
    )
    .unwrap();
    let msg2 = encode_message(
        &meta,
        &[EncodedObject {
            descriptor: make_descriptor(vec![2]),
            encoded_payload: vec![2u8; 8],
        }],
    )
    .unwrap();

    // Concatenate without cloning `msg1`, keeping both messages available for
    // the expected-offset check below.
    let buf = [msg1.as_slice(), msg2.as_slice()].concat();

    let buffer_offsets = scan(&buf);

    let mut cursor = std::io::Cursor::new(&buf);
    let file_offsets = scan_file(&mut cursor).unwrap();

    assert_eq!(buffer_offsets, file_offsets);
    // Guard against a vacuous pass: two scanners that both find nothing would
    // still compare equal above.
    assert_eq!(file_offsets.len(), 2);
    assert_eq!(file_offsets[0], (0, msg1.len()));
}
1307
/// Junk bytes before and after a message must be skipped: `scan_file` should
/// find only the embedded message, at its true offset and length.
#[test]
fn test_scan_file_with_garbage() {
    let msg = encode_message(
        &make_global_meta(),
        &[EncodedObject {
            descriptor: make_descriptor(vec![4]),
            encoded_payload: vec![1u8; 16],
        }],
    )
    .unwrap();

    // 10 bytes of 0xFF before the message, 5 bytes of 0xAA after it.
    let leading_junk = [0xFF_u8; 10];
    let trailing_junk = [0xAA_u8; 5];
    let buf: Vec<u8> = [&leading_junk[..], &msg[..], &trailing_junk[..]].concat();

    let offsets = scan_file(&mut std::io::Cursor::new(&buf)).unwrap();
    assert_eq!(offsets.len(), 1);
    assert_eq!(offsets[0], (10, msg.len()));
}
1329
/// Scanning an empty input succeeds and yields no message offsets.
#[test]
fn test_scan_file_empty() {
    let empty: Vec<u8> = vec![];
    let offsets = scan_file(&mut std::io::Cursor::new(&empty)).unwrap();
    assert!(offsets.is_empty());
}
1337
/// A FooterMetadata frame placed after the data objects is a legal layout
/// and must decode successfully. The same metadata CBOR is reused for both
/// the header and footer frames.
#[test]
fn test_decode_accepts_footer_after_data_objects() {
    let meta_cbor = crate::metadata::global_metadata_to_cbor(&make_global_meta()).unwrap();

    let with_footer = build_raw_message(&[
        (&meta_cbor, FrameType::HeaderMetadata),
        (&[], FrameType::DataObject),
        (&meta_cbor, FrameType::FooterMetadata),
    ]);

    if let Err(decode_err) = decode_message(&with_footer) {
        panic!(
            "footer after data objects should be accepted: {:?}",
            Some(decode_err)
        );
    }
}
1356
1357 fn make_preceder_cbor(entries: std::collections::BTreeMap<String, ciborium::Value>) -> Vec<u8> {
1361 let meta = GlobalMetadata {
1362 version: 2,
1363 base: vec![entries],
1364 ..Default::default()
1365 };
1366 crate::metadata::global_metadata_to_cbor(&meta).unwrap()
1367 }
1368
/// A PrecederMetadata frame immediately before a DataObject is attached to
/// that object. Its payload is recorded, and its entries appear in the
/// decoded global metadata's `base[0]`.
#[test]
fn test_decode_preceder_before_data_object() {
    let mars = ciborium::Value::Map(vec![(
        ciborium::Value::Text("param".to_string()),
        ciborium::Value::Text("2t".to_string()),
    )]);
    let preceder_cbor = make_preceder_cbor(BTreeMap::from([("mars".to_string(), mars)]));

    let msg = build_raw_message(&[
        (&[], FrameType::HeaderMetadata),
        (&preceder_cbor, FrameType::PrecederMetadata),
        (&[], FrameType::DataObject),
    ]);
    let decoded = decode_message(&msg).unwrap();

    assert_eq!(decoded.objects.len(), 1);
    assert_eq!(decoded.preceder_payloads.len(), 1);
    assert!(decoded.preceder_payloads[0].is_some());

    // The preceder's keys must be visible through the per-object base entry.
    let base = &decoded.global_metadata.base;
    assert_eq!(base.len(), 1);
    assert!(base[0].contains_key("mars"));
}
1396
/// Two PrecederMetadata frames in a row are invalid. The error must name
/// both PrecederMetadata and DataObject, explaining that a preceder has to
/// be followed by a DataObject.
#[test]
fn test_decode_consecutive_preceders_rejected() {
    let preceder_cbor = make_preceder_cbor(BTreeMap::new());

    let doubled = build_raw_message(&[
        (&[], FrameType::HeaderMetadata),
        (&preceder_cbor, FrameType::PrecederMetadata),
        (&preceder_cbor, FrameType::PrecederMetadata),
        (&[], FrameType::DataObject),
    ]);

    let Err(decode_err) = decode_message(&doubled) else {
        panic!("consecutive preceders should be rejected");
    };
    let err = decode_err.to_string();
    assert!(
        err.contains("PrecederMetadata") && err.contains("DataObject"),
        "error should explain preceder must precede DataObject: {err}"
    );
}
1416
/// A PrecederMetadata frame with no DataObject after it (here it is the last
/// frame) is dangling and must be rejected with an error saying so.
#[test]
fn test_decode_dangling_preceder_rejected() {
    let preceder_cbor = make_preceder_cbor(BTreeMap::new());

    let trailing_preceder = build_raw_message(&[
        (&[], FrameType::HeaderMetadata),
        (&[], FrameType::DataObject),
        (&preceder_cbor, FrameType::PrecederMetadata),
    ]);

    let Err(decode_err) = decode_message(&trailing_preceder) else {
        panic!("dangling preceder should be rejected");
    };
    let err = decode_err.to_string();
    assert!(
        err.contains("dangling"),
        "error should mention dangling: {err}"
    );
}
1436
/// A preceder payload must carry exactly one `base` entry. One with two
/// entries is rejected, and the error says "exactly 1".
#[test]
fn test_decode_preceder_with_multiple_base_entries_rejected() {
    let two_entries = GlobalMetadata {
        version: 2,
        base: vec![BTreeMap::new(); 2],
        ..Default::default()
    };
    let bad_cbor = crate::metadata::global_metadata_to_cbor(&two_entries).unwrap();

    let msg = build_raw_message(&[
        (&[], FrameType::HeaderMetadata),
        (&bad_cbor, FrameType::PrecederMetadata),
        (&[], FrameType::DataObject),
    ]);

    let Err(decode_err) = decode_message(&msg) else {
        panic!("preceder with 2 payload entries should be rejected");
    };
    let err = decode_err.to_string();
    assert!(
        err.contains("exactly 1"),
        "error should mention 'exactly 1': {err}"
    );
}
1463
/// A preceder payload with an empty `base` list is rejected. The error must
/// state both the requirement ("exactly 1") and what was found ("got 0").
#[test]
fn test_decode_preceder_with_zero_base_entries_rejected() {
    let no_entries = GlobalMetadata {
        version: 2,
        base: Vec::new(),
        ..Default::default()
    };
    let bad_cbor = crate::metadata::global_metadata_to_cbor(&no_entries).unwrap();

    let msg = build_raw_message(&[
        (&[], FrameType::HeaderMetadata),
        (&bad_cbor, FrameType::PrecederMetadata),
        (&[], FrameType::DataObject),
    ]);

    let Err(decode_err) = decode_message(&msg) else {
        panic!("preceder with 0 payload entries should be rejected");
    };
    let err = decode_err.to_string();
    assert!(
        err.contains("exactly 1") && err.contains("got 0"),
        "error should mention 'exactly 1' and 'got 0': {err}"
    );
}
1490
/// A PrecederMetadata frame followed directly by a FooterMetadata frame, with
/// no DataObject in between, must be rejected.
#[test]
fn test_decode_preceder_followed_by_footer_rejected() {
    let preceder_cbor = make_preceder_cbor(BTreeMap::new());
    let meta_cbor = crate::metadata::global_metadata_to_cbor(&make_global_meta()).unwrap();

    let msg = build_raw_message(&[
        (&meta_cbor, FrameType::HeaderMetadata),
        (&preceder_cbor, FrameType::PrecederMetadata),
        (&meta_cbor, FrameType::FooterMetadata),
    ]);

    let rejected = decode_message(&msg).is_err();
    assert!(rejected, "preceder followed by footer should be rejected");
}
1509
/// Only the object right after a preceder receives its metadata. A second
/// object with no preceder gets a `None` preceder payload, and its base entry
/// lacks the preceder's key.
#[test]
fn test_decode_mixed_preceder_and_no_preceder() {
    let preceder_cbor = make_preceder_cbor(BTreeMap::from([(
        "note".to_string(),
        ciborium::Value::Text("from preceder".to_string()),
    )]));

    let msg = build_raw_message(&[
        (&[], FrameType::HeaderMetadata),
        (&preceder_cbor, FrameType::PrecederMetadata),
        (&[], FrameType::DataObject),
        (&[], FrameType::DataObject),
    ]);
    let decoded = decode_message(&msg).unwrap();

    assert_eq!(decoded.objects.len(), 2);
    assert_eq!(decoded.preceder_payloads.len(), 2);
    assert!(decoded.preceder_payloads[0].is_some());
    assert!(decoded.preceder_payloads[1].is_none());

    // The "note" key must not leak from object 0's preceder into object 1.
    let base = &decoded.global_metadata.base;
    assert!(base[0].contains_key("note"));
    assert!(!base[1].contains_key("note"));
}
1537
/// When a preceder and the footer metadata both supply a `source` key for the
/// same object, the decoded `base[0]` keeps the preceder's value.
#[test]
fn test_decode_preceder_wins_over_footer_payload() {
    let text = |s: &str| ciborium::Value::Text(s.to_string());

    let preceder_cbor =
        make_preceder_cbor(BTreeMap::from([("source".to_string(), text("preceder"))]));

    let footer_meta = GlobalMetadata {
        version: 2,
        base: vec![BTreeMap::from([("source".to_string(), text("footer"))])],
        ..Default::default()
    };
    let footer_cbor = crate::metadata::global_metadata_to_cbor(&footer_meta).unwrap();

    let msg = build_raw_message(&[
        (&[], FrameType::HeaderMetadata),
        (&preceder_cbor, FrameType::PrecederMetadata),
        (&[], FrameType::DataObject),
        (&footer_cbor, FrameType::FooterMetadata),
    ]);
    let decoded = decode_message(&msg).unwrap();

    // Non-text values (or a missing key) map to `None`, which fails the check.
    let source = match decoded.global_metadata.base[0].get("source") {
        Some(ciborium::Value::Text(s)) => Some(s.as_str()),
        _ => None,
    };
    assert_eq!(source, Some("preceder"), "preceder should win over footer");
}
1580
/// A header whose `base` list has more entries (3) than the message has data
/// objects (1) is inconsistent and must be rejected. The error is expected to
/// mention both counts.
#[test]
fn test_decode_rejects_base_count_exceeding_objects() {
    // The oversized metadata is carried in the *header* frame.
    let header_meta = GlobalMetadata {
        version: 2,
        base: vec![BTreeMap::new(); 3],
        ..Default::default()
    };
    let header_cbor = crate::metadata::global_metadata_to_cbor(&header_meta).unwrap();

    let msg = build_raw_message(&[
        (&header_cbor, FrameType::HeaderMetadata),
        (&[], FrameType::DataObject),
    ]);

    let Err(decode_err) = decode_message(&msg) else {
        panic!("base with more entries than objects should be rejected");
    };
    let err = decode_err.to_string();
    // NOTE(review): `contains("1")` is a weak check; tighten it once the exact
    // error wording is pinned down.
    assert!(
        err.contains("3") && err.contains("1"),
        "error should mention counts: {err}"
    );
}
1607}