1use core::time::Duration;
34
35extern crate alloc;
36use alloc::vec::Vec;
37
38use alloc::rc::Rc;
39
40use crate::error::WireError;
41use crate::header::RtpsHeader;
42use crate::history_cache::{CacheChange, HistoryCache, HistoryKind};
43use crate::message_builder::{AddError, MessageBuilder, OutboundDatagram};
44use crate::reader_proxy::ReaderProxy;
45use crate::submessage_header::SubmessageId;
46use crate::submessages::{
47 DataFragSubmessage, DataSubmessage, GapSubmessage, HeartbeatSubmessage, NackFragSubmessage,
48 SequenceNumberSet,
49};
50use crate::wire_types::{EntityId, FragmentNumber, Guid, Locator, SequenceNumber, VendorId};
51
/// Default interval between periodic HEARTBEATs: 100 ms.
///
/// NOTE(review): not referenced by the writer code visible here; presumably
/// meant as the default for `ReliableWriterConfig::heartbeat_period` — confirm.
pub const DEFAULT_HEARTBEAT_PERIOD: Duration = Duration::from_millis(100);

/// Default fragment size in bytes (1344). Payloads longer than the configured
/// fragment size are sent as DATA_FRAG submessages instead of a single DATA.
pub const DEFAULT_FRAGMENT_SIZE: u32 = 1344;
69
/// Writer-side state of a reliable endpoint: the sample history, the matched
/// reader proxies, and the counters/timestamps that drive HEARTBEATs and
/// retransmissions.
#[derive(Debug, Clone)]
pub struct ReliableWriter {
    /// GUID of this writer; the prefix goes into the RTPS header and the
    /// entity id into the `writer_id` of every submessage.
    guid: Guid,
    /// Vendor id placed in the RTPS header of every outgoing datagram.
    vendor_id: VendorId,
    /// One proxy per matched remote reader (unique by `remote_reader_guid`).
    reader_proxies: Vec<ReaderProxy>,
    /// History of written changes, used for retransmission and HEARTBEAT ranges.
    cache: HistoryCache,
    /// Minimum time between two periodic HEARTBEATs emitted by `tick`.
    heartbeat_period: Duration,
    /// Caller-supplied timestamp of the last HEARTBEAT emission; `None` forces
    /// a HEARTBEAT on the next `tick` (given a non-empty cache).
    last_heartbeat: Option<Duration>,
    /// Number of HEARTBEATs built so far (wrapping); copied into each
    /// HEARTBEAT's `count` field.
    heartbeat_count: i32,
    /// Number of NACK_FRAGs accepted from known readers (wrapping).
    nackfrag_count: i32,
    /// Number of ACKNACK/NACK_FRAG messages from GUIDs without a matching
    /// reader proxy (saturating).
    unknown_src_count: u64,
    /// Sequence number of the most recently written change (0 before the first
    /// write); the next change gets `next_sn + 1`.
    next_sn: i64,
    /// Maximum payload bytes per fragment; larger payloads are fragmented.
    fragment_size: u32,
    /// Datagram size limit handed to every `MessageBuilder`.
    mtu: usize,
}
89
/// Construction parameters for [`ReliableWriter::new`].
#[derive(Debug, Clone)]
pub struct ReliableWriterConfig {
    /// GUID of the writer being created.
    pub guid: Guid,
    /// Vendor id written into outgoing RTPS headers.
    pub vendor_id: VendorId,
    /// Initially matched remote readers.
    pub reader_proxies: Vec<ReaderProxy>,
    /// Capacity passed to `HistoryCache::new_with_kind`.
    pub max_samples: usize,
    /// History policy passed to `HistoryCache::new_with_kind`.
    pub history_kind: HistoryKind,
    /// Minimum interval between periodic HEARTBEATs.
    pub heartbeat_period: Duration,
    /// Fragment size in bytes; must be greater than 0 (checked by `new`).
    pub fragment_size: u32,
    /// Maximum datagram size; must be at least 20 (checked by `new`).
    pub mtu: usize,
}
116
117impl ReliableWriter {
118 #[must_use]
124 pub fn new(cfg: ReliableWriterConfig) -> Self {
125 assert!(cfg.fragment_size > 0, "fragment_size must be > 0");
126 assert!(cfg.mtu >= 20, "mtu must accommodate RTPS header");
127 Self {
128 guid: cfg.guid,
129 vendor_id: cfg.vendor_id,
130 reader_proxies: cfg.reader_proxies,
131 cache: HistoryCache::new_with_kind(cfg.history_kind, cfg.max_samples),
132 heartbeat_period: cfg.heartbeat_period,
133 last_heartbeat: None,
134 heartbeat_count: 0,
135 nackfrag_count: 0,
136 unknown_src_count: 0,
137 next_sn: 0,
138 fragment_size: cfg.fragment_size,
139 mtu: cfg.mtu,
140 }
141 }
142
/// Returns the GUID of this writer.
#[must_use]
pub fn guid(&self) -> Guid {
    self.guid
}
148
/// Returns the currently matched reader proxies.
#[must_use]
pub fn reader_proxies(&self) -> &[ReaderProxy] {
    &self.reader_proxies
}
154
/// Returns the number of matched reader proxies.
#[must_use]
pub fn reader_proxy_count(&self) -> usize {
    self.reader_proxies.len()
}
160
/// Drops cached changes below `up_to_exclusive` by delegating to
/// `HistoryCache::remove_up_to` and returns that call's result
/// (presumably the number of removed changes — confirm against `HistoryCache`).
pub fn remove_samples_up_to(&mut self, up_to_exclusive: SequenceNumber) -> usize {
    self.cache.remove_up_to(up_to_exclusive)
}
168
/// Read-only access to the writer's history cache.
#[must_use]
pub fn cache(&self) -> &HistoryCache {
    &self.cache
}
174
/// Returns the HEARTBEAT counter: the `count` value of the most recently
/// built HEARTBEAT (0 if none was built yet). The counter wraps on overflow.
#[must_use]
pub fn heartbeat_count(&self) -> i32 {
    self.heartbeat_count
}
180
/// Returns how many NACK_FRAGs from known readers were accepted (wrapping).
#[must_use]
pub fn nackfrag_count(&self) -> i32 {
    self.nackfrag_count
}
186
/// Returns how many ACKNACK/NACK_FRAG messages arrived from a GUID that has
/// no matching reader proxy (saturating counter).
#[must_use]
pub fn unknown_src_count(&self) -> u64 {
    self.unknown_src_count
}
194
/// Returns the configured fragment size in bytes.
#[must_use]
pub fn fragment_size(&self) -> u32 {
    self.fragment_size
}
200
201 #[must_use]
204 fn needs_fragmentation(&self, payload: &[u8]) -> bool {
205 u32::try_from(payload.len()).unwrap_or(u32::MAX) > self.fragment_size && !payload.is_empty()
206 }
207
208 #[must_use]
216 pub fn all_samples_acknowledged(&self) -> bool {
217 let Some(max_sn) = self.cache.max_sn() else {
218 return true;
219 };
220 self.reader_proxies
221 .iter()
222 .all(|p| p.highest_acked_sn() >= max_sn)
223 }
224
225 pub fn add_reader_proxy(&mut self, proxy: ReaderProxy) {
237 let guid = proxy.remote_reader_guid;
238 if let Some(idx) = self
239 .reader_proxies
240 .iter()
241 .position(|p| p.remote_reader_guid == guid)
242 {
243 self.reader_proxies[idx] = proxy;
244 } else {
245 self.reader_proxies.push(proxy);
246 }
247 self.last_heartbeat = None;
250 }
251
252 pub fn remove_reader_proxy(&mut self, guid: Guid) -> Option<ReaderProxy> {
254 let idx = self
255 .reader_proxies
256 .iter()
257 .position(|p| p.remote_reader_guid == guid)?;
258 Some(self.reader_proxies.remove(idx))
259 }
260
261 pub fn write(&mut self, payload: &[u8]) -> Result<Vec<OutboundDatagram>, WireError> {
272 let sn_value = self
273 .next_sn
274 .checked_add(1)
275 .ok_or(WireError::ValueOutOfRange {
276 message: "sequence number overflow",
277 })?;
278 self.next_sn = sn_value;
279 let sn = SequenceNumber(sn_value);
280 let payload: alloc::sync::Arc<[u8]> = alloc::sync::Arc::from(payload);
286 self.cache
287 .insert(CacheChange::alive_arc(
288 sn,
289 alloc::sync::Arc::clone(&payload),
290 ))
291 .map_err(|_| WireError::ValueOutOfRange {
292 message: "history cache full or duplicate",
293 })?;
294
295 let mut out = Vec::new();
296 for idx in 0..self.reader_proxies.len() {
297 let advanced = self.reader_proxies[idx].next_unsent_change(sn);
312 if advanced != Some(sn) {
313 continue;
314 }
315 let reader_id = self.reader_proxies[idx].remote_reader_guid.entity_id;
316 let targets = self.targets_for(idx);
317 let proxy_payload = self.adapt_payload_for_proxy(idx, &payload);
325 out.extend(self.build_sample_datagrams(sn, &proxy_payload, reader_id, &targets)?);
326 }
327 Ok(out)
328 }
329
330 fn adapt_payload_for_proxy(
335 &self,
336 idx: usize,
337 payload: &alloc::sync::Arc<[u8]>,
338 ) -> alloc::sync::Arc<[u8]> {
339 let negotiated = self.reader_proxies[idx].negotiated_data_representation();
340 if negotiated == crate::publication_data::data_representation::XCDR2 || payload.len() < 4 {
343 return alloc::sync::Arc::clone(payload);
344 }
345 let target_byte = match negotiated {
346 crate::publication_data::data_representation::XCDR => 0x01_u8,
347 _ => return alloc::sync::Arc::clone(payload),
348 };
349 if payload[1] == target_byte {
350 return alloc::sync::Arc::clone(payload);
351 }
352 let mut adapted: alloc::vec::Vec<u8> = payload.to_vec();
353 adapted[1] = target_byte;
354 alloc::sync::Arc::from(adapted.into_boxed_slice())
355 }
356
357 pub fn write_with_heartbeat(
373 &mut self,
374 payload: &[u8],
375 now: Duration,
376 ) -> Result<Vec<OutboundDatagram>, WireError> {
377 let mut out = self.write(payload)?;
378 if self.cache.is_empty() {
379 return Ok(out);
380 }
381 let cache_min = self.cache.min_sn().unwrap_or(SequenceNumber(1));
385 for idx in 0..self.reader_proxies.len() {
386 let reader_id = self.reader_proxies[idx].remote_reader_guid.entity_id;
387 let targets = self.targets_for(idx);
388 let mut builder =
389 MessageBuilder::open(self.rtps_header(), Rc::clone(&targets), self.mtu);
390 self.append_heartbeat(
391 &mut builder,
392 reader_id,
393 cache_min,
394 false,
395 &mut out,
396 &targets,
397 )?;
398 if let Some(dg) = builder.finish() {
399 out.push(dg);
400 }
401 }
402 self.last_heartbeat = Some(now);
403 Ok(out)
404 }
405
/// Periodic driver: services pending retransmission requests of every
/// reader proxy and, when due, emits a HEARTBEAT per proxy.
///
/// For each proxy, in this order:
/// 1. requested fragments are sent as stand-alone DATA_FRAG datagrams
///    (or a GAP datagram when the change is no longer in the cache);
/// 2. requested whole changes are appended to a shared builder as DATA
///    (or GAP when not cached); changes that need fragmentation flush
///    the builder and are sent as DATA_FRAG datagrams;
/// 3. if a HEARTBEAT is due and the cache is not empty, a HEARTBEAT is
///    appended to the same builder, so it shares a datagram with the
///    resends when they fit.
///
/// A HEARTBEAT is due when none was recorded yet or when at least
/// `heartbeat_period` passed since the last one. `now` is stored as the new
/// HEARTBEAT time only if at least one HEARTBEAT was actually built.
///
/// # Errors
///
/// Any encoding error aborts the tick and is returned; datagrams built up
/// to that point are dropped together with the already dequeued requests.
///
/// NOTE(review): whole-change resends always go through `append_data`
/// (plain DATA, no inline QoS). Changes stored by `write_lifecycle` would
/// therefore be resent without their status info — confirm whether that is
/// intended.
pub fn tick(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
    let should_heartbeat = match self.last_heartbeat {
        None => true,
        Some(last) => now.saturating_sub(last) >= self.heartbeat_period,
    };
    // Nothing to announce while the cache is empty.
    let emit_hb = should_heartbeat && !self.cache.is_empty();

    let mut out = Vec::new();
    let mut hb_emitted_any = false;

    for idx in 0..self.reader_proxies.len() {
        let reader_id = self.reader_proxies[idx].remote_reader_guid.entity_id;
        let targets = self.targets_for(idx);

        // Fragment-level resends (answers to NACK_FRAG): one datagram each.
        while let Some((sn, frag)) = self.reader_proxies[idx].next_requested_fragment() {
            match self.cache.get(sn) {
                Some(change) => {
                    let payload = change.payload.clone();
                    #[cfg(feature = "metrics")]
                    crate::metrics::inc_retransmit();
                    #[cfg(feature = "metrics")]
                    crate::metrics::inc_fragmented_sample();
                    out.push(
                        self.build_data_frag_datagram(sn, frag, &payload, reader_id, &targets)?,
                    );
                }
                None => {
                    // Change is gone from the cache: tell the reader to skip it.
                    out.push(self.build_gap_datagram(sn, reader_id, &targets)?);
                }
            }
        }

        // Shared builder for whole-change resends and the HEARTBEAT.
        let mut builder =
            MessageBuilder::open(self.rtps_header(), Rc::clone(&targets), self.mtu);

        // Whole-change resends (answers to ACKNACK).
        while let Some(sn) = self.reader_proxies[idx].next_requested_change() {
            #[cfg(feature = "metrics")]
            crate::metrics::inc_retransmit();
            match self.cache.get(sn) {
                Some(change) => {
                    let payload = change.payload.clone();
                    if self.needs_fragmentation(&payload) {
                        // Flush what was aggregated so far, then send the
                        // fragments as their own datagrams.
                        if let Some(dg) = builder.finish() {
                            out.push(dg);
                        }
                        builder = MessageBuilder::open(
                            self.rtps_header(),
                            Rc::clone(&targets),
                            self.mtu,
                        );
                        out.extend(
                            self.build_sample_datagrams(sn, &payload, reader_id, &targets)?,
                        );
                    } else {
                        self.append_data(
                            &mut builder,
                            sn,
                            &payload,
                            reader_id,
                            &mut out,
                            &targets,
                        )?;
                    }
                }
                None => {
                    self.append_gap(&mut builder, sn, reader_id, &mut out, &targets)?;
                }
            }
        }

        if emit_hb {
            // Announce from the later of: oldest cached change, or the first
            // change this reader has not acknowledged yet.
            let cache_min = self.cache.min_sn().unwrap_or(SequenceNumber(1));
            let per_proxy_first = SequenceNumber(
                self.reader_proxies[idx]
                    .highest_acked_sn()
                    .0
                    .saturating_add(1),
            );
            let first_sn = cache_min.max(per_proxy_first);
            self.append_heartbeat(
                &mut builder,
                reader_id,
                first_sn,
                false,
                &mut out,
                &targets,
            )?;
            hb_emitted_any = true;
        }

        if let Some(dg) = builder.finish() {
            out.push(dg);
        }
    }

    // Without any proxy no HEARTBEAT was built, so the timer stays as is.
    if hb_emitted_any {
        self.last_heartbeat = Some(now);
    }

    Ok(out)
}
533
534 pub fn handle_acknack(
539 &mut self,
540 src_guid: Guid,
541 base: SequenceNumber,
542 requested: impl IntoIterator<Item = SequenceNumber>,
543 ) {
544 #[cfg(feature = "metrics")]
545 crate::metrics::inc_acknack_received();
546 let Some(idx) = self
547 .reader_proxies
548 .iter()
549 .position(|p| p.remote_reader_guid == src_guid)
550 else {
551 self.unknown_src_count = self.unknown_src_count.saturating_add(1);
552 return;
553 };
554 self.reader_proxies[idx].acked_changes_set(base);
555 self.reader_proxies[idx].requested_changes_set(requested);
556 }
561
562 pub fn handle_nackfrag(&mut self, src_guid: Guid, nf: &NackFragSubmessage) {
564 if nf.writer_id != self.guid.entity_id {
565 return;
566 }
567 let Some(idx) = self
568 .reader_proxies
569 .iter()
570 .position(|p| p.remote_reader_guid == src_guid)
571 else {
572 self.unknown_src_count = self.unknown_src_count.saturating_add(1);
573 return;
574 };
575 self.nackfrag_count = self.nackfrag_count.wrapping_add(1);
576 let missing: Vec<FragmentNumber> = nf.fragment_number_state.iter_set().collect();
577 self.reader_proxies[idx].requested_fragments_set(nf.writer_sn, missing);
578 }
579
/// Builds the RTPS header for outgoing datagrams from this writer's vendor
/// id and GUID prefix.
fn rtps_header(&self) -> RtpsHeader {
    RtpsHeader::new(self.vendor_id, self.guid.prefix)
}
585
586 fn targets_for(&self, idx: usize) -> Rc<Vec<Locator>> {
589 let p = &self.reader_proxies[idx];
590 if !p.multicast_locators.is_empty() {
591 Rc::new(p.multicast_locators.clone())
592 } else {
593 Rc::new(p.unicast_locators.clone())
594 }
595 }
596
597 fn append_data(
598 &self,
599 builder: &mut MessageBuilder,
600 sn: SequenceNumber,
601 payload: &alloc::sync::Arc<[u8]>,
602 reader_id: EntityId,
603 out: &mut Vec<OutboundDatagram>,
604 targets: &Rc<Vec<Locator>>,
605 ) -> Result<(), WireError> {
606 let data = DataSubmessage {
607 extra_flags: 0,
608 reader_id,
609 writer_id: self.guid.entity_id,
610 writer_sn: sn,
611 inline_qos: None,
613 key_flag: false,
614 non_standard_flag: false,
615 serialized_payload: alloc::sync::Arc::clone(payload),
616 };
617 let (body, flags) = data.write_body(true);
618 self.append_submessage(
619 builder,
620 SubmessageId::Data,
621 flags,
622 &body,
623 out,
624 targets,
625 "DATA",
626 )
627 }
628
629 fn append_gap(
630 &self,
631 builder: &mut MessageBuilder,
632 sn: SequenceNumber,
633 reader_id: EntityId,
634 out: &mut Vec<OutboundDatagram>,
635 targets: &Rc<Vec<Locator>>,
636 ) -> Result<(), WireError> {
637 let gap = GapSubmessage {
638 reader_id,
639 writer_id: self.guid.entity_id,
640 gap_start: sn,
641 gap_list: SequenceNumberSet {
642 bitmap_base: SequenceNumber(sn.0 + 1),
643 num_bits: 0,
644 bitmap: Vec::new(),
645 },
646 group_info: None,
647 filtered_count: None,
648 };
649 let (body, flags) = gap.write_body(true);
650 self.append_submessage(
651 builder,
652 SubmessageId::Gap,
653 flags,
654 &body,
655 out,
656 targets,
657 "GAP",
658 )
659 }
660
661 fn append_heartbeat(
662 &mut self,
663 builder: &mut MessageBuilder,
664 reader_id: EntityId,
665 first_sn: SequenceNumber,
666 final_flag: bool,
667 out: &mut Vec<OutboundDatagram>,
668 targets: &Rc<Vec<Locator>>,
669 ) -> Result<(), WireError> {
670 #[cfg(feature = "metrics")]
671 crate::metrics::inc_heartbeat_sent();
672 self.heartbeat_count = self.heartbeat_count.wrapping_add(1);
673 let last = self.cache.max_sn().unwrap_or(SequenceNumber(0));
674 let hb = HeartbeatSubmessage {
675 reader_id,
676 writer_id: self.guid.entity_id,
677 first_sn,
678 last_sn: last,
679 count: self.heartbeat_count,
680 final_flag,
681 liveliness_flag: false,
682 group_info: None,
683 };
684 let (body, flags) = hb.write_body(true);
685 self.append_submessage(
686 builder,
687 SubmessageId::Heartbeat,
688 flags,
689 &body,
690 out,
691 targets,
692 "HEARTBEAT",
693 )
694 }
695
696 #[allow(clippy::too_many_arguments)]
698 fn append_submessage(
699 &self,
700 builder: &mut MessageBuilder,
701 id: SubmessageId,
702 flags: u8,
703 body: &[u8],
704 out: &mut Vec<OutboundDatagram>,
705 targets: &Rc<Vec<Locator>>,
706 kind_hint: &'static str,
707 ) -> Result<(), WireError> {
708 match builder.try_add_submessage(id, flags, body) {
709 Ok(()) => Ok(()),
710 Err(AddError::BodyTooLarge) => Err(WireError::ValueOutOfRange {
711 message: match kind_hint {
712 "DATA" => "DATA body exceeds u16::MAX",
713 "GAP" => "GAP body exceeds u16::MAX",
714 "HEARTBEAT" => "HEARTBEAT body exceeds u16::MAX",
715 _ => "submessage body exceeds u16::MAX",
716 },
717 }),
718 Err(AddError::WouldExceedMtu { .. }) => {
719 let finished = core::mem::replace(
720 builder,
721 MessageBuilder::open(self.rtps_header(), Rc::clone(targets), self.mtu),
722 );
723 if let Some(dg) = finished.finish() {
724 out.push(dg);
725 }
726 builder.try_add_submessage(id, flags, body).map_err(|_| {
727 WireError::ValueOutOfRange {
728 message: "submessage does not fit into fresh datagram",
729 }
730 })
731 }
732 }
733 }
734
735 fn build_sample_datagrams(
738 &self,
739 sn: SequenceNumber,
740 payload: &alloc::sync::Arc<[u8]>,
741 reader_id: EntityId,
742 targets: &Rc<Vec<Locator>>,
743 ) -> Result<Vec<OutboundDatagram>, WireError> {
744 if !self.needs_fragmentation(payload) {
745 return Ok(alloc::vec![
746 self.build_single_data_datagram(sn, payload, reader_id, targets,)?
747 ]);
748 }
749 let frag_size = self.fragment_size as usize;
750 let sample_size = u32::try_from(payload.len()).map_err(|_| WireError::ValueOutOfRange {
751 message: "sample size exceeds u32::MAX",
752 })?;
753 let frag_size_u16 = u16::try_from(frag_size).map_err(|_| WireError::ValueOutOfRange {
754 message: "fragment_size exceeds u16::MAX",
755 })?;
756 let mut out = Vec::new();
757 let mut frag_num: u32 = 1;
758 let mut pos = 0usize;
759 while pos < payload.len() {
760 let end = core::cmp::min(pos + frag_size, payload.len());
761 out.push(self.build_data_frag_submessage_datagram(
762 sn,
763 FragmentNumber(frag_num),
764 frag_size_u16,
765 sample_size,
766 &payload[pos..end],
767 reader_id,
768 targets,
769 )?);
770 pos = end;
771 frag_num = frag_num.checked_add(1).ok_or(WireError::ValueOutOfRange {
772 message: "fragment number overflow",
773 })?;
774 }
775 Ok(out)
776 }
777
778 fn build_single_data_datagram(
779 &self,
780 sn: SequenceNumber,
781 payload: &alloc::sync::Arc<[u8]>,
782 reader_id: EntityId,
783 targets: &Rc<Vec<Locator>>,
784 ) -> Result<OutboundDatagram, WireError> {
785 let mut builder = MessageBuilder::open(self.rtps_header(), Rc::clone(targets), self.mtu);
786 let data = DataSubmessage {
787 extra_flags: 0,
788 reader_id,
789 writer_id: self.guid.entity_id,
790 writer_sn: sn,
791 inline_qos: None,
793 key_flag: false,
794 non_standard_flag: false,
795 serialized_payload: alloc::sync::Arc::clone(payload),
796 };
797 let (body, flags) = data.write_body(true);
798 builder
799 .try_add_submessage(SubmessageId::Data, flags, &body)
800 .map_err(|_| WireError::ValueOutOfRange {
801 message: "DATA submessage does not fit into MTU",
802 })?;
803 builder.finish().ok_or(WireError::ValueOutOfRange {
804 message: "MessageBuilder finish returned no datagram",
805 })
806 }
807
808 fn build_lifecycle_datagram(
814 &self,
815 sn: SequenceNumber,
816 key_hash: [u8; 16],
817 status_bits: u32,
818 reader_id: EntityId,
819 targets: &Rc<Vec<Locator>>,
820 ) -> Result<OutboundDatagram, WireError> {
821 let mut builder = MessageBuilder::open(self.rtps_header(), Rc::clone(targets), self.mtu);
822 let inline_qos = crate::inline_qos::lifecycle_inline_qos(key_hash, status_bits);
823 let data = DataSubmessage {
824 extra_flags: 0,
825 reader_id,
826 writer_id: self.guid.entity_id,
827 writer_sn: sn,
828 inline_qos: Some(inline_qos),
829 key_flag: true,
830 non_standard_flag: false,
831 serialized_payload: alloc::sync::Arc::from(alloc::vec::Vec::new()),
832 };
833 let (body, flags) = data.write_body(true);
834 builder
835 .try_add_submessage(SubmessageId::Data, flags, &body)
836 .map_err(|_| WireError::ValueOutOfRange {
837 message: "lifecycle DATA submessage does not fit into MTU",
838 })?;
839 builder.finish().ok_or(WireError::ValueOutOfRange {
840 message: "MessageBuilder finish returned no datagram",
841 })
842 }
843
844 pub fn write_lifecycle(
858 &mut self,
859 key_hash: [u8; 16],
860 status_bits: u32,
861 ) -> Result<Vec<OutboundDatagram>, WireError> {
862 let sn_value = self
863 .next_sn
864 .checked_add(1)
865 .ok_or(WireError::ValueOutOfRange {
866 message: "sequence number overflow",
867 })?;
868 self.next_sn = sn_value;
869 let sn = SequenceNumber(sn_value);
870
871 let kind = match (
872 status_bits & crate::inline_qos::status_info::DISPOSED != 0,
873 status_bits & crate::inline_qos::status_info::UNREGISTERED != 0,
874 ) {
875 (true, true) => crate::history_cache::ChangeKind::NotAliveDisposedUnregistered,
876 (true, false) => crate::history_cache::ChangeKind::NotAliveDisposed,
877 (false, true) => crate::history_cache::ChangeKind::NotAliveUnregistered,
878 (false, false) => {
879 return Err(WireError::ValueOutOfRange {
880 message: "lifecycle send requires DISPOSED or UNREGISTERED bit",
881 });
882 }
883 };
884
885 self.cache
888 .insert(crate::history_cache::CacheChange::lifecycle(
889 sn,
890 key_hash.to_vec(),
891 kind,
892 ))
893 .map_err(|_| WireError::ValueOutOfRange {
894 message: "history cache full or duplicate (lifecycle)",
895 })?;
896
897 let mut out = Vec::new();
898 for idx in 0..self.reader_proxies.len() {
899 let advanced = self.reader_proxies[idx].next_unsent_change(sn);
900 if advanced != Some(sn) {
901 continue;
902 }
903 let reader_id = self.reader_proxies[idx].remote_reader_guid.entity_id;
904 let targets = self.targets_for(idx);
905 out.push(self.build_lifecycle_datagram(
906 sn,
907 key_hash,
908 status_bits,
909 reader_id,
910 &targets,
911 )?);
912 }
913 Ok(out)
914 }
915
916 fn build_data_frag_datagram(
917 &self,
918 sn: SequenceNumber,
919 frag: FragmentNumber,
920 full_payload: &alloc::sync::Arc<[u8]>,
921 reader_id: EntityId,
922 targets: &Rc<Vec<Locator>>,
923 ) -> Result<OutboundDatagram, WireError> {
924 let frag_size = self.fragment_size as usize;
925 if frag.0 == 0 {
926 return Err(WireError::ValueOutOfRange {
927 message: "fragment number must be >= 1",
928 });
929 }
930 let start = (frag.0 as usize - 1) * frag_size;
931 if start >= full_payload.len() {
932 return Err(WireError::ValueOutOfRange {
933 message: "fragment number beyond sample",
934 });
935 }
936 let end = core::cmp::min(start + frag_size, full_payload.len());
937 let sample_size =
938 u32::try_from(full_payload.len()).map_err(|_| WireError::ValueOutOfRange {
939 message: "sample size exceeds u32::MAX",
940 })?;
941 let frag_size_u16 = u16::try_from(frag_size).map_err(|_| WireError::ValueOutOfRange {
942 message: "fragment_size exceeds u16::MAX",
943 })?;
944 self.build_data_frag_submessage_datagram(
945 sn,
946 frag,
947 frag_size_u16,
948 sample_size,
949 &full_payload[start..end],
950 reader_id,
951 targets,
952 )
953 }
954
955 #[allow(clippy::too_many_arguments)]
956 fn build_data_frag_submessage_datagram(
957 &self,
958 sn: SequenceNumber,
959 frag: FragmentNumber,
960 fragment_size: u16,
961 sample_size: u32,
962 chunk: &[u8],
963 reader_id: EntityId,
964 targets: &Rc<Vec<Locator>>,
965 ) -> Result<OutboundDatagram, WireError> {
966 let df = DataFragSubmessage {
967 extra_flags: 0,
968 reader_id,
969 writer_id: self.guid.entity_id,
970 writer_sn: sn,
971 fragment_starting_num: frag,
972 fragments_in_submessage: 1,
973 fragment_size,
974 sample_size,
975 serialized_payload: alloc::sync::Arc::from(chunk),
987 inline_qos_flag: false,
988 hash_key_flag: false,
989 key_flag: false,
990 non_standard_flag: false,
991 };
992 let (body, flags) = df.write_body(true);
993 let mut builder = MessageBuilder::open(self.rtps_header(), Rc::clone(targets), self.mtu);
994 builder
995 .try_add_submessage(SubmessageId::DataFrag, flags, &body)
996 .map_err(|_| WireError::ValueOutOfRange {
997 message: "DATA_FRAG submessage does not fit into MTU",
998 })?;
999 builder.finish().ok_or(WireError::ValueOutOfRange {
1000 message: "MessageBuilder finish returned no datagram",
1001 })
1002 }
1003
1004 fn build_gap_datagram(
1005 &self,
1006 sn: SequenceNumber,
1007 reader_id: EntityId,
1008 targets: &Rc<Vec<Locator>>,
1009 ) -> Result<OutboundDatagram, WireError> {
1010 let gap = GapSubmessage {
1011 reader_id,
1012 writer_id: self.guid.entity_id,
1013 gap_start: sn,
1014 gap_list: SequenceNumberSet {
1015 bitmap_base: SequenceNumber(sn.0 + 1),
1016 num_bits: 0,
1017 bitmap: Vec::new(),
1018 },
1019 group_info: None,
1020 filtered_count: None,
1021 };
1022 let (body, flags) = gap.write_body(true);
1023 let mut builder = MessageBuilder::open(self.rtps_header(), Rc::clone(targets), self.mtu);
1024 builder
1025 .try_add_submessage(SubmessageId::Gap, flags, &body)
1026 .map_err(|_| WireError::ValueOutOfRange {
1027 message: "GAP submessage does not fit into MTU",
1028 })?;
1029 builder.finish().ok_or(WireError::ValueOutOfRange {
1030 message: "MessageBuilder finish returned no datagram",
1031 })
1032 }
1033}
1034
1035#[cfg(test)]
1036#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
1037mod tests {
1038 use super::*;
1039 use crate::datagram::{ParsedSubmessage, decode_datagram};
1040 use crate::message_builder::DEFAULT_MTU;
1041 use crate::wire_types::{GuidPrefix, Locator};
1042
/// Test shorthand for constructing a `SequenceNumber`.
fn sn(n: i64) -> SequenceNumber {
    SequenceNumber(n)
}
1046
/// GUID of the default remote reader used by the test writers.
fn reader_guid() -> Guid {
    Guid::new(
        GuidPrefix::from_bytes([2; 12]),
        EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
    )
}
1053
/// Builds a test writer with the default fragment size; see
/// `make_writer_with_frag_size` for the remaining settings.
fn make_writer(max_samples: usize, hb_period: Duration) -> ReliableWriter {
    make_writer_with_frag_size(max_samples, hb_period, DEFAULT_FRAGMENT_SIZE)
}
1057
/// Builds a KeepAll test writer with one matched reader (`reader_guid()`,
/// reachable at 127.0.0.1:7410), the default MTU and the given cache
/// capacity, heartbeat period and fragment size.
fn make_writer_with_frag_size(
    max_samples: usize,
    hb_period: Duration,
    fragment_size: u32,
) -> ReliableWriter {
    let writer_guid = Guid::new(
        GuidPrefix::from_bytes([1; 12]),
        EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
    );
    let reader_proxy = ReaderProxy::new(
        reader_guid(),
        alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7410)],
        alloc::vec![],
        true,
    );
    ReliableWriter::new(ReliableWriterConfig {
        guid: writer_guid,
        vendor_id: VendorId::ZERODDS,
        reader_proxies: alloc::vec![reader_proxy],
        max_samples,
        history_kind: HistoryKind::KeepAll,
        heartbeat_period: hb_period,
        fragment_size,
        mtu: DEFAULT_MTU,
    })
}
1084
/// Returns the writer's first reader proxy; panics if there is none.
fn first_proxy(w: &ReliableWriter) -> &ReaderProxy {
    w.reader_proxies().first().unwrap()
}
1088
// Two consecutive writes yield one DATA datagram each, numbered 1 and 2,
// and both changes end up in the cache.
#[test]
fn write_increments_sn_and_returns_data_datagram() {
    let mut w = make_writer(10, Duration::from_secs(1));
    let first = w.write(&[0xAA_u8]).expect("write1");
    let second = w.write(&[0xBB_u8]).expect("write2");
    assert_eq!(first.len(), 1);
    assert_eq!(second.len(), 1);

    let parsed_first = decode_datagram(&first[0].bytes).unwrap();
    let parsed_second = decode_datagram(&second[0].bytes).unwrap();
    let (ParsedSubmessage::Data(a), ParsedSubmessage::Data(b)) =
        (&parsed_first.submessages[0], &parsed_second.submessages[0])
    else {
        panic!("expected DATA submessages");
    };
    assert_eq!(a.writer_sn, sn(1));
    assert_eq!(b.writer_sn, sn(2));
    assert_eq!(w.cache().len(), 2);
}
1107
// The first tick after a write emits a HEARTBEAT immediately; further
// ticks stay silent until the 500 ms period has elapsed.
#[test]
fn tick_emits_heartbeat_after_period() {
    let mut w = make_writer(10, Duration::from_millis(500));
    w.write(&alloc::vec![0xAA]).unwrap();
    // No HEARTBEAT recorded yet, so this one is due right away.
    let out = w.tick(Duration::from_millis(10)).unwrap();
    assert_eq!(out.len(), 1);
    let parsed = decode_datagram(&out[0].bytes).expect("decode hb");
    assert!(
        parsed
            .submessages
            .iter()
            .any(|s| matches!(s, ParsedSubmessage::Heartbeat(_)))
    );
    // 190 ms after the last HEARTBEAT: not due.
    assert!(w.tick(Duration::from_millis(200)).unwrap().is_empty());
    // 590 ms after the last HEARTBEAT: due again.
    let out2 = w.tick(Duration::from_millis(600)).unwrap();
    assert_eq!(out2.len(), 1);
}
1125
// With nothing written, a tick produces no datagram even though the
// heartbeat period has long elapsed.
#[test]
fn tick_skips_heartbeat_when_cache_empty() {
    let mut w = make_writer(10, Duration::from_millis(100));
    assert!(w.tick(Duration::from_secs(10)).unwrap().is_empty());
}
1131
// An ACKNACK with base 4 acknowledges 1..=3; the request for the already
// acknowledged sn 2 leaves nothing pending. The KeepAll cache is untouched.
#[test]
fn handle_acknack_updates_proxy_state() {
    let mut w = make_writer(10, Duration::from_secs(10));
    for byte in 1..=3_u8 {
        w.write(&[byte]).unwrap();
    }
    w.handle_acknack(reader_guid(), sn(4), [sn(2)]);
    assert_eq!(w.cache().len(), 3, "cache intact under KeepAll");
    let proxy = first_proxy(&w);
    assert_eq!(proxy.highest_acked_sn(), sn(3));
    assert_eq!(proxy.pending_requested_count(), 0);
}
1149
// An ACKNACK with base 2 acknowledges only sn 1; the requests for 2 and 3
// stay pending.
#[test]
fn handle_acknack_with_lower_base_leaves_requested() {
    let mut w = make_writer(10, Duration::from_secs(10));
    for byte in 1..=3_u8 {
        w.write(&[byte]).unwrap();
    }
    w.handle_acknack(reader_guid(), sn(2), [sn(2), sn(3)]);
    assert_eq!(w.cache().len(), 3);
    let proxy = first_proxy(&w);
    assert_eq!(proxy.highest_acked_sn(), sn(1));
    assert_eq!(proxy.pending_requested_count(), 2);
}
1163
// Under KeepLast(3), five writes keep only sn 3..=5 and report two
// evictions; no write fails.
#[test]
fn keep_last_evicts_oldest_on_overflow() {
    let cfg = ReliableWriterConfig {
        guid: Guid::new(
            GuidPrefix::from_bytes([1; 12]),
            EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
        ),
        vendor_id: VendorId::ZERODDS,
        reader_proxies: alloc::vec![ReaderProxy::new(
            reader_guid(),
            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7410)],
            alloc::vec![],
            true,
        )],
        max_samples: 3,
        history_kind: HistoryKind::KeepLast { depth: 3 },
        heartbeat_period: Duration::from_secs(10),
        fragment_size: DEFAULT_FRAGMENT_SIZE,
        mtu: DEFAULT_MTU,
    };
    let mut w = ReliableWriter::new(cfg);
    for byte in 1..=5_u8 {
        w.write(&[byte]).expect("keep_last never fails");
    }
    let cache = w.cache();
    assert_eq!(cache.len(), 3);
    assert_eq!(cache.min_sn(), Some(sn(3)));
    assert_eq!(cache.max_sn(), Some(sn(5)));
    assert_eq!(cache.evicted_count(), 2);
}
1196
// Under KeepLast(3) a reader that never acknowledges does not block the
// writer: ten writes succeed and only sn 8..=10 remain cached. A later
// request for the evicted sn 2 is answered with a GAP.
#[test]
fn keep_last_stalled_reader_does_not_block_fresh_writes() {
    let writer_guid = Guid::new(
        GuidPrefix::from_bytes([1; 12]),
        EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
    );
    // Reader that never sends an ACKNACK in this test.
    let stalled = ReaderProxy::new(
        Guid::new(
            GuidPrefix::from_bytes([9; 12]),
            EntityId::user_reader_with_key([0xDE, 0xAD, 0x00]),
        ),
        alloc::vec![Locator::udp_v4([127, 0, 0, 99], 9999)],
        alloc::vec![],
        true,
    );
    let active = ReaderProxy::new(
        reader_guid(),
        alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7410)],
        alloc::vec![],
        true,
    );
    let mut w = ReliableWriter::new(ReliableWriterConfig {
        guid: writer_guid,
        vendor_id: VendorId::ZERODDS,
        reader_proxies: alloc::vec![stalled, active],
        max_samples: 3,
        history_kind: HistoryKind::KeepLast { depth: 3 },
        heartbeat_period: Duration::from_secs(10),
        fragment_size: DEFAULT_FRAGMENT_SIZE,
        mtu: DEFAULT_MTU,
    });
    for i in 1..=10 {
        w.write(&alloc::vec![i as u8]).expect("never blocks");
    }
    assert_eq!(w.cache().len(), 3);
    assert_eq!(w.cache().min_sn(), Some(sn(8)));
    // The active reader asks for sn 2, which has been evicted.
    w.handle_acknack(reader_guid(), sn(1), [sn(2)]);
    let out = w.tick(Duration::ZERO).unwrap();
    let has_gap = out.iter().any(|d| {
        decode_datagram(&d.bytes)
            .unwrap()
            .submessages
            .iter()
            .any(|s| matches!(s, ParsedSubmessage::Gap(_)))
    });
    assert!(has_gap, "evicted SN must elicit GAP");
}
1249
// An ACKNACK from a GUID without a proxy changes neither cache nor proxy
// state; it is only counted as an unknown source.
#[test]
fn handle_acknack_unknown_source_counts_but_noops() {
    let mut w = make_writer(10, Duration::from_secs(10));
    w.write(&[1_u8]).unwrap();
    let stranger = Guid::new(
        GuidPrefix::from_bytes([0xFF; 12]),
        EntityId::user_reader_with_key([0xFF, 0xFF, 0xFF]),
    );
    w.handle_acknack(stranger, sn(5), [sn(2)]);
    assert_eq!(w.cache().len(), 1, "cache untouched");
    assert_eq!(first_proxy(&w).pending_requested_count(), 0);
    assert_eq!(w.unknown_src_count(), 1, "unknown source counted");
}
1263
// A NACK_FRAG from a GUID without a proxy is counted as an unknown source
// and does not increase the NACK_FRAG counter.
#[test]
fn handle_nackfrag_unknown_source_counts() {
    // Fragment size 4 with a 10-byte payload: the sample is fragmented.
    let mut w = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
    let _ = w.write(&(1..=10).collect::<alloc::vec::Vec<u8>>()).unwrap();
    let foreign = Guid::new(
        GuidPrefix::from_bytes([0xFF; 12]),
        EntityId::user_reader_with_key([0xFF, 0xFF, 0xFF]),
    );
    let nf = NackFragSubmessage {
        reader_id: foreign.entity_id,
        writer_id: w.guid.entity_id,
        writer_sn: sn(1),
        fragment_number_state: crate::submessages::FragmentNumberSet::from_missing(
            FragmentNumber(1),
            &[FragmentNumber(2)],
        ),
        count: 1,
    };
    w.handle_nackfrag(foreign, &nf);
    assert_eq!(w.nackfrag_count(), 0, "not counted as legit nackfrag");
    assert_eq!(w.unknown_src_count(), 1);
}
1286
// A requested change that is still cached is resent as DATA, and the due
// HEARTBEAT is piggybacked into the same datagram.
#[test]
fn tick_resends_requested_as_data_aggregated_with_hb() {
    let mut w = make_writer(10, Duration::from_secs(10));
    let rguid = reader_guid();
    for i in 1..=3 {
        w.write(&alloc::vec![i as u8]).unwrap();
    }
    w.handle_acknack(rguid, sn(1), [sn(2)]);
    let out = w.tick(Duration::ZERO).unwrap();
    // Resend and HEARTBEAT are expected in the first datagram.
    let parsed = decode_datagram(&out[0].bytes).unwrap();
    let has_data_2 = parsed
        .submessages
        .iter()
        .any(|s| matches!(s, ParsedSubmessage::Data(d) if d.writer_sn == sn(2)));
    let has_hb = parsed
        .submessages
        .iter()
        .any(|s| matches!(s, ParsedSubmessage::Heartbeat(_)));
    // Message: "DATA resend for sn(2)".
    assert!(has_data_2, "DATA-Resend fuer sn(2)");
    // Message: "piggyback HEARTBEAT in the same datagram".
    assert!(has_hb, "Piggyback-HEARTBEAT im gleichen Datagramm");
}
1309
// A request for a sequence number that is not in the cache is answered
// with a GAP. Here sn 5 was never written (only sn 1 exists), which the
// writer treats the same as an evicted change.
#[test]
fn tick_resends_evicted_request_as_gap() {
    let mut w = make_writer(10, Duration::from_secs(10));
    let rguid = reader_guid();
    w.write(&alloc::vec![1]).unwrap();
    w.handle_acknack(rguid, sn(1), [sn(5)]);
    let out = w.tick(Duration::ZERO).unwrap();
    let has_gap = out.iter().any(|d| {
        decode_datagram(&d.bytes)
            .unwrap()
            .submessages
            .iter()
            .any(|s| matches!(s, ParsedSubmessage::Gap(_)))
    });
    assert!(has_gap);
}
1326
// A KeepAll cache with capacity 2 rejects the third write.
#[test]
fn write_at_cache_capacity_is_error() {
    let mut w = make_writer(2, Duration::from_secs(10));
    for byte in [1_u8, 2] {
        w.write(&[byte]).unwrap();
    }
    let third = w.write(&[3_u8]);
    assert!(third.is_err());
}
1334
// Every HEARTBEAT emitted by `tick` increases the counter by one; writing
// alone does not.
#[test]
fn heartbeat_count_increments() {
    let mut w = make_writer(10, Duration::from_millis(100));
    w.write(&[1_u8]).unwrap();
    assert_eq!(w.heartbeat_count(), 0);

    let tick_times = [Duration::ZERO, Duration::from_millis(150)];
    for (expected, now) in (1_i32..).zip(tick_times) {
        w.tick(now).unwrap();
        assert_eq!(w.heartbeat_count(), expected);
    }
}
1345
#[test]
/// Drives the writer's own heartbeat counter across the `i32::MAX` boundary.
///
/// The counter is preset through the private `heartbeat_count` field so the
/// wrap is reached in two ticks. The tick sequence mirrors
/// `heartbeat_count_increments` (t = 0, then t = 150 ms with a 100 ms period),
/// where each tick bumps the counter by exactly one.
///
/// NOTE(review): this expects the writer to advance the counter with wrapping
/// arithmetic. If `tick` panics on overflow or saturates instead, this test
/// fails and the writer needs fixing, not the test.
fn heartbeat_count_wraps_around_at_i32_max_per_spec_8_4_15_7() {
    let mut w = make_writer(10, Duration::from_millis(100));
    w.write(&alloc::vec![1]).unwrap();

    // One step below the maximum: the first tick lands on i32::MAX, the
    // second has to wrap.
    w.heartbeat_count = i32::MAX - 1;

    w.tick(Duration::ZERO).unwrap();
    assert_eq!(w.heartbeat_count(), i32::MAX, "last value before the wrap");

    w.tick(Duration::from_millis(150)).unwrap();
    assert_eq!(
        w.heartbeat_count(),
        i32::MIN,
        "i32::MAX + 1 wraps to i32::MIN"
    );
}
1363
#[test]
/// A 5-byte payload with fragment size 10 yields exactly one datagram whose
/// first submessage is DATA.
fn write_under_fragment_size_produces_single_data() {
    let mut writer = make_writer_with_frag_size(10, Duration::from_secs(10), 10);
    let payload: alloc::vec::Vec<u8> = (1..=5).collect();

    let datagrams = writer.write(&payload).unwrap();
    assert_eq!(datagrams.len(), 1);

    let decoded = decode_datagram(&datagrams[0].bytes).unwrap();
    let leading = &decoded.submessages[0];
    assert!(matches!(leading, ParsedSubmessage::Data(_)));
}
1374
#[test]
/// A 10-byte payload with fragment size 4 yields three datagrams, each led by
/// a DATA_FRAG carrying one fragment, numbered 1, 2, 3 in output order.
fn write_above_fragment_size_produces_data_frag_split() {
    let mut writer = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
    let payload: alloc::vec::Vec<u8> = (1..=10).collect();

    let datagrams = writer.write(&payload).unwrap();
    assert_eq!(datagrams.len(), 3);

    // Fragment numbers are 1-based, so pair each datagram with 1, 2, 3, ...
    for (expected_start, datagram) in (1u32..).zip(datagrams.iter()) {
        let decoded = decode_datagram(&datagram.bytes).unwrap();
        let frag = match &decoded.submessages[0] {
            ParsedSubmessage::DataFrag(df) => df,
            other => panic!("expected DataFrag, got {other:?}"),
        };
        assert_eq!(frag.fragment_starting_num.0, expected_start);
        assert_eq!(frag.fragments_in_submessage, 1);
        assert_eq!(frag.fragment_size, 4);
        assert_eq!(frag.sample_size, 10);
    }
}
1393
#[test]
/// A NACK_FRAG listing fragments 2 and 3 of sn 1 bumps the NACK_FRAG counter
/// to 1 and leaves two pending fragment requests on the first proxy.
fn handle_nackfrag_queues_fragment_resends() {
    let mut writer = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
    let reader = reader_guid();
    let payload: alloc::vec::Vec<u8> = (1..=10).collect();
    writer.write(&payload).unwrap();

    let missing = [FragmentNumber(2), FragmentNumber(3)];
    let fragment_state =
        crate::submessages::FragmentNumberSet::from_missing(FragmentNumber(1), &missing);
    let nackfrag = NackFragSubmessage {
        reader_id: reader.entity_id,
        writer_id: writer.guid.entity_id,
        writer_sn: sn(1),
        fragment_number_state: fragment_state,
        count: 1,
    };

    writer.handle_nackfrag(reader, &nackfrag);

    assert_eq!(writer.nackfrag_count(), 1);
    let pending = first_proxy(&writer).pending_requested_fragment_count();
    assert_eq!(pending, 2);
}
1413
#[test]
/// After a NACK_FRAG for fragment 3 of sn 1, the next `tick` emits exactly one
/// datagram containing a DATA_FRAG that starts at fragment 3, and the proxy
/// has no pending fragment requests left.
fn tick_resends_requested_fragments() {
    let mut writer = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
    let reader = reader_guid();
    let payload: alloc::vec::Vec<u8> = (1..=10).collect();
    writer.write(&payload).unwrap();

    let missing = [FragmentNumber(3)];
    let nackfrag = NackFragSubmessage {
        reader_id: reader.entity_id,
        writer_id: writer.guid.entity_id,
        writer_sn: sn(1),
        fragment_number_state: crate::submessages::FragmentNumberSet::from_missing(
            FragmentNumber(1),
            &missing,
        ),
        count: 1,
    };
    writer.handle_nackfrag(reader, &nackfrag);

    let datagrams = writer.tick(Duration::ZERO).unwrap();

    // Count the datagrams that carry a DATA_FRAG starting at fragment 3.
    let mut resend_datagrams = 0usize;
    for datagram in &datagrams {
        let decoded = decode_datagram(&datagram.bytes).unwrap();
        let carries_frag_3 = decoded.submessages.iter().any(|sub| {
            matches!(
                sub,
                ParsedSubmessage::DataFrag(df) if df.fragment_starting_num == FragmentNumber(3)
            )
        });
        if carries_frag_3 {
            resend_datagrams += 1;
        }
    }
    assert_eq!(resend_datagrams, 1);
    assert_eq!(first_proxy(&writer).pending_requested_fragment_count(), 0);
}
1444
#[test]
/// An ACKNACK requesting sn 1 (10 bytes, fragment size 4) makes the next
/// `tick` produce three datagrams that contain a DATA_FRAG.
fn acknack_resend_for_fragmented_sn_sends_all_fragments() {
    let mut writer = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
    let reader = reader_guid();
    let payload: alloc::vec::Vec<u8> = (1..=10).collect();
    writer.write(&payload).unwrap();
    writer.handle_acknack(reader, sn(1), [sn(1)]);

    let datagrams = writer.tick(Duration::ZERO).unwrap();

    let mut frag_datagrams = 0usize;
    for datagram in &datagrams {
        let decoded = decode_datagram(&datagram.bytes).unwrap();
        let has_frag = decoded
            .submessages
            .iter()
            .any(|sub| matches!(sub, ParsedSubmessage::DataFrag(_)));
        if has_frag {
            frag_datagrams += 1;
        }
    }
    assert_eq!(frag_datagrams, 3);
}
1464
#[test]
/// After writing three samples, the HEARTBEAT in the first ticked datagram
/// reports `first_sn == 1` and `last_sn == 3`.
fn heartbeat_carries_cache_range() {
    let mut writer = make_writer(10, Duration::from_millis(100));
    for byte in 1u8..=3 {
        writer.write(&alloc::vec![byte]).unwrap();
    }

    let datagrams = writer.tick(Duration::ZERO).unwrap();
    let decoded = decode_datagram(&datagrams[0].bytes).unwrap();

    let heartbeat = decoded
        .submessages
        .iter()
        .find_map(|sub| match sub {
            ParsedSubmessage::Heartbeat(h) => Some(h),
            _ => None,
        })
        .expect("HB in output");

    assert_eq!(heartbeat.first_sn, sn(1));
    assert_eq!(heartbeat.last_sn, sn(3));
}
1487
#[test]
/// With a second reader proxy registered, one `write` yields two datagrams
/// whose `targets` differ.
fn write_fans_out_to_all_reader_proxies() {
    let mut writer = make_writer(10, Duration::from_secs(10));

    let second_guid = Guid::new(
        GuidPrefix::from_bytes([3; 12]),
        EntityId::user_reader_with_key([0xA1, 0xB1, 0xC1]),
    );
    let second_locators = alloc::vec![Locator::udp_v4([127, 0, 0, 2], 7411)];
    let second_proxy = ReaderProxy::new(second_guid, second_locators, alloc::vec![], true);
    writer.add_reader_proxy(second_proxy);

    let datagrams = writer.write(&alloc::vec![0xAA]).unwrap();
    assert_eq!(datagrams.len(), 2, "one datagram per reader-proxy");

    let (first, second) = (&datagrams[0], &datagrams[1]);
    assert_ne!(first.targets, second.targets);
}
1508
#[test]
/// Adding a proxy under the GUID returned by `reader_guid()` keeps the proxy
/// count at 1 and leaves the new locator in `unicast_locators`.
fn add_reader_proxy_is_idempotent_on_same_guid() {
    let mut writer = make_writer(10, Duration::from_secs(10));
    let reader = reader_guid();

    // Built on demand so the expected list never shares state with the proxy.
    let new_locators = || alloc::vec![Locator::udp_v4([127, 0, 0, 1], 9999)];

    writer.add_reader_proxy(ReaderProxy::new(
        reader,
        new_locators(),
        alloc::vec![],
        true,
    ));

    assert_eq!(writer.reader_proxy_count(), 1);
    let stored = &writer.reader_proxies()[0];
    assert_eq!(stored.unicast_locators, new_locators());
}
1526
#[test]
/// The first removal by GUID returns `Some` and empties the proxy list; a
/// second removal of the same GUID returns `None`.
fn remove_reader_proxy_by_guid() {
    let mut writer = make_writer(10, Duration::from_secs(10));
    let reader = reader_guid();

    let first_attempt = writer.remove_reader_proxy(reader);
    assert!(first_attempt.is_some());
    assert_eq!(writer.reader_proxy_count(), 0);

    let second_attempt = writer.remove_reader_proxy(reader);
    assert!(second_attempt.is_none(), "second remove is None");
}
1539
#[test]
/// An ACKNACK with base 4 from the first reader moves only that proxy's
/// `highest_acked_sn` to 3. The second proxy stays at 0 and the cache still
/// holds all three samples.
fn acknack_dispatches_to_matching_proxy_only() {
    let mut writer = make_writer(10, Duration::from_secs(10));
    let first_reader = reader_guid();

    let second_reader = Guid::new(
        GuidPrefix::from_bytes([3; 12]),
        EntityId::user_reader_with_key([0xA1, 0xB1, 0xC1]),
    );
    let second_locators = alloc::vec![Locator::udp_v4([127, 0, 0, 2], 7411)];
    writer.add_reader_proxy(ReaderProxy::new(
        second_reader,
        second_locators,
        alloc::vec![],
        true,
    ));

    for byte in 1u8..=3 {
        writer.write(&alloc::vec![byte]).unwrap();
    }
    writer.handle_acknack(first_reader, sn(4), []);

    let proxies = writer.reader_proxies();
    assert_eq!(proxies[0].highest_acked_sn(), sn(3));
    assert_eq!(proxies[1].highest_acked_sn(), sn(0));
    assert_eq!(writer.cache().len(), 3, "KeepAll cache intact");
}
1564
#[test]
/// A NACK_FRAG from the first reader queues one fragment request on proxy 0
/// and none on proxy 1.
fn nackfrag_dispatches_only_to_matching_proxy() {
    let mut writer = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
    let first_reader = reader_guid();

    let second_reader = Guid::new(
        GuidPrefix::from_bytes([3; 12]),
        EntityId::user_reader_with_key([0xA1, 0xB1, 0xC1]),
    );
    let second_locators = alloc::vec![Locator::udp_v4([127, 0, 0, 2], 7411)];
    writer.add_reader_proxy(ReaderProxy::new(
        second_reader,
        second_locators,
        alloc::vec![],
        true,
    ));

    let payload: alloc::vec::Vec<u8> = (1..=10).collect();
    writer.write(&payload).unwrap();

    let missing = [FragmentNumber(2)];
    let nackfrag = NackFragSubmessage {
        reader_id: first_reader.entity_id,
        writer_id: writer.guid.entity_id,
        writer_sn: sn(1),
        fragment_number_state: crate::submessages::FragmentNumberSet::from_missing(
            FragmentNumber(1),
            &missing,
        ),
        count: 1,
    };
    writer.handle_nackfrag(first_reader, &nackfrag);

    let proxies = writer.reader_proxies();
    assert_eq!(proxies[0].pending_requested_fragment_count(), 1);
    assert_eq!(proxies[1].pending_requested_fragment_count(), 0);
}
1594
#[test]
/// The HEARTBEAT in the first ticked datagram must have `final_flag` cleared.
fn periodic_heartbeat_has_final_flag_unset() {
    let mut writer = make_writer(10, Duration::from_millis(50));
    writer.write(&alloc::vec![1]).unwrap();

    let datagrams = writer.tick(Duration::ZERO).unwrap();
    let decoded = decode_datagram(&datagrams[0].bytes).unwrap();

    let heartbeat = decoded
        .submessages
        .iter()
        .find_map(|sub| match sub {
            ParsedSubmessage::Heartbeat(h) => Some(h),
            _ => None,
        })
        .expect("HB must be present");

    let is_final = heartbeat.final_flag;
    assert!(
        !is_final,
        "periodic HB must NOT set FinalFlag (Spec §8.4.9.2.7)"
    );
}
1622
#[test]
/// After a first tick and then adding a second reader proxy, the next tick
/// must emit at least one HEARTBEAT, and every one of them is non-final.
fn heartbeat_after_add_reader_proxy_is_non_final() {
    let mut writer = make_writer(10, Duration::from_secs(60));
    writer.write(&alloc::vec![1]).unwrap();
    let _ = writer.tick(Duration::ZERO).unwrap();

    let late_guid = Guid::new(
        GuidPrefix::from_bytes([7; 12]),
        EntityId::user_reader_with_key([0xA1, 0xB1, 0xC1]),
    );
    let late_locators = alloc::vec![Locator::udp_v4([127, 0, 0, 2], 7411)];
    writer.add_reader_proxy(ReaderProxy::new(
        late_guid,
        late_locators,
        alloc::vec![],
        true,
    ));

    let datagrams = writer.tick(Duration::ZERO).unwrap();

    let mut heartbeats_seen = 0usize;
    for datagram in datagrams.iter() {
        let decoded = decode_datagram(&datagram.bytes).unwrap();
        for sub in decoded.submessages.iter() {
            // Only HEARTBEAT submessages are checked here.
            let heartbeat = match sub {
                ParsedSubmessage::Heartbeat(h) => h,
                _ => continue,
            };
            assert!(
                !heartbeat.final_flag,
                "post-add_reader_proxy HB must be non-final (Spec §8.4.9.2.7)"
            );
            heartbeats_seen += 1;
        }
    }
    assert!(heartbeats_seen >= 1, "at least one HB expected");
}
1658
#[test]
/// When sn 1..=3 are all requested, the next `tick` returns a single datagram
/// holding three DATA submessages and one HEARTBEAT.
fn aggregation_packs_multiple_resends_into_one_datagram() {
    let mut writer = make_writer(10, Duration::from_secs(10));
    let reader = reader_guid();
    for byte in 1u8..=3 {
        writer.write(&alloc::vec![byte]).unwrap();
    }
    writer.handle_acknack(reader, sn(1), [sn(1), sn(2), sn(3)]);

    let datagrams = writer.tick(Duration::ZERO).unwrap();
    assert_eq!(
        datagrams.len(),
        1,
        "all resends aggregated into single datagram"
    );

    let decoded = decode_datagram(&datagrams[0].bytes).unwrap();

    // Tally both submessage kinds in one pass.
    let (mut data_count, mut hb_count) = (0usize, 0usize);
    for sub in &decoded.submessages {
        match sub {
            ParsedSubmessage::Data(_) => data_count += 1,
            ParsedSubmessage::Heartbeat(_) => hb_count += 1,
            _ => {}
        }
    }
    assert_eq!(data_count, 3);
    assert_eq!(hb_count, 1);
}
1685}