1extern crate alloc;
31use alloc::collections::BTreeMap;
32use alloc::collections::VecDeque;
33use alloc::vec::Vec;
34use core::time::Duration;
35
36use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram, encode_data_datagram};
37use zerodds_rtps::error::WireError;
38use zerodds_rtps::header::RtpsHeader;
39use zerodds_rtps::participant_message_data::{
40 PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE,
41 PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_BY_PARTICIPANT_LIVELINESS_UPDATE,
42 PARTICIPANT_MESSAGE_DATA_KIND_ZERODDS_MANUAL_BY_TOPIC, ParticipantMessageData,
43};
44use zerodds_rtps::submessages::DataSubmessage;
45use zerodds_rtps::wire_types::{EntityId, GuidPrefix, SequenceNumber, VendorId};
46
47pub const MAX_QUEUED_PULSES: usize = 32;
51
52pub const MAX_TRACKED_PEERS: usize = 1024;
56
57#[derive(Debug, Clone, PartialEq, Eq)]
59struct PendingPulse {
60 kind: u32,
61 data: Vec<u8>,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
66pub struct PeerLivelinessState {
67 pub last_seen: Duration,
70 pub last_kind: u32,
73}
74
75#[derive(Debug)]
80pub struct WlpEndpoint {
81 own_prefix: GuidPrefix,
83 vendor_id: VendorId,
85 next_sn: i64,
87 tick_period: Duration,
90 next_tick: Duration,
92 pending: VecDeque<PendingPulse>,
94 peers: BTreeMap<GuidPrefix, PeerLivelinessState>,
96}
97
98impl WlpEndpoint {
99 #[must_use]
105 pub fn new(own_prefix: GuidPrefix, vendor_id: VendorId, tick_period: Duration) -> Self {
106 Self {
107 own_prefix,
108 vendor_id,
109 next_sn: 1,
110 tick_period,
111 next_tick: Duration::ZERO,
112 pending: VecDeque::new(),
113 peers: BTreeMap::new(),
114 }
115 }
116
117 pub fn set_tick_period(&mut self, period: Duration) {
122 self.tick_period = period;
123 self.next_tick = Duration::ZERO;
124 }
125
126 pub fn assert_participant(&mut self) {
133 self.enqueue_pulse(PendingPulse {
134 kind: PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_BY_PARTICIPANT_LIVELINESS_UPDATE,
135 data: Vec::new(),
136 });
137 }
138
139 pub fn assert_topic(&mut self, topic_token: Vec<u8>) {
144 self.enqueue_pulse(PendingPulse {
145 kind: PARTICIPANT_MESSAGE_DATA_KIND_ZERODDS_MANUAL_BY_TOPIC,
146 data: topic_token,
147 });
148 }
149
150 fn enqueue_pulse(&mut self, p: PendingPulse) {
151 if self.pending.len() >= MAX_QUEUED_PULSES {
152 let _ = self.pending.pop_front();
154 }
155 self.pending.push_back(p);
156 }
157
158 pub fn tick(&mut self, now: Duration) -> Result<Option<Vec<u8>>, WireError> {
170 if let Some(pulse) = self.pending.pop_front() {
171 return Ok(Some(self.encode_pulse(&pulse)?));
172 }
173 if self.tick_period.is_zero() {
174 return Ok(None);
175 }
176 if now < self.next_tick {
177 return Ok(None);
178 }
179 let pulse = PendingPulse {
181 kind: PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE,
182 data: Vec::new(),
183 };
184 let datagram = self.encode_pulse(&pulse)?;
185 self.next_tick = now + self.tick_period;
186 Ok(Some(datagram))
187 }
188
189 fn encode_pulse(&mut self, pulse: &PendingPulse) -> Result<Vec<u8>, WireError> {
190 let mut msg = ParticipantMessageData::automatic(self.own_prefix);
191 msg.kind = pulse.kind;
192 msg.data = pulse.data.clone();
193 let payload = msg.to_cdr(true)?; let sn = SequenceNumber(self.next_sn);
195 self.next_sn = self
196 .next_sn
197 .checked_add(1)
198 .ok_or(WireError::ValueOutOfRange {
199 message: "wlp sequence overflow",
200 })?;
201 let data = DataSubmessage {
202 extra_flags: 0,
203 reader_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_READER,
204 writer_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER,
205 writer_sn: sn,
206 inline_qos: None,
207 key_flag: false,
208 non_standard_flag: false,
209 serialized_payload: payload.into(),
210 };
211 let header = RtpsHeader::new(self.vendor_id, self.own_prefix);
212 encode_data_datagram(header, &[data])
213 }
214
215 pub fn handle_datagram(&mut self, bytes: &[u8], now: Duration) -> Result<bool, WireError> {
225 let parsed = decode_datagram(bytes)?;
226 let mut updated = false;
227 for sub in parsed.submessages {
228 if let ParsedSubmessage::Data(d) = sub {
229 if d.writer_id == EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER {
230 if let Ok(msg) = ParticipantMessageData::from_cdr(&d.serialized_payload) {
231 let src = msg.prefix();
235 let prefix = if src == GuidPrefix::UNKNOWN {
236 parsed.header.guid_prefix
237 } else {
238 src
239 };
240 if self.peers.len() >= MAX_TRACKED_PEERS
242 && !self.peers.contains_key(&prefix)
243 {
244 continue;
246 }
247 self.peers.insert(
248 prefix,
249 PeerLivelinessState {
250 last_seen: now,
251 last_kind: msg.kind,
252 },
253 );
254 updated = true;
255 }
256 }
257 }
258 }
259 Ok(updated)
260 }
261
262 #[must_use]
265 pub fn peer_state(&self, prefix: &GuidPrefix) -> Option<&PeerLivelinessState> {
266 self.peers.get(prefix)
267 }
268
269 #[must_use]
271 pub fn peer_count(&self) -> usize {
272 self.peers.len()
273 }
274
275 pub fn lost_peers(
278 &self,
279 now: Duration,
280 lease: Duration,
281 ) -> impl Iterator<Item = (&GuidPrefix, &PeerLivelinessState)> + '_ {
282 self.peers.iter().filter(move |(_, s)| {
283 now.checked_sub(s.last_seen)
284 .is_some_and(|elapsed| elapsed > lease)
285 })
286 }
287
288 pub fn forget_peer(&mut self, prefix: &GuidPrefix) {
291 self.peers.remove(prefix);
292 }
293}
294
295#[cfg(test)]
296mod tests {
297 #![allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
298 use super::*;
299 use alloc::vec;
300
301 fn ep() -> WlpEndpoint {
302 WlpEndpoint::new(
303 GuidPrefix::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]),
304 VendorId::ZERODDS,
305 Duration::from_millis(300),
306 )
307 }
308
309 #[test]
310 fn wlp_first_tick_emits_automatic_heartbeat() {
311 let mut e = ep();
312 let dg = e.tick(Duration::ZERO).unwrap();
313 assert!(dg.is_some(), "first tick must emit AUTOMATIC beat");
314 }
315
316 #[test]
317 fn wlp_tick_idle_returns_none_until_period() {
318 let mut e = ep();
319 let _ = e.tick(Duration::ZERO).unwrap();
320 let dg = e.tick(Duration::from_millis(100)).unwrap();
322 assert!(dg.is_none());
323 }
324
325 #[test]
326 fn wlp_tick_emits_again_after_period() {
327 let mut e = ep();
328 let _ = e.tick(Duration::ZERO).unwrap();
329 let dg = e.tick(Duration::from_millis(400)).unwrap();
330 assert!(dg.is_some());
331 }
332
333 #[test]
334 fn wlp_zero_period_disables_automatic_beats() {
335 let mut e = WlpEndpoint::new(
336 GuidPrefix::from_bytes([0xAA; 12]),
337 VendorId::ZERODDS,
338 Duration::ZERO,
339 );
340 let dg = e.tick(Duration::ZERO).unwrap();
341 assert!(dg.is_none());
342 }
343
344 #[test]
345 fn wlp_assert_participant_emits_manual_pulse() {
346 let mut e = WlpEndpoint::new(
348 GuidPrefix::from_bytes([1; 12]),
349 VendorId::ZERODDS,
350 Duration::from_secs(3600),
351 );
352 let _ = e.tick(Duration::ZERO).unwrap();
354 e.assert_participant();
355 let dg = e.tick(Duration::from_millis(1)).unwrap().expect("manual");
356 let parsed = decode_datagram(&dg).unwrap();
358 let data_sub = parsed.submessages.iter().find_map(|s| match s {
359 ParsedSubmessage::Data(d) => Some(d),
360 _ => None,
361 });
362 let payload = &data_sub.expect("DATA").serialized_payload;
363 let m = ParticipantMessageData::from_cdr(payload).unwrap();
364 assert_eq!(
365 m.kind,
366 PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_BY_PARTICIPANT_LIVELINESS_UPDATE
367 );
368 }
369
370 #[test]
371 fn wlp_assert_topic_emits_vendor_kind_with_token() {
372 let mut e = WlpEndpoint::new(
373 GuidPrefix::from_bytes([2; 12]),
374 VendorId::ZERODDS,
375 Duration::from_secs(3600),
376 );
377 let _ = e.tick(Duration::ZERO).unwrap();
378 e.assert_topic(vec![0xAA, 0xBB]);
379 let dg = e.tick(Duration::from_millis(1)).unwrap().expect("manual");
380 let parsed = decode_datagram(&dg).unwrap();
381 let data_sub = parsed
382 .submessages
383 .iter()
384 .find_map(|s| match s {
385 ParsedSubmessage::Data(d) => Some(d),
386 _ => None,
387 })
388 .unwrap();
389 let m = ParticipantMessageData::from_cdr(&data_sub.serialized_payload).unwrap();
390 assert_eq!(
391 m.kind,
392 PARTICIPANT_MESSAGE_DATA_KIND_ZERODDS_MANUAL_BY_TOPIC
393 );
394 assert_eq!(m.data, vec![0xAA, 0xBB]);
395 }
396
397 #[test]
398 fn wlp_pending_queue_caps_at_max() {
399 let mut e = ep();
400 for _ in 0..(MAX_QUEUED_PULSES + 10) {
401 e.assert_participant();
402 }
403 assert_eq!(e.pending.len(), MAX_QUEUED_PULSES);
404 }
405
406 #[test]
407 fn wlp_handle_datagram_updates_peer_state() {
408 let mut sender = ep();
409 let mut receiver = WlpEndpoint::new(
410 GuidPrefix::from_bytes([99; 12]),
411 VendorId::ZERODDS,
412 Duration::from_secs(3600),
413 );
414 let dg = sender.tick(Duration::ZERO).unwrap().unwrap();
415 let updated = receiver
416 .handle_datagram(&dg, Duration::from_millis(50))
417 .unwrap();
418 assert!(updated);
419 assert_eq!(receiver.peer_count(), 1);
420 let state = receiver
421 .peer_state(&GuidPrefix::from_bytes([
422 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
423 ]))
424 .unwrap();
425 assert_eq!(
426 state.last_kind,
427 PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE
428 );
429 assert_eq!(state.last_seen, Duration::from_millis(50));
430 }
431
432 #[test]
433 fn wlp_handle_datagram_ignores_non_wlp_traffic() {
434 let header = RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([5; 12]));
436 let data = DataSubmessage {
437 extra_flags: 0,
438 reader_id: EntityId::SPDP_BUILTIN_PARTICIPANT_READER,
439 writer_id: EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER,
440 writer_sn: SequenceNumber(1),
441 inline_qos: None,
442 key_flag: false,
443 non_standard_flag: false,
444 serialized_payload: vec![0u8; 8].into(),
445 };
446 let dg = encode_data_datagram(header, &[data]).unwrap();
447 let mut e = ep();
448 let updated = e.handle_datagram(&dg, Duration::from_millis(10)).unwrap();
449 assert!(!updated);
450 assert_eq!(e.peer_count(), 0);
451 }
452
453 #[test]
454 fn wlp_lost_peers_returns_only_expired() {
455 let mut sender = ep();
456 let mut receiver = WlpEndpoint::new(
457 GuidPrefix::from_bytes([99; 12]),
458 VendorId::ZERODDS,
459 Duration::from_secs(3600),
460 );
461 let dg = sender.tick(Duration::ZERO).unwrap().unwrap();
462 receiver
463 .handle_datagram(&dg, Duration::from_millis(100))
464 .unwrap();
465 let lost: Vec<_> = receiver
467 .lost_peers(Duration::from_millis(350), Duration::from_millis(200))
468 .collect();
469 assert_eq!(lost.len(), 1);
470 let alive: Vec<_> = receiver
472 .lost_peers(Duration::from_millis(200), Duration::from_millis(200))
473 .collect();
474 assert_eq!(alive.len(), 0);
475 }
476
477 #[test]
478 fn wlp_forget_peer_removes_state() {
479 let mut sender = ep();
480 let mut receiver = WlpEndpoint::new(
481 GuidPrefix::from_bytes([99; 12]),
482 VendorId::ZERODDS,
483 Duration::from_secs(3600),
484 );
485 let dg = sender.tick(Duration::ZERO).unwrap().unwrap();
486 receiver
487 .handle_datagram(&dg, Duration::from_millis(0))
488 .unwrap();
489 let prefix = GuidPrefix::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]);
490 receiver.forget_peer(&prefix);
491 assert!(receiver.peer_state(&prefix).is_none());
492 }
493
494 #[test]
495 fn wlp_set_tick_period_takes_effect() {
496 let mut e = ep();
497 let _ = e.tick(Duration::ZERO).unwrap();
498 e.set_tick_period(Duration::from_millis(50));
499 let dg = e.tick(Duration::from_millis(100)).unwrap();
502 assert!(dg.is_some());
503 }
504
505 #[test]
506 fn wlp_handle_datagram_uses_header_prefix_when_payload_unknown() {
507 let mut msg = ParticipantMessageData::automatic(GuidPrefix::UNKNOWN);
512 msg.kind = PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE;
513 let payload = msg.to_cdr(true).unwrap();
514 let header_prefix = GuidPrefix::from_bytes([0x77; 12]);
515 let header = RtpsHeader::new(VendorId::ZERODDS, header_prefix);
516 let data = DataSubmessage {
517 extra_flags: 0,
518 reader_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_READER,
519 writer_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER,
520 writer_sn: SequenceNumber(1),
521 inline_qos: None,
522 key_flag: false,
523 non_standard_flag: false,
524 serialized_payload: payload.into(),
525 };
526 let dg = encode_data_datagram(header, &[data]).unwrap();
527
528 let mut receiver = WlpEndpoint::new(
529 GuidPrefix::from_bytes([99; 12]),
530 VendorId::ZERODDS,
531 Duration::from_secs(3600),
532 );
533 let updated = receiver
534 .handle_datagram(&dg, Duration::from_millis(7))
535 .unwrap();
536 assert!(updated);
537 assert!(receiver.peer_state(&header_prefix).is_some());
540 }
541
542 #[test]
543 fn wlp_handle_datagram_skips_malformed_cdr() {
544 let header = RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([0xAB; 12]));
548 let data = DataSubmessage {
549 extra_flags: 0,
550 reader_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_READER,
551 writer_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER,
552 writer_sn: SequenceNumber(1),
553 inline_qos: None,
554 key_flag: false,
555 non_standard_flag: false,
556 serialized_payload: vec![0u8; 3].into(),
557 };
558 let dg = encode_data_datagram(header, &[data]).unwrap();
559 let mut e = ep();
560 let updated = e.handle_datagram(&dg, Duration::from_millis(5)).unwrap();
561 assert!(!updated);
562 assert_eq!(e.peer_count(), 0);
563 }
564
565 #[test]
566 fn wlp_pulses_drained_one_per_tick() {
567 let mut e = WlpEndpoint::new(
568 GuidPrefix::from_bytes([3; 12]),
569 VendorId::ZERODDS,
570 Duration::from_secs(3600),
571 );
572 let _ = e.tick(Duration::ZERO).unwrap();
573 e.assert_participant();
574 e.assert_participant();
575 let dg1 = e.tick(Duration::from_millis(1)).unwrap();
576 let dg2 = e.tick(Duration::from_millis(2)).unwrap();
577 let dg3 = e.tick(Duration::from_millis(3)).unwrap();
578 assert!(dg1.is_some());
579 assert!(dg2.is_some());
580 assert!(dg3.is_none(), "queue empty after 2 pulses");
581 }
582}