1extern crate alloc;
26use alloc::collections::{BTreeMap, BTreeSet};
27use alloc::vec;
28use alloc::vec::Vec;
29
30use crate::submessages::{DataFragSubmessage, FragmentNumberSet};
31use crate::wire_types::{FragmentNumber, SequenceNumber};
32
/// Default for [`AssemblerCaps::max_pending_sns`]: how many sequence numbers
/// may have a reassembly buffer open at the same time.
pub const DEFAULT_MAX_PENDING_SNS: usize = 64;
/// Default for [`AssemblerCaps::max_sample_bytes`]: the largest announced
/// `sample_size`, in bytes, that is accepted (1 MiB).
pub const DEFAULT_MAX_SAMPLE_BYTES: usize = 1 << 20;
/// Default for [`AssemblerCaps::max_fragment_size`]: the largest announced
/// `fragment_size` that is accepted. `u16::MAX` admits every non-zero size.
pub const DEFAULT_MAX_FRAGMENT_SIZE: u16 = u16::MAX;
41
42#[derive(Debug, Clone, PartialEq, Eq)]
44pub struct CompletedSample {
45 pub sequence_number: SequenceNumber,
47 pub payload: Vec<u8>,
49}
50
/// Resource limits enforced by `FragmentAssembler::insert`.
#[derive(Clone, Copy, Debug)]
pub struct AssemblerCaps {
    /// Maximum number of sequence numbers with an open reassembly buffer.
    ///
    /// When a fragment for a new sequence number arrives at the limit, the
    /// buffer with the lowest sequence number is evicted. A value of `0`
    /// makes the assembler reject every fragment.
    pub max_pending_sns: usize,
    /// Largest `sample_size` (in bytes) a fragment may announce.
    pub max_sample_bytes: usize,
    /// Largest `fragment_size` a fragment may announce.
    pub max_fragment_size: u16,
}
61
62impl Default for AssemblerCaps {
63 fn default() -> Self {
66 Self {
67 max_pending_sns: DEFAULT_MAX_PENDING_SNS,
68 max_sample_bytes: DEFAULT_MAX_SAMPLE_BYTES,
69 max_fragment_size: DEFAULT_MAX_FRAGMENT_SIZE,
70 }
71 }
72}
73
74#[derive(Debug, Clone)]
76struct FragmentBuffer {
77 sample_size: u32,
78 fragment_size: u16,
79 total_fragments: u32,
80 received: BTreeSet<FragmentNumber>,
81 data: Vec<u8>,
82}
83
84impl FragmentBuffer {
85 fn new(sample_size: u32, fragment_size: u16) -> Self {
86 let total = if fragment_size == 0 {
87 0
88 } else {
89 sample_size.div_ceil(u32::from(fragment_size))
90 };
91 Self {
92 sample_size,
93 fragment_size,
94 total_fragments: total,
95 received: BTreeSet::new(),
96 data: vec![0u8; sample_size as usize],
97 }
98 }
99
100 fn is_complete(&self) -> bool {
101 self.total_fragments > 0 && self.received.len() as u32 == self.total_fragments
102 }
103
104 fn missing(&self) -> FragmentNumberSet {
105 if self.total_fragments == 0 {
106 return FragmentNumberSet::from_missing(FragmentNumber(1), &[]);
107 }
108 let mut missing_nums = Vec::new();
109 for f in 1..=self.total_fragments {
110 let fnum = FragmentNumber(f);
111 if !self.received.contains(&fnum) {
112 missing_nums.push(fnum);
113 }
114 }
115 let base = missing_nums
116 .first()
117 .copied()
118 .unwrap_or(FragmentNumber(self.total_fragments.saturating_add(1)));
119 FragmentNumberSet::from_missing(base, &missing_nums)
120 }
121}
122
/// Why `FragmentAssembler::insert` discarded a fragment or an in-progress buffer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum DropReason {
    /// The announced `sample_size` exceeds `AssemblerCaps::max_sample_bytes`.
    SampleTooLarge,
    /// The announced `fragment_size` is zero or exceeds
    /// `AssemblerCaps::max_fragment_size`.
    FragmentSizeInvalid,
    /// `fragment_starting_num` is zero; fragment numbers are 1-based.
    FragmentIndexZero,
    /// The last fragment covered by the submessage lies beyond the number of
    /// fragments implied by `sample_size` and `fragment_size`.
    FragmentIndexOutOfRange,
    /// The payload length does not match the byte range the fragments cover.
    PayloadSizeMismatch,
    /// `sample_size` or `fragment_size` differs from what an earlier fragment
    /// of the same sequence number announced.
    InconsistentWithBuffered,
    /// `fragments_in_submessage` is zero.
    FragmentsInSubmessageInvalid,
    /// The oldest pending buffer was evicted to make room for a new sequence
    /// number because `AssemblerCaps::max_pending_sns` was reached.
    PendingSnsCapExceeded,
    /// The pending-buffer limit was reached with nothing to evict (the
    /// `max_pending_sns == 0` configuration).
    AssemblerDisabled,
}
153
154impl DropReason {
155 #[must_use]
158 pub const fn as_str(self) -> &'static str {
159 match self {
160 Self::SampleTooLarge => "sample_too_large",
161 Self::FragmentSizeInvalid => "fragment_size_invalid",
162 Self::FragmentIndexZero => "fragment_index_zero",
163 Self::FragmentIndexOutOfRange => "fragment_index_out_of_range",
164 Self::PayloadSizeMismatch => "payload_size_mismatch",
165 Self::InconsistentWithBuffered => "inconsistent_with_buffered",
166 Self::FragmentsInSubmessageInvalid => "fragments_in_submessage_invalid",
167 Self::PendingSnsCapExceeded => "pending_sns_cap_exceeded",
168 Self::AssemblerDisabled => "assembler_disabled",
169 }
170 }
171}
172
173impl core::fmt::Display for DropReason {
174 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
175 f.write_str(self.as_str())
176 }
177}
178
179#[derive(Debug, Clone, Default)]
184pub struct FragmentAssembler {
185 buffers: BTreeMap<SequenceNumber, FragmentBuffer>,
186 caps: AssemblerCaps,
187 drop_count: u64,
188 last_drop_reason: Option<DropReason>,
189}
190
191impl FragmentAssembler {
192 #[must_use]
194 pub fn new(caps: AssemblerCaps) -> Self {
195 Self {
196 buffers: BTreeMap::new(),
197 caps,
198 drop_count: 0,
199 last_drop_reason: None,
200 }
201 }
202
203 #[must_use]
205 pub fn len(&self) -> usize {
206 self.buffers.len()
207 }
208
209 #[must_use]
211 pub fn is_empty(&self) -> bool {
212 self.buffers.is_empty()
213 }
214
215 #[must_use]
218 pub fn drop_count(&self) -> u64 {
219 self.drop_count
220 }
221
222 #[must_use]
226 pub fn last_drop_reason(&self) -> Option<DropReason> {
227 self.last_drop_reason
228 }
229
230 pub fn reset_diagnostics(&mut self) {
234 self.drop_count = 0;
235 self.last_drop_reason = None;
236 }
237
238 #[must_use]
240 pub fn has_gaps(&self) -> bool {
241 self.buffers.values().any(|b| !b.is_complete())
242 }
243
244 pub fn incomplete_sns(&self) -> impl Iterator<Item = SequenceNumber> + '_ {
246 self.buffers
247 .iter()
248 .filter(|(_, b)| !b.is_complete())
249 .map(|(sn, _)| *sn)
250 }
251
252 #[must_use]
255 pub fn missing_fragments(&self, sn: SequenceNumber) -> FragmentNumberSet {
256 match self.buffers.get(&sn) {
257 Some(b) => b.missing(),
258 None => FragmentNumberSet::from_missing(FragmentNumber(1), &[]),
259 }
260 }
261
262 pub fn discard(&mut self, sn: SequenceNumber) -> bool {
265 self.buffers.remove(&sn).is_some()
266 }
267
268 pub fn insert(&mut self, df: &DataFragSubmessage) -> Option<CompletedSample> {
275 if df.fragment_size == 0 || df.fragment_size > self.caps.max_fragment_size {
277 self.record_drop(DropReason::FragmentSizeInvalid);
278 return None;
279 }
280 if df.fragments_in_submessage == 0 {
281 self.record_drop(DropReason::FragmentsInSubmessageInvalid);
282 return None;
283 }
284 if df.sample_size as usize > self.caps.max_sample_bytes {
285 self.record_drop(DropReason::SampleTooLarge);
286 return None;
287 }
288 if df.fragment_starting_num.0 == 0 {
289 self.record_drop(DropReason::FragmentIndexZero);
290 return None;
291 }
292
293 let total_fragments = df.sample_size.div_ceil(u32::from(df.fragment_size));
295 let last_frag = df
296 .fragment_starting_num
297 .0
298 .checked_add(u32::from(df.fragments_in_submessage) - 1)
299 .unwrap_or(u32::MAX);
300 if last_frag > total_fragments {
301 self.record_drop(DropReason::FragmentIndexOutOfRange);
302 return None;
303 }
304
305 if !self.buffers.contains_key(&df.writer_sn)
307 && self.buffers.len() >= self.caps.max_pending_sns
308 {
309 let Some(&oldest) = self.buffers.keys().next() else {
312 self.record_drop(DropReason::AssemblerDisabled);
314 return None;
315 };
316 self.buffers.remove(&oldest);
317 self.record_drop(DropReason::PendingSnsCapExceeded);
318 }
319
320 let buffer = match self.buffers.get_mut(&df.writer_sn) {
322 Some(existing) => {
323 if existing.sample_size != df.sample_size
324 || existing.fragment_size != df.fragment_size
325 {
326 self.record_drop(DropReason::InconsistentWithBuffered);
327 return None;
328 }
329 existing
330 }
331 None => {
332 self.buffers.insert(
333 df.writer_sn,
334 FragmentBuffer::new(df.sample_size, df.fragment_size),
335 );
336 self.buffers.get_mut(&df.writer_sn)?
337 }
338 };
339
340 let frag_size_usize = buffer.fragment_size as usize;
342 let frag_count = df.fragments_in_submessage as usize;
343 let first_idx = (df.fragment_starting_num.0 - 1) as usize;
344 let byte_start = first_idx * frag_size_usize;
345 let expected_last_frag = core::cmp::min(last_frag, buffer.total_fragments);
346 let full_portion = (frag_count - 1) * frag_size_usize;
349 let tail_size = if expected_last_frag == buffer.total_fragments {
350 buffer.sample_size as usize - ((buffer.total_fragments - 1) as usize) * frag_size_usize
352 } else {
353 frag_size_usize
354 };
355 let expected_len = full_portion + tail_size;
356 if df.serialized_payload.len() != expected_len {
357 self.record_drop(DropReason::PayloadSizeMismatch);
358 return None;
359 }
360
361 let data_end = byte_start + df.serialized_payload.len();
363 if data_end > buffer.data.len() {
364 self.record_drop(DropReason::PayloadSizeMismatch);
365 return None;
366 }
367 buffer.data[byte_start..data_end].copy_from_slice(&df.serialized_payload);
368 for f in 0..df.fragments_in_submessage as u32 {
369 buffer
370 .received
371 .insert(FragmentNumber(df.fragment_starting_num.0 + f));
372 }
373
374 if buffer.is_complete() {
375 let buf = self.buffers.remove(&df.writer_sn)?;
377 return Some(CompletedSample {
378 sequence_number: df.writer_sn,
379 payload: buf.data,
380 });
381 }
382 None
383 }
384
385 fn record_drop(&mut self, reason: DropReason) {
386 self.drop_count = self.drop_count.saturating_add(1);
387 self.last_drop_reason = Some(reason);
388 }
389}
390
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::wire_types::EntityId;

    /// Writer entity id shared by all fixtures.
    fn wid() -> EntityId {
        EntityId::user_writer_with_key([0x10, 0x20, 0x30])
    }

    /// Reader entity id shared by all fixtures.
    fn rid() -> EntityId {
        EntityId::user_reader_with_key([0x40, 0x50, 0x60])
    }

    /// Builds a `DataFragSubmessage` for sequence number `sn` carrying `count`
    /// fragments starting at fragment number `starting`.
    fn df(
        sn: i64,
        starting: u32,
        count: u16,
        frag_size: u16,
        sample_size: u32,
        payload: Vec<u8>,
    ) -> DataFragSubmessage {
        DataFragSubmessage {
            extra_flags: 0,
            reader_id: rid(),
            writer_id: wid(),
            writer_sn: SequenceNumber(sn),
            fragment_starting_num: FragmentNumber(starting),
            fragments_in_submessage: count,
            fragment_size: frag_size,
            sample_size,
            serialized_payload: alloc::sync::Arc::from(payload),
            inline_qos_flag: false,
            hash_key_flag: false,
            key_flag: false,
            non_standard_flag: false,
        }
    }

    #[test]
    fn single_fragment_sample_completes_immediately() {
        let mut asm = FragmentAssembler::default();
        let sample = asm
            .insert(&df(1, 1, 1, 4, 4, vec![1, 2, 3, 4]))
            .expect("a one-fragment sample completes on first insert");
        assert_eq!(sample.sequence_number, SequenceNumber(1));
        assert_eq!(sample.payload, vec![1, 2, 3, 4]);
        assert_eq!(asm.len(), 0);
    }

    #[test]
    fn two_fragments_complete_in_order() {
        let mut asm = FragmentAssembler::default();
        let first = asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert!(first.is_none());
        let sample = asm.insert(&df(1, 2, 1, 4, 8, vec![5, 6, 7, 8])).unwrap();
        assert_eq!(sample.payload, vec![1, 2, 3, 4, 5, 6, 7, 8]);
    }

    #[test]
    fn fragments_complete_out_of_order() {
        let mut asm = FragmentAssembler::default();
        // Middle fragment first, then the head, then the short tail.
        assert!(asm.insert(&df(1, 2, 1, 4, 10, vec![5, 6, 7, 8])).is_none());
        assert!(asm.insert(&df(1, 1, 1, 4, 10, vec![1, 2, 3, 4])).is_none());
        let sample = asm.insert(&df(1, 3, 1, 4, 10, vec![9, 10])).unwrap();
        assert_eq!(sample.payload, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }

    #[test]
    fn last_fragment_shorter_than_fragment_size() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.insert(&df(1, 1, 1, 4, 10, vec![1, 2, 3, 4])).is_none());
        assert!(asm.insert(&df(1, 2, 1, 4, 10, vec![5, 6, 7, 8])).is_none());
        let sample = asm.insert(&df(1, 3, 1, 4, 10, vec![9, 10])).unwrap();
        assert_eq!(sample.payload.len(), 10);
    }

    #[test]
    fn duplicate_fragment_is_idempotent() {
        let mut asm = FragmentAssembler::default();
        for _ in 0..2 {
            assert!(asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4])).is_none());
        }
        assert_eq!(asm.missing_fragments(SequenceNumber(1)).num_bits, 1);
    }

    #[test]
    fn missing_fragments_enumerates_gaps() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.insert(&df(1, 1, 1, 4, 10, vec![1, 2, 3, 4])).is_none());
        assert!(asm.insert(&df(1, 3, 1, 4, 10, vec![9, 10])).is_none());
        let gaps: Vec<_> = asm
            .missing_fragments(SequenceNumber(1))
            .iter_set()
            .collect();
        assert_eq!(gaps, vec![FragmentNumber(2)]);
    }

    #[test]
    fn inconsistent_sample_size_drops_fragment() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4])).is_none());
        // Same sequence number, but a different announced sample size.
        let outcome = asm.insert(&df(1, 2, 1, 4, 12, vec![5, 6, 7, 8]));
        assert!(outcome.is_none());
        assert_eq!(asm.drop_count(), 1);
        assert_eq!(asm.missing_fragments(SequenceNumber(1)).num_bits, 1);
    }

    #[test]
    fn sample_too_large_drops_without_alloc() {
        let mut asm = FragmentAssembler::new(AssemblerCaps {
            max_sample_bytes: 16,
            ..AssemblerCaps::default()
        });
        assert!(asm.insert(&df(1, 1, 1, 4, 100, vec![1, 2, 3, 4])).is_none());
        assert!(asm.is_empty());
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn fragment_size_zero_dropped() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.insert(&df(1, 1, 1, 0, 4, vec![1, 2, 3, 4])).is_none());
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn fragment_index_zero_dropped() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.insert(&df(1, 0, 1, 4, 4, vec![1, 2, 3, 4])).is_none());
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn fragment_index_out_of_range_dropped() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.insert(&df(1, 2, 1, 4, 4, vec![0])).is_none());
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn payload_size_mismatch_dropped() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2])).is_none());
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn max_pending_sns_evicts_oldest() {
        let mut asm = FragmentAssembler::new(AssemblerCaps {
            max_pending_sns: 2,
            ..AssemblerCaps::default()
        });
        asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        asm.insert(&df(2, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert_eq!(asm.len(), 2);
        // A third sequence number pushes out the lowest one.
        asm.insert(&df(3, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert_eq!(asm.len(), 2);
        assert!(asm.buffers.contains_key(&SequenceNumber(2)));
        assert!(asm.buffers.contains_key(&SequenceNumber(3)));
        assert_eq!(asm.drop_count(), 1);
    }

    #[test]
    fn has_gaps_flips_to_false_after_completion() {
        let mut asm = FragmentAssembler::default();
        asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert!(asm.has_gaps());
        asm.insert(&df(1, 2, 1, 4, 8, vec![5, 6, 7, 8]));
        assert!(!asm.has_gaps());
    }

    #[test]
    fn incomplete_sns_enumerates_in_order() {
        let mut asm = FragmentAssembler::default();
        asm.insert(&df(5, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        asm.insert(&df(2, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        let pending: Vec<_> = asm.incomplete_sns().collect();
        assert_eq!(pending, vec![SequenceNumber(2), SequenceNumber(5)]);
    }

    #[test]
    fn discard_removes_buffer() {
        let mut asm = FragmentAssembler::default();
        asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert!(asm.discard(SequenceNumber(1)));
        assert!(asm.is_empty());
        // A second discard finds nothing left to remove.
        assert!(!asm.discard(SequenceNumber(1)));
    }

    #[test]
    fn missing_for_unknown_sn_is_empty() {
        let asm = FragmentAssembler::default();
        assert_eq!(asm.missing_fragments(SequenceNumber(42)).num_bits, 0);
    }

    #[test]
    fn bundled_fragments_all_full() {
        let mut asm = FragmentAssembler::default();
        // Three full 4-byte fragments of an 18-byte (five-fragment) sample.
        let bundle = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
        let outcome = asm.insert(&df(1, 1, 3, 4, 18, bundle));
        assert!(outcome.is_none(), "not yet complete");
        let gaps: Vec<_> = asm
            .missing_fragments(SequenceNumber(1))
            .iter_set()
            .collect();
        assert_eq!(gaps, vec![FragmentNumber(4), FragmentNumber(5)]);
    }

    #[test]
    fn bundled_fragments_including_last_with_tail() {
        let mut asm = FragmentAssembler::default();
        let head = asm.insert(&df(1, 1, 1, 4, 10, vec![0xA, 0xB, 0xC, 0xD]));
        assert!(head.is_none());
        // Fragments 2 and 3 together: one full fragment plus the 2-byte tail.
        let sample = asm
            .insert(&df(1, 2, 2, 4, 10, vec![5, 6, 7, 8, 9, 10]))
            .expect("bundle supplies the remaining fragments");
        assert_eq!(sample.payload, vec![0xA, 0xB, 0xC, 0xD, 5, 6, 7, 8, 9, 10]);
    }

    #[test]
    fn bundled_fragments_payload_size_mismatch_rejected() {
        let mut asm = FragmentAssembler::default();
        // Three full fragments would need 12 bytes; only 10 are supplied.
        let short_bundle = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        assert!(asm.insert(&df(1, 1, 3, 4, 20, short_bundle)).is_none());
        assert_eq!(asm.drop_count(), 1);
        assert_eq!(
            asm.last_drop_reason(),
            Some(DropReason::PayloadSizeMismatch)
        );
    }

    #[test]
    fn last_drop_reason_tracks_most_recent() {
        let mut asm = FragmentAssembler::default();
        assert_eq!(asm.last_drop_reason(), None);
        asm.insert(&df(1, 0, 1, 4, 4, vec![1, 2, 3, 4]));
        assert_eq!(asm.last_drop_reason(), Some(DropReason::FragmentIndexZero));
        asm.insert(&df(1, 1, 1, 0, 4, vec![1, 2, 3, 4]));
        assert_eq!(
            asm.last_drop_reason(),
            Some(DropReason::FragmentSizeInvalid)
        );
    }

    #[test]
    fn pending_sns_cap_exceeded_uses_dedicated_reason() {
        let mut asm = FragmentAssembler::new(AssemblerCaps {
            max_pending_sns: 1,
            ..AssemblerCaps::default()
        });
        asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        asm.insert(&df(2, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert_eq!(
            asm.last_drop_reason(),
            Some(DropReason::PendingSnsCapExceeded)
        );
    }

    #[test]
    fn default_assembler_uses_default_caps() {
        let mut asm = FragmentAssembler::default();
        assert!(asm.is_empty());
        let outcome = asm.insert(&df(1, 1, 1, 4, 4, vec![1, 2, 3, 4]));
        assert!(outcome.is_some());
    }

    #[test]
    fn reset_diagnostics_clears_counters_but_keeps_buffers() {
        let mut asm = FragmentAssembler::default();
        // One rejected fragment, then one accepted but incomplete sample.
        asm.insert(&df(1, 0, 1, 4, 4, vec![1, 2, 3, 4]));
        asm.insert(&df(2, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert_eq!(asm.drop_count(), 1);
        assert_eq!(asm.len(), 1);
        asm.reset_diagnostics();
        assert_eq!(asm.drop_count(), 0);
        assert!(asm.last_drop_reason().is_none());
        assert_eq!(asm.len(), 1, "buffers must stay intact");
    }

    #[test]
    fn max_pending_sns_zero_rejects_with_assembler_disabled() {
        let mut asm = FragmentAssembler::new(AssemblerCaps {
            max_pending_sns: 0,
            ..AssemblerCaps::default()
        });
        asm.insert(&df(1, 1, 1, 4, 8, vec![1, 2, 3, 4]));
        assert_eq!(asm.last_drop_reason(), Some(DropReason::AssemblerDisabled));
        assert!(asm.is_empty());
    }
}