Skip to main content

zerodds_dcps/
wlp.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Writer-Liveliness-Protocol (WLP) — DCPS-Runtime-Wiring.
4//!
5//! Implementiert DDSI-RTPS 2.5 §8.4.13 und §9.6.3.1 (Wire-Format der
6//! `ParticipantMessageData`). Ein [`WlpEndpoint`] kapselt einen
7//! WLP-Writer + WLP-Reader pro Participant. Der Writer sendet
8//! periodische AUTOMATIC-Heartbeats (`lease_duration / 3`), Reader
9//! aktualisiert per-Peer-Last-Seen-Timestamps.
10//!
//! # Liveliness-Kinds (DDS DCPS 1.4 §2.2.3.11)
//!
//! - `AUTOMATIC` — Tick-getrieben, jeder periodische Beat. Implizit
//!   "ich lebe noch", solange irgendein Tick durchkommt.
//! - `MANUAL_BY_PARTICIPANT` — `assert_liveliness()` auf dem
//!   `DomainParticipant` triggert genau einen Wire-Send mit
//!   `kind = PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_BY_PARTICIPANT_LIVELINESS_UPDATE`
//!   (laut DDSI-RTPS `0x00000002`; `0x00000001` ist der AUTOMATIC-Kind).
//! - `MANUAL_BY_TOPIC` — `assert_liveliness()` auf dem `DataWriter`
//!   triggert einen Vendor-Kind-Send (`kind = 0x80000001`,
//!   ZeroDDS-spezifisch) mit Topic-Token in `data`.
20//!
//! # Architektur
//!
//! Der [`WlpEndpoint`] bleibt **stateless** bzgl. Reliable-State —
//! WLP-Heartbeats sind Best-Effort (Spec §8.4.13: "best effort
//! reliability is sufficient"). Der Writer ist deshalb kein
//! `ReliableWriter`, sondern ein simpler `next_sn`-Counter mit
//! `encode_data_datagram`. Das macht den Hot-Path billig und vermeidet
//! Kopplung mit der HEARTBEAT/ACKNACK-Loop des SEDP-Stacks.
29
30extern crate alloc;
31use alloc::collections::BTreeMap;
32use alloc::collections::VecDeque;
33use alloc::vec::Vec;
34use core::time::Duration;
35
36use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram, encode_data_datagram};
37use zerodds_rtps::error::WireError;
38use zerodds_rtps::header::RtpsHeader;
39use zerodds_rtps::participant_message_data::{
40    PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE,
41    PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_BY_PARTICIPANT_LIVELINESS_UPDATE,
42    PARTICIPANT_MESSAGE_DATA_KIND_ZERODDS_MANUAL_BY_TOPIC, ParticipantMessageData,
43};
44use zerodds_rtps::submessages::DataSubmessage;
45use zerodds_rtps::wire_types::{EntityId, GuidPrefix, SequenceNumber, VendorId};
46
/// Upper bound on the number of manual pulses waiting to be sent (DoS cap).
///
/// If callers invoke `assert_liveliness()` faster than the tick loop drains
/// the queue, the oldest pulses are dropped: a reader only remembers the most
/// recent beat anyway.
pub const MAX_QUEUED_PULSES: usize = 32;

