1extern crate alloc;
7use alloc::vec::Vec;
8
9use crate::error::WireError;
10use crate::header::RtpsHeader;
11use crate::header_extension::{HeaderExtension, SUBMESSAGE_ID_HEADER_EXTENSION};
12use crate::submessage_header::{FLAG_E_LITTLE_ENDIAN, SubmessageHeader, SubmessageId};
13use crate::submessages::{
14 ACKNACK_FLAG_FINAL, AckNackSubmessage, DATA_FRAG_FLAG_HASH_KEY, DATA_FRAG_FLAG_INLINE_QOS,
15 DATA_FRAG_FLAG_KEY, DATA_FRAG_FLAG_NON_STANDARD, DataFragSubmessage, DataSubmessage,
16 GAP_FLAG_FILTERED_COUNT, GAP_FLAG_GROUP_INFO, GapSubmessage, HEARTBEAT_FLAG_FINAL,
17 HEARTBEAT_FLAG_GROUP_INFO, HEARTBEAT_FLAG_LIVELINESS, HeartbeatFragSubmessage,
18 HeartbeatSubmessage, INFO_REPLY_FLAG_MULTICAST, INFO_TIMESTAMP_FLAG_INVALIDATE,
19 InfoReplySubmessage, InfoSourceSubmessage, InfoTimestampSubmessage, NackFragSubmessage,
20};
21
22pub fn encode_data_datagram(
26 header: RtpsHeader,
27 data_submessages: &[DataSubmessage],
28) -> Result<Vec<u8>, WireError> {
29 let mut out = Vec::new();
30 out.extend_from_slice(&header.to_bytes());
31 for d in data_submessages {
32 let (body, flags) = d.write_body(true);
33 let body_len = u16::try_from(body.len()).map_err(|_| WireError::ValueOutOfRange {
34 message: "DATA submessage body exceeds u16::MAX",
35 })?;
36 let sh = SubmessageHeader {
37 submessage_id: SubmessageId::Data,
38 flags,
39 octets_to_next_header: body_len,
40 };
41 out.extend_from_slice(&sh.to_bytes());
42 out.extend_from_slice(&body);
43 }
44 Ok(out)
45}
46
/// Result of decoding one RTPS datagram with [`decode_datagram`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParsedDatagram {
    /// RTPS header parsed from the start of the datagram.
    pub header: RtpsHeader,
    /// Decoded submessages, in the order they appear on the wire.
    pub submessages: Vec<ParsedSubmessage>,
}
55
/// One submessage as produced by [`decode_datagram`].
///
/// Every submessage kind the decoder handles has its own variant; all other
/// ids are reported as [`ParsedSubmessage::Unknown`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParsedSubmessage {
    /// Decoded DATA submessage.
    Data(DataSubmessage),
    /// Decoded DATA_FRAG submessage.
    DataFrag(DataFragSubmessage),
    /// Decoded HEARTBEAT submessage.
    Heartbeat(HeartbeatSubmessage),
    /// Decoded HEARTBEAT_FRAG submessage.
    HeartbeatFrag(HeartbeatFragSubmessage),
    /// Decoded ACKNACK submessage.
    AckNack(AckNackSubmessage),
    /// Decoded NACK_FRAG submessage.
    NackFrag(NackFragSubmessage),
    /// Decoded GAP submessage.
    Gap(GapSubmessage),
    /// Decoded header extension; the decoder only produces this when the
    /// datagram's protocol version is at least 2.5.
    HeaderExtension(HeaderExtension),
    /// Decoded INFO_SRC submessage.
    InfoSource(InfoSourceSubmessage),
    /// Decoded INFO_REPLY submessage.
    InfoReply(InfoReplySubmessage),
    /// Decoded INFO_TS submessage.
    InfoTimestamp(InfoTimestampSubmessage),
    /// Submessage whose id the decoder does not handle and whose
    /// must-understand flag is clear; the body is skipped, not retained.
    Unknown {
        /// Raw submessage id byte as read from the wire.
        id: u8,
        /// Raw flags byte as read from the wire.
        flags: u8,
    },
}
91
/// Bit in the submessage flags byte that `decode_datagram` treats as
/// "must understand": a submessage with an unhandled id is rejected when this
/// bit is set and skipped (reported as `ParsedSubmessage::Unknown`) otherwise.
pub const SUBMESSAGE_FLAG_MUST_UNDERSTAND: u8 = 0x80;
96
97pub fn decode_datagram(bytes: &[u8]) -> Result<ParsedDatagram, WireError> {
107 let header = RtpsHeader::from_bytes(bytes)?;
108 let mut pos = RtpsHeader::WIRE_SIZE;
109 let mut submessages = Vec::new();
110
111 while pos < bytes.len() {
112 if bytes.len() < pos + SubmessageHeader::WIRE_SIZE {
113 return Err(WireError::UnexpectedEof {
114 needed: SubmessageHeader::WIRE_SIZE,
115 offset: pos,
116 });
117 }
118 let id_byte = bytes[pos];
121 let flags = bytes[pos + 1];
122 let mut len_bytes = [0u8; 2];
123 len_bytes.copy_from_slice(&bytes[pos + 2..pos + 4]);
124 let little_endian = (flags & FLAG_E_LITTLE_ENDIAN) != 0;
125 let octets = if little_endian {
126 u16::from_le_bytes(len_bytes)
127 } else {
128 u16::from_be_bytes(len_bytes)
129 };
130 let body_start = pos + SubmessageHeader::WIRE_SIZE;
131 let body_end = if octets == 0 {
132 bytes.len()
134 } else {
135 body_start + octets as usize
136 };
137 if body_end > bytes.len() {
138 return Err(WireError::UnexpectedEof {
139 needed: body_end - bytes.len(),
140 offset: body_start,
141 });
142 }
143 let body = &bytes[body_start..body_end];
144 let sub = match SubmessageId::from_u8(id_byte) {
145 Ok(SubmessageId::Data) => {
146 let d = DataSubmessage::read_body_with_flags(body, little_endian, flags)?;
147 if let Some(pl) = &d.inline_qos {
148 pl.validate_must_understand_in_data_pipeline()?;
149 }
150 ParsedSubmessage::Data(d)
151 }
152 Ok(SubmessageId::Heartbeat) => {
153 let final_flag = (flags & HEARTBEAT_FLAG_FINAL) != 0;
154 let liveliness_flag = (flags & HEARTBEAT_FLAG_LIVELINESS) != 0;
155 let group_info_flag = (flags & HEARTBEAT_FLAG_GROUP_INFO) != 0;
156 ParsedSubmessage::Heartbeat(HeartbeatSubmessage::read_body(
157 body,
158 little_endian,
159 final_flag,
160 liveliness_flag,
161 group_info_flag,
162 )?)
163 }
164 Ok(SubmessageId::AckNack) => {
165 let final_flag = (flags & ACKNACK_FLAG_FINAL) != 0;
166 ParsedSubmessage::AckNack(AckNackSubmessage::read_body(
167 body,
168 little_endian,
169 final_flag,
170 )?)
171 }
172 Ok(SubmessageId::Gap) => {
173 let group_info_flag = (flags & GAP_FLAG_GROUP_INFO) != 0;
174 let filtered_count_flag = (flags & GAP_FLAG_FILTERED_COUNT) != 0;
175 ParsedSubmessage::Gap(GapSubmessage::read_body(
176 body,
177 little_endian,
178 group_info_flag,
179 filtered_count_flag,
180 )?)
181 }
182 Ok(SubmessageId::DataFrag) => {
183 let inline_qos = (flags & DATA_FRAG_FLAG_INLINE_QOS) != 0;
184 let hash_key = (flags & DATA_FRAG_FLAG_HASH_KEY) != 0;
185 let key = (flags & DATA_FRAG_FLAG_KEY) != 0;
186 let non_standard = (flags & DATA_FRAG_FLAG_NON_STANDARD) != 0;
187 ParsedSubmessage::DataFrag(DataFragSubmessage::read_body(
188 body,
189 little_endian,
190 inline_qos,
191 hash_key,
192 key,
193 non_standard,
194 )?)
195 }
196 Ok(SubmessageId::HeartbeatFrag) => ParsedSubmessage::HeartbeatFrag(
197 HeartbeatFragSubmessage::read_body(body, little_endian)?,
198 ),
199 Ok(SubmessageId::NackFrag) => {
200 ParsedSubmessage::NackFrag(NackFragSubmessage::read_body(body, little_endian)?)
201 }
202 Ok(SubmessageId::InfoSrc) => {
203 ParsedSubmessage::InfoSource(InfoSourceSubmessage::read_body(body, little_endian)?)
204 }
205 Ok(SubmessageId::InfoTs) => {
206 let invalidate = (flags & INFO_TIMESTAMP_FLAG_INVALIDATE) != 0;
207 ParsedSubmessage::InfoTimestamp(InfoTimestampSubmessage::read_body(
208 body,
209 little_endian,
210 invalidate,
211 )?)
212 }
213 Ok(SubmessageId::InfoReply) => {
214 let multicast_flag = (flags & INFO_REPLY_FLAG_MULTICAST) != 0;
215 ParsedSubmessage::InfoReply(InfoReplySubmessage::read_body(
216 body,
217 little_endian,
218 multicast_flag,
219 )?)
220 }
221 Ok(_) | Err(WireError::UnknownSubmessageId { .. })
233 if id_byte == SUBMESSAGE_ID_HEADER_EXTENSION
234 && header.protocol_version
235 >= crate::wire_types::ProtocolVersion { major: 2, minor: 5 } =>
236 {
237 if !submessages.is_empty() {
238 return Err(WireError::ValueOutOfRange {
239 message: "HeaderExtension must appear directly after the RTPS header",
240 });
241 }
242 let he = HeaderExtension::decode_body(body, flags)?;
243 if let Some(pl) = &he.parameters {
244 pl.validate_must_understand_in_data_pipeline()?;
245 }
246 ParsedSubmessage::HeaderExtension(he)
247 }
248 Ok(_) | Err(WireError::UnknownSubmessageId { .. })
252 if (flags & SUBMESSAGE_FLAG_MUST_UNDERSTAND) != 0 =>
253 {
254 return Err(WireError::ValueOutOfRange {
255 message: "Unknown submessage id with must-understand flag",
256 });
257 }
258 Ok(_) | Err(WireError::UnknownSubmessageId { .. }) => {
261 ParsedSubmessage::Unknown { id: id_byte, flags }
262 }
263 Err(other) => return Err(other),
264 };
265 submessages.push(sub);
266 pos = body_end;
267 }
268
269 Ok(ParsedDatagram {
270 header,
271 submessages,
272 })
273}
274
#[cfg(test)]
mod tests {
    //! Round-trip and rejection tests for `encode_data_datagram` and
    //! `decode_datagram`.
    #![allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
    use super::*;
    use crate::header_extension::{ChecksumValue, HeTimestamp};
    use crate::parameter_list::{
        MUST_UNDERSTAND_BIT, Parameter, ParameterList, VENDOR_SPECIFIC_BIT, pid,
    };
    use crate::submessages::{FragmentNumberSet, SequenceNumberSet};
    use crate::wire_types::{
        EntityId, FragmentNumber, GuidPrefix, Locator, ProtocolVersion, SequenceNumber, VendorId,
    };
    use alloc::vec;

