Skip to main content

hdds_micro/transport/lora/
fragment.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2// Copyright (c) 2025-2026 naskel.com
3
4//! Packet fragmentation for LoRa transport
5//!
6//! LoRa has a 255-byte maximum packet size. This module provides
7//! fragmentation and reassembly for larger RTPS messages.
8//!
9//! ## Fragment Header Format
10//!
11//! ```text
//! +----------+---------+----------+-------------+
//! | src_node | msg_seq | frag_idx | total_frags |
//! +----------+---------+----------+-------------+
//!      1B        1B         1B          1B
16//! ```
17//!
18//! - `src_node`: Source node ID (0-255)
19//! - `msg_seq`: Message sequence number (0-255)
20//! - `frag_idx`: Fragment index (0 = first, 255 = single packet)
//! - `total_frags`: Total fragments (1-255 on the wire, or 0 for the single packet marker); the reassembler accepts at most 16
22
23use crate::error::{Error, Result};
24
/// Size of the fragment header that prefixes every packet, in bytes
pub const FRAGMENT_HEADER_SIZE: usize = 4;

/// Upper bound on the number of fragments one message may be split into
/// for reassembly
const MAX_FRAGMENTS: usize = 16;

/// Largest payload a single fragment can carry: the 255-byte LoRa packet
/// minus the fragment header (251 bytes)
const MAX_FRAGMENT_PAYLOAD: usize = 255 - FRAGMENT_HEADER_SIZE;

