Skip to main content

zerodds_rtps/
receiver_state.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
//! Receiver state (DDSI-RTPS 2.5 §8.3.4 + §8.3.7.4).
//!
//! While an RTPS message is being received, the receiver maintains a
//! state consisting of:
//!
//! ```text
//!   sourceVersion        — ProtocolVersion from the RTPS header
//!   sourceVendorId       — VendorId from the RTPS header
//!   sourceGuidPrefix     — GuidPrefix of the sender
//!   destGuidPrefix       — GuidPrefix of the receiver itself
//!   unicastReplyLocators
//!   multicastReplyLocators
//!   haveTimestamp        — true once InfoTimestamp/HE.W has been seen
//!   timestamp            — last sender wall-clock time seen
//!   messageLength        — if declared by the HE L flag
//!   messageChecksum      — if declared by the HE C field
//!   parameters           — if declared by the HE P field
//!   clockSkewDetected    — heuristic: |timestamp - now| above threshold
//! ```
//!
//! Update triggers:
//!
//! - **InfoSource** (§8.3.8.9.4): sets
//!   `sourceVersion`, `sourceVendorId`, `sourceGuidPrefix` to the values
//!   given in the InfoSource submessage; `haveTimestamp = false` and the
//!   reply locator lists are reset to `LOCATOR_INVALID`.
//! - **InfoTimestamp** (§8.3.8.5.4): sets `haveTimestamp = true`
//!   (or `false` when `I-Flag = 1`), plus `timestamp = …`.
//! - **HeaderExtension** (§8.3.7.4): combines several effects — the L
//!   flag updates `messageLength`; the W flag sets
//!   `haveTimestamp = true` + `timestamp`; the C flag updates
//!   `messageChecksum`; the P flag updates `parameters`.
//!
//! The receiver state is short-lived, one per RTPS message: before every
//! `decode_datagram` it is initialised to the default value plus
//! `destGuidPrefix`.
39
40extern crate alloc;
41use alloc::vec::Vec;
42
43use crate::header::RtpsHeader;
44use crate::header_extension::{ChecksumValue, HeTimestamp, HeaderExtension};
45use crate::parameter_list::ParameterList;
46use crate::wire_types::{GuidPrefix, Locator, ProtocolVersion, VendorId};
47
48/// Receiver-State entsprechend Spec-Tabelle §8.3.4 und Update-Regeln
49/// in §8.3.7.4.
50#[derive(Debug, Clone, PartialEq, Eq)]
51pub struct ReceiverState {
52    /// ProtocolVersion aus RTPS-Header (oder von InfoSource ueberschrieben).
53    pub source_version: ProtocolVersion,
54    /// VendorId aus RTPS-Header (oder von InfoSource ueberschrieben).
55    pub source_vendor_id: VendorId,
56    /// GuidPrefix des Senders (RTPS-Header oder InfoSource).
57    pub source_guid_prefix: GuidPrefix,
58    /// GuidPrefix des Receivers (Konfigurations-Wert, fix).
59    pub dest_guid_prefix: GuidPrefix,
60    /// `true` wenn der Receiver einen Sender-Timestamp hat.
61    pub have_timestamp: bool,
62    /// Letzter Sender-Timestamp (gueltig wenn `have_timestamp`).
63    pub timestamp: HeTimestamp,
64    /// Gesetzt durch HE.L — Soll-Restlaenge der RTPS-Message.
65    pub message_length: Option<u32>,
66    /// Gesetzt durch HE.C — Soll-Checksum der RTPS-Message.
67    pub message_checksum: ChecksumValue,
68    /// Gesetzt durch HE.P — ParameterList aus dem HE.
69    pub parameters: Option<ParameterList>,
70    /// Reply-Locator-Listen (Default `LOCATOR_INVALID`-Listen, von
71    /// InfoReply ueberschreibbar).
72    pub unicast_reply_locator_list: Vec<Locator>,
73    /// Reply-Locator-Listen (Default `LOCATOR_INVALID`-Listen, von
74    /// InfoReply ueberschreibbar).
75    pub multicast_reply_locator_list: Vec<Locator>,
76    /// Heuristik-Flag: `|timestamp - now| > Schwelle`. Wird von
77    /// `note_clock_skew` gesetzt; das Decode-Modul liefert nur die
78    /// Eingangsdaten.
79    pub clock_skew_detected: bool,
80}
81
82impl ReceiverState {
83    /// Initial-Zustand vor Empfang einer Message: alle Felder auf
84    /// Spec-Defaults, `dest_guid_prefix` aus dem Receiver-Konfig
85    /// uebernommen.
86    #[must_use]
87    pub fn new(dest_guid_prefix: GuidPrefix) -> Self {
88        Self {
89            source_version: ProtocolVersion::V2_5,
90            source_vendor_id: VendorId([0, 0]),
91            source_guid_prefix: GuidPrefix::from_bytes([0; 12]),
92            dest_guid_prefix,
93            have_timestamp: false,
94            timestamp: HeTimestamp::default(),
95            message_length: None,
96            message_checksum: ChecksumValue::None,
97            parameters: None,
98            unicast_reply_locator_list: Vec::new(),
99            multicast_reply_locator_list: Vec::new(),
100            clock_skew_detected: false,
101        }
102    }
103
104    /// Initialisiert aus einem `RtpsHeader` (Spec §8.3.4.1).
105    pub fn init_from_header(&mut self, header: &RtpsHeader) {
106        self.source_version = header.protocol_version;
107        self.source_vendor_id = header.vendor_id;
108        self.source_guid_prefix = header.guid_prefix;
109        // Reply-Locator-Listen + haveTimestamp resetten:
110        self.unicast_reply_locator_list.clear();
111        self.multicast_reply_locator_list.clear();
112        self.have_timestamp = false;
113    }
114
115    /// Update aus einer InfoSource-Submessage (§8.3.8.9.4).
116    ///
117    /// > "An InfoSource Submessage MUST set the receiver's source
118    /// >  GuidPrefix, source ProtocolVersion, source VendorId, and MUST
119    /// >  reset haveTimestamp = false and the reply-locator-lists to
120    /// >  LOCATOR_INVALID."
121    pub fn apply_info_source(
122        &mut self,
123        version: ProtocolVersion,
124        vendor_id: VendorId,
125        guid_prefix: GuidPrefix,
126    ) {
127        self.source_version = version;
128        self.source_vendor_id = vendor_id;
129        self.source_guid_prefix = guid_prefix;
130        self.have_timestamp = false;
131        self.unicast_reply_locator_list.clear();
132        self.multicast_reply_locator_list.clear();
133    }
134
135    /// Update aus InfoTimestamp (§8.3.8.5.4). `invalidate = true` (also
136    /// das I-Flag in der Submessage) loescht den Timestamp.
137    pub fn apply_info_timestamp(&mut self, ts: HeTimestamp, invalidate: bool) {
138        if invalidate {
139            self.have_timestamp = false;
140        } else {
141            self.have_timestamp = true;
142            self.timestamp = ts;
143        }
144    }
145
146    /// Update aus InfoReply (§8.3.8.10.4): setzt die beiden Reply-
147    /// Locator-Listen.
148    pub fn apply_info_reply(&mut self, unicast: Vec<Locator>, multicast: Option<Vec<Locator>>) {
149        self.unicast_reply_locator_list = unicast;
150        if let Some(m) = multicast {
151            self.multicast_reply_locator_list = m;
152        }
153    }
154
155    /// Update aus HeaderExtension (§8.3.7.4). Aktualisiert je nach
156    /// gesetzten Flags `messageLength`, `timestamp`, `messageChecksum`
157    /// und `parameters`.
158    pub fn apply_header_extension(&mut self, he: &HeaderExtension) {
159        if let Some(len) = he.message_length {
160            self.message_length = Some(len);
161        }
162        if let Some(ts) = he.timestamp {
163            self.have_timestamp = true;
164            self.timestamp = ts;
165        }
166        if !matches!(he.checksum, ChecksumValue::None) {
167            self.message_checksum = he.checksum.clone();
168        }
169        if let Some(pl) = &he.parameters {
170            self.parameters = Some(pl.clone());
171        }
172    }
173
174    /// Setzt das `clock_skew_detected`-Flag, wenn der gegebene
175    /// `now`-Sekunden-Wert mehr als `threshold_seconds` vom Sender-
176    /// Timestamp abweicht. No-op wenn `!have_timestamp`.
177    pub fn note_clock_skew(&mut self, now_seconds: i32, threshold_seconds: u32) {
178        if !self.have_timestamp {
179            return;
180        }
181        let diff = (now_seconds as i64).saturating_sub(self.timestamp.seconds as i64);
182        if diff.unsigned_abs() > u64::from(threshold_seconds) {
183            self.clock_skew_detected = true;
184        }
185    }
186}
187
#[cfg(test)]
mod tests {
    // Relax the panic-related clippy lints inside the test module.
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    use super::*;
    use crate::header_extension::ChecksumValue;
    use alloc::vec;

