Skip to main content

pim_protocol/
reassembler.rs

1//! Reassembly of fragmented mesh data payloads.
2//!
3//! [`Reassembler`] collects [`FragmentFrame`]s by their `fragment_id`,
4//! reassembles them in order once all bytes have arrived, and discards
5//! incomplete buffers after a configurable timeout.
6
7use std::collections::{BTreeMap, HashMap};
8use std::time::{Duration, Instant};
9
10use crate::fragment_frame::FragmentFrame;
11
12/// Default timeout before an incomplete reassembly buffer is discarded.
13pub const DEFAULT_REASSEMBLY_TIMEOUT: Duration = Duration::from_secs(10);
14
15// ── Internal state ─────────────────────────────────────────────────────────
16
17struct ReassemblyBuffer {
18    /// Total byte length of the original packet.
19    total_length: u16,
20    /// Received chunks keyed by their starting byte offset within the original
21    /// packet.  Stored in a `BTreeMap` so we can iterate offsets in order.
22    chunks: BTreeMap<u16, bytes::Bytes>,
23    /// Monotonic timestamp of the first fragment received.
24    created_at: Instant,
25}
26
27impl ReassemblyBuffer {
28    fn new(total_length: u16) -> Self {
29        Self {
30            total_length,
31            chunks: BTreeMap::new(),
32            created_at: Instant::now(),
33        }
34    }
35
36    /// Insert a chunk.  Returns `true` if this chunk was new (not a duplicate).
37    fn insert(&mut self, offset: u16, payload: bytes::Bytes) -> bool {
38        use std::collections::btree_map::Entry;
39        match self.chunks.entry(offset) {
40            Entry::Vacant(e) => {
41                e.insert(payload);
42                true
43            }
44            Entry::Occupied(_) => false,
45        }
46    }
47
48    /// Returns `Some(packet)` when all bytes `[0, total_length)` have been
49    /// received, otherwise `None`.
50    fn try_reassemble(&self) -> Option<Vec<u8>> {
51        let mut buf = vec![0u8; self.total_length as usize];
52        let mut covered_up_to = 0usize;
53
54        for (&offset, chunk) in &self.chunks {
55            let start = offset as usize;
56            if start > covered_up_to {
57                // There is a gap — not complete yet.
58                return None;
59            }
60            let end = start + chunk.len();
61            if end > buf.len() {
62                // Chunk extends past declared total — malformed, bail out.
63                return None;
64            }
65            buf[start..end].copy_from_slice(chunk);
66            if end > covered_up_to {
67                covered_up_to = end;
68            }
69        }
70
71        if covered_up_to == self.total_length as usize {
72            Some(buf)
73        } else {
74            None
75        }
76    }
77
78    fn is_expired(&self, timeout: Duration) -> bool {
79        self.created_at.elapsed() >= timeout
80    }
81}
82
83// ── Public API ────────────────────────────────────────────────────────────────
84
85/// Reassembles fragmented mesh payloads.
86///
87/// Call [`Reassembler::insert`] for each received [`FragmentFrame`].  When the
88/// last required fragment arrives the complete packet is returned.  Call
89/// [`Reassembler::expire_stale`] periodically (e.g. once per second) to free
90/// memory for timed-out incomplete buffers.
91pub struct Reassembler {
92    timeout: Duration,
93    buffers: HashMap<u32, ReassemblyBuffer>,
94}
95
96impl Reassembler {
97    /// Create a reassembler with the default 10-second timeout.
98    pub fn new() -> Self {
99        Self {
100            timeout: DEFAULT_REASSEMBLY_TIMEOUT,
101            buffers: HashMap::new(),
102        }
103    }
104
105    /// Create a reassembler with a custom timeout (useful in tests).
106    pub fn with_timeout(timeout: Duration) -> Self {
107        Self {
108            timeout,
109            buffers: HashMap::new(),
110        }
111    }
112
113    /// Feed a [`FragmentFrame`] to the reassembler.
114    ///
115    /// Returns `Some(packet)` when the complete packet is ready, otherwise
116    /// `None`.  Duplicate fragments are silently ignored.
117    pub fn insert(&mut self, frame: FragmentFrame) -> Option<Vec<u8>> {
118        let buf = self
119            .buffers
120            .entry(frame.fragment_id)
121            .or_insert_with(|| ReassemblyBuffer::new(frame.total_length));
122
123        // If total_length mismatches a previously-seen fragment, ignore.
124        if buf.total_length != frame.total_length {
125            return None;
126        }
127
128        buf.insert(frame.fragment_offset, frame.payload);
129
130        if let Some(packet) = buf.try_reassemble() {
131            self.buffers.remove(&frame.fragment_id);
132            return Some(packet);
133        }
134
135        None
136    }
137
138    /// Discard all reassembly buffers that have been waiting longer than the
139    /// configured timeout.
140    pub fn expire_stale(&mut self) {
141        let timeout = self.timeout;
142        self.buffers.retain(|_, buf| !buf.is_expired(timeout));
143    }
144
145    /// Number of in-progress reassembly buffers currently held.
146    pub fn buffer_count(&self) -> usize {
147        self.buffers.len()
148    }
149}
150
151impl Default for Reassembler {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156
157// ── Tests ─────────────────────────────────────────────────────────────────────
158
159#[cfg(test)]
160mod tests;