extern crate alloc;
use alloc::vec::Vec;
use crate::header::RtpsHeader;
use crate::header_extension::{ChecksumValue, HeTimestamp, HeaderExtension};
use crate::parameter_list::ParameterList;
use crate::wire_types::{GuidPrefix, Locator, ProtocolVersion, VendorId};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReceiverState {
pub source_version: ProtocolVersion,
pub source_vendor_id: VendorId,
pub source_guid_prefix: GuidPrefix,
pub dest_guid_prefix: GuidPrefix,
pub have_timestamp: bool,
pub timestamp: HeTimestamp,
pub message_length: Option<u32>,
pub message_checksum: ChecksumValue,
pub parameters: Option<ParameterList>,
pub unicast_reply_locator_list: Vec<Locator>,
pub multicast_reply_locator_list: Vec<Locator>,
pub clock_skew_detected: bool,
}
impl ReceiverState {
#[must_use]
pub fn new(dest_guid_prefix: GuidPrefix) -> Self {
Self {
source_version: ProtocolVersion::V2_5,
source_vendor_id: VendorId([0, 0]),
source_guid_prefix: GuidPrefix::from_bytes([0; 12]),
dest_guid_prefix,
have_timestamp: false,
timestamp: HeTimestamp::default(),
message_length: None,
message_checksum: ChecksumValue::None,
parameters: None,
unicast_reply_locator_list: Vec::new(),
multicast_reply_locator_list: Vec::new(),
clock_skew_detected: false,
}
}
pub fn init_from_header(&mut self, header: &RtpsHeader) {
self.source_version = header.protocol_version;
self.source_vendor_id = header.vendor_id;
self.source_guid_prefix = header.guid_prefix;
self.unicast_reply_locator_list.clear();
self.multicast_reply_locator_list.clear();
self.have_timestamp = false;
}
pub fn apply_info_source(
&mut self,
version: ProtocolVersion,
vendor_id: VendorId,
guid_prefix: GuidPrefix,
) {
self.source_version = version;
self.source_vendor_id = vendor_id;
self.source_guid_prefix = guid_prefix;
self.have_timestamp = false;
self.unicast_reply_locator_list.clear();
self.multicast_reply_locator_list.clear();
}
pub fn apply_info_timestamp(&mut self, ts: HeTimestamp, invalidate: bool) {
if invalidate {
self.have_timestamp = false;
} else {
self.have_timestamp = true;
self.timestamp = ts;
}
}
pub fn apply_info_reply(&mut self, unicast: Vec<Locator>, multicast: Option<Vec<Locator>>) {
self.unicast_reply_locator_list = unicast;
if let Some(m) = multicast {
self.multicast_reply_locator_list = m;
}
}
pub fn apply_header_extension(&mut self, he: &HeaderExtension) {
if let Some(len) = he.message_length {
self.message_length = Some(len);
}
if let Some(ts) = he.timestamp {
self.have_timestamp = true;
self.timestamp = ts;
}
if !matches!(he.checksum, ChecksumValue::None) {
self.message_checksum = he.checksum.clone();
}
if let Some(pl) = &he.parameters {
self.parameters = Some(pl.clone());
}
}
pub fn note_clock_skew(&mut self, now_seconds: i32, threshold_seconds: u32) {
if !self.have_timestamp {
return;
}
let diff = (now_seconds as i64).saturating_sub(self.timestamp.seconds as i64);
if diff.unsigned_abs() > u64::from(threshold_seconds) {
self.clock_skew_detected = true;
}
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::expect_used, clippy::unwrap_used)]
use super::*;
use crate::header_extension::ChecksumValue;
use alloc::vec;
fn dummy_prefix(byte: u8) -> GuidPrefix {
GuidPrefix::from_bytes([byte; 12])
}
#[test]
fn new_state_has_default_fields() {
let st = ReceiverState::new(dummy_prefix(7));
assert!(!st.have_timestamp);
assert_eq!(st.dest_guid_prefix, dummy_prefix(7));
assert!(matches!(st.message_checksum, ChecksumValue::None));
assert!(st.message_length.is_none());
assert!(!st.clock_skew_detected);
}
#[test]
fn init_from_header_overrides_source_fields() {
let mut st = ReceiverState::new(dummy_prefix(0));
let h = RtpsHeader::new(VendorId::ZERODDS, dummy_prefix(0xAB));
st.init_from_header(&h);
assert_eq!(st.source_vendor_id, VendorId::ZERODDS);
assert_eq!(st.source_guid_prefix, dummy_prefix(0xAB));
}
#[test]
fn apply_info_source_resets_reply_locators_and_timestamp() {
let mut st = ReceiverState::new(dummy_prefix(0));
st.have_timestamp = true;
st.unicast_reply_locator_list.push(Locator::INVALID);
st.apply_info_source(
ProtocolVersion { major: 2, minor: 5 },
VendorId([0x42, 0x42]),
dummy_prefix(0x99),
);
assert_eq!(st.source_version, ProtocolVersion { major: 2, minor: 5 });
assert_eq!(st.source_vendor_id, VendorId([0x42, 0x42]));
assert_eq!(st.source_guid_prefix, dummy_prefix(0x99));
assert!(!st.have_timestamp);
assert!(st.unicast_reply_locator_list.is_empty());
}
#[test]
fn apply_info_timestamp_sets_value() {
let mut st = ReceiverState::new(dummy_prefix(0));
st.apply_info_timestamp(
HeTimestamp {
seconds: 100,
fraction: 200,
},
false,
);
assert!(st.have_timestamp);
assert_eq!(st.timestamp.seconds, 100);
assert_eq!(st.timestamp.fraction, 200);
}
#[test]
fn apply_info_timestamp_with_invalidate_clears() {
let mut st = ReceiverState::new(dummy_prefix(0));
st.have_timestamp = true;
st.apply_info_timestamp(HeTimestamp::default(), true);
assert!(!st.have_timestamp);
}
#[test]
fn apply_info_reply_sets_locators() {
let mut st = ReceiverState::new(dummy_prefix(0));
let uni = vec![Locator::INVALID];
let multi = vec![Locator::INVALID, Locator::INVALID];
st.apply_info_reply(uni.clone(), Some(multi.clone()));
assert_eq!(st.unicast_reply_locator_list, uni);
assert_eq!(st.multicast_reply_locator_list, multi);
}
#[test]
fn apply_header_extension_updates_fields() {
let mut st = ReceiverState::new(dummy_prefix(0));
let he = HeaderExtension {
little_endian: true,
message_length: Some(99),
timestamp: Some(HeTimestamp {
seconds: 1,
fraction: 2,
}),
checksum: ChecksumValue::Crc32c(0xCAFE),
..HeaderExtension::default()
};
st.apply_header_extension(&he);
assert_eq!(st.message_length, Some(99));
assert!(st.have_timestamp);
assert_eq!(st.timestamp.seconds, 1);
assert!(matches!(st.message_checksum, ChecksumValue::Crc32c(0xCAFE)));
}
#[test]
fn apply_header_extension_with_parameters_sets_pl() {
let mut st = ReceiverState::new(dummy_prefix(0));
let pl = ParameterList::new();
let he = HeaderExtension {
little_endian: true,
parameters: Some(pl.clone()),
..HeaderExtension::default()
};
st.apply_header_extension(&he);
assert_eq!(st.parameters, Some(pl));
}
#[test]
fn note_clock_skew_skipped_without_timestamp() {
let mut st = ReceiverState::new(dummy_prefix(0));
st.note_clock_skew(1_000_000, 5);
assert!(!st.clock_skew_detected);
}
#[test]
fn note_clock_skew_within_threshold_does_not_flag() {
let mut st = ReceiverState::new(dummy_prefix(0));
st.have_timestamp = true;
st.timestamp = HeTimestamp {
seconds: 100,
fraction: 0,
};
st.note_clock_skew(102, 5); assert!(!st.clock_skew_detected);
}
#[test]
fn note_clock_skew_above_threshold_flags() {
let mut st = ReceiverState::new(dummy_prefix(0));
st.have_timestamp = true;
st.timestamp = HeTimestamp {
seconds: 100,
fraction: 0,
};
st.note_clock_skew(200, 5); assert!(st.clock_skew_detected);
}
}