1extern crate alloc;
29use alloc::sync::Arc;
30use alloc::vec::Vec;
31
32use crate::error::WireError;
33use crate::submessage_header::FLAG_E_LITTLE_ENDIAN;
34use crate::wire_types::{EntityId, FragmentNumber, SequenceNumber};
35
/// Largest `numBits` value accepted when decoding a bitmap set in this file;
/// `SequenceNumberSet::read_from` and `FragmentNumberSet::read_from` return
/// `WireError::ValueOutOfRange` for anything larger.
pub const RTPS_BITMAP_MAX_BITS: u32 = 256;
42
/// Bitmap-encoded set of sequence numbers relative to `bitmap_base`.
///
/// Bit `i` stands for the sequence number `bitmap_base + i`. Bits are stored
/// most-significant-first: bit `i` lives in `bitmap[i / 32]` at bit position
/// `31 - (i % 32)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SequenceNumberSet {
    /// Sequence number represented by bit 0 of the bitmap.
    pub bitmap_base: SequenceNumber,
    /// Number of bits of `bitmap` that are considered part of the set.
    pub num_bits: u32,
    /// Bitmap words; `read_from` produces `ceil(num_bits / 32)` of them.
    pub bitmap: Vec<u32>,
}
64
65impl SequenceNumberSet {
66 #[must_use]
68 pub fn wire_size(num_bits: u32) -> usize {
69 let words = (num_bits as usize).div_ceil(32);
70 8 + 4 + words * 4
71 }
72
73 #[must_use]
80 pub fn from_missing(base: SequenceNumber, missing: &[SequenceNumber]) -> Self {
81 let Some(last) = missing.last().copied() else {
82 return Self {
83 bitmap_base: base,
84 num_bits: 0,
85 bitmap: Vec::new(),
86 };
87 };
88 if last < base {
89 return Self {
90 bitmap_base: base,
91 num_bits: 0,
92 bitmap: Vec::new(),
93 };
94 }
95 let num_bits = u32::try_from(last.0 - base.0 + 1).unwrap_or(u32::MAX);
96 let num_words = (num_bits as usize).div_ceil(32);
97 let mut bitmap = alloc::vec![0u32; num_words];
98 for sn in missing {
99 if *sn < base {
100 continue;
101 }
102 let offset = (sn.0 - base.0) as usize;
103 let word_idx = offset / 32;
104 let bit = 31 - (offset % 32);
105 if word_idx < bitmap.len() {
106 bitmap[word_idx] |= 1u32 << bit;
107 }
108 }
109 Self {
110 bitmap_base: base,
111 num_bits,
112 bitmap,
113 }
114 }
115
116 pub fn iter_set(&self) -> impl Iterator<Item = SequenceNumber> + '_ {
118 (0..self.num_bits).filter_map(move |i| {
119 let word_idx = (i / 32) as usize;
120 let bit = 31 - (i as usize % 32);
121 if word_idx < self.bitmap.len() && (self.bitmap[word_idx] >> bit) & 1 == 1 {
122 Some(SequenceNumber(self.bitmap_base.0 + i64::from(i)))
123 } else {
124 None
125 }
126 })
127 }
128
129 #[must_use]
131 pub fn encoded_size(&self) -> usize {
132 Self::wire_size(self.num_bits)
133 }
134
135 pub fn write_to(&self, out: &mut Vec<u8>, little_endian: bool) {
137 if little_endian {
138 out.extend_from_slice(&self.bitmap_base.to_bytes_le());
139 out.extend_from_slice(&self.num_bits.to_le_bytes());
140 for w in &self.bitmap {
141 out.extend_from_slice(&w.to_le_bytes());
142 }
143 } else {
144 out.extend_from_slice(&self.bitmap_base.to_bytes_be());
145 out.extend_from_slice(&self.num_bits.to_be_bytes());
146 for w in &self.bitmap {
147 out.extend_from_slice(&w.to_be_bytes());
148 }
149 }
150 }
151
152 pub fn read_from(
157 bytes: &[u8],
158 offset: usize,
159 little_endian: bool,
160 ) -> Result<(Self, usize), WireError> {
161 let mut pos = offset;
162 if bytes.len() < pos + 8 {
163 return Err(WireError::UnexpectedEof {
164 needed: 8,
165 offset: pos,
166 });
167 }
168 let mut sn_bytes = [0u8; 8];
169 sn_bytes.copy_from_slice(&bytes[pos..pos + 8]);
170 let bitmap_base = if little_endian {
171 SequenceNumber::from_bytes_le(sn_bytes)
172 } else {
173 SequenceNumber::from_bytes_be(sn_bytes)
174 };
175 pos += 8;
176 if bytes.len() < pos + 4 {
177 return Err(WireError::UnexpectedEof {
178 needed: 4,
179 offset: pos,
180 });
181 }
182 let mut num_bytes = [0u8; 4];
183 num_bytes.copy_from_slice(&bytes[pos..pos + 4]);
184 let num_bits = if little_endian {
185 u32::from_le_bytes(num_bytes)
186 } else {
187 u32::from_be_bytes(num_bytes)
188 };
189 pos += 4;
190 if num_bits > RTPS_BITMAP_MAX_BITS {
191 return Err(WireError::ValueOutOfRange {
192 message: "SequenceNumberSet.numBits exceeds RTPS_BITMAP_MAX_BITS (256)",
193 });
194 }
195 let words = (num_bits as usize).div_ceil(32);
196 let bitmap_bytes = words * 4;
197 if bytes.len() < pos + bitmap_bytes {
198 return Err(WireError::UnexpectedEof {
199 needed: bitmap_bytes,
200 offset: pos,
201 });
202 }
203 let mut bitmap = Vec::with_capacity(words);
204 for _ in 0..words {
205 let mut w = [0u8; 4];
206 w.copy_from_slice(&bytes[pos..pos + 4]);
207 bitmap.push(if little_endian {
208 u32::from_le_bytes(w)
209 } else {
210 u32::from_be_bytes(w)
211 });
212 pos += 4;
213 }
214 Ok((
215 Self {
216 bitmap_base,
217 num_bits,
218 bitmap,
219 },
220 pos,
221 ))
222 }
223}
224
/// Bitmap-encoded set of fragment numbers relative to `bitmap_base`.
///
/// Bit `i` stands for the fragment number `bitmap_base + i`. Bits are stored
/// most-significant-first: bit `i` lives in `bitmap[i / 32]` at bit position
/// `31 - (i % 32)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FragmentNumberSet {
    /// Fragment number represented by bit 0 of the bitmap.
    pub bitmap_base: FragmentNumber,
    /// Number of bits of `bitmap` that are considered part of the set.
    pub num_bits: u32,
    /// Bitmap words; `read_from` produces `ceil(num_bits / 32)` of them.
    pub bitmap: Vec<u32>,
}
246
247impl FragmentNumberSet {
248 #[must_use]
250 pub fn wire_size(num_bits: u32) -> usize {
251 let words = (num_bits as usize).div_ceil(32);
252 4 + 4 + words * 4
253 }
254
255 #[must_use]
258 pub fn from_missing(base: FragmentNumber, missing: &[FragmentNumber]) -> Self {
259 let Some(last) = missing.last().copied() else {
260 return Self {
261 bitmap_base: base,
262 num_bits: 0,
263 bitmap: Vec::new(),
264 };
265 };
266 if last < base {
267 return Self {
268 bitmap_base: base,
269 num_bits: 0,
270 bitmap: Vec::new(),
271 };
272 }
273 let num_bits = last.0.saturating_sub(base.0).saturating_add(1);
274 let num_words = (num_bits as usize).div_ceil(32);
275 let mut bitmap = alloc::vec![0u32; num_words];
276 for fnum in missing {
277 if *fnum < base {
278 continue;
279 }
280 let offset = (fnum.0 - base.0) as usize;
281 let word_idx = offset / 32;
282 let bit = 31 - (offset % 32);
283 if word_idx < bitmap.len() {
284 bitmap[word_idx] |= 1u32 << bit;
285 }
286 }
287 Self {
288 bitmap_base: base,
289 num_bits,
290 bitmap,
291 }
292 }
293
294 pub fn iter_set(&self) -> impl Iterator<Item = FragmentNumber> + '_ {
296 (0..self.num_bits).filter_map(move |i| {
297 let word_idx = (i / 32) as usize;
298 let bit = 31 - (i as usize % 32);
299 if word_idx < self.bitmap.len() && (self.bitmap[word_idx] >> bit) & 1 == 1 {
300 Some(FragmentNumber(self.bitmap_base.0.wrapping_add(i)))
301 } else {
302 None
303 }
304 })
305 }
306
307 #[must_use]
309 pub fn encoded_size(&self) -> usize {
310 Self::wire_size(self.num_bits)
311 }
312
313 pub fn write_to(&self, out: &mut Vec<u8>, little_endian: bool) {
315 if little_endian {
316 out.extend_from_slice(&self.bitmap_base.to_bytes_le());
317 out.extend_from_slice(&self.num_bits.to_le_bytes());
318 for w in &self.bitmap {
319 out.extend_from_slice(&w.to_le_bytes());
320 }
321 } else {
322 out.extend_from_slice(&self.bitmap_base.to_bytes_be());
323 out.extend_from_slice(&self.num_bits.to_be_bytes());
324 for w in &self.bitmap {
325 out.extend_from_slice(&w.to_be_bytes());
326 }
327 }
328 }
329
330 pub fn read_from(
335 bytes: &[u8],
336 offset: usize,
337 little_endian: bool,
338 ) -> Result<(Self, usize), WireError> {
339 let mut pos = offset;
340 if bytes.len() < pos + 4 {
341 return Err(WireError::UnexpectedEof {
342 needed: 4,
343 offset: pos,
344 });
345 }
346 let mut bb = [0u8; 4];
347 bb.copy_from_slice(&bytes[pos..pos + 4]);
348 let bitmap_base = if little_endian {
349 FragmentNumber::from_bytes_le(bb)
350 } else {
351 FragmentNumber::from_bytes_be(bb)
352 };
353 pos += 4;
354 if bytes.len() < pos + 4 {
355 return Err(WireError::UnexpectedEof {
356 needed: 4,
357 offset: pos,
358 });
359 }
360 let mut nb = [0u8; 4];
361 nb.copy_from_slice(&bytes[pos..pos + 4]);
362 let num_bits = if little_endian {
363 u32::from_le_bytes(nb)
364 } else {
365 u32::from_be_bytes(nb)
366 };
367 pos += 4;
368 if num_bits > RTPS_BITMAP_MAX_BITS {
369 return Err(WireError::ValueOutOfRange {
370 message: "FragmentNumberSet.numBits exceeds RTPS_BITMAP_MAX_BITS (256)",
371 });
372 }
373 let words = (num_bits as usize).div_ceil(32);
374 let need = words * 4;
375 if bytes.len() < pos + need {
376 return Err(WireError::UnexpectedEof {
377 needed: need,
378 offset: pos,
379 });
380 }
381 let mut bitmap = Vec::with_capacity(words);
382 for _ in 0..words {
383 let mut w = [0u8; 4];
384 w.copy_from_slice(&bytes[pos..pos + 4]);
385 bitmap.push(if little_endian {
386 u32::from_le_bytes(w)
387 } else {
388 u32::from_be_bytes(w)
389 });
390 pos += 4;
391 }
392 Ok((
393 Self {
394 bitmap_base,
395 num_bits,
396 bitmap,
397 },
398 pos,
399 ))
400 }
401}
402
/// DATA flag bit: set by `DataSubmessage::write_body` when `inline_qos` is
/// present; `read_body_with_flags` parses a parameter list when it is set.
pub const DATA_FLAG_INLINE_QOS: u8 = 0x02;
/// DATA flag bit emitted by `DataSubmessage::write_body`.
pub const DATA_FLAG_DATA: u8 = 0x04;
/// DATA flag bit mirrored by `DataSubmessage::key_flag`.
pub const DATA_FLAG_KEY: u8 = 0x08;
/// DATA flag bit mirrored by `DataSubmessage::non_standard_flag`.
pub const DATA_FLAG_NON_STANDARD: u8 = 0x10;
415
/// Decoded body of a DATA submessage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataSubmessage {
    /// First 16-bit field of the body, encoded in the submessage byte order.
    pub extra_flags: u16,
    /// Reader entity id (4 bytes on the wire).
    pub reader_id: EntityId,
    /// Writer entity id (4 bytes on the wire).
    pub writer_id: EntityId,
    /// Sequence number assigned by the writer (8 bytes on the wire).
    pub writer_sn: SequenceNumber,
    /// Optional inline QoS parameter list; when present `write_body` emits it
    /// after the fixed header and sets `DATA_FLAG_INLINE_QOS`.
    pub inline_qos: Option<crate::parameter_list::ParameterList>,
    /// Mirrors the `DATA_FLAG_KEY` bit of the submessage flags.
    pub key_flag: bool,
    /// Mirrors the `DATA_FLAG_NON_STANDARD` bit of the submessage flags.
    pub non_standard_flag: bool,
    /// Bytes following the header (and inline QoS, if any), stored verbatim.
    pub serialized_payload: Arc<[u8]>,
}
451
452impl DataSubmessage {
453 #[must_use]
461 pub fn write_body(&self, little_endian: bool) -> (Vec<u8>, u8) {
462 let mut out = Vec::new();
463 let extra = if little_endian {
465 self.extra_flags.to_le_bytes()
466 } else {
467 self.extra_flags.to_be_bytes()
468 };
469 out.extend_from_slice(&extra);
470 let octets_to_inline_qos: u16 = 16;
474 let oti = if little_endian {
475 octets_to_inline_qos.to_le_bytes()
476 } else {
477 octets_to_inline_qos.to_be_bytes()
478 };
479 out.extend_from_slice(&oti);
480 out.extend_from_slice(&self.reader_id.to_bytes());
482 out.extend_from_slice(&self.writer_id.to_bytes());
483 out.extend_from_slice(&if little_endian {
485 self.writer_sn.to_bytes_le()
486 } else {
487 self.writer_sn.to_bytes_be()
488 });
489 if let Some(pl) = &self.inline_qos {
491 out.extend_from_slice(&pl.to_bytes(little_endian));
492 }
493 out.extend_from_slice(&self.serialized_payload);
495
496 let mut flags = 0u8;
497 if little_endian {
498 flags |= FLAG_E_LITTLE_ENDIAN;
499 }
500 flags |= DATA_FLAG_DATA;
501 if self.key_flag {
502 flags |= DATA_FLAG_KEY;
503 }
504 if self.non_standard_flag {
505 flags |= DATA_FLAG_NON_STANDARD;
506 }
507 if self.inline_qos.is_some() {
508 flags |= DATA_FLAG_INLINE_QOS;
509 }
510 (out, flags)
511 }
512
513 pub fn read_body(body: &[u8], little_endian: bool) -> Result<Self, WireError> {
519 Self::read_body_with_flags(body, little_endian, 0)
520 }
521
522 pub fn read_body_with_flags(
530 body: &[u8],
531 little_endian: bool,
532 flags: u8,
533 ) -> Result<Self, WireError> {
534 if body.len() < 4 + 4 + 4 + 8 {
535 return Err(WireError::UnexpectedEof {
536 needed: 20,
537 offset: 0,
538 });
539 }
540 let mut pos = 0usize;
541 let mut ef = [0u8; 2];
542 ef.copy_from_slice(&body[pos..pos + 2]);
543 let extra_flags = if little_endian {
544 u16::from_le_bytes(ef)
545 } else {
546 u16::from_be_bytes(ef)
547 };
548 pos += 2;
549 pos += 2;
552 let mut rid = [0u8; 4];
553 rid.copy_from_slice(&body[pos..pos + 4]);
554 let reader_id = EntityId::from_bytes(rid);
555 pos += 4;
556 let mut wid = [0u8; 4];
557 wid.copy_from_slice(&body[pos..pos + 4]);
558 let writer_id = EntityId::from_bytes(wid);
559 pos += 4;
560 let mut sn = [0u8; 8];
561 sn.copy_from_slice(&body[pos..pos + 8]);
562 let writer_sn = if little_endian {
563 SequenceNumber::from_bytes_le(sn)
564 } else {
565 SequenceNumber::from_bytes_be(sn)
566 };
567 pos += 8;
568
569 let inline_qos = if flags & DATA_FLAG_INLINE_QOS != 0 {
571 let pl = crate::parameter_list::ParameterList::from_bytes(&body[pos..], little_endian)?;
577 let consumed = pl.to_bytes(little_endian).len();
580 pos += consumed;
581 Some(pl)
582 } else {
583 None
584 };
585
586 let serialized_payload: Arc<[u8]> = Arc::from(&body[pos..]);
588 let key_flag = (flags & DATA_FLAG_KEY) != 0;
589 let non_standard_flag = (flags & DATA_FLAG_NON_STANDARD) != 0;
590 Ok(Self {
591 extra_flags,
592 reader_id,
593 writer_id,
594 writer_sn,
595 inline_qos,
596 key_flag,
597 non_standard_flag,
598 serialized_payload,
599 })
600 }
601}
602
/// HEARTBEAT flag bit set by `HeartbeatSubmessage::write_body` when
/// `final_flag` is true.
pub const HEARTBEAT_FLAG_FINAL: u8 = 0x02;
/// HEARTBEAT flag bit set when `liveliness_flag` is true.
pub const HEARTBEAT_FLAG_LIVELINESS: u8 = 0x04;
/// HEARTBEAT flag bit set when `group_info` is present.
pub const HEARTBEAT_FLAG_GROUP_INFO: u8 = 0x08;
616
/// Optional group-information trailer of a HEARTBEAT submessage: three
/// sequence numbers followed by a length-prefixed list of GUID prefixes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeartbeatGroupInfo {
    /// First sequence number of the trailer on the wire.
    pub current_gsn: SequenceNumber,
    /// Second sequence number of the trailer on the wire.
    pub first_gsn: SequenceNumber,
    /// Third sequence number of the trailer on the wire.
    pub last_gsn: SequenceNumber,
    /// GUID prefixes, 12 bytes each, preceded on the wire by a u32 count.
    pub writer_set: Vec<crate::wire_types::GuidPrefix>,
}
636
/// Decoded body of a HEARTBEAT submessage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeartbeatSubmessage {
    /// Reader entity id (4 bytes on the wire).
    pub reader_id: EntityId,
    /// Writer entity id (4 bytes on the wire).
    pub writer_id: EntityId,
    /// First of the two sequence numbers in the fixed part of the body.
    pub first_sn: SequenceNumber,
    /// Second of the two sequence numbers in the fixed part of the body.
    pub last_sn: SequenceNumber,
    /// Heartbeat counter, a signed 32-bit value on the wire.
    pub count: i32,
    /// Mirrors the `HEARTBEAT_FLAG_FINAL` bit.
    pub final_flag: bool,
    /// Mirrors the `HEARTBEAT_FLAG_LIVELINESS` bit.
    pub liveliness_flag: bool,
    /// Trailer emitted (with `HEARTBEAT_FLAG_GROUP_INFO`) when present.
    pub group_info: Option<HeartbeatGroupInfo>,
}
662
663impl HeartbeatSubmessage {
664 pub const WIRE_SIZE: usize = 28;
667
668 #[must_use]
671 pub fn write_body(&self, little_endian: bool) -> (Vec<u8>, u8) {
672 let mut out = Vec::with_capacity(Self::WIRE_SIZE);
673 out.extend_from_slice(&self.reader_id.to_bytes());
674 out.extend_from_slice(&self.writer_id.to_bytes());
675 out.extend_from_slice(&if little_endian {
676 self.first_sn.to_bytes_le()
677 } else {
678 self.first_sn.to_bytes_be()
679 });
680 out.extend_from_slice(&if little_endian {
681 self.last_sn.to_bytes_le()
682 } else {
683 self.last_sn.to_bytes_be()
684 });
685 out.extend_from_slice(&if little_endian {
686 self.count.to_le_bytes()
687 } else {
688 self.count.to_be_bytes()
689 });
690 let mut flags = 0u8;
691 if little_endian {
692 flags |= FLAG_E_LITTLE_ENDIAN;
693 }
694 if self.final_flag {
695 flags |= HEARTBEAT_FLAG_FINAL;
696 }
697 if self.liveliness_flag {
698 flags |= HEARTBEAT_FLAG_LIVELINESS;
699 }
700 if let Some(gi) = &self.group_info {
701 flags |= HEARTBEAT_FLAG_GROUP_INFO;
702 for sn in [gi.current_gsn, gi.first_gsn, gi.last_gsn] {
703 out.extend_from_slice(&if little_endian {
704 sn.to_bytes_le()
705 } else {
706 sn.to_bytes_be()
707 });
708 }
709 let len = u32::try_from(gi.writer_set.len()).unwrap_or(u32::MAX);
710 out.extend_from_slice(&if little_endian {
711 len.to_le_bytes()
712 } else {
713 len.to_be_bytes()
714 });
715 for prefix in &gi.writer_set {
716 out.extend_from_slice(&prefix.to_bytes());
717 }
718 }
719 (out, flags)
720 }
721
722 pub fn read_body(
728 body: &[u8],
729 little_endian: bool,
730 final_flag: bool,
731 liveliness_flag: bool,
732 group_info_flag: bool,
733 ) -> Result<Self, WireError> {
734 if body.len() < Self::WIRE_SIZE {
735 return Err(WireError::UnexpectedEof {
736 needed: Self::WIRE_SIZE,
737 offset: 0,
738 });
739 }
740 let mut pos = 0usize;
741 let mut rid = [0u8; 4];
742 rid.copy_from_slice(&body[pos..pos + 4]);
743 let reader_id = EntityId::from_bytes(rid);
744 pos += 4;
745 let mut wid = [0u8; 4];
746 wid.copy_from_slice(&body[pos..pos + 4]);
747 let writer_id = EntityId::from_bytes(wid);
748 pos += 4;
749 let mut sn = [0u8; 8];
750 sn.copy_from_slice(&body[pos..pos + 8]);
751 let first_sn = if little_endian {
752 SequenceNumber::from_bytes_le(sn)
753 } else {
754 SequenceNumber::from_bytes_be(sn)
755 };
756 pos += 8;
757 sn.copy_from_slice(&body[pos..pos + 8]);
758 let last_sn = if little_endian {
759 SequenceNumber::from_bytes_le(sn)
760 } else {
761 SequenceNumber::from_bytes_be(sn)
762 };
763 pos += 8;
764 let mut cnt = [0u8; 4];
765 cnt.copy_from_slice(&body[pos..pos + 4]);
766 let count = if little_endian {
767 i32::from_le_bytes(cnt)
768 } else {
769 i32::from_be_bytes(cnt)
770 };
771 pos += 4;
772 let group_info = if group_info_flag {
773 if body.len() < pos + 28 {
775 return Err(WireError::UnexpectedEof {
776 needed: 28,
777 offset: pos,
778 });
779 }
780 let mut s = [0u8; 8];
781 s.copy_from_slice(&body[pos..pos + 8]);
782 let current_gsn = if little_endian {
783 SequenceNumber::from_bytes_le(s)
784 } else {
785 SequenceNumber::from_bytes_be(s)
786 };
787 pos += 8;
788 s.copy_from_slice(&body[pos..pos + 8]);
789 let first_gsn = if little_endian {
790 SequenceNumber::from_bytes_le(s)
791 } else {
792 SequenceNumber::from_bytes_be(s)
793 };
794 pos += 8;
795 s.copy_from_slice(&body[pos..pos + 8]);
796 let last_gsn = if little_endian {
797 SequenceNumber::from_bytes_le(s)
798 } else {
799 SequenceNumber::from_bytes_be(s)
800 };
801 pos += 8;
802 let mut len_bytes = [0u8; 4];
803 len_bytes.copy_from_slice(&body[pos..pos + 4]);
804 let len = if little_endian {
805 u32::from_le_bytes(len_bytes)
806 } else {
807 u32::from_be_bytes(len_bytes)
808 } as usize;
809 pos += 4;
810 let remaining = body.len().saturating_sub(pos);
813 if len.saturating_mul(12) > remaining {
814 return Err(WireError::ValueOutOfRange {
815 message: "HEARTBEAT.groupInfo.writerSet length exceeds body",
816 });
817 }
818 let mut writer_set = Vec::with_capacity(len);
819 for _ in 0..len {
820 let mut p = [0u8; 12];
821 p.copy_from_slice(&body[pos..pos + 12]);
822 writer_set.push(crate::wire_types::GuidPrefix::from_bytes(p));
823 pos += 12;
824 }
825 Some(HeartbeatGroupInfo {
826 current_gsn,
827 first_gsn,
828 last_gsn,
829 writer_set,
830 })
831 } else {
832 None
833 };
834 Ok(Self {
835 reader_id,
836 writer_id,
837 first_sn,
838 last_sn,
839 count,
840 final_flag,
841 liveliness_flag,
842 group_info,
843 })
844 }
845}
846
/// ACKNACK flag bit set by `AckNackSubmessage::write_body` when `final_flag`
/// is true.
pub const ACKNACK_FLAG_FINAL: u8 = 0x02;
853
/// Decoded body of an ACKNACK submessage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AckNackSubmessage {
    /// Reader entity id (4 bytes on the wire).
    pub reader_id: EntityId,
    /// Writer entity id (4 bytes on the wire).
    pub writer_id: EntityId,
    /// Sequence-number bitmap that follows the two entity ids.
    pub reader_sn_state: SequenceNumberSet,
    /// Counter, a signed 32-bit value following the bitmap.
    pub count: i32,
    /// Mirrors the `ACKNACK_FLAG_FINAL` bit.
    pub final_flag: bool,
}
872
873impl AckNackSubmessage {
874 #[must_use]
876 pub fn write_body(&self, little_endian: bool) -> (Vec<u8>, u8) {
877 let mut out = Vec::new();
878 out.extend_from_slice(&self.reader_id.to_bytes());
879 out.extend_from_slice(&self.writer_id.to_bytes());
880 self.reader_sn_state.write_to(&mut out, little_endian);
881 out.extend_from_slice(&if little_endian {
882 self.count.to_le_bytes()
883 } else {
884 self.count.to_be_bytes()
885 });
886 let mut flags = 0u8;
887 if little_endian {
888 flags |= FLAG_E_LITTLE_ENDIAN;
889 }
890 if self.final_flag {
891 flags |= ACKNACK_FLAG_FINAL;
892 }
893 (out, flags)
894 }
895
896 pub fn read_body(
902 body: &[u8],
903 little_endian: bool,
904 final_flag: bool,
905 ) -> Result<Self, WireError> {
906 if body.len() < 8 {
907 return Err(WireError::UnexpectedEof {
908 needed: 8,
909 offset: 0,
910 });
911 }
912 let mut pos = 0usize;
913 let mut rid = [0u8; 4];
914 rid.copy_from_slice(&body[pos..pos + 4]);
915 let reader_id = EntityId::from_bytes(rid);
916 pos += 4;
917 let mut wid = [0u8; 4];
918 wid.copy_from_slice(&body[pos..pos + 4]);
919 let writer_id = EntityId::from_bytes(wid);
920 pos += 4;
921 let (reader_sn_state, new_pos) = SequenceNumberSet::read_from(body, pos, little_endian)?;
922 pos = new_pos;
923 if body.len() < pos + 4 {
924 return Err(WireError::UnexpectedEof {
925 needed: 4,
926 offset: pos,
927 });
928 }
929 let mut cnt = [0u8; 4];
930 cnt.copy_from_slice(&body[pos..pos + 4]);
931 let count = if little_endian {
932 i32::from_le_bytes(cnt)
933 } else {
934 i32::from_be_bytes(cnt)
935 };
936 Ok(Self {
937 reader_id,
938 writer_id,
939 reader_sn_state,
940 count,
941 final_flag,
942 })
943 }
944}
945
/// GAP flag bit set by `GapSubmessage::write_body` when `group_info` is
/// present.
pub const GAP_FLAG_GROUP_INFO: u8 = 0x04;

