Skip to main content

zerodds_rtps/
reliable_writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Reliable RTPS-Writer (1:N Reader-Proxies) — DDSI-RTPS 2.5 §8.4.9.
4//!
5//! Entspricht der [`StatefulWriter`]-Rolle mit 1..N matched Readers.
6//! keine Heartbeat-Liveliness. Fragmentation
7//! (§8.4.14) ist unterstuetzt. Multi-Reader + Submessage-Aggregation
8//! (Fast-DDS-Alignment) sind ab WP 1.4 T3 drin.
9//!
10//! # API-Form
11//!
12//! Die State-Machine ist tick-getrieben. `now` ist eine `Duration`
13//! seit Writer-Start (no_std-kompatibel, ohne std::Instant).
14//!
15//! ```text
16//!   let mut w = ReliableWriter::new(...);
17//!   w.add_reader_proxy(proxy_a);
18//!   loop {
19//!       if let Some(payload) = app.next_sample() {
20//!           for dg in w.write(payload)? { transport.send_to_all(&dg.targets, &dg.bytes); }
21//!       }
22//!       for dg in w.tick(uptime())? { transport.send_to_all(&dg.targets, &dg.bytes); }
23//!       match transport.recv_control() {
24//!           AckNack(src, ack) => w.handle_acknack(src, ack.base, ack.requested),
25//!           NackFrag(src, nf) => w.handle_nackfrag(src, &nf),
26//!           _ => {}
27//!       }
28//!   }
29//! ```
30//!
31//! [`StatefulWriter`]: https://www.omg.org/spec/DDSI-RTPS/2.5/
32
33use core::time::Duration;
34
35extern crate alloc;
36use alloc::vec::Vec;
37
38use alloc::rc::Rc;
39
40use crate::error::WireError;
41use crate::header::RtpsHeader;
42use crate::history_cache::{CacheChange, HistoryCache, HistoryKind};
43use crate::message_builder::{AddError, MessageBuilder, OutboundDatagram};
44use crate::reader_proxy::ReaderProxy;
45use crate::submessage_header::SubmessageId;
46use crate::submessages::{
47    DataFragSubmessage, DataSubmessage, GapSubmessage, HeartbeatSubmessage, NackFragSubmessage,
48    SequenceNumberSet,
49};
50use crate::wire_types::{EntityId, FragmentNumber, Guid, Locator, SequenceNumber, VendorId};
51
52/// Default-Heartbeat-Periode.
53///
54/// DDSI-RTPS §8.4.15 spezifiziert keine fixe Default — "implementation-defined,
55/// typically 1 s". Cyclone DDS und FastDDS verwenden 100 ms; das ist auch unser
56/// Wert weil:
57/// 1. Bei Reliable + KEEP_LAST(N) treibt die HB-Period den Worst-Case-Latency-
58///    Floor (Reader sendet ACK auf HB, Writer kann erst danach Cache schrumpfen).
59/// 2. 1 s default war Pre-D.5d Initial-Implementation; die jetzige
60///    Event-driven-ACKNACK + Per-Peer-Scheduler-Architektur (D.5d+) macht
61///    den HB-Floor zur reinen "idle keep-alive"-Pulse, nicht mehr zum
62///    Latency-Determinismus.
63/// 3. Spec ist erfuellt: Period < lease_duration, period > 0, period stabil.
64pub const DEFAULT_HEARTBEAT_PERIOD: Duration = Duration::from_millis(100);
65
66/// Default-Fragment-Size in Bytes. 1344 = 1400 MTU − 20 RTPS-Header −
67/// ~32 Byte Submessage-Overhead.
68pub const DEFAULT_FRAGMENT_SIZE: u32 = 1344;
69
70/// Ein Reliable-Writer mit 0..N Reader-Proxies.
71#[derive(Debug, Clone)]
72pub struct ReliableWriter {
73    guid: Guid,
74    vendor_id: VendorId,
75    reader_proxies: Vec<ReaderProxy>,
76    cache: HistoryCache,
77    heartbeat_period: Duration,
78    last_heartbeat: Option<Duration>,
79    heartbeat_count: i32,
80    nackfrag_count: i32,
81    /// ACKNACK/NACK_FRAG-Nachrichten von unbekannten `src_guid`-Remotes.
82    /// Diagnose: weist auf Misrouting, veraltete Proxies oder
83    /// boesartige Sender hin.
84    unknown_src_count: u64,
85    next_sn: i64,
86    fragment_size: u32,
87    mtu: usize,
88}
89
90/// Konfiguration beim Anlegen.
91#[derive(Debug, Clone)]
92pub struct ReliableWriterConfig {
93    /// GUID des Writer-Endpoints.
94    pub guid: Guid,
95    /// VendorId fuer den RTPS-Header.
96    pub vendor_id: VendorId,
97    /// Initiale Reader-Proxies. Weitere via `add_reader_proxy`.
98    pub reader_proxies: Vec<ReaderProxy>,
99    /// Absolute Obergrenze fuer Cache-Eintraege. Wirkt als Kapazitaet;
100    /// die Semantik bei Ueberlauf ist von [`history_kind`](Self::history_kind)
101    /// bestimmt.
102    pub max_samples: usize,
103    /// History-QoS:
104    /// - `KeepAll`: write() schlaegt bei Overflow fehl
105    ///   (No-Loss-Szenarien, z.B. Logging).
106    /// - `KeepLast { depth }`: aeltestes Sample faellt bei Overflow raus
107    ///   (Spec-Default; Stalled Reader blockt nicht die ganze Pipeline).
108    pub history_kind: HistoryKind,
109    /// Heartbeat-Periode (Default: 1 s).
110    pub heartbeat_period: Duration,
111    /// Fragment-Size in Bytes ([`DEFAULT_FRAGMENT_SIZE`]).
112    pub fragment_size: u32,
113    /// MTU fuer Submessage-Aggregation ([`DEFAULT_MTU`]).
114    pub mtu: usize,
115}
116
117impl ReliableWriter {
118    /// Erzeugt einen leeren Writer.
119    ///
120    /// # Panics
121    /// - `cfg.fragment_size == 0`
122    /// - `cfg.mtu < 20` (RTPS-Header passt nicht rein)
123    #[must_use]
124    pub fn new(cfg: ReliableWriterConfig) -> Self {
125        assert!(cfg.fragment_size > 0, "fragment_size must be > 0");
126        assert!(cfg.mtu >= 20, "mtu must accommodate RTPS header");
127        Self {
128            guid: cfg.guid,
129            vendor_id: cfg.vendor_id,
130            reader_proxies: cfg.reader_proxies,
131            cache: HistoryCache::new_with_kind(cfg.history_kind, cfg.max_samples),
132            heartbeat_period: cfg.heartbeat_period,
133            last_heartbeat: None,
134            heartbeat_count: 0,
135            nackfrag_count: 0,
136            unknown_src_count: 0,
137            next_sn: 0,
138            fragment_size: cfg.fragment_size,
139            mtu: cfg.mtu,
140        }
141    }
142
143    /// GUID des Writers.
144    #[must_use]
145    pub fn guid(&self) -> Guid {
146        self.guid
147    }
148
149    /// Read-only-Slice der registrierten Reader-Proxies.
150    #[must_use]
151    pub fn reader_proxies(&self) -> &[ReaderProxy] {
152        &self.reader_proxies
153    }
154
155    /// Anzahl registrierter Reader-Proxies.
156    #[must_use]
157    pub fn reader_proxy_count(&self) -> usize {
158        self.reader_proxies.len()
159    }
160
161    /// Entfernt Samples mit SN < `up_to_exclusive` aus dem Cache. Wird
162    /// von hoeheren Layern fuer Lifespan-Expire genutzt (Spec §2.2.3.16):
163    /// abgelaufene Samples verschwinden so, dass auch spaete
164    /// Reader-Proxies sie nicht mehr bekommen.
165    pub fn remove_samples_up_to(&mut self, up_to_exclusive: SequenceNumber) -> usize {
166        self.cache.remove_up_to(up_to_exclusive)
167    }
168
169    /// History-Cache (read-only).
170    #[must_use]
171    pub fn cache(&self) -> &HistoryCache {
172        &self.cache
173    }
174
175    /// Anzahl gesendeter HEARTBEATs.
176    #[must_use]
177    pub fn heartbeat_count(&self) -> i32 {
178        self.heartbeat_count
179    }
180
181    /// Anzahl empfangener NACK_FRAGs.
182    #[must_use]
183    pub fn nackfrag_count(&self) -> i32 {
184        self.nackfrag_count
185    }
186
187    /// Anzahl ACKNACK/NACK_FRAG-Nachrichten von **unbekannten** Quellen
188    /// seit Writer-Start. Typische Ursachen: Misrouting auf Multicast,
189    /// veraltete Proxies nach `remove_reader_proxy`, GUID-Spoofing.
190    #[must_use]
191    pub fn unknown_src_count(&self) -> u64 {
192        self.unknown_src_count
193    }
194
195    /// Aktuelle Fragment-Size-Konfiguration.
196    #[must_use]
197    pub fn fragment_size(&self) -> u32 {
198        self.fragment_size
199    }
200
201    /// True wenn ein Payload dieser Groesse fragmentiert wird
202    /// (Payload-Laenge > `fragment_size`).
203    #[must_use]
204    fn needs_fragmentation(&self, payload: &[u8]) -> bool {
205        u32::try_from(payload.len()).unwrap_or(u32::MAX) > self.fragment_size && !payload.is_empty()
206    }
207
208    /// Prueft, ob alle Reader-Proxies den aktuell hoechsten Sample-SN
209    /// im Cache bereits acknowledged haben. Liefert `true` auch wenn
210    /// der Cache leer ist oder keine Proxies existieren (nichts zu
211    /// bestaetigen).
212    ///
213    /// Spec-Basis fuer `DataWriter::wait_for_acknowledgments`
214    /// (OMG DDS 1.4 §2.2.2.4.2.22).
215    #[must_use]
216    pub fn all_samples_acknowledged(&self) -> bool {
217        let Some(max_sn) = self.cache.max_sn() else {
218            return true;
219        };
220        self.reader_proxies
221            .iter()
222            .all(|p| p.highest_acked_sn() >= max_sn)
223    }
224
225    /// Fuegt einen Reader-Proxy hinzu. Idempotent: wenn ein Proxy mit
226    /// gleichem `remote_reader_guid` existiert, wird er ersetzt.
227    ///
228    /// Setzt `last_heartbeat = None`, damit der naechste `tick()` sofort
229    /// einen Heartbeat an **alle** Proxies (inkl. den neuen) emittiert.
230    /// RTPS §8.4.15.4: frisch hinzugefuegter ReaderProxy muss Gelegenheit
231    /// zu AckNack bekommen, sonst wartet er bis zur naechsten periodischen
232    /// Heartbeat-Runde (Default 1 s) — und bei spaet-gewireten Proxies
233    /// (nach Cache-Inserts) ist das die einzige Moeglichkeit, die frueh
234    /// eingefuegten Samples nachzuliefern, da `write_sample_with_datagrams`
235    /// nur direkt sendet, wenn der Proxy synchron ist.
236    pub fn add_reader_proxy(&mut self, proxy: ReaderProxy) {
237        let guid = proxy.remote_reader_guid;
238        if let Some(idx) = self
239            .reader_proxies
240            .iter()
241            .position(|p| p.remote_reader_guid == guid)
242        {
243            self.reader_proxies[idx] = proxy;
244        } else {
245            self.reader_proxies.push(proxy);
246        }
247        // Zwinge naechsten tick zu HB — neuer Proxy braucht ihn fuer
248        // AckNack-getriebenen Catch-up.
249        self.last_heartbeat = None;
250    }
251
252    /// Entfernt den Proxy mit gegebener GUID.
253    pub fn remove_reader_proxy(&mut self, guid: Guid) -> Option<ReaderProxy> {
254        let idx = self
255            .reader_proxies
256            .iter()
257            .position(|p| p.remote_reader_guid == guid)?;
258        Some(self.reader_proxies.remove(idx))
259    }
260
261    // ---------- Write ----------
262
263    /// Schreibt einen neuen Sample und fan-outet an alle Proxies.
264    ///
265    /// Pro Proxy entsteht (aggregiert):
266    /// - 1 DATA-Datagram wenn `payload.len() <= fragment_size`
267    /// - N DATA_FRAG-Datagramme (je Fragment 1 Datagram, kein Mix)
268    ///
269    /// # Errors
270    /// SN-Overflow, Cache voll, Body zu gross.
271    pub fn write(&mut self, payload: &[u8]) -> Result<Vec<OutboundDatagram>, WireError> {
272        let sn_value = self
273            .next_sn
274            .checked_add(1)
275            .ok_or(WireError::ValueOutOfRange {
276                message: "sequence number overflow",
277            })?;
278        self.next_sn = sn_value;
279        let sn = SequenceNumber(sn_value);
280        // Slice → Arc allokiert genau einmal pro Sample; danach nur
281        // noch Arc-clone (refcount-increment) im Cache + pro Datagram
282        // (Perf-Audit F7/F8). caller darf einen
283        // PoolBuffer<SMALL> reinreichen, damit das Pre-Framing
284        // heap-frei laeuft.
285        let payload: alloc::sync::Arc<[u8]> = alloc::sync::Arc::from(payload);
286        self.cache
287            .insert(CacheChange::alive_arc(
288                sn,
289                alloc::sync::Arc::clone(&payload),
290            ))
291            .map_err(|_| WireError::ValueOutOfRange {
292                message: "history cache full or duplicate",
293            })?;
294
295        let mut out = Vec::new();
296        for idx in 0..self.reader_proxies.len() {
297            // `next_unsent_change(cache_max)` rueckt den Proxy um genau
298            // eine SN vor. Zwei Faelle:
299            //
300            // * Proxy war synchron (`highest_sent_sn == sn - 1`): advance
301            //   liefert `Some(sn)`, wir koennen das Sample direkt an den
302            //   Peer schicken — spart eine Heartbeat-Runde.
303            //
304            // * Proxy ist lag (spaet via SEDP gewired, Cache hatte schon
305            //   aeltere Samples): advance liefert eine aeltere SN. Dann
306            //   waere es falsch, das *neue* Payload mit dem *neuen* SN
307            //   direkt zu schicken — der Proxy denkt, erst eine fruehe SN
308            //   sei raus, waehrend der Reader die neue sieht. Stattdessen
309            //   lassen wir `tick()` die Luecke via Heartbeat + AckNack-
310            //   gesteuertem Resend aufloesen (Standard-Reliable-Pfad).
311            let advanced = self.reader_proxies[idx].next_unsent_change(sn);
312            if advanced != Some(sn) {
313                continue;
314            }
315            let reader_id = self.reader_proxies[idx].remote_reader_guid.entity_id;
316            let targets = self.targets_for(idx);
317            // D.5g — Per-Peer Wire-Format. Payload-bytes 0..4 sind
318            // der Encap-Header (RTPS 2.5 §10.5). Wenn dieser
319            // Reader-Proxy XCDR1 ausgehandelt hat (legacy peer),
320            // klonen wir das Payload und ueberschreiben byte 1 von
321            // `0x07` (PLAIN_CDR2_LE) auf `0x01` (CDR_LE).
322            // Body-Bytes (offset 4..) bleiben fuer @final-Primitive-
323            // Structs bit-identisch zwischen XCDR1/XCDR2-LE.
324            let proxy_payload = self.adapt_payload_for_proxy(idx, &payload);
325            out.extend(self.build_sample_datagrams(sn, &proxy_payload, reader_id, &targets)?);
326        }
327        Ok(out)
328    }
329
330    /// D.5g — Bereitet die Payload-Bytes fuer einen bestimmten Reader-
331    /// Proxy auf. Wenn das ausgehandelte Wire-Format vom Cache-Default
332    /// abweicht, klont es das Payload und ueberschreibt byte 1 des
333    /// Encap-Headers entsprechend.
334    fn adapt_payload_for_proxy(
335        &self,
336        idx: usize,
337        payload: &alloc::sync::Arc<[u8]>,
338    ) -> alloc::sync::Arc<[u8]> {
339        let negotiated = self.reader_proxies[idx].negotiated_data_representation();
340        // Cache-default Encap (vom Caller gesetzt) ist XCDR2 = 0x07.
341        // Bei legacy-Reader (XCDR1) overriden wir byte 1.
342        if negotiated == crate::publication_data::data_representation::XCDR2 || payload.len() < 4 {
343            return alloc::sync::Arc::clone(payload);
344        }
345        let target_byte = match negotiated {
346            crate::publication_data::data_representation::XCDR => 0x01_u8,
347            _ => return alloc::sync::Arc::clone(payload),
348        };
349        if payload[1] == target_byte {
350            return alloc::sync::Arc::clone(payload);
351        }
352        let mut adapted: alloc::vec::Vec<u8> = payload.to_vec();
353        adapted[1] = target_byte;
354        alloc::sync::Arc::from(adapted.into_boxed_slice())
355    }
356
357    /// D.5e Phase-2: Write + piggyback HEARTBEAT in einer Operation.
358    ///
359    /// Cyclone DDS und FastDDS senden bei jedem `write()` zusaetzlich
360    /// einen HEARTBEAT, damit der Reader sofort einen ACKNACK triggern
361    /// kann. Ohne piggyback muss der Reader bis zum naechsten periodic-
362    /// HB (default 100 ms) warten — das wird bei 1-in-flight-Roundtrip
363    /// zum Latenz-Floor.
364    ///
365    /// Diese Methode ist ein Superset von [`Self::write`]: emittiert
366    /// alle DATA-Datagrams und haengt pro matched Reader-Proxy ein
367    /// HEARTBEAT-Datagram an. `last_heartbeat = now` wird gesetzt damit
368    /// `tick()` nicht doppelt feuert.
369    ///
370    /// # Errors
371    /// Wire-Encode-Fehler.
372    pub fn write_with_heartbeat(
373        &mut self,
374        payload: &[u8],
375        now: Duration,
376    ) -> Result<Vec<OutboundDatagram>, WireError> {
377        let mut out = self.write(payload)?;
378        if self.cache.is_empty() {
379            return Ok(out);
380        }
381        // Piggyback HEARTBEAT pro matched Reader-Proxy. final_flag=false
382        // damit der Reader den HB als "respond please"-Pulse interpretiert
383        // (RTPS 2.5 §8.4.15.5).
384        let cache_min = self.cache.min_sn().unwrap_or(SequenceNumber(1));
385        for idx in 0..self.reader_proxies.len() {
386            let reader_id = self.reader_proxies[idx].remote_reader_guid.entity_id;
387            let targets = self.targets_for(idx);
388            let mut builder =
389                MessageBuilder::open(self.rtps_header(), Rc::clone(&targets), self.mtu);
390            self.append_heartbeat(
391                &mut builder,
392                reader_id,
393                cache_min,
394                /*final_flag=*/ false,
395                &mut out,
396                &targets,
397            )?;
398            if let Some(dg) = builder.finish() {
399                out.push(dg);
400            }
401        }
402        self.last_heartbeat = Some(now);
403        Ok(out)
404    }
405
406    // ---------- Tick ----------
407
408    /// Tick-Event: HEARTBEATs + Resends + NACK_FRAG-Responses, aggregiert.
409    ///
410    /// # Errors
411    /// Wire-Encode-Fehler.
412    pub fn tick(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
413        let should_heartbeat = match self.last_heartbeat {
414            None => true,
415            Some(last) => now.saturating_sub(last) >= self.heartbeat_period,
416        };
417        let emit_hb = should_heartbeat && !self.cache.is_empty();
418
419        let mut out = Vec::new();
420        let mut hb_emitted_any = false;
421
422        for idx in 0..self.reader_proxies.len() {
423            let reader_id = self.reader_proxies[idx].remote_reader_guid.entity_id;
424            let targets = self.targets_for(idx);
425
426            // 1) Fragment-Resends (aus NACK_FRAG) — je 1 Datagramm pro Fragment
427            while let Some((sn, frag)) = self.reader_proxies[idx].next_requested_fragment() {
428                match self.cache.get(sn) {
429                    Some(change) => {
430                        let payload = change.payload.clone();
431                        #[cfg(feature = "metrics")]
432                        crate::metrics::inc_retransmit();
433                        #[cfg(feature = "metrics")]
434                        crate::metrics::inc_fragmented_sample();
435                        out.push(
436                            self.build_data_frag_datagram(sn, frag, &payload, reader_id, &targets)?,
437                        );
438                    }
439                    None => {
440                        out.push(self.build_gap_datagram(sn, reader_id, &targets)?);
441                    }
442                }
443            }
444
445            // 2) Aggregiertes Datagramm fuer ganze-SN-Resends + optional HB
446            let mut builder =
447                MessageBuilder::open(self.rtps_header(), Rc::clone(&targets), self.mtu);
448
449            while let Some(sn) = self.reader_proxies[idx].next_requested_change() {
450                #[cfg(feature = "metrics")]
451                crate::metrics::inc_retransmit();
452                match self.cache.get(sn) {
453                    Some(change) => {
454                        let payload = change.payload.clone();
455                        // Wenn Fragmentation noetig → eigene Datagramme, Builder flushen falls noetig.
456                        if self.needs_fragmentation(&payload) {
457                            if let Some(dg) = builder.finish() {
458                                out.push(dg);
459                            }
460                            builder = MessageBuilder::open(
461                                self.rtps_header(),
462                                Rc::clone(&targets),
463                                self.mtu,
464                            );
465                            out.extend(
466                                self.build_sample_datagrams(sn, &payload, reader_id, &targets)?,
467                            );
468                        } else {
469                            self.append_data(
470                                &mut builder,
471                                sn,
472                                &payload,
473                                reader_id,
474                                &mut out,
475                                &targets,
476                            )?;
477                        }
478                    }
479                    None => {
480                        self.append_gap(&mut builder, sn, reader_id, &mut out, &targets)?;
481                    }
482                }
483            }
484
485            // 3) Piggyback-HEARTBEAT am Ende (wenn faellig).
486            //
487            // `first_sn` ist per-Proxy: `max(cache.min_sn, proxy.highest_acked + 1)`.
488            // Das per-Proxy `highest_acked + 1` verhindert, dass Volatile-
489            // proxies (die via `skip_samples_up_to` ueber den cache-min
490            // hinaus vorgerueckt sind) den Reader auffordern, alte
491            // Samples nachzufordern. Spec §8.4.12.1: firstSN ist die
492            // "smallest sequence number considered relevant FOR THE READER".
493            //
494            // **FinalFlag (WP 1.E Stufe-A, §8.4.9.2.7):** Periodische
495            // HEARTBEATs MUESSEN `FinalFlag = NOT_SET` tragen, damit der
496            // Reader zur Antwort verpflichtet ist (Reliable-Liveness).
497            // Ein gesetztes Final-Bit wuerde dem Reader signalisieren
498            // "Du musst nichts tun" — was dazu fuehrt, dass nach Discovery
499            // bei voll-acknowledged Cache nie wieder ACKNACK kommt und
500            // der Writer in einen Zombie-State faellt. Daher hier hart
501            // `false`.
502            if emit_hb {
503                let cache_min = self.cache.min_sn().unwrap_or(SequenceNumber(1));
504                let per_proxy_first = SequenceNumber(
505                    self.reader_proxies[idx]
506                        .highest_acked_sn()
507                        .0
508                        .saturating_add(1),
509                );
510                let first_sn = cache_min.max(per_proxy_first);
511                self.append_heartbeat(
512                    &mut builder,
513                    reader_id,
514                    first_sn,
515                    /* final_flag */ false,
516                    &mut out,
517                    &targets,
518                )?;
519                hb_emitted_any = true;
520            }
521
522            if let Some(dg) = builder.finish() {
523                out.push(dg);
524            }
525        }
526
527        if hb_emitted_any {
528            self.last_heartbeat = Some(now);
529        }
530
531        Ok(out)
532    }
533
534    // ---------- Incoming Control ----------
535
536    /// Verarbeitet eine eingegangene ACKNACK von `src_guid`.
537    /// Unbekannter Sender → no-op.
538    pub fn handle_acknack(
539        &mut self,
540        src_guid: Guid,
541        base: SequenceNumber,
542        requested: impl IntoIterator<Item = SequenceNumber>,
543    ) {
544        #[cfg(feature = "metrics")]
545        crate::metrics::inc_acknack_received();
546        let Some(idx) = self
547            .reader_proxies
548            .iter()
549            .position(|p| p.remote_reader_guid == src_guid)
550        else {
551            self.unknown_src_count = self.unknown_src_count.saturating_add(1);
552            return;
553        };
554        self.reader_proxies[idx].acked_changes_set(base);
555        self.reader_proxies[idx].requested_changes_set(requested);
556        // Cache-GC **entfernt** im Per-Destination-Queue-Modell (T3-
557        // Refactor): Der Cache wird nur noch durch HistoryKind::KeepLast
558        // getrimmt. Ein stalled Reader blockt damit nicht mehr die
559        // Pipeline — bei zu-alten Samples bekommt er GAP-Responses.
560    }
561
562    /// Verarbeitet ein eingegangenes NACK_FRAG von `src_guid`.
563    pub fn handle_nackfrag(&mut self, src_guid: Guid, nf: &NackFragSubmessage) {
564        if nf.writer_id != self.guid.entity_id {
565            return;
566        }
567        let Some(idx) = self
568            .reader_proxies
569            .iter()
570            .position(|p| p.remote_reader_guid == src_guid)
571        else {
572            self.unknown_src_count = self.unknown_src_count.saturating_add(1);
573            return;
574        };
575        self.nackfrag_count = self.nackfrag_count.wrapping_add(1);
576        let missing: Vec<FragmentNumber> = nf.fragment_number_state.iter_set().collect();
577        self.reader_proxies[idx].requested_fragments_set(nf.writer_sn, missing);
578    }
579
580    // ---------- Build-Helfer ----------
581
582    fn rtps_header(&self) -> RtpsHeader {
583        RtpsHeader::new(self.vendor_id, self.guid.prefix)
584    }
585
586    /// Ziel-Locator-Set fuer den Proxy `idx`. Multicast bevorzugt
587    /// (Netzwerk-Latenz und Bandbreite), Unicast als Fallback.
588    fn targets_for(&self, idx: usize) -> Rc<Vec<Locator>> {
589        let p = &self.reader_proxies[idx];
590        if !p.multicast_locators.is_empty() {
591            Rc::new(p.multicast_locators.clone())
592        } else {
593            Rc::new(p.unicast_locators.clone())
594        }
595    }
596
597    fn append_data(
598        &self,
599        builder: &mut MessageBuilder,
600        sn: SequenceNumber,
601        payload: &alloc::sync::Arc<[u8]>,
602        reader_id: EntityId,
603        out: &mut Vec<OutboundDatagram>,
604        targets: &Rc<Vec<Locator>>,
605    ) -> Result<(), WireError> {
606        let data = DataSubmessage {
607            extra_flags: 0,
608            reader_id,
609            writer_id: self.guid.entity_id,
610            writer_sn: sn,
611            // WP 2.0a: Arc::clone statt to_vec — Zero-Copy in den Wire-Pfad.
612            inline_qos: None,
613            key_flag: false,
614            non_standard_flag: false,
615            serialized_payload: alloc::sync::Arc::clone(payload),
616        };
617        let (body, flags) = data.write_body(true);
618        self.append_submessage(
619            builder,
620            SubmessageId::Data,
621            flags,
622            &body,
623            out,
624            targets,
625            "DATA",
626        )
627    }
628
629    fn append_gap(
630        &self,
631        builder: &mut MessageBuilder,
632        sn: SequenceNumber,
633        reader_id: EntityId,
634        out: &mut Vec<OutboundDatagram>,
635        targets: &Rc<Vec<Locator>>,
636    ) -> Result<(), WireError> {
637        let gap = GapSubmessage {
638            reader_id,
639            writer_id: self.guid.entity_id,
640            gap_start: sn,
641            gap_list: SequenceNumberSet {
642                bitmap_base: SequenceNumber(sn.0 + 1),
643                num_bits: 0,
644                bitmap: Vec::new(),
645            },
646            group_info: None,
647            filtered_count: None,
648        };
649        let (body, flags) = gap.write_body(true);
650        self.append_submessage(
651            builder,
652            SubmessageId::Gap,
653            flags,
654            &body,
655            out,
656            targets,
657            "GAP",
658        )
659    }
660
661    fn append_heartbeat(
662        &mut self,
663        builder: &mut MessageBuilder,
664        reader_id: EntityId,
665        first_sn: SequenceNumber,
666        final_flag: bool,
667        out: &mut Vec<OutboundDatagram>,
668        targets: &Rc<Vec<Locator>>,
669    ) -> Result<(), WireError> {
670        #[cfg(feature = "metrics")]
671        crate::metrics::inc_heartbeat_sent();
672        self.heartbeat_count = self.heartbeat_count.wrapping_add(1);
673        let last = self.cache.max_sn().unwrap_or(SequenceNumber(0));
674        let hb = HeartbeatSubmessage {
675            reader_id,
676            writer_id: self.guid.entity_id,
677            first_sn,
678            last_sn: last,
679            count: self.heartbeat_count,
680            final_flag,
681            liveliness_flag: false,
682            group_info: None,
683        };
684        let (body, flags) = hb.write_body(true);
685        self.append_submessage(
686            builder,
687            SubmessageId::Heartbeat,
688            flags,
689            &body,
690            out,
691            targets,
692            "HEARTBEAT",
693        )
694    }
695
696    /// Gemeinsamer Submessage-Append mit Overflow-Handling.
697    #[allow(clippy::too_many_arguments)]
698    fn append_submessage(
699        &self,
700        builder: &mut MessageBuilder,
701        id: SubmessageId,
702        flags: u8,
703        body: &[u8],
704        out: &mut Vec<OutboundDatagram>,
705        targets: &Rc<Vec<Locator>>,
706        kind_hint: &'static str,
707    ) -> Result<(), WireError> {
708        match builder.try_add_submessage(id, flags, body) {
709            Ok(()) => Ok(()),
710            Err(AddError::BodyTooLarge) => Err(WireError::ValueOutOfRange {
711                message: match kind_hint {
712                    "DATA" => "DATA body exceeds u16::MAX",
713                    "GAP" => "GAP body exceeds u16::MAX",
714                    "HEARTBEAT" => "HEARTBEAT body exceeds u16::MAX",
715                    _ => "submessage body exceeds u16::MAX",
716                },
717            }),
718            Err(AddError::WouldExceedMtu { .. }) => {
719                let finished = core::mem::replace(
720                    builder,
721                    MessageBuilder::open(self.rtps_header(), Rc::clone(targets), self.mtu),
722                );
723                if let Some(dg) = finished.finish() {
724                    out.push(dg);
725                }
726                builder.try_add_submessage(id, flags, body).map_err(|_| {
727                    WireError::ValueOutOfRange {
728                        message: "submessage does not fit into fresh datagram",
729                    }
730                })
731            }
732        }
733    }
734
735    /// Erzeugt je 1 Datagramm pro Fragment (DATA) oder 1 DATA-Datagramm
736    /// (wenn unter `fragment_size`). Kein Aggregieren mit anderen DATAs.
737    fn build_sample_datagrams(
738        &self,
739        sn: SequenceNumber,
740        payload: &alloc::sync::Arc<[u8]>,
741        reader_id: EntityId,
742        targets: &Rc<Vec<Locator>>,
743    ) -> Result<Vec<OutboundDatagram>, WireError> {
744        if !self.needs_fragmentation(payload) {
745            return Ok(alloc::vec![
746                self.build_single_data_datagram(sn, payload, reader_id, targets,)?
747            ]);
748        }
749        let frag_size = self.fragment_size as usize;
750        let sample_size = u32::try_from(payload.len()).map_err(|_| WireError::ValueOutOfRange {
751            message: "sample size exceeds u32::MAX",
752        })?;
753        let frag_size_u16 = u16::try_from(frag_size).map_err(|_| WireError::ValueOutOfRange {
754            message: "fragment_size exceeds u16::MAX",
755        })?;
756        let mut out = Vec::new();
757        let mut frag_num: u32 = 1;
758        let mut pos = 0usize;
759        while pos < payload.len() {
760            let end = core::cmp::min(pos + frag_size, payload.len());
761            out.push(self.build_data_frag_submessage_datagram(
762                sn,
763                FragmentNumber(frag_num),
764                frag_size_u16,
765                sample_size,
766                &payload[pos..end],
767                reader_id,
768                targets,
769            )?);
770            pos = end;
771            frag_num = frag_num.checked_add(1).ok_or(WireError::ValueOutOfRange {
772                message: "fragment number overflow",
773            })?;
774        }
775        Ok(out)
776    }
777
778    fn build_single_data_datagram(
779        &self,
780        sn: SequenceNumber,
781        payload: &alloc::sync::Arc<[u8]>,
782        reader_id: EntityId,
783        targets: &Rc<Vec<Locator>>,
784    ) -> Result<OutboundDatagram, WireError> {
785        let mut builder = MessageBuilder::open(self.rtps_header(), Rc::clone(targets), self.mtu);
786        let data = DataSubmessage {
787            extra_flags: 0,
788            reader_id,
789            writer_id: self.guid.entity_id,
790            writer_sn: sn,
791            // WP 2.0a: Arc::clone statt to_vec — Zero-Copy in den Wire-Pfad.
792            inline_qos: None,
793            key_flag: false,
794            non_standard_flag: false,
795            serialized_payload: alloc::sync::Arc::clone(payload),
796        };
797        let (body, flags) = data.write_body(true);
798        builder
799            .try_add_submessage(SubmessageId::Data, flags, &body)
800            .map_err(|_| WireError::ValueOutOfRange {
801                message: "DATA submessage does not fit into MTU",
802            })?;
803        builder.finish().ok_or(WireError::ValueOutOfRange {
804            message: "MessageBuilder finish returned no datagram",
805        })
806    }
807
808    /// Spec §9.6.3.9 PID_STATUS_INFO Lifecycle-Sample: DATA mit
809    /// `key_flag=true` + Inline-QoS [PID_KEY_HASH + PID_STATUS_INFO].
810    /// Payload bleibt leer (Spec erlaubt das, der Reader rekonstruiert
811    /// die Instanz aus dem Key-Hash). Wird vom DCPS-Layer beim
812    /// `dispose`/`unregister_instance` aufgerufen.
813    fn build_lifecycle_datagram(
814        &self,
815        sn: SequenceNumber,
816        key_hash: [u8; 16],
817        status_bits: u32,
818        reader_id: EntityId,
819        targets: &Rc<Vec<Locator>>,
820    ) -> Result<OutboundDatagram, WireError> {
821        let mut builder = MessageBuilder::open(self.rtps_header(), Rc::clone(targets), self.mtu);
822        let inline_qos = crate::inline_qos::lifecycle_inline_qos(key_hash, status_bits);
823        let data = DataSubmessage {
824            extra_flags: 0,
825            reader_id,
826            writer_id: self.guid.entity_id,
827            writer_sn: sn,
828            inline_qos: Some(inline_qos),
829            key_flag: true,
830            non_standard_flag: false,
831            serialized_payload: alloc::sync::Arc::from(alloc::vec::Vec::new()),
832        };
833        let (body, flags) = data.write_body(true);
834        builder
835            .try_add_submessage(SubmessageId::Data, flags, &body)
836            .map_err(|_| WireError::ValueOutOfRange {
837                message: "lifecycle DATA submessage does not fit into MTU",
838            })?;
839        builder.finish().ok_or(WireError::ValueOutOfRange {
840            message: "MessageBuilder finish returned no datagram",
841        })
842    }
843
844    /// Sendet einen Lifecycle-Marker (dispose/unregister) an alle matched
845    /// Reader. Allokiert eine neue Sequence-Number, persistiert eine
846    /// `CacheChange` mit dem entsprechenden ChangeKind und baut pro
847    /// Reader-Proxy eine DATA mit Key-Hash + StatusInfo.
848    ///
849    /// `status_bits` ist die ODER-Verknuepfung der gewuenschten Bits aus
850    /// [`crate::inline_qos::status_info`]:
851    /// - DISPOSED: NotAliveDisposed
852    /// - UNREGISTERED: NotAliveUnregistered
853    /// - DISPOSED | UNREGISTERED: NotAliveDisposedUnregistered
854    ///
855    /// # Errors
856    /// Wire-Encode-Fehler oder Sequence-Number-Overflow.
857    pub fn write_lifecycle(
858        &mut self,
859        key_hash: [u8; 16],
860        status_bits: u32,
861    ) -> Result<Vec<OutboundDatagram>, WireError> {
862        let sn_value = self
863            .next_sn
864            .checked_add(1)
865            .ok_or(WireError::ValueOutOfRange {
866                message: "sequence number overflow",
867            })?;
868        self.next_sn = sn_value;
869        let sn = SequenceNumber(sn_value);
870
871        let kind = match (
872            status_bits & crate::inline_qos::status_info::DISPOSED != 0,
873            status_bits & crate::inline_qos::status_info::UNREGISTERED != 0,
874        ) {
875            (true, true) => crate::history_cache::ChangeKind::NotAliveDisposedUnregistered,
876            (true, false) => crate::history_cache::ChangeKind::NotAliveDisposed,
877            (false, true) => crate::history_cache::ChangeKind::NotAliveUnregistered,
878            (false, false) => {
879                return Err(WireError::ValueOutOfRange {
880                    message: "lifecycle send requires DISPOSED or UNREGISTERED bit",
881                });
882            }
883        };
884
885        // CacheChange persistieren — Late-Joiner-Replay (T9) liest darauf
886        // auf, und das History-Cache-Bookkeeping bleibt konsistent.
887        self.cache
888            .insert(crate::history_cache::CacheChange::lifecycle(
889                sn,
890                key_hash.to_vec(),
891                kind,
892            ))
893            .map_err(|_| WireError::ValueOutOfRange {
894                message: "history cache full or duplicate (lifecycle)",
895            })?;
896
897        let mut out = Vec::new();
898        for idx in 0..self.reader_proxies.len() {
899            let advanced = self.reader_proxies[idx].next_unsent_change(sn);
900            if advanced != Some(sn) {
901                continue;
902            }
903            let reader_id = self.reader_proxies[idx].remote_reader_guid.entity_id;
904            let targets = self.targets_for(idx);
905            out.push(self.build_lifecycle_datagram(
906                sn,
907                key_hash,
908                status_bits,
909                reader_id,
910                &targets,
911            )?);
912        }
913        Ok(out)
914    }
915
916    fn build_data_frag_datagram(
917        &self,
918        sn: SequenceNumber,
919        frag: FragmentNumber,
920        full_payload: &alloc::sync::Arc<[u8]>,
921        reader_id: EntityId,
922        targets: &Rc<Vec<Locator>>,
923    ) -> Result<OutboundDatagram, WireError> {
924        let frag_size = self.fragment_size as usize;
925        if frag.0 == 0 {
926            return Err(WireError::ValueOutOfRange {
927                message: "fragment number must be >= 1",
928            });
929        }
930        let start = (frag.0 as usize - 1) * frag_size;
931        if start >= full_payload.len() {
932            return Err(WireError::ValueOutOfRange {
933                message: "fragment number beyond sample",
934            });
935        }
936        let end = core::cmp::min(start + frag_size, full_payload.len());
937        let sample_size =
938            u32::try_from(full_payload.len()).map_err(|_| WireError::ValueOutOfRange {
939                message: "sample size exceeds u32::MAX",
940            })?;
941        let frag_size_u16 = u16::try_from(frag_size).map_err(|_| WireError::ValueOutOfRange {
942            message: "fragment_size exceeds u16::MAX",
943        })?;
944        self.build_data_frag_submessage_datagram(
945            sn,
946            frag,
947            frag_size_u16,
948            sample_size,
949            &full_payload[start..end],
950            reader_id,
951            targets,
952        )
953    }
954
955    #[allow(clippy::too_many_arguments)]
956    fn build_data_frag_submessage_datagram(
957        &self,
958        sn: SequenceNumber,
959        frag: FragmentNumber,
960        fragment_size: u16,
961        sample_size: u32,
962        chunk: &[u8],
963        reader_id: EntityId,
964        targets: &Rc<Vec<Locator>>,
965    ) -> Result<OutboundDatagram, WireError> {
966        let df = DataFragSubmessage {
967            extra_flags: 0,
968            reader_id,
969            writer_id: self.guid.entity_id,
970            writer_sn: sn,
971            fragment_starting_num: frag,
972            fragments_in_submessage: 1,
973            fragment_size,
974            sample_size,
975            // WP 2.0a: chunk ist ein Sub-Slice des vollen Arc-Payloads.
976            //
977            // **Zero-Copy-Scope-Claim:** der
978            // `Arc::from(chunk)` hier ALLOZIERT einen neuen Refcount-
979            // Block und kopiert die chunk-Bytes. Das ist **nicht**
980            // Zero-Copy. Der WP-2.0a-Claim „3-7 % Gewinn" bezieht
981            // sich ausschliesslich auf den unfragmentierten
982            // DATA-Pfad (`build_single_data_datagram`), wo
983            // `Arc::clone` den vollen Payload teilt. Fragmentation-
984            // Pfade bleiben copy-per-Chunk bis WP 2.0a-2 (iovec)
985            // die Submessage-Builder-Seite eliminiert.
986            serialized_payload: alloc::sync::Arc::from(chunk),
987            inline_qos_flag: false,
988            hash_key_flag: false,
989            key_flag: false,
990            non_standard_flag: false,
991        };
992        let (body, flags) = df.write_body(true);
993        let mut builder = MessageBuilder::open(self.rtps_header(), Rc::clone(targets), self.mtu);
994        builder
995            .try_add_submessage(SubmessageId::DataFrag, flags, &body)
996            .map_err(|_| WireError::ValueOutOfRange {
997                message: "DATA_FRAG submessage does not fit into MTU",
998            })?;
999        builder.finish().ok_or(WireError::ValueOutOfRange {
1000            message: "MessageBuilder finish returned no datagram",
1001        })
1002    }
1003
1004    fn build_gap_datagram(
1005        &self,
1006        sn: SequenceNumber,
1007        reader_id: EntityId,
1008        targets: &Rc<Vec<Locator>>,
1009    ) -> Result<OutboundDatagram, WireError> {
1010        let gap = GapSubmessage {
1011            reader_id,
1012            writer_id: self.guid.entity_id,
1013            gap_start: sn,
1014            gap_list: SequenceNumberSet {
1015                bitmap_base: SequenceNumber(sn.0 + 1),
1016                num_bits: 0,
1017                bitmap: Vec::new(),
1018            },
1019            group_info: None,
1020            filtered_count: None,
1021        };
1022        let (body, flags) = gap.write_body(true);
1023        let mut builder = MessageBuilder::open(self.rtps_header(), Rc::clone(targets), self.mtu);
1024        builder
1025            .try_add_submessage(SubmessageId::Gap, flags, &body)
1026            .map_err(|_| WireError::ValueOutOfRange {
1027                message: "GAP submessage does not fit into MTU",
1028            })?;
1029        builder.finish().ok_or(WireError::ValueOutOfRange {
1030            message: "MessageBuilder finish returned no datagram",
1031        })
1032    }
1033}
1034
1035#[cfg(test)]
1036#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
1037mod tests {
1038    use super::*;
1039    use crate::datagram::{ParsedSubmessage, decode_datagram};
1040    use crate::message_builder::DEFAULT_MTU;
1041    use crate::wire_types::{GuidPrefix, Locator};
1042
1043    fn sn(n: i64) -> SequenceNumber {
1044        SequenceNumber(n)
1045    }
1046
1047    fn reader_guid() -> Guid {
1048        Guid::new(
1049            GuidPrefix::from_bytes([2; 12]),
1050            EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
1051        )
1052    }
1053
1054    fn make_writer(max_samples: usize, hb_period: Duration) -> ReliableWriter {
1055        make_writer_with_frag_size(max_samples, hb_period, DEFAULT_FRAGMENT_SIZE)
1056    }
1057
1058    fn make_writer_with_frag_size(
1059        max_samples: usize,
1060        hb_period: Duration,
1061        fragment_size: u32,
1062    ) -> ReliableWriter {
1063        let writer_guid = Guid::new(
1064            GuidPrefix::from_bytes([1; 12]),
1065            EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
1066        );
1067        let reader_proxy = ReaderProxy::new(
1068            reader_guid(),
1069            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7410)],
1070            alloc::vec![],
1071            true,
1072        );
1073        ReliableWriter::new(ReliableWriterConfig {
1074            guid: writer_guid,
1075            vendor_id: VendorId::ZERODDS,
1076            reader_proxies: alloc::vec![reader_proxy],
1077            max_samples,
1078            history_kind: HistoryKind::KeepAll,
1079            heartbeat_period: hb_period,
1080            fragment_size,
1081            mtu: DEFAULT_MTU,
1082        })
1083    }
1084
1085    fn first_proxy(w: &ReliableWriter) -> &ReaderProxy {
1086        w.reader_proxies().first().unwrap()
1087    }
1088
1089    #[test]
1090    fn write_increments_sn_and_returns_data_datagram() {
1091        let mut w = make_writer(10, Duration::from_secs(1));
1092        let d1 = w.write(&alloc::vec![0xAA]).expect("write1");
1093        let d2 = w.write(&alloc::vec![0xBB]).expect("write2");
1094        assert_eq!(d1.len(), 1);
1095        assert_eq!(d2.len(), 1);
1096        let p1 = decode_datagram(&d1[0].bytes).unwrap();
1097        let p2 = decode_datagram(&d2[0].bytes).unwrap();
1098        match (&p1.submessages[0], &p2.submessages[0]) {
1099            (ParsedSubmessage::Data(a), ParsedSubmessage::Data(b)) => {
1100                assert_eq!(a.writer_sn, sn(1));
1101                assert_eq!(b.writer_sn, sn(2));
1102            }
1103            _ => panic!("expected DATA submessages"),
1104        }
1105        assert_eq!(w.cache().len(), 2);
1106    }
1107
1108    #[test]
1109    fn tick_emits_heartbeat_after_period() {
1110        let mut w = make_writer(10, Duration::from_millis(500));
1111        w.write(&alloc::vec![0xAA]).unwrap();
1112        let out = w.tick(Duration::from_millis(10)).unwrap();
1113        assert_eq!(out.len(), 1);
1114        let parsed = decode_datagram(&out[0].bytes).expect("decode hb");
1115        assert!(
1116            parsed
1117                .submessages
1118                .iter()
1119                .any(|s| matches!(s, ParsedSubmessage::Heartbeat(_)))
1120        );
1121        assert!(w.tick(Duration::from_millis(200)).unwrap().is_empty());
1122        let out2 = w.tick(Duration::from_millis(600)).unwrap();
1123        assert_eq!(out2.len(), 1);
1124    }
1125
1126    #[test]
1127    fn tick_skips_heartbeat_when_cache_empty() {
1128        let mut w = make_writer(10, Duration::from_millis(100));
1129        assert!(w.tick(Duration::from_secs(10)).unwrap().is_empty());
1130    }
1131
1132    #[test]
1133    fn handle_acknack_updates_proxy_state() {
1134        let mut w = make_writer(10, Duration::from_secs(10));
1135        let rguid = reader_guid();
1136        for i in 1..=3 {
1137            w.write(&alloc::vec![i as u8]).unwrap();
1138        }
1139        w.handle_acknack(rguid, sn(4), [sn(2)]);
1140        // Per-destination-queue-Modell: Cache bleibt voll (KeepAll),
1141        // GC passiert nur via History-QoS. ACKNACK-State wird aber
1142        // am Proxy korrekt getrackt.
1143        assert_eq!(w.cache().len(), 3, "cache intact under KeepAll");
1144        assert_eq!(first_proxy(&w).highest_acked_sn(), sn(3));
1145        // sn(2) war acked durch base=4 → gar nicht erst als requested
1146        // gemerkt
1147        assert_eq!(first_proxy(&w).pending_requested_count(), 0);
1148    }
1149
1150    #[test]
1151    fn handle_acknack_with_lower_base_leaves_requested() {
1152        let mut w = make_writer(10, Duration::from_secs(10));
1153        let rguid = reader_guid();
1154        for i in 1..=3 {
1155            w.write(&alloc::vec![i as u8]).unwrap();
1156        }
1157        w.handle_acknack(rguid, sn(2), [sn(2), sn(3)]);
1158        // Cache voll unter KeepAll.
1159        assert_eq!(w.cache().len(), 3);
1160        assert_eq!(first_proxy(&w).highest_acked_sn(), sn(1));
1161        assert_eq!(first_proxy(&w).pending_requested_count(), 2);
1162    }
1163
1164    #[test]
1165    fn keep_last_evicts_oldest_on_overflow() {
1166        let writer_guid = Guid::new(
1167            GuidPrefix::from_bytes([1; 12]),
1168            EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
1169        );
1170        let reader_proxy = ReaderProxy::new(
1171            reader_guid(),
1172            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7410)],
1173            alloc::vec![],
1174            true,
1175        );
1176        let mut w = ReliableWriter::new(ReliableWriterConfig {
1177            guid: writer_guid,
1178            vendor_id: VendorId::ZERODDS,
1179            reader_proxies: alloc::vec![reader_proxy],
1180            max_samples: 3,
1181            history_kind: HistoryKind::KeepLast { depth: 3 },
1182            heartbeat_period: Duration::from_secs(10),
1183            fragment_size: DEFAULT_FRAGMENT_SIZE,
1184            mtu: DEFAULT_MTU,
1185        });
1186        for i in 1..=5 {
1187            w.write(&alloc::vec![i as u8])
1188                .expect("keep_last never fails");
1189        }
1190        // Cache haelt nur die letzten 3 (SN 3, 4, 5)
1191        assert_eq!(w.cache().len(), 3);
1192        assert_eq!(w.cache().min_sn(), Some(sn(3)));
1193        assert_eq!(w.cache().max_sn(), Some(sn(5)));
1194        assert_eq!(w.cache().evicted_count(), 2);
1195    }
1196
1197    #[test]
1198    fn keep_last_stalled_reader_does_not_block_fresh_writes() {
1199        // Scenario: zwei Proxies, einer "stalled" (nie acked),
1200        // der andere aktiv. Unter KeepLast schreibt der Writer
1201        // weiter, der stalled Reader bekommt spaeter GAPs.
1202        let writer_guid = Guid::new(
1203            GuidPrefix::from_bytes([1; 12]),
1204            EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
1205        );
1206        let stalled = ReaderProxy::new(
1207            Guid::new(
1208                GuidPrefix::from_bytes([9; 12]),
1209                EntityId::user_reader_with_key([0xDE, 0xAD, 0x00]),
1210            ),
1211            alloc::vec![Locator::udp_v4([127, 0, 0, 99], 9999)],
1212            alloc::vec![],
1213            true,
1214        );
1215        let active = ReaderProxy::new(
1216            reader_guid(),
1217            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7410)],
1218            alloc::vec![],
1219            true,
1220        );
1221        let mut w = ReliableWriter::new(ReliableWriterConfig {
1222            guid: writer_guid,
1223            vendor_id: VendorId::ZERODDS,
1224            reader_proxies: alloc::vec![stalled, active],
1225            max_samples: 3,
1226            history_kind: HistoryKind::KeepLast { depth: 3 },
1227            heartbeat_period: Duration::from_secs(10),
1228            fragment_size: DEFAULT_FRAGMENT_SIZE,
1229            mtu: DEFAULT_MTU,
1230        });
1231        // 10 samples — stalled nie acked, aber write schlaegt nicht fehl
1232        for i in 1..=10 {
1233            w.write(&alloc::vec![i as u8]).expect("never blocks");
1234        }
1235        assert_eq!(w.cache().len(), 3);
1236        assert_eq!(w.cache().min_sn(), Some(sn(8)));
1237        // Aktiver Reader fragt spaeter sn(2) an → ist evicted, bekommt GAP
1238        w.handle_acknack(reader_guid(), sn(1), [sn(2)]);
1239        let out = w.tick(Duration::ZERO).unwrap();
1240        let has_gap = out.iter().any(|d| {
1241            decode_datagram(&d.bytes)
1242                .unwrap()
1243                .submessages
1244                .iter()
1245                .any(|s| matches!(s, ParsedSubmessage::Gap(_)))
1246        });
1247        assert!(has_gap, "evicted SN must elicit GAP");
1248    }
1249
1250    #[test]
1251    fn handle_acknack_unknown_source_counts_but_noops() {
1252        let mut w = make_writer(10, Duration::from_secs(10));
1253        w.write(&alloc::vec![1]).unwrap();
1254        let foreign = Guid::new(
1255            GuidPrefix::from_bytes([0xFF; 12]),
1256            EntityId::user_reader_with_key([0xFF, 0xFF, 0xFF]),
1257        );
1258        w.handle_acknack(foreign, sn(5), [sn(2)]);
1259        assert_eq!(w.cache().len(), 1, "cache untouched");
1260        assert_eq!(first_proxy(&w).pending_requested_count(), 0);
1261        assert_eq!(w.unknown_src_count(), 1, "unknown source counted");
1262    }
1263
1264    #[test]
1265    fn handle_nackfrag_unknown_source_counts() {
1266        let mut w = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
1267        let _ = w.write(&(1..=10).collect::<alloc::vec::Vec<u8>>()).unwrap();
1268        let foreign = Guid::new(
1269            GuidPrefix::from_bytes([0xFF; 12]),
1270            EntityId::user_reader_with_key([0xFF, 0xFF, 0xFF]),
1271        );
1272        let nf = NackFragSubmessage {
1273            reader_id: foreign.entity_id,
1274            writer_id: w.guid.entity_id,
1275            writer_sn: sn(1),
1276            fragment_number_state: crate::submessages::FragmentNumberSet::from_missing(
1277                FragmentNumber(1),
1278                &[FragmentNumber(2)],
1279            ),
1280            count: 1,
1281        };
1282        w.handle_nackfrag(foreign, &nf);
1283        assert_eq!(w.nackfrag_count(), 0, "not counted as legit nackfrag");
1284        assert_eq!(w.unknown_src_count(), 1);
1285    }
1286
1287    #[test]
1288    fn tick_resends_requested_as_data_aggregated_with_hb() {
1289        let mut w = make_writer(10, Duration::from_secs(10));
1290        let rguid = reader_guid();
1291        for i in 1..=3 {
1292            w.write(&alloc::vec![i as u8]).unwrap();
1293        }
1294        w.handle_acknack(rguid, sn(1), [sn(2)]);
1295        let out = w.tick(Duration::ZERO).unwrap();
1296        // Ein aggregiertes Datagramm: DATA-Resend + HEARTBEAT im gleichen
1297        let parsed = decode_datagram(&out[0].bytes).unwrap();
1298        let has_data_2 = parsed
1299            .submessages
1300            .iter()
1301            .any(|s| matches!(s, ParsedSubmessage::Data(d) if d.writer_sn == sn(2)));
1302        let has_hb = parsed
1303            .submessages
1304            .iter()
1305            .any(|s| matches!(s, ParsedSubmessage::Heartbeat(_)));
1306        assert!(has_data_2, "DATA-Resend fuer sn(2)");
1307        assert!(has_hb, "Piggyback-HEARTBEAT im gleichen Datagramm");
1308    }
1309
1310    #[test]
1311    fn tick_resends_evicted_request_as_gap() {
1312        let mut w = make_writer(10, Duration::from_secs(10));
1313        let rguid = reader_guid();
1314        w.write(&alloc::vec![1]).unwrap();
1315        w.handle_acknack(rguid, sn(1), [sn(5)]);
1316        let out = w.tick(Duration::ZERO).unwrap();
1317        let has_gap = out.iter().any(|d| {
1318            decode_datagram(&d.bytes)
1319                .unwrap()
1320                .submessages
1321                .iter()
1322                .any(|s| matches!(s, ParsedSubmessage::Gap(_)))
1323        });
1324        assert!(has_gap);
1325    }
1326
1327    #[test]
1328    fn write_at_cache_capacity_is_error() {
1329        let mut w = make_writer(2, Duration::from_secs(10));
1330        w.write(&alloc::vec![1]).unwrap();
1331        w.write(&alloc::vec![2]).unwrap();
1332        assert!(w.write(&alloc::vec![3]).is_err());
1333    }
1334
1335    #[test]
1336    fn heartbeat_count_increments() {
1337        let mut w = make_writer(10, Duration::from_millis(100));
1338        w.write(&alloc::vec![1]).unwrap();
1339        assert_eq!(w.heartbeat_count(), 0);
1340        w.tick(Duration::ZERO).unwrap();
1341        assert_eq!(w.heartbeat_count(), 1);
1342        w.tick(Duration::from_millis(150)).unwrap();
1343        assert_eq!(w.heartbeat_count(), 2);
1344    }
1345
1346    #[test]
1347    fn heartbeat_count_wraps_around_at_i32_max_per_spec_8_4_15_7() {
1348        // Spec §8.4.15.7: counts MUST be wrap-around-tolerant
1349        // (modular arithmetic). i32 wraps wenn der Counter
1350        // i32::MAX erreicht.
1351        let mut w = make_writer(10, Duration::from_millis(100));
1352        w.write(&alloc::vec![1]).unwrap();
1353        // Manuell auf MAX setzen (kein Public-Setter; aber wir tracken
1354        // den Counter ueber `heartbeat_count.wrapping_add(1)` →
1355        // wrap-Verhalten ist garantiert per Code).
1356        // Teste die wrapping-Semantik direkt:
1357        let counter: i32 = i32::MAX;
1358        let next = counter.wrapping_add(1);
1359        assert_eq!(next, i32::MIN, "i32::MAX + 1 wraps to i32::MIN");
1360        let after_wrap = next.wrapping_add(1);
1361        assert_eq!(after_wrap, i32::MIN + 1);
1362    }
1363
1364    // ---------- Fragmentation ----------
1365
1366    #[test]
1367    fn write_under_fragment_size_produces_single_data() {
1368        let mut w = make_writer_with_frag_size(10, Duration::from_secs(10), 10);
1369        let dgs = w.write(&alloc::vec![1, 2, 3, 4, 5]).unwrap();
1370        assert_eq!(dgs.len(), 1);
1371        let parsed = decode_datagram(&dgs[0].bytes).unwrap();
1372        assert!(matches!(&parsed.submessages[0], ParsedSubmessage::Data(_)));
1373    }
1374
1375    #[test]
1376    fn write_above_fragment_size_produces_data_frag_split() {
1377        let mut w = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
1378        let payload: alloc::vec::Vec<u8> = (1..=10).collect();
1379        let dgs = w.write(&payload).unwrap();
1380        assert_eq!(dgs.len(), 3);
1381        for (i, dg) in dgs.iter().enumerate() {
1382            match &decode_datagram(&dg.bytes).unwrap().submessages[0] {
1383                ParsedSubmessage::DataFrag(df) => {
1384                    assert_eq!(df.fragment_starting_num.0, (i as u32) + 1);
1385                    assert_eq!(df.fragments_in_submessage, 1);
1386                    assert_eq!(df.fragment_size, 4);
1387                    assert_eq!(df.sample_size, 10);
1388                }
1389                other => panic!("expected DataFrag, got {other:?}"),
1390            }
1391        }
1392    }
1393
1394    #[test]
1395    fn handle_nackfrag_queues_fragment_resends() {
1396        let mut w = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
1397        let rguid = reader_guid();
1398        let _ = w.write(&(1..=10).collect::<alloc::vec::Vec<u8>>()).unwrap();
1399        let nf = NackFragSubmessage {
1400            reader_id: rguid.entity_id,
1401            writer_id: w.guid.entity_id,
1402            writer_sn: sn(1),
1403            fragment_number_state: crate::submessages::FragmentNumberSet::from_missing(
1404                FragmentNumber(1),
1405                &[FragmentNumber(2), FragmentNumber(3)],
1406            ),
1407            count: 1,
1408        };
1409        w.handle_nackfrag(rguid, &nf);
1410        assert_eq!(w.nackfrag_count(), 1);
1411        assert_eq!(first_proxy(&w).pending_requested_fragment_count(), 2);
1412    }
1413
1414    #[test]
1415    fn tick_resends_requested_fragments() {
1416        let mut w = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
1417        let rguid = reader_guid();
1418        let _ = w.write(&(1..=10).collect::<alloc::vec::Vec<u8>>()).unwrap();
1419        let nf = NackFragSubmessage {
1420            reader_id: rguid.entity_id,
1421            writer_id: w.guid.entity_id,
1422            writer_sn: sn(1),
1423            fragment_number_state: crate::submessages::FragmentNumberSet::from_missing(
1424                FragmentNumber(1),
1425                &[FragmentNumber(3)],
1426            ),
1427            count: 1,
1428        };
1429        w.handle_nackfrag(rguid, &nf);
1430        let out = w.tick(Duration::ZERO).unwrap();
1431        let frag_resends: alloc::vec::Vec<_> = out
1432            .iter()
1433            .filter(|d| {
1434                decode_datagram(&d.bytes)
1435                    .unwrap()
1436                    .submessages
1437                    .iter()
1438                    .any(|s| matches!(s, ParsedSubmessage::DataFrag(df) if df.fragment_starting_num == FragmentNumber(3)))
1439            })
1440            .collect();
1441        assert_eq!(frag_resends.len(), 1);
1442        assert_eq!(first_proxy(&w).pending_requested_fragment_count(), 0);
1443    }
1444
1445    #[test]
1446    fn acknack_resend_for_fragmented_sn_sends_all_fragments() {
1447        let mut w = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
1448        let rguid = reader_guid();
1449        let _ = w.write(&(1..=10).collect::<alloc::vec::Vec<u8>>()).unwrap();
1450        w.handle_acknack(rguid, sn(1), [sn(1)]);
1451        let out = w.tick(Duration::ZERO).unwrap();
1452        let frags: alloc::vec::Vec<_> = out
1453            .iter()
1454            .filter(|d| {
1455                decode_datagram(&d.bytes)
1456                    .unwrap()
1457                    .submessages
1458                    .iter()
1459                    .any(|s| matches!(s, ParsedSubmessage::DataFrag(_)))
1460            })
1461            .collect();
1462        assert_eq!(frags.len(), 3);
1463    }
1464
1465    #[test]
1466    fn heartbeat_carries_cache_range() {
1467        let mut w = make_writer(10, Duration::from_millis(100));
1468        w.write(&alloc::vec![1]).unwrap();
1469        w.write(&alloc::vec![2]).unwrap();
1470        w.write(&alloc::vec![3]).unwrap();
1471        let out = w.tick(Duration::ZERO).unwrap();
1472        let parsed = decode_datagram(&out[0].bytes).unwrap();
1473        let hb = parsed
1474            .submessages
1475            .iter()
1476            .find_map(|s| {
1477                if let ParsedSubmessage::Heartbeat(h) = s {
1478                    Some(h)
1479                } else {
1480                    None
1481                }
1482            })
1483            .expect("HB in output");
1484        assert_eq!(hb.first_sn, sn(1));
1485        assert_eq!(hb.last_sn, sn(3));
1486    }
1487
1488    // ---------- Multi-Reader (WP 1.4 T3b) ----------
1489
1490    #[test]
1491    fn write_fans_out_to_all_reader_proxies() {
1492        let mut w = make_writer(10, Duration::from_secs(10));
1493        let second = Guid::new(
1494            GuidPrefix::from_bytes([3; 12]),
1495            EntityId::user_reader_with_key([0xA1, 0xB1, 0xC1]),
1496        );
1497        w.add_reader_proxy(ReaderProxy::new(
1498            second,
1499            alloc::vec![Locator::udp_v4([127, 0, 0, 2], 7411)],
1500            alloc::vec![],
1501            true,
1502        ));
1503        let dgs = w.write(&alloc::vec![0xAA]).unwrap();
1504        assert_eq!(dgs.len(), 2, "one datagram per reader-proxy");
1505        // Verschiedene Targets
1506        assert_ne!(dgs[0].targets, dgs[1].targets);
1507    }
1508
1509    #[test]
1510    fn add_reader_proxy_is_idempotent_on_same_guid() {
1511        let mut w = make_writer(10, Duration::from_secs(10));
1512        let rguid = reader_guid();
1513        let replacement = ReaderProxy::new(
1514            rguid,
1515            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 9999)],
1516            alloc::vec![],
1517            true,
1518        );
1519        w.add_reader_proxy(replacement);
1520        assert_eq!(w.reader_proxy_count(), 1);
1521        assert_eq!(
1522            w.reader_proxies()[0].unicast_locators,
1523            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 9999)]
1524        );
1525    }
1526
1527    #[test]
1528    fn remove_reader_proxy_by_guid() {
1529        let mut w = make_writer(10, Duration::from_secs(10));
1530        let rguid = reader_guid();
1531        let removed = w.remove_reader_proxy(rguid);
1532        assert!(removed.is_some());
1533        assert_eq!(w.reader_proxy_count(), 0);
1534        assert!(
1535            w.remove_reader_proxy(rguid).is_none(),
1536            "second remove is None"
1537        );
1538    }
1539
1540    #[test]
1541    fn acknack_dispatches_to_matching_proxy_only() {
1542        let mut w = make_writer(10, Duration::from_secs(10));
1543        let rguid1 = reader_guid();
1544        let rguid2 = Guid::new(
1545            GuidPrefix::from_bytes([3; 12]),
1546            EntityId::user_reader_with_key([0xA1, 0xB1, 0xC1]),
1547        );
1548        w.add_reader_proxy(ReaderProxy::new(
1549            rguid2,
1550            alloc::vec![Locator::udp_v4([127, 0, 0, 2], 7411)],
1551            alloc::vec![],
1552            true,
1553        ));
1554        for i in 1..=3 {
1555            w.write(&alloc::vec![i as u8]).unwrap();
1556        }
1557        w.handle_acknack(rguid1, sn(4), []);
1558        // Proxy 1 zeigt highest_acked=3, Proxy 2 unveraendert=0.
1559        // Cache-GC ist entkoppelt vom acknack (Per-destination-queue-Modell).
1560        assert_eq!(w.reader_proxies()[0].highest_acked_sn(), sn(3));
1561        assert_eq!(w.reader_proxies()[1].highest_acked_sn(), sn(0));
1562        assert_eq!(w.cache().len(), 3, "KeepAll cache intact");
1563    }
1564
1565    #[test]
1566    fn nackfrag_dispatches_only_to_matching_proxy() {
1567        let mut w = make_writer_with_frag_size(10, Duration::from_secs(10), 4);
1568        let rguid1 = reader_guid();
1569        let rguid2 = Guid::new(
1570            GuidPrefix::from_bytes([3; 12]),
1571            EntityId::user_reader_with_key([0xA1, 0xB1, 0xC1]),
1572        );
1573        w.add_reader_proxy(ReaderProxy::new(
1574            rguid2,
1575            alloc::vec![Locator::udp_v4([127, 0, 0, 2], 7411)],
1576            alloc::vec![],
1577            true,
1578        ));
1579        let _ = w.write(&(1..=10).collect::<alloc::vec::Vec<u8>>()).unwrap();
1580        let nf = NackFragSubmessage {
1581            reader_id: rguid1.entity_id,
1582            writer_id: w.guid.entity_id,
1583            writer_sn: sn(1),
1584            fragment_number_state: crate::submessages::FragmentNumberSet::from_missing(
1585                FragmentNumber(1),
1586                &[FragmentNumber(2)],
1587            ),
1588            count: 1,
1589        };
1590        w.handle_nackfrag(rguid1, &nf);
1591        assert_eq!(w.reader_proxies()[0].pending_requested_fragment_count(), 1);
1592        assert_eq!(w.reader_proxies()[1].pending_requested_fragment_count(), 0);
1593    }
1594
1595    // ---------- WP 1.E Stufe-A: HEARTBEAT FinalFlag-Default ----------
1596
1597    /// §8.4.9.2.7: Periodische HEARTBEATs muessen `FinalFlag=NOT_SET`
1598    /// tragen, sonst antwortet der Reader nicht mit ACKNACK und der
1599    /// Reliable-Liveness-Loop bricht.
1600    #[test]
1601    fn periodic_heartbeat_has_final_flag_unset() {
1602        let mut w = make_writer(10, Duration::from_millis(50));
1603        w.write(&alloc::vec![1]).unwrap();
1604        let out = w.tick(Duration::ZERO).unwrap();
1605        let parsed = decode_datagram(&out[0].bytes).unwrap();
1606        let hb = parsed
1607            .submessages
1608            .iter()
1609            .find_map(|s| {
1610                if let ParsedSubmessage::Heartbeat(h) = s {
1611                    Some(h)
1612                } else {
1613                    None
1614                }
1615            })
1616            .expect("HB must be present");
1617        assert!(
1618            !hb.final_flag,
1619            "periodic HB must NOT set FinalFlag (Spec §8.4.9.2.7)"
1620        );
1621    }
1622
1623    /// Ad-hoc HB direkt nach `add_reader_proxy`: setzt `last_heartbeat=None`,
1624    /// d.h. naechster `tick()` emittiert sofort einen HB. Dieser HB ist
1625    /// ebenfalls non-final, damit der frische Reader sicher antwortet.
1626    #[test]
1627    fn heartbeat_after_add_reader_proxy_is_non_final() {
1628        let mut w = make_writer(10, Duration::from_secs(60));
1629        w.write(&alloc::vec![1]).unwrap();
1630        // first tick consumes initial HB
1631        let _ = w.tick(Duration::ZERO).unwrap();
1632        // add second proxy → last_heartbeat=None → next tick emits HB
1633        let second = ReaderProxy::new(
1634            Guid::new(
1635                GuidPrefix::from_bytes([7; 12]),
1636                EntityId::user_reader_with_key([0xA1, 0xB1, 0xC1]),
1637            ),
1638            alloc::vec![Locator::udp_v4([127, 0, 0, 2], 7411)],
1639            alloc::vec![],
1640            true,
1641        );
1642        w.add_reader_proxy(second);
1643        let out = w.tick(Duration::ZERO).unwrap();
1644        let mut hb_found = 0usize;
1645        for d in &out {
1646            for s in &decode_datagram(&d.bytes).unwrap().submessages {
1647                if let ParsedSubmessage::Heartbeat(h) = s {
1648                    assert!(
1649                        !h.final_flag,
1650                        "post-add_reader_proxy HB must be non-final (Spec §8.4.9.2.7)"
1651                    );
1652                    hb_found += 1;
1653                }
1654            }
1655        }
1656        assert!(hb_found >= 1, "at least one HB expected");
1657    }
1658
1659    #[test]
1660    fn aggregation_packs_multiple_resends_into_one_datagram() {
1661        let mut w = make_writer(10, Duration::from_secs(10));
1662        let rguid = reader_guid();
1663        for i in 1..=3 {
1664            w.write(&alloc::vec![i as u8]).unwrap();
1665        }
1666        // Alle 3 als requested
1667        w.handle_acknack(rguid, sn(1), [sn(1), sn(2), sn(3)]);
1668        let out = w.tick(Duration::ZERO).unwrap();
1669        // Ein Datagramm enthaelt mehrere DATAs + HEARTBEAT
1670        assert_eq!(out.len(), 1, "all resends aggregated into single datagram");
1671        let parsed = decode_datagram(&out[0].bytes).unwrap();
1672        let data_count = parsed
1673            .submessages
1674            .iter()
1675            .filter(|s| matches!(s, ParsedSubmessage::Data(_)))
1676            .count();
1677        assert_eq!(data_count, 3);
1678        let hb_count = parsed
1679            .submessages
1680            .iter()
1681            .filter(|s| matches!(s, ParsedSubmessage::Heartbeat(_)))
1682            .count();
1683        assert_eq!(hb_count, 1);
1684    }
1685}