    /// Builds a GUID prefix whose twelve bytes all equal `byte`.
    fn dummy_prefix(byte: u8) -> GuidPrefix {
        GuidPrefix::from_bytes([byte; 12])
    }

    /// Fresh state with an all-zero destination prefix.
    fn fresh_state() -> ReceiverState {
        ReceiverState::new(dummy_prefix(0))
    }

    /// Fresh state that already holds a sender timestamp of `seconds`.
    fn state_with_timestamp(seconds_value: u8) -> ReceiverState {
        let mut state = fresh_state();
        state.have_timestamp = true;
        state.timestamp = HeTimestamp {
            seconds: seconds_value.into(),
            fraction: 0,
        };
        state
    }

    #[test]
    fn new_state_has_default_fields() {
        let state = ReceiverState::new(dummy_prefix(7));
        assert_eq!(state.dest_guid_prefix, dummy_prefix(7));
        assert!(!state.have_timestamp);
        assert!(state.message_length.is_none());
        assert!(matches!(state.message_checksum, ChecksumValue::None));
        assert!(!state.clock_skew_detected);
    }

    #[test]
    fn init_from_header_overrides_source_fields() {
        let header = RtpsHeader::new(VendorId::ZERODDS, dummy_prefix(0xAB));
        let mut state = fresh_state();
        state.init_from_header(&header);
        assert_eq!(state.source_vendor_id, VendorId::ZERODDS);
        assert_eq!(state.source_guid_prefix, dummy_prefix(0xAB));
    }