/// GAP flag bit set by `GapSubmessage::write_body` when `filtered_count` is
/// present.
pub const GAP_FLAG_FILTERED_COUNT: u8 = 0x08;
960
/// Optional group-information trailer of a GAP submessage: two sequence
/// numbers, 8 bytes each on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GapGroupInfo {
    /// First sequence number of the trailer.
    pub gap_start_gsn: SequenceNumber,
    /// Second sequence number of the trailer.
    pub gap_end_gsn: SequenceNumber,
}
969
/// Decoded body of a GAP submessage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GapSubmessage {
    /// Reader entity id (4 bytes on the wire).
    pub reader_id: EntityId,
    /// Writer entity id (4 bytes on the wire).
    pub writer_id: EntityId,
    /// Sequence number that follows the two entity ids.
    pub gap_start: SequenceNumber,
    /// Sequence-number bitmap that follows `gap_start`.
    pub gap_list: SequenceNumberSet,
    /// Trailer emitted (with `GAP_FLAG_GROUP_INFO`) when present.
    pub group_info: Option<GapGroupInfo>,
    /// Trailing u32 emitted (with `GAP_FLAG_FILTERED_COUNT`) when present.
    pub filtered_count: Option<u32>,
}
992
993impl GapSubmessage {
994 #[must_use]
996 pub fn write_body(&self, little_endian: bool) -> (Vec<u8>, u8) {
997 let mut out = Vec::new();
998 out.extend_from_slice(&self.reader_id.to_bytes());
999 out.extend_from_slice(&self.writer_id.to_bytes());
1000 out.extend_from_slice(&if little_endian {
1001 self.gap_start.to_bytes_le()
1002 } else {
1003 self.gap_start.to_bytes_be()
1004 });
1005 self.gap_list.write_to(&mut out, little_endian);
1006 let mut flags = 0u8;
1007 if little_endian {
1008 flags |= FLAG_E_LITTLE_ENDIAN;
1009 }
1010 if let Some(gi) = self.group_info {
1011 flags |= GAP_FLAG_GROUP_INFO;
1012 out.extend_from_slice(&if little_endian {
1013 gi.gap_start_gsn.to_bytes_le()
1014 } else {
1015 gi.gap_start_gsn.to_bytes_be()
1016 });
1017 out.extend_from_slice(&if little_endian {
1018 gi.gap_end_gsn.to_bytes_le()
1019 } else {
1020 gi.gap_end_gsn.to_bytes_be()
1021 });
1022 }
1023 if let Some(fc) = self.filtered_count {
1024 flags |= GAP_FLAG_FILTERED_COUNT;
1025 out.extend_from_slice(&if little_endian {
1026 fc.to_le_bytes()
1027 } else {
1028 fc.to_be_bytes()
1029 });
1030 }
1031 (out, flags)
1032 }
1033
1034 pub fn read_body(
1040 body: &[u8],
1041 little_endian: bool,
1042 group_info_flag: bool,
1043 filtered_count_flag: bool,
1044 ) -> Result<Self, WireError> {
1045 if body.len() < 4 + 4 + 8 {
1046 return Err(WireError::UnexpectedEof {
1047 needed: 16,
1048 offset: 0,
1049 });
1050 }
1051 let mut pos = 0usize;
1052 let mut rid = [0u8; 4];
1053 rid.copy_from_slice(&body[pos..pos + 4]);
1054 let reader_id = EntityId::from_bytes(rid);
1055 pos += 4;
1056 let mut wid = [0u8; 4];
1057 wid.copy_from_slice(&body[pos..pos + 4]);
1058 let writer_id = EntityId::from_bytes(wid);
1059 pos += 4;
1060 let mut sn = [0u8; 8];
1061 sn.copy_from_slice(&body[pos..pos + 8]);
1062 let gap_start = if little_endian {
1063 SequenceNumber::from_bytes_le(sn)
1064 } else {
1065 SequenceNumber::from_bytes_be(sn)
1066 };
1067 pos += 8;
1068 let (gap_list, new_pos) = SequenceNumberSet::read_from(body, pos, little_endian)?;
1069 pos = new_pos;
1070 let group_info = if group_info_flag {
1071 if body.len() < pos + 16 {
1072 return Err(WireError::UnexpectedEof {
1073 needed: 16,
1074 offset: pos,
1075 });
1076 }
1077 let mut s = [0u8; 8];
1078 s.copy_from_slice(&body[pos..pos + 8]);
1079 let gap_start_gsn = if little_endian {
1080 SequenceNumber::from_bytes_le(s)
1081 } else {
1082 SequenceNumber::from_bytes_be(s)
1083 };
1084 pos += 8;
1085 s.copy_from_slice(&body[pos..pos + 8]);
1086 let gap_end_gsn = if little_endian {
1087 SequenceNumber::from_bytes_le(s)
1088 } else {
1089 SequenceNumber::from_bytes_be(s)
1090 };
1091 pos += 8;
1092 Some(GapGroupInfo {
1093 gap_start_gsn,
1094 gap_end_gsn,
1095 })
1096 } else {
1097 None
1098 };
1099 let filtered_count = if filtered_count_flag {
1100 if body.len() < pos + 4 {
1101 return Err(WireError::UnexpectedEof {
1102 needed: 4,
1103 offset: pos,
1104 });
1105 }
1106 let mut c = [0u8; 4];
1107 c.copy_from_slice(&body[pos..pos + 4]);
1108 let fc = if little_endian {
1109 u32::from_le_bytes(c)
1110 } else {
1111 u32::from_be_bytes(c)
1112 };
1113 Some(fc)
1114 } else {
1115 None
1116 };
1117 Ok(Self {
1118 reader_id,
1119 writer_id,
1120 gap_start,
1121 gap_list,
1122 group_info,
1123 filtered_count,
1124 })
1125 }
1126}
1127
/// DATA_FRAG flag bit mirrored by `DataFragSubmessage::inline_qos_flag`.
pub const DATA_FRAG_FLAG_INLINE_QOS: u8 = 0x02;
/// DATA_FRAG flag bit mirrored by `DataFragSubmessage::hash_key_flag`.
pub const DATA_FRAG_FLAG_HASH_KEY: u8 = 0x04;
/// DATA_FRAG flag bit mirrored by `DataFragSubmessage::key_flag`.
pub const DATA_FRAG_FLAG_KEY: u8 = 0x08;
/// DATA_FRAG flag bit mirrored by `DataFragSubmessage::non_standard_flag`.
pub const DATA_FRAG_FLAG_NON_STANDARD: u8 = 0x10;
1140
/// Decoded body of a DATA_FRAG submessage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataFragSubmessage {
    /// First 16-bit field of the body, encoded in the submessage byte order.
    pub extra_flags: u16,
    /// Reader entity id (4 bytes on the wire).
    pub reader_id: EntityId,
    /// Writer entity id (4 bytes on the wire).
    pub writer_id: EntityId,
    /// Sequence number assigned by the writer (8 bytes on the wire).
    pub writer_sn: SequenceNumber,
    /// Fragment number field (4 bytes on the wire).
    pub fragment_starting_num: FragmentNumber,
    /// 16-bit field following `fragment_starting_num`.
    pub fragments_in_submessage: u16,
    /// 16-bit field following `fragments_in_submessage`.
    pub fragment_size: u16,
    /// 32-bit field closing the fixed header.
    pub sample_size: u32,
    /// Bytes following the 32-byte header, stored verbatim.
    pub serialized_payload: Arc<[u8]>,
    /// Mirrors `DATA_FRAG_FLAG_INLINE_QOS`; `read_body` rejects bodies for
    /// which this flag is set.
    pub inline_qos_flag: bool,
    /// Mirrors `DATA_FRAG_FLAG_HASH_KEY`.
    pub hash_key_flag: bool,
    /// Mirrors `DATA_FRAG_FLAG_KEY`.
    pub key_flag: bool,
    /// Mirrors `DATA_FRAG_FLAG_NON_STANDARD`.
    pub non_standard_flag: bool,
}
1176
1177impl DataFragSubmessage {
1178 pub const HEADER_WIRE_SIZE: usize = 32;
1182
1183 pub const OCTETS_TO_INLINE_QOS: u16 = 28;
1187
1188 #[must_use]
1190 pub fn write_body(&self, little_endian: bool) -> (Vec<u8>, u8) {
1191 let mut out = Vec::with_capacity(Self::HEADER_WIRE_SIZE + self.serialized_payload.len());
1192 if little_endian {
1193 out.extend_from_slice(&self.extra_flags.to_le_bytes());
1194 out.extend_from_slice(&Self::OCTETS_TO_INLINE_QOS.to_le_bytes());
1195 } else {
1196 out.extend_from_slice(&self.extra_flags.to_be_bytes());
1197 out.extend_from_slice(&Self::OCTETS_TO_INLINE_QOS.to_be_bytes());
1198 }
1199 out.extend_from_slice(&self.reader_id.to_bytes());
1200 out.extend_from_slice(&self.writer_id.to_bytes());
1201 out.extend_from_slice(&if little_endian {
1202 self.writer_sn.to_bytes_le()
1203 } else {
1204 self.writer_sn.to_bytes_be()
1205 });
1206 out.extend_from_slice(&if little_endian {
1207 self.fragment_starting_num.to_bytes_le()
1208 } else {
1209 self.fragment_starting_num.to_bytes_be()
1210 });
1211 if little_endian {
1212 out.extend_from_slice(&self.fragments_in_submessage.to_le_bytes());
1213 out.extend_from_slice(&self.fragment_size.to_le_bytes());
1214 out.extend_from_slice(&self.sample_size.to_le_bytes());
1215 } else {
1216 out.extend_from_slice(&self.fragments_in_submessage.to_be_bytes());
1217 out.extend_from_slice(&self.fragment_size.to_be_bytes());
1218 out.extend_from_slice(&self.sample_size.to_be_bytes());
1219 }
1220 out.extend_from_slice(&self.serialized_payload);
1221 let mut flags = 0u8;
1222 if little_endian {
1223 flags |= FLAG_E_LITTLE_ENDIAN;
1224 }
1225 if self.inline_qos_flag {
1226 flags |= DATA_FRAG_FLAG_INLINE_QOS;
1227 }
1228 if self.hash_key_flag {
1229 flags |= DATA_FRAG_FLAG_HASH_KEY;
1230 }
1231 if self.key_flag {
1232 flags |= DATA_FRAG_FLAG_KEY;
1233 }
1234 if self.non_standard_flag {
1235 flags |= DATA_FRAG_FLAG_NON_STANDARD;
1236 }
1237 (out, flags)
1238 }
1239
1240 pub fn read_body(
1246 body: &[u8],
1247 little_endian: bool,
1248 inline_qos_flag: bool,
1249 hash_key_flag: bool,
1250 key_flag: bool,
1251 non_standard_flag: bool,
1252 ) -> Result<Self, WireError> {
1253 if body.len() < Self::HEADER_WIRE_SIZE {
1254 return Err(WireError::UnexpectedEof {
1255 needed: Self::HEADER_WIRE_SIZE,
1256 offset: 0,
1257 });
1258 }
1259 let mut pos = 0usize;
1260 let mut ef = [0u8; 2];
1261 ef.copy_from_slice(&body[pos..pos + 2]);
1262 let extra_flags = if little_endian {
1263 u16::from_le_bytes(ef)
1264 } else {
1265 u16::from_be_bytes(ef)
1266 };
1267 pos += 2;
1268 let mut otq = [0u8; 2];
1277 otq.copy_from_slice(&body[pos..pos + 2]);
1278 let octets_to_inline_qos = if little_endian {
1279 u16::from_le_bytes(otq)
1280 } else {
1281 u16::from_be_bytes(otq)
1282 };
1283 pos += 2;
1284 if !inline_qos_flag && octets_to_inline_qos != Self::OCTETS_TO_INLINE_QOS {
1285 return Err(WireError::ValueOutOfRange {
1286 message: "DATA_FRAG.octetsToInlineQos must equal 28 when Q=false",
1287 });
1288 }
1289 let mut rid = [0u8; 4];
1290 rid.copy_from_slice(&body[pos..pos + 4]);
1291 let reader_id = EntityId::from_bytes(rid);
1292 pos += 4;
1293 let mut wid = [0u8; 4];
1294 wid.copy_from_slice(&body[pos..pos + 4]);
1295 let writer_id = EntityId::from_bytes(wid);
1296 pos += 4;
1297 let mut sn = [0u8; 8];
1298 sn.copy_from_slice(&body[pos..pos + 8]);
1299 let writer_sn = if little_endian {
1300 SequenceNumber::from_bytes_le(sn)
1301 } else {
1302 SequenceNumber::from_bytes_be(sn)
1303 };
1304 pos += 8;
1305 let mut fsn = [0u8; 4];
1306 fsn.copy_from_slice(&body[pos..pos + 4]);
1307 let fragment_starting_num = if little_endian {
1308 FragmentNumber::from_bytes_le(fsn)
1309 } else {
1310 FragmentNumber::from_bytes_be(fsn)
1311 };
1312 pos += 4;
1313 let mut fis = [0u8; 2];
1314 fis.copy_from_slice(&body[pos..pos + 2]);
1315 let fragments_in_submessage = if little_endian {
1316 u16::from_le_bytes(fis)
1317 } else {
1318 u16::from_be_bytes(fis)
1319 };
1320 pos += 2;
1321 let mut fs = [0u8; 2];
1322 fs.copy_from_slice(&body[pos..pos + 2]);
1323 let fragment_size = if little_endian {
1324 u16::from_le_bytes(fs)
1325 } else {
1326 u16::from_be_bytes(fs)
1327 };
1328 pos += 2;
1329 let mut ss = [0u8; 4];
1330 ss.copy_from_slice(&body[pos..pos + 4]);
1331 let sample_size = if little_endian {
1332 u32::from_le_bytes(ss)
1333 } else {
1334 u32::from_be_bytes(ss)
1335 };
1336 pos += 4;
1337 if inline_qos_flag {
1341 return Err(WireError::UnsupportedFeature {
1342 what: "DATA_FRAG with inline_qos",
1343 });
1344 }
1345 let serialized_payload: Arc<[u8]> = Arc::from(&body[pos..]);
1346 Ok(Self {
1347 extra_flags,
1348 reader_id,
1349 writer_id,
1350 writer_sn,
1351 fragment_starting_num,
1352 fragments_in_submessage,
1353 fragment_size,
1354 sample_size,
1355 serialized_payload,
1356 inline_qos_flag,
1357 hash_key_flag,
1358 key_flag,
1359 non_standard_flag,
1360 })
1361 }
1362}
1363
/// Decoded body of an INFO_SOURCE submessage (20 bytes on the wire).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InfoSourceSubmessage {
    /// Leading 32-bit field, round-tripped in the submessage byte order.
    pub unused: u32,
    /// Protocol version (2 bytes on the wire).
    pub protocol_version: crate::wire_types::ProtocolVersion,
    /// Vendor id (2 bytes on the wire).
    pub vendor_id: crate::wire_types::VendorId,
    /// GUID prefix (12 bytes on the wire).
    pub guid_prefix: crate::wire_types::GuidPrefix,
}
1391
1392impl InfoSourceSubmessage {
1393 pub const WIRE_SIZE: usize = 20;
1395
1396 #[must_use]
1398 pub fn write_body(&self, little_endian: bool) -> (Vec<u8>, u8) {
1399 let mut out = Vec::with_capacity(Self::WIRE_SIZE);
1400 out.extend_from_slice(&if little_endian {
1401 self.unused.to_le_bytes()
1402 } else {
1403 self.unused.to_be_bytes()
1404 });
1405 out.extend_from_slice(&self.protocol_version.to_bytes());
1406 out.extend_from_slice(&self.vendor_id.to_bytes());
1407 out.extend_from_slice(&self.guid_prefix.to_bytes());
1408 let mut flags = 0u8;
1409 if little_endian {
1410 flags |= FLAG_E_LITTLE_ENDIAN;
1411 }
1412 (out, flags)
1413 }
1414
1415 pub fn read_body(body: &[u8], little_endian: bool) -> Result<Self, WireError> {
1420 if body.len() < Self::WIRE_SIZE {
1421 return Err(WireError::UnexpectedEof {
1422 needed: Self::WIRE_SIZE,
1423 offset: 0,
1424 });
1425 }
1426 let mut pos = 0usize;
1427 let mut u = [0u8; 4];
1428 u.copy_from_slice(&body[pos..pos + 4]);
1429 let unused = if little_endian {
1430 u32::from_le_bytes(u)
1431 } else {
1432 u32::from_be_bytes(u)
1433 };
1434 pos += 4;
1435 let mut pv = [0u8; 2];
1436 pv.copy_from_slice(&body[pos..pos + 2]);
1437 let protocol_version = crate::wire_types::ProtocolVersion::from_bytes(pv);
1438 pos += 2;
1439 let mut vid = [0u8; 2];
1440 vid.copy_from_slice(&body[pos..pos + 2]);
1441 let vendor_id = crate::wire_types::VendorId::from_bytes(vid);
1442 pos += 2;
1443 let mut gp = [0u8; 12];
1444 gp.copy_from_slice(&body[pos..pos + 12]);
1445 let guid_prefix = crate::wire_types::GuidPrefix::from_bytes(gp);
1446 Ok(Self {
1447 unused,
1448 protocol_version,
1449 vendor_id,
1450 guid_prefix,
1451 })
1452 }
1453}
1454
/// INFO_TIMESTAMP flag bit set by `InfoTimestampSubmessage::write_body` when
/// `invalidate` is true; in that case the body is empty.
pub const INFO_TIMESTAMP_FLAG_INVALIDATE: u8 = 0x02;
1462
/// Decoded body of an INFO_TIMESTAMP submessage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct InfoTimestampSubmessage {
    /// Timestamp carried in the body (4-byte seconds + 4-byte fraction);
    /// left at its default value when decoded with the invalidate flag set.
    pub timestamp: crate::header_extension::HeTimestamp,
    /// Mirrors `INFO_TIMESTAMP_FLAG_INVALIDATE`; when true no timestamp bytes
    /// are written.
    pub invalidate: bool,
}
1475
1476impl InfoTimestampSubmessage {
1477 #[must_use]
1480 pub fn write_body(&self, little_endian: bool) -> (Vec<u8>, u8) {
1481 let mut flags = 0u8;
1482 if little_endian {
1483 flags |= FLAG_E_LITTLE_ENDIAN;
1484 }
1485 if self.invalidate {
1486 flags |= INFO_TIMESTAMP_FLAG_INVALIDATE;
1487 return (Vec::new(), flags);
1488 }
1489 let mut out = Vec::with_capacity(8);
1490 let s = if little_endian {
1491 self.timestamp.seconds.to_le_bytes()
1492 } else {
1493 self.timestamp.seconds.to_be_bytes()
1494 };
1495 let f = if little_endian {
1496 self.timestamp.fraction.to_le_bytes()
1497 } else {
1498 self.timestamp.fraction.to_be_bytes()
1499 };
1500 out.extend_from_slice(&s);
1501 out.extend_from_slice(&f);
1502 (out, flags)
1503 }
1504
1505 pub fn read_body(
1511 body: &[u8],
1512 little_endian: bool,
1513 invalidate_flag: bool,
1514 ) -> Result<Self, WireError> {
1515 if invalidate_flag {
1516 return Ok(Self {
1517 timestamp: crate::header_extension::HeTimestamp::default(),
1518 invalidate: true,
1519 });
1520 }
1521 if body.len() < 8 {
1522 return Err(WireError::UnexpectedEof {
1523 needed: 8,
1524 offset: 0,
1525 });
1526 }
1527 let mut s = [0u8; 4];
1528 s.copy_from_slice(&body[0..4]);
1529 let mut f = [0u8; 4];
1530 f.copy_from_slice(&body[4..8]);
1531 let seconds = if little_endian {
1532 i32::from_le_bytes(s)
1533 } else {
1534 i32::from_be_bytes(s)
1535 };
1536 let fraction = if little_endian {
1537 u32::from_le_bytes(f)
1538 } else {
1539 u32::from_be_bytes(f)
1540 };
1541 Ok(Self {
1542 timestamp: crate::header_extension::HeTimestamp { seconds, fraction },
1543 invalidate: false,
1544 })
1545 }
1546}
1547
/// INFO_REPLY flag bit set by `InfoReplySubmessage::write_body` when
/// `multicast_locators` is present.
pub const INFO_REPLY_FLAG_MULTICAST: u8 = 0x02;
1555
/// Decoded body of an INFO_REPLY submessage: one mandatory and one optional
/// length-prefixed locator list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InfoReplySubmessage {
    /// Locator list that is always written first.
    pub unicast_locators: Vec<crate::wire_types::Locator>,
    /// Second locator list, written (with `INFO_REPLY_FLAG_MULTICAST`) only
    /// when present.
    pub multicast_locators: Option<Vec<crate::wire_types::Locator>>,
}
1570
1571impl InfoReplySubmessage {
1572 #[must_use]
1574 pub fn write_body(&self, little_endian: bool) -> (Vec<u8>, u8) {
1575 let mut out = Vec::new();
1576 Self::write_locator_list(&mut out, &self.unicast_locators, little_endian);
1577 let mut flags = 0u8;
1578 if little_endian {
1579 flags |= FLAG_E_LITTLE_ENDIAN;
1580 }
1581 if let Some(mcast) = &self.multicast_locators {
1582 flags |= INFO_REPLY_FLAG_MULTICAST;
1583 Self::write_locator_list(&mut out, mcast, little_endian);
1584 }
1585 (out, flags)
1586 }
1587
1588 fn write_locator_list(
1589 out: &mut Vec<u8>,
1590 list: &[crate::wire_types::Locator],
1591 little_endian: bool,
1592 ) {
1593 let len = u32::try_from(list.len()).unwrap_or(u32::MAX);
1594 out.extend_from_slice(&if little_endian {
1595 len.to_le_bytes()
1596 } else {
1597 len.to_be_bytes()
1598 });
1599 for loc in list {
1600 if little_endian {
1605 out.extend_from_slice(&loc.to_bytes_le());
1606 } else {
1607 out.extend_from_slice(&(loc.kind.as_i32()).to_be_bytes());
1609 out.extend_from_slice(&loc.port.to_be_bytes());
1610 out.extend_from_slice(&loc.address);
1611 }
1612 }
1613 }
1614
1615 pub fn read_body(
1621 body: &[u8],
1622 little_endian: bool,
1623 multicast_flag: bool,
1624 ) -> Result<Self, WireError> {
1625 let mut pos = 0usize;
1626 let unicast_locators = Self::read_locator_list(body, &mut pos, little_endian)?;
1627 let multicast_locators = if multicast_flag {
1628 Some(Self::read_locator_list(body, &mut pos, little_endian)?)
1629 } else {
1630 None
1631 };
1632 Ok(Self {
1633 unicast_locators,
1634 multicast_locators,
1635 })
1636 }
1637
1638 fn read_locator_list(
1639 body: &[u8],
1640 pos: &mut usize,
1641 little_endian: bool,
1642 ) -> Result<Vec<crate::wire_types::Locator>, WireError> {
1643 if body.len() < *pos + 4 {
1644 return Err(WireError::UnexpectedEof {
1645 needed: 4,
1646 offset: *pos,
1647 });
1648 }
1649 let mut len_bytes = [0u8; 4];
1650 len_bytes.copy_from_slice(&body[*pos..*pos + 4]);
1651 let len = if little_endian {
1652 u32::from_le_bytes(len_bytes)
1653 } else {
1654 u32::from_be_bytes(len_bytes)
1655 } as usize;
1656 *pos += 4;
1657 let remaining = body.len().saturating_sub(*pos);
1658 if len.saturating_mul(24) > remaining {
1659 return Err(WireError::ValueOutOfRange {
1660 message: "InfoReply.locatorList length exceeds body",
1661 });
1662 }
1663 let mut out = Vec::with_capacity(len);
1664 for _ in 0..len {
1665 let mut buf = [0u8; 24];
1666 buf.copy_from_slice(&body[*pos..*pos + 24]);
1667 let loc = if little_endian {
1670 crate::wire_types::Locator::from_bytes_le(buf)?
1671 } else {
1672 let mut k = [0u8; 4];
1673 k.copy_from_slice(&buf[0..4]);
1674 let kind_raw = i32::from_be_bytes(k);
1675 let kind = crate::wire_types::LocatorKind::from_i32(kind_raw)?;
1676 let mut p = [0u8; 4];
1677 p.copy_from_slice(&buf[4..8]);
1678 let port = u32::from_be_bytes(p);
1679 let mut address = [0u8; 16];
1680 address.copy_from_slice(&buf[8..24]);
1681 crate::wire_types::Locator {
1682 kind,
1683 port,
1684 address,
1685 }
1686 };
1687 out.push(loc);
1688 *pos += 24;
1689 }
1690 Ok(out)
1691 }
1692}
1693
/// In-memory form of a HeartbeatFrag submessage body.
///
/// The wire body is fixed-size (24 bytes): `reader_id` (4), `writer_id` (4),
/// `writer_sn` (8), `last_fragment_num` (4) and `count` (4), in that order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HeartbeatFragSubmessage {
    /// Reader entity id; serialized as 4 bytes independent of endianness.
    pub reader_id: EntityId,
    /// Writer entity id; serialized as 4 bytes independent of endianness.
    pub writer_id: EntityId,
    /// Sequence number this heartbeat refers to (8 bytes on the wire).
    pub writer_sn: SequenceNumber,
    /// Fragment number carried by the heartbeat (4 bytes on the wire);
    /// presumably the last fragment available for `writer_sn` — confirm
    /// against the RTPS specification.
    pub last_fragment_num: FragmentNumber,
    /// Signed 32-bit counter written after the fragment number.
    pub count: i32,
}
1715
1716impl HeartbeatFragSubmessage {
1717 pub const WIRE_SIZE: usize = 24;
1719
1720 #[must_use]
1722 pub fn write_body(self, little_endian: bool) -> (Vec<u8>, u8) {
1723 let mut out = Vec::with_capacity(Self::WIRE_SIZE);
1724 out.extend_from_slice(&self.reader_id.to_bytes());
1725 out.extend_from_slice(&self.writer_id.to_bytes());
1726 out.extend_from_slice(&if little_endian {
1727 self.writer_sn.to_bytes_le()
1728 } else {
1729 self.writer_sn.to_bytes_be()
1730 });
1731 out.extend_from_slice(&if little_endian {
1732 self.last_fragment_num.to_bytes_le()
1733 } else {
1734 self.last_fragment_num.to_bytes_be()
1735 });
1736 out.extend_from_slice(&if little_endian {
1737 self.count.to_le_bytes()
1738 } else {
1739 self.count.to_be_bytes()
1740 });
1741 let mut flags = 0u8;
1742 if little_endian {
1743 flags |= FLAG_E_LITTLE_ENDIAN;
1744 }
1745 (out, flags)
1746 }
1747
1748 pub fn read_body(body: &[u8], little_endian: bool) -> Result<Self, WireError> {
1753 if body.len() < Self::WIRE_SIZE {
1754 return Err(WireError::UnexpectedEof {
1755 needed: Self::WIRE_SIZE,
1756 offset: 0,
1757 });
1758 }
1759 let mut pos = 0usize;
1760 let mut rid = [0u8; 4];
1761 rid.copy_from_slice(&body[pos..pos + 4]);
1762 let reader_id = EntityId::from_bytes(rid);
1763 pos += 4;
1764 let mut wid = [0u8; 4];
1765 wid.copy_from_slice(&body[pos..pos + 4]);
1766 let writer_id = EntityId::from_bytes(wid);
1767 pos += 4;
1768 let mut sn = [0u8; 8];
1769 sn.copy_from_slice(&body[pos..pos + 8]);
1770 let writer_sn = if little_endian {
1771 SequenceNumber::from_bytes_le(sn)
1772 } else {
1773 SequenceNumber::from_bytes_be(sn)
1774 };
1775 pos += 8;
1776 let mut lf = [0u8; 4];
1777 lf.copy_from_slice(&body[pos..pos + 4]);
1778 let last_fragment_num = if little_endian {
1779 FragmentNumber::from_bytes_le(lf)
1780 } else {
1781 FragmentNumber::from_bytes_be(lf)
1782 };
1783 pos += 4;
1784 let mut cnt = [0u8; 4];
1785 cnt.copy_from_slice(&body[pos..pos + 4]);
1786 let count = if little_endian {
1787 i32::from_le_bytes(cnt)
1788 } else {
1789 i32::from_be_bytes(cnt)
1790 };
1791 Ok(Self {
1792 reader_id,
1793 writer_id,
1794 writer_sn,
1795 last_fragment_num,
1796 count,
1797 })
1798 }
1799}
1800
/// In-memory form of a NackFrag submessage body.
///
/// Wire order: `reader_id` (4 bytes), `writer_id` (4), `writer_sn` (8), the
/// variable-length `fragment_number_state`, then `count` (4).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NackFragSubmessage {
    /// Reader entity id; serialized as 4 bytes independent of endianness.
    pub reader_id: EntityId,
    /// Writer entity id; serialized as 4 bytes independent of endianness.
    pub writer_id: EntityId,
    /// Sequence number the fragment set refers to (8 bytes on the wire).
    pub writer_sn: SequenceNumber,
    /// Bitmap-encoded set of fragment numbers; presumably the fragments the
    /// reader is still missing — confirm against the RTPS specification.
    pub fragment_number_state: FragmentNumberSet,
    /// Signed 32-bit counter written after the fragment number set.
    pub count: i32,
}
1820
1821impl NackFragSubmessage {
1822 #[must_use]
1824 pub fn write_body(&self, little_endian: bool) -> (Vec<u8>, u8) {
1825 let mut out = Vec::new();
1826 out.extend_from_slice(&self.reader_id.to_bytes());
1827 out.extend_from_slice(&self.writer_id.to_bytes());
1828 out.extend_from_slice(&if little_endian {
1829 self.writer_sn.to_bytes_le()
1830 } else {
1831 self.writer_sn.to_bytes_be()
1832 });
1833 self.fragment_number_state.write_to(&mut out, little_endian);
1834 out.extend_from_slice(&if little_endian {
1835 self.count.to_le_bytes()
1836 } else {
1837 self.count.to_be_bytes()
1838 });
1839 let mut flags = 0u8;
1840 if little_endian {
1841 flags |= FLAG_E_LITTLE_ENDIAN;
1842 }
1843 (out, flags)
1844 }
1845
1846 pub fn read_body(body: &[u8], little_endian: bool) -> Result<Self, WireError> {
1851 if body.len() < 4 + 4 + 8 + 4 + 4 + 4 {
1852 return Err(WireError::UnexpectedEof {
1853 needed: 4 + 4 + 8 + 4 + 4 + 4,
1854 offset: 0,
1855 });
1856 }
1857 let mut pos = 0usize;
1858 let mut rid = [0u8; 4];
1859 rid.copy_from_slice(&body[pos..pos + 4]);
1860 let reader_id = EntityId::from_bytes(rid);
1861 pos += 4;
1862 let mut wid = [0u8; 4];
1863 wid.copy_from_slice(&body[pos..pos + 4]);
1864 let writer_id = EntityId::from_bytes(wid);
1865 pos += 4;
1866 let mut sn = [0u8; 8];
1867 sn.copy_from_slice(&body[pos..pos + 8]);
1868 let writer_sn = if little_endian {
1869 SequenceNumber::from_bytes_le(sn)
1870 } else {
1871 SequenceNumber::from_bytes_be(sn)
1872 };
1873 pos += 8;
1874 let (fragment_number_state, new_pos) =
1875 FragmentNumberSet::read_from(body, pos, little_endian)?;
1876 pos = new_pos;
1877 if body.len() < pos + 4 {
1878 return Err(WireError::UnexpectedEof {
1879 needed: 4,
1880 offset: pos,
1881 });
1882 }
1883 let mut cnt = [0u8; 4];
1884 cnt.copy_from_slice(&body[pos..pos + 4]);
1885 let count = if little_endian {
1886 i32::from_le_bytes(cnt)
1887 } else {
1888 i32::from_be_bytes(cnt)
1889 };
1890 Ok(Self {
1891 reader_id,
1892 writer_id,
1893 writer_sn,
1894 fragment_number_state,
1895 count,
1896 })
1897 }
1898}
1899
1900#[cfg(test)]
1901mod tests {
1902 #![allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]
1903 use super::*;
1904 use alloc::vec;
1905
1906 fn writer_id() -> EntityId {
1907 EntityId::user_writer_with_key([0x10, 0x20, 0x30])
1908 }
1909 fn reader_id() -> EntityId {
1910 EntityId::user_reader_with_key([0x40, 0x50, 0x60])
1911 }
1912
1913 #[test]
1916 fn snset_wire_size_zero_bits_is_12_bytes() {
1917 assert_eq!(SequenceNumberSet::wire_size(0), 12);
1918 }
1919
1920 #[test]
1921 fn snset_wire_size_32_bits_is_16_bytes() {
1922 assert_eq!(SequenceNumberSet::wire_size(32), 16);
1923 }
1924
1925 #[test]
1926 fn snset_wire_size_33_bits_is_20_bytes() {
1927 assert_eq!(SequenceNumberSet::wire_size(33), 20);
1928 }
1929
1930 #[test]
1931 fn snset_roundtrip_le() {
1932 let s = SequenceNumberSet {
1933 bitmap_base: SequenceNumber(100),
1934 num_bits: 5,
1935 bitmap: vec![0b0000_1010_0000_0000_0000_0000_0000_0000],
1936 };
1937 let mut buf = Vec::new();
1938 s.write_to(&mut buf, true);
1939 let (decoded, end) = SequenceNumberSet::read_from(&buf, 0, true).unwrap();
1940 assert_eq!(decoded, s);
1941 assert_eq!(end, buf.len());
1942 }
1943
1944 #[test]
1945 fn snset_roundtrip_be() {
1946 let s = SequenceNumberSet {
1947 bitmap_base: SequenceNumber(0xDEAD_BEEF),
1948 num_bits: 64,
1949 bitmap: vec![0x1234_5678, 0x9ABC_DEF0],
1950 };
1951 let mut buf = Vec::new();
1952 s.write_to(&mut buf, false);
1953 let (decoded, _) = SequenceNumberSet::read_from(&buf, 0, false).unwrap();
1954 assert_eq!(decoded, s);
1955 }
1956
1957 #[test]
1958 fn snset_decode_rejects_truncated_bitmap() {
1959 let mut buf = Vec::new();
1961 buf.extend_from_slice(&SequenceNumber(0).to_bytes_le());
1962 buf.extend_from_slice(&64_u32.to_le_bytes());
1963 buf.extend_from_slice(&[0u8; 4]); let res = SequenceNumberSet::read_from(&buf, 0, true);
1965 assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
1966 }
1967
1968 #[test]
1971 fn data_submessage_roundtrip_le() {
1972 let d = DataSubmessage {
1973 extra_flags: 0,
1974 reader_id: reader_id(),
1975 writer_id: writer_id(),
1976 writer_sn: SequenceNumber(42),
1977 inline_qos: None,
1978 key_flag: false,
1979 non_standard_flag: false,
1980 serialized_payload: Arc::<[u8]>::from([1u8, 2, 3, 4, 5].as_slice()),
1981 };
1982 let (bytes, flags) = d.write_body(true);
1983 assert!(flags & FLAG_E_LITTLE_ENDIAN != 0);
1984 assert!(flags & DATA_FLAG_DATA != 0);
1985 let decoded = DataSubmessage::read_body(&bytes, true).unwrap();
1986 assert_eq!(decoded, d);
1987 }
1988
1989 #[test]
1990 fn data_submessage_roundtrip_be_with_empty_payload() {
1991 let d = DataSubmessage {
1992 extra_flags: 0,
1993 reader_id: reader_id(),
1994 writer_id: writer_id(),
1995 writer_sn: SequenceNumber(0xDEAD_BEEF),
1996 inline_qos: None,
1997 key_flag: false,
1998 non_standard_flag: false,
1999 serialized_payload: Arc::<[u8]>::from([].as_slice()),
2000 };
2001 let (bytes, flags) = d.write_body(false);
2002 assert_eq!(flags & FLAG_E_LITTLE_ENDIAN, 0);
2003 let decoded = DataSubmessage::read_body(&bytes, false).unwrap();
2004 assert_eq!(decoded, d);
2005 }
2006
2007 #[test]
2008 fn data_submessage_key_flag_roundtrip() {
2009 let d = DataSubmessage {
2011 extra_flags: 0,
2012 reader_id: reader_id(),
2013 writer_id: writer_id(),
2014 writer_sn: SequenceNumber(7),
2015 inline_qos: None,
2016 key_flag: true,
2017 non_standard_flag: false,
2018 serialized_payload: Arc::<[u8]>::from([0xAA, 0xBB].as_slice()),
2019 };
2020 let (bytes, flags) = d.write_body(true);
2021 assert!(flags & DATA_FLAG_KEY != 0, "K-Flag must be set");
2022 let decoded = DataSubmessage::read_body_with_flags(&bytes, true, flags).unwrap();
2023 assert!(decoded.key_flag);
2024 assert!(!decoded.non_standard_flag);
2025 assert_eq!(decoded, d);
2026 }
2027
2028 #[test]
2029 fn data_submessage_non_standard_flag_roundtrip() {
2030 let d = DataSubmessage {
2032 extra_flags: 0,
2033 reader_id: reader_id(),
2034 writer_id: writer_id(),
2035 writer_sn: SequenceNumber(8),
2036 inline_qos: None,
2037 key_flag: false,
2038 non_standard_flag: true,
2039 serialized_payload: Arc::<[u8]>::from([0xCC, 0xDD].as_slice()),
2040 };
2041 let (bytes, flags) = d.write_body(true);
2042 assert!(flags & DATA_FLAG_NON_STANDARD != 0, "N-Flag must be set");
2043 let decoded = DataSubmessage::read_body_with_flags(&bytes, true, flags).unwrap();
2044 assert!(!decoded.key_flag);
2045 assert!(decoded.non_standard_flag);
2046 assert_eq!(decoded, d);
2047 }
2048
2049 #[test]
2050 fn data_submessage_all_flags_combined_roundtrip() {
2051 let mut pl = crate::parameter_list::ParameterList::new();
2053 pl.push(crate::parameter_list::Parameter::new(0x0070, vec![1; 4]));
2054 let d = DataSubmessage {
2055 extra_flags: 0xABCD,
2056 reader_id: reader_id(),
2057 writer_id: writer_id(),
2058 writer_sn: SequenceNumber(9),
2059 inline_qos: Some(pl),
2060 key_flag: true,
2061 non_standard_flag: true,
2062 serialized_payload: Arc::<[u8]>::from([0xEE; 8].as_slice()),
2063 };
2064 let (bytes, flags) = d.write_body(true);
2065 assert!(flags & FLAG_E_LITTLE_ENDIAN != 0);
2066 assert!(flags & DATA_FLAG_INLINE_QOS != 0);
2067 assert!(flags & DATA_FLAG_DATA != 0);
2068 assert!(flags & DATA_FLAG_KEY != 0);
2069 assert!(flags & DATA_FLAG_NON_STANDARD != 0);
2070 let decoded = DataSubmessage::read_body_with_flags(&bytes, true, flags).unwrap();
2071 assert_eq!(decoded, d);
2072 }
2073
2074 #[test]
2075 fn data_submessage_octets_to_inline_qos_is_16() {
2076 let d = DataSubmessage {
2077 extra_flags: 0,
2078 reader_id: reader_id(),
2079 writer_id: writer_id(),
2080 writer_sn: SequenceNumber(1),
2081 inline_qos: None,
2082 key_flag: false,
2083 non_standard_flag: false,
2084 serialized_payload: Arc::<[u8]>::from([].as_slice()),
2085 };
2086 let (bytes, _) = d.write_body(true);
2087 assert_eq!(&bytes[2..4], &[16, 0]);
2089 }
2090
2091 #[test]
2092 fn data_submessage_decode_rejects_truncated() {
2093 let res = DataSubmessage::read_body(&[1, 2, 3], true);
2094 assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
2095 }
2096
2097 #[test]
2100 fn heartbeat_submessage_roundtrip_le() {
2101 let h = HeartbeatSubmessage {
2102 reader_id: reader_id(),
2103 writer_id: writer_id(),
2104 first_sn: SequenceNumber(1),
2105 last_sn: SequenceNumber(10),
2106 count: 7,
2107 final_flag: true,
2108 liveliness_flag: false,
2109 group_info: None,
2110 };
2111 let (bytes, flags) = h.write_body(true);
2112 assert!(flags & HEARTBEAT_FLAG_FINAL != 0);
2113 assert_eq!(flags & HEARTBEAT_FLAG_LIVELINESS, 0);
2114 assert_eq!(bytes.len(), HeartbeatSubmessage::WIRE_SIZE);
2115 let decoded = HeartbeatSubmessage::read_body(&bytes, true, true, false, false).unwrap();
2116 assert_eq!(decoded, h);
2117 }
2118
2119 #[test]
2120 fn heartbeat_submessage_no_final_flag_when_disabled() {
2121 let h = HeartbeatSubmessage {
2122 reader_id: reader_id(),
2123 writer_id: writer_id(),
2124 first_sn: SequenceNumber(1),
2125 last_sn: SequenceNumber(1),
2126 count: 0,
2127 final_flag: false,
2128 liveliness_flag: false,
2129 group_info: None,
2130 };
2131 let (_, flags) = h.write_body(true);
2132 assert_eq!(flags & HEARTBEAT_FLAG_FINAL, 0);
2133 }
2134
2135 #[test]
2136 fn heartbeat_submessage_liveliness_flag_roundtrip() {
2137 let h = HeartbeatSubmessage {
2138 reader_id: reader_id(),
2139 writer_id: writer_id(),
2140 first_sn: SequenceNumber(1),
2141 last_sn: SequenceNumber(1),
2142 count: 0,
2143 final_flag: false,
2144 liveliness_flag: true,
2145 group_info: None,
2146 };
2147 let (bytes, flags) = h.write_body(true);
2148 assert!(flags & HEARTBEAT_FLAG_LIVELINESS != 0);
2149 let decoded = HeartbeatSubmessage::read_body(&bytes, true, false, true, false).unwrap();
2150 assert_eq!(decoded, h);
2151 assert!(decoded.liveliness_flag);
2152 }
2153
2154 #[test]
2155 fn heartbeat_decode_rejects_truncated() {
2156 let res = HeartbeatSubmessage::read_body(&[0u8; 27], true, false, false, false);
2157 assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
2158 }
2159
2160 #[test]
2163 fn heartbeat_with_empty_group_info_roundtrip_le() {
2164 let h = HeartbeatSubmessage {
2165 reader_id: reader_id(),
2166 writer_id: writer_id(),
2167 first_sn: SequenceNumber(1),
2168 last_sn: SequenceNumber(5),
2169 count: 3,
2170 final_flag: false,
2171 liveliness_flag: false,
2172 group_info: Some(HeartbeatGroupInfo {
2173 current_gsn: SequenceNumber(100),
2174 first_gsn: SequenceNumber(50),
2175 last_gsn: SequenceNumber(99),
2176 writer_set: vec![],
2177 }),
2178 };
2179 let (bytes, flags) = h.write_body(true);
2180 assert!(flags & HEARTBEAT_FLAG_GROUP_INFO != 0);
2181 let decoded = HeartbeatSubmessage::read_body(&bytes, true, false, false, true).unwrap();
2182 assert_eq!(decoded, h);
2183 }
2184
2185 #[test]
2186 fn heartbeat_with_writer_set_roundtrip_be() {
2187 use crate::wire_types::GuidPrefix;
2188 let h = HeartbeatSubmessage {
2189 reader_id: reader_id(),
2190 writer_id: writer_id(),
2191 first_sn: SequenceNumber(1),
2192 last_sn: SequenceNumber(2),
2193 count: 1,
2194 final_flag: false,
2195 liveliness_flag: false,
2196 group_info: Some(HeartbeatGroupInfo {
2197 current_gsn: SequenceNumber(7),
2198 first_gsn: SequenceNumber(1),
2199 last_gsn: SequenceNumber(7),
2200 writer_set: vec![
2201 GuidPrefix::from_bytes([1; 12]),
2202 GuidPrefix::from_bytes([2; 12]),
2203 GuidPrefix::from_bytes([3; 12]),
2204 ],
2205 }),
2206 };
2207 let (bytes, flags) = h.write_body(false);
2208 assert!(flags & HEARTBEAT_FLAG_GROUP_INFO != 0);
2209 let decoded = HeartbeatSubmessage::read_body(&bytes, false, false, false, true).unwrap();
2210 assert_eq!(decoded, h);
2211 let gi = decoded.group_info.unwrap();
2212 assert_eq!(gi.writer_set.len(), 3);
2213 }
2214
2215 #[test]
2216 fn heartbeat_decode_rejects_oversized_writer_set_length() {
2217 let mut body = Vec::new();
2219 body.extend_from_slice(&reader_id().to_bytes());
2220 body.extend_from_slice(&writer_id().to_bytes());
2221 body.extend_from_slice(&SequenceNumber(1).to_bytes_le());
2222 body.extend_from_slice(&SequenceNumber(1).to_bytes_le());
2223 body.extend_from_slice(&1i32.to_le_bytes());
2224 body.extend_from_slice(&SequenceNumber(0).to_bytes_le());
2226 body.extend_from_slice(&SequenceNumber(0).to_bytes_le());
2227 body.extend_from_slice(&SequenceNumber(0).to_bytes_le());
2228 body.extend_from_slice(&u32::MAX.to_le_bytes());
2230 let res = HeartbeatSubmessage::read_body(&body, true, false, false, true);
2232 assert!(matches!(res, Err(WireError::ValueOutOfRange { .. })));
2233 }
2234
2235 #[test]
2236 fn heartbeat_decode_rejects_truncated_group_info() {
2237 let mut body = Vec::new();
2239 body.extend_from_slice(&reader_id().to_bytes());
2240 body.extend_from_slice(&writer_id().to_bytes());
2241 body.extend_from_slice(&SequenceNumber(1).to_bytes_le());
2242 body.extend_from_slice(&SequenceNumber(1).to_bytes_le());
2243 body.extend_from_slice(&1i32.to_le_bytes());
2244 let res = HeartbeatSubmessage::read_body(&body, true, false, false, true);
2246 assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
2247 }
2248
2249 #[test]
2252 fn acknack_submessage_roundtrip_le() {
2253 let a = AckNackSubmessage {
2254 reader_id: reader_id(),
2255 writer_id: writer_id(),
2256 reader_sn_state: SequenceNumberSet {
2257 bitmap_base: SequenceNumber(5),
2258 num_bits: 3,
2259 bitmap: vec![0b1010_0000_0000_0000_0000_0000_0000_0000],
2260 },
2261 count: 1,
2262 final_flag: false,
2263 };
2264 let (bytes, flags) = a.write_body(true);
2265 assert_eq!(flags & ACKNACK_FLAG_FINAL, 0);
2266 let decoded = AckNackSubmessage::read_body(&bytes, true, false).unwrap();
2267 assert_eq!(decoded, a);
2268 }
2269
2270 #[test]
2271 fn acknack_submessage_with_final_flag() {
2272 let a = AckNackSubmessage {
2273 reader_id: reader_id(),
2274 writer_id: writer_id(),
2275 reader_sn_state: SequenceNumberSet {
2276 bitmap_base: SequenceNumber(1),
2277 num_bits: 0,
2278 bitmap: vec![],
2279 },
2280 count: 0,
2281 final_flag: true,
2282 };
2283 let (bytes, flags) = a.write_body(true);
2284 assert!(flags & ACKNACK_FLAG_FINAL != 0);
2285 let decoded = AckNackSubmessage::read_body(&bytes, true, true).unwrap();
2286 assert!(decoded.final_flag);
2287 }
2288
2289 #[test]
2292 fn gap_submessage_roundtrip_le() {
2293 let g = GapSubmessage {
2294 reader_id: reader_id(),
2295 writer_id: writer_id(),
2296 gap_start: SequenceNumber(1),
2297 gap_list: SequenceNumberSet {
2298 bitmap_base: SequenceNumber(5),
2299 num_bits: 8,
2300 bitmap: vec![0xFF000000],
2301 },
2302 group_info: None,
2303 filtered_count: None,
2304 };
2305 let (bytes, flags) = g.write_body(true);
2306 assert!(flags & FLAG_E_LITTLE_ENDIAN != 0);
2307 assert_eq!(flags & GAP_FLAG_GROUP_INFO, 0);
2308 assert_eq!(flags & GAP_FLAG_FILTERED_COUNT, 0);
2309 let decoded = GapSubmessage::read_body(&bytes, true, false, false).unwrap();
2310 assert_eq!(decoded, g);
2311 }
2312
2313 #[test]
2314 fn gap_decode_rejects_truncated_header() {
2315 let res = GapSubmessage::read_body(&[0u8; 10], true, false, false);
2316 assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
2317 }
2318
2319 #[test]
2322 fn gap_with_filtered_count_roundtrip_le() {
2323 let g = GapSubmessage {
2324 reader_id: reader_id(),
2325 writer_id: writer_id(),
2326 gap_start: SequenceNumber(1),
2327 gap_list: SequenceNumberSet {
2328 bitmap_base: SequenceNumber(2),
2329 num_bits: 0,
2330 bitmap: vec![],
2331 },
2332 group_info: None,
2333 filtered_count: Some(3),
2334 };
2335 let (bytes, flags) = g.write_body(true);
2336 assert!(flags & GAP_FLAG_FILTERED_COUNT != 0);
2337 let decoded = GapSubmessage::read_body(&bytes, true, false, true).unwrap();
2338 assert_eq!(decoded, g);
2339 assert_eq!(decoded.filtered_count, Some(3));
2340 }
2341
2342 #[test]
2343 fn gap_with_group_info_roundtrip_be() {
2344 let g = GapSubmessage {
2345 reader_id: reader_id(),
2346 writer_id: writer_id(),
2347 gap_start: SequenceNumber(10),
2348 gap_list: SequenceNumberSet {
2349 bitmap_base: SequenceNumber(11),
2350 num_bits: 0,
2351 bitmap: vec![],
2352 },
2353 group_info: Some(GapGroupInfo {
2354 gap_start_gsn: SequenceNumber(100),
2355 gap_end_gsn: SequenceNumber(110),
2356 }),
2357 filtered_count: None,
2358 };
2359 let (bytes, flags) = g.write_body(false);
2360 assert!(flags & GAP_FLAG_GROUP_INFO != 0);
2361 let decoded = GapSubmessage::read_body(&bytes, false, true, false).unwrap();
2362 assert_eq!(decoded, g);
2363 }
2364
2365 #[test]
2366 fn gap_with_group_info_and_filtered_count_combined() {
2367 let g = GapSubmessage {
2368 reader_id: reader_id(),
2369 writer_id: writer_id(),
2370 gap_start: SequenceNumber(5),
2371 gap_list: SequenceNumberSet {
2372 bitmap_base: SequenceNumber(6),
2373 num_bits: 0,
2374 bitmap: vec![],
2375 },
2376 group_info: Some(GapGroupInfo {
2377 gap_start_gsn: SequenceNumber(50),
2378 gap_end_gsn: SequenceNumber(55),
2379 }),
2380 filtered_count: Some(7),
2381 };
2382 let (bytes, flags) = g.write_body(true);
2383 assert!(flags & GAP_FLAG_GROUP_INFO != 0);
2384 assert!(flags & GAP_FLAG_FILTERED_COUNT != 0);
2385 let decoded = GapSubmessage::read_body(&bytes, true, true, true).unwrap();
2386 assert_eq!(decoded, g);
2387 }
2388
2389 #[test]
2390 fn gap_filtered_count_zero_is_distinct_from_none() {
2391 let zero = GapSubmessage {
2395 reader_id: reader_id(),
2396 writer_id: writer_id(),
2397 gap_start: SequenceNumber(1),
2398 gap_list: SequenceNumberSet {
2399 bitmap_base: SequenceNumber(2),
2400 num_bits: 0,
2401 bitmap: vec![],
2402 },
2403 group_info: None,
2404 filtered_count: Some(0),
2405 };
2406 let (bytes, flags) = zero.write_body(true);
2407 assert!(flags & GAP_FLAG_FILTERED_COUNT != 0);
2408 let decoded = GapSubmessage::read_body(&bytes, true, false, true).unwrap();
2409 assert_eq!(decoded.filtered_count, Some(0));
2410 }
2411
2412 #[test]
2413 fn gap_decode_rejects_truncated_filtered_count() {
2414 let g = GapSubmessage {
2416 reader_id: reader_id(),
2417 writer_id: writer_id(),
2418 gap_start: SequenceNumber(1),
2419 gap_list: SequenceNumberSet {
2420 bitmap_base: SequenceNumber(2),
2421 num_bits: 0,
2422 bitmap: vec![],
2423 },
2424 group_info: None,
2425 filtered_count: None,
2426 };
2427 let (bytes, _) = g.write_body(true);
2428 let res = GapSubmessage::read_body(&bytes, true, false, true);
2430 assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
2431 }
2432
2433 #[test]
2434 fn gap_decode_rejects_truncated_group_info() {
2435 let g = GapSubmessage {
2436 reader_id: reader_id(),
2437 writer_id: writer_id(),
2438 gap_start: SequenceNumber(1),
2439 gap_list: SequenceNumberSet {
2440 bitmap_base: SequenceNumber(2),
2441 num_bits: 0,
2442 bitmap: vec![],
2443 },
2444 group_info: None,
2445 filtered_count: None,
2446 };
2447 let (bytes, _) = g.write_body(true);
2448 let res = GapSubmessage::read_body(&bytes, true, true, false);
2449 assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
2450 }
2451
2452 #[test]
2455 fn fnset_wire_size_formula() {
2456 assert_eq!(FragmentNumberSet::wire_size(0), 8);
2457 assert_eq!(FragmentNumberSet::wire_size(1), 12);
2458 assert_eq!(FragmentNumberSet::wire_size(32), 12);
2459 assert_eq!(FragmentNumberSet::wire_size(33), 16);
2460 }
2461
2462 #[test]
2463 fn fnset_from_missing_single() {
2464 let s = FragmentNumberSet::from_missing(
2465 FragmentNumber(1),
2466 &[FragmentNumber(1), FragmentNumber(3)],
2467 );
2468 assert_eq!(s.bitmap_base, FragmentNumber(1));
2469 assert_eq!(s.num_bits, 3);
2470 let set: Vec<_> = s.iter_set().collect();
2471 assert_eq!(set, vec![FragmentNumber(1), FragmentNumber(3)]);
2472 }
2473
2474 #[test]
2475 fn fnset_from_missing_empty() {
2476 let s = FragmentNumberSet::from_missing(FragmentNumber(5), &[]);
2477 assert_eq!(s.num_bits, 0);
2478 assert!(s.iter_set().next().is_none());
2479 }
2480
2481 #[test]
2482 fn fnset_missing_below_base_is_ignored() {
2483 let s = FragmentNumberSet::from_missing(
2484 FragmentNumber(10),
2485 &[FragmentNumber(5), FragmentNumber(11)],
2486 );
2487 assert_eq!(s.bitmap_base, FragmentNumber(10));
2488 let set: Vec<_> = s.iter_set().collect();
2489 assert_eq!(set, vec![FragmentNumber(11)]);
2490 }
2491
2492 #[test]
2493 fn fnset_roundtrip_le() {
2494 let s = FragmentNumberSet {
2495 bitmap_base: FragmentNumber(100),
2496 num_bits: 35,
2497 bitmap: vec![0xDEAD_BEEF, 0xC000_0000],
2498 };
2499 let mut buf = Vec::new();
2500 s.write_to(&mut buf, true);
2501 assert_eq!(buf.len(), s.encoded_size());
2502 let (decoded, end) = FragmentNumberSet::read_from(&buf, 0, true).unwrap();
2503 assert_eq!(decoded, s);
2504 assert_eq!(end, buf.len());
2505 }
2506
2507 #[test]
2508 fn fnset_roundtrip_be() {
2509 let s = FragmentNumberSet {
2510 bitmap_base: FragmentNumber(1),
2511 num_bits: 8,
2512 bitmap: vec![0xFF00_0000],
2513 };
2514 let mut buf = Vec::new();
2515 s.write_to(&mut buf, false);
2516 let (decoded, _) = FragmentNumberSet::read_from(&buf, 0, false).unwrap();
2517 assert_eq!(decoded, s);
2518 }
2519
2520 #[test]
2521 fn fnset_decode_rejects_truncated() {
2522 let buf = [0u8; 4];
2523 let res = FragmentNumberSet::read_from(&buf, 0, true);
2524 assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
2525 }
2526
2527 fn dataflag_frag(
2530 writer_sn: i64,
2531 starting: u32,
2532 count: u16,
2533 frag_size: u16,
2534 sample_size: u32,
2535 payload: Vec<u8>,
2536 ) -> DataFragSubmessage {
2537 DataFragSubmessage {
2538 extra_flags: 0,
2539 reader_id: reader_id(),
2540 writer_id: writer_id(),
2541 writer_sn: SequenceNumber(writer_sn),
2542 fragment_starting_num: FragmentNumber(starting),
2543 fragments_in_submessage: count,
2544 fragment_size: frag_size,
2545 sample_size,
2546 serialized_payload: Arc::from(payload),
2547 inline_qos_flag: false,
2548 hash_key_flag: false,
2549 key_flag: false,
2550 non_standard_flag: false,
2551 }
2552 }
2553
2554 #[test]
2555 fn data_frag_roundtrip_le() {
2556 let d = dataflag_frag(1, 1, 1, 4, 12, vec![0xDE, 0xAD, 0xBE, 0xEF]);
2557 let (bytes, flags) = d.write_body(true);
2558 assert!(flags & FLAG_E_LITTLE_ENDIAN != 0);
2559 assert_eq!(bytes.len(), DataFragSubmessage::HEADER_WIRE_SIZE + 4);
2560 let decoded =
2561 DataFragSubmessage::read_body(&bytes, true, false, false, false, false).unwrap();
2562 assert_eq!(decoded, d);
2563 }
2564
2565 #[test]
2566 fn data_frag_roundtrip_be() {
2567 let d = dataflag_frag(7, 2, 1, 8, 16, vec![1, 2, 3, 4, 5, 6, 7, 8]);
2568 let (bytes, flags) = d.write_body(false);
2569 assert_eq!(flags & FLAG_E_LITTLE_ENDIAN, 0);
2570 let decoded =
2571 DataFragSubmessage::read_body(&bytes, false, false, false, false, false).unwrap();
2572 assert_eq!(decoded, d);
2573 }
2574
2575 #[test]
2576 fn data_frag_last_fragment_shorter_than_fragment_size() {
2577 let d = dataflag_frag(1, 3, 1, 4, 10, vec![0xAA, 0xBB]);
2579 let (bytes, _) = d.write_body(true);
2580 let decoded =
2581 DataFragSubmessage::read_body(&bytes, true, false, false, false, false).unwrap();
2582 assert_eq!(decoded.serialized_payload.as_ref(), &[0xAA, 0xBB][..]);
2583 assert_eq!(decoded.sample_size, 10);
2584 assert_eq!(decoded.fragment_size, 4);
2585 }
2586
2587 #[test]
2588 fn data_frag_decode_rejects_truncated() {
2589 let res = DataFragSubmessage::read_body(&[0u8; 20], true, false, false, false, false);
2590 assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
2591 }
2592
2593 #[test]
2594 fn data_frag_decode_accepts_nonzero_extra_flags_silently() {
2595 let d = dataflag_frag(1, 1, 1, 4, 4, vec![1, 2, 3, 4]);
2598 let (mut bytes, _) = d.write_body(true);
2599 bytes[0..2].copy_from_slice(&0x0042u16.to_le_bytes()); let decoded =
2601 DataFragSubmessage::read_body(&bytes, true, false, false, false, false).unwrap();
2602 assert_eq!(decoded.extra_flags, 0x0042);
2603 }
2604
2605 #[test]
2606 fn seqnumset_rejects_num_bits_above_256() {
2607 let mut buf = Vec::new();
2609 buf.extend_from_slice(&SequenceNumber(1).to_bytes_le());
2610 buf.extend_from_slice(&257u32.to_le_bytes()); let res = SequenceNumberSet::read_from(&buf, 0, true);
2612 assert!(matches!(res, Err(WireError::ValueOutOfRange { .. })));
2613 }
2614
2615 #[test]
2616 fn seqnumset_accepts_exactly_256_bits() {
2617 let mut buf = Vec::new();
2618 buf.extend_from_slice(&SequenceNumber(1).to_bytes_le());
2619 buf.extend_from_slice(&256u32.to_le_bytes());
2620 buf.extend_from_slice(&[0u8; 32]);
2622 let res = SequenceNumberSet::read_from(&buf, 0, true);
2623 assert!(res.is_ok());
2624 }
2625
2626 #[test]
2627 fn fnset_rejects_num_bits_above_256() {
2628 let mut buf = Vec::new();
2629 buf.extend_from_slice(&FragmentNumber(1).to_bytes_le());
2630 buf.extend_from_slice(&1000u32.to_le_bytes()); let res = FragmentNumberSet::read_from(&buf, 0, true);
2632 assert!(matches!(res, Err(WireError::ValueOutOfRange { .. })));
2633 }
2634
2635 #[test]
2636 fn fnset_dos_giant_num_bits_rejected_before_alloc() {
2637 let mut buf = Vec::new();
2640 buf.extend_from_slice(&FragmentNumber(1).to_bytes_le());
2641 buf.extend_from_slice(&u32::MAX.to_le_bytes());
2642 let res = FragmentNumberSet::read_from(&buf, 0, true);
2643 assert!(matches!(res, Err(WireError::ValueOutOfRange { .. })));
2644 }
2645
2646 #[test]
2647 fn data_frag_decode_rejects_wrong_octets_to_inline_qos_when_q_false() {
2648 let d = dataflag_frag(1, 1, 1, 4, 4, vec![1, 2, 3, 4]);
2651 let (mut bytes, _) = d.write_body(true);
2652 bytes[2..4].copy_from_slice(&99u16.to_le_bytes());
2654 let res = DataFragSubmessage::read_body(&bytes, true, false, false, false, false);
2655 assert!(matches!(res, Err(WireError::ValueOutOfRange { .. })));
2656 }
2657
2658 #[test]
2659 fn data_frag_decode_rejects_inline_qos() {
2660 let d = dataflag_frag(1, 1, 1, 4, 4, vec![1, 2, 3, 4]);
2662 let (bytes, _) = d.write_body(true);
2663 let res = DataFragSubmessage::read_body(&bytes, true, true, false, false, false);
2664 assert!(matches!(res, Err(WireError::UnsupportedFeature { .. })));
2665 }
2666
/// Sets the hash-key, key and non-standard flags on a DATA_FRAG, checks that
/// each one appears in the encoded flag byte, and checks that decoding with
/// the same flags restores all three fields.
#[test]
fn data_frag_flags_survive_roundtrip() {
    let mut frag = dataflag_frag(1, 1, 1, 4, 4, vec![1, 2, 3, 4]);
    frag.hash_key_flag = true;
    frag.key_flag = true;
    frag.non_standard_flag = true;

    let (body, flags) = frag.write_body(true);
    // Every flag that was set on the struct must be present in the flag bits.
    for mask in [
        DATA_FRAG_FLAG_HASH_KEY,
        DATA_FRAG_FLAG_KEY,
        DATA_FRAG_FLAG_NON_STANDARD,
    ] {
        assert!(flags & mask != 0);
    }

    let roundtripped =
        DataFragSubmessage::read_body(&body, true, false, true, true, true).unwrap();
    assert!(roundtripped.hash_key_flag);
    assert!(roundtripped.key_flag);
    assert!(roundtripped.non_standard_flag);
}
2682
/// Little-endian round trip of a HEARTBEAT_FRAG: the E flag must be set, the
/// body must be exactly `WIRE_SIZE` bytes, and decoding must reproduce the
/// original value.
#[test]
fn heartbeat_frag_roundtrip_le() {
    let original = HeartbeatFragSubmessage {
        reader_id: reader_id(),
        writer_id: writer_id(),
        writer_sn: SequenceNumber(42),
        last_fragment_num: FragmentNumber(8),
        count: 3,
    };

    let (body, flags) = original.write_body(true);
    assert_ne!(flags & FLAG_E_LITTLE_ENDIAN, 0);
    assert_eq!(body.len(), HeartbeatFragSubmessage::WIRE_SIZE);

    let roundtripped = HeartbeatFragSubmessage::read_body(&body, true).unwrap();
    assert_eq!(roundtripped, original);
}
2700
/// Big-endian round trip of a HEARTBEAT_FRAG: encoding with `false` and
/// decoding with `false` must reproduce the original value.
#[test]
fn heartbeat_frag_roundtrip_be() {
    let original = HeartbeatFragSubmessage {
        reader_id: reader_id(),
        writer_id: writer_id(),
        writer_sn: SequenceNumber(1),
        last_fragment_num: FragmentNumber(1),
        count: 1,
    };

    let (body, _) = original.write_body(false);
    let roundtripped = HeartbeatFragSubmessage::read_body(&body, false).unwrap();
    assert_eq!(roundtripped, original);
}
2714
/// A 20-byte all-zero buffer must be rejected with `UnexpectedEof`.
#[test]
fn heartbeat_frag_decode_rejects_truncated() {
    let short = [0u8; 20];
    let truncated = matches!(
        HeartbeatFragSubmessage::read_body(&short, true),
        Err(WireError::UnexpectedEof { .. })
    );
    assert!(truncated);
}
2720
/// Little-endian round trip of a NACK_FRAG carrying a 4-bit fragment set: the
/// E flag must be set and decoding must reproduce the original value.
#[test]
fn nack_frag_roundtrip_le() {
    // Four valid bits; the pattern 1010 sits in the four most significant
    // bits of the single bitmap word.
    let missing_fragments = FragmentNumberSet {
        bitmap_base: FragmentNumber(1),
        num_bits: 4,
        bitmap: vec![0b1010_0000_0000_0000_0000_0000_0000_0000],
    };
    let original = NackFragSubmessage {
        reader_id: reader_id(),
        writer_id: writer_id(),
        writer_sn: SequenceNumber(5),
        fragment_number_state: missing_fragments,
        count: 2,
    };

    let (body, flags) = original.write_body(true);
    assert_ne!(flags & FLAG_E_LITTLE_ENDIAN, 0);

    let roundtripped = NackFragSubmessage::read_body(&body, true).unwrap();
    assert_eq!(roundtripped, original);
}
2741
/// Big-endian round trip of a NACK_FRAG whose fragment set is empty
/// (`num_bits == 0`, no bitmap words).
#[test]
fn nack_frag_roundtrip_be() {
    let empty_set = FragmentNumberSet {
        bitmap_base: FragmentNumber(10),
        num_bits: 0,
        bitmap: vec![],
    };
    let original = NackFragSubmessage {
        reader_id: reader_id(),
        writer_id: writer_id(),
        writer_sn: SequenceNumber(100),
        fragment_number_state: empty_set,
        count: 0,
    };

    let (body, _) = original.write_body(false);
    let roundtripped = NackFragSubmessage::read_body(&body, false).unwrap();
    assert_eq!(roundtripped, original);
}
2759
/// A 20-byte all-zero buffer must be rejected with `UnexpectedEof`.
#[test]
fn nack_frag_decode_rejects_truncated() {
    let short = [0u8; 20];
    let truncated = matches!(
        NackFragSubmessage::read_body(&short, true),
        Err(WireError::UnexpectedEof { .. })
    );
    assert!(truncated);
}
2765
2766 fn make_info_source() -> InfoSourceSubmessage {
2769 InfoSourceSubmessage {
2770 unused: 0,
2771 protocol_version: crate::wire_types::ProtocolVersion::V2_5,
2772 vendor_id: crate::wire_types::VendorId([0xAB, 0xCD]),
2773 guid_prefix: crate::wire_types::GuidPrefix::from_bytes([0xEE; 12]),
2774 }
2775 }
2776
/// Little-endian round trip of INFO_SOURCE: the E flag must be set, the body
/// must be `WIRE_SIZE` bytes long, and decoding must reproduce the fixture.
#[test]
fn info_source_roundtrip_le() {
    let original = make_info_source();

    let (body, flags) = original.write_body(true);
    assert_ne!(flags & FLAG_E_LITTLE_ENDIAN, 0);
    assert_eq!(body.len(), InfoSourceSubmessage::WIRE_SIZE);

    let roundtripped = InfoSourceSubmessage::read_body(&body, true).unwrap();
    assert_eq!(roundtripped, original);
}
2786
/// Big-endian round trip of INFO_SOURCE: the E flag must be clear and decoding
/// must reproduce the fixture.
#[test]
fn info_source_roundtrip_be() {
    let original = make_info_source();

    let (body, flags) = original.write_body(false);
    assert_eq!(flags & FLAG_E_LITTLE_ENDIAN, 0);

    let roundtripped = InfoSourceSubmessage::read_body(&body, false).unwrap();
    assert_eq!(roundtripped, original);
}
2795
/// Pins the encoded INFO_SOURCE body length to the literal value 20.
#[test]
fn info_source_wire_size_is_20() {
    let (body, _) = make_info_source().write_body(true);
    assert_eq!(body.len(), 20);
}
2802
/// A 19-byte all-zero buffer must be rejected with `UnexpectedEof`.
#[test]
fn info_source_decode_rejects_truncated() {
    let short = [0u8; 19];
    let truncated = matches!(
        InfoSourceSubmessage::read_body(&short, true),
        Err(WireError::UnexpectedEof { .. })
    );
    assert!(truncated);
}
2808
/// A non-zero value in the `unused` field must survive an encode/decode
/// round trip unchanged.
#[test]
fn info_source_unused_field_roundtrips() {
    let mut original = make_info_source();
    original.unused = 0xDEAD_BEEF;

    let (body, _) = original.write_body(true);
    let roundtripped = InfoSourceSubmessage::read_body(&body, true).unwrap();
    assert_eq!(roundtripped.unused, 0xDEAD_BEEF);
}
2819
/// Little-endian round trip of an INFO_REPLY with two unicast locators and no
/// multicast list: the multicast flag must be clear and decoding must
/// reproduce the original value.
#[test]
fn info_reply_unicast_only_roundtrip_le() {
    use crate::wire_types::Locator;

    let unicast = vec![
        Locator::udp_v4([10, 0, 0, 1], 7411),
        Locator::udp_v4([10, 0, 0, 2], 7411),
    ];
    let original = InfoReplySubmessage {
        unicast_locators: unicast,
        multicast_locators: None,
    };

    let (body, flags) = original.write_body(true);
    assert_eq!(flags & INFO_REPLY_FLAG_MULTICAST, 0);

    let roundtripped = InfoReplySubmessage::read_body(&body, true, false).unwrap();
    assert_eq!(roundtripped, original);
}
2837
/// Little-endian round trip of an INFO_REPLY carrying one unicast and one
/// multicast locator: the multicast flag must be set and decoding must
/// reproduce the original value.
#[test]
fn info_reply_with_multicast_roundtrip_le() {
    use crate::wire_types::Locator;

    let unicast = vec![Locator::udp_v4([10, 0, 0, 1], 7411)];
    let multicast = vec![Locator::udp_v4([239, 255, 0, 1], 7400)];
    let original = InfoReplySubmessage {
        unicast_locators: unicast,
        multicast_locators: Some(multicast),
    };

    let (body, flags) = original.write_body(true);
    assert_ne!(flags & INFO_REPLY_FLAG_MULTICAST, 0);

    let roundtripped = InfoReplySubmessage::read_body(&body, true, true).unwrap();
    assert_eq!(roundtripped, original);
}
2850
/// Big-endian round trip of an INFO_REPLY carrying one unicast and one
/// multicast locator.
#[test]
fn info_reply_with_multicast_roundtrip_be() {
    use crate::wire_types::Locator;

    let unicast = vec![Locator::udp_v4([10, 0, 0, 5], 7420)];
    let multicast = vec![Locator::udp_v4([239, 255, 0, 9], 7400)];
    let original = InfoReplySubmessage {
        unicast_locators: unicast,
        multicast_locators: Some(multicast),
    };

    let (body, _) = original.write_body(false);
    let roundtripped = InfoReplySubmessage::read_body(&body, false, true).unwrap();
    assert_eq!(roundtripped, original);
}
2862
/// An INFO_REPLY with an empty unicast list and no multicast list must encode
/// and decode successfully, yielding an empty unicast list again.
#[test]
fn info_reply_empty_unicast_list_is_valid() {
    let original = InfoReplySubmessage {
        unicast_locators: Vec::new(),
        multicast_locators: None,
    };

    let (body, _) = original.write_body(true);
    let roundtripped = InfoReplySubmessage::read_body(&body, true, false).unwrap();
    assert_eq!(roundtripped, original);
    assert!(roundtripped.unicast_locators.is_empty());
}
2876
/// A body consisting solely of a `u32::MAX` length prefix must be rejected
/// with `ValueOutOfRange`.
#[test]
fn info_reply_decode_rejects_oversized_locator_list_length() {
    let mut wire = Vec::new();
    wire.extend_from_slice(&u32::MAX.to_le_bytes());
    let rejected = matches!(
        InfoReplySubmessage::read_body(&wire, true, false),
        Err(WireError::ValueOutOfRange { .. })
    );
    assert!(rejected);
}
2886
/// A 3-byte buffer must be rejected with `UnexpectedEof`.
#[test]
fn info_reply_decode_rejects_truncated_length_field() {
    let short = [0u8; 3];
    let truncated = matches!(
        InfoReplySubmessage::read_body(&short, true, false),
        Err(WireError::UnexpectedEof { .. })
    );
    assert!(truncated);
}
2892
/// Little-endian round trip of an INFO_TIMESTAMP with `invalidate == false`:
/// the invalidate flag must be clear, the body must be 8 bytes long, and
/// decoding must reproduce the original value.
#[test]
fn info_timestamp_roundtrip_le() {
    use crate::header_extension::HeTimestamp;

    let stamp = HeTimestamp {
        seconds: 0x1234_5678,
        fraction: 0x9ABC_DEF0,
    };
    let original = InfoTimestampSubmessage {
        timestamp: stamp,
        invalidate: false,
    };

    let (body, flags) = original.write_body(true);
    assert_eq!(flags & INFO_TIMESTAMP_FLAG_INVALIDATE, 0);
    assert_eq!(body.len(), 8);

    let roundtripped = InfoTimestampSubmessage::read_body(&body, true, false).unwrap();
    assert_eq!(roundtripped, original);
}
2910
/// Big-endian round trip of an INFO_TIMESTAMP: the E flag must be clear and
/// decoding must reproduce the original value.
#[test]
fn info_timestamp_roundtrip_be() {
    use crate::header_extension::HeTimestamp;

    let stamp = HeTimestamp {
        seconds: 1_700_000_000,
        fraction: 12345,
    };
    let original = InfoTimestampSubmessage {
        timestamp: stamp,
        invalidate: false,
    };

    let (body, flags) = original.write_body(false);
    assert_eq!(flags & FLAG_E_LITTLE_ENDIAN, 0);

    let roundtripped = InfoTimestampSubmessage::read_body(&body, false, false).unwrap();
    assert_eq!(roundtripped, original);
}
2925
/// With `invalidate == true` the encoder must set the invalidate flag and emit
/// an empty body, and decoding that body with the flag set must report
/// `invalidate` again.
#[test]
fn info_timestamp_invalidate_flag_yields_empty_body() {
    use crate::header_extension::HeTimestamp;

    let original = InfoTimestampSubmessage {
        timestamp: HeTimestamp::default(),
        invalidate: true,
    };

    let (body, flags) = original.write_body(true);
    assert_ne!(flags & INFO_TIMESTAMP_FLAG_INVALIDATE, 0);
    assert!(body.is_empty(), "I-Flag → leerer Body");

    let roundtripped = InfoTimestampSubmessage::read_body(&body, true, true).unwrap();
    assert!(roundtripped.invalidate);
}
2938
/// Without the invalidate flag, a 4-byte body must be rejected with
/// `UnexpectedEof`.
#[test]
fn info_timestamp_decode_rejects_truncated_when_no_invalidate() {
    let short = [0u8; 4];
    let truncated = matches!(
        InfoTimestampSubmessage::read_body(&short, true, false),
        Err(WireError::UnexpectedEof { .. })
    );
    assert!(truncated);
}
2944
/// With the invalidate flag set, decoding an 8-byte body must succeed, report
/// `invalidate`, and leave the timestamp at its default value.
#[test]
fn info_timestamp_decode_with_invalidate_ignores_body() {
    use crate::header_extension::HeTimestamp;

    let payload = [0u8; 8];
    let decoded = InfoTimestampSubmessage::read_body(&payload, true, true).unwrap();
    assert!(decoded.invalidate);
    assert_eq!(decoded.timestamp, HeTimestamp::default());
}
2956}