extern crate alloc;
use alloc::collections::BTreeMap;
use alloc::collections::VecDeque;
use alloc::vec::Vec;
use core::time::Duration;
use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram, encode_data_datagram};
use zerodds_rtps::error::WireError;
use zerodds_rtps::header::RtpsHeader;
use zerodds_rtps::participant_message_data::{
PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE,
PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_BY_PARTICIPANT_LIVELINESS_UPDATE,
PARTICIPANT_MESSAGE_DATA_KIND_ZERODDS_MANUAL_BY_TOPIC, ParticipantMessageData,
};
use zerodds_rtps::submessages::DataSubmessage;
use zerodds_rtps::wire_types::{EntityId, GuidPrefix, SequenceNumber, VendorId};
pub const MAX_QUEUED_PULSES: usize = 32;
pub const MAX_TRACKED_PEERS: usize = 1024;
#[derive(Debug, Clone, PartialEq, Eq)]
struct PendingPulse {
kind: u32,
data: Vec<u8>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerLivelinessState {
pub last_seen: Duration,
pub last_kind: u32,
}
#[derive(Debug)]
pub struct WlpEndpoint {
own_prefix: GuidPrefix,
vendor_id: VendorId,
next_sn: i64,
tick_period: Duration,
next_tick: Duration,
pending: VecDeque<PendingPulse>,
peers: BTreeMap<GuidPrefix, PeerLivelinessState>,
}
impl WlpEndpoint {
#[must_use]
pub fn new(own_prefix: GuidPrefix, vendor_id: VendorId, tick_period: Duration) -> Self {
Self {
own_prefix,
vendor_id,
next_sn: 1,
tick_period,
next_tick: Duration::ZERO,
pending: VecDeque::new(),
peers: BTreeMap::new(),
}
}
pub fn set_tick_period(&mut self, period: Duration) {
self.tick_period = period;
self.next_tick = Duration::ZERO;
}
pub fn assert_participant(&mut self) {
self.enqueue_pulse(PendingPulse {
kind: PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_BY_PARTICIPANT_LIVELINESS_UPDATE,
data: Vec::new(),
});
}
pub fn assert_topic(&mut self, topic_token: Vec<u8>) {
self.enqueue_pulse(PendingPulse {
kind: PARTICIPANT_MESSAGE_DATA_KIND_ZERODDS_MANUAL_BY_TOPIC,
data: topic_token,
});
}
fn enqueue_pulse(&mut self, p: PendingPulse) {
if self.pending.len() >= MAX_QUEUED_PULSES {
let _ = self.pending.pop_front();
}
self.pending.push_back(p);
}
pub fn tick(&mut self, now: Duration) -> Result<Option<Vec<u8>>, WireError> {
if let Some(pulse) = self.pending.pop_front() {
return Ok(Some(self.encode_pulse(&pulse)?));
}
if self.tick_period.is_zero() {
return Ok(None);
}
if now < self.next_tick {
return Ok(None);
}
let pulse = PendingPulse {
kind: PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE,
data: Vec::new(),
};
let datagram = self.encode_pulse(&pulse)?;
self.next_tick = now + self.tick_period;
Ok(Some(datagram))
}
fn encode_pulse(&mut self, pulse: &PendingPulse) -> Result<Vec<u8>, WireError> {
let mut msg = ParticipantMessageData::automatic(self.own_prefix);
msg.kind = pulse.kind;
msg.data = pulse.data.clone();
let payload = msg.to_cdr(true)?; let sn = SequenceNumber(self.next_sn);
self.next_sn = self
.next_sn
.checked_add(1)
.ok_or(WireError::ValueOutOfRange {
message: "wlp sequence overflow",
})?;
let data = DataSubmessage {
extra_flags: 0,
reader_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_READER,
writer_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER,
writer_sn: sn,
inline_qos: None,
key_flag: false,
non_standard_flag: false,
serialized_payload: payload.into(),
};
let header = RtpsHeader::new(self.vendor_id, self.own_prefix);
encode_data_datagram(header, &[data])
}
pub fn handle_datagram(&mut self, bytes: &[u8], now: Duration) -> Result<bool, WireError> {
let parsed = decode_datagram(bytes)?;
let mut updated = false;
for sub in parsed.submessages {
if let ParsedSubmessage::Data(d) = sub {
if d.writer_id == EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER {
if let Ok(msg) = ParticipantMessageData::from_cdr(&d.serialized_payload) {
let src = msg.prefix();
let prefix = if src == GuidPrefix::UNKNOWN {
parsed.header.guid_prefix
} else {
src
};
if self.peers.len() >= MAX_TRACKED_PEERS
&& !self.peers.contains_key(&prefix)
{
continue;
}
self.peers.insert(
prefix,
PeerLivelinessState {
last_seen: now,
last_kind: msg.kind,
},
);
updated = true;
}
}
}
}
Ok(updated)
}
#[must_use]
pub fn peer_state(&self, prefix: &GuidPrefix) -> Option<&PeerLivelinessState> {
self.peers.get(prefix)
}
#[must_use]
pub fn peer_count(&self) -> usize {
self.peers.len()
}
pub fn lost_peers(
&self,
now: Duration,
lease: Duration,
) -> impl Iterator<Item = (&GuidPrefix, &PeerLivelinessState)> + '_ {
self.peers.iter().filter(move |(_, s)| {
now.checked_sub(s.last_seen)
.is_some_and(|elapsed| elapsed > lease)
})
}
pub fn forget_peer(&mut self, prefix: &GuidPrefix) {
self.peers.remove(prefix);
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
use super::*;
use alloc::vec;
fn ep() -> WlpEndpoint {
WlpEndpoint::new(
GuidPrefix::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]),
VendorId::ZERODDS,
Duration::from_millis(300),
)
}
#[test]
fn wlp_first_tick_emits_automatic_heartbeat() {
let mut e = ep();
let dg = e.tick(Duration::ZERO).unwrap();
assert!(dg.is_some(), "first tick must emit AUTOMATIC beat");
}
#[test]
fn wlp_tick_idle_returns_none_until_period() {
let mut e = ep();
let _ = e.tick(Duration::ZERO).unwrap();
let dg = e.tick(Duration::from_millis(100)).unwrap();
assert!(dg.is_none());
}
#[test]
fn wlp_tick_emits_again_after_period() {
let mut e = ep();
let _ = e.tick(Duration::ZERO).unwrap();
let dg = e.tick(Duration::from_millis(400)).unwrap();
assert!(dg.is_some());
}
#[test]
fn wlp_zero_period_disables_automatic_beats() {
let mut e = WlpEndpoint::new(
GuidPrefix::from_bytes([0xAA; 12]),
VendorId::ZERODDS,
Duration::ZERO,
);
let dg = e.tick(Duration::ZERO).unwrap();
assert!(dg.is_none());
}
#[test]
fn wlp_assert_participant_emits_manual_pulse() {
let mut e = WlpEndpoint::new(
GuidPrefix::from_bytes([1; 12]),
VendorId::ZERODDS,
Duration::from_secs(3600),
);
let _ = e.tick(Duration::ZERO).unwrap();
e.assert_participant();
let dg = e.tick(Duration::from_millis(1)).unwrap().expect("manual");
let parsed = decode_datagram(&dg).unwrap();
let data_sub = parsed.submessages.iter().find_map(|s| match s {
ParsedSubmessage::Data(d) => Some(d),
_ => None,
});
let payload = &data_sub.expect("DATA").serialized_payload;
let m = ParticipantMessageData::from_cdr(payload).unwrap();
assert_eq!(
m.kind,
PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_BY_PARTICIPANT_LIVELINESS_UPDATE
);
}
#[test]
fn wlp_assert_topic_emits_vendor_kind_with_token() {
let mut e = WlpEndpoint::new(
GuidPrefix::from_bytes([2; 12]),
VendorId::ZERODDS,
Duration::from_secs(3600),
);
let _ = e.tick(Duration::ZERO).unwrap();
e.assert_topic(vec![0xAA, 0xBB]);
let dg = e.tick(Duration::from_millis(1)).unwrap().expect("manual");
let parsed = decode_datagram(&dg).unwrap();
let data_sub = parsed
.submessages
.iter()
.find_map(|s| match s {
ParsedSubmessage::Data(d) => Some(d),
_ => None,
})
.unwrap();
let m = ParticipantMessageData::from_cdr(&data_sub.serialized_payload).unwrap();
assert_eq!(
m.kind,
PARTICIPANT_MESSAGE_DATA_KIND_ZERODDS_MANUAL_BY_TOPIC
);
assert_eq!(m.data, vec![0xAA, 0xBB]);
}
#[test]
fn wlp_pending_queue_caps_at_max() {
let mut e = ep();
for _ in 0..(MAX_QUEUED_PULSES + 10) {
e.assert_participant();
}
assert_eq!(e.pending.len(), MAX_QUEUED_PULSES);
}
#[test]
fn wlp_handle_datagram_updates_peer_state() {
let mut sender = ep();
let mut receiver = WlpEndpoint::new(
GuidPrefix::from_bytes([99; 12]),
VendorId::ZERODDS,
Duration::from_secs(3600),
);
let dg = sender.tick(Duration::ZERO).unwrap().unwrap();
let updated = receiver
.handle_datagram(&dg, Duration::from_millis(50))
.unwrap();
assert!(updated);
assert_eq!(receiver.peer_count(), 1);
let state = receiver
.peer_state(&GuidPrefix::from_bytes([
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
]))
.unwrap();
assert_eq!(
state.last_kind,
PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE
);
assert_eq!(state.last_seen, Duration::from_millis(50));
}
#[test]
fn wlp_handle_datagram_ignores_non_wlp_traffic() {
let header = RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([5; 12]));
let data = DataSubmessage {
extra_flags: 0,
reader_id: EntityId::SPDP_BUILTIN_PARTICIPANT_READER,
writer_id: EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER,
writer_sn: SequenceNumber(1),
inline_qos: None,
key_flag: false,
non_standard_flag: false,
serialized_payload: vec![0u8; 8].into(),
};
let dg = encode_data_datagram(header, &[data]).unwrap();
let mut e = ep();
let updated = e.handle_datagram(&dg, Duration::from_millis(10)).unwrap();
assert!(!updated);
assert_eq!(e.peer_count(), 0);
}
#[test]
fn wlp_lost_peers_returns_only_expired() {
let mut sender = ep();
let mut receiver = WlpEndpoint::new(
GuidPrefix::from_bytes([99; 12]),
VendorId::ZERODDS,
Duration::from_secs(3600),
);
let dg = sender.tick(Duration::ZERO).unwrap().unwrap();
receiver
.handle_datagram(&dg, Duration::from_millis(100))
.unwrap();
let lost: Vec<_> = receiver
.lost_peers(Duration::from_millis(350), Duration::from_millis(200))
.collect();
assert_eq!(lost.len(), 1);
let alive: Vec<_> = receiver
.lost_peers(Duration::from_millis(200), Duration::from_millis(200))
.collect();
assert_eq!(alive.len(), 0);
}
#[test]
fn wlp_forget_peer_removes_state() {
let mut sender = ep();
let mut receiver = WlpEndpoint::new(
GuidPrefix::from_bytes([99; 12]),
VendorId::ZERODDS,
Duration::from_secs(3600),
);
let dg = sender.tick(Duration::ZERO).unwrap().unwrap();
receiver
.handle_datagram(&dg, Duration::from_millis(0))
.unwrap();
let prefix = GuidPrefix::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]);
receiver.forget_peer(&prefix);
assert!(receiver.peer_state(&prefix).is_none());
}
#[test]
fn wlp_set_tick_period_takes_effect() {
let mut e = ep();
let _ = e.tick(Duration::ZERO).unwrap();
e.set_tick_period(Duration::from_millis(50));
let dg = e.tick(Duration::from_millis(100)).unwrap();
assert!(dg.is_some());
}
#[test]
fn wlp_handle_datagram_uses_header_prefix_when_payload_unknown() {
let mut msg = ParticipantMessageData::automatic(GuidPrefix::UNKNOWN);
msg.kind = PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE;
let payload = msg.to_cdr(true).unwrap();
let header_prefix = GuidPrefix::from_bytes([0x77; 12]);
let header = RtpsHeader::new(VendorId::ZERODDS, header_prefix);
let data = DataSubmessage {
extra_flags: 0,
reader_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_READER,
writer_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER,
writer_sn: SequenceNumber(1),
inline_qos: None,
key_flag: false,
non_standard_flag: false,
serialized_payload: payload.into(),
};
let dg = encode_data_datagram(header, &[data]).unwrap();
let mut receiver = WlpEndpoint::new(
GuidPrefix::from_bytes([99; 12]),
VendorId::ZERODDS,
Duration::from_secs(3600),
);
let updated = receiver
.handle_datagram(&dg, Duration::from_millis(7))
.unwrap();
assert!(updated);
assert!(receiver.peer_state(&header_prefix).is_some());
}
#[test]
fn wlp_handle_datagram_skips_malformed_cdr() {
let header = RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([0xAB; 12]));
let data = DataSubmessage {
extra_flags: 0,
reader_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_READER,
writer_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER,
writer_sn: SequenceNumber(1),
inline_qos: None,
key_flag: false,
non_standard_flag: false,
serialized_payload: vec![0u8; 3].into(),
};
let dg = encode_data_datagram(header, &[data]).unwrap();
let mut e = ep();
let updated = e.handle_datagram(&dg, Duration::from_millis(5)).unwrap();
assert!(!updated);
assert_eq!(e.peer_count(), 0);
}
#[test]
fn wlp_pulses_drained_one_per_tick() {
let mut e = WlpEndpoint::new(
GuidPrefix::from_bytes([3; 12]),
VendorId::ZERODDS,
Duration::from_secs(3600),
);
let _ = e.tick(Duration::ZERO).unwrap();
e.assert_participant();
e.assert_participant();
let dg1 = e.tick(Duration::from_millis(1)).unwrap();
let dg2 = e.tick(Duration::from_millis(2)).unwrap();
let dg3 = e.tick(Duration::from_millis(3)).unwrap();
assert!(dg1.is_some());
assert!(dg2.is_some());
assert!(dg3.is_none(), "queue empty after 2 pulses");
}
}