Skip to main content

zerodds_rtps/
writer_proxy.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
//! `WriterProxy` — reader-side state about **one** remote writer.
//!
//! DDSI-RTPS 2.5 §8.4.6.5 (Stateful Reader behavior). The reader keeps one
//! `WriterProxy` per matched writer. In it the reader tracks the range
//! `[first_available_sn, last_available_sn]` announced by HEARTBEATs, marks
//! the SNs it has already received, and detects the remaining ones as
//! **missing**. The missing set feeds the AckNack bitmap.
//!
//! Note: a reader currently has exactly one writer (single-writer
//! assumption).
13
14extern crate alloc;
15use alloc::collections::BTreeSet;
16use alloc::vec::Vec;
17
18use crate::wire_types::{Guid, Locator, SequenceNumber};
19
20/// Reader-seitiger State fuer einen Remote-Writer.
21#[derive(Debug, Clone)]
22pub struct WriterProxy {
23    /// GUID des Remote-Writer-Endpoints.
24    pub remote_writer_guid: Guid,
25    /// Unicast-Locators des Writers (z.B. fuer gerichtete Re-Sends).
26    pub unicast_locators: Vec<Locator>,
27    /// Multicast-Locators.
28    pub multicast_locators: Vec<Locator>,
29    /// Reliable-Kind.
30    pub is_reliable: bool,
31    /// Kleinste SN, die der Writer **noch** im Cache haelt (aus HEARTBEAT.first_sn).
32    first_available_sn: SequenceNumber,
33    /// Groesste SN, die der Writer annonciert hat (aus HEARTBEAT.last_sn).
34    last_available_sn: SequenceNumber,
35    /// Hoechste SN, die dieser Reader tatsaechlich **empfangen** hat.
36    highest_received_sn: SequenceNumber,
37    /// Bereits empfangene SNs (fuer Dup-Rejection + in-order Delivery).
38    received: BTreeSet<SequenceNumber>,
39    /// Von GAP-Submessages als irrelevant markierte SNs.
40    irrelevant: BTreeSet<SequenceNumber>,
41}
42
43impl WriterProxy {
44    /// Erzeugt einen frischen Proxy.
45    #[must_use]
46    pub fn new(
47        remote_writer_guid: Guid,
48        unicast_locators: Vec<Locator>,
49        multicast_locators: Vec<Locator>,
50        is_reliable: bool,
51    ) -> Self {
52        Self {
53            remote_writer_guid,
54            unicast_locators,
55            multicast_locators,
56            is_reliable,
57            first_available_sn: SequenceNumber(1),
58            last_available_sn: SequenceNumber(0),
59            highest_received_sn: SequenceNumber(0),
60            received: BTreeSet::new(),
61            irrelevant: BTreeSet::new(),
62        }
63    }
64
65    /// Verarbeitet einen HEARTBEAT.
66    ///
67    /// Gemaess §8.4.15: `first_sn` ist die kleinste SN, die der Writer
68    /// re-liefern kann; `last_sn` die groesste annoncierte.
69    pub fn update_from_heartbeat(&mut self, first_sn: SequenceNumber, last_sn: SequenceNumber) {
70        // Monoton wachsende Bounds.
71        if first_sn > self.first_available_sn {
72            self.first_available_sn = first_sn;
73            // SNs, die **vor** first_sn liegen, sind verloren und koennen
74            // nicht mehr angefragt werden — aus received/irrelevant werfen
75            // wir sie; sie werden auch nicht mehr missing sein.
76            let split = self.received.split_off(&first_sn);
77            self.received = split;
78            let split = self.irrelevant.split_off(&first_sn);
79            self.irrelevant = split;
80        }
81        if last_sn > self.last_available_sn {
82            self.last_available_sn = last_sn;
83        }
84    }
85
86    /// Markiert eine SN als empfangen.
87    pub fn received_change_set(&mut self, sn: SequenceNumber) {
88        if sn < self.first_available_sn {
89            // Liegt vor dem annoncierten Range — ignorieren.
90            return;
91        }
92        self.received.insert(sn);
93        if sn > self.highest_received_sn {
94            self.highest_received_sn = sn;
95        }
96    }
97
98    /// Markiert eine SN als irrelevant (per GAP).
99    pub fn irrelevant_change_set(&mut self, sn: SequenceNumber) {
100        if sn < self.first_available_sn {
101            return;
102        }
103        self.irrelevant.insert(sn);
104    }
105
106    /// True wenn SN bereits empfangen oder als irrelevant markiert.
107    #[must_use]
108    pub fn is_known(&self, sn: SequenceNumber) -> bool {
109        self.received.contains(&sn) || self.irrelevant.contains(&sn)
110    }
111
112    /// Liefert alle **fehlenden** SNs (weder empfangen noch irrelevant) im
113    /// Bereich `[first_available_sn, last_available_sn]`.
114    ///
115    /// Vektor ist nach SN aufsteigend sortiert. Begrenzt auf `max_count`
116    /// Eintraege — der erwartete RTPS-Bitmap-Window ist 256 SNs.
117    #[must_use]
118    pub fn missing_changes(&self, max_count: usize) -> Vec<SequenceNumber> {
119        let mut out = Vec::new();
120        if self.last_available_sn < self.first_available_sn {
121            return out;
122        }
123        let mut sn = self.first_available_sn;
124        while sn <= self.last_available_sn && out.len() < max_count {
125            if !self.is_known(sn) {
126                out.push(sn);
127            }
128            sn = SequenceNumber(sn.0 + 1);
129        }
130        out
131    }
132
133    /// True wenn fehlende SNs vorhanden sind.
134    #[must_use]
135    pub fn has_missing_changes(&self) -> bool {
136        !self.missing_changes(1).is_empty()
137    }
138
139    /// Getter: kleinste annoncierte SN.
140    #[must_use]
141    pub fn first_available_sn(&self) -> SequenceNumber {
142        self.first_available_sn
143    }
144
145    /// Getter: groesste annoncierte SN.
146    #[must_use]
147    pub fn last_available_sn(&self) -> SequenceNumber {
148        self.last_available_sn
149    }
150
151    /// Getter: hoechste empfangene SN.
152    #[must_use]
153    pub fn highest_received_sn(&self) -> SequenceNumber {
154        self.highest_received_sn
155    }
156
157    /// Passender AckNack-Base: kleinste noch nicht acked SN.
158    ///
159    /// Convention: alle SN < `acknack_base` sind acked. Wir liefern
160    /// die kleinste noch-nicht-empfangene-oder-irrelevante SN in `[first, last+1]`.
161    #[must_use]
162    pub fn acknack_base(&self) -> SequenceNumber {
163        let mut sn = self.first_available_sn;
164        while sn <= self.last_available_sn {
165            if !self.is_known(sn) {
166                return sn;
167            }
168            sn = SequenceNumber(sn.0 + 1);
169        }
170        SequenceNumber(self.last_available_sn.0 + 1)
171    }
172}
173
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::wire_types::{EntityId, GuidPrefix};

    /// Shorthand for building a single sequence number.
    fn sn(n: i64) -> SequenceNumber {
        SequenceNumber(n)
    }

    /// Builds a list of sequence numbers from raw values, keeping their order.
    fn sns(raw: &[i64]) -> alloc::vec::Vec<SequenceNumber> {
        raw.iter().copied().map(sn).collect()
    }

    /// Fresh reliable proxy without locators for a fixed test writer GUID.
    fn proxy() -> WriterProxy {
        let prefix = GuidPrefix::from_bytes([2; 12]);
        let entity = EntityId::user_writer_with_key([0x10, 0x20, 0x30]);
        WriterProxy::new(Guid::new(prefix, entity), alloc::vec![], alloc::vec![], true)
    }

    /// Fresh proxy that has already seen one HEARTBEAT `[first, last]`.
    fn proxy_with_range(first: i64, last: i64) -> WriterProxy {
        let mut p = proxy();
        p.update_from_heartbeat(sn(first), sn(last));
        p
    }

    /// Marks every listed SN as received, in the given order.
    fn mark_received(p: &mut WriterProxy, raw: &[i64]) {
        for &n in raw {
            p.received_change_set(sn(n));
        }
    }

    #[test]
    fn fresh_proxy_has_no_missing() {
        let p = proxy();
        assert!(!p.has_missing_changes());
        assert_eq!(p.missing_changes(10), sns(&[]));
        assert_eq!(p.acknack_base(), sn(1));
    }

    #[test]
    fn heartbeat_sets_available_range() {
        let p = proxy_with_range(1, 5);
        assert_eq!(p.first_available_sn(), sn(1));
        assert_eq!(p.last_available_sn(), sn(5));
        // Nothing received yet → everything is missing.
        assert_eq!(p.missing_changes(10), sns(&[1, 2, 3, 4, 5]));
    }

    #[test]
    fn received_removes_from_missing() {
        let mut p = proxy_with_range(1, 5);
        mark_received(&mut p, &[2, 4]);
        assert_eq!(p.missing_changes(10), sns(&[1, 3, 5]));
        assert_eq!(p.acknack_base(), sn(1));
    }

    #[test]
    fn gap_marks_irrelevant() {
        let mut p = proxy_with_range(1, 5);
        p.irrelevant_change_set(sn(3));
        assert_eq!(p.missing_changes(10), sns(&[1, 2, 4, 5]));
    }

    #[test]
    fn acknack_base_walks_up() {
        let mut p = proxy_with_range(1, 3);
        mark_received(&mut p, &[1, 2]);
        assert_eq!(p.acknack_base(), sn(3));
        mark_received(&mut p, &[3]);
        assert_eq!(p.acknack_base(), sn(4));
    }

    #[test]
    fn heartbeat_advancing_first_prunes_old_state() {
        let mut p = proxy_with_range(1, 10);
        mark_received(&mut p, &[3, 7]);
        // Writer rotates its cache → first is now at 5.
        p.update_from_heartbeat(sn(5), sn(10));
        assert_eq!(p.first_available_sn(), sn(5));
        // sn(3) was dropped from `received`, sn(7) stays.
        assert!(!p.is_known(sn(3)));
        assert!(p.is_known(sn(7)));
    }

    #[test]
    fn highest_received_tracks_max() {
        let mut p = proxy_with_range(1, 10);
        mark_received(&mut p, &[3, 7, 5]);
        assert_eq!(p.highest_received_sn(), sn(7));
    }

    #[test]
    fn received_before_first_is_ignored() {
        let mut p = proxy_with_range(5, 10);
        mark_received(&mut p, &[2]);
        assert!(!p.is_known(sn(2)));
        assert_eq!(p.highest_received_sn(), sn(0));
    }

    #[test]
    fn missing_changes_respects_max_count() {
        let p = proxy_with_range(1, 100);
        assert_eq!(p.missing_changes(5), sns(&[1, 2, 3, 4, 5]));
    }

    #[test]
    fn acknack_base_when_all_received_is_last_plus_one() {
        let mut p = proxy_with_range(1, 3);
        mark_received(&mut p, &[1, 2, 3]);
        assert_eq!(p.acknack_base(), sn(4));
        assert!(!p.has_missing_changes());
    }
}
296}