pim_protocol/reassembler.rs
1//! Reassembly of fragmented mesh data payloads.
2//!
3//! [`Reassembler`] collects [`FragmentFrame`]s by their `fragment_id`,
4//! reassembles them in order once all bytes have arrived, and discards
5//! incomplete buffers after a configurable timeout.
6
7use std::collections::{BTreeMap, HashMap};
8use std::time::{Duration, Instant};
9
10use crate::fragment_frame::FragmentFrame;
11
12/// Default timeout before an incomplete reassembly buffer is discarded.
13pub const DEFAULT_REASSEMBLY_TIMEOUT: Duration = Duration::from_secs(10);
14
15// ── Internal state ─────────────────────────────────────────────────────────
16
17struct ReassemblyBuffer {
18 /// Total byte length of the original packet.
19 total_length: u16,
20 /// Received chunks keyed by their starting byte offset within the original
21 /// packet. Stored in a `BTreeMap` so we can iterate offsets in order.
22 chunks: BTreeMap<u16, bytes::Bytes>,
23 /// Monotonic timestamp of the first fragment received.
24 created_at: Instant,
25}
26
27impl ReassemblyBuffer {
28 fn new(total_length: u16) -> Self {
29 Self {
30 total_length,
31 chunks: BTreeMap::new(),
32 created_at: Instant::now(),
33 }
34 }
35
36 /// Insert a chunk. Returns `true` if this chunk was new (not a duplicate).
37 fn insert(&mut self, offset: u16, payload: bytes::Bytes) -> bool {
38 use std::collections::btree_map::Entry;
39 match self.chunks.entry(offset) {
40 Entry::Vacant(e) => {
41 e.insert(payload);
42 true
43 }
44 Entry::Occupied(_) => false,
45 }
46 }
47
48 /// Returns `Some(packet)` when all bytes `[0, total_length)` have been
49 /// received, otherwise `None`.
50 fn try_reassemble(&self) -> Option<Vec<u8>> {
51 let mut buf = vec![0u8; self.total_length as usize];
52 let mut covered_up_to = 0usize;
53
54 for (&offset, chunk) in &self.chunks {
55 let start = offset as usize;
56 if start > covered_up_to {
57 // There is a gap — not complete yet.
58 return None;
59 }
60 let end = start + chunk.len();
61 if end > buf.len() {
62 // Chunk extends past declared total — malformed, bail out.
63 return None;
64 }
65 buf[start..end].copy_from_slice(chunk);
66 if end > covered_up_to {
67 covered_up_to = end;
68 }
69 }
70
71 if covered_up_to == self.total_length as usize {
72 Some(buf)
73 } else {
74 None
75 }
76 }
77
78 fn is_expired(&self, timeout: Duration) -> bool {
79 self.created_at.elapsed() >= timeout
80 }
81}
82
83// ── Public API ────────────────────────────────────────────────────────────────
84
85/// Reassembles fragmented mesh payloads.
86///
87/// Call [`Reassembler::insert`] for each received [`FragmentFrame`]. When the
88/// last required fragment arrives the complete packet is returned. Call
89/// [`Reassembler::expire_stale`] periodically (e.g. once per second) to free
90/// memory for timed-out incomplete buffers.
91pub struct Reassembler {
92 timeout: Duration,
93 buffers: HashMap<u32, ReassemblyBuffer>,
94}
95
96impl Reassembler {
97 /// Create a reassembler with the default 10-second timeout.
98 pub fn new() -> Self {
99 Self {
100 timeout: DEFAULT_REASSEMBLY_TIMEOUT,
101 buffers: HashMap::new(),
102 }
103 }
104
105 /// Create a reassembler with a custom timeout (useful in tests).
106 pub fn with_timeout(timeout: Duration) -> Self {
107 Self {
108 timeout,
109 buffers: HashMap::new(),
110 }
111 }
112
113 /// Feed a [`FragmentFrame`] to the reassembler.
114 ///
115 /// Returns `Some(packet)` when the complete packet is ready, otherwise
116 /// `None`. Duplicate fragments are silently ignored.
117 pub fn insert(&mut self, frame: FragmentFrame) -> Option<Vec<u8>> {
118 let buf = self
119 .buffers
120 .entry(frame.fragment_id)
121 .or_insert_with(|| ReassemblyBuffer::new(frame.total_length));
122
123 // If total_length mismatches a previously-seen fragment, ignore.
124 if buf.total_length != frame.total_length {
125 return None;
126 }
127
128 buf.insert(frame.fragment_offset, frame.payload);
129
130 if let Some(packet) = buf.try_reassemble() {
131 self.buffers.remove(&frame.fragment_id);
132 return Some(packet);
133 }
134
135 None
136 }
137
138 /// Discard all reassembly buffers that have been waiting longer than the
139 /// configured timeout.
140 pub fn expire_stale(&mut self) {
141 let timeout = self.timeout;
142 self.buffers.retain(|_, buf| !buf.is_expired(timeout));
143 }
144
145 /// Number of in-progress reassembly buffers currently held.
146 pub fn buffer_count(&self) -> usize {
147 self.buffers.len()
148 }
149}
150
151impl Default for Reassembler {
152 fn default() -> Self {
153 Self::new()
154 }
155}
156
157// ── Tests ─────────────────────────────────────────────────────────────────────
158
159#[cfg(test)]
160mod tests;