Skip to main content

zerodds_rtps/
reader_proxy.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
//! `ReaderProxy` — writer-side state about **one** remote reader.
//!
//! DDSI-RTPS 2.5 §8.4.4.11 (Stateful Writer behavior). The writer keeps one
//! `ReaderProxy` per matched reader and uses it to track which sequence
//! numbers the reader has already acked and which ones it has explicitly
//! re-requested.
//!
//! Currently a writer has only a single reader (single-reader assumption).
//! The data structure is nevertheless cut so that a `Vec<ReaderProxy>`
//! becomes possible later.
13
14extern crate alloc;
15use alloc::collections::{BTreeMap, BTreeSet};
16use alloc::vec::Vec;
17
18use crate::wire_types::{FragmentNumber, Guid, Locator, SequenceNumber};
19
20/// Writer-seitiger State fuer einen Remote-Reader.
21#[derive(Debug, Clone)]
22pub struct ReaderProxy {
23    /// GUID des Remote-Reader-Endpoints.
24    pub remote_reader_guid: Guid,
25    /// Unicast-Empfangs-Locator(s) des Readers.
26    pub unicast_locators: Vec<Locator>,
27    /// Multicast-Empfangs-Locator(s).
28    pub multicast_locators: Vec<Locator>,
29    /// Reliable-Kind (immer true in WP 1.1).
30    pub is_reliable: bool,
31    /// Hoechste SN, die der Reader **bereits acked** hat
32    /// (aus AckNack.reader_sn_state.bitmap_base - 1).
33    highest_acked_sn: SequenceNumber,
34    /// Hoechste SN, die der Writer an diesen Reader **bereits gesendet** hat.
35    highest_sent_sn: SequenceNumber,
36    /// Set von Requested SNs aus AckNack.bitmap, fuer Re-Send vorgemerkt.
37    requested_changes: BTreeSet<SequenceNumber>,
38    /// Pro Sample-SN: Set fehlender FragmentNumbers, die der Reader via
39    /// NACK_FRAG angefragt hat. Fuer Fragment-granulare Re-Sends.
40    requested_fragments: BTreeMap<SequenceNumber, BTreeSet<FragmentNumber>>,
41    /// Spec §8.4.15.6 Inactive-Reader-Reclaim: letzte beobachtete
42    /// Reader-Aktivitaet (eingehender ACKNACK / NACK_FRAG). Der Writer
43    /// ruft `note_activity(now)` aus dem ACKNACK-Pfad. Wenn
44    /// `now - last_activity > inactive_threshold`, kann der Writer den
45    /// Proxy via `is_inactive` als reclaim-Kandidat erkennen, um
46    /// Strict-Reliability nicht den Cache OOM laufen zu lassen.
47    last_activity: core::time::Duration,
48    /// XTypes 1.3 §7.6.3.1 — pro-Reader ausgehandeltes Wire-Format.
49    /// Default `XCDR2` (=2). Bei Match wird das Field via
50    /// `data_representation::negotiate(writer_offered, reader_accepted, mode)`
51    /// gesetzt, sonst bleibt es Default. Encap-Header bei sample-write
52    /// nutzt `data_representation::encap_for_final_le(this)`.
53    negotiated_data_representation: i16,
54}
55
56impl ReaderProxy {
57    /// Erzeugt einen frischen Proxy.
58    #[must_use]
59    pub fn new(
60        remote_reader_guid: Guid,
61        unicast_locators: Vec<Locator>,
62        multicast_locators: Vec<Locator>,
63        is_reliable: bool,
64    ) -> Self {
65        Self {
66            remote_reader_guid,
67            unicast_locators,
68            multicast_locators,
69            is_reliable,
70            // Pre-existing state: nichts acked, nichts gesendet. SN beginnt
71            // bei 1; "0 acked" heisst "nichts acked" (§8.7.4).
72            highest_acked_sn: SequenceNumber(0),
73            highest_sent_sn: SequenceNumber(0),
74            requested_changes: BTreeSet::new(),
75            requested_fragments: BTreeMap::new(),
76            last_activity: core::time::Duration::ZERO,
77            // Default: XCDR2 (modern). SEDP-Match-Pfad ueberschreibt
78            // mit dem ausgehandelten Wert.
79            negotiated_data_representation: crate::publication_data::data_representation::XCDR2,
80        }
81    }
82
83    /// Setzt das ausgehandelte Wire-Format fuer diesen Reader.
84    /// Wird vom DCPS-SEDP-Match-Pfad nach `negotiate(...)` aufgerufen.
85    pub fn set_negotiated_data_representation(&mut self, id: i16) {
86        self.negotiated_data_representation = id;
87    }
88
89    /// Liefert das ausgehandelte Wire-Format.
90    #[must_use]
91    pub fn negotiated_data_representation(&self) -> i16 {
92        self.negotiated_data_representation
93    }
94
95    /// Spec §8.4.15.6 — markiert eingehende Reader-Aktivitaet (jeder
96    /// ACKNACK / NACK_FRAG ruft das aus dem Receiver-Pfad).
97    pub fn note_activity(&mut self, now: core::time::Duration) {
98        self.last_activity = now;
99    }
100
101    /// Spec §8.4.15.6 — `true` wenn der Reader laenger als `threshold`
102    /// keine Aktivitaet gezeigt hat. Caller (z.B. ReliableWriter)
103    /// nutzt das, um den Proxy aus der `matched_readers`-Liste zu
104    /// reclaimen, sodass Strict-Reliability nicht den Cache OOM
105    /// laufen laesst.
106    #[must_use]
107    pub fn is_inactive(&self, now: core::time::Duration, threshold: core::time::Duration) -> bool {
108        now.checked_sub(self.last_activity)
109            .is_some_and(|elapsed| elapsed > threshold)
110    }
111
112    /// Liefert den Last-Activity-Zeitpunkt (Diagnose).
113    #[must_use]
114    pub fn last_activity(&self) -> core::time::Duration {
115        self.last_activity
116    }
117
118    /// Markiert Samples bis zu und einschliesslich `sn` als "nicht
119    /// mehr relevant" fuer diesen Proxy — sowohl gesendet als auch
120    /// acked. Wird z.B. bei Volatile-Durability aufgerufen, wenn ein
121    /// neuer Reader-Proxy hinzukommt: der soll keine Historic-Samples
122    /// bekommen, also springen wir direkt auf den aktuellen Cache-
123    /// Stand.
124    ///
125    /// Spec-Bezug: OMG DDS 1.4 §2.2.3.4 DurabilityQosPolicy Volatile:
126    /// "The Service will not attempt to retain old data beyond what is
127    /// currently held by the DataWriter for live Readers".
128    pub fn skip_samples_up_to(&mut self, sn: SequenceNumber) {
129        if sn > self.highest_sent_sn {
130            self.highest_sent_sn = sn;
131        }
132        if sn > self.highest_acked_sn {
133            self.highest_acked_sn = sn;
134        }
135    }
136
137    /// Aktualisiert auf ACKNACK-Base — Reader hat alle SNs < `base` acked.
138    /// `base` entspricht `reader_sn_state.bitmap_base`.
139    pub fn acked_changes_set(&mut self, base: SequenceNumber) {
140        let new_acked = SequenceNumber(base.0 - 1);
141        if new_acked > self.highest_acked_sn {
142            self.highest_acked_sn = new_acked;
143        }
144        // Bereits acked SNs aus requested entfernen.
145        self.requested_changes
146            .retain(|sn| *sn > self.highest_acked_sn);
147        // Analog fuer fragment-granulare Requests.
148        self.requested_fragments
149            .retain(|sn, _| *sn > self.highest_acked_sn);
150    }
151
152    /// Merkt sich die im ACKNACK-Bitmap angefragten SNs fuer Re-Send.
153    pub fn requested_changes_set(&mut self, sns: impl IntoIterator<Item = SequenceNumber>) {
154        for sn in sns {
155            if sn > self.highest_acked_sn {
156                self.requested_changes.insert(sn);
157            }
158        }
159    }
160
161    /// Zieht die kleinste offene Requested-SN und entfernt sie.
162    pub fn next_requested_change(&mut self) -> Option<SequenceNumber> {
163        let sn = *self.requested_changes.iter().next()?;
164        self.requested_changes.remove(&sn);
165        Some(sn)
166    }
167
168    /// Liefert die naechste noch nicht gesendete SN, falls im Cache vorhanden.
169    ///
170    /// `cache_max` ist die groesste SN, die aktuell im Writer-Cache liegt.
171    pub fn next_unsent_change(&mut self, cache_max: SequenceNumber) -> Option<SequenceNumber> {
172        if self.highest_sent_sn < cache_max {
173            let next = SequenceNumber(self.highest_sent_sn.0 + 1);
174            self.highest_sent_sn = next;
175            Some(next)
176        } else {
177            None
178        }
179    }
180
181    /// True wenn zwischen `highest_acked` und `cache_max` noch unbestaetigte
182    /// Samples liegen.
183    #[must_use]
184    pub fn unacked_changes(&self, cache_max: SequenceNumber) -> bool {
185        cache_max > self.highest_acked_sn
186    }
187
188    /// Getter fuer `highest_acked_sn`.
189    #[must_use]
190    pub fn highest_acked_sn(&self) -> SequenceNumber {
191        self.highest_acked_sn
192    }
193
194    /// Getter fuer `highest_sent_sn`.
195    #[must_use]
196    pub fn highest_sent_sn(&self) -> SequenceNumber {
197        self.highest_sent_sn
198    }
199
200    /// Anzahl vorgemerkter Resend-Requests.
201    #[must_use]
202    pub fn pending_requested_count(&self) -> usize {
203        self.requested_changes.len()
204    }
205
206    /// Merkt sich Fragment-granulare Resend-Requests aus einem NACK_FRAG.
207    /// SN-Werte ≤ `highest_acked_sn` werden ignoriert.
208    pub fn requested_fragments_set(
209        &mut self,
210        sn: SequenceNumber,
211        fragments: impl IntoIterator<Item = FragmentNumber>,
212    ) {
213        if sn <= self.highest_acked_sn {
214            return;
215        }
216        let entry = self.requested_fragments.entry(sn).or_default();
217        for f in fragments {
218            if f != FragmentNumber::UNKNOWN {
219                entry.insert(f);
220            }
221        }
222        if entry.is_empty() {
223            self.requested_fragments.remove(&sn);
224        }
225    }
226
227    /// Zieht das kleinste offene (SN, FragmentNumber)-Paar und entfernt es.
228    pub fn next_requested_fragment(&mut self) -> Option<(SequenceNumber, FragmentNumber)> {
229        let sn = *self.requested_fragments.keys().next()?;
230        let frag = {
231            let set = self.requested_fragments.get_mut(&sn)?;
232            let f = *set.iter().next()?;
233            set.remove(&f);
234            f
235        };
236        if self
237            .requested_fragments
238            .get(&sn)
239            .is_some_and(alloc::collections::BTreeSet::is_empty)
240        {
241            self.requested_fragments.remove(&sn);
242        }
243        Some((sn, frag))
244    }
245
246    /// Anzahl vorgemerkter Fragment-Resends (Summe ueber alle SNs).
247    #[must_use]
248    pub fn pending_requested_fragment_count(&self) -> usize {
249        self.requested_fragments.values().map(BTreeSet::len).sum()
250    }
251}
252
#[cfg(test)]
// Test code may unwrap/expect freely; a panic is the failure signal.
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::wire_types::{EntityId, GuidPrefix};

    /// Shorthand for a sequence number.
    fn sn(n: i64) -> SequenceNumber {
        SequenceNumber(n)
    }

    /// Shorthand for a fragment number.
    fn frag(n: u32) -> FragmentNumber {
        FragmentNumber(n)
    }

    /// Shorthand for a duration of `n` whole seconds.
    fn secs(n: u64) -> core::time::Duration {
        core::time::Duration::from_secs(n)
    }

    /// Builds a reliable proxy without any locators.
    fn proxy() -> ReaderProxy {
        let prefix = GuidPrefix::from_bytes([1; 12]);
        let entity = EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]);
        ReaderProxy::new(Guid::new(prefix, entity), alloc::vec![], alloc::vec![], true)
    }

    #[test]
    fn fresh_proxy_has_zero_state() {
        let p = proxy();
        assert_eq!(p.highest_acked_sn(), sn(0));
        assert_eq!(p.highest_sent_sn(), sn(0));
        assert_eq!(p.pending_requested_count(), 0);
    }

    #[test]
    fn acked_changes_set_monotonic() {
        let mut p = proxy();
        // (ACKNACK base, expected ack mark afterwards); the base of 3 is a
        // backwards ack and must be ignored.
        for (base, expected) in [(5, 4), (3, 4), (10, 9)] {
            p.acked_changes_set(sn(base));
            assert_eq!(p.highest_acked_sn(), sn(expected));
        }
    }

    #[test]
    fn requested_changes_set_above_ack_only() {
        let mut p = proxy();
        p.acked_changes_set(sn(5)); // ack mark becomes 4
        p.requested_changes_set([2, 4, 6, 8].map(sn));
        // Only the SNs above 4 survive.
        assert_eq!(p.pending_requested_count(), 2);
    }

    #[test]
    fn next_requested_change_pulls_smallest_first() {
        let mut p = proxy();
        p.requested_changes_set([sn(8), sn(3), sn(5)]);
        for expected in [3, 5, 8] {
            assert_eq!(p.next_requested_change(), Some(sn(expected)));
        }
        assert_eq!(p.next_requested_change(), None);
    }

    #[test]
    fn next_unsent_change_walks_sequentially() {
        let mut p = proxy();
        let cache_max = sn(3);
        for expected in 1..=3 {
            assert_eq!(p.next_unsent_change(cache_max), Some(sn(expected)));
        }
        assert_eq!(p.next_unsent_change(cache_max), None);
    }

    #[test]
    fn next_unsent_change_picks_up_after_cache_grows() {
        let mut p = proxy();
        let small_cache = sn(2);
        assert_eq!(p.next_unsent_change(small_cache), Some(sn(1)));
        assert_eq!(p.next_unsent_change(small_cache), Some(sn(2)));
        assert_eq!(p.next_unsent_change(small_cache), None);
        // Once the cache has grown, sending resumes right after SN 2.
        assert_eq!(p.next_unsent_change(sn(5)), Some(sn(3)));
    }

    #[test]
    fn unacked_changes_detects_gap() {
        let mut p = proxy();
        assert!(!p.unacked_changes(sn(0)));
        assert!(p.unacked_changes(sn(5)));
        p.acked_changes_set(sn(6)); // ack mark becomes 5
        assert!(!p.unacked_changes(sn(5)));
        assert!(p.unacked_changes(sn(7)));
    }

    #[test]
    fn acking_also_prunes_requested_changes() {
        let mut p = proxy();
        p.requested_changes_set([sn(3), sn(5), sn(7)]);
        assert_eq!(p.pending_requested_count(), 3);
        p.acked_changes_set(sn(6)); // ack mark becomes 5
        // SN 3 and SN 5 are obsolete now.
        assert_eq!(p.pending_requested_count(), 1);
        assert_eq!(p.next_requested_change(), Some(sn(7)));
    }

    #[test]
    fn requested_fragments_set_above_ack_only() {
        let mut p = proxy();
        p.acked_changes_set(sn(3)); // ack mark becomes 2
        p.requested_fragments_set(sn(2), [frag(1), frag(2)]);
        p.requested_fragments_set(sn(5), [frag(1), frag(3)]);
        assert_eq!(p.pending_requested_fragment_count(), 2);
    }

    #[test]
    fn next_requested_fragment_pulls_smallest_sn_first() {
        let mut p = proxy();
        p.requested_fragments_set(sn(5), [frag(3), frag(1)]);
        p.requested_fragments_set(sn(2), [frag(2)]);
        for (s, f) in [(2, 2), (5, 1), (5, 3)] {
            assert_eq!(p.next_requested_fragment(), Some((sn(s), frag(f))));
        }
        assert_eq!(p.next_requested_fragment(), None);
    }

    #[test]
    fn acking_also_prunes_requested_fragments() {
        let mut p = proxy();
        p.requested_fragments_set(sn(3), [frag(1)]);
        p.requested_fragments_set(sn(7), [frag(2)]);
        assert_eq!(p.pending_requested_fragment_count(), 2);
        p.acked_changes_set(sn(5)); // ack mark becomes 4
        // SN 3 is obsolete now.
        assert_eq!(p.pending_requested_fragment_count(), 1);
        assert_eq!(p.next_requested_fragment(), Some((sn(7), frag(2))));
    }

    #[test]
    fn requested_fragments_ignore_unknown_sentinel() {
        let mut p = proxy();
        p.requested_fragments_set(sn(1), [FragmentNumber::UNKNOWN, frag(1)]);
        assert_eq!(p.pending_requested_fragment_count(), 1);
    }

    // ---- Spec §8.4.15.6 inactive-reader reclaim ----

    #[test]
    fn proxy_is_inactive_initially_when_threshold_is_short() {
        // The initial last_activity is ZERO, so a check with now = 10 s and
        // threshold = 1 s reports the proxy as inactive.
        let p = proxy();
        assert!(p.is_inactive(secs(10), secs(1)));
    }

    #[test]
    fn proxy_is_active_after_note_activity() {
        let mut p = proxy();
        p.note_activity(secs(5));
        assert_eq!(p.last_activity(), secs(5));
        // Within the threshold the proxy counts as active.
        assert!(!p.is_inactive(secs(6), secs(2)));
    }

    #[test]
    fn proxy_becomes_inactive_after_threshold_elapses() {
        let mut p = proxy();
        p.note_activity(secs(5));
        // Ten seconds later with a 2 s threshold: inactive.
        assert!(p.is_inactive(secs(15), secs(2)));
    }

    #[test]
    fn proxy_inactivity_not_reported_when_now_before_last_activity() {
        // Edge case: now < last_activity (clock skew or similar) must not
        // be reported as inactive.
        let mut p = proxy();
        p.note_activity(secs(100));
        assert!(!p.is_inactive(secs(50), secs(1)));
    }
}