    /// RTPS header shared by all tests.
    fn header() -> RtpsHeader {
        RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([1; 12]))
    }

    /// DATA submessage without inline QoS carrying `payload`.
    fn data_msg(sn: i64, payload: &[u8]) -> DataSubmessage {
        DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::user_reader_with_key([0xA, 0xB, 0xC]),
            writer_id: EntityId::user_writer_with_key([0x1, 0x2, 0x3]),
            writer_sn: SequenceNumber(sn),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: alloc::sync::Arc::from(payload),
        }
    }

    /// Header extension that only carries a zero message length.
    fn minimal_header_extension() -> HeaderExtension {
        HeaderExtension {
            little_endian: true,
            message_length: Some(0),
            ..HeaderExtension::default()
        }
    }

    /// Checked conversion of a test body length to the 16-bit wire field.
    fn body_len(body: &[u8]) -> u16 {
        u16::try_from(body.len()).expect("test submessage body must fit in u16")
    }

    /// Appends a submessage header with an explicit `octets_to_next_header`
    /// followed by `body`.
    fn push_submessage_with_len(
        bytes: &mut Vec<u8>,
        submessage_id: SubmessageId,
        flags: u8,
        octets_to_next_header: u16,
        body: &[u8],
    ) {
        let sh = SubmessageHeader {
            submessage_id,
            flags,
            octets_to_next_header,
        };
        bytes.extend_from_slice(&sh.to_bytes());
        bytes.extend_from_slice(body);
    }

    /// Appends a submessage whose length field equals `body.len()`.
    fn push_submessage(
        bytes: &mut Vec<u8>,
        submessage_id: SubmessageId,
        flags: u8,
        body: &[u8],
    ) {
        push_submessage_with_len(bytes, submessage_id, flags, body_len(body), body);
    }

    /// Appends a submessage whose id byte is patched to `raw_id`, so ids
    /// without a `SubmessageId` variant can be put on the wire.
    fn push_submessage_with_raw_id(bytes: &mut Vec<u8>, raw_id: u8, flags: u8, body: &[u8]) {
        let sh = SubmessageHeader {
            submessage_id: SubmessageId::Pad,
            flags,
            octets_to_next_header: body_len(body),
        };
        let mut sh_bytes = sh.to_bytes();
        sh_bytes[0] = raw_id;
        bytes.extend_from_slice(&sh_bytes);
        bytes.extend_from_slice(body);
    }

    /// Datagram consisting of the test header and exactly one submessage.
    fn single_submessage_datagram(
        submessage_id: SubmessageId,
        flags: u8,
        body: &[u8],
    ) -> Vec<u8> {
        let mut bytes = header().to_bytes().to_vec();
        push_submessage(&mut bytes, submessage_id, flags, body);
        bytes
    }

    /// Datagram with one empty-payload DATA submessage whose inline QoS
    /// contains only `parameter`.
    fn datagram_with_inline_qos_parameter(sn: i64, parameter: Parameter) -> Vec<u8> {
        let mut pl = ParameterList::new();
        pl.push(parameter);
        let d = DataSubmessage {
            inline_qos: Some(pl),
            ..data_msg(sn, &[])
        };
        let (body, flags) = d.write_body(true);
        single_submessage_datagram(SubmessageId::Data, flags, &body)
    }

    #[test]
    fn encode_decode_single_data_datagram() {
        let h = header();
        let d = data_msg(1, b"hello");
        let bytes = encode_data_datagram(h, &[d.clone()]).unwrap();
        let parsed = decode_datagram(&bytes).unwrap();
        assert_eq!(parsed.header, h);
        assert_eq!(parsed.submessages.len(), 1);
        match &parsed.submessages[0] {
            ParsedSubmessage::Data(decoded) => assert_eq!(decoded, &d),
            other => panic!("expected Data, got {other:?}"),
        }
    }

    #[test]
    fn encode_decode_two_data_submessages() {
        let d1 = data_msg(1, b"first");
        let d2 = data_msg(2, b"second-payload");
        let bytes = encode_data_datagram(header(), &[d1.clone(), d2.clone()]).unwrap();
        let parsed = decode_datagram(&bytes).unwrap();
        assert_eq!(parsed.submessages.len(), 2);
        match (&parsed.submessages[0], &parsed.submessages[1]) {
            (ParsedSubmessage::Data(a), ParsedSubmessage::Data(b)) => {
                assert_eq!(a, &d1);
                assert_eq!(b, &d2);
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[test]
    fn encode_decode_empty_payload() {
        let d = data_msg(42, b"");
        let bytes = encode_data_datagram(header(), &[d.clone()]).unwrap();
        let parsed = decode_datagram(&bytes).unwrap();
        assert_eq!(parsed.submessages.len(), 1);
        match &parsed.submessages[0] {
            ParsedSubmessage::Data(decoded) => {
                assert!(decoded.serialized_payload.is_empty());
                assert_eq!(decoded.writer_sn, SequenceNumber(42));
            }
            other => panic!("expected Data, got {other:?}"),
        }
    }

    #[test]
    fn decode_rejects_invalid_magic() {
        let mut bytes = vec![0u8; 32];
        bytes[..4].copy_from_slice(b"XXXX");
        let res = decode_datagram(&bytes);
        assert!(matches!(res, Err(WireError::InvalidMagic { .. })));
    }

    #[test]
    fn decode_handles_last_submessage_zero_length() {
        // A length field of 0 on the last submessage: the body runs to the
        // end of the datagram.
        let d = data_msg(7, b"X");
        let (body, flags) = d.write_body(true);
        let mut bytes = header().to_bytes().to_vec();
        push_submessage_with_len(&mut bytes, SubmessageId::Data, flags, 0, &body);
        let parsed = decode_datagram(&bytes).unwrap();
        match &parsed.submessages[0] {
            ParsedSubmessage::Data(decoded) => {
                assert_eq!(decoded, &d);
            }
            other => panic!("expected Data, got {other:?}"),
        }
    }

    #[test]
    fn decode_marks_unknown_submessage_id_without_failing() {
        // PAD has no dedicated decoder, so it is reported as Unknown.
        let bytes = single_submessage_datagram(SubmessageId::Pad, FLAG_E_LITTLE_ENDIAN, &[]);
        let parsed = decode_datagram(&bytes).unwrap();
        assert_eq!(parsed.submessages.len(), 1);
        match &parsed.submessages[0] {
            ParsedSubmessage::Unknown { id, flags } => {
                assert_eq!(*id, 0x01);
                assert_eq!(*flags, FLAG_E_LITTLE_ENDIAN);
            }
            other => panic!("expected Unknown, got {other:?}"),
        }
    }

    #[test]
    fn decode_heartbeat_preserves_final_and_liveliness_flags() {
        let hb = HeartbeatSubmessage {
            reader_id: EntityId::user_reader_with_key([1, 2, 3]),
            writer_id: EntityId::user_writer_with_key([4, 5, 6]),
            first_sn: SequenceNumber(1),
            last_sn: SequenceNumber(7),
            count: 42,
            final_flag: true,
            liveliness_flag: true,
            group_info: None,
        };
        let (body, flags) = hb.write_body(true);
        let bytes = single_submessage_datagram(SubmessageId::Heartbeat, flags, &body);
        let parsed = decode_datagram(&bytes).unwrap();
        match &parsed.submessages[0] {
            ParsedSubmessage::Heartbeat(decoded) => {
                assert_eq!(decoded, &hb);
                assert!(decoded.final_flag);
                assert!(decoded.liveliness_flag);
            }
            other => panic!("expected Heartbeat, got {other:?}"),
        }
    }

    #[test]
    fn decode_acknack_preserves_final_flag() {
        let ack = AckNackSubmessage {
            reader_id: EntityId::user_reader_with_key([1, 2, 3]),
            writer_id: EntityId::user_writer_with_key([4, 5, 6]),
            reader_sn_state: SequenceNumberSet {
                bitmap_base: SequenceNumber(1),
                num_bits: 0,
                bitmap: vec![],
            },
            count: 3,
            final_flag: true,
        };
        let (body, flags) = ack.write_body(true);
        let bytes = single_submessage_datagram(SubmessageId::AckNack, flags, &body);
        let parsed = decode_datagram(&bytes).unwrap();
        match &parsed.submessages[0] {
            ParsedSubmessage::AckNack(decoded) => {
                assert_eq!(decoded, &ack);
                assert!(decoded.final_flag);
            }
            other => panic!("expected AckNack, got {other:?}"),
        }
    }

    #[test]
    fn decode_data_frag_preserves_flags_and_payload() {
        let df = DataFragSubmessage {
            extra_flags: 0,
            reader_id: EntityId::user_reader_with_key([1, 2, 3]),
            writer_id: EntityId::user_writer_with_key([4, 5, 6]),
            writer_sn: SequenceNumber(7),
            fragment_starting_num: FragmentNumber(1),
            fragments_in_submessage: 1,
            fragment_size: 4,
            sample_size: 12,
            serialized_payload: alloc::sync::Arc::<[u8]>::from([0xAA, 0xBB, 0xCC, 0xDD].as_slice()),
            inline_qos_flag: false,
            hash_key_flag: true,
            key_flag: false,
            non_standard_flag: false,
        };
        let (body, flags) = df.write_body(true);
        let bytes = single_submessage_datagram(SubmessageId::DataFrag, flags, &body);
        let parsed = decode_datagram(&bytes).unwrap();
        match &parsed.submessages[0] {
            ParsedSubmessage::DataFrag(decoded) => {
                assert_eq!(decoded, &df);
                assert!(decoded.hash_key_flag);
                assert!(!decoded.inline_qos_flag);
            }
            other => panic!("expected DataFrag, got {other:?}"),
        }
    }

    #[test]
    fn decode_heartbeat_frag_roundtrip() {
        let hf = HeartbeatFragSubmessage {
            reader_id: EntityId::user_reader_with_key([1, 2, 3]),
            writer_id: EntityId::user_writer_with_key([4, 5, 6]),
            writer_sn: SequenceNumber(42),
            last_fragment_num: FragmentNumber(8),
            count: 3,
        };
        let (body, flags) = hf.write_body(true);
        let bytes = single_submessage_datagram(SubmessageId::HeartbeatFrag, flags, &body);
        let parsed = decode_datagram(&bytes).unwrap();
        match &parsed.submessages[0] {
            ParsedSubmessage::HeartbeatFrag(decoded) => assert_eq!(decoded, &hf),
            other => panic!("expected HeartbeatFrag, got {other:?}"),
        }
    }

    #[test]
    fn decode_nack_frag_roundtrip() {
        let nf = NackFragSubmessage {
            reader_id: EntityId::user_reader_with_key([1, 2, 3]),
            writer_id: EntityId::user_writer_with_key([4, 5, 6]),
            writer_sn: SequenceNumber(5),
            fragment_number_state: FragmentNumberSet {
                bitmap_base: FragmentNumber(1),
                num_bits: 4,
                bitmap: vec![0b1010_0000_0000_0000_0000_0000_0000_0000],
            },
            count: 1,
        };
        let (body, flags) = nf.write_body(true);
        let bytes = single_submessage_datagram(SubmessageId::NackFrag, flags, &body);
        let parsed = decode_datagram(&bytes).unwrap();
        match &parsed.submessages[0] {
            ParsedSubmessage::NackFrag(decoded) => assert_eq!(decoded, &nf),
            other => panic!("expected NackFrag, got {other:?}"),
        }
    }

    #[test]
    fn decode_info_source_via_datagram() {
        let info = InfoSourceSubmessage {
            unused: 0,
            protocol_version: ProtocolVersion::V2_5,
            vendor_id: VendorId([0xAB, 0xCD]),
            guid_prefix: GuidPrefix::from_bytes([3; 12]),
        };
        let (body, flags) = info.write_body(true);
        let bytes = single_submessage_datagram(SubmessageId::InfoSrc, flags, &body);
        let parsed = decode_datagram(&bytes).unwrap();
        match &parsed.submessages[0] {
            ParsedSubmessage::InfoSource(decoded) => assert_eq!(decoded, &info),
            other => panic!("expected InfoSource, got {other:?}"),
        }
    }

    #[test]
    fn decode_info_reply_with_multicast_via_datagram() {
        let info = InfoReplySubmessage {
            unicast_locators: vec![Locator::udp_v4([10, 1, 2, 3], 7411)],
            multicast_locators: Some(vec![Locator::udp_v4([239, 255, 0, 1], 7400)]),
        };
        let (body, flags) = info.write_body(true);
        let bytes = single_submessage_datagram(SubmessageId::InfoReply, flags, &body);
        let parsed = decode_datagram(&bytes).unwrap();
        match &parsed.submessages[0] {
            ParsedSubmessage::InfoReply(decoded) => assert_eq!(decoded, &info),
            other => panic!("expected InfoReply, got {other:?}"),
        }
    }

    #[test]
    fn decode_rejects_truncated_after_header() {
        // Three trailing bytes cannot hold a 4-byte submessage header.
        let mut bytes = header().to_bytes().to_vec();
        bytes.extend_from_slice(&[0u8, 0, 0]);
        let res = decode_datagram(&bytes);
        assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
    }

    #[test]
    fn decode_header_extension_in_datagram() {
        let he = HeaderExtension {
            little_endian: true,
            message_length: Some(123),
            timestamp: Some(HeTimestamp {
                seconds: 1,
                fraction: 2,
            }),
            checksum: ChecksumValue::Crc32c(0xDEAD_BEEF),
            ..HeaderExtension::default()
        };
        let mut bytes = header().to_bytes().to_vec();
        bytes.extend_from_slice(&he.encode().unwrap());
        let parsed = decode_datagram(&bytes).unwrap();
        assert_eq!(parsed.submessages.len(), 1);
        match &parsed.submessages[0] {
            ParsedSubmessage::HeaderExtension(decoded) => assert_eq!(decoded, &he),
            other => panic!("expected HE, got {other:?}"),
        }
    }

    #[test]
    fn decode_rejects_unknown_submessage_with_must_understand() {
        let mut bytes = header().to_bytes().to_vec();
        push_submessage_with_raw_id(
            &mut bytes,
            0x7E,
            FLAG_E_LITTLE_ENDIAN | SUBMESSAGE_FLAG_MUST_UNDERSTAND,
            &[0u8; 4],
        );
        let res = decode_datagram(&bytes);
        assert!(matches!(
            res,
            Err(WireError::ValueOutOfRange { message: msg }) if msg.contains("must-understand")
        ));
    }

    #[test]
    fn decode_skips_unknown_submessage_without_must_understand() {
        let mut bytes = header().to_bytes().to_vec();
        push_submessage_with_raw_id(&mut bytes, 0x7E, FLAG_E_LITTLE_ENDIAN, &[0u8; 4]);
        let parsed = decode_datagram(&bytes).unwrap();
        assert_eq!(parsed.submessages.len(), 1);
        match &parsed.submessages[0] {
            ParsedSubmessage::Unknown { id, .. } => assert_eq!(*id, 0x7E),
            other => panic!("expected Unknown, got {other:?}"),
        }
    }

    #[test]
    fn decode_data_after_header_extension() {
        let he = minimal_header_extension();
        let d = data_msg(7, b"after-he");
        let mut bytes = header().to_bytes().to_vec();
        bytes.extend_from_slice(&he.encode().unwrap());
        let (body, flags) = d.write_body(true);
        push_submessage(&mut bytes, SubmessageId::Data, flags, &body);
        let parsed = decode_datagram(&bytes).unwrap();
        assert_eq!(parsed.submessages.len(), 2);
        assert!(matches!(
            &parsed.submessages[0],
            ParsedSubmessage::HeaderExtension(_)
        ));
        assert!(matches!(&parsed.submessages[1], ParsedSubmessage::Data(_)));
    }

    #[test]
    fn decode_rejects_header_extension_after_data_submessage() {
        // A header extension is only accepted as the very first submessage.
        let d = data_msg(7, b"first");
        let he = minimal_header_extension();
        let mut bytes = header().to_bytes().to_vec();
        let (dbody, dflags) = d.write_body(true);
        push_submessage(&mut bytes, SubmessageId::Data, dflags, &dbody);
        bytes.extend_from_slice(&he.encode().unwrap());
        let res = decode_datagram(&bytes);
        assert!(matches!(res, Err(WireError::ValueOutOfRange { .. })));
    }

    #[test]
    fn decode_rejects_data_with_unknown_must_understand_pid_in_inline_qos() {
        let bytes = datagram_with_inline_qos_parameter(
            1,
            Parameter::new(MUST_UNDERSTAND_BIT | 0x3500, vec![1, 2, 3, 4]),
        );
        let res = decode_datagram(&bytes);
        assert!(matches!(res, Err(WireError::ValueOutOfRange { .. })));
    }

    #[test]
    fn decode_accepts_data_with_known_must_understand_pid_in_inline_qos() {
        let bytes = datagram_with_inline_qos_parameter(
            2,
            Parameter::new(MUST_UNDERSTAND_BIT | pid::KEY_HASH, vec![0; 16]),
        );
        decode_datagram(&bytes).expect("known MU PID should pass");
    }

    #[test]
    fn decode_accepts_vendor_specific_must_understand_pid() {
        let bytes = datagram_with_inline_qos_parameter(
            3,
            Parameter::new(
                MUST_UNDERSTAND_BIT | VENDOR_SPECIFIC_BIT | 0x0050,
                vec![0xCA, 0xFE, 0xBA, 0xBE],
            ),
        );
        decode_datagram(&bytes).expect("vendor-specific MU PID should pass");
    }

    #[test]
    fn rtps_2_1_treats_0x80_as_vendor_specific_not_header_extension() {
        // Below protocol version 2.5 the id 0x80 is not decoded as a header
        // extension, so it is legal after a DATA submessage and is skipped.
        let mut h = header();
        h.protocol_version = ProtocolVersion::V2_1;
        let mut bytes = h.to_bytes().to_vec();
        let d = data_msg(1, b"x");
        let (body, flags) = d.write_body(true);
        push_submessage(&mut bytes, SubmessageId::Data, flags, &body);
        // Hand-written submessage: id 0x80, little-endian, length 4.
        bytes.extend_from_slice(&[0x80, FLAG_E_LITTLE_ENDIAN, 4, 0]);
        bytes.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
        let parsed = decode_datagram(&bytes).expect("0x80 unter RTPS-2.1 muss skippen");
        assert!(matches!(
            parsed.submessages.last(),
            Some(ParsedSubmessage::Unknown { id: 0x80, .. })
        ));
    }
}