/// Largest message that can be reassembled, i.e. every fragment slot full
/// (16 * 251 bytes)
const MAX_MESSAGE_SIZE: usize = MAX_FRAGMENT_PAYLOAD * MAX_FRAGMENTS;
36
37/// Fragment header
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub struct FragmentHeader {
40    /// Source node ID
41    pub src_node: u8,
42    /// Message sequence number
43    pub msg_seq: u8,
44    /// Fragment index (255 = single unfragmented packet)
45    pub frag_idx: u8,
46    /// Total number of fragments (0 = single packet)
47    pub total_frags: u8,
48}
49
50impl FragmentHeader {
51    /// Header size in bytes
52    pub const SIZE: usize = FRAGMENT_HEADER_SIZE;
53
54    /// Create header for a single (unfragmented) packet
55    pub const fn single(src_node: u8, msg_seq: u8) -> Self {
56        Self {
57            src_node,
58            msg_seq,
59            frag_idx: 255,
60            total_frags: 0,
61        }
62    }
63
64    /// Create header for a fragment
65    pub const fn fragment(src_node: u8, msg_seq: u8, frag_idx: u8, total_frags: u8) -> Self {
66        Self {
67            src_node,
68            msg_seq,
69            frag_idx,
70            total_frags,
71        }
72    }
73
74    /// Check if this is a single (unfragmented) packet
75    pub const fn is_single(&self) -> bool {
76        self.frag_idx == 255 && self.total_frags == 0
77    }
78
79    /// Check if this is the first fragment
80    pub const fn is_first(&self) -> bool {
81        self.frag_idx == 0
82    }
83
84    /// Check if this is the last fragment
85    pub const fn is_last(&self) -> bool {
86        self.total_frags > 0 && self.frag_idx + 1 == self.total_frags
87    }
88
89    /// Encode header into buffer
90    ///
91    /// # Returns
92    ///
93    /// Number of bytes written (always 4)
94    pub fn encode(&self, buf: &mut [u8]) -> Result<usize> {
95        if buf.len() < Self::SIZE {
96            return Err(Error::BufferTooSmall);
97        }
98
99        buf[0] = self.src_node;
100        buf[1] = self.msg_seq;
101        buf[2] = self.frag_idx;
102        buf[3] = self.total_frags;
103
104        Ok(Self::SIZE)
105    }
106
107    /// Decode header from buffer
108    pub fn decode(buf: &[u8]) -> Result<Self> {
109        if buf.len() < Self::SIZE {
110            return Err(Error::BufferTooSmall);
111        }
112
113        Ok(Self {
114            src_node: buf[0],
115            msg_seq: buf[1],
116            frag_idx: buf[2],
117            total_frags: buf[3],
118        })
119    }
120}
121
122/// State of a message being reassembled
123///
124/// Uses fixed-size slots for each fragment, allowing out-of-order arrival.
125/// When complete, data is compacted into a contiguous buffer.
126#[derive(Debug)]
127struct ReassemblyState {
128    /// Source node ID
129    src_node: u8,
130    /// Message sequence number
131    msg_seq: u8,
132    /// Expected total fragments
133    total_frags: u8,
134    /// Received fragment bitmap (up to 16 fragments)
135    received_mask: u16,
136    /// Fragment slots (each can hold up to MAX_FRAGMENT_PAYLOAD bytes)
137    slots: [[u8; MAX_FRAGMENT_PAYLOAD]; MAX_FRAGMENTS],
138    /// Fragment payload sizes (actual data length in each slot)
139    slot_sizes: [u8; MAX_FRAGMENTS],
140    /// Compacted output buffer (filled when complete)
141    output: [u8; MAX_MESSAGE_SIZE],
142    /// Output data length
143    output_len: usize,
144}
145
146impl ReassemblyState {
147    fn new() -> Self {
148        Self {
149            src_node: 0,
150            msg_seq: 0,
151            total_frags: 0,
152            received_mask: 0,
153            slots: [[0u8; MAX_FRAGMENT_PAYLOAD]; MAX_FRAGMENTS],
154            slot_sizes: [0u8; MAX_FRAGMENTS],
155            output: [0u8; MAX_MESSAGE_SIZE],
156            output_len: 0,
157        }
158    }
159
160    fn reset(&mut self, src_node: u8, msg_seq: u8, total_frags: u8) {
161        self.src_node = src_node;
162        self.msg_seq = msg_seq;
163        self.total_frags = total_frags;
164        self.received_mask = 0;
165        self.slot_sizes = [0u8; MAX_FRAGMENTS];
166        self.output_len = 0;
167    }
168
169    fn matches(&self, src_node: u8, msg_seq: u8) -> bool {
170        self.src_node == src_node && self.msg_seq == msg_seq && self.total_frags > 0
171    }
172
173    fn add_fragment(&mut self, frag_idx: u8, payload: &[u8]) -> bool {
174        if frag_idx as usize >= MAX_FRAGMENTS {
175            return false;
176        }
177
178        if payload.len() > MAX_FRAGMENT_PAYLOAD {
179            return false;
180        }
181
182        let mask = 1u16 << frag_idx;
183
184        // Already received this fragment?
185        if self.received_mask & mask != 0 {
186            return self.is_complete();
187        }
188
189        // Store fragment in its slot
190        let slot_idx = frag_idx as usize;
191        self.slots[slot_idx][..payload.len()].copy_from_slice(payload);
192        self.slot_sizes[slot_idx] = payload.len() as u8;
193        self.received_mask |= mask;
194
195        // Check if complete
196        if self.is_complete() {
197            // Compact all fragments into output buffer
198            self.compact();
199            true
200        } else {
201            false
202        }
203    }
204
205    fn is_complete(&self) -> bool {
206        if self.total_frags == 0 {
207            return false;
208        }
209
210        // Check if we have all fragments
211        let expected_mask = (1u16 << self.total_frags) - 1;
212        self.received_mask == expected_mask
213    }
214
215    /// Compact all fragments into contiguous output buffer
216    fn compact(&mut self) {
217        let mut offset = 0;
218        for i in 0..self.total_frags as usize {
219            let size = self.slot_sizes[i] as usize;
220            self.output[offset..offset + size].copy_from_slice(&self.slots[i][..size]);
221            offset += size;
222        }
223        self.output_len = offset;
224    }
225
226    fn get_data(&self) -> &[u8] {
227        &self.output[..self.output_len]
228    }
229}
230
231/// Fragment assembler for incoming packets
232///
233/// Handles reassembly of fragmented RTPS messages.
234/// Uses a simple single-slot cache (one message at a time per source).
235pub struct FragmentAssembler {
236    /// Reassembly states (one per source, simple LRU)
237    states: [ReassemblyState; 4],
238    /// Next slot to use (round-robin)
239    next_slot: usize,
240}
241
242impl FragmentAssembler {
243    /// Create a new fragment assembler
244    pub fn new() -> Self {
245        Self {
246            states: [
247                ReassemblyState::new(),
248                ReassemblyState::new(),
249                ReassemblyState::new(),
250                ReassemblyState::new(),
251            ],
252            next_slot: 0,
253        }
254    }
255
256    /// Add a fragment and return complete message if ready
257    ///
258    /// # Arguments
259    ///
260    /// * `header` - Fragment header
261    /// * `payload` - Fragment payload (without header)
262    ///
263    /// # Returns
264    ///
265    /// Complete message data if all fragments received, None otherwise
266    pub fn add_fragment(
267        &mut self,
268        header: &FragmentHeader,
269        payload: &[u8],
270    ) -> Result<Option<&[u8]>> {
271        if header.is_single() {
272            // Single packet - shouldn't go through assembler
273            return Err(Error::InvalidParameter);
274        }
275
276        if header.total_frags as usize > MAX_FRAGMENTS {
277            return Err(Error::BufferTooSmall);
278        }
279
280        // Find existing state for this message
281        let slot = self.find_or_create_slot(header.src_node, header.msg_seq, header.total_frags);
282
283        // Add fragment to state
284        if self.states[slot].add_fragment(header.frag_idx, payload) {
285            // Complete! Return the data
286            Ok(Some(self.states[slot].get_data()))
287        } else {
288            Ok(None)
289        }
290    }
291
292    /// Find existing slot or create new one
293    fn find_or_create_slot(&mut self, src_node: u8, msg_seq: u8, total_frags: u8) -> usize {
294        // Look for existing state
295        for (i, state) in self.states.iter().enumerate() {
296            if state.matches(src_node, msg_seq) {
297                return i;
298            }
299        }
300
301        // Not found, use next slot (round-robin eviction)
302        let slot = self.next_slot;
303        self.next_slot = (self.next_slot + 1) % self.states.len();
304
305        // Reset slot for new message
306        self.states[slot].reset(src_node, msg_seq, total_frags);
307
308        slot
309    }
310
311    /// Clear all reassembly state
312    pub fn clear(&mut self) {
313        for state in &mut self.states {
314            state.reset(0, 0, 0);
315        }
316        self.next_slot = 0;
317    }
318}
319
320impl Default for FragmentAssembler {
321    fn default() -> Self {
322        Self::new()
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Header for fragment `idx` of a `total`-fragment message `seq` sent by
    /// node 1.
    fn frag(seq: u8, idx: u8, total: u8) -> FragmentHeader {
        FragmentHeader::fragment(1, seq, idx, total)
    }

    /// `(is_single, is_first, is_last)` of a header, for compact comparison.
    fn flags(header: &FragmentHeader) -> (bool, bool, bool) {
        (header.is_single(), header.is_first(), header.is_last())
    }

    /// The single-packet marker is neither a first nor a last fragment.
    #[test]
    fn test_fragment_header_single() {
        let single = FragmentHeader::single(42, 5);
        assert_eq!(flags(&single), (true, false, false));
    }

    /// First and last fragments of a three-fragment message are classified
    /// correctly.
    #[test]
    fn test_fragment_header_fragment() {
        let first = FragmentHeader::fragment(42, 5, 0, 3);
        assert_eq!(flags(&first), (false, true, false));

        let last = FragmentHeader::fragment(42, 5, 2, 3);
        assert_eq!(flags(&last), (false, false, true));
    }

    /// Encoding writes four bytes and decoding them yields the same header.
    #[test]
    fn test_fragment_header_encode_decode() {
        let original = FragmentHeader::fragment(42, 5, 1, 3);
        let mut wire = [0u8; 8];

        assert_eq!(original.encode(&mut wire).unwrap(), 4);
        assert_eq!(FragmentHeader::decode(&wire).unwrap(), original);
    }

    /// Three in-order fragments are joined into one message.
    #[test]
    fn test_assembler_single_message() {
        let mut asm = FragmentAssembler::new();
        let parts: [&[u8]; 3] = [b"Hello", b", ", b"World!"];

        // The first two fragments leave the message incomplete
        for idx in 0..2u8 {
            let outcome = asm.add_fragment(&frag(0, idx, 3), parts[idx as usize]);
            assert!(outcome.unwrap().is_none());
        }

        // The third fragment completes it
        let assembled = asm.add_fragment(&frag(0, 2, 3), parts[2]).unwrap();
        assert!(assembled.is_some());
        assert_eq!(assembled.unwrap(), b"Hello, World!");
    }

    /// Fragments arriving as 2, 0, 1 are still emitted in index order.
    #[test]
    fn test_assembler_out_of_order() {
        let mut asm = FragmentAssembler::new();

        assert!(asm.add_fragment(&frag(0, 2, 3), b"C").unwrap().is_none());
        assert!(asm.add_fragment(&frag(0, 0, 3), b"A").unwrap().is_none());

        let assembled = asm.add_fragment(&frag(0, 1, 3), b"B").unwrap();
        assert!(assembled.is_some());
        assert_eq!(assembled.unwrap(), b"ABC");
    }

    /// A repeated fragment neither completes nor corrupts the message.
    #[test]
    fn test_assembler_duplicate_fragment() {
        let mut asm = FragmentAssembler::new();
        let head = frag(0, 0, 2);

        // Same fragment delivered twice
        for _ in 0..2 {
            assert!(asm.add_fragment(&head, b"A").unwrap().is_none());
        }

        let assembled = asm.add_fragment(&frag(0, 1, 2), b"B").unwrap();
        assert!(assembled.is_some());
        assert_eq!(assembled.unwrap(), b"AB");
    }

    /// Two messages from one source can be reassembled while interleaved.
    #[test]
    fn test_assembler_multiple_messages() {
        let mut asm = FragmentAssembler::new();

        // Opening fragments of message 0 and message 1
        assert!(asm.add_fragment(&frag(0, 0, 2), b"A0").unwrap().is_none());
        assert!(asm.add_fragment(&frag(1, 0, 2), b"B0").unwrap().is_none());

        // Closing fragment of message 0
        let msg0 = asm.add_fragment(&frag(0, 1, 2), b"A1").unwrap();
        assert!(msg0.is_some());
        assert_eq!(msg0.unwrap(), b"A0A1");

        // Closing fragment of message 1
        let msg1 = asm.add_fragment(&frag(1, 1, 2), b"B1").unwrap();
        assert!(msg1.is_some());
        assert_eq!(msg1.unwrap(), b"B0B1");
    }

    /// `clear` discards partial messages so the same id can be reused.
    #[test]
    fn test_assembler_clear() {
        let mut asm = FragmentAssembler::new();

        asm.add_fragment(&frag(0, 0, 2), b"data").unwrap();
        asm.clear();

        // Reassembly starts over from scratch
        assert!(asm.add_fragment(&frag(0, 0, 2), b"X").unwrap().is_none());
        let assembled = asm.add_fragment(&frag(0, 1, 2), b"Y").unwrap();
        assert!(assembled.is_some());
        assert_eq!(assembled.unwrap(), b"XY");
    }

    /// Two-byte buffers are too short for both encoding and decoding.
    #[test]
    fn test_fragment_header_buffer_too_small() {
        let header = FragmentHeader::single(1, 1);

        let mut short_out = [0u8; 2];
        assert_eq!(header.encode(&mut short_out), Err(Error::BufferTooSmall));

        let short_in = [0u8; 2];
        assert_eq!(FragmentHeader::decode(&short_in), Err(Error::BufferTooSmall));
    }
}
477}