Skip to main content

zerodds_rtps/
reliable_stateless_writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! `ReliableStatelessWriter` — DDSI-RTPS 2.5 §8.4.8.2 (Reliable
4//! StatelessWriter, T1-T12).
5//!
6//! Im Gegensatz zum StatefulWriter (`reliable_writer.rs`) traegt der
7//! StatelessWriter **keinen Per-Reader-Proxy-State**. Stattdessen
8//! existiert genau eine Liste von Reply-Locators (typisch Multicast)
9//! und ein einziger gemeinsamer Acked-State, der die
10//! `lowest_unacked_sn` aus dem Pool aller eingehenden ACKNACKs
11//! ableitet.
12//!
13//! Real-world Use-Cases sind selten — die DDSI-RTPS-Spec listet
14//! Reliable-Stateless als optional, und Cyclone DDS / FastDDS
15//! verwenden nur den Stateful-Pfad. ZeroDDS implementiert den
16//! Stateless-Reliable-Pfad fuer Spec-Vollstaendigkeit (K3b-D).
17//!
18//! # T1-T12 Transitions (Spec Tab. 8.46)
19//!
20//! Die Spec definiert eine 12-Zustands-Transition-Matrix. Wir mappen
21//! sie auf folgende API-Methoden:
22//!
23//! | T  | Trigger                            | API                         |
24//! |----|------------------------------------|-----------------------------|
25//! | T1 | new_change(kind, payload)          | [`Self::new_change`]        |
26//! | T2 | RESPONSIVE-Tick → DATA an alle     | [`Self::tick`] → DATA-Burst |
27//! | T3 | RESPONSIVE-Tick → HEARTBEAT        | [`Self::tick`] → HB         |
28//! | T4 | ACKNACK empfangen (FinalFlag=0)    | [`Self::handle_acknack`]    |
29//! | T5 | ACKNACK FinalFlag=1 (kein NACK)    | [`Self::handle_acknack`]    |
30//! | T6 | requested-Retransmit               | [`Self::tick`] (NACK-Drain) |
31//! | T7 | unsent_changes leer + ACKED-bound  | [`Self::is_acked_to`]       |
32//! | T8 | sample purge (Cache-LowWater)      | [`Self::purge_acked`]       |
33//! | T9 | new_change-Boundary (KeepLast)     | [`HistoryCache::insert`]    |
34//! | T10| Lease-Timeout / shutdown           | [`Self::shutdown`]          |
35//! | T11| Locator-List Update                | [`Self::set_locators`]      |
36//! | T12| Stats-Snapshot (Diagnose)          | [`Self::stats`]             |
37
38extern crate alloc;
39use alloc::collections::BTreeSet;
40use alloc::vec::Vec;
41use core::time::Duration;
42
43use crate::error::WireError;
44use crate::header::RtpsHeader;
45use crate::history_cache::{CacheChange, ChangeKind, HistoryCache, HistoryKind};
46use crate::message_builder::OutboundDatagram;
47use crate::submessages::{AckNackSubmessage, DataSubmessage, HeartbeatSubmessage};
48use crate::wire_types::{EntityId, Guid, GuidPrefix, Locator, SequenceNumber, VendorId};
49
/// Reliable stateless RTPS writer (DDSI-RTPS §8.4.8.2).
///
/// Keeps one locator list and one shared acknowledgement state for all
/// readers instead of per-reader bookkeeping.
pub struct ReliableStatelessWriter {
    /// This writer's GUID (prefix + entity id).
    guid: Guid,
    /// Vendor id used when building the RTPS header of outgoing datagrams.
    vendor_id: VendorId,
    /// History cache holding the written changes (KeepAll or KeepLast).
    cache: HistoryCache,
    /// Next sequence number to hand out (writer-assigned, monotonic,
    /// starts at 1).
    next_sn: i64,
    /// Reply locators every outgoing datagram is addressed to
    /// (typically multicast).
    locators: Vec<Locator>,
    /// Number of HEARTBEATs emitted so far; wraps around on `u32` overflow
    /// (spec §8.3.8.6).
    heartbeat_count: u32,
    /// Sequence numbers requested via ACKNACK and still waiting for a
    /// retransmit (one pool shared by all readers).
    requested: BTreeSet<SequenceNumber>,
    /// Lowest unacknowledged SN: the highest ACKNACK `bitmap_base` seen so
    /// far — the value only ever moves forward. Stays `0` until an ACKNACK
    /// with a positive base has been processed.
    lowest_unacked: i64,
    /// Minimum spacing between two HEARTBEATs (evaluated on each tick).
    heartbeat_period: Duration,
    /// `now` value of the tick that emitted the most recent HEARTBEAT.
    last_heartbeat: Duration,
    /// Maximum number of retransmit requests drained per tick (DoS cap).
    max_per_tick: usize,
}
76
/// Diagnostic snapshot of a `ReliableStatelessWriter` (T12).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ReliableStatelessStats {
    /// Number of changes currently held in the history cache.
    pub cached_changes: usize,
    /// Number of retransmit requests still queued (`requested.len()`).
    pub pending_retransmits: usize,
    /// Current `lowest_unacked` value (`0` = not known yet).
    pub lowest_unacked: i64,
    /// Heartbeat counter (wraps modulo `u32`).
    pub heartbeat_count: u32,
}
89
90impl ReliableStatelessWriter {
91    /// Konstruktor.
92    #[must_use]
93    pub fn new(
94        prefix: GuidPrefix,
95        entity_id: EntityId,
96        vendor_id: VendorId,
97        history: HistoryKind,
98        capacity: usize,
99        heartbeat_period: Duration,
100    ) -> Self {
101        Self {
102            guid: Guid::new(prefix, entity_id),
103            vendor_id,
104            cache: HistoryCache::new_with_kind(history, capacity),
105            next_sn: 1,
106            locators: Vec::new(),
107            heartbeat_count: 0,
108            requested: BTreeSet::new(),
109            lowest_unacked: 0,
110            heartbeat_period,
111            last_heartbeat: Duration::ZERO,
112            max_per_tick: 16,
113        }
114    }
115
116    /// Eigene GUID.
117    #[must_use]
118    pub fn guid(&self) -> Guid {
119        self.guid
120    }
121
122    /// Setzt eine neue Locator-Liste (T11).
123    pub fn set_locators(&mut self, locators: Vec<Locator>) {
124        self.locators = locators;
125    }
126
127    /// Setzt das DoS-Cap fuer Retransmits-pro-Tick.
128    pub fn set_max_per_tick(&mut self, n: usize) {
129        self.max_per_tick = n;
130    }
131
132    /// T1 — `new_change`: legt einen Sample im Cache an. Liefert die
133    /// vergebene Sequence-Number.
134    ///
135    /// # Errors
136    /// `ValueOutOfRange` bei SN-Overflow oder Cache-Capacity-Limit.
137    pub fn new_change(
138        &mut self,
139        kind: ChangeKind,
140        payload: Vec<u8>,
141    ) -> Result<SequenceNumber, WireError> {
142        let sn = SequenceNumber(self.next_sn);
143        self.next_sn = self
144            .next_sn
145            .checked_add(1)
146            .ok_or(WireError::ValueOutOfRange {
147                message: "reliable stateless writer SN overflow",
148            })?;
149        let change = match kind {
150            ChangeKind::Alive => CacheChange::alive(sn, payload),
151            other => {
152                // Dispose/Unregister + Filtered nutzen denselben
153                // Konstruktor; Kind wird im Cache mit-bewahrt.
154                let mut c = CacheChange::alive(sn, payload);
155                c.kind = other;
156                c
157            }
158        };
159        self.cache
160            .insert(change)
161            .map_err(|_| WireError::ValueOutOfRange {
162                message: "reliable stateless writer cache full",
163            })?;
164        Ok(sn)
165    }
166
167    /// T4/T5 — verarbeitet eine eingehende ACKNACK. Aktualisiert
168    /// `lowest_unacked` (max der jemals gesehenen `base` — once-acked-
169    /// always-acked) und nimmt `requested`-SNs in den Retransmit-Pool.
170    pub fn handle_acknack(&mut self, ack: &AckNackSubmessage) {
171        let base = ack.reader_sn_state.bitmap_base.0;
172        if base > self.lowest_unacked {
173            self.lowest_unacked = base;
174            // Acked-SNs aus dem Retransmit-Pool entfernen (alles < base).
175            self.requested.retain(|sn| sn.0 >= base);
176        }
177        for sn in ack.reader_sn_state.iter_set() {
178            self.requested.insert(sn);
179        }
180    }
181
182    /// T7 — `is_acked_to(sn)`: alle SNs bis einschliesslich `sn` sind
183    /// von mindestens einem Reader bestaetigt.
184    #[must_use]
185    pub fn is_acked_to(&self, sn: SequenceNumber) -> bool {
186        sn.0 < self.lowest_unacked
187    }
188
189    /// T8 — purgt alle Cache-Eintraege, die `lowest_unacked - 1` oder
190    /// kleiner sind. Liefert die Anzahl entfernter Samples.
191    pub fn purge_acked(&mut self) -> usize {
192        if self.lowest_unacked <= 1 {
193            return 0;
194        }
195        let cutoff = SequenceNumber(self.lowest_unacked - 1);
196        self.cache.remove_up_to(cutoff)
197    }
198
199    /// T2/T3/T6 — Tick. Liefert eine Liste von Datagrams, die der
200    /// Caller via UDP an alle `locators` senden soll. Reihenfolge:
201    /// 1. Pending-Retransmits (max `max_per_tick`).
202    /// 2. Wenn `now >= last_heartbeat + heartbeat_period` UND Cache
203    ///    nicht leer: ein HEARTBEAT.
204    ///
205    /// # Errors
206    /// Wire-Encode-Fehler bei DATA/HEARTBEAT-Encoding.
207    pub fn tick(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
208        use alloc::rc::Rc;
209        let mut out = Vec::new();
210        let targets = Rc::new(self.locators.clone());
211        let header = RtpsHeader::new(self.vendor_id, self.guid.prefix);
212        let mut sent = 0usize;
213
214        // 1. Retransmits.
215        let retransmits: Vec<SequenceNumber> = self
216            .requested
217            .iter()
218            .take(self.max_per_tick)
219            .copied()
220            .collect();
221        for sn in &retransmits {
222            if let Some(change) = self.cache.get(*sn) {
223                let data = DataSubmessage {
224                    extra_flags: 0,
225                    reader_id: EntityId::UNKNOWN, // Stateless: an alle Reader.
226                    writer_id: self.guid.entity_id,
227                    writer_sn: *sn,
228                    inline_qos: None,
229                    key_flag: false,
230                    non_standard_flag: false,
231                    serialized_payload: alloc::sync::Arc::clone(&change.payload),
232                };
233                let bytes = crate::datagram::encode_data_datagram(header, &[data])?;
234                out.push(OutboundDatagram {
235                    bytes,
236                    targets: Rc::clone(&targets),
237                });
238                sent += 1;
239            }
240            self.requested.remove(sn);
241            if sent >= self.max_per_tick {
242                break;
243            }
244        }
245
246        // 2. Heartbeat-Tick.
247        if now >= self.last_heartbeat + self.heartbeat_period && !self.cache.is_empty() {
248            self.last_heartbeat = now;
249            self.heartbeat_count = self.heartbeat_count.wrapping_add(1);
250            let first = self
251                .cache
252                .min_sn()
253                .unwrap_or(SequenceNumber(self.lowest_unacked));
254            let last = self
255                .cache
256                .max_sn()
257                .unwrap_or(SequenceNumber(self.next_sn - 1));
258            let hb = HeartbeatSubmessage {
259                reader_id: EntityId::UNKNOWN,
260                writer_id: self.guid.entity_id,
261                first_sn: first,
262                last_sn: last,
263                count: self.heartbeat_count as i32,
264                final_flag: false,
265                liveliness_flag: false,
266                group_info: None,
267            };
268            let (body, flags) = hb.write_body(true);
269            let sh = crate::submessage_header::SubmessageHeader {
270                submessage_id: crate::submessage_header::SubmessageId::Heartbeat,
271                flags,
272                octets_to_next_header: body.len() as u16,
273            };
274            let mut bytes = header.to_bytes().to_vec();
275            bytes.extend_from_slice(&sh.to_bytes());
276            bytes.extend_from_slice(&body);
277            out.push(OutboundDatagram {
278                bytes,
279                targets: Rc::clone(&targets),
280            });
281        }
282
283        Ok(out)
284    }
285
286    /// T10 — Shutdown: leert den Cache + Retransmit-Pool.
287    pub fn shutdown(&mut self) {
288        if let Some(max) = self.cache.max_sn() {
289            let _ = self.cache.remove_up_to(max);
290        }
291        self.requested.clear();
292    }
293
294    /// T12 — Diagnose-Snapshot.
295    #[must_use]
296    pub fn stats(&self) -> ReliableStatelessStats {
297        ReliableStatelessStats {
298            cached_changes: self.cache.len(),
299            pending_retransmits: self.requested.len(),
300            lowest_unacked: self.lowest_unacked,
301            heartbeat_count: self.heartbeat_count,
302        }
303    }
304}
305
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    use super::*;
    use crate::submessages::SequenceNumberSet;

    /// KeepAll writer with capacity 32 and a 100 ms heartbeat period.
    fn make_writer() -> ReliableStatelessWriter {
        let prefix = GuidPrefix::from_bytes([1; 12]);
        let entity = EntityId::user_writer_with_key([1, 2, 3]);
        ReliableStatelessWriter::new(
            prefix,
            entity,
            VendorId::ZERODDS,
            HistoryKind::KeepAll,
            32,
            Duration::from_millis(100),
        )
    }

    /// Writes `count` alive samples with the one-byte payloads `1..=count`.
    fn write_samples(w: &mut ReliableStatelessWriter, count: u8) {
        for byte in 1..=count {
            let _ = w.new_change(ChangeKind::Alive, alloc::vec![byte]).unwrap();
        }
    }

    /// Single-entry locator list shared by the tick tests.
    fn default_locators() -> Vec<Locator> {
        alloc::vec![Locator::udp_v4([10, 0, 0, 1], 7400)]
    }

    /// ACKNACK (count 1) addressed to `w` carrying the given reader state.
    fn acknack(
        w: &ReliableStatelessWriter,
        reader_sn_state: SequenceNumberSet,
        final_flag: bool,
    ) -> AckNackSubmessage {
        AckNackSubmessage {
            reader_id: EntityId::UNKNOWN,
            writer_id: w.guid.entity_id,
            reader_sn_state,
            count: 1,
            final_flag,
        }
    }

    /// Final ACKNACK with base `base` and no missing sequence numbers.
    fn pure_ack(w: &ReliableStatelessWriter, base: i64) -> AckNackSubmessage {
        let state = SequenceNumberSet::from_missing(SequenceNumber(base), &[]);
        acknack(w, state, true)
    }

    #[test]
    fn new_change_assigns_monotonic_sn_t1() {
        let mut w = make_writer();
        let assigned: Vec<i64> = (1..=3u8)
            .map(|byte| w.new_change(ChangeKind::Alive, alloc::vec![byte]).unwrap().0)
            .collect();
        assert_eq!(assigned[0], 1);
        assert_eq!(assigned[1], 2);
        assert_eq!(assigned[2], 3);
    }

    #[test]
    fn handle_acknack_advances_lowest_unacked_t4() {
        let mut w = make_writer();
        write_samples(&mut w, 2);
        let ack = pure_ack(&w, 2);
        w.handle_acknack(&ack);
        assert_eq!(w.stats().lowest_unacked, 2);
    }

    #[test]
    fn handle_acknack_only_advances_t4_once_acked_always_acked() {
        let mut w = make_writer();
        let ack_high = pure_ack(&w, 10);
        w.handle_acknack(&ack_high);
        let ack_low = AckNackSubmessage {
            count: 2,
            ..pure_ack(&w, 3)
        };
        // A lower ACKNACK must not move the acknowledged bound backwards.
        w.handle_acknack(&ack_low);
        assert_eq!(w.stats().lowest_unacked, 10);
    }

    #[test]
    fn handle_acknack_with_requested_bits_queues_retransmits_t6() {
        let mut w = make_writer();
        write_samples(&mut w, 3);
        let missing = SequenceNumberSet::from_missing(
            SequenceNumber(1),
            &[SequenceNumber(2), SequenceNumber(3)],
        );
        let ack = acknack(&w, missing, false);
        w.handle_acknack(&ack);
        assert_eq!(w.stats().pending_retransmits, 2);
    }

    #[test]
    fn is_acked_to_t7() {
        let mut w = make_writer();
        let ack = pure_ack(&w, 5);
        w.handle_acknack(&ack);
        assert!(w.is_acked_to(SequenceNumber(4)));
        assert!(w.is_acked_to(SequenceNumber(1)));
        assert!(!w.is_acked_to(SequenceNumber(5)));
    }

    #[test]
    fn purge_acked_t8_removes_acked_samples() {
        let mut w = make_writer();
        write_samples(&mut w, 5);
        let ack = pure_ack(&w, 4);
        w.handle_acknack(&ack);
        let purged = w.purge_acked();
        assert_eq!(purged, 3);
        assert_eq!(w.stats().cached_changes, 2);
    }

    #[test]
    fn tick_emits_heartbeat_t3() {
        let mut w = make_writer();
        write_samples(&mut w, 1);
        w.set_locators(default_locators());
        let datagrams = w.tick(Duration::from_millis(150)).unwrap();
        assert!(!datagrams.is_empty(), "tick should emit HB");
        assert_eq!(w.stats().heartbeat_count, 1);
    }

    #[test]
    fn tick_does_not_emit_heartbeat_when_cache_empty() {
        let mut w = make_writer();
        w.set_locators(default_locators());
        let datagrams = w.tick(Duration::from_millis(150)).unwrap();
        assert!(datagrams.is_empty(), "empty cache → no HB");
    }

    #[test]
    fn tick_emits_retransmits_for_requested_sns_t6() {
        let mut w = make_writer();
        write_samples(&mut w, 3);
        w.set_locators(default_locators());
        let missing = SequenceNumberSet::from_missing(
            SequenceNumber(1),
            &[SequenceNumber(2), SequenceNumber(3)],
        );
        let ack = acknack(&w, missing, false);
        w.handle_acknack(&ack);
        let datagrams = w.tick(Duration::from_millis(0)).unwrap();
        // Two retransmits and no heartbeat (tick time is 0).
        assert_eq!(datagrams.len(), 2);
        assert_eq!(w.stats().pending_retransmits, 0);
    }

    #[test]
    fn tick_caps_retransmits_at_max_per_tick() {
        let mut w = make_writer();
        write_samples(&mut w, 5);
        w.set_locators(default_locators());
        w.set_max_per_tick(2);
        let missing = SequenceNumberSet::from_missing(
            SequenceNumber(1),
            &[
                SequenceNumber(2),
                SequenceNumber(3),
                SequenceNumber(4),
                SequenceNumber(5),
            ],
        );
        let ack = acknack(&w, missing, false);
        w.handle_acknack(&ack);
        let datagrams = w.tick(Duration::from_millis(0)).unwrap();
        assert!(datagrams.len() <= 2, "max_per_tick cap respected");
        assert!(w.stats().pending_retransmits >= 2, "rest stays queued");
    }

    #[test]
    fn shutdown_clears_state_t10() {
        let mut w = make_writer();
        write_samples(&mut w, 3);
        w.shutdown();
        let after = w.stats();
        assert_eq!(after.cached_changes, 0);
        assert_eq!(after.pending_retransmits, 0);
    }

    #[test]
    fn set_locators_t11_replaces_list() {
        let mut w = make_writer();
        w.set_locators(alloc::vec![Locator::udp_v4([1, 1, 1, 1], 100)]);
        w.set_locators(alloc::vec![Locator::udp_v4([2, 2, 2, 2], 200)]);
        // Round trip: exactly one locator remains after the second call.
        write_samples(&mut w, 1);
        let datagrams = w.tick(Duration::from_millis(150)).unwrap();
        assert!(!datagrams.is_empty());
        assert_eq!(datagrams[0].targets.len(), 1);
    }

    #[test]
    fn heartbeat_count_wraps_at_u32_max_t3_modular() {
        // The counter is modulo u32: start just below u32::MAX and check
        // that two heartbeats take it to u32::MAX and then to 0.
        let mut w = make_writer();
        w.heartbeat_count = u32::MAX - 1;
        write_samples(&mut w, 1);
        w.set_locators(alloc::vec![Locator::udp_v4([1, 2, 3, 4], 7400)]);
        let _ = w.tick(Duration::from_millis(150)).unwrap();
        assert_eq!(w.stats().heartbeat_count, u32::MAX);
        // Rewind last_heartbeat so the next tick is due again.
        w.last_heartbeat = Duration::ZERO;
        let _ = w.tick(Duration::from_millis(150)).unwrap();
        // Wrap-around: u32::MAX + 1 → 0.
        assert_eq!(w.stats().heartbeat_count, 0);
    }

    #[test]
    fn stats_snapshot_t12() {
        let mut w = make_writer();
        write_samples(&mut w, 4);
        let s = w.stats();
        assert_eq!(s.cached_changes, 4);
        assert_eq!(s.pending_retransmits, 0);
        assert_eq!(s.lowest_unacked, 0);
        assert_eq!(s.heartbeat_count, 0);
    }
}