1use core::time::Duration;
35
36extern crate alloc;
37use alloc::vec::Vec;
38
39use alloc::rc::Rc;
40
41use crate::error::WireError;
42use crate::fragment_assembler::{AssemblerCaps, FragmentAssembler};
43use crate::header::RtpsHeader;
44use crate::history_cache::{CacheChange, ChangeKind, HistoryCache};
45use crate::message_builder::OutboundDatagram;
46use crate::submessage_header::{FLAG_E_LITTLE_ENDIAN, SubmessageHeader, SubmessageId};
47use crate::submessages::{
48 AckNackSubmessage, DataFragSubmessage, DataSubmessage, GapSubmessage, HeartbeatSubmessage,
49 NackFragSubmessage, SequenceNumberSet,
50};
51use crate::wire_types::{Guid, SequenceNumber, VendorId};
52use crate::writer_proxy::WriterProxy;
53
/// Default heartbeat response delay: zero, i.e. no artificial wait.
///
/// Presumably meant as the default for
/// `ReliableReaderConfig::heartbeat_response_delay`; nothing in this part of
/// the file references it directly.
pub const DEFAULT_HEARTBEAT_RESPONSE_DELAY: Duration = Duration::ZERO;
70
/// Receive-side state a [`ReliableReader`] keeps for one matched remote writer.
#[derive(Debug, Clone)]
pub struct WriterProxyState {
    /// Bookkeeping about the remote writer (its GUID, locators and which
    /// sequence numbers are known, missing or irrelevant).
    pub proxy: WriterProxy,
    /// Samples that were received but not yet handed out in order.
    pub received_cache: HistoryCache,
    /// Highest sequence number that has been delivered or skipped so far;
    /// starts at `SequenceNumber(0)`.
    pub delivered_up_to: SequenceNumber,
    /// Reassembly buffer for samples arriving as DATA_FRAG submessages.
    pub assembler: FragmentAssembler,
    /// Time at which an ACKNACK response was scheduled, or `None` when no
    /// response is pending. Cleared by `ReliableReader::tick_outbound`.
    pub pending_acknack_since: Option<Duration>,
}
91
92impl WriterProxyState {
93 fn new(proxy: WriterProxy, max_samples: usize, caps: AssemblerCaps) -> Self {
94 Self {
95 proxy,
96 received_cache: HistoryCache::new(max_samples),
97 delivered_up_to: SequenceNumber(0),
98 assembler: FragmentAssembler::new(caps),
99 pending_acknack_since: None,
100 }
101 }
102}
103
/// Reliable reader endpoint: tracks one [`WriterProxyState`] per matched
/// writer, delivers samples in sequence-number order and produces ACKNACK /
/// NACK_FRAG datagrams on `tick`.
#[derive(Debug, Clone)]
pub struct ReliableReader {
    /// GUID of this reader; its entity id is used as `reader_id` and its
    /// prefix goes into the RTPS header of outgoing datagrams.
    guid: Guid,
    /// Vendor id written into the RTPS header of outgoing datagrams.
    vendor_id: VendorId,
    /// Per-writer receive state, in insertion order.
    writer_proxies: Vec<WriterProxyState>,
    /// Minimum time that must pass between scheduling an ACKNACK and
    /// emitting it from `tick_outbound`.
    heartbeat_response_delay: Duration,
    /// Incremented (wrapping) for every ACKNACK built; the new value is sent
    /// as that submessage's `count`.
    acknack_count: i32,
    /// Incremented (wrapping) for every NACK_FRAG built; the new value is
    /// sent as that submessage's `count`.
    nackfrag_count: i32,
    /// Number of DATA_FRAG submessages rejected because their sequence number
    /// was already known or delivered.
    duplicate_frag_count: u64,
    /// Cache capacity handed to every proxy state created later via
    /// `add_writer_proxy`.
    max_samples_per_proxy: usize,
    /// Assembler limits handed to every proxy state created later via
    /// `add_writer_proxy`.
    assembler_caps: AssemblerCaps,
    /// Number of submessages whose writer id matched no known proxy.
    unknown_src_count: u64,
}
120
/// Construction parameters for [`ReliableReader::new`].
#[derive(Debug, Clone)]
pub struct ReliableReaderConfig {
    /// GUID of the reader being created.
    pub guid: Guid,
    /// Vendor id written into the RTPS header of outgoing datagrams.
    pub vendor_id: VendorId,
    /// Writers the reader is matched with from the start.
    pub writer_proxies: Vec<WriterProxy>,
    /// Capacity passed to each proxy's receive cache.
    pub max_samples_per_proxy: usize,
    /// Minimum time between scheduling an ACKNACK and emitting it.
    pub heartbeat_response_delay: Duration,
    /// Limits for each proxy's fragment assembler. `max_pending_sns` must be
    /// greater than zero, otherwise [`ReliableReader::new`] panics.
    pub assembler_caps: AssemblerCaps,
}
137
/// One sample handed to the application by a [`ReliableReader`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeliveredSample {
    /// Full GUID of the writer proxy the sample was received through.
    pub writer_guid: Guid,
    /// Sequence number the writer assigned to the sample.
    pub sequence_number: SequenceNumber,
    /// Serialized payload, shared (reference-counted) with the receive cache.
    pub payload: alloc::sync::Arc<[u8]>,
    /// Lifecycle classification (alive / disposed / unregistered).
    pub kind: ChangeKind,
    /// Key hash taken from the inline QoS, if one was present.
    pub key_hash: Option<[u8; 16]>,
}
163
164impl ReliableReader {
165 #[must_use]
170 pub fn new(cfg: ReliableReaderConfig) -> Self {
171 assert!(
172 cfg.assembler_caps.max_pending_sns > 0,
173 "assembler_caps.max_pending_sns must be > 0; use a Best-Effort reader \
174 or increase the cap to actually accept fragmented samples"
175 );
176 let proxies = cfg
177 .writer_proxies
178 .into_iter()
179 .map(|p| WriterProxyState::new(p, cfg.max_samples_per_proxy, cfg.assembler_caps))
180 .collect();
181 Self {
182 guid: cfg.guid,
183 vendor_id: cfg.vendor_id,
184 writer_proxies: proxies,
185 heartbeat_response_delay: cfg.heartbeat_response_delay,
186 acknack_count: 0,
187 nackfrag_count: 0,
188 duplicate_frag_count: 0,
189 max_samples_per_proxy: cfg.max_samples_per_proxy,
190 assembler_caps: cfg.assembler_caps,
191 unknown_src_count: 0,
192 }
193 }
194
    /// Returns this reader's GUID.
    #[must_use]
    pub fn guid(&self) -> Guid {
        self.guid
    }
200
    /// Returns the per-writer states in insertion order (read-only).
    #[must_use]
    pub fn writer_proxies(&self) -> &[WriterProxyState] {
        &self.writer_proxies
    }
206
    /// Returns how many writer proxies are currently registered.
    #[must_use]
    pub fn writer_proxy_count(&self) -> usize {
        self.writer_proxies.len()
    }
212
    /// Returns the current ACKNACK counter, i.e. the `count` value carried by
    /// the most recently built ACKNACK (0 if none was built since
    /// construction or the last `reset_diagnostics`).
    #[must_use]
    pub fn acknack_count(&self) -> i32 {
        self.acknack_count
    }
218
    /// Returns the current NACK_FRAG counter, i.e. the `count` value carried
    /// by the most recently built NACK_FRAG (0 if none was built since
    /// construction or the last `reset_diagnostics`).
    #[must_use]
    pub fn nackfrag_count(&self) -> i32 {
        self.nackfrag_count
    }
224
225 #[must_use]
228 pub fn pending_fragment_count(&self) -> usize {
229 self.writer_proxies.iter().map(|s| s.assembler.len()).sum()
230 }
231
232 #[must_use]
235 pub fn dropped_fragment_count(&self) -> u64 {
236 self.writer_proxies
237 .iter()
238 .map(|s| s.assembler.drop_count())
239 .sum()
240 }
241
    /// Returns how many DATA_FRAG submessages were rejected because their
    /// sequence number was already known or already delivered.
    #[must_use]
    pub fn duplicate_fragment_count(&self) -> u64 {
        self.duplicate_frag_count
    }
248
    /// Returns how many submessages were dropped because their writer id
    /// matched none of the registered proxies.
    #[must_use]
    pub fn unknown_src_count(&self) -> u64 {
        self.unknown_src_count
    }
255
256 pub fn add_writer_proxy(&mut self, proxy: WriterProxy) {
263 let guid = proxy.remote_writer_guid;
264 let mut state =
265 WriterProxyState::new(proxy, self.max_samples_per_proxy, self.assembler_caps);
266 state.pending_acknack_since = Some(Duration::ZERO);
269 if let Some(idx) = self
270 .writer_proxies
271 .iter()
272 .position(|s| s.proxy.remote_writer_guid == guid)
273 {
274 self.writer_proxies[idx] = state;
275 } else {
276 self.writer_proxies.push(state);
277 }
278 }
279
280 pub fn remove_writer_proxy(&mut self, guid: Guid) -> Option<WriterProxy> {
282 let idx = self
283 .writer_proxies
284 .iter()
285 .position(|s| s.proxy.remote_writer_guid == guid)?;
286 Some(self.writer_proxies.remove(idx).proxy)
287 }
288
289 pub fn reset_diagnostics(&mut self) {
291 self.acknack_count = 0;
292 self.nackfrag_count = 0;
293 self.duplicate_frag_count = 0;
294 self.unknown_src_count = 0;
295 for s in &mut self.writer_proxies {
296 s.assembler.reset_diagnostics();
297 }
298 }
299
300 pub fn handle_data(&mut self, data: &DataSubmessage) -> Vec<DeliveredSample> {
310 let Some(idx) = self.proxy_index_by_writer_id(data.writer_id) else {
311 self.unknown_src_count = self.unknown_src_count.saturating_add(1);
312 return Vec::new();
313 };
314 let state = &mut self.writer_proxies[idx];
315 let sn = data.writer_sn;
316 if state.proxy.is_known(sn) || sn <= state.delivered_up_to {
317 return Vec::new();
318 }
319 state.proxy.received_change_set(sn);
320 let kind = Self::classify_change_kind(data);
321 let key_hash = data
322 .inline_qos
323 .as_ref()
324 .and_then(crate::inline_qos::find_key_hash);
325 let _ = state.received_cache.insert(CacheChange {
329 sequence_number: sn,
330 payload: alloc::sync::Arc::clone(&data.serialized_payload),
331 kind,
332 key_hash,
333 });
334 Self::collect_in_order_for(state)
335 }
336
337 fn classify_change_kind(data: &DataSubmessage) -> ChangeKind {
341 if !data.key_flag {
342 return ChangeKind::Alive;
343 }
344 let Some(pl) = data.inline_qos.as_ref() else {
345 return ChangeKind::Alive;
346 };
347 let Some(bits) = crate::inline_qos::find_status_info(pl) else {
348 return ChangeKind::Alive;
349 };
350 let disposed = bits & crate::inline_qos::status_info::DISPOSED != 0;
351 let unregistered = bits & crate::inline_qos::status_info::UNREGISTERED != 0;
352 match (disposed, unregistered) {
353 (true, true) => ChangeKind::NotAliveDisposedUnregistered,
354 (true, false) => ChangeKind::NotAliveDisposed,
355 (false, true) => ChangeKind::NotAliveUnregistered,
356 (false, false) => ChangeKind::Alive,
357 }
358 }
359
360 pub fn handle_data_frag(
363 &mut self,
364 df: &DataFragSubmessage,
365 now: Duration,
366 ) -> Vec<DeliveredSample> {
367 let Some(idx) = self.proxy_index_by_writer_id(df.writer_id) else {
368 self.unknown_src_count = self.unknown_src_count.saturating_add(1);
369 return Vec::new();
370 };
371 let state = &mut self.writer_proxies[idx];
372 let sn = df.writer_sn;
373 if state.proxy.is_known(sn) || sn <= state.delivered_up_to {
374 self.duplicate_frag_count = self.duplicate_frag_count.saturating_add(1);
375 return Vec::new();
376 }
377 let result = if let Some(completed) = state.assembler.insert(df) {
378 state.proxy.received_change_set(sn);
379 let _ = state
380 .received_cache
381 .insert(CacheChange::alive(sn, completed.payload));
382 Self::collect_in_order_for(state)
383 } else {
384 Vec::new()
385 };
386 if state.assembler.has_gaps() {
387 state.pending_acknack_since.get_or_insert(now);
388 }
389 result
390 }
391
392 pub fn handle_heartbeat(
394 &mut self,
395 hb: &HeartbeatSubmessage,
396 now: Duration,
397 ) -> Vec<DeliveredSample> {
398 let Some(idx) = self.proxy_index_by_writer_id(hb.writer_id) else {
399 self.unknown_src_count = self.unknown_src_count.saturating_add(1);
400 return Vec::new();
401 };
402 let state = &mut self.writer_proxies[idx];
403 if hb.liveliness_flag {
404 return Vec::new();
405 }
406 state.proxy.update_from_heartbeat(hb.first_sn, hb.last_sn);
407 let has_missing = state.proxy.has_missing_changes();
408 let has_frag_gaps = state.assembler.has_gaps();
409 if !hb.final_flag || has_missing || has_frag_gaps {
410 state.pending_acknack_since.get_or_insert(now);
411 }
412 Self::collect_in_order_for(state)
418 }
419
420 pub fn handle_gap(&mut self, gap: &GapSubmessage) -> Vec<DeliveredSample> {
422 let Some(idx) = self.proxy_index_by_writer_id(gap.writer_id) else {
423 self.unknown_src_count = self.unknown_src_count.saturating_add(1);
424 return Vec::new();
425 };
426 let state = &mut self.writer_proxies[idx];
427 let mut sn = gap.gap_start;
428 while sn < gap.gap_list.bitmap_base {
429 state.proxy.irrelevant_change_set(sn);
430 state.assembler.discard(sn);
431 sn = SequenceNumber(sn.0 + 1);
432 }
433 for sn in gap.gap_list.iter_set() {
434 state.proxy.irrelevant_change_set(sn);
435 state.assembler.discard(sn);
436 }
437 Self::collect_in_order_for(state)
438 }
439
440 pub fn tick(&mut self, now: Duration) -> Result<Vec<Vec<u8>>, WireError> {
447 Ok(self
448 .tick_outbound(now)?
449 .into_iter()
450 .map(|d| d.bytes)
451 .collect())
452 }
453
454 pub fn tick_outbound(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
461 let mut out = Vec::new();
462 for idx in 0..self.writer_proxies.len() {
463 let Some(since) = self.writer_proxies[idx].pending_acknack_since else {
464 continue;
465 };
466 if now.saturating_sub(since) < self.heartbeat_response_delay {
467 continue;
468 }
469 self.writer_proxies[idx].pending_acknack_since = None;
470 let targets = Rc::new(self.writer_proxies[idx].proxy.unicast_locators.clone());
471
472 let incomplete_sns: Vec<SequenceNumber> = self.writer_proxies[idx]
473 .assembler
474 .incomplete_sns()
475 .collect();
476 for sn in incomplete_sns {
477 let bytes = self.build_nackfrag_datagram(idx, sn)?;
478 out.push(OutboundDatagram {
479 bytes,
480 targets: Rc::clone(&targets),
481 });
482 }
483 let bytes = self.build_acknack_datagram(idx)?;
484 out.push(OutboundDatagram { bytes, targets });
485 }
486 Ok(out)
487 }
488
489 fn proxy_index_by_writer_id(&self, writer_id: crate::wire_types::EntityId) -> Option<usize> {
492 self.writer_proxies
493 .iter()
494 .position(|s| s.proxy.remote_writer_guid.entity_id == writer_id)
495 }
496
497 fn collect_in_order_for(state: &mut WriterProxyState) -> Vec<DeliveredSample> {
498 let mut out = Vec::new();
499 loop {
500 let next = SequenceNumber(state.delivered_up_to.0 + 1);
501 if let Some(change) = state.received_cache.get(next) {
502 out.push(DeliveredSample {
503 writer_guid: state.proxy.remote_writer_guid,
504 sequence_number: change.sequence_number,
505 payload: change.payload.clone(),
506 kind: change.kind,
507 key_hash: change.key_hash,
508 });
509 state.delivered_up_to = next;
510 state.received_cache.remove_up_to(next);
511 } else if state.proxy.is_known(next) && state.proxy.last_available_sn() >= next {
512 state.delivered_up_to = next;
513 } else if next < state.proxy.first_available_sn() {
514 state.delivered_up_to = next;
520 } else {
521 break;
522 }
523 }
524 out
525 }
526
527 fn build_nackfrag_datagram(
528 &mut self,
529 proxy_idx: usize,
530 sn: SequenceNumber,
531 ) -> Result<Vec<u8>, WireError> {
532 let missing = self.writer_proxies[proxy_idx]
533 .assembler
534 .missing_fragments(sn);
535 self.nackfrag_count = self.nackfrag_count.wrapping_add(1);
536 let writer_guid = self.writer_proxies[proxy_idx].proxy.remote_writer_guid;
537 let nf = NackFragSubmessage {
538 reader_id: self.guid.entity_id,
539 writer_id: writer_guid.entity_id,
540 writer_sn: sn,
541 fragment_number_state: missing,
542 count: self.nackfrag_count,
543 };
544 let (body, mut flags) = nf.write_body(true);
545 flags |= FLAG_E_LITTLE_ENDIAN;
546 self.wrap_to_writer(writer_guid.prefix, SubmessageId::NackFrag, flags, &body)
547 }
548
549 fn build_acknack_datagram(&mut self, proxy_idx: usize) -> Result<Vec<u8>, WireError> {
550 let state = &self.writer_proxies[proxy_idx];
551 let base = state.proxy.acknack_base();
552 let missing = state.proxy.missing_changes(256);
553 let snset = SequenceNumberSet::from_missing(base, &missing);
554 self.acknack_count = self.acknack_count.wrapping_add(1);
555 let final_flag = missing.is_empty() && state.proxy.last_available_sn().0 >= 1;
562 let writer_guid = state.proxy.remote_writer_guid;
563 let ack = AckNackSubmessage {
564 reader_id: self.guid.entity_id,
565 writer_id: writer_guid.entity_id,
566 reader_sn_state: snset,
567 count: self.acknack_count,
568 final_flag,
569 };
570 let (body, mut flags) = ack.write_body(true);
571 flags |= FLAG_E_LITTLE_ENDIAN;
572 self.wrap_to_writer(writer_guid.prefix, SubmessageId::AckNack, flags, &body)
573 }
574
575 fn wrap_to_writer(
580 &self,
581 writer_prefix: crate::wire_types::GuidPrefix,
582 id: SubmessageId,
583 flags: u8,
584 body: &[u8],
585 ) -> Result<Vec<u8>, WireError> {
586 let header = RtpsHeader::new(self.vendor_id, self.guid.prefix);
587 let mut out = Vec::new();
588 out.extend_from_slice(&header.to_bytes());
589
590 let info_dst_header = SubmessageHeader {
592 submessage_id: SubmessageId::InfoDst,
593 flags: FLAG_E_LITTLE_ENDIAN,
594 octets_to_next_header: 12,
595 };
596 out.extend_from_slice(&info_dst_header.to_bytes());
597 out.extend_from_slice(&writer_prefix.to_bytes());
598
599 let body_len = u16::try_from(body.len()).map_err(|_| WireError::ValueOutOfRange {
601 message: "submessage body exceeds u16::MAX",
602 })?;
603 let sh = SubmessageHeader {
604 submessage_id: id,
605 flags,
606 octets_to_next_header: body_len,
607 };
608 out.extend_from_slice(&sh.to_bytes());
609 out.extend_from_slice(body);
610 Ok(out)
611 }
612}
613
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::datagram::{ParsedSubmessage, decode_datagram};
    use crate::wire_types::{EntityId, GuidPrefix, Locator};

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// GUID of the writer most tests talk to.
    fn single_writer_guid() -> Guid {
        Guid::new(
            GuidPrefix::from_bytes([1; 12]),
            EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
        )
    }

    /// GUID of an additional writer from a different participant.
    fn second_writer_guid() -> Guid {
        Guid::new(
            GuidPrefix::from_bytes([3; 12]),
            EntityId::user_writer_with_key([0x40, 0x50, 0x60]),
        )
    }

    /// GUID used for every reader under test.
    fn reader_guid() -> Guid {
        Guid::new(
            GuidPrefix::from_bytes([2; 12]),
            EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
        )
    }

    /// Proxy for `single_writer_guid()` with one unicast locator.
    fn first_writer_proxy() -> WriterProxy {
        WriterProxy::new(
            single_writer_guid(),
            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7420)],
            alloc::vec![],
            true,
        )
    }

    /// Proxy for `second_writer_guid()` with one unicast locator.
    fn second_writer_proxy() -> WriterProxy {
        WriterProxy::new(
            second_writer_guid(),
            alloc::vec![Locator::udp_v4([127, 0, 0, 2], 7420)],
            alloc::vec![],
            true,
        )
    }

    /// Reader with a 200 ms heartbeat response delay and the given proxies.
    fn reader_with(writer_proxies: Vec<WriterProxy>, max_samples: usize) -> ReliableReader {
        ReliableReader::new(ReliableReaderConfig {
            guid: reader_guid(),
            vendor_id: VendorId::ZERODDS,
            writer_proxies,
            max_samples_per_proxy: max_samples,
            heartbeat_response_delay: Duration::from_millis(200),
            assembler_caps: AssemblerCaps::default(),
        })
    }

    /// Reader matched with `single_writer_guid()` from the start.
    fn make_reader(max_samples: usize) -> ReliableReader {
        reader_with(alloc::vec![first_writer_proxy()], max_samples)
    }

    /// Reader without any matched writer.
    fn empty_reader() -> ReliableReader {
        reader_with(alloc::vec![], 10)
    }

    fn add_second_writer(r: &mut ReliableReader) {
        r.add_writer_proxy(second_writer_proxy());
    }

    /// Datagrams produced by the first due tick after a proxy was added to an
    /// initially empty reader.
    fn pre_emptive_datagrams() -> Vec<Vec<u8>> {
        let mut r = empty_reader();
        r.add_writer_proxy(first_writer_proxy());
        r.tick(Duration::from_millis(250)).unwrap()
    }

    fn sn(n: i64) -> SequenceNumber {
        SequenceNumber(n)
    }

    /// Plain DATA submessage with a one-byte payload and no inline QoS.
    fn data(wid: EntityId, rid: EntityId, n: i64, byte: u8) -> DataSubmessage {
        DataSubmessage {
            extra_flags: 0,
            reader_id: rid,
            writer_id: wid,
            writer_sn: sn(n),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: alloc::sync::Arc::from(alloc::vec![byte]),
        }
    }

    /// Key-only DATA submessage carrying a key hash and status bits.
    fn lifecycle_data(
        wid: EntityId,
        rid: EntityId,
        n: i64,
        key_hash: [u8; 16],
        status_bits: u32,
    ) -> DataSubmessage {
        DataSubmessage {
            extra_flags: 0,
            reader_id: rid,
            writer_id: wid,
            writer_sn: sn(n),
            inline_qos: Some(crate::inline_qos::lifecycle_inline_qos(
                key_hash,
                status_bits,
            )),
            key_flag: true,
            non_standard_flag: false,
            serialized_payload: alloc::sync::Arc::from(alloc::vec![0u8; 0]),
        }
    }

    /// Feeds one lifecycle DATA (sequence number 1) into a fresh reader.
    fn deliver_lifecycle(key_hash: [u8; 16], status_bits: u32) -> Vec<DeliveredSample> {
        let mut r = make_reader(10);
        let w = single_writer_guid().entity_id;
        let rd = r.guid().entity_id;
        r.handle_data(&lifecycle_data(w, rd, 1, key_hash, status_bits))
    }

    /// Non-liveliness HEARTBEAT announcing `first..=last`.
    fn heartbeat(
        wid: EntityId,
        rid: EntityId,
        first: i64,
        last: i64,
        count: i32,
        final_flag: bool,
    ) -> HeartbeatSubmessage {
        HeartbeatSubmessage {
            reader_id: rid,
            writer_id: wid,
            first_sn: sn(first),
            last_sn: sn(last),
            count,
            final_flag,
            liveliness_flag: false,
            group_info: None,
        }
    }

    fn first_state(r: &ReliableReader) -> &WriterProxyState {
        &r.writer_proxies()[0]
    }

    // ---------------------------------------------------------------------
    // DATA handling
    // ---------------------------------------------------------------------

    #[test]
    fn in_order_data_delivered_immediately() {
        let mut r = make_reader(10);
        let w_eid = single_writer_guid().entity_id;
        let r_eid = r.guid().entity_id;

        let delivered = r.handle_data(&data(w_eid, r_eid, 1, 0xAA));

        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].payload.as_ref(), &[0xAA][..]);
        assert_eq!(delivered[0].writer_guid, single_writer_guid());
        assert_eq!(first_state(&r).delivered_up_to, sn(1));
    }

    #[test]
    fn out_of_order_data_buffered_until_gap_filled() {
        let mut r = make_reader(10);
        let w = single_writer_guid().entity_id;
        let rd = r.guid().entity_id;

        assert!(r.handle_data(&data(w, rd, 2, 0x22)).is_empty());
        assert!(r.handle_data(&data(w, rd, 3, 0x33)).is_empty());
        let out = r.handle_data(&data(w, rd, 1, 0x11));

        let delivered_sns: Vec<_> = out.iter().map(|s| s.sequence_number).collect();
        assert_eq!(delivered_sns, alloc::vec![sn(1), sn(2), sn(3)]);
        assert_eq!(first_state(&r).delivered_up_to, sn(3));
    }

    #[test]
    fn duplicate_data_is_rejected() {
        let mut r = make_reader(10);
        let w = single_writer_guid().entity_id;
        let rd = r.guid().entity_id;

        r.handle_data(&data(w, rd, 1, 0xAA));
        let second = r.handle_data(&data(w, rd, 1, 0xAA));

        assert!(second.is_empty());
    }

    #[test]
    fn mismatched_writer_id_is_counted() {
        let mut r = make_reader(10);
        let rd = r.guid().entity_id;
        let foreign = EntityId::user_writer_with_key([0xFF, 0xFF, 0xFF]);

        assert!(r.handle_data(&data(foreign, rd, 1, 0xAA)).is_empty());
        assert_eq!(r.unknown_src_count(), 1);
    }

    // ---------------------------------------------------------------------
    // Lifecycle classification
    // ---------------------------------------------------------------------

    #[test]
    fn alive_data_yields_alive_changekind() {
        let mut r = make_reader(10);
        let w = single_writer_guid().entity_id;
        let rd = r.guid().entity_id;

        let delivered = r.handle_data(&data(w, rd, 1, 0xAA));

        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].kind, ChangeKind::Alive);
    }

    #[test]
    fn dispose_data_yields_not_alive_disposed() {
        let delivered = deliver_lifecycle([0xAB; 16], crate::inline_qos::status_info::DISPOSED);
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].kind, ChangeKind::NotAliveDisposed);
    }

    #[test]
    fn unregister_data_yields_not_alive_unregistered() {
        let delivered =
            deliver_lifecycle([0xCD; 16], crate::inline_qos::status_info::UNREGISTERED);
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].kind, ChangeKind::NotAliveUnregistered);
    }

    #[test]
    fn dispose_and_unregister_combined() {
        let bits =
            crate::inline_qos::status_info::DISPOSED | crate::inline_qos::status_info::UNREGISTERED;
        let delivered = deliver_lifecycle([0xEF; 16], bits);
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].kind, ChangeKind::NotAliveDisposedUnregistered);
    }

    #[test]
    fn key_flag_without_status_info_falls_back_to_alive() {
        let mut r = make_reader(10);
        let w = single_writer_guid().entity_id;
        let rd = r.guid().entity_id;
        let mut d = data(w, rd, 1, 0xAA);
        d.key_flag = true;

        let delivered = r.handle_data(&d);

        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].kind, ChangeKind::Alive);
    }

    // ---------------------------------------------------------------------
    // HEARTBEAT / ACKNACK scheduling
    // ---------------------------------------------------------------------

    #[test]
    fn heartbeat_with_missing_triggers_acknack_after_delay() {
        let mut r = make_reader(10);
        let w = single_writer_guid().entity_id;
        let rd = r.guid().entity_id;

        r.handle_heartbeat(&heartbeat(w, rd, 1, 3, 1, false), Duration::ZERO);

        // Still inside the 200 ms response delay.
        assert!(r.tick(Duration::from_millis(100)).unwrap().is_empty());
        let out = r.tick(Duration::from_millis(250)).unwrap();
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn heartbeat_without_missing_and_final_schedules_no_acknack() {
        let mut r = make_reader(10);
        let w = single_writer_guid().entity_id;
        let rd = r.guid().entity_id;

        r.handle_data(&data(w, rd, 1, 0xAA));
        r.handle_heartbeat(&heartbeat(w, rd, 1, 1, 1, true), Duration::ZERO);

        assert!(r.tick(Duration::from_secs(10)).unwrap().is_empty());
    }

    #[test]
    fn unknown_writer_id_in_heartbeat_counts_not_crashes() {
        let mut r = make_reader(10);
        let rd = r.guid().entity_id;
        let foreign = EntityId::user_writer_with_key([0xFF, 0xFF, 0xFF]);

        r.handle_heartbeat(&heartbeat(foreign, rd, 1, 3, 1, false), Duration::ZERO);

        assert_eq!(r.unknown_src_count(), 1);
        assert!(r.tick(Duration::from_secs(1)).unwrap().is_empty());
    }

    // ---------------------------------------------------------------------
    // Multiple writers
    // ---------------------------------------------------------------------

    #[test]
    fn add_writer_proxy_increases_count() {
        let mut r = make_reader(10);
        add_second_writer(&mut r);
        assert_eq!(r.writer_proxy_count(), 2);
    }

    #[test]
    fn two_writers_with_overlapping_sn_spaces_both_delivered() {
        let mut r = make_reader(10);
        add_second_writer(&mut r);
        let w1 = single_writer_guid().entity_id;
        let w2 = second_writer_guid().entity_id;
        let rd = r.guid().entity_id;

        let d1 = r.handle_data(&data(w1, rd, 1, 0xAA));
        let d2 = r.handle_data(&data(w2, rd, 1, 0xBB));

        assert_eq!(d1.len(), 1);
        assert_eq!(d1[0].payload.as_ref(), &[0xAA][..]);
        assert_eq!(d1[0].writer_guid, single_writer_guid());
        assert_eq!(d2.len(), 1);
        assert_eq!(d2[0].payload.as_ref(), &[0xBB][..]);
        assert_eq!(d2[0].writer_guid, second_writer_guid());

        assert_eq!(r.writer_proxies()[0].delivered_up_to, sn(1));
        assert_eq!(r.writer_proxies()[1].delivered_up_to, sn(1));
    }

    #[test]
    fn remove_writer_proxy_drops_its_state() {
        let mut r = make_reader(10);
        add_second_writer(&mut r);

        let removed = r.remove_writer_proxy(single_writer_guid());

        assert!(removed.is_some());
        assert_eq!(r.writer_proxy_count(), 1);
        assert_eq!(
            r.writer_proxies()[0].proxy.remote_writer_guid,
            second_writer_guid()
        );
    }

    #[test]
    fn tick_emits_one_acknack_per_writer_with_missing() {
        let mut r = make_reader(10);
        add_second_writer(&mut r);
        let rd = r.guid().entity_id;

        r.handle_heartbeat(
            &heartbeat(single_writer_guid().entity_id, rd, 1, 3, 1, false),
            Duration::ZERO,
        );
        r.handle_heartbeat(
            &heartbeat(second_writer_guid().entity_id, rd, 1, 5, 1, false),
            Duration::ZERO,
        );

        let out = r.tick(Duration::from_millis(250)).unwrap();
        assert_eq!(out.len(), 2);
    }

    // ---------------------------------------------------------------------
    // Pre-emptive ACKNACK
    // ---------------------------------------------------------------------

    #[test]
    fn pre_emptive_acknack_emitted_after_add_writer_proxy() {
        let out = pre_emptive_datagrams();
        assert_eq!(out.len(), 1, "exactly one Pre-Emptive ACKNACK expected");

        let parsed = decode_datagram(&out[0]).unwrap();
        let ack = parsed
            .submessages
            .iter()
            .find_map(|s| match s {
                ParsedSubmessage::AckNack(a) => Some(a),
                _ => None,
            })
            .expect("ACKNACK in datagram");

        assert_eq!(ack.reader_sn_state.bitmap_base, sn(1));
        assert_eq!(ack.reader_sn_state.num_bits, 0);
        assert!(
            !ack.final_flag,
            "Pre-Emptive ACKNACK must be non-final (force HB-response)"
        );
    }

    #[test]
    fn no_pre_emptive_acknack_without_proxy() {
        let mut r = empty_reader();
        assert!(r.tick(Duration::from_secs(10)).unwrap().is_empty());
    }

    #[test]
    fn initial_proxy_from_config_does_not_send_pre_emptive() {
        let mut r = make_reader(10);
        assert!(
            r.tick(Duration::from_secs(10)).unwrap().is_empty(),
            "initial proxy from config must not emit Pre-Emptive"
        );
    }

    #[test]
    fn pre_emptive_acknack_carries_info_dst() {
        let out = pre_emptive_datagrams();
        assert_eq!(out.len(), 1);

        let parsed = decode_datagram(&out[0]).unwrap();
        assert!(parsed.submessages.len() >= 2, "INFO_DST + ACKNACK");
        // The decoder reports INFO_DST (submessage id 0x0E) as `Unknown`.
        match &parsed.submessages[0] {
            ParsedSubmessage::Unknown { id, .. } => assert_eq!(*id, 0x0E),
            other => panic!("expected INFO_DST first, got {other:?}"),
        }
    }
}