/// Upper bound on the number of tracked remote peer participants (DoS cap).
///
/// The value scales with domain size; more than 1024 participants in a single
/// domain is considered an anti-pattern and a hint at misuse.
pub const MAX_TRACKED_PEERS: usize = 1024;
56
/// A manual liveliness pulse that has been requested but not yet sent.
///
/// Pulses are queued by the `assert_*` methods and consumed by `tick()`.
#[derive(Clone, Debug, Eq, PartialEq)]
struct PendingPulse {
    /// Value written into the `kind` field of the outgoing message.
    kind: u32,
    /// Opaque payload written into the `data` field of the outgoing message.
    data: Vec<u8>,
}
63
/// Per-peer tracking state kept by the reader side.
///
/// The struct is a small plain-data record (a [`Duration`] and a `u32`), so
/// it is `Copy`: callers can snapshot a peer's state without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PeerLivelinessState {
    /// Time at which the last WLP heartbeat from this peer was received,
    /// relative to the start of the `WlpEndpoint` owner.
    pub last_seen: Duration,
    /// `kind` of the last received message — diagnostic hint whether the
    /// peer sent an AUTOMATIC or a MANUAL update.
    pub last_kind: u32,
}
74
75/// WLP-Endpoint — Writer + Reader fuer DCPSParticipantMessage.
76///
77/// Owner ist die [`crate::runtime::DcpsRuntime`]. Der Endpoint wird unter dem Lock
78/// der Runtime mutiert; alle Methoden nehmen `&mut self`.
79#[derive(Debug)]
80pub struct WlpEndpoint {
81    /// Eigener Participant-Prefix (Source der Heartbeats).
82    own_prefix: GuidPrefix,
83    /// VendorId fuer den RTPS-Header.
84    vendor_id: VendorId,
85    /// Naechste Sequence-Number fuer den WLP-Writer.
86    next_sn: i64,
87    /// Tick-Periode fuer AUTOMATIC-Heartbeats (typisch
88    /// `lease_duration / 3`).
89    tick_period: Duration,
90    /// Zeitpunkt des naechsten faelligen AUTOMATIC-Beats.
91    next_tick: Duration,
92    /// Manual-Pulses, die noch nicht ausgesendet sind.
93    pending: VecDeque<PendingPulse>,
94    /// Per-Peer-Tracking (GuidPrefix → State).
95    peers: BTreeMap<GuidPrefix, PeerLivelinessState>,
96}
97
98impl WlpEndpoint {
99    /// Konstruktor.
100    ///
101    /// # Panics
102    /// Nicht — `tick_period == ZERO` ist erlaubt (deaktiviert
103    /// AUTOMATIC-Beats).
104    #[must_use]
105    pub fn new(own_prefix: GuidPrefix, vendor_id: VendorId, tick_period: Duration) -> Self {
106        Self {
107            own_prefix,
108            vendor_id,
109            next_sn: 1,
110            tick_period,
111            next_tick: Duration::ZERO,
112            pending: VecDeque::new(),
113            peers: BTreeMap::new(),
114        }
115    }
116
117    /// Setzt eine neue Tick-Periode (z.B. nach Lease-Aenderung).
118    /// Setzt den naechsten Beat-Slot zurueck auf "sofort", damit eine
119    /// Kuerzung der Periode unmittelbar wirkt (sonst wartet der
120    /// Endpoint noch das alte Intervall ab).
121    pub fn set_tick_period(&mut self, period: Duration) {
122        self.tick_period = period;
123        self.next_tick = Duration::ZERO;
124    }
125
126    /// `assert_liveliness()` auf dem `DomainParticipant` — enqueued
127    /// einen Manual-By-Participant-Pulse, der beim naechsten
128    /// `tick()` als WLP-DATA rausgeht. Idempotent ueber den
129    /// Tick-Slot — Mehrfachaufruf binnen einer Tick-Periode fuehrt
130    /// zu mehreren Pulses (Spec sagt nichts ueber Coalescing, aber
131    /// wir cappen mit [`MAX_QUEUED_PULSES`]).
132    pub fn assert_participant(&mut self) {
133        self.enqueue_pulse(PendingPulse {
134            kind: PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_BY_PARTICIPANT_LIVELINESS_UPDATE,
135            data: Vec::new(),
136        });
137    }
138
139    /// `assert_liveliness()` auf einem `DataWriter` — enqueued einen
140    /// MANUAL_BY_TOPIC-Pulse mit dem gegebenen Topic-Token.
141    /// Spec-Compliance: ZeroDDS-Vendor-Kind, Cyclone ignoriert das
142    /// (MSB-set → "ignore unknown kind", §9.6.3.1).
143    pub fn assert_topic(&mut self, topic_token: Vec<u8>) {
144        self.enqueue_pulse(PendingPulse {
145            kind: PARTICIPANT_MESSAGE_DATA_KIND_ZERODDS_MANUAL_BY_TOPIC,
146            data: topic_token,
147        });
148    }
149
150    fn enqueue_pulse(&mut self, p: PendingPulse) {
151        if self.pending.len() >= MAX_QUEUED_PULSES {
152            // Drop oldest — neuer Pulse ist relevanter.
153            let _ = self.pending.pop_front();
154        }
155        self.pending.push_back(p);
156    }
157
158    /// Tick. Liefert ggf. ein Datagram, das gesendet werden soll.
159    /// Reihenfolge:
160    /// 1. Pending Manual-Pulses werden zuerst ausgesendet (einer pro
161    ///    Tick — bei voller Queue dauert die Drainage `n * tick_period`).
162    /// 2. Wenn `now >= next_tick` und `tick_period > 0`, sendet einen
163    ///    AUTOMATIC-Heartbeat und schedule den naechsten Tick.
164    ///
165    /// `None` heisst: nichts zu senden in diesem Tick.
166    ///
167    /// # Errors
168    /// `WireError` wenn das Encoding fehlschlaegt (z.B. SN-Overflow).
169    pub fn tick(&mut self, now: Duration) -> Result<Option<Vec<u8>>, WireError> {
170        if let Some(pulse) = self.pending.pop_front() {
171            return Ok(Some(self.encode_pulse(&pulse)?));
172        }
173        if self.tick_period.is_zero() {
174            return Ok(None);
175        }
176        if now < self.next_tick {
177            return Ok(None);
178        }
179        // AUTOMATIC-Beat
180        let pulse = PendingPulse {
181            kind: PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE,
182            data: Vec::new(),
183        };
184        let datagram = self.encode_pulse(&pulse)?;
185        self.next_tick = now + self.tick_period;
186        Ok(Some(datagram))
187    }
188
189    fn encode_pulse(&mut self, pulse: &PendingPulse) -> Result<Vec<u8>, WireError> {
190        let mut msg = ParticipantMessageData::automatic(self.own_prefix);
191        msg.kind = pulse.kind;
192        msg.data = pulse.data.clone();
193        let payload = msg.to_cdr(true)?; // LE per Default
194        let sn = SequenceNumber(self.next_sn);
195        self.next_sn = self
196            .next_sn
197            .checked_add(1)
198            .ok_or(WireError::ValueOutOfRange {
199                message: "wlp sequence overflow",
200            })?;
201        let data = DataSubmessage {
202            extra_flags: 0,
203            reader_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_READER,
204            writer_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER,
205            writer_sn: sn,
206            inline_qos: None,
207            key_flag: false,
208            non_standard_flag: false,
209            serialized_payload: payload.into(),
210        };
211        let header = RtpsHeader::new(self.vendor_id, self.own_prefix);
212        encode_data_datagram(header, &[data])
213    }
214
215    /// Verarbeitet ein eingehendes Datagram. Wenn es eine WLP-DATA-
216    /// Submessage enthaelt (writer_id == BUILTIN_PARTICIPANT_MESSAGE_
217    /// WRITER), wird der Per-Peer-Last-Seen-Timestamp aktualisiert.
218    /// Andere Datagramme werden ignoriert.
219    ///
220    /// # Errors
221    /// `WireError` wenn das aeussere Datagram malformed ist. Eine
222    /// einzelne malformed Submessage fuehrt nicht zum Fehler — wir
223    /// skippen sie still.
224    pub fn handle_datagram(&mut self, bytes: &[u8], now: Duration) -> Result<bool, WireError> {
225        let parsed = decode_datagram(bytes)?;
226        let mut updated = false;
227        for sub in parsed.submessages {
228            if let ParsedSubmessage::Data(d) = sub {
229                if d.writer_id == EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER {
230                    if let Ok(msg) = ParticipantMessageData::from_cdr(&d.serialized_payload) {
231                        // Quelle: bevorzugt der Prefix aus dem Payload
232                        // (wie Cyclone macht), fallback der RTPS-Header-
233                        // Prefix wenn der Payload den nicht traegt.
234                        let src = msg.prefix();
235                        let prefix = if src == GuidPrefix::UNKNOWN {
236                            parsed.header.guid_prefix
237                        } else {
238                            src
239                        };
240                        // DoS-Cap: Anzahl Peers begrenzen.
241                        if self.peers.len() >= MAX_TRACKED_PEERS
242                            && !self.peers.contains_key(&prefix)
243                        {
244                            // Drop: bekannten Peer-Liste ist voll.
245                            continue;
246                        }
247                        self.peers.insert(
248                            prefix,
249                            PeerLivelinessState {
250                                last_seen: now,
251                                last_kind: msg.kind,
252                            },
253                        );
254                        updated = true;
255                    }
256                }
257            }
258        }
259        Ok(updated)
260    }
261
262    /// Liefert den aktuellen Liveliness-State eines bekannten Peers.
263    /// `None` wenn unbekannt.
264    #[must_use]
265    pub fn peer_state(&self, prefix: &GuidPrefix) -> Option<&PeerLivelinessState> {
266        self.peers.get(prefix)
267    }
268
269    /// Anzahl getrackter Peers.
270    #[must_use]
271    pub fn peer_count(&self) -> usize {
272        self.peers.len()
273    }
274
275    /// Iter ueber alle Peers, deren Last-Seen aelter als `now - lease`
276    /// ist. Caller nutzt das, um Liveliness-Lost-Events zu treiben.
277    pub fn lost_peers(
278        &self,
279        now: Duration,
280        lease: Duration,
281    ) -> impl Iterator<Item = (&GuidPrefix, &PeerLivelinessState)> + '_ {
282        self.peers.iter().filter(move |(_, s)| {
283            now.checked_sub(s.last_seen)
284                .is_some_and(|elapsed| elapsed > lease)
285        })
286    }
287
288    /// Entfernt einen Peer aus dem Tracking (z.B. bei SPDP-Lease-
289    /// Expire).
290    pub fn forget_peer(&mut self, prefix: &GuidPrefix) {
291        self.peers.remove(prefix);
292    }
293}
294
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
    use super::*;
    use alloc::vec;

    /// Prefix of the default sender endpoint built by [`ep`].
    fn sender_prefix() -> GuidPrefix {
        GuidPrefix::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12])
    }

    /// Default endpoint: sender prefix, 300 ms tick period.
    fn ep() -> WlpEndpoint {
        WlpEndpoint::new(
            sender_prefix(),
            VendorId::ZERODDS,
            Duration::from_millis(300),
        )
    }

    /// Endpoint with the given prefix and a 1 h tick period, so that the
    /// AUTOMATIC path does not interfere after the initial beat.
    fn quiet_endpoint(prefix: GuidPrefix) -> WlpEndpoint {
        WlpEndpoint::new(prefix, VendorId::ZERODDS, Duration::from_secs(3600))
    }

    /// Receiving endpoint used by the reader-side tests.
    fn receiver() -> WlpEndpoint {
        quiet_endpoint(GuidPrefix::from_bytes([99; 12]))
    }

    /// Decodes `dg` and returns the message carried by its first DATA
    /// submessage.
    fn first_wlp_message(dg: &[u8]) -> ParticipantMessageData {
        let parsed = decode_datagram(dg).unwrap();
        let data_sub = parsed
            .submessages
            .iter()
            .find_map(|s| match s {
                ParsedSubmessage::Data(d) => Some(d),
                _ => None,
            })
            .expect("DATA");
        ParticipantMessageData::from_cdr(&data_sub.serialized_payload).unwrap()
    }

    /// Distinct, non-zero prefix derived from `idx` (valid for idx < 65536).
    fn numbered_prefix(idx: usize) -> GuidPrefix {
        let be = idx.to_be_bytes();
        let (hi, lo) = (be[be.len() - 2], be[be.len() - 1]);
        GuidPrefix::from_bytes([0xEE, hi, lo, 0, 0, 0, 0, 0, 0, 0, 0, 0])
    }

    /// First AUTOMATIC beat emitted by a fresh endpoint with `prefix`.
    fn beat_from(prefix: GuidPrefix) -> Vec<u8> {
        WlpEndpoint::new(prefix, VendorId::ZERODDS, Duration::from_millis(300))
            .tick(Duration::ZERO)
            .unwrap()
            .unwrap()
    }

    #[test]
    fn wlp_first_tick_emits_automatic_heartbeat() {
        let mut e = ep();
        let dg = e.tick(Duration::ZERO).unwrap();
        assert!(dg.is_some(), "first tick must emit AUTOMATIC beat");
    }

    #[test]
    fn wlp_tick_idle_returns_none_until_period() {
        let mut e = ep();
        let _ = e.tick(Duration::ZERO).unwrap();
        // No second beat within the period.
        let dg = e.tick(Duration::from_millis(100)).unwrap();
        assert!(dg.is_none());
    }

    #[test]
    fn wlp_tick_emits_again_after_period() {
        let mut e = ep();
        let _ = e.tick(Duration::ZERO).unwrap();
        let dg = e.tick(Duration::from_millis(400)).unwrap();
        assert!(dg.is_some());
    }

    #[test]
    fn wlp_zero_period_disables_automatic_beats() {
        let mut e = WlpEndpoint::new(
            GuidPrefix::from_bytes([0xAA; 12]),
            VendorId::ZERODDS,
            Duration::ZERO,
        );
        let dg = e.tick(Duration::ZERO).unwrap();
        assert!(dg.is_none());
    }

    #[test]
    fn wlp_assert_participant_emits_manual_pulse() {
        let mut e = quiet_endpoint(GuidPrefix::from_bytes([1; 12]));
        // Consume the initial AUTOMATIC beat first.
        let _ = e.tick(Duration::ZERO).unwrap();
        e.assert_participant();
        let dg = e.tick(Duration::from_millis(1)).unwrap().expect("manual");
        // Decode the datagram and check the kind.
        let m = first_wlp_message(&dg);
        assert_eq!(
            m.kind,
            PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_BY_PARTICIPANT_LIVELINESS_UPDATE
        );
    }

    #[test]
    fn wlp_assert_topic_emits_vendor_kind_with_token() {
        let mut e = quiet_endpoint(GuidPrefix::from_bytes([2; 12]));
        let _ = e.tick(Duration::ZERO).unwrap();
        e.assert_topic(vec![0xAA, 0xBB]);
        let dg = e.tick(Duration::from_millis(1)).unwrap().expect("manual");
        let m = first_wlp_message(&dg);
        assert_eq!(
            m.kind,
            PARTICIPANT_MESSAGE_DATA_KIND_ZERODDS_MANUAL_BY_TOPIC
        );
        assert_eq!(m.data, vec![0xAA, 0xBB]);
    }

    #[test]
    fn wlp_pending_queue_caps_at_max() {
        let mut e = ep();
        for _ in 0..(MAX_QUEUED_PULSES + 10) {
            e.assert_participant();
        }
        assert_eq!(e.pending.len(), MAX_QUEUED_PULSES);
    }

    #[test]
    fn wlp_handle_datagram_updates_peer_state() {
        let mut sender = ep();
        let mut receiver = receiver();
        let dg = sender.tick(Duration::ZERO).unwrap().unwrap();
        let updated = receiver
            .handle_datagram(&dg, Duration::from_millis(50))
            .unwrap();
        assert!(updated);
        assert_eq!(receiver.peer_count(), 1);
        let state = receiver.peer_state(&sender_prefix()).unwrap();
        assert_eq!(
            state.last_kind,
            PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE
        );
        assert_eq!(state.last_seen, Duration::from_millis(50));
    }

    #[test]
    fn wlp_handle_datagram_ignores_non_wlp_traffic() {
        // An SPDP DATA submessage must NOT be counted as WLP traffic.
        let header = RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([5; 12]));
        let data = DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::SPDP_BUILTIN_PARTICIPANT_READER,
            writer_id: EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER,
            writer_sn: SequenceNumber(1),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: vec![0u8; 8].into(),
        };
        let dg = encode_data_datagram(header, &[data]).unwrap();
        let mut e = ep();
        let updated = e.handle_datagram(&dg, Duration::from_millis(10)).unwrap();
        assert!(!updated);
        assert_eq!(e.peer_count(), 0);
    }

    #[test]
    fn wlp_lost_peers_returns_only_expired() {
        let mut sender = ep();
        let mut receiver = receiver();
        let dg = sender.tick(Duration::ZERO).unwrap().unwrap();
        receiver
            .handle_datagram(&dg, Duration::from_millis(100))
            .unwrap();
        // Lease 200 ms, now = 350 ms → 250 ms elapsed → lost.
        let lost: Vec<_> = receiver
            .lost_peers(Duration::from_millis(350), Duration::from_millis(200))
            .collect();
        assert_eq!(lost.len(), 1);
        // now = 300 ms → exactly 200 ms elapsed → not yet lost (strict `>`).
        assert_eq!(
            receiver
                .lost_peers(Duration::from_millis(300), Duration::from_millis(200))
                .count(),
            0
        );
        // now = 200 ms → 100 ms elapsed → still alive.
        let alive: Vec<_> = receiver
            .lost_peers(Duration::from_millis(200), Duration::from_millis(200))
            .collect();
        assert_eq!(alive.len(), 0);
    }

    #[test]
    fn wlp_forget_peer_removes_state() {
        let mut sender = ep();
        let mut receiver = receiver();
        let dg = sender.tick(Duration::ZERO).unwrap().unwrap();
        receiver
            .handle_datagram(&dg, Duration::from_millis(0))
            .unwrap();
        let prefix = sender_prefix();
        receiver.forget_peer(&prefix);
        assert!(receiver.peer_state(&prefix).is_none());
    }

    #[test]
    fn wlp_set_tick_period_takes_effect() {
        let mut e = ep();
        let _ = e.tick(Duration::ZERO).unwrap();
        e.set_tick_period(Duration::from_millis(50));
        // The schedule was reset, so a beat is due right away — under the
        // old 300 ms schedule 100 ms would not have triggered one.
        assert!(e.tick(Duration::from_millis(100)).unwrap().is_some());
        // The next beat must follow the NEW period: due at 150 ms, not at
        // 400 ms.
        assert!(e.tick(Duration::from_millis(120)).unwrap().is_none());
        assert!(e.tick(Duration::from_millis(150)).unwrap().is_some());
    }

    #[test]
    fn wlp_handle_datagram_uses_header_prefix_when_payload_unknown() {
        // If the payload's GuidPrefix is UNKNOWN (all zero bytes), the
        // endpoint must fall back to the RTPS header prefix. Build a WLP
        // datagram by hand with a zero-prefix payload.
        let mut msg = ParticipantMessageData::automatic(GuidPrefix::UNKNOWN);
        msg.kind = PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE;
        let payload = msg.to_cdr(true).unwrap();
        let header_prefix = GuidPrefix::from_bytes([0x77; 12]);
        let header = RtpsHeader::new(VendorId::ZERODDS, header_prefix);
        let data = DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_READER,
            writer_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER,
            writer_sn: SequenceNumber(1),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: payload.into(),
        };
        let dg = encode_data_datagram(header, &[data]).unwrap();

        let mut receiver = receiver();
        let updated = receiver
            .handle_datagram(&dg, Duration::from_millis(7))
            .unwrap();
        assert!(updated);
        // The fallback prefix from the RTPS header must be the peer key.
        assert!(receiver.peer_state(&header_prefix).is_some());
    }

    #[test]
    fn wlp_handle_datagram_skips_malformed_cdr() {
        // WLP submessage with a 3-byte payload (too small for the
        // encapsulation header). handle_datagram must not panic but skip
        // it silently.
        let header = RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([0xAB; 12]));
        let data = DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_READER,
            writer_id: EntityId::BUILTIN_PARTICIPANT_MESSAGE_WRITER,
            writer_sn: SequenceNumber(1),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: vec![0u8; 3].into(),
        };
        let dg = encode_data_datagram(header, &[data]).unwrap();
        let mut e = ep();
        let updated = e.handle_datagram(&dg, Duration::from_millis(5)).unwrap();
        assert!(!updated);
        assert_eq!(e.peer_count(), 0);
    }

    #[test]
    fn wlp_peer_table_caps_at_max() {
        let mut rx = receiver();
        for idx in 0..MAX_TRACKED_PEERS {
            let dg = beat_from(numbered_prefix(idx));
            assert!(rx.handle_datagram(&dg, Duration::from_millis(1)).unwrap());
        }
        assert_eq!(rx.peer_count(), MAX_TRACKED_PEERS);

        // A new peer beyond the cap is dropped.
        let extra = numbered_prefix(MAX_TRACKED_PEERS);
        let dg = beat_from(extra);
        assert!(!rx.handle_datagram(&dg, Duration::from_millis(2)).unwrap());
        assert!(rx.peer_state(&extra).is_none());
        assert_eq!(rx.peer_count(), MAX_TRACKED_PEERS);

        // An already-known peer is still refreshed while the table is full.
        let known = numbered_prefix(0);
        let dg = beat_from(known);
        assert!(rx.handle_datagram(&dg, Duration::from_millis(3)).unwrap());
        assert_eq!(
            rx.peer_state(&known).unwrap().last_seen,
            Duration::from_millis(3)
        );
    }

    #[test]
    fn wlp_pulses_drained_one_per_tick() {
        let mut e = quiet_endpoint(GuidPrefix::from_bytes([3; 12]));
        let _ = e.tick(Duration::ZERO).unwrap();
        e.assert_participant();
        e.assert_participant();
        let dg1 = e.tick(Duration::from_millis(1)).unwrap();
        let dg2 = e.tick(Duration::from_millis(2)).unwrap();
        let dg3 = e.tick(Duration::from_millis(3)).unwrap();
        assert!(dg1.is_some());
        assert!(dg2.is_some());
        assert!(dg3.is_none(), "queue empty after 2 pulses");
    }
}