    #[test]
    fn apply_info_source_resets_reply_locators_and_timestamp() {
        let version = ProtocolVersion { major: 2, minor: 5 };
        let vendor = VendorId([0x42, 0x42]);
        let prefix = dummy_prefix(0x99);

        // Pre-populate the fields that InfoSource has to reset.
        let mut state = fresh_state();
        state.have_timestamp = true;
        state.unicast_reply_locator_list.push(Locator::INVALID);

        state.apply_info_source(version, vendor, prefix);

        assert_eq!(state.source_version, ProtocolVersion { major: 2, minor: 5 });
        assert_eq!(state.source_vendor_id, VendorId([0x42, 0x42]));
        assert_eq!(state.source_guid_prefix, dummy_prefix(0x99));
        assert!(!state.have_timestamp);
        assert!(state.unicast_reply_locator_list.is_empty());
    }

    #[test]
    fn apply_info_timestamp_sets_value() {
        let sent = HeTimestamp {
            seconds: 100,
            fraction: 200,
        };
        let mut state = fresh_state();
        state.apply_info_timestamp(sent, false);
        assert!(state.have_timestamp);
        assert_eq!(state.timestamp.seconds, 100);
        assert_eq!(state.timestamp.fraction, 200);
    }

    #[test]
    fn apply_info_timestamp_with_invalidate_clears() {
        let mut state = fresh_state();
        state.have_timestamp = true;
        state.apply_info_timestamp(HeTimestamp::default(), true);
        assert!(!state.have_timestamp);
    }

    #[test]
    fn apply_info_reply_sets_locators() {
        let unicast = vec![Locator::INVALID];
        let multicast = vec![Locator::INVALID, Locator::INVALID];
        let mut state = fresh_state();
        state.apply_info_reply(unicast.clone(), Some(multicast.clone()));
        assert_eq!(state.unicast_reply_locator_list, unicast);
        assert_eq!(state.multicast_reply_locator_list, multicast);
    }

    #[test]
    fn apply_header_extension_updates_fields() {
        let extension = HeaderExtension {
            little_endian: true,
            message_length: Some(99),
            timestamp: Some(HeTimestamp {
                seconds: 1,
                fraction: 2,
            }),
            checksum: ChecksumValue::Crc32c(0xCAFE),
            ..HeaderExtension::default()
        };
        let mut state = fresh_state();
        state.apply_header_extension(&extension);
        assert_eq!(state.message_length, Some(99));
        assert!(state.have_timestamp);
        assert_eq!(state.timestamp.seconds, 1);
        assert!(matches!(
            state.message_checksum,
            ChecksumValue::Crc32c(0xCAFE)
        ));
    }

    #[test]
    fn apply_header_extension_with_parameters_sets_pl() {
        let params = ParameterList::new();
        let extension = HeaderExtension {
            little_endian: true,
            parameters: Some(params.clone()),
            ..HeaderExtension::default()
        };
        let mut state = fresh_state();
        state.apply_header_extension(&extension);
        assert_eq!(state.parameters, Some(params));
    }

    #[test]
    fn note_clock_skew_skipped_without_timestamp() {
        let mut state = fresh_state();
        state.note_clock_skew(1_000_000, 5);
        assert!(!state.clock_skew_detected);
    }

    #[test]
    fn note_clock_skew_within_threshold_does_not_flag() {
        let mut state = state_with_timestamp(100);
        // 2 s apart with a 5 s threshold: no skew.
        state.note_clock_skew(102, 5);
        assert!(!state.clock_skew_detected);
    }

    #[test]
    fn note_clock_skew_above_threshold_flags() {
        let mut state = state_with_timestamp(100);
        // 100 s apart with a 5 s threshold: skew.
        state.note_clock_skew(200, 5);
        assert!(state.clock_skew_detected);
    }
}
331}