Skip to main content

zerodds_rtps/
datagram.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Datagram-Encoder/-Decoder: kombiniert RTPS-Header und Submessages
4//! zu einem fertigen Wire-Datagram (W4).
5
6extern crate alloc;
7use alloc::vec::Vec;
8
9use crate::error::WireError;
10use crate::header::RtpsHeader;
11use crate::header_extension::{HeaderExtension, SUBMESSAGE_ID_HEADER_EXTENSION};
12use crate::submessage_header::{FLAG_E_LITTLE_ENDIAN, SubmessageHeader, SubmessageId};
13use crate::submessages::{
14    ACKNACK_FLAG_FINAL, AckNackSubmessage, DATA_FRAG_FLAG_HASH_KEY, DATA_FRAG_FLAG_INLINE_QOS,
15    DATA_FRAG_FLAG_KEY, DATA_FRAG_FLAG_NON_STANDARD, DataFragSubmessage, DataSubmessage,
16    GAP_FLAG_FILTERED_COUNT, GAP_FLAG_GROUP_INFO, GapSubmessage, HEARTBEAT_FLAG_FINAL,
17    HEARTBEAT_FLAG_GROUP_INFO, HEARTBEAT_FLAG_LIVELINESS, HeartbeatFragSubmessage,
18    HeartbeatSubmessage, INFO_REPLY_FLAG_MULTICAST, INFO_TIMESTAMP_FLAG_INVALIDATE,
19    InfoReplySubmessage, InfoSourceSubmessage, InfoTimestampSubmessage, NackFragSubmessage,
20};
21
22/// Encoded ein RTPS-Datagram = `RtpsHeader` + Sequenz von `DATA`-
23/// Submessages. Variante: alle Submessages sind LE; ein
24/// Datagram traegt eine Liste DATA-Bodies.
25pub fn encode_data_datagram(
26    header: RtpsHeader,
27    data_submessages: &[DataSubmessage],
28) -> Result<Vec<u8>, WireError> {
29    let mut out = Vec::new();
30    out.extend_from_slice(&header.to_bytes());
31    for d in data_submessages {
32        let (body, flags) = d.write_body(true);
33        let body_len = u16::try_from(body.len()).map_err(|_| WireError::ValueOutOfRange {
34            message: "DATA submessage body exceeds u16::MAX",
35        })?;
36        let sh = SubmessageHeader {
37            submessage_id: SubmessageId::Data,
38            flags,
39            octets_to_next_header: body_len,
40        };
41        out.extend_from_slice(&sh.to_bytes());
42        out.extend_from_slice(&body);
43    }
44    Ok(out)
45}
46
/// A parsed datagram: the RTPS header plus every recognised submessage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParsedDatagram {
    /// RTPS header.
    pub header: RtpsHeader,
    /// All recognised submessages, in wire order.
    pub submessages: Vec<ParsedSubmessage>,
}
55
/// A single recognised submessage.
///
/// Supported kinds are DATA, HEARTBEAT, ACKNACK, GAP, DATA_FRAG,
/// HEARTBEAT_FRAG, NACK_FRAG and INFO_*. Any other kind is skipped via
/// `octets_to_next_header` and recorded as [`ParsedSubmessage::Unknown`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParsedSubmessage {
    /// DATA submessage.
    Data(DataSubmessage),
    /// DATA_FRAG submessage (fragmentation).
    DataFrag(DataFragSubmessage),
    /// HEARTBEAT submessage.
    Heartbeat(HeartbeatSubmessage),
    /// HEARTBEAT_FRAG submessage.
    HeartbeatFrag(HeartbeatFragSubmessage),
    /// ACKNACK submessage.
    AckNack(AckNackSubmessage),
    /// NACK_FRAG submessage.
    NackFrag(NackFragSubmessage),
    /// GAP submessage.
    Gap(GapSubmessage),
    /// HeaderExtension submessage (DDSI-RTPS 2.5 §8.3.3.2).
    HeaderExtension(HeaderExtension),
    /// InfoSource submessage (§8.3.8.9.4).
    InfoSource(InfoSourceSubmessage),
    /// InfoReply submessage (§8.3.8.10.4).
    InfoReply(InfoReplySubmessage),
    /// InfoTimestamp submessage (§8.3.8.5 / §8.3.7.5).
    InfoTimestamp(InfoTimestampSubmessage),
    /// Any other submessage kind (skipped). Carries the id and flags for
    /// diagnostics.
    Unknown {
        /// Submessage id byte.
        id: u8,
        /// Flag byte.
        flags: u8,
    },
}
91
/// Submessage-header must-understand bit (bit 7 of the flag byte,
/// DDSI-RTPS 2.5 §8.3.3.2). If this bit is set on a submessage whose id
/// is unknown, the whole RTPS message MUST be discarded.
pub const SUBMESSAGE_FLAG_MUST_UNDERSTAND: u8 = 0x80;
96
97/// Decoded ein RTPS-Datagram in Header + Submessage-Liste.
98///
99/// `octets_to_next_header == 0` (Last-Submessage-Marker, Spec §8.3.4.2)
100/// wird so behandelt: die Submessage erstreckt sich bis zum Ende des
101/// Datagrams.
102///
103/// # Errors
104/// `InvalidMagic`, `UnexpectedEof`, oder Sub-Decoder-Fehler. Unbekannte
105/// Submessage-IDs werden als `Unknown` markiert (kein Fehler).
106pub fn decode_datagram(bytes: &[u8]) -> Result<ParsedDatagram, WireError> {
107    let header = RtpsHeader::from_bytes(bytes)?;
108    let mut pos = RtpsHeader::WIRE_SIZE;
109    let mut submessages = Vec::new();
110
111    while pos < bytes.len() {
112        if bytes.len() < pos + SubmessageHeader::WIRE_SIZE {
113            return Err(WireError::UnexpectedEof {
114                needed: SubmessageHeader::WIRE_SIZE,
115                offset: pos,
116            });
117        }
118        // Wir lesen den Submessage-Header zuerst; bei Unknown-ID
119        // resilient skippen.
120        let id_byte = bytes[pos];
121        let flags = bytes[pos + 1];
122        let mut len_bytes = [0u8; 2];
123        len_bytes.copy_from_slice(&bytes[pos + 2..pos + 4]);
124        let little_endian = (flags & FLAG_E_LITTLE_ENDIAN) != 0;
125        let octets = if little_endian {
126            u16::from_le_bytes(len_bytes)
127        } else {
128            u16::from_be_bytes(len_bytes)
129        };
130        let body_start = pos + SubmessageHeader::WIRE_SIZE;
131        let body_end = if octets == 0 {
132            // Last-submessage marker: bis Ende des Datagrams.
133            bytes.len()
134        } else {
135            body_start + octets as usize
136        };
137        if body_end > bytes.len() {
138            return Err(WireError::UnexpectedEof {
139                needed: body_end - bytes.len(),
140                offset: body_start,
141            });
142        }
143        let body = &bytes[body_start..body_end];
144        let sub = match SubmessageId::from_u8(id_byte) {
145            Ok(SubmessageId::Data) => {
146                let d = DataSubmessage::read_body_with_flags(body, little_endian, flags)?;
147                if let Some(pl) = &d.inline_qos {
148                    pl.validate_must_understand_in_data_pipeline()?;
149                }
150                ParsedSubmessage::Data(d)
151            }
152            Ok(SubmessageId::Heartbeat) => {
153                let final_flag = (flags & HEARTBEAT_FLAG_FINAL) != 0;
154                let liveliness_flag = (flags & HEARTBEAT_FLAG_LIVELINESS) != 0;
155                let group_info_flag = (flags & HEARTBEAT_FLAG_GROUP_INFO) != 0;
156                ParsedSubmessage::Heartbeat(HeartbeatSubmessage::read_body(
157                    body,
158                    little_endian,
159                    final_flag,
160                    liveliness_flag,
161                    group_info_flag,
162                )?)
163            }
164            Ok(SubmessageId::AckNack) => {
165                let final_flag = (flags & ACKNACK_FLAG_FINAL) != 0;
166                ParsedSubmessage::AckNack(AckNackSubmessage::read_body(
167                    body,
168                    little_endian,
169                    final_flag,
170                )?)
171            }
172            Ok(SubmessageId::Gap) => {
173                let group_info_flag = (flags & GAP_FLAG_GROUP_INFO) != 0;
174                let filtered_count_flag = (flags & GAP_FLAG_FILTERED_COUNT) != 0;
175                ParsedSubmessage::Gap(GapSubmessage::read_body(
176                    body,
177                    little_endian,
178                    group_info_flag,
179                    filtered_count_flag,
180                )?)
181            }
182            Ok(SubmessageId::DataFrag) => {
183                let inline_qos = (flags & DATA_FRAG_FLAG_INLINE_QOS) != 0;
184                let hash_key = (flags & DATA_FRAG_FLAG_HASH_KEY) != 0;
185                let key = (flags & DATA_FRAG_FLAG_KEY) != 0;
186                let non_standard = (flags & DATA_FRAG_FLAG_NON_STANDARD) != 0;
187                ParsedSubmessage::DataFrag(DataFragSubmessage::read_body(
188                    body,
189                    little_endian,
190                    inline_qos,
191                    hash_key,
192                    key,
193                    non_standard,
194                )?)
195            }
196            Ok(SubmessageId::HeartbeatFrag) => ParsedSubmessage::HeartbeatFrag(
197                HeartbeatFragSubmessage::read_body(body, little_endian)?,
198            ),
199            Ok(SubmessageId::NackFrag) => {
200                ParsedSubmessage::NackFrag(NackFragSubmessage::read_body(body, little_endian)?)
201            }
202            Ok(SubmessageId::InfoSrc) => {
203                ParsedSubmessage::InfoSource(InfoSourceSubmessage::read_body(body, little_endian)?)
204            }
205            Ok(SubmessageId::InfoTs) => {
206                let invalidate = (flags & INFO_TIMESTAMP_FLAG_INVALIDATE) != 0;
207                ParsedSubmessage::InfoTimestamp(InfoTimestampSubmessage::read_body(
208                    body,
209                    little_endian,
210                    invalidate,
211                )?)
212            }
213            Ok(SubmessageId::InfoReply) => {
214                let multicast_flag = (flags & INFO_REPLY_FLAG_MULTICAST) != 0;
215                ParsedSubmessage::InfoReply(InfoReplySubmessage::read_body(
216                    body,
217                    little_endian,
218                    multicast_flag,
219                )?)
220            }
221            // HeaderExtension (SubmessageId 0x80, ausserhalb der
222            // Enum-Range — wir matchen explizit ueber das
223            // ID-Byte). Spec §8.3.7.3: HE MUSS direkt nach dem Header
224            // stehen (also als erste Submessage). Anderenfalls reject.
225            //
226            // Vendor-Compat: nur RTPS >= 2.5 hat 0x80 als HE definiert.
227            // Bei aelteren Vendoren (z.B. Cyclone-2.1, FastDDS-2.x mit
228            // protocol_version=2.1) faellt 0x80 in den Vendor-Specific-
229            // Range [0x80, 0xFF] (Spec §8.3.3.2). Solche Submessages
230            // werden — sofern Must-Understand-Flag nicht gesetzt —
231            // einfach als `Unknown` markiert und uebersprungen.
232            Ok(_) | Err(WireError::UnknownSubmessageId { .. })
233                if id_byte == SUBMESSAGE_ID_HEADER_EXTENSION
234                    && header.protocol_version
235                        >= crate::wire_types::ProtocolVersion { major: 2, minor: 5 } =>
236            {
237                if !submessages.is_empty() {
238                    return Err(WireError::ValueOutOfRange {
239                        message: "HeaderExtension must appear directly after the RTPS header",
240                    });
241                }
242                let he = HeaderExtension::decode_body(body, flags)?;
243                if let Some(pl) = &he.parameters {
244                    pl.validate_must_understand_in_data_pipeline()?;
245                }
246                ParsedSubmessage::HeaderExtension(he)
247            }
248            // Unbekannte Submessage-ID + Must-Understand-Bit:
249            // SPEC §8.3.3.2 / §9.4.5.1 — ganze RTPS-Message
250            // verwerfen.
251            Ok(_) | Err(WireError::UnknownSubmessageId { .. })
252                if (flags & SUBMESSAGE_FLAG_MUST_UNDERSTAND) != 0 =>
253            {
254                return Err(WireError::ValueOutOfRange {
255                    message: "Unknown submessage id with must-understand flag",
256                });
257            }
258            // Andere bekannte Submessage-IDs ohne Decoder
259            // (PAD, InfoTs, …): skippen und als Unknown markieren.
260            Ok(_) | Err(WireError::UnknownSubmessageId { .. }) => {
261                ParsedSubmessage::Unknown { id: id_byte, flags }
262            }
263            Err(other) => return Err(other),
264        };
265        submessages.push(sub);
266        pos = body_end;
267    }
268
269    Ok(ParsedDatagram {
270        header,
271        submessages,
272    })
273}
274
275#[cfg(test)]
276mod tests {
277    #![allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
278    use super::*;
279    use crate::wire_types::{EntityId, GuidPrefix, SequenceNumber, VendorId};
280    use alloc::vec;
281
282    fn header() -> RtpsHeader {
283        RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([1; 12]))
284    }
285
286    fn data_msg(sn: i64, payload: &[u8]) -> DataSubmessage {
287        DataSubmessage {
288            extra_flags: 0,
289            reader_id: EntityId::user_reader_with_key([0xA, 0xB, 0xC]),
290            writer_id: EntityId::user_writer_with_key([0x1, 0x2, 0x3]),
291            writer_sn: SequenceNumber(sn),
292            inline_qos: None,
293            key_flag: false,
294            non_standard_flag: false,
295            serialized_payload: alloc::sync::Arc::from(payload),
296        }
297    }
298
299    #[test]
300    fn encode_decode_single_data_datagram() {
301        let h = header();
302        let d = data_msg(1, b"hello");
303        let bytes = encode_data_datagram(h, &[d.clone()]).unwrap();
304        let parsed = decode_datagram(&bytes).unwrap();
305        assert_eq!(parsed.header, h);
306        assert_eq!(parsed.submessages.len(), 1);
307        match &parsed.submessages[0] {
308            ParsedSubmessage::Data(decoded) => assert_eq!(decoded, &d),
309            other => panic!("expected Data, got {other:?}"),
310        }
311    }
312
313    #[test]
314    fn encode_decode_two_data_submessages() {
315        let h = header();
316        let d1 = data_msg(1, b"first");
317        let d2 = data_msg(2, b"second-payload");
318        let bytes = encode_data_datagram(h, &[d1.clone(), d2.clone()]).unwrap();
319        let parsed = decode_datagram(&bytes).unwrap();
320        assert_eq!(parsed.submessages.len(), 2);
321        match (&parsed.submessages[0], &parsed.submessages[1]) {
322            (ParsedSubmessage::Data(a), ParsedSubmessage::Data(b)) => {
323                assert_eq!(a, &d1);
324                assert_eq!(b, &d2);
325            }
326            other => panic!("unexpected: {other:?}"),
327        }
328    }
329
330    #[test]
331    fn encode_decode_empty_payload() {
332        let h = header();
333        let d = data_msg(42, b"");
334        let bytes = encode_data_datagram(h, &[d.clone()]).unwrap();
335        let parsed = decode_datagram(&bytes).unwrap();
336        assert_eq!(parsed.submessages.len(), 1);
337        match &parsed.submessages[0] {
338            ParsedSubmessage::Data(decoded) => {
339                assert!(decoded.serialized_payload.is_empty());
340                assert_eq!(decoded.writer_sn, SequenceNumber(42));
341            }
342            other => panic!("expected Data, got {other:?}"),
343        }
344    }
345
346    #[test]
347    fn decode_rejects_invalid_magic() {
348        let mut bytes = vec![0u8; 32];
349        bytes[..4].copy_from_slice(b"XXXX");
350        let res = decode_datagram(&bytes);
351        assert!(matches!(res, Err(WireError::InvalidMagic { .. })));
352    }
353
354    #[test]
355    fn decode_handles_last_submessage_zero_length() {
356        // Manuell konstruieren: Header + DATA-SH mit octets=0
357        // Body ist DATA-Format (mind. 20 byte + payload).
358        let h = header();
359        let mut bytes = h.to_bytes().to_vec();
360        let d = data_msg(7, b"X");
361        let (body, flags) = d.write_body(true);
362        // Submessage-Header mit octets=0 (last-marker)
363        let sh = SubmessageHeader {
364            submessage_id: SubmessageId::Data,
365            flags,
366            octets_to_next_header: 0,
367        };
368        bytes.extend_from_slice(&sh.to_bytes());
369        bytes.extend_from_slice(&body);
370        let parsed = decode_datagram(&bytes).unwrap();
371        match &parsed.submessages[0] {
372            ParsedSubmessage::Data(decoded) => {
373                assert_eq!(decoded, &d);
374            }
375            other => panic!("expected Data, got {other:?}"),
376        }
377    }
378
379    #[test]
380    fn decode_marks_unknown_submessage_id_without_failing() {
381        // Header + Sub-Header mit ID 0x01 (PAD-Submessage). Wir nutzen
382        // PAD als Stand-In, weil InfoTs jetzt typed dekodiert wird (R3).
383        let h = header();
384        let mut bytes = h.to_bytes().to_vec();
385        let body = [0u8; 0]; // PAD hat keinen Body
386        let sh = SubmessageHeader {
387            submessage_id: SubmessageId::Pad,
388            flags: FLAG_E_LITTLE_ENDIAN,
389            octets_to_next_header: body.len() as u16,
390        };
391        bytes.extend_from_slice(&sh.to_bytes());
392        bytes.extend_from_slice(&body);
393        let parsed = decode_datagram(&bytes).unwrap();
394        assert_eq!(parsed.submessages.len(), 1);
395        match &parsed.submessages[0] {
396            ParsedSubmessage::Unknown { id, flags } => {
397                assert_eq!(*id, 0x01);
398                assert_eq!(*flags, FLAG_E_LITTLE_ENDIAN);
399            }
400            other => panic!("expected Unknown, got {other:?}"),
401        }
402    }
403
404    #[test]
405    fn decode_heartbeat_preserves_final_and_liveliness_flags() {
406        // Regression fuer WP-1.1-Finding: F/L-Flag aus Submessage-Header
407        // muss in HeartbeatSubmessage verfuegbar sein.
408        let h = header();
409        let hb = HeartbeatSubmessage {
410            reader_id: crate::wire_types::EntityId::user_reader_with_key([1, 2, 3]),
411            writer_id: crate::wire_types::EntityId::user_writer_with_key([4, 5, 6]),
412            first_sn: SequenceNumber(1),
413            last_sn: SequenceNumber(7),
414            count: 42,
415            final_flag: true,
416            liveliness_flag: true,
417            group_info: None,
418        };
419        let (body, flags) = hb.write_body(true);
420        let mut bytes = h.to_bytes().to_vec();
421        let sh = SubmessageHeader {
422            submessage_id: SubmessageId::Heartbeat,
423            flags,
424            octets_to_next_header: body.len() as u16,
425        };
426        bytes.extend_from_slice(&sh.to_bytes());
427        bytes.extend_from_slice(&body);
428        let parsed = decode_datagram(&bytes).unwrap();
429        match &parsed.submessages[0] {
430            ParsedSubmessage::Heartbeat(decoded) => {
431                assert_eq!(decoded, &hb);
432                assert!(decoded.final_flag);
433                assert!(decoded.liveliness_flag);
434            }
435            other => panic!("expected Heartbeat, got {other:?}"),
436        }
437    }
438
439    #[test]
440    fn decode_acknack_preserves_final_flag() {
441        let h = header();
442        let ack = AckNackSubmessage {
443            reader_id: crate::wire_types::EntityId::user_reader_with_key([1, 2, 3]),
444            writer_id: crate::wire_types::EntityId::user_writer_with_key([4, 5, 6]),
445            reader_sn_state: crate::submessages::SequenceNumberSet {
446                bitmap_base: SequenceNumber(1),
447                num_bits: 0,
448                bitmap: vec![],
449            },
450            count: 3,
451            final_flag: true,
452        };
453        let (body, flags) = ack.write_body(true);
454        let mut bytes = h.to_bytes().to_vec();
455        let sh = SubmessageHeader {
456            submessage_id: SubmessageId::AckNack,
457            flags,
458            octets_to_next_header: body.len() as u16,
459        };
460        bytes.extend_from_slice(&sh.to_bytes());
461        bytes.extend_from_slice(&body);
462        let parsed = decode_datagram(&bytes).unwrap();
463        match &parsed.submessages[0] {
464            ParsedSubmessage::AckNack(decoded) => {
465                assert_eq!(decoded, &ack);
466                assert!(decoded.final_flag);
467            }
468            other => panic!("expected AckNack, got {other:?}"),
469        }
470    }
471
472    #[test]
473    fn decode_data_frag_preserves_flags_and_payload() {
474        let h = header();
475        let df = DataFragSubmessage {
476            extra_flags: 0,
477            reader_id: crate::wire_types::EntityId::user_reader_with_key([1, 2, 3]),
478            writer_id: crate::wire_types::EntityId::user_writer_with_key([4, 5, 6]),
479            writer_sn: SequenceNumber(7),
480            fragment_starting_num: crate::wire_types::FragmentNumber(1),
481            fragments_in_submessage: 1,
482            fragment_size: 4,
483            sample_size: 12,
484            serialized_payload: alloc::sync::Arc::<[u8]>::from([0xAA, 0xBB, 0xCC, 0xDD].as_slice()),
485            inline_qos_flag: false,
486            hash_key_flag: true,
487            key_flag: false,
488            non_standard_flag: false,
489        };
490        let (body, flags) = df.write_body(true);
491        let mut bytes = h.to_bytes().to_vec();
492        let sh = SubmessageHeader {
493            submessage_id: SubmessageId::DataFrag,
494            flags,
495            octets_to_next_header: body.len() as u16,
496        };
497        bytes.extend_from_slice(&sh.to_bytes());
498        bytes.extend_from_slice(&body);
499        let parsed = decode_datagram(&bytes).unwrap();
500        match &parsed.submessages[0] {
501            ParsedSubmessage::DataFrag(decoded) => {
502                assert_eq!(decoded, &df);
503                assert!(decoded.hash_key_flag);
504                assert!(!decoded.inline_qos_flag);
505            }
506            other => panic!("expected DataFrag, got {other:?}"),
507        }
508    }
509
510    #[test]
511    fn decode_heartbeat_frag_roundtrip() {
512        let h = header();
513        let hf = HeartbeatFragSubmessage {
514            reader_id: crate::wire_types::EntityId::user_reader_with_key([1, 2, 3]),
515            writer_id: crate::wire_types::EntityId::user_writer_with_key([4, 5, 6]),
516            writer_sn: SequenceNumber(42),
517            last_fragment_num: crate::wire_types::FragmentNumber(8),
518            count: 3,
519        };
520        let (body, flags) = hf.write_body(true);
521        let mut bytes = h.to_bytes().to_vec();
522        let sh = SubmessageHeader {
523            submessage_id: SubmessageId::HeartbeatFrag,
524            flags,
525            octets_to_next_header: body.len() as u16,
526        };
527        bytes.extend_from_slice(&sh.to_bytes());
528        bytes.extend_from_slice(&body);
529        let parsed = decode_datagram(&bytes).unwrap();
530        match &parsed.submessages[0] {
531            ParsedSubmessage::HeartbeatFrag(decoded) => assert_eq!(decoded, &hf),
532            other => panic!("expected HeartbeatFrag, got {other:?}"),
533        }
534    }
535
536    #[test]
537    fn decode_nack_frag_roundtrip() {
538        let h = header();
539        let nf = NackFragSubmessage {
540            reader_id: crate::wire_types::EntityId::user_reader_with_key([1, 2, 3]),
541            writer_id: crate::wire_types::EntityId::user_writer_with_key([4, 5, 6]),
542            writer_sn: SequenceNumber(5),
543            fragment_number_state: crate::submessages::FragmentNumberSet {
544                bitmap_base: crate::wire_types::FragmentNumber(1),
545                num_bits: 4,
546                bitmap: vec![0b1010_0000_0000_0000_0000_0000_0000_0000],
547            },
548            count: 1,
549        };
550        let (body, flags) = nf.write_body(true);
551        let mut bytes = h.to_bytes().to_vec();
552        let sh = SubmessageHeader {
553            submessage_id: SubmessageId::NackFrag,
554            flags,
555            octets_to_next_header: body.len() as u16,
556        };
557        bytes.extend_from_slice(&sh.to_bytes());
558        bytes.extend_from_slice(&body);
559        let parsed = decode_datagram(&bytes).unwrap();
560        match &parsed.submessages[0] {
561            ParsedSubmessage::NackFrag(decoded) => assert_eq!(decoded, &nf),
562            other => panic!("expected NackFrag, got {other:?}"),
563        }
564    }
565
566    // ---- WP 1.E Stufe-E/F: InfoSource + InfoReply via Datagram ----
567
568    #[test]
569    fn decode_info_source_via_datagram() {
570        use crate::wire_types::{GuidPrefix, ProtocolVersion as PV, VendorId};
571        let h = header();
572        let info = InfoSourceSubmessage {
573            unused: 0,
574            protocol_version: PV::V2_5,
575            vendor_id: VendorId([0xAB, 0xCD]),
576            guid_prefix: GuidPrefix::from_bytes([3; 12]),
577        };
578        let (body, flags) = info.write_body(true);
579        let mut bytes = h.to_bytes().to_vec();
580        let sh = SubmessageHeader {
581            submessage_id: SubmessageId::InfoSrc,
582            flags,
583            octets_to_next_header: body.len() as u16,
584        };
585        bytes.extend_from_slice(&sh.to_bytes());
586        bytes.extend_from_slice(&body);
587        let parsed = decode_datagram(&bytes).unwrap();
588        match &parsed.submessages[0] {
589            ParsedSubmessage::InfoSource(decoded) => assert_eq!(decoded, &info),
590            other => panic!("expected InfoSource, got {other:?}"),
591        }
592    }
593
594    #[test]
595    fn decode_info_reply_with_multicast_via_datagram() {
596        use crate::wire_types::Locator;
597        let h = header();
598        let info = InfoReplySubmessage {
599            unicast_locators: alloc::vec![Locator::udp_v4([10, 1, 2, 3], 7411)],
600            multicast_locators: Some(alloc::vec![Locator::udp_v4([239, 255, 0, 1], 7400)]),
601        };
602        let (body, flags) = info.write_body(true);
603        let mut bytes = h.to_bytes().to_vec();
604        let sh = SubmessageHeader {
605            submessage_id: SubmessageId::InfoReply,
606            flags,
607            octets_to_next_header: body.len() as u16,
608        };
609        bytes.extend_from_slice(&sh.to_bytes());
610        bytes.extend_from_slice(&body);
611        let parsed = decode_datagram(&bytes).unwrap();
612        match &parsed.submessages[0] {
613            ParsedSubmessage::InfoReply(decoded) => assert_eq!(decoded, &info),
614            other => panic!("expected InfoReply, got {other:?}"),
615        }
616    }
617
618    #[test]
619    fn decode_rejects_truncated_after_header() {
620        let h = header();
621        let mut bytes = h.to_bytes().to_vec();
622        bytes.extend_from_slice(&[0u8, 0, 0]); // nur 3 Byte Sub-Header statt 4
623        let res = decode_datagram(&bytes);
624        assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
625    }
626
627    #[test]
628    fn decode_header_extension_in_datagram() {
629        let h = header();
630        let he = crate::header_extension::HeaderExtension {
631            little_endian: true,
632            message_length: Some(123),
633            timestamp: Some(crate::header_extension::HeTimestamp {
634                seconds: 1,
635                fraction: 2,
636            }),
637            checksum: crate::header_extension::ChecksumValue::Crc32c(0xDEAD_BEEF),
638            ..crate::header_extension::HeaderExtension::default()
639        };
640        let mut bytes = h.to_bytes().to_vec();
641        bytes.extend_from_slice(&he.encode().unwrap());
642        let parsed = decode_datagram(&bytes).unwrap();
643        assert_eq!(parsed.submessages.len(), 1);
644        match &parsed.submessages[0] {
645            ParsedSubmessage::HeaderExtension(decoded) => assert_eq!(decoded, &he),
646            other => panic!("expected HE, got {other:?}"),
647        }
648    }
649
650    #[test]
651    fn decode_rejects_unknown_submessage_with_must_understand() {
652        // Submessage-ID 0x7E mit Must-Understand-Bit (0x80 im flag-byte)
653        // → ganze Message verwerfen.
654        let h = header();
655        let mut bytes = h.to_bytes().to_vec();
656        let body = [0u8; 4];
657        let sh = SubmessageHeader {
658            submessage_id: SubmessageId::Pad, // ID-Wert wird gleich ueberschrieben
659            flags: FLAG_E_LITTLE_ENDIAN | SUBMESSAGE_FLAG_MUST_UNDERSTAND,
660            octets_to_next_header: body.len() as u16,
661        };
662        let mut sh_bytes = sh.to_bytes();
663        sh_bytes[0] = 0x7E; // unbekannte ID, ausserhalb 0x80-Range
664        bytes.extend_from_slice(&sh_bytes);
665        bytes.extend_from_slice(&body);
666        let res = decode_datagram(&bytes);
667        assert!(matches!(
668            res,
669            Err(WireError::ValueOutOfRange { message: msg }) if msg.contains("must-understand")
670        ));
671    }
672
673    #[test]
674    fn decode_skips_unknown_submessage_without_must_understand() {
675        // Ohne Must-Understand-Bit: skippen + als Unknown markieren.
676        let h = header();
677        let mut bytes = h.to_bytes().to_vec();
678        let body = [0u8; 4];
679        let sh = SubmessageHeader {
680            submessage_id: SubmessageId::Pad,
681            flags: FLAG_E_LITTLE_ENDIAN,
682            octets_to_next_header: body.len() as u16,
683        };
684        let mut sh_bytes = sh.to_bytes();
685        sh_bytes[0] = 0x7E;
686        bytes.extend_from_slice(&sh_bytes);
687        bytes.extend_from_slice(&body);
688        let parsed = decode_datagram(&bytes).unwrap();
689        assert_eq!(parsed.submessages.len(), 1);
690        match &parsed.submessages[0] {
691            ParsedSubmessage::Unknown { id, .. } => assert_eq!(*id, 0x7E),
692            other => panic!("expected Unknown, got {other:?}"),
693        }
694    }
695
696    #[test]
697    fn decode_data_after_header_extension() {
698        // Wire-Layout: RtpsHeader || HE || DATA.
699        let h = header();
700        let he = crate::header_extension::HeaderExtension {
701            little_endian: true,
702            message_length: Some(0),
703            ..crate::header_extension::HeaderExtension::default()
704        };
705        let d = data_msg(7, b"after-he");
706        let mut bytes = h.to_bytes().to_vec();
707        bytes.extend_from_slice(&he.encode().unwrap());
708        let (body, flags) = d.write_body(true);
709        let sh = SubmessageHeader {
710            submessage_id: SubmessageId::Data,
711            flags,
712            octets_to_next_header: body.len() as u16,
713        };
714        bytes.extend_from_slice(&sh.to_bytes());
715        bytes.extend_from_slice(&body);
716        let parsed = decode_datagram(&bytes).unwrap();
717        assert_eq!(parsed.submessages.len(), 2);
718        assert!(matches!(
719            &parsed.submessages[0],
720            ParsedSubmessage::HeaderExtension(_)
721        ));
722        assert!(matches!(&parsed.submessages[1], ParsedSubmessage::Data(_)));
723    }
724
725    #[test]
726    fn decode_rejects_header_extension_after_data_submessage() {
727        // Spec §8.3.7.3: HE MUSS direkt nach dem RTPS-Header stehen.
728        // Wenn vorher eine andere Submessage parst wurde, reject.
729        // Wire-Layout: RtpsHeader || DATA || HE.
730        let h = header();
731        let d = data_msg(7, b"first");
732        let he = crate::header_extension::HeaderExtension {
733            little_endian: true,
734            message_length: Some(0),
735            ..crate::header_extension::HeaderExtension::default()
736        };
737        let mut bytes = h.to_bytes().to_vec();
738        let (dbody, dflags) = d.write_body(true);
739        let dsh = SubmessageHeader {
740            submessage_id: SubmessageId::Data,
741            flags: dflags,
742            octets_to_next_header: dbody.len() as u16,
743        };
744        bytes.extend_from_slice(&dsh.to_bytes());
745        bytes.extend_from_slice(&dbody);
746        bytes.extend_from_slice(&he.encode().unwrap());
747        let res = decode_datagram(&bytes);
748        assert!(matches!(res, Err(WireError::ValueOutOfRange { .. })));
749    }
750
751    // ---- §9.4.2.11.2 Must-Understand-Bit Reject-Pfad ----
752
753    #[test]
754    fn decode_rejects_data_with_unknown_must_understand_pid_in_inline_qos() {
755        use crate::parameter_list::{MUST_UNDERSTAND_BIT, Parameter, ParameterList};
756        let h = header();
757        // Inline-QoS mit unbekannter MU-PID 0x3500 (kein Standard-PID).
758        let mut pl = ParameterList::new();
759        pl.push(Parameter::new(
760            MUST_UNDERSTAND_BIT | 0x3500,
761            vec![1, 2, 3, 4],
762        ));
763        let d = DataSubmessage {
764            extra_flags: 0,
765            reader_id: EntityId::user_reader_with_key([0xA, 0xB, 0xC]),
766            writer_id: EntityId::user_writer_with_key([0x1, 0x2, 0x3]),
767            writer_sn: SequenceNumber(1),
768            inline_qos: Some(pl),
769            key_flag: false,
770            non_standard_flag: false,
771            serialized_payload: alloc::sync::Arc::from([] as [u8; 0]),
772        };
773        let mut bytes = h.to_bytes().to_vec();
774        let (body, flags) = d.write_body(true);
775        let sh = SubmessageHeader {
776            submessage_id: SubmessageId::Data,
777            flags,
778            octets_to_next_header: body.len() as u16,
779        };
780        bytes.extend_from_slice(&sh.to_bytes());
781        bytes.extend_from_slice(&body);
782        let res = decode_datagram(&bytes);
783        assert!(matches!(res, Err(WireError::ValueOutOfRange { .. })));
784    }
785
786    #[test]
787    fn decode_accepts_data_with_known_must_understand_pid_in_inline_qos() {
788        use crate::parameter_list::{MUST_UNDERSTAND_BIT, Parameter, ParameterList, pid};
789        let h = header();
790        let mut pl = ParameterList::new();
791        // KEY_HASH ist Standard-PID, MU-Bit erlaubt.
792        pl.push(Parameter::new(
793            MUST_UNDERSTAND_BIT | pid::KEY_HASH,
794            vec![0; 16],
795        ));
796        let d = DataSubmessage {
797            extra_flags: 0,
798            reader_id: EntityId::user_reader_with_key([0xA, 0xB, 0xC]),
799            writer_id: EntityId::user_writer_with_key([0x1, 0x2, 0x3]),
800            writer_sn: SequenceNumber(2),
801            inline_qos: Some(pl),
802            key_flag: false,
803            non_standard_flag: false,
804            serialized_payload: alloc::sync::Arc::from([] as [u8; 0]),
805        };
806        let mut bytes = h.to_bytes().to_vec();
807        let (body, flags) = d.write_body(true);
808        let sh = SubmessageHeader {
809            submessage_id: SubmessageId::Data,
810            flags,
811            octets_to_next_header: body.len() as u16,
812        };
813        bytes.extend_from_slice(&sh.to_bytes());
814        bytes.extend_from_slice(&body);
815        decode_datagram(&bytes).expect("known MU PID should pass");
816    }
817
818    #[test]
819    fn decode_accepts_vendor_specific_must_understand_pid() {
820        use crate::parameter_list::{
821            MUST_UNDERSTAND_BIT, Parameter, ParameterList, VENDOR_SPECIFIC_BIT,
822        };
823        let h = header();
824        let mut pl = ParameterList::new();
825        // Vendor-spezifische MU-PID — Spec §9.6.2 erlaubt ignorieren.
826        pl.push(Parameter::new(
827            MUST_UNDERSTAND_BIT | VENDOR_SPECIFIC_BIT | 0x0050,
828            vec![0xCA, 0xFE, 0xBA, 0xBE],
829        ));
830        let d = DataSubmessage {
831            extra_flags: 0,
832            reader_id: EntityId::user_reader_with_key([0xA, 0xB, 0xC]),
833            writer_id: EntityId::user_writer_with_key([0x1, 0x2, 0x3]),
834            writer_sn: SequenceNumber(3),
835            inline_qos: Some(pl),
836            key_flag: false,
837            non_standard_flag: false,
838            serialized_payload: alloc::sync::Arc::from([] as [u8; 0]),
839        };
840        let mut bytes = h.to_bytes().to_vec();
841        let (body, flags) = d.write_body(true);
842        let sh = SubmessageHeader {
843            submessage_id: SubmessageId::Data,
844            flags,
845            octets_to_next_header: body.len() as u16,
846        };
847        bytes.extend_from_slice(&sh.to_bytes());
848        bytes.extend_from_slice(&body);
849        decode_datagram(&bytes).expect("vendor-specific MU PID should pass");
850    }
851
852    #[test]
853    fn rtps_2_1_treats_0x80_as_vendor_specific_not_header_extension() {
854        use crate::wire_types::ProtocolVersion;
855        // Spec §8.3.7.3: HeaderExtension (0x80) ist ein 2.5-Submessage.
856        // Aeltere Vendoren (z.B. Cyclone-2.1) duerfen 0x80 als Vendor-
857        // Specific verwenden. Wir verwerfen NICHT, sondern skippen.
858        let mut h = header();
859        h.protocol_version = ProtocolVersion::V2_1;
860        let mut bytes = h.to_bytes().to_vec();
861        // Erst eine echte DATA-Submessage (damit `submessages` non-empty
862        // wird), dann 0x80 als Vendor-Submessage hinten dran.
863        let d = data_msg(1, b"x");
864        let (body, flags) = d.write_body(true);
865        let sh = SubmessageHeader {
866            submessage_id: SubmessageId::Data,
867            flags,
868            octets_to_next_header: body.len() as u16,
869        };
870        bytes.extend_from_slice(&sh.to_bytes());
871        bytes.extend_from_slice(&body);
872        // Vendor-Submessage 0x80 mit 4 Byte Body, kein MU-Bit.
873        bytes.extend_from_slice(&[0x80, FLAG_E_LITTLE_ENDIAN, 4, 0]);
874        bytes.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
875        let parsed = decode_datagram(&bytes).expect("0x80 unter RTPS-2.1 muss skippen");
876        assert!(matches!(
877            parsed.submessages.last(),
878            Some(ParsedSubmessage::Unknown { id: 0x80, .. })
879        ));
880    }
881}