pim_protocol/reassembler.rs
1//! Reassembly of fragmented mesh data payloads.
2//!
3//! [`Reassembler`] collects [`FragmentFrame`]s by their `fragment_id`,
4//! reassembles them in order once all bytes have arrived, and discards
5//! incomplete buffers after a configurable timeout.
6
7use std::collections::{BTreeMap, HashMap};
8use std::time::{Duration, Instant};
9
10use crate::fragment_frame::FragmentFrame;
11
/// Timeout applied by [`Reassembler::new`]: an incomplete reassembly buffer
/// older than this (ten seconds) is dropped by [`Reassembler::expire_stale`].
pub const DEFAULT_REASSEMBLY_TIMEOUT: Duration = Duration::from_secs(10);
14
15// ── Internal state ─────────────────────────────────────────────────────────
16
17struct ReassemblyBuffer {
18 /// Total byte length of the original packet.
19 total_length: u16,
20 /// Received chunks keyed by their starting byte offset within the original
21 /// packet. Stored in a `BTreeMap` so we can iterate offsets in order.
22 chunks: BTreeMap<u16, bytes::Bytes>,
23 /// Monotonic timestamp of the first fragment received.
24 created_at: Instant,
25}
26
27impl ReassemblyBuffer {
28 fn new(total_length: u16) -> Self {
29 Self {
30 total_length,
31 chunks: BTreeMap::new(),
32 created_at: Instant::now(),
33 }
34 }
35
36 /// Insert a chunk. Returns `true` if this chunk was new (not a duplicate).
37 fn insert(&mut self, offset: u16, payload: bytes::Bytes) -> bool {
38 use std::collections::btree_map::Entry;
39 match self.chunks.entry(offset) {
40 Entry::Vacant(e) => {
41 e.insert(payload);
42 true
43 }
44 Entry::Occupied(_) => false,
45 }
46 }
47
48 /// Returns `Some(packet)` when all bytes `[0, total_length)` have been
49 /// received, otherwise `None`.
50 fn try_reassemble(&self) -> Option<Vec<u8>> {
51 // PERFORMANCE: Pre-validate that all chunks are present before
52 // allocating the target vector. If we allocate unconditionally,
53 // every fragment arrival costs a redundant `Vec` allocation
54 // equal to `total_length` that gets immediately discarded.
55 let mut covered_up_to = 0usize;
56 let total = self.total_length as usize;
57
58 for (&offset, chunk) in &self.chunks {
59 let start = offset as usize;
60 if start > covered_up_to {
61 // There is a gap — not complete yet.
62 return None;
63 }
64 let end = start + chunk.len();
65 if end > total {
66 // Chunk extends past declared total — malformed, bail out.
67 return None;
68 }
69 if end > covered_up_to {
70 covered_up_to = end;
71 }
72 }
73
74 if covered_up_to != total {
75 return None;
76 }
77
78 // All bytes present. Perform the final allocation and copy.
79 let mut buf = vec![0u8; total];
80 for (&offset, chunk) in &self.chunks {
81 let start = offset as usize;
82 let end = start + chunk.len();
83 buf[start..end].copy_from_slice(chunk);
84 }
85
86 Some(buf)
87 }
88
89 fn is_expired(&self, timeout: Duration) -> bool {
90 self.created_at.elapsed() >= timeout
91 }
92}
93
94// ── Public API ────────────────────────────────────────────────────────────────
95
96/// Reassembles fragmented mesh payloads.
97///
98/// Call [`Reassembler::insert`] for each received [`FragmentFrame`]. When the
99/// last required fragment arrives the complete packet is returned. Call
100/// [`Reassembler::expire_stale`] periodically (e.g. once per second) to free
101/// memory for timed-out incomplete buffers.
102pub struct Reassembler {
103 timeout: Duration,
104 buffers: HashMap<u32, ReassemblyBuffer>,
105}
106
107impl Reassembler {
108 /// Create a reassembler with the default 10-second timeout.
109 pub fn new() -> Self {
110 Self {
111 timeout: DEFAULT_REASSEMBLY_TIMEOUT,
112 buffers: HashMap::new(),
113 }
114 }
115
116 /// Create a reassembler with a custom timeout (useful in tests).
117 pub fn with_timeout(timeout: Duration) -> Self {
118 Self {
119 timeout,
120 buffers: HashMap::new(),
121 }
122 }
123
124 /// Feed a [`FragmentFrame`] to the reassembler.
125 ///
126 /// Returns `Some(packet)` when the complete packet is ready, otherwise
127 /// `None`. Duplicate fragments are silently ignored.
128 pub fn insert(&mut self, frame: FragmentFrame) -> Option<Vec<u8>> {
129 let buf = self
130 .buffers
131 .entry(frame.fragment_id)
132 .or_insert_with(|| ReassemblyBuffer::new(frame.total_length));
133
134 // If total_length mismatches a previously-seen fragment, ignore.
135 if buf.total_length != frame.total_length {
136 return None;
137 }
138
139 buf.insert(frame.fragment_offset, frame.payload);
140
141 if let Some(packet) = buf.try_reassemble() {
142 self.buffers.remove(&frame.fragment_id);
143 return Some(packet);
144 }
145
146 None
147 }
148
149 /// Discard all reassembly buffers that have been waiting longer than the
150 /// configured timeout.
151 pub fn expire_stale(&mut self) {
152 let timeout = self.timeout;
153 self.buffers.retain(|_, buf| !buf.is_expired(timeout));
154 }
155
156 /// Number of in-progress reassembly buffers currently held.
157 pub fn buffer_count(&self) -> usize {
158 self.buffers.len()
159 }
160}
161
162impl Default for Reassembler {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
168// ── Tests ─────────────────────────────────────────────────────────────────────
169
170#[cfg(test)]
171mod tests;