Skip to main content

zerodds_rtps/
reliable_reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Reliable RTPS-Reader (1:N Writer-Proxies) — DDSI-RTPS 2.5 §8.4.10.
4//!
5//! Entspricht der [`StatefulReader`]-Rolle mit 1..N matched Writers.
6//! Fragmentation (§8.4.14) ist unterstuetzt. Multi-Writer ab WP 1.4
7//! T4.5: pro Remote-Writer getrennter [`WriterProxyState`] mit eigenem
8//! `received_cache`, `delivered_up_to` und `FragmentAssembler`.
9//!
10//! # Warum pro-Proxy State?
11//!
12//! SequenceNumbers sind writer-lokal (Spec §8.3.5.4). Zwei Writer mit
13//! ueberlappenden SN-Spaces wuerden in einem globalen Cache kollidieren
14//! — daher separate Puffer pro Proxy.
15//!
16//! # API-Form
17//!
18//! ```text
19//!   let mut r = ReliableReader::new(...);
20//!   r.add_writer_proxy(proxy_for_remote_A);
21//!   loop {
22//!       match transport.recv_submessage() {
23//!           Data(d)      => for s in r.handle_data(&d) { deliver(s) },
24//!           DataFrag(df) => for s in r.handle_data_frag(&df, uptime()) { deliver(s) },
25//!           Heartbeat(h) => r.handle_heartbeat(&h, uptime()),
26//!           Gap(g)       => for s in r.handle_gap(&g) { deliver(s) },
27//!       }
28//!       for dg in r.tick(uptime())? { transport.send(dg) }
29//!   }
30//! ```
31//!
32//! [`StatefulReader`]: https://www.omg.org/spec/DDSI-RTPS/2.5/
33
34use core::time::Duration;
35
36extern crate alloc;
37use alloc::vec::Vec;
38
39use alloc::rc::Rc;
40
41use crate::error::WireError;
42use crate::fragment_assembler::{AssemblerCaps, FragmentAssembler};
43use crate::header::RtpsHeader;
44use crate::history_cache::{CacheChange, ChangeKind, HistoryCache};
45use crate::message_builder::OutboundDatagram;
46use crate::submessage_header::{FLAG_E_LITTLE_ENDIAN, SubmessageHeader, SubmessageId};
47use crate::submessages::{
48    AckNackSubmessage, DataFragSubmessage, DataSubmessage, GapSubmessage, HeartbeatSubmessage,
49    NackFragSubmessage, SequenceNumberSet,
50};
51use crate::wire_types::{Guid, SequenceNumber, VendorId};
52use crate::writer_proxy::WriterProxy;
53
54/// Default-Heartbeat-Response-Delay.
55///
56/// RTPS 2.5 §8.4.15.7 erlaubt dem Reader einen konfigurierbaren Delay
57/// zwischen HEARTBEAT-Empfang und ACKNACK-Emit, um mehrere HBs zu
58/// batchen. Spec spezifiziert keinen festen Default — die zuvor
59/// verwendeten 200 ms sind ein Pre-1.0-Implementierungsdetail.
60///
61/// **0 ms** = synchrone ACK-Response. Cyclone DDS default ist ebenfalls
62/// 0 (`HeartbeatResponseDelay`-XML-Default). Macht ACKNACK event-driven
63/// statt deferred-batched. Kein Verlust an Korrektheit fuer reliable
64/// loopback / low-loss-Netze; bei lossy-Netzen kann der Wert via
65/// `ReliableReaderConfig::heartbeat_response_delay` hochgesetzt werden.
66///
67/// Pre-D.5e: 200 ms — das war ein impliziter Latency-Floor von 200 ms
68/// pro Roundtrip-ACK-Cycle.
69pub const DEFAULT_HEARTBEAT_RESPONSE_DELAY: Duration = Duration::from_millis(0);
70
71/// Pro-Writer State: der Proxy + getrennter Empfangs-State.
72///
73/// Jeder Remote-Writer hat seinen eigenen SN-Space (§8.3.5.4), also
74/// auch eigenen `received_cache`, `delivered_up_to` und
75/// `FragmentAssembler`. So koennen zwei Writer mit kollidierenden SN
76/// (z.B. beide starten bei 1) problemlos parallel empfangen werden.
77#[derive(Debug, Clone)]
78pub struct WriterProxyState {
79    /// Writer-Proxy-Protokoll-State.
80    pub proxy: WriterProxy,
81    /// Empfangs-Cache fuer diesen Writer.
82    pub received_cache: HistoryCache,
83    /// Hoechste SN, die an die App ausgeliefert wurde.
84    pub delivered_up_to: SequenceNumber,
85    /// Fragment-Reassembly fuer diesen Writer.
86    pub assembler: FragmentAssembler,
87    /// Zeitpunkt, seit wann ein ACKNACK/NACK_FRAG an diesen Writer
88    /// ausstehend ist. `None` = nichts ausstehend.
89    pub pending_acknack_since: Option<Duration>,
90}
91
92impl WriterProxyState {
93    fn new(proxy: WriterProxy, max_samples: usize, caps: AssemblerCaps) -> Self {
94        Self {
95            proxy,
96            received_cache: HistoryCache::new(max_samples),
97            delivered_up_to: SequenceNumber(0),
98            assembler: FragmentAssembler::new(caps),
99            pending_acknack_since: None,
100        }
101    }
102}
103
104/// Ein Reliable-Reader mit 0..N Writer-Proxies.
105#[derive(Debug, Clone)]
106pub struct ReliableReader {
107    guid: Guid,
108    vendor_id: VendorId,
109    writer_proxies: Vec<WriterProxyState>,
110    heartbeat_response_delay: Duration,
111    acknack_count: i32,
112    nackfrag_count: i32,
113    duplicate_frag_count: u64,
114    /// Template fuer neue Proxies.
115    max_samples_per_proxy: usize,
116    assembler_caps: AssemblerCaps,
117    /// Zaehler fuer Submessages, deren `writer_id` keinen Proxy hat.
118    unknown_src_count: u64,
119}
120
121/// Konfiguration beim Anlegen.
122#[derive(Debug, Clone)]
123pub struct ReliableReaderConfig {
124    /// GUID des Reader-Endpoints.
125    pub guid: Guid,
126    /// VendorId fuer den RTPS-Header der ACKNACKs.
127    pub vendor_id: VendorId,
128    /// Initiale Writer-Proxies. Weitere via `add_writer_proxy`.
129    pub writer_proxies: Vec<WriterProxy>,
130    /// Kapazitaet des Empfangs-Caches pro Proxy (nicht global).
131    pub max_samples_per_proxy: usize,
132    /// Heartbeat-Response-Delay (Default: 200 ms).
133    pub heartbeat_response_delay: Duration,
134    /// Caps fuer den Fragment-Assembler (pro Proxy).
135    pub assembler_caps: AssemblerCaps,
136}
137
138/// Ein an die Applikation ausgelieferter Sample.
139#[derive(Debug, Clone, PartialEq, Eq)]
140pub struct DeliveredSample {
141    /// GUID des Writers, von dem der Sample kommt. Macht Multi-Writer-
142    /// Deduplication im Caller moeglich.
143    pub writer_guid: Guid,
144    /// Sequence-Number im Writer.
145    pub sequence_number: SequenceNumber,
146    /// Serialisierter Payload (Zero-Copy via `Arc::clone` aus dem Cache).
147    /// Nutzlast.
148    pub payload: alloc::sync::Arc<[u8]>,
149    /// Spec §8.2.1.2 ChangeKind — `Alive` fuer normale Samples,
150    /// `NotAliveDisposed` / `NotAliveUnregistered` /
151    /// `NotAliveDisposedUnregistered` fuer Lifecycle-Marker, die der
152    /// Writer per `dispose`/`unregister_instance` versendet hat.
153    /// Spec §9.6.3.9 PID_STATUS_INFO im Inline-QoS.
154    pub kind: ChangeKind,
155    /// `PID_KEY_HASH` aus dem Inline-QoS (Spec §9.6.4.8). Bei
156    /// Lifecycle-Markern ist das die Identitaet der disposed/
157    /// unregistered Instanz; bei keyed-Topic-ALIVE-Samples optional
158    /// (manche Vendors senden Inline-Hash, manche nicht). `None`
159    /// wenn der Writer keinen Hash inline mitliefert (typisch fuer
160    /// keyless Topics).
161    pub key_hash: Option<[u8; 16]>,
162}
163
164impl ReliableReader {
165    /// Erzeugt einen frischen Reader.
166    ///
167    /// # Panics
168    /// Wenn `cfg.assembler_caps.max_pending_sns == 0`.
169    #[must_use]
170    pub fn new(cfg: ReliableReaderConfig) -> Self {
171        assert!(
172            cfg.assembler_caps.max_pending_sns > 0,
173            "assembler_caps.max_pending_sns must be > 0; use a Best-Effort reader \
174             or increase the cap to actually accept fragmented samples"
175        );
176        let proxies = cfg
177            .writer_proxies
178            .into_iter()
179            .map(|p| WriterProxyState::new(p, cfg.max_samples_per_proxy, cfg.assembler_caps))
180            .collect();
181        Self {
182            guid: cfg.guid,
183            vendor_id: cfg.vendor_id,
184            writer_proxies: proxies,
185            heartbeat_response_delay: cfg.heartbeat_response_delay,
186            acknack_count: 0,
187            nackfrag_count: 0,
188            duplicate_frag_count: 0,
189            max_samples_per_proxy: cfg.max_samples_per_proxy,
190            assembler_caps: cfg.assembler_caps,
191            unknown_src_count: 0,
192        }
193    }
194
195    /// GUID.
196    #[must_use]
197    pub fn guid(&self) -> Guid {
198        self.guid
199    }
200
201    /// Read-only-Slice der Writer-Proxy-States.
202    #[must_use]
203    pub fn writer_proxies(&self) -> &[WriterProxyState] {
204        &self.writer_proxies
205    }
206
207    /// Anzahl registrierter Writer-Proxies.
208    #[must_use]
209    pub fn writer_proxy_count(&self) -> usize {
210        self.writer_proxies.len()
211    }
212
213    /// Zaehler der gesendeten ACKNACKs.
214    #[must_use]
215    pub fn acknack_count(&self) -> i32 {
216        self.acknack_count
217    }
218
219    /// Zaehler der gesendeten NACK_FRAGs.
220    #[must_use]
221    pub fn nackfrag_count(&self) -> i32 {
222        self.nackfrag_count
223    }
224
225    /// Summe der aktiven (unvollstaendigen) Fragment-Buffer ueber alle
226    /// Proxies.
227    #[must_use]
228    pub fn pending_fragment_count(&self) -> usize {
229        self.writer_proxies.iter().map(|s| s.assembler.len()).sum()
230    }
231
232    /// Summe der verworfenen Fragmente ueber alle Proxies
233    /// (DoS-/Inkonsistenz-Diagnose).
234    #[must_use]
235    pub fn dropped_fragment_count(&self) -> u64 {
236        self.writer_proxies
237            .iter()
238            .map(|s| s.assembler.drop_count())
239            .sum()
240    }
241
242    /// Anzahl DATA_FRAGs, die fuer bereits-bekannte SNs eintrafen
243    /// (Duplicate-Fragments, Re-Sends).
244    #[must_use]
245    pub fn duplicate_fragment_count(&self) -> u64 {
246        self.duplicate_frag_count
247    }
248
249    /// Anzahl Submessages, deren `writer_id` keinem registrierten
250    /// Proxy zuzuordnen war (Misrouting / Spoofing-Diagnose).
251    #[must_use]
252    pub fn unknown_src_count(&self) -> u64 {
253        self.unknown_src_count
254    }
255
256    /// Fuegt einen Writer-Proxy hinzu. Idempotent: gleiche GUID ersetzt.
257    ///
258    /// Setzt sofort ein preemptives ACKNACK als pending, damit der Writer
259    /// beim naechsten Tick ein "Hallo, ich bin hier"-ACKNACK bekommt.
260    /// Cyclone DDS reagiert darauf mit einem HEARTBEAT und beginnt mit
261    /// DATA-Resends — ohne diesen Impuls wartet der Writer passiv.
262    pub fn add_writer_proxy(&mut self, proxy: WriterProxy) {
263        let guid = proxy.remote_writer_guid;
264        let mut state =
265            WriterProxyState::new(proxy, self.max_samples_per_proxy, self.assembler_caps);
266        // Duration::ZERO triggert beim naechsten tick() sofort einen
267        // ACKNACK-Emit (now - ZERO >= heartbeat_response_delay).
268        state.pending_acknack_since = Some(Duration::ZERO);
269        if let Some(idx) = self
270            .writer_proxies
271            .iter()
272            .position(|s| s.proxy.remote_writer_guid == guid)
273        {
274            self.writer_proxies[idx] = state;
275        } else {
276            self.writer_proxies.push(state);
277        }
278    }
279
280    /// Entfernt einen Writer-Proxy.
281    pub fn remove_writer_proxy(&mut self, guid: Guid) -> Option<WriterProxy> {
282        let idx = self
283            .writer_proxies
284            .iter()
285            .position(|s| s.proxy.remote_writer_guid == guid)?;
286        Some(self.writer_proxies.remove(idx).proxy)
287    }
288
289    /// Nulliert alle Diagnose-Zaehler. Beruehrt keine State-Maschine.
290    pub fn reset_diagnostics(&mut self) {
291        self.acknack_count = 0;
292        self.nackfrag_count = 0;
293        self.duplicate_frag_count = 0;
294        self.unknown_src_count = 0;
295        for s in &mut self.writer_proxies {
296            s.assembler.reset_diagnostics();
297        }
298    }
299
300    // ---------- Incoming Submessages ----------
301
302    /// DATA verarbeiten. Dispatch nach `writer_id` auf den passenden
303    /// Proxy. Liefert die reassemblierten Samples dieses Proxies.
304    ///
305    /// Spec §9.6.3.9 PID_STATUS_INFO: bei `key_flag=true` + Inline-QoS
306    /// mit gesetztem STATUS_INFO wird der CacheChange mit
307    /// NotAliveDisposed / NotAliveUnregistered / NotAliveDisposedUnregistered
308    /// markiert, statt Alive.
309    pub fn handle_data(&mut self, data: &DataSubmessage) -> Vec<DeliveredSample> {
310        let Some(idx) = self.proxy_index_by_writer_id(data.writer_id) else {
311            self.unknown_src_count = self.unknown_src_count.saturating_add(1);
312            return Vec::new();
313        };
314        let state = &mut self.writer_proxies[idx];
315        let sn = data.writer_sn;
316        if state.proxy.is_known(sn) || sn <= state.delivered_up_to {
317            return Vec::new();
318        }
319        state.proxy.received_change_set(sn);
320        let kind = Self::classify_change_kind(data);
321        let key_hash = data
322            .inline_qos
323            .as_ref()
324            .and_then(crate::inline_qos::find_key_hash);
325        // Arc::clone statt Vec::clone auf dem Payload — der
326        // Refcount-Block wird zwischen DataSubmessage, Cache und
327        // DeliveredSample geteilt.
328        let _ = state.received_cache.insert(CacheChange {
329            sequence_number: sn,
330            payload: alloc::sync::Arc::clone(&data.serialized_payload),
331            kind,
332            key_hash,
333        });
334        Self::collect_in_order_for(state)
335    }
336
337    /// Klassifiziert eine eingehende DATA als Alive vs Lifecycle-Marker.
338    /// `key_flag=true` zeigt Key-Only-Payload an; STATUS_INFO im
339    /// Inline-QoS sagt, ob disposed/unregistered/beides.
340    fn classify_change_kind(data: &DataSubmessage) -> ChangeKind {
341        if !data.key_flag {
342            return ChangeKind::Alive;
343        }
344        let Some(pl) = data.inline_qos.as_ref() else {
345            return ChangeKind::Alive;
346        };
347        let Some(bits) = crate::inline_qos::find_status_info(pl) else {
348            return ChangeKind::Alive;
349        };
350        let disposed = bits & crate::inline_qos::status_info::DISPOSED != 0;
351        let unregistered = bits & crate::inline_qos::status_info::UNREGISTERED != 0;
352        match (disposed, unregistered) {
353            (true, true) => ChangeKind::NotAliveDisposedUnregistered,
354            (true, false) => ChangeKind::NotAliveDisposed,
355            (false, true) => ChangeKind::NotAliveUnregistered,
356            (false, false) => ChangeKind::Alive,
357        }
358    }
359
360    /// DATA_FRAG verarbeiten. `now` triggert NACK_FRAG-Scheduling
361    /// direkt, ohne auf HEARTBEAT zu warten.
362    pub fn handle_data_frag(
363        &mut self,
364        df: &DataFragSubmessage,
365        now: Duration,
366    ) -> Vec<DeliveredSample> {
367        let Some(idx) = self.proxy_index_by_writer_id(df.writer_id) else {
368            self.unknown_src_count = self.unknown_src_count.saturating_add(1);
369            return Vec::new();
370        };
371        let state = &mut self.writer_proxies[idx];
372        let sn = df.writer_sn;
373        if state.proxy.is_known(sn) || sn <= state.delivered_up_to {
374            self.duplicate_frag_count = self.duplicate_frag_count.saturating_add(1);
375            return Vec::new();
376        }
377        let result = if let Some(completed) = state.assembler.insert(df) {
378            state.proxy.received_change_set(sn);
379            let _ = state
380                .received_cache
381                .insert(CacheChange::alive(sn, completed.payload));
382            Self::collect_in_order_for(state)
383        } else {
384            Vec::new()
385        };
386        if state.assembler.has_gaps() {
387            state.pending_acknack_since.get_or_insert(now);
388        }
389        result
390    }
391
392    /// HEARTBEAT verarbeiten. Dispatch nach `writer_id`.
393    pub fn handle_heartbeat(
394        &mut self,
395        hb: &HeartbeatSubmessage,
396        now: Duration,
397    ) -> Vec<DeliveredSample> {
398        let Some(idx) = self.proxy_index_by_writer_id(hb.writer_id) else {
399            self.unknown_src_count = self.unknown_src_count.saturating_add(1);
400            return Vec::new();
401        };
402        let state = &mut self.writer_proxies[idx];
403        if hb.liveliness_flag {
404            return Vec::new();
405        }
406        state.proxy.update_from_heartbeat(hb.first_sn, hb.last_sn);
407        let has_missing = state.proxy.has_missing_changes();
408        let has_frag_gaps = state.assembler.has_gaps();
409        if !hb.final_flag || has_missing || has_frag_gaps {
410            state.pending_acknack_since.get_or_insert(now);
411        }
412        // Ein HB mit first_sn > delivered_up_to+1 bedeutet, dass Samples
413        // vor first_sn "lost" sind. `collect_in_order_for` rueckt dann
414        // `delivered_up_to` bis first_sn-1 vor und liefert Samples aus
415        // dem received_cache, die auf den Hole-Fill warteten (z.B. ein
416        // Volatile-direkt-send mit SN > delivered_up_to+1).
417        Self::collect_in_order_for(state)
418    }
419
420    /// GAP verarbeiten. Dispatch nach `writer_id`.
421    pub fn handle_gap(&mut self, gap: &GapSubmessage) -> Vec<DeliveredSample> {
422        let Some(idx) = self.proxy_index_by_writer_id(gap.writer_id) else {
423            self.unknown_src_count = self.unknown_src_count.saturating_add(1);
424            return Vec::new();
425        };
426        let state = &mut self.writer_proxies[idx];
427        let mut sn = gap.gap_start;
428        while sn < gap.gap_list.bitmap_base {
429            state.proxy.irrelevant_change_set(sn);
430            state.assembler.discard(sn);
431            sn = SequenceNumber(sn.0 + 1);
432        }
433        for sn in gap.gap_list.iter_set() {
434            state.proxy.irrelevant_change_set(sn);
435            state.assembler.discard(sn);
436        }
437        Self::collect_in_order_for(state)
438    }
439
440    /// Tick: liefert faellige ACKNACK/NACK_FRAG-Datagramme **ueber alle
441    /// Proxies hinweg**. Pro Proxy ein eigenes ACKNACK/NACK_FRAG, weil
442    /// SN-Spaces pro Writer sind.
443    ///
444    /// # Errors
445    /// Wire-Encode-Fehler.
446    pub fn tick(&mut self, now: Duration) -> Result<Vec<Vec<u8>>, WireError> {
447        Ok(self
448            .tick_outbound(now)?
449            .into_iter()
450            .map(|d| d.bytes)
451            .collect())
452    }
453
454    /// Wie [`Self::tick`], aber mit Ziel-Locators fuer jedes Datagram.
455    /// Bevorzugt fuer Transport-Integration, weil jeder AckNack an den
456    /// konkreten Writer-Proxy-Unicast-Locator gehen muss.
457    ///
458    /// # Errors
459    /// `WireError::ValueOutOfRange` bei ueberlangem Submessage-Body.
460    pub fn tick_outbound(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
461        let mut out = Vec::new();
462        for idx in 0..self.writer_proxies.len() {
463            let Some(since) = self.writer_proxies[idx].pending_acknack_since else {
464                continue;
465            };
466            if now.saturating_sub(since) < self.heartbeat_response_delay {
467                continue;
468            }
469            self.writer_proxies[idx].pending_acknack_since = None;
470            let targets = Rc::new(self.writer_proxies[idx].proxy.unicast_locators.clone());
471
472            let incomplete_sns: Vec<SequenceNumber> = self.writer_proxies[idx]
473                .assembler
474                .incomplete_sns()
475                .collect();
476            for sn in incomplete_sns {
477                let bytes = self.build_nackfrag_datagram(idx, sn)?;
478                out.push(OutboundDatagram {
479                    bytes,
480                    targets: Rc::clone(&targets),
481                });
482            }
483            let bytes = self.build_acknack_datagram(idx)?;
484            out.push(OutboundDatagram { bytes, targets });
485        }
486        Ok(out)
487    }
488
489    // ---------- Intern ----------
490
491    fn proxy_index_by_writer_id(&self, writer_id: crate::wire_types::EntityId) -> Option<usize> {
492        self.writer_proxies
493            .iter()
494            .position(|s| s.proxy.remote_writer_guid.entity_id == writer_id)
495    }
496
497    fn collect_in_order_for(state: &mut WriterProxyState) -> Vec<DeliveredSample> {
498        let mut out = Vec::new();
499        loop {
500            let next = SequenceNumber(state.delivered_up_to.0 + 1);
501            if let Some(change) = state.received_cache.get(next) {
502                out.push(DeliveredSample {
503                    writer_guid: state.proxy.remote_writer_guid,
504                    sequence_number: change.sequence_number,
505                    payload: change.payload.clone(),
506                    kind: change.kind,
507                    key_hash: change.key_hash,
508                });
509                state.delivered_up_to = next;
510                state.received_cache.remove_up_to(next);
511            } else if state.proxy.is_known(next) && state.proxy.last_available_sn() >= next {
512                state.delivered_up_to = next;
513            } else if next < state.proxy.first_available_sn() {
514                // Writer hat via HEARTBEAT first_sn > next angekuendigt
515                // → Samples vor first_available sind "lost" (Volatile-
516                // Skip, Historic-Eviction). Delivery-Pointer weiterruecken,
517                // damit nachfolgende SNs im received_cache endlich geliefert
518                // werden koennen. Spec §8.4.12.4.
519                state.delivered_up_to = next;
520            } else {
521                break;
522            }
523        }
524        out
525    }
526
527    fn build_nackfrag_datagram(
528        &mut self,
529        proxy_idx: usize,
530        sn: SequenceNumber,
531    ) -> Result<Vec<u8>, WireError> {
532        let missing = self.writer_proxies[proxy_idx]
533            .assembler
534            .missing_fragments(sn);
535        self.nackfrag_count = self.nackfrag_count.wrapping_add(1);
536        let writer_guid = self.writer_proxies[proxy_idx].proxy.remote_writer_guid;
537        let nf = NackFragSubmessage {
538            reader_id: self.guid.entity_id,
539            writer_id: writer_guid.entity_id,
540            writer_sn: sn,
541            fragment_number_state: missing,
542            count: self.nackfrag_count,
543        };
544        let (body, mut flags) = nf.write_body(true);
545        flags |= FLAG_E_LITTLE_ENDIAN;
546        self.wrap_to_writer(writer_guid.prefix, SubmessageId::NackFrag, flags, &body)
547    }
548
549    fn build_acknack_datagram(&mut self, proxy_idx: usize) -> Result<Vec<u8>, WireError> {
550        let state = &self.writer_proxies[proxy_idx];
551        let base = state.proxy.acknack_base();
552        let missing = state.proxy.missing_changes(256);
553        let snset = SequenceNumberSet::from_missing(base, &missing);
554        self.acknack_count = self.acknack_count.wrapping_add(1);
555        // final_flag=true nur, wenn wir wirklich alles bis base-1 haben
556        // und keine weitere Writer-Aktion noetig ist. Beim preemptive
557        // AckNack (base=1, leere bitmap, Proxy noch nichts gesehen)
558        // muss final=false, sonst liest der Writer es als "Reader ist
559        // up-to-date" und schickt keine durability-Resends (Cyclone DDS
560        // zeigt dann nur HEARTBEATs, keine DATA).
561        let final_flag = missing.is_empty() && state.proxy.last_available_sn().0 >= 1;
562        let writer_guid = state.proxy.remote_writer_guid;
563        let ack = AckNackSubmessage {
564            reader_id: self.guid.entity_id,
565            writer_id: writer_guid.entity_id,
566            reader_sn_state: snset,
567            count: self.acknack_count,
568            final_flag,
569        };
570        let (body, mut flags) = ack.write_body(true);
571        flags |= FLAG_E_LITTLE_ENDIAN;
572        self.wrap_to_writer(writer_guid.prefix, SubmessageId::AckNack, flags, &body)
573    }
574
575    /// Packt `Header + INFO_DST(writer_prefix) + Submessage` in ein
576    /// Datagramm. INFO_DST ist zwingend: ohne ihn ist der effektive
577    /// Destination-Prefix = UNKNOWN, und Receiver (z.B. Cyclone DDS)
578    /// verwerfen die Submessage als "not a connection" (RTPS 2.5 §8.3.7.6).
579    fn wrap_to_writer(
580        &self,
581        writer_prefix: crate::wire_types::GuidPrefix,
582        id: SubmessageId,
583        flags: u8,
584        body: &[u8],
585    ) -> Result<Vec<u8>, WireError> {
586        let header = RtpsHeader::new(self.vendor_id, self.guid.prefix);
587        let mut out = Vec::new();
588        out.extend_from_slice(&header.to_bytes());
589
590        // INFO_DST: target writer's GuidPrefix (12 byte body).
591        let info_dst_header = SubmessageHeader {
592            submessage_id: SubmessageId::InfoDst,
593            flags: FLAG_E_LITTLE_ENDIAN,
594            octets_to_next_header: 12,
595        };
596        out.extend_from_slice(&info_dst_header.to_bytes());
597        out.extend_from_slice(&writer_prefix.to_bytes());
598
599        // Eigentliche Submessage (ACKNACK / NACK_FRAG).
600        let body_len = u16::try_from(body.len()).map_err(|_| WireError::ValueOutOfRange {
601            message: "submessage body exceeds u16::MAX",
602        })?;
603        let sh = SubmessageHeader {
604            submessage_id: id,
605            flags,
606            octets_to_next_header: body_len,
607        };
608        out.extend_from_slice(&sh.to_bytes());
609        out.extend_from_slice(body);
610        Ok(out)
611    }
612}
613
614#[cfg(test)]
615#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
616mod tests {
617    use super::*;
618    use crate::datagram::{ParsedSubmessage, decode_datagram};
619    use crate::wire_types::{EntityId, GuidPrefix, Locator};
620
621    fn single_writer_guid() -> Guid {
622        Guid::new(
623            GuidPrefix::from_bytes([1; 12]),
624            EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
625        )
626    }
627
628    fn make_reader(max_samples: usize) -> ReliableReader {
629        let reader_guid = Guid::new(
630            GuidPrefix::from_bytes([2; 12]),
631            EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
632        );
633        let writer_proxy = WriterProxy::new(
634            single_writer_guid(),
635            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7420)],
636            alloc::vec![],
637            true,
638        );
639        ReliableReader::new(ReliableReaderConfig {
640            guid: reader_guid,
641            vendor_id: VendorId::ZERODDS,
642            writer_proxies: alloc::vec![writer_proxy],
643            max_samples_per_proxy: max_samples,
644            heartbeat_response_delay: Duration::from_millis(200),
645            assembler_caps: AssemblerCaps::default(),
646        })
647    }
648
649    fn sn(n: i64) -> SequenceNumber {
650        SequenceNumber(n)
651    }
652
653    fn data(wid: EntityId, rid: EntityId, n: i64, byte: u8) -> DataSubmessage {
654        DataSubmessage {
655            extra_flags: 0,
656            reader_id: rid,
657            writer_id: wid,
658            writer_sn: sn(n),
659            inline_qos: None,
660            key_flag: false,
661            non_standard_flag: false,
662            serialized_payload: alloc::sync::Arc::from(alloc::vec![byte]),
663        }
664    }
665
666    fn heartbeat(
667        wid: EntityId,
668        rid: EntityId,
669        first: i64,
670        last: i64,
671        count: i32,
672        final_flag: bool,
673    ) -> HeartbeatSubmessage {
674        HeartbeatSubmessage {
675            reader_id: rid,
676            writer_id: wid,
677            first_sn: sn(first),
678            last_sn: sn(last),
679            count,
680            final_flag,
681            liveliness_flag: false,
682            group_info: None,
683        }
684    }
685
686    fn first_state(r: &ReliableReader) -> &WriterProxyState {
687        &r.writer_proxies()[0]
688    }
689
690    #[test]
691    fn in_order_data_delivered_immediately() {
692        let mut r = make_reader(10);
693        let w_eid = single_writer_guid().entity_id;
694        let r_eid = r.guid().entity_id;
695        let delivered = r.handle_data(&data(w_eid, r_eid, 1, 0xAA));
696        assert_eq!(delivered.len(), 1);
697        assert_eq!(delivered[0].payload.as_ref(), &[0xAA][..]);
698        assert_eq!(delivered[0].writer_guid, single_writer_guid());
699        assert_eq!(first_state(&r).delivered_up_to, sn(1));
700    }
701
702    #[test]
703    fn out_of_order_data_buffered_until_gap_filled() {
704        let mut r = make_reader(10);
705        let w = single_writer_guid().entity_id;
706        let rd = r.guid().entity_id;
707        assert!(r.handle_data(&data(w, rd, 2, 0x22)).is_empty());
708        assert!(r.handle_data(&data(w, rd, 3, 0x33)).is_empty());
709        let out = r.handle_data(&data(w, rd, 1, 0x11));
710        assert_eq!(
711            out.iter().map(|s| s.sequence_number).collect::<Vec<_>>(),
712            alloc::vec![sn(1), sn(2), sn(3)]
713        );
714        assert_eq!(first_state(&r).delivered_up_to, sn(3));
715    }
716
717    #[test]
718    fn duplicate_data_is_rejected() {
719        let mut r = make_reader(10);
720        let w = single_writer_guid().entity_id;
721        let rd = r.guid().entity_id;
722        r.handle_data(&data(w, rd, 1, 0xAA));
723        let second = r.handle_data(&data(w, rd, 1, 0xAA));
724        assert!(second.is_empty());
725    }
726
727    #[test]
728    fn mismatched_writer_id_is_counted() {
729        let mut r = make_reader(10);
730        let rd = r.guid().entity_id;
731        let foreign = EntityId::user_writer_with_key([0xFF, 0xFF, 0xFF]);
732        assert!(r.handle_data(&data(foreign, rd, 1, 0xAA)).is_empty());
733        assert_eq!(r.unknown_src_count(), 1);
734    }
735
736    // ---------- Wire-Side Lifecycle (T8) ----------
737
738    #[test]
739    fn alive_data_yields_alive_changekind() {
740        let mut r = make_reader(10);
741        let w = single_writer_guid().entity_id;
742        let rd = r.guid().entity_id;
743        let delivered = r.handle_data(&data(w, rd, 1, 0xAA));
744        assert_eq!(delivered.len(), 1);
745        assert_eq!(delivered[0].kind, ChangeKind::Alive);
746    }
747
748    fn lifecycle_data(
749        wid: EntityId,
750        rid: EntityId,
751        n: i64,
752        key_hash: [u8; 16],
753        status_bits: u32,
754    ) -> DataSubmessage {
755        DataSubmessage {
756            extra_flags: 0,
757            reader_id: rid,
758            writer_id: wid,
759            writer_sn: sn(n),
760            inline_qos: Some(crate::inline_qos::lifecycle_inline_qos(
761                key_hash,
762                status_bits,
763            )),
764            key_flag: true,
765            non_standard_flag: false,
766            serialized_payload: alloc::sync::Arc::from(alloc::vec![0u8; 0]),
767        }
768    }
769
770    #[test]
771    fn dispose_data_yields_not_alive_disposed() {
772        let mut r = make_reader(10);
773        let w = single_writer_guid().entity_id;
774        let rd = r.guid().entity_id;
775        let delivered = r.handle_data(&lifecycle_data(
776            w,
777            rd,
778            1,
779            [0xAB; 16],
780            crate::inline_qos::status_info::DISPOSED,
781        ));
782        assert_eq!(delivered.len(), 1);
783        assert_eq!(delivered[0].kind, ChangeKind::NotAliveDisposed);
784    }
785
786    #[test]
787    fn unregister_data_yields_not_alive_unregistered() {
788        let mut r = make_reader(10);
789        let w = single_writer_guid().entity_id;
790        let rd = r.guid().entity_id;
791        let delivered = r.handle_data(&lifecycle_data(
792            w,
793            rd,
794            1,
795            [0xCD; 16],
796            crate::inline_qos::status_info::UNREGISTERED,
797        ));
798        assert_eq!(delivered.len(), 1);
799        assert_eq!(delivered[0].kind, ChangeKind::NotAliveUnregistered);
800    }
801
802    #[test]
803    fn dispose_and_unregister_combined() {
804        let mut r = make_reader(10);
805        let w = single_writer_guid().entity_id;
806        let rd = r.guid().entity_id;
807        let bits =
808            crate::inline_qos::status_info::DISPOSED | crate::inline_qos::status_info::UNREGISTERED;
809        let delivered = r.handle_data(&lifecycle_data(w, rd, 1, [0xEF; 16], bits));
810        assert_eq!(delivered.len(), 1);
811        assert_eq!(delivered[0].kind, ChangeKind::NotAliveDisposedUnregistered);
812    }
813
814    #[test]
815    fn key_flag_without_status_info_falls_back_to_alive() {
816        // key_flag=true ohne PID_STATUS_INFO ist Spec-grenzwertig — wir
817        // fallen sicherheitshalber auf Alive zurueck statt zu raten.
818        let mut r = make_reader(10);
819        let w = single_writer_guid().entity_id;
820        let rd = r.guid().entity_id;
821        let mut d = data(w, rd, 1, 0xAA);
822        d.key_flag = true;
823        let delivered = r.handle_data(&d);
824        assert_eq!(delivered.len(), 1);
825        assert_eq!(delivered[0].kind, ChangeKind::Alive);
826    }
827
828    #[test]
829    fn heartbeat_with_missing_triggers_acknack_after_delay() {
830        let mut r = make_reader(10);
831        let w = single_writer_guid().entity_id;
832        let rd = r.guid().entity_id;
833        r.handle_heartbeat(&heartbeat(w, rd, 1, 3, 1, false), Duration::ZERO);
834        assert!(r.tick(Duration::from_millis(100)).unwrap().is_empty());
835        let out = r.tick(Duration::from_millis(250)).unwrap();
836        assert_eq!(out.len(), 1);
837    }
838
839    #[test]
840    fn heartbeat_without_missing_and_final_schedules_no_acknack() {
841        let mut r = make_reader(10);
842        let w = single_writer_guid().entity_id;
843        let rd = r.guid().entity_id;
844        r.handle_data(&data(w, rd, 1, 0xAA));
845        r.handle_heartbeat(&heartbeat(w, rd, 1, 1, 1, true), Duration::ZERO);
846        assert!(r.tick(Duration::from_secs(10)).unwrap().is_empty());
847    }
848
849    // ---------- Multi-Writer (T4.5) ----------
850
851    fn second_writer_guid() -> Guid {
852        Guid::new(
853            GuidPrefix::from_bytes([3; 12]),
854            EntityId::user_writer_with_key([0x40, 0x50, 0x60]),
855        )
856    }
857
858    fn add_second_writer(r: &mut ReliableReader) {
859        r.add_writer_proxy(WriterProxy::new(
860            second_writer_guid(),
861            alloc::vec![Locator::udp_v4([127, 0, 0, 2], 7420)],
862            alloc::vec![],
863            true,
864        ));
865    }
866
867    #[test]
868    fn add_writer_proxy_increases_count() {
869        let mut r = make_reader(10);
870        add_second_writer(&mut r);
871        assert_eq!(r.writer_proxy_count(), 2);
872    }
873
874    #[test]
875    fn two_writers_with_overlapping_sn_spaces_both_delivered() {
876        // Kern-Regression: beide Writer benutzen SN 1. Ohne per-Proxy-
877        // State wuerde das zweite `handle_data` als Duplicate abgelehnt.
878        let mut r = make_reader(10);
879        add_second_writer(&mut r);
880        let w1 = single_writer_guid().entity_id;
881        let w2 = second_writer_guid().entity_id;
882        let rd = r.guid().entity_id;
883
884        let d1 = r.handle_data(&data(w1, rd, 1, 0xAA));
885        let d2 = r.handle_data(&data(w2, rd, 1, 0xBB));
886
887        assert_eq!(d1.len(), 1);
888        assert_eq!(d1[0].payload.as_ref(), &[0xAA][..]);
889        assert_eq!(d1[0].writer_guid, single_writer_guid());
890        assert_eq!(d2.len(), 1);
891        assert_eq!(d2[0].payload.as_ref(), &[0xBB][..]);
892        assert_eq!(d2[0].writer_guid, second_writer_guid());
893
894        assert_eq!(r.writer_proxies()[0].delivered_up_to, sn(1));
895        assert_eq!(r.writer_proxies()[1].delivered_up_to, sn(1));
896    }
897
898    #[test]
899    fn remove_writer_proxy_drops_its_state() {
900        let mut r = make_reader(10);
901        add_second_writer(&mut r);
902        let removed = r.remove_writer_proxy(single_writer_guid());
903        assert!(removed.is_some());
904        assert_eq!(r.writer_proxy_count(), 1);
905        assert_eq!(
906            r.writer_proxies()[0].proxy.remote_writer_guid,
907            second_writer_guid()
908        );
909    }
910
911    #[test]
912    fn tick_emits_one_acknack_per_writer_with_missing() {
913        let mut r = make_reader(10);
914        add_second_writer(&mut r);
915        let rd = r.guid().entity_id;
916        // Beide Writer schicken HB mit missing-SN
917        r.handle_heartbeat(
918            &heartbeat(single_writer_guid().entity_id, rd, 1, 3, 1, false),
919            Duration::ZERO,
920        );
921        r.handle_heartbeat(
922            &heartbeat(second_writer_guid().entity_id, rd, 1, 5, 1, false),
923            Duration::ZERO,
924        );
925        let out = r.tick(Duration::from_millis(250)).unwrap();
926        // 2 ACKNACKs (pro Writer einen)
927        assert_eq!(out.len(), 2);
928    }
929
930    // ---------- WP 1.E Stufe-B: Pre-Emptive ACKNACK ----------
931
932    /// §8.4.2.3.4: Beim Match eines neuen Writer-Proxies sendet der
933    /// Reader **proaktiv** einen ACKNACK mit `bitmap_base=1, num_bits=0,
934    /// final_flag=false` — das beschleunigt den ersten Datenfluss um
935    /// genau eine HB-Periode (typ. 1 s).
936    #[test]
937    fn pre_emptive_acknack_emitted_after_add_writer_proxy() {
938        let reader_guid = Guid::new(
939            GuidPrefix::from_bytes([2; 12]),
940            EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
941        );
942        let mut r = ReliableReader::new(ReliableReaderConfig {
943            guid: reader_guid,
944            vendor_id: VendorId::ZERODDS,
945            writer_proxies: alloc::vec![],
946            max_samples_per_proxy: 10,
947            heartbeat_response_delay: Duration::from_millis(200),
948            assembler_caps: AssemblerCaps::default(),
949        });
950        r.add_writer_proxy(WriterProxy::new(
951            single_writer_guid(),
952            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7420)],
953            alloc::vec![],
954            true,
955        ));
956        // Werte aus add_writer_proxy: pending_acknack_since=Duration::ZERO
957        // → tick(>=delay) liefert Pre-Emptive AckNack.
958        let out = r.tick(Duration::from_millis(250)).unwrap();
959        assert_eq!(out.len(), 1, "exactly one Pre-Emptive ACKNACK expected");
960        let parsed = decode_datagram(&out[0]).unwrap();
961        let ack = parsed
962            .submessages
963            .iter()
964            .find_map(|s| {
965                if let ParsedSubmessage::AckNack(a) = s {
966                    Some(a)
967                } else {
968                    None
969                }
970            })
971            .expect("ACKNACK in datagram");
972        assert_eq!(ack.reader_sn_state.bitmap_base, sn(1));
973        assert_eq!(ack.reader_sn_state.num_bits, 0);
974        assert!(
975            !ack.final_flag,
976            "Pre-Emptive ACKNACK must be non-final (force HB-response)"
977        );
978    }
979
980    /// Pre-Emptive ACKNACK passiert NICHT, wenn `add_writer_proxy` nie
981    /// aufgerufen wurde (defensiver Sanity-Check fuer den Default-Reader).
982    #[test]
983    fn no_pre_emptive_acknack_without_proxy() {
984        let reader_guid = Guid::new(
985            GuidPrefix::from_bytes([2; 12]),
986            EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
987        );
988        let mut r = ReliableReader::new(ReliableReaderConfig {
989            guid: reader_guid,
990            vendor_id: VendorId::ZERODDS,
991            writer_proxies: alloc::vec![],
992            max_samples_per_proxy: 10,
993            heartbeat_response_delay: Duration::from_millis(200),
994            assembler_caps: AssemblerCaps::default(),
995        });
996        // Keine Proxies → kein ACKNACK
997        assert!(r.tick(Duration::from_secs(10)).unwrap().is_empty());
998    }
999
1000    /// Initiale Proxies aus `ReliableReaderConfig.writer_proxies` bekommen
1001    /// **kein** automatisches Pre-Emptive — nur via `add_writer_proxy`.
1002    /// Das ist konsistent mit der DCPS-Integration: Discovery-Layer ruft
1003    /// `add_writer_proxy` auf, sobald SEDP-Match steht.
1004    #[test]
1005    fn initial_proxy_from_config_does_not_send_pre_emptive() {
1006        // make_reader() nutzt config.writer_proxies, kein add_writer_proxy
1007        let mut r = make_reader(10);
1008        // Vor add: auch nach langem tick kein Pre-Emptive
1009        assert!(
1010            r.tick(Duration::from_secs(10)).unwrap().is_empty(),
1011            "initial proxy from config must not emit Pre-Emptive"
1012        );
1013    }
1014
1015    #[test]
1016    fn pre_emptive_acknack_carries_info_dst() {
1017        // Pre-Emptive ACKNACK MUSS in INFO_DST(writer_prefix) gewrappt
1018        // sein, sonst verwerfen Cyclone/Fast-DDS die Submessage als
1019        // "not for me" (Spec §8.3.7.6 / §8.3.8.7).
1020        let reader_guid = Guid::new(
1021            GuidPrefix::from_bytes([2; 12]),
1022            EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
1023        );
1024        let mut r = ReliableReader::new(ReliableReaderConfig {
1025            guid: reader_guid,
1026            vendor_id: VendorId::ZERODDS,
1027            writer_proxies: alloc::vec![],
1028            max_samples_per_proxy: 10,
1029            heartbeat_response_delay: Duration::from_millis(200),
1030            assembler_caps: AssemblerCaps::default(),
1031        });
1032        r.add_writer_proxy(WriterProxy::new(
1033            single_writer_guid(),
1034            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7420)],
1035            alloc::vec![],
1036            true,
1037        ));
1038        let out = r.tick(Duration::from_millis(250)).unwrap();
1039        assert_eq!(out.len(), 1);
1040        let parsed = decode_datagram(&out[0]).unwrap();
1041        // submessages[0] = INFO_DST (Unknown im Decoder, weil InfoDst
1042        // im Decoder nicht ausgepackt wird), [1] = ACKNACK
1043        assert!(parsed.submessages.len() >= 2, "INFO_DST + ACKNACK");
1044        match &parsed.submessages[0] {
1045            ParsedSubmessage::Unknown { id, .. } => assert_eq!(*id, 0x0E),
1046            other => panic!("expected INFO_DST first, got {other:?}"),
1047        }
1048    }
1049
1050    #[test]
1051    fn unknown_writer_id_in_heartbeat_counts_not_crashes() {
1052        let mut r = make_reader(10);
1053        let rd = r.guid().entity_id;
1054        let foreign = EntityId::user_writer_with_key([0xFF, 0xFF, 0xFF]);
1055        r.handle_heartbeat(&heartbeat(foreign, rd, 1, 3, 1, false), Duration::ZERO);
1056        assert_eq!(r.unknown_src_count(), 1);
1057        assert!(r.tick(Duration::from_secs(1)).unwrap().is_empty());
1058    }
1059}