Skip to main content

hdds_micro/rtps/
submessages.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2// Copyright (c) 2025-2026 naskel.com
3
4//! RTPS Submessages
5//!
6//! Minimal subset for BEST_EFFORT QoS:
7//! - DATA: User data
8//! - HEARTBEAT: Writer liveliness
9//! - ACKNACK: Reader acknowledgment (optional for RELIABLE)
10
11use super::types::{EntityId, SequenceNumber};
12use crate::error::{Error, Result};
13
14/// Submessage kind (1 byte)
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16#[repr(u8)]
17pub enum SubmessageKind {
18    /// DATA submessage (0x15)
19    Data = 0x15,
20    /// HEARTBEAT submessage (0x07)
21    Heartbeat = 0x07,
22    /// ACKNACK submessage (0x06)
23    AckNack = 0x06,
24}
25
26impl SubmessageKind {
27    /// Parse from byte
28    pub fn from_u8(value: u8) -> Option<Self> {
29        match value {
30            0x15 => Some(Self::Data),
31            0x07 => Some(Self::Heartbeat),
32            0x06 => Some(Self::AckNack),
33            _ => None,
34        }
35    }
36}
37
38// Compile-time assertion to ensure enum discriminants are correct
39const _: () = {
40    assert!(
41        SubmessageKind::Data as u8 == 0x15,
42        "DATA submessage ID must be 0x15"
43    );
44    assert!(
45        SubmessageKind::Heartbeat as u8 == 0x07,
46        "HEARTBEAT submessage ID must be 0x07"
47    );
48    assert!(
49        SubmessageKind::AckNack as u8 == 0x06,
50        "ACKNACK submessage ID must be 0x06"
51    );
52};
53
54/// Submessage flags (1 byte)
55///
56/// Bit 0: Endianness (0 = Big-Endian, 1 = Little-Endian)
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
58pub struct SubmessageFlags(pub u8);
59
60impl SubmessageFlags {
61    /// Little-endian flag
62    pub const LITTLE_ENDIAN: u8 = 0x01;
63
64    /// Check if little-endian
65    pub const fn is_little_endian(&self) -> bool {
66        self.0 & Self::LITTLE_ENDIAN != 0
67    }
68
69    /// Create flags for little-endian
70    pub const fn little_endian() -> Self {
71        Self(Self::LITTLE_ENDIAN)
72    }
73
74    /// Create flags for big-endian
75    pub const fn big_endian() -> Self {
76        Self(0)
77    }
78}
79
80impl Default for SubmessageFlags {
81    fn default() -> Self {
82        // Default to little-endian (most common)
83        Self::little_endian()
84    }
85}
86
87/// Submessage header (4 bytes)
88#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub struct SubmessageHeader {
90    /// Submessage kind
91    pub kind: SubmessageKind,
92    /// Flags
93    pub flags: SubmessageFlags,
94    /// Octets to next header (length of submessage body)
95    pub octets_to_next: u16,
96}
97
98impl SubmessageHeader {
99    /// Size of submessage header in bytes
100    pub const SIZE: usize = 4;
101
102    /// Create a new submessage header
103    pub const fn new(kind: SubmessageKind, flags: SubmessageFlags, octets_to_next: u16) -> Self {
104        Self {
105            kind,
106            flags,
107            octets_to_next,
108        }
109    }
110
111    /// Encode header to bytes (4 bytes)
112    pub fn encode(&self, buf: &mut [u8]) -> Result<usize> {
113        if buf.len() < Self::SIZE {
114            return Err(Error::BufferTooSmall);
115        }
116
117        // Use explicit match instead of enum cast to avoid Xtensa LLVM compiler bugs
118        buf[0] = match self.kind {
119            SubmessageKind::Data => 0x15,
120            SubmessageKind::Heartbeat => 0x07,
121            SubmessageKind::AckNack => 0x06,
122        };
123        buf[1] = self.flags.0;
124
125        // Encode octets_to_next (little-endian for simplicity)
126        buf[2] = (self.octets_to_next & 0xff) as u8;
127        buf[3] = ((self.octets_to_next >> 8) & 0xff) as u8;
128
129        Ok(Self::SIZE)
130    }
131
132    /// Decode header from bytes
133    pub fn decode(buf: &[u8]) -> Result<Self> {
134        if buf.len() < Self::SIZE {
135            return Err(Error::BufferTooSmall);
136        }
137
138        let kind = SubmessageKind::from_u8(buf[0]).ok_or(Error::InvalidSubmessage)?;
139        let flags = SubmessageFlags(buf[1]);
140
141        // Decode octets_to_next (always little-endian in header)
142        let octets_to_next = u16::from_le_bytes([buf[2], buf[3]]);
143
144        Ok(Self {
145            kind,
146            flags,
147            octets_to_next,
148        })
149    }
150}
151
152/// DATA submessage
153///
154/// Carries user data from writer to reader.
155#[derive(Debug, Clone, Copy, PartialEq, Eq)]
156pub struct Data {
157    /// Reader entity ID
158    pub reader_id: EntityId,
159    /// Writer entity ID
160    pub writer_id: EntityId,
161    /// Sequence number
162    pub writer_sn: SequenceNumber,
163}
164
165impl Data {
166    /// Minimum size (without payload)
167    /// Layout: 4 (submsg header) + 2 (extraFlags) + 2 (octetsToInlineQos) + 4 (readerId) + 4 (writerId) + 8 (seqNum) = 24
168    pub const MIN_SIZE: usize = 24;
169
170    /// Create a new DATA submessage
171    pub const fn new(reader_id: EntityId, writer_id: EntityId, writer_sn: SequenceNumber) -> Self {
172        Self {
173            reader_id,
174            writer_id,
175            writer_sn,
176        }
177    }
178
179    /// Encode DATA submessage (header + fixed fields, no payload)
180    ///
181    /// Encodes per RTPS 2.3 Sec.8.3.7.2:
182    /// - Submessage header: id=0x15, flags=0x05 (LE + data present), octetsToNext
183    /// - extraFlags (2 bytes)
184    /// - octetsToInlineQos = 16 (skip to payload after readerId+writerId+seqNum)
185    /// - readerEntityId (4 bytes)
186    /// - writerEntityId (4 bytes)
187    /// - writerSN (8 bytes as high:i32 + low:u32)
188    ///
189    /// Payload should be appended separately by caller.
190    pub fn encode_header(&self, buf: &mut [u8]) -> Result<usize> {
191        if buf.len() < Self::MIN_SIZE {
192            return Err(Error::BufferTooSmall);
193        }
194
195        // Submessage header (4 bytes)
196        // Flags: 0x05 = bit0 (LE) + bit2 (data present, no key)
197        // Note: octets_to_next will be updated by caller to include payload
198        let header = SubmessageHeader::new(
199            SubmessageKind::Data,
200            SubmessageFlags(0x05), // LE + data present
201            20,                    // fixed fields size (will be updated with payload)
202        );
203        header.encode(&mut buf[0..4])?;
204
205        // Extra flags (2 bytes) - reserved, set to 0
206        buf[4] = 0x00;
207        buf[5] = 0x00;
208
209        // Octets to inline QoS (2 bytes)
210        // Value = 16: offset from here to serialized payload (4+4+8 = entityIds + seqNum)
211        buf[6] = 0x10; // 16 in LE
212        buf[7] = 0x00;
213
214        // Reader entity ID (4 bytes)
215        buf[8..12].copy_from_slice(self.reader_id.as_bytes());
216
217        // Writer entity ID (4 bytes)
218        buf[12..16].copy_from_slice(self.writer_id.as_bytes());
219
220        // Writer sequence number (8 bytes)
221        // RTPS SequenceNumber_t: high (i32) + low (u32) in little-endian
222        let sn = self.writer_sn.value();
223        let sn_high = (sn >> 32) as i32;
224        let sn_low = sn as u32;
225        buf[16..20].copy_from_slice(&sn_high.to_le_bytes());
226        buf[20..24].copy_from_slice(&sn_low.to_le_bytes());
227
228        Ok(Self::MIN_SIZE)
229    }
230
231    /// Decode DATA submessage (header + fixed fields)
232    ///
233    /// Decodes per RTPS 2.3 Sec.8.3.7.2. Uses octetsToInlineQos to find payload.
234    ///
235    /// Returns (Data, payload_offset)
236    pub fn decode(buf: &[u8]) -> Result<(Self, usize)> {
237        if buf.len() < Self::MIN_SIZE {
238            return Err(Error::BufferTooSmall);
239        }
240
241        // Verify submessage header
242        let header = SubmessageHeader::decode(&buf[0..4])?;
243        if header.kind != SubmessageKind::Data {
244            return Err(Error::InvalidSubmessage);
245        }
246
247        // Skip extraFlags (2 bytes at offset 4)
248        // Read octetsToInlineQos (2 bytes at offset 6)
249        let octets_to_inline_qos = u16::from_le_bytes([buf[6], buf[7]]) as usize;
250
251        // Reader entity ID (4 bytes at offset 8)
252        let mut reader_id_bytes = [0u8; 4];
253        reader_id_bytes.copy_from_slice(&buf[8..12]);
254        let reader_id = EntityId::new(reader_id_bytes);
255
256        // Writer entity ID (4 bytes at offset 12)
257        let mut writer_id_bytes = [0u8; 4];
258        writer_id_bytes.copy_from_slice(&buf[12..16]);
259        let writer_id = EntityId::new(writer_id_bytes);
260
261        // Writer sequence number (8 bytes at offset 16)
262        // RTPS SequenceNumber_t: high (i32) + low (u32) in little-endian
263        let sn_high = i32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
264        let sn_low = u32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]);
265        let sn_value = ((sn_high as i64) << 32) | (sn_low as i64);
266        let writer_sn = SequenceNumber::new(sn_value);
267
268        let data = Self {
269            reader_id,
270            writer_id,
271            writer_sn,
272        };
273
274        // Payload offset = 8 (submsg header + extraFlags + octetsToInlineQos) + octetsToInlineQos
275        // Standard value: 8 + 16 = 24
276        let payload_offset = 8 + octets_to_inline_qos;
277
278        Ok((data, payload_offset))
279    }
280}
281
282/// HEARTBEAT submessage
283///
284/// Announces writer's available sequence number range.
285#[derive(Debug, Clone, Copy, PartialEq, Eq)]
286pub struct Heartbeat {
287    /// Reader entity ID
288    pub reader_id: EntityId,
289    /// Writer entity ID
290    pub writer_id: EntityId,
291    /// First available sequence number
292    pub first_sn: SequenceNumber,
293    /// Last available sequence number
294    pub last_sn: SequenceNumber,
295    /// Heartbeat count
296    pub count: u32,
297}
298
299impl Heartbeat {
300    /// Size of HEARTBEAT submessage
301    pub const SIZE: usize = 32; // 4 (header) + 28 (fixed fields)
302
303    /// Create a new HEARTBEAT submessage
304    pub const fn new(
305        reader_id: EntityId,
306        writer_id: EntityId,
307        first_sn: SequenceNumber,
308        last_sn: SequenceNumber,
309        count: u32,
310    ) -> Self {
311        Self {
312            reader_id,
313            writer_id,
314            first_sn,
315            last_sn,
316            count,
317        }
318    }
319
320    /// Encode HEARTBEAT submessage
321    pub fn encode(&self, buf: &mut [u8]) -> Result<usize> {
322        if buf.len() < Self::SIZE {
323            return Err(Error::BufferTooSmall);
324        }
325
326        // Submessage header
327        let header =
328            SubmessageHeader::new(SubmessageKind::Heartbeat, SubmessageFlags::default(), 28);
329        header.encode(&mut buf[0..4])?;
330
331        // Reader entity ID
332        buf[4..8].copy_from_slice(self.reader_id.as_bytes());
333
334        // Writer entity ID
335        buf[8..12].copy_from_slice(self.writer_id.as_bytes());
336
337        // First sequence number (8 bytes)
338        let first_sn_bytes = self.first_sn.value().to_le_bytes();
339        buf[12..20].copy_from_slice(&first_sn_bytes);
340
341        // Last sequence number (8 bytes)
342        let last_sn_bytes = self.last_sn.value().to_le_bytes();
343        buf[20..28].copy_from_slice(&last_sn_bytes);
344
345        // Count (4 bytes)
346        let count_bytes = self.count.to_le_bytes();
347        buf[28..32].copy_from_slice(&count_bytes);
348
349        Ok(Self::SIZE)
350    }
351
352    /// Decode HEARTBEAT submessage
353    pub fn decode(buf: &[u8]) -> Result<Self> {
354        if buf.len() < Self::SIZE {
355            return Err(Error::BufferTooSmall);
356        }
357
358        // Verify submessage header
359        let header = SubmessageHeader::decode(&buf[0..4])?;
360        if header.kind != SubmessageKind::Heartbeat {
361            return Err(Error::InvalidSubmessage);
362        }
363
364        // Reader entity ID
365        let mut reader_id_bytes = [0u8; 4];
366        reader_id_bytes.copy_from_slice(&buf[4..8]);
367        let reader_id = EntityId::new(reader_id_bytes);
368
369        // Writer entity ID
370        let mut writer_id_bytes = [0u8; 4];
371        writer_id_bytes.copy_from_slice(&buf[8..12]);
372        let writer_id = EntityId::new(writer_id_bytes);
373
374        // First sequence number
375        let mut first_sn_bytes = [0u8; 8];
376        first_sn_bytes.copy_from_slice(&buf[12..20]);
377        let first_sn = SequenceNumber::new(i64::from_le_bytes(first_sn_bytes));
378
379        // Last sequence number
380        let mut last_sn_bytes = [0u8; 8];
381        last_sn_bytes.copy_from_slice(&buf[20..28]);
382        let last_sn = SequenceNumber::new(i64::from_le_bytes(last_sn_bytes));
383
384        // Count
385        let mut count_bytes = [0u8; 4];
386        count_bytes.copy_from_slice(&buf[28..32]);
387        let count = u32::from_le_bytes(count_bytes);
388
389        Ok(Self {
390            reader_id,
391            writer_id,
392            first_sn,
393            last_sn,
394            count,
395        })
396    }
397}
398
399/// ACKNACK submessage
400///
401/// Reader acknowledges received samples and requests missing ones.
402#[derive(Debug, Clone, Copy, PartialEq, Eq)]
403pub struct AckNack {
404    /// Reader entity ID
405    pub reader_id: EntityId,
406    /// Writer entity ID
407    pub writer_id: EntityId,
408    /// Base sequence number
409    pub reader_sn_state_base: SequenceNumber,
410    /// Count
411    pub count: u32,
412}
413
414impl AckNack {
415    /// Minimum size (without bitmap)
416    pub const MIN_SIZE: usize = 24; // 4 (header) + 20 (fixed fields)
417
418    /// Create a new ACKNACK submessage
419    pub const fn new(
420        reader_id: EntityId,
421        writer_id: EntityId,
422        reader_sn_state_base: SequenceNumber,
423        count: u32,
424    ) -> Self {
425        Self {
426            reader_id,
427            writer_id,
428            reader_sn_state_base,
429            count,
430        }
431    }
432
433    /// Encode ACKNACK submessage (simplified: no bitmap for embedded)
434    pub fn encode(&self, buf: &mut [u8]) -> Result<usize> {
435        if buf.len() < Self::MIN_SIZE {
436            return Err(Error::BufferTooSmall);
437        }
438
439        // Submessage header
440        let header = SubmessageHeader::new(SubmessageKind::AckNack, SubmessageFlags::default(), 20);
441        header.encode(&mut buf[0..4])?;
442
443        // Reader entity ID
444        buf[4..8].copy_from_slice(self.reader_id.as_bytes());
445
446        // Writer entity ID
447        buf[8..12].copy_from_slice(self.writer_id.as_bytes());
448
449        // Base sequence number (8 bytes)
450        let base_sn_bytes = self.reader_sn_state_base.value().to_le_bytes();
451        buf[12..20].copy_from_slice(&base_sn_bytes);
452
453        // Count (4 bytes)
454        let count_bytes = self.count.to_le_bytes();
455        buf[20..24].copy_from_slice(&count_bytes);
456
457        Ok(Self::MIN_SIZE)
458    }
459
460    /// Decode ACKNACK submessage
461    pub fn decode(buf: &[u8]) -> Result<Self> {
462        if buf.len() < Self::MIN_SIZE {
463            return Err(Error::BufferTooSmall);
464        }
465
466        // Verify submessage header
467        let header = SubmessageHeader::decode(&buf[0..4])?;
468        if header.kind != SubmessageKind::AckNack {
469            return Err(Error::InvalidSubmessage);
470        }
471
472        // Reader entity ID
473        let mut reader_id_bytes = [0u8; 4];
474        reader_id_bytes.copy_from_slice(&buf[4..8]);
475        let reader_id = EntityId::new(reader_id_bytes);
476
477        // Writer entity ID
478        let mut writer_id_bytes = [0u8; 4];
479        writer_id_bytes.copy_from_slice(&buf[8..12]);
480        let writer_id = EntityId::new(writer_id_bytes);
481
482        // Base sequence number
483        let mut base_sn_bytes = [0u8; 8];
484        base_sn_bytes.copy_from_slice(&buf[12..20]);
485        let reader_sn_state_base = SequenceNumber::new(i64::from_le_bytes(base_sn_bytes));
486
487        // Count
488        let mut count_bytes = [0u8; 4];
489        count_bytes.copy_from_slice(&buf[20..24]);
490        let count = u32::from_le_bytes(count_bytes);
491
492        Ok(Self {
493            reader_id,
494            writer_id,
495            reader_sn_state_base,
496            count,
497        })
498    }
499}
500
501/// Generic submessage enum
502#[derive(Debug, Clone, Copy, PartialEq, Eq)]
503pub enum Submessage {
504    /// DATA submessage
505    Data(Data),
506    /// HEARTBEAT submessage
507    Heartbeat(Heartbeat),
508    /// ACKNACK submessage
509    AckNack(AckNack),
510}
511
512#[cfg(test)]
513mod tests {
514    use super::*;
515
516    #[test]
517    fn test_submessage_header_encode_decode() {
518        let header =
519            SubmessageHeader::new(SubmessageKind::Data, SubmessageFlags::little_endian(), 100);
520
521        let mut buf = [0u8; 16];
522        header.encode(&mut buf).unwrap();
523
524        let decoded = SubmessageHeader::decode(&buf).unwrap();
525        assert_eq!(decoded.kind, SubmessageKind::Data);
526        assert_eq!(decoded.octets_to_next, 100);
527    }
528
529    #[test]
530    fn test_data_encode_decode() {
531        let data = Data::new(
532            EntityId::new([0, 0, 0, 1]),
533            EntityId::new([0, 0, 0, 2]),
534            SequenceNumber::new(42),
535        );
536
537        let mut buf = [0u8; 64];
538        let written = data.encode_header(&mut buf).unwrap();
539        assert_eq!(written, Data::MIN_SIZE);
540
541        let (decoded, offset) = Data::decode(&buf).unwrap();
542        assert_eq!(decoded, data);
543        assert_eq!(offset, Data::MIN_SIZE);
544    }
545
546    #[test]
547    fn test_heartbeat_encode_decode() {
548        let hb = Heartbeat::new(
549            EntityId::new([0, 0, 0, 1]),
550            EntityId::new([0, 0, 0, 2]),
551            SequenceNumber::new(1),
552            SequenceNumber::new(10),
553            5,
554        );
555
556        let mut buf = [0u8; 64];
557        let written = hb.encode(&mut buf).unwrap();
558        assert_eq!(written, Heartbeat::SIZE);
559
560        let decoded = Heartbeat::decode(&buf).unwrap();
561        assert_eq!(decoded, hb);
562    }
563
564    #[test]
565    fn test_acknack_encode_decode() {
566        let acknack = AckNack::new(
567            EntityId::new([0, 0, 0, 1]),
568            EntityId::new([0, 0, 0, 2]),
569            SequenceNumber::new(5),
570            3,
571        );
572
573        let mut buf = [0u8; 64];
574        let written = acknack.encode(&mut buf).unwrap();
575        assert_eq!(written, AckNack::MIN_SIZE);
576
577        let decoded = AckNack::decode(&buf).unwrap();
578        assert_eq!(decoded, acknack);
579    }
580}