1extern crate alloc;
39use alloc::collections::BTreeSet;
40use alloc::vec::Vec;
41use core::time::Duration;
42
43use crate::error::WireError;
44use crate::header::RtpsHeader;
45use crate::history_cache::{CacheChange, ChangeKind, HistoryCache, HistoryKind};
46use crate::message_builder::OutboundDatagram;
47use crate::submessages::{AckNackSubmessage, DataSubmessage, HeartbeatSubmessage};
48use crate::wire_types::{EntityId, Guid, GuidPrefix, Locator, SequenceNumber, VendorId};
49
50pub struct ReliableStatelessWriter {
52 guid: Guid,
54 vendor_id: VendorId,
56 cache: HistoryCache,
58 next_sn: i64,
60 locators: Vec<Locator>,
62 heartbeat_count: u32,
64 requested: BTreeSet<SequenceNumber>,
66 lowest_unacked: i64,
69 heartbeat_period: Duration,
71 last_heartbeat: Duration,
73 max_per_tick: usize,
75}
76
/// Point-in-time snapshot of a [`ReliableStatelessWriter`]'s counters, as
/// returned by `ReliableStatelessWriter::stats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ReliableStatelessStats {
    /// Number of changes currently held in the history cache.
    pub cached_changes: usize,
    /// Number of sequence numbers queued for retransmission.
    pub pending_retransmits: usize,
    /// Sequence numbers strictly below this value count as acknowledged.
    pub lowest_unacked: i64,
    /// Number of heartbeats emitted so far (wraps around on overflow).
    pub heartbeat_count: u32,
}
89
90impl ReliableStatelessWriter {
91 #[must_use]
93 pub fn new(
94 prefix: GuidPrefix,
95 entity_id: EntityId,
96 vendor_id: VendorId,
97 history: HistoryKind,
98 capacity: usize,
99 heartbeat_period: Duration,
100 ) -> Self {
101 Self {
102 guid: Guid::new(prefix, entity_id),
103 vendor_id,
104 cache: HistoryCache::new_with_kind(history, capacity),
105 next_sn: 1,
106 locators: Vec::new(),
107 heartbeat_count: 0,
108 requested: BTreeSet::new(),
109 lowest_unacked: 0,
110 heartbeat_period,
111 last_heartbeat: Duration::ZERO,
112 max_per_tick: 16,
113 }
114 }
115
116 #[must_use]
118 pub fn guid(&self) -> Guid {
119 self.guid
120 }
121
122 pub fn set_locators(&mut self, locators: Vec<Locator>) {
124 self.locators = locators;
125 }
126
127 pub fn set_max_per_tick(&mut self, n: usize) {
129 self.max_per_tick = n;
130 }
131
132 pub fn new_change(
138 &mut self,
139 kind: ChangeKind,
140 payload: Vec<u8>,
141 ) -> Result<SequenceNumber, WireError> {
142 let sn = SequenceNumber(self.next_sn);
143 self.next_sn = self
144 .next_sn
145 .checked_add(1)
146 .ok_or(WireError::ValueOutOfRange {
147 message: "reliable stateless writer SN overflow",
148 })?;
149 let change = match kind {
150 ChangeKind::Alive => CacheChange::alive(sn, payload),
151 other => {
152 let mut c = CacheChange::alive(sn, payload);
155 c.kind = other;
156 c
157 }
158 };
159 self.cache
160 .insert(change)
161 .map_err(|_| WireError::ValueOutOfRange {
162 message: "reliable stateless writer cache full",
163 })?;
164 Ok(sn)
165 }
166
167 pub fn handle_acknack(&mut self, ack: &AckNackSubmessage) {
171 let base = ack.reader_sn_state.bitmap_base.0;
172 if base > self.lowest_unacked {
173 self.lowest_unacked = base;
174 self.requested.retain(|sn| sn.0 >= base);
176 }
177 for sn in ack.reader_sn_state.iter_set() {
178 self.requested.insert(sn);
179 }
180 }
181
182 #[must_use]
185 pub fn is_acked_to(&self, sn: SequenceNumber) -> bool {
186 sn.0 < self.lowest_unacked
187 }
188
189 pub fn purge_acked(&mut self) -> usize {
192 if self.lowest_unacked <= 1 {
193 return 0;
194 }
195 let cutoff = SequenceNumber(self.lowest_unacked - 1);
196 self.cache.remove_up_to(cutoff)
197 }
198
199 pub fn tick(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
208 use alloc::rc::Rc;
209 let mut out = Vec::new();
210 let targets = Rc::new(self.locators.clone());
211 let header = RtpsHeader::new(self.vendor_id, self.guid.prefix);
212 let mut sent = 0usize;
213
214 let retransmits: Vec<SequenceNumber> = self
216 .requested
217 .iter()
218 .take(self.max_per_tick)
219 .copied()
220 .collect();
221 for sn in &retransmits {
222 if let Some(change) = self.cache.get(*sn) {
223 let data = DataSubmessage {
224 extra_flags: 0,
225 reader_id: EntityId::UNKNOWN, writer_id: self.guid.entity_id,
227 writer_sn: *sn,
228 inline_qos: None,
229 key_flag: false,
230 non_standard_flag: false,
231 serialized_payload: alloc::sync::Arc::clone(&change.payload),
232 };
233 let bytes = crate::datagram::encode_data_datagram(header, &[data])?;
234 out.push(OutboundDatagram {
235 bytes,
236 targets: Rc::clone(&targets),
237 });
238 sent += 1;
239 }
240 self.requested.remove(sn);
241 if sent >= self.max_per_tick {
242 break;
243 }
244 }
245
246 if now >= self.last_heartbeat + self.heartbeat_period && !self.cache.is_empty() {
248 self.last_heartbeat = now;
249 self.heartbeat_count = self.heartbeat_count.wrapping_add(1);
250 let first = self
251 .cache
252 .min_sn()
253 .unwrap_or(SequenceNumber(self.lowest_unacked));
254 let last = self
255 .cache
256 .max_sn()
257 .unwrap_or(SequenceNumber(self.next_sn - 1));
258 let hb = HeartbeatSubmessage {
259 reader_id: EntityId::UNKNOWN,
260 writer_id: self.guid.entity_id,
261 first_sn: first,
262 last_sn: last,
263 count: self.heartbeat_count as i32,
264 final_flag: false,
265 liveliness_flag: false,
266 group_info: None,
267 };
268 let (body, flags) = hb.write_body(true);
269 let sh = crate::submessage_header::SubmessageHeader {
270 submessage_id: crate::submessage_header::SubmessageId::Heartbeat,
271 flags,
272 octets_to_next_header: body.len() as u16,
273 };
274 let mut bytes = header.to_bytes().to_vec();
275 bytes.extend_from_slice(&sh.to_bytes());
276 bytes.extend_from_slice(&body);
277 out.push(OutboundDatagram {
278 bytes,
279 targets: Rc::clone(&targets),
280 });
281 }
282
283 Ok(out)
284 }
285
286 pub fn shutdown(&mut self) {
288 if let Some(max) = self.cache.max_sn() {
289 let _ = self.cache.remove_up_to(max);
290 }
291 self.requested.clear();
292 }
293
294 #[must_use]
296 pub fn stats(&self) -> ReliableStatelessStats {
297 ReliableStatelessStats {
298 cached_changes: self.cache.len(),
299 pending_retransmits: self.requested.len(),
300 lowest_unacked: self.lowest_unacked,
301 heartbeat_count: self.heartbeat_count,
302 }
303 }
304}
305
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    use super::*;
    use crate::submessages::SequenceNumberSet;

    /// Writer under test: KeepAll history, capacity 32, 100 ms heartbeat period.
    fn make_writer() -> ReliableStatelessWriter {
        let prefix = GuidPrefix::from_bytes([1; 12]);
        let entity = EntityId::user_writer_with_key([1, 2, 3]);
        ReliableStatelessWriter::new(
            prefix,
            entity,
            VendorId::ZERODDS,
            HistoryKind::KeepAll,
            32,
            Duration::from_millis(100),
        )
    }

    /// Writes `n` alive samples whose one-byte payloads are `1..=n`.
    fn write_samples(w: &mut ReliableStatelessWriter, n: u8) {
        for byte in 1..=n {
            let _ = w.new_change(ChangeKind::Alive, alloc::vec![byte]).unwrap();
        }
    }

    /// Builds an ACKNACK (count 1) addressed to `w` with the given bitmap
    /// base and requested sequence numbers.
    fn acknack(
        w: &ReliableStatelessWriter,
        base: i64,
        missing: &[SequenceNumber],
        final_flag: bool,
    ) -> AckNackSubmessage {
        AckNackSubmessage {
            reader_id: EntityId::UNKNOWN,
            writer_id: w.guid.entity_id,
            reader_sn_state: SequenceNumberSet::from_missing(SequenceNumber(base), missing),
            count: 1,
            final_flag,
        }
    }

    #[test]
    fn new_change_assigns_monotonic_sn_t1() {
        let mut w = make_writer();
        let assigned: Vec<i64> = (1u8..=3)
            .map(|byte| w.new_change(ChangeKind::Alive, alloc::vec![byte]).unwrap().0)
            .collect();
        assert_eq!(assigned[0], 1);
        assert_eq!(assigned[1], 2);
        assert_eq!(assigned[2], 3);
    }

    #[test]
    fn handle_acknack_advances_lowest_unacked_t4() {
        let mut w = make_writer();
        write_samples(&mut w, 2);
        let ack = acknack(&w, 2, &[], true);
        w.handle_acknack(&ack);
        assert_eq!(w.stats().lowest_unacked, 2);
    }

    #[test]
    fn handle_acknack_only_advances_t4_once_acked_always_acked() {
        let mut w = make_writer();
        let ack_high = acknack(&w, 10, &[], true);
        w.handle_acknack(&ack_high);
        let mut ack_low = acknack(&w, 3, &[], true);
        ack_low.count = 2;
        w.handle_acknack(&ack_low);
        // The older, lower base must not pull the floor back down.
        assert_eq!(w.stats().lowest_unacked, 10);
    }

    #[test]
    fn handle_acknack_with_requested_bits_queues_retransmits_t6() {
        let mut w = make_writer();
        write_samples(&mut w, 3);
        let ack = acknack(&w, 1, &[SequenceNumber(2), SequenceNumber(3)], false);
        w.handle_acknack(&ack);
        assert_eq!(w.stats().pending_retransmits, 2);
    }

    #[test]
    fn is_acked_to_t7() {
        let mut w = make_writer();
        let ack = acknack(&w, 5, &[], true);
        w.handle_acknack(&ack);
        assert!(w.is_acked_to(SequenceNumber(4)));
        assert!(w.is_acked_to(SequenceNumber(1)));
        assert!(!w.is_acked_to(SequenceNumber(5)));
    }

    #[test]
    fn purge_acked_t8_removes_acked_samples() {
        let mut w = make_writer();
        write_samples(&mut w, 5);
        let ack = acknack(&w, 4, &[], true);
        w.handle_acknack(&ack);
        let purged = w.purge_acked();
        assert_eq!(purged, 3);
        assert_eq!(w.stats().cached_changes, 2);
    }

    #[test]
    fn tick_emits_heartbeat_t3() {
        let mut w = make_writer();
        write_samples(&mut w, 1);
        w.set_locators(alloc::vec![Locator::udp_v4([10, 0, 0, 1], 7400)]);
        let datagrams = w.tick(Duration::from_millis(150)).unwrap();
        assert!(!datagrams.is_empty(), "tick should emit HB");
        assert_eq!(w.stats().heartbeat_count, 1);
    }

    #[test]
    fn tick_does_not_emit_heartbeat_when_cache_empty() {
        let mut w = make_writer();
        w.set_locators(alloc::vec![Locator::udp_v4([10, 0, 0, 1], 7400)]);
        let datagrams = w.tick(Duration::from_millis(150)).unwrap();
        assert!(datagrams.is_empty(), "empty cache → no HB");
    }

    #[test]
    fn tick_emits_retransmits_for_requested_sns_t6() {
        let mut w = make_writer();
        write_samples(&mut w, 3);
        w.set_locators(alloc::vec![Locator::udp_v4([10, 0, 0, 1], 7400)]);
        let ack = acknack(&w, 1, &[SequenceNumber(2), SequenceNumber(3)], false);
        w.handle_acknack(&ack);
        // At t = 0 the heartbeat is not due yet, so only retransmits come out.
        let datagrams = w.tick(Duration::from_millis(0)).unwrap();
        assert_eq!(datagrams.len(), 2);
        assert_eq!(w.stats().pending_retransmits, 0);
    }

    #[test]
    fn tick_caps_retransmits_at_max_per_tick() {
        let mut w = make_writer();
        write_samples(&mut w, 5);
        w.set_locators(alloc::vec![Locator::udp_v4([10, 0, 0, 1], 7400)]);
        w.set_max_per_tick(2);
        let wanted = [
            SequenceNumber(2),
            SequenceNumber(3),
            SequenceNumber(4),
            SequenceNumber(5),
        ];
        let ack = acknack(&w, 1, &wanted, false);
        w.handle_acknack(&ack);
        let datagrams = w.tick(Duration::from_millis(0)).unwrap();
        assert!(datagrams.len() <= 2, "max_per_tick cap respected");
        assert!(w.stats().pending_retransmits >= 2, "rest stays queued");
    }

    #[test]
    fn shutdown_clears_state_t10() {
        let mut w = make_writer();
        write_samples(&mut w, 3);
        w.shutdown();
        let after = w.stats();
        assert_eq!(after.cached_changes, 0);
        assert_eq!(after.pending_retransmits, 0);
    }

    #[test]
    fn set_locators_t11_replaces_list() {
        let mut w = make_writer();
        w.set_locators(alloc::vec![Locator::udp_v4([1, 1, 1, 1], 100)]);
        w.set_locators(alloc::vec![Locator::udp_v4([2, 2, 2, 2], 200)]);
        write_samples(&mut w, 1);
        // Only the second list may be attached to the emitted datagram.
        let datagrams = w.tick(Duration::from_millis(150)).unwrap();
        assert!(!datagrams.is_empty());
        assert_eq!(datagrams[0].targets.len(), 1);
    }

    #[test]
    fn heartbeat_count_wraps_at_u32_max_t3_modular() {
        let mut w = make_writer();
        // Start one step before the maximum so two heartbeats cross the wrap.
        w.heartbeat_count = u32::MAX - 1;
        write_samples(&mut w, 1);
        w.set_locators(alloc::vec![Locator::udp_v4([1, 2, 3, 4], 7400)]);
        let _ = w.tick(Duration::from_millis(150)).unwrap();
        assert_eq!(w.stats().heartbeat_count, u32::MAX);
        // Rewind the heartbeat timer so the same `now` is due again.
        w.last_heartbeat = Duration::ZERO;
        let _ = w.tick(Duration::from_millis(150)).unwrap();
        assert_eq!(w.stats().heartbeat_count, 0);
    }

    #[test]
    fn stats_snapshot_t12() {
        let mut w = make_writer();
        write_samples(&mut w, 4);
        let s = w.stats();
        assert_eq!(s.cached_changes, 4);
        assert_eq!(s.pending_retransmits, 0);
        assert_eq!(s.lowest_unacked, 0);
        assert_eq!(s.heartbeat_count, 0);
    }
}