1extern crate alloc;
41use alloc::vec::Vec;
42
43use crate::header::RtpsHeader;
44use crate::header_extension::{ChecksumValue, HeTimestamp, HeaderExtension};
45use crate::parameter_list::ParameterList;
46use crate::wire_types::{GuidPrefix, Locator, ProtocolVersion, VendorId};
47
48#[derive(Debug, Clone, PartialEq, Eq)]
51pub struct ReceiverState {
52 pub source_version: ProtocolVersion,
54 pub source_vendor_id: VendorId,
56 pub source_guid_prefix: GuidPrefix,
58 pub dest_guid_prefix: GuidPrefix,
60 pub have_timestamp: bool,
62 pub timestamp: HeTimestamp,
64 pub message_length: Option<u32>,
66 pub message_checksum: ChecksumValue,
68 pub parameters: Option<ParameterList>,
70 pub unicast_reply_locator_list: Vec<Locator>,
73 pub multicast_reply_locator_list: Vec<Locator>,
76 pub clock_skew_detected: bool,
80}
81
82impl ReceiverState {
83 #[must_use]
87 pub fn new(dest_guid_prefix: GuidPrefix) -> Self {
88 Self {
89 source_version: ProtocolVersion::V2_5,
90 source_vendor_id: VendorId([0, 0]),
91 source_guid_prefix: GuidPrefix::from_bytes([0; 12]),
92 dest_guid_prefix,
93 have_timestamp: false,
94 timestamp: HeTimestamp::default(),
95 message_length: None,
96 message_checksum: ChecksumValue::None,
97 parameters: None,
98 unicast_reply_locator_list: Vec::new(),
99 multicast_reply_locator_list: Vec::new(),
100 clock_skew_detected: false,
101 }
102 }
103
104 pub fn init_from_header(&mut self, header: &RtpsHeader) {
106 self.source_version = header.protocol_version;
107 self.source_vendor_id = header.vendor_id;
108 self.source_guid_prefix = header.guid_prefix;
109 self.unicast_reply_locator_list.clear();
111 self.multicast_reply_locator_list.clear();
112 self.have_timestamp = false;
113 }
114
115 pub fn apply_info_source(
122 &mut self,
123 version: ProtocolVersion,
124 vendor_id: VendorId,
125 guid_prefix: GuidPrefix,
126 ) {
127 self.source_version = version;
128 self.source_vendor_id = vendor_id;
129 self.source_guid_prefix = guid_prefix;
130 self.have_timestamp = false;
131 self.unicast_reply_locator_list.clear();
132 self.multicast_reply_locator_list.clear();
133 }
134
135 pub fn apply_info_timestamp(&mut self, ts: HeTimestamp, invalidate: bool) {
138 if invalidate {
139 self.have_timestamp = false;
140 } else {
141 self.have_timestamp = true;
142 self.timestamp = ts;
143 }
144 }
145
146 pub fn apply_info_reply(&mut self, unicast: Vec<Locator>, multicast: Option<Vec<Locator>>) {
149 self.unicast_reply_locator_list = unicast;
150 if let Some(m) = multicast {
151 self.multicast_reply_locator_list = m;
152 }
153 }
154
155 pub fn apply_header_extension(&mut self, he: &HeaderExtension) {
159 if let Some(len) = he.message_length {
160 self.message_length = Some(len);
161 }
162 if let Some(ts) = he.timestamp {
163 self.have_timestamp = true;
164 self.timestamp = ts;
165 }
166 if !matches!(he.checksum, ChecksumValue::None) {
167 self.message_checksum = he.checksum.clone();
168 }
169 if let Some(pl) = &he.parameters {
170 self.parameters = Some(pl.clone());
171 }
172 }
173
174 pub fn note_clock_skew(&mut self, now_seconds: i32, threshold_seconds: u32) {
178 if !self.have_timestamp {
179 return;
180 }
181 let diff = (now_seconds as i64).saturating_sub(self.timestamp.seconds as i64);
182 if diff.unsigned_abs() > u64::from(threshold_seconds) {
183 self.clock_skew_detected = true;
184 }
185 }
186}
187
188#[cfg(test)]
189mod tests {
190 #![allow(clippy::expect_used, clippy::unwrap_used)]
191 use super::*;
192 use crate::header_extension::ChecksumValue;
193 use alloc::vec;
194
195 fn dummy_prefix(byte: u8) -> GuidPrefix {
196 GuidPrefix::from_bytes([byte; 12])
197 }
198
199 #[test]
200 fn new_state_has_default_fields() {
201 let st = ReceiverState::new(dummy_prefix(7));
202 assert!(!st.have_timestamp);
203 assert_eq!(st.dest_guid_prefix, dummy_prefix(7));
204 assert!(matches!(st.message_checksum, ChecksumValue::None));
205 assert!(st.message_length.is_none());
206 assert!(!st.clock_skew_detected);
207 }
208
209 #[test]
210 fn init_from_header_overrides_source_fields() {
211 let mut st = ReceiverState::new(dummy_prefix(0));
212 let h = RtpsHeader::new(VendorId::ZERODDS, dummy_prefix(0xAB));
213 st.init_from_header(&h);
214 assert_eq!(st.source_vendor_id, VendorId::ZERODDS);
215 assert_eq!(st.source_guid_prefix, dummy_prefix(0xAB));
216 }
217
218 #[test]
219 fn apply_info_source_resets_reply_locators_and_timestamp() {
220 let mut st = ReceiverState::new(dummy_prefix(0));
221 st.have_timestamp = true;
222 st.unicast_reply_locator_list.push(Locator::INVALID);
223 st.apply_info_source(
224 ProtocolVersion { major: 2, minor: 5 },
225 VendorId([0x42, 0x42]),
226 dummy_prefix(0x99),
227 );
228 assert_eq!(st.source_version, ProtocolVersion { major: 2, minor: 5 });
229 assert_eq!(st.source_vendor_id, VendorId([0x42, 0x42]));
230 assert_eq!(st.source_guid_prefix, dummy_prefix(0x99));
231 assert!(!st.have_timestamp);
232 assert!(st.unicast_reply_locator_list.is_empty());
233 }
234
235 #[test]
236 fn apply_info_timestamp_sets_value() {
237 let mut st = ReceiverState::new(dummy_prefix(0));
238 st.apply_info_timestamp(
239 HeTimestamp {
240 seconds: 100,
241 fraction: 200,
242 },
243 false,
244 );
245 assert!(st.have_timestamp);
246 assert_eq!(st.timestamp.seconds, 100);
247 assert_eq!(st.timestamp.fraction, 200);
248 }
249
250 #[test]
251 fn apply_info_timestamp_with_invalidate_clears() {
252 let mut st = ReceiverState::new(dummy_prefix(0));
253 st.have_timestamp = true;
254 st.apply_info_timestamp(HeTimestamp::default(), true);
255 assert!(!st.have_timestamp);
256 }
257
258 #[test]
259 fn apply_info_reply_sets_locators() {
260 let mut st = ReceiverState::new(dummy_prefix(0));
261 let uni = vec![Locator::INVALID];
262 let multi = vec![Locator::INVALID, Locator::INVALID];
263 st.apply_info_reply(uni.clone(), Some(multi.clone()));
264 assert_eq!(st.unicast_reply_locator_list, uni);
265 assert_eq!(st.multicast_reply_locator_list, multi);
266 }
267
268 #[test]
269 fn apply_header_extension_updates_fields() {
270 let mut st = ReceiverState::new(dummy_prefix(0));
271 let he = HeaderExtension {
272 little_endian: true,
273 message_length: Some(99),
274 timestamp: Some(HeTimestamp {
275 seconds: 1,
276 fraction: 2,
277 }),
278 checksum: ChecksumValue::Crc32c(0xCAFE),
279 ..HeaderExtension::default()
280 };
281 st.apply_header_extension(&he);
282 assert_eq!(st.message_length, Some(99));
283 assert!(st.have_timestamp);
284 assert_eq!(st.timestamp.seconds, 1);
285 assert!(matches!(st.message_checksum, ChecksumValue::Crc32c(0xCAFE)));
286 }
287
288 #[test]
289 fn apply_header_extension_with_parameters_sets_pl() {
290 let mut st = ReceiverState::new(dummy_prefix(0));
291 let pl = ParameterList::new();
292 let he = HeaderExtension {
293 little_endian: true,
294 parameters: Some(pl.clone()),
295 ..HeaderExtension::default()
296 };
297 st.apply_header_extension(&he);
298 assert_eq!(st.parameters, Some(pl));
299 }
300
301 #[test]
302 fn note_clock_skew_skipped_without_timestamp() {
303 let mut st = ReceiverState::new(dummy_prefix(0));
304 st.note_clock_skew(1_000_000, 5);
305 assert!(!st.clock_skew_detected);
306 }
307
308 #[test]
309 fn note_clock_skew_within_threshold_does_not_flag() {
310 let mut st = ReceiverState::new(dummy_prefix(0));
311 st.have_timestamp = true;
312 st.timestamp = HeTimestamp {
313 seconds: 100,
314 fraction: 0,
315 };
316 st.note_clock_skew(102, 5); assert!(!st.clock_skew_detected);
318 }
319
320 #[test]
321 fn note_clock_skew_above_threshold_flags() {
322 let mut st = ReceiverState::new(dummy_prefix(0));
323 st.have_timestamp = true;
324 st.timestamp = HeTimestamp {
325 seconds: 100,
326 fraction: 0,
327 };
328 st.note_clock_skew(200, 5); assert!(st.clock_skew_detected);
330 }
331}