Skip to main content

pim_protocol/
reassembler.rs

1//! Reassembly of fragmented mesh data payloads.
2//!
3//! [`Reassembler`] collects [`FragmentFrame`]s by their `fragment_id`,
4//! reassembles them in order once all bytes have arrived, and discards
5//! incomplete buffers after a configurable timeout.
6
7use std::collections::{BTreeMap, HashMap};
8use std::time::{Duration, Instant};
9
10use crate::fragment_frame::FragmentFrame;
11
12/// Default timeout before an incomplete reassembly buffer is discarded.
13pub const DEFAULT_REASSEMBLY_TIMEOUT: Duration = Duration::from_secs(10);
14
15// ── Internal state ─────────────────────────────────────────────────────────
16
17struct ReassemblyBuffer {
18    /// Total byte length of the original packet.
19    total_length: u16,
20    /// Received chunks keyed by their starting byte offset within the original
21    /// packet.  Stored in a `BTreeMap` so we can iterate offsets in order.
22    chunks: BTreeMap<u16, bytes::Bytes>,
23    /// Monotonic timestamp of the first fragment received.
24    created_at: Instant,
25}
26
27impl ReassemblyBuffer {
28    fn new(total_length: u16) -> Self {
29        Self {
30            total_length,
31            chunks: BTreeMap::new(),
32            created_at: Instant::now(),
33        }
34    }
35
36    /// Insert a chunk.  Returns `true` if this chunk was new (not a duplicate).
37    fn insert(&mut self, offset: u16, payload: bytes::Bytes) -> bool {
38        use std::collections::btree_map::Entry;
39        match self.chunks.entry(offset) {
40            Entry::Vacant(e) => {
41                e.insert(payload);
42                true
43            }
44            Entry::Occupied(_) => false,
45        }
46    }
47
48    /// Returns `Some(packet)` when all bytes `[0, total_length)` have been
49    /// received, otherwise `None`.
50    fn try_reassemble(&self) -> Option<Vec<u8>> {
51        // PERFORMANCE: Pre-validate that all chunks are present before
52        // allocating the target vector. If we allocate unconditionally,
53        // every fragment arrival costs a redundant `Vec` allocation
54        // equal to `total_length` that gets immediately discarded.
55        let mut covered_up_to = 0usize;
56        let total = self.total_length as usize;
57
58        for (&offset, chunk) in &self.chunks {
59            let start = offset as usize;
60            if start > covered_up_to {
61                // There is a gap — not complete yet.
62                return None;
63            }
64            let end = start + chunk.len();
65            if end > total {
66                // Chunk extends past declared total — malformed, bail out.
67                return None;
68            }
69            if end > covered_up_to {
70                covered_up_to = end;
71            }
72        }
73
74        if covered_up_to != total {
75            return None;
76        }
77
78        // All bytes present. Perform the final allocation and copy.
79        let mut buf = vec![0u8; total];
80        for (&offset, chunk) in &self.chunks {
81            let start = offset as usize;
82            let end = start + chunk.len();
83            buf[start..end].copy_from_slice(chunk);
84        }
85
86        Some(buf)
87    }
88
89    fn is_expired(&self, timeout: Duration) -> bool {
90        self.created_at.elapsed() >= timeout
91    }
92}
93
94// ── Public API ────────────────────────────────────────────────────────────────
95
96/// Reassembles fragmented mesh payloads.
97///
98/// Call [`Reassembler::insert`] for each received [`FragmentFrame`].  When the
99/// last required fragment arrives the complete packet is returned.  Call
100/// [`Reassembler::expire_stale`] periodically (e.g. once per second) to free
101/// memory for timed-out incomplete buffers.
102pub struct Reassembler {
103    timeout: Duration,
104    buffers: HashMap<u32, ReassemblyBuffer>,
105}
106
107impl Reassembler {
108    /// Create a reassembler with the default 10-second timeout.
109    pub fn new() -> Self {
110        Self {
111            timeout: DEFAULT_REASSEMBLY_TIMEOUT,
112            buffers: HashMap::new(),
113        }
114    }
115
116    /// Create a reassembler with a custom timeout (useful in tests).
117    pub fn with_timeout(timeout: Duration) -> Self {
118        Self {
119            timeout,
120            buffers: HashMap::new(),
121        }
122    }
123
124    /// Feed a [`FragmentFrame`] to the reassembler.
125    ///
126    /// Returns `Some(packet)` when the complete packet is ready, otherwise
127    /// `None`.  Duplicate fragments are silently ignored.
128    pub fn insert(&mut self, frame: FragmentFrame) -> Option<Vec<u8>> {
129        let buf = self
130            .buffers
131            .entry(frame.fragment_id)
132            .or_insert_with(|| ReassemblyBuffer::new(frame.total_length));
133
134        // If total_length mismatches a previously-seen fragment, ignore.
135        if buf.total_length != frame.total_length {
136            return None;
137        }
138
139        buf.insert(frame.fragment_offset, frame.payload);
140
141        if let Some(packet) = buf.try_reassemble() {
142            self.buffers.remove(&frame.fragment_id);
143            return Some(packet);
144        }
145
146        None
147    }
148
149    /// Discard all reassembly buffers that have been waiting longer than the
150    /// configured timeout.
151    pub fn expire_stale(&mut self) {
152        let timeout = self.timeout;
153        self.buffers.retain(|_, buf| !buf.is_expired(timeout));
154    }
155
156    /// Number of in-progress reassembly buffers currently held.
157    pub fn buffer_count(&self) -> usize {
158        self.buffers.len()
159    }
160}
161
162impl Default for Reassembler {
163    fn default() -> Self {
164        Self::new()
165    }
166}
167
168// ── Tests ─────────────────────────────────────────────────────────────────────
169
170#[cfg(test)]
171mod tests;