Skip to main content

zerodds_dcps/
runtime.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! DcpsRuntime — Event-Loop + UDP-Sockets pro DomainParticipant.
4//!
5//! # Aufbau
6//!
7//! - Bindet 3 UDP-Sockets pro Participant:
8//!   * SPDP-Multicast-Receiver (domain-basierter Port).
9//!   * SPDP-Unicast-Fallback (ephemeral, fuer bidirektionale SPDP).
10//!   * User-Unicast (ephemeral, wohin matched peers senden).
11//! - Spawnt einen einzelnen Event-Loop-Thread, der periodisch:
12//!   * SPDP-Beacon sendet (alle 5 s Default),
13//!   * alle Sockets non-blocking pollt,
14//!   * SPDP-Datagrams in den DiscoveredParticipantsCache wandert,
15//!   * SEDP-Datagrams (Pub/Sub-Announces) dispatched,
16//!   * User-Daten an die richtigen DataReader-Slots ausliefert,
17//!   * WLP-/Liveliness-Tick fuehrt,
18//!   * TypeLookup-Service-Endpoints (XTypes 1.3 §7.6.3.3.4) bedient.
19//! - Thread-Lifecycle per `Arc<AtomicBool> stop_flag` + `JoinHandle`
20//!   im `Drop`.
21//!
22//! Mit aktivem `security`-Feature laufen alle Outbound-/Inbound-
23//! Bytes durch das `SharedSecurityGate` (DDS-Security 1.2). Das
24//! Multi-Interface-Binding (RuntimeConfig::interface_bindings)
25//! erlaubt Per-Subnet-Routing fuer Production-Topologien.
26
27extern crate alloc;
28use alloc::collections::BTreeMap;
29use alloc::string::String;
30use alloc::sync::Arc;
31use alloc::vec::Vec;
32use core::time::Duration;
33use std::net::{Ipv4Addr, SocketAddr};
34use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
35use std::sync::mpsc;
36use std::sync::{Condvar, Mutex, RwLock};
37use std::thread::{self, JoinHandle};
38use std::time::Instant;
39
40use zerodds_discovery::security::SecurityBuiltinStack;
41use zerodds_discovery::sedp::SedpStack;
42use zerodds_discovery::spdp::{
43    DiscoveredParticipant, DiscoveredParticipantsCache, SpdpBeacon, SpdpReader,
44};
45use zerodds_discovery::type_lookup::{
46    TypeLookupClient, TypeLookupEndpoints, TypeLookupReply, TypeLookupServer,
47};
48use zerodds_qos::Duration as QosDuration;
49use zerodds_rtps::EntityId;
50use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram};
51use zerodds_rtps::fragment_assembler::AssemblerCaps;
52use zerodds_rtps::history_cache::HistoryKind;
53use zerodds_rtps::message_builder::DEFAULT_MTU;
54use zerodds_rtps::participant_data::{ParticipantBuiltinTopicData, endpoint_flag};
55use zerodds_rtps::reliable_reader::{ReliableReader, ReliableReaderConfig};
56use zerodds_rtps::reliable_writer::{
57    DEFAULT_FRAGMENT_SIZE, DEFAULT_HEARTBEAT_PERIOD, ReliableWriter, ReliableWriterConfig,
58};
59use zerodds_rtps::wire_types::{
60    Guid, GuidPrefix, Locator, LocatorKind, ProtocolVersion, SPDP_DEFAULT_MULTICAST_ADDRESS,
61    VendorId, spdp_multicast_port,
62};
63use zerodds_transport::Transport;
64use zerodds_transport_udp::UdpTransport;
65
66#[cfg(feature = "security")]
67use zerodds_security_runtime::{EndpointProtection, IpRange, NetInterface, ProtectionLevel};
68
69use crate::error::{DdsError, Result};
70
/// Default tick period of the event loop.
///
/// This is the worst-case quantization for tick-driven tasks (SEDP
/// heartbeats, reliable-writer resends, ACKNACK emission): short enough
/// to keep their latency low, long enough to keep idle CPU cost small.
/// A 5 ms period corresponds to a 200 Hz tick rate.
///
/// NOTE(review): the previous wording claimed "sub-ms round-trip latency"
/// and "100 Hz"; neither follows from a 5 ms period — confirm the
/// intended value.
///
/// Phase-3 migration: this tick loop is to be replaced by a deadline
/// heap + condvar worker (`scheduler.rs`); this value then only serves
/// as the idle-floor sleep (no quantization tax for events).
pub const DEFAULT_TICK_PERIOD: Duration = Duration::from_millis(5);
83
/// Default SPDP announce period (spec §8.5.3.2 recommends 5 s).
pub const DEFAULT_SPDP_PERIOD: Duration = Duration::from_secs(5);
86
/// Deadline/lease compatibility check (request-vs-offered): the offered
/// period must be `<=` the requested period.
///
/// `0` is the sentinel for INFINITE on either side:
///
/// * requested INFINITE → the reader demands nothing, so every offer
///   (finite or INFINITE) is compatible;
/// * offered INFINITE against a finite request → the writer promises
///   nothing, which cannot satisfy a finite request, so the pair is
///   incompatible (INFINITE is greater than any finite period);
/// * both finite → plain `offered <= requested`.
///
/// Returns `true` when the pair is compatible.
fn deadline_compat(offered_nanos: u64, requested_nanos: u64) -> bool {
    match (offered_nanos, requested_nanos) {
        // Reader requests INFINITE: anything the writer offers is fine.
        (_, 0) => true,
        // Writer offers INFINITE but the reader wants a finite period.
        (0, _) => false,
        (offered, requested) => offered <= requested,
    }
}
98
/// Partition matching: the two sides match when they share at least one
/// partition name. An empty list stands for the default partition `""`,
/// so two empty lists match, and an empty list matches any list that
/// explicitly contains `""`.
fn partitions_overlap(offered: &[String], requested: &[String]) -> bool {
    // A side "covers the default partition" when its list is empty
    // (implicit default) or names `""` explicitly.
    let covers_default =
        |names: &[String]| names.is_empty() || names.iter().any(|name| name.is_empty());

    if covers_default(offered) && covers_default(requested) {
        return true;
    }
    // Otherwise only an explicitly shared name counts.
    requested
        .iter()
        .any(|wanted| offered.iter().any(|have| have == wanted))
}
117
/// Derives the locator we announce in the SPDP beacon from a
/// `UdpTransport` that may be bound to the unspecified address.
///
/// A socket bound to `0.0.0.0` reports `0.0.0.0:port` as its local
/// address, which peers cannot route to. Resolution order:
///
/// 1. If the bound address is already specific, return it unchanged.
/// 2. UDP connect probe: a temporary socket is "connected" to
///    192.0.2.1 (RFC 5737 TEST-NET-1) and its local address is read
///    back; `connect()` on a UDP socket only selects the route, no
///    datagram is sent.
/// 3. `hint` (the caller passes `RuntimeConfig::multicast_interface`)
///    if it is not unspecified.
/// 4. Loopback `127.0.0.1` — not ideal, but at least routable on the
///    same host, unlike `0.0.0.0`.
///
/// The port of the bound socket is kept in every case.
#[cfg(feature = "std")]
fn announce_locator(uc: &UdpTransport, hint: Ipv4Addr) -> Locator {
    use std::net::{SocketAddrV4, UdpSocket};

    let bound = uc.local_locator();
    let port = bound.port;
    // Only the trailing four address bytes carry the IPv4 address.
    let tail = [
        bound.address[12],
        bound.address[13],
        bound.address[14],
        bound.address[15],
    ];
    if !Ipv4Addr::from(tail).is_unspecified() {
        return bound;
    }

    // Probe the outbound interface address via a throw-away socket.
    let probed = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
        .and_then(|probe| {
            probe.connect(SocketAddrV4::new(Ipv4Addr::new(192, 0, 2, 1), 7))?;
            probe.local_addr()
        })
        .ok()
        .and_then(|addr| match addr {
            std::net::SocketAddr::V4(v4) if !v4.ip().is_unspecified() => Some(*v4.ip()),
            _ => None,
        });

    let chosen = match probed {
        Some(ip) => ip.octets(),
        // Fallback 1: configured hint, when it is a real address.
        None if !hint.is_unspecified() => hint.octets(),
        // Fallback 2: loopback.
        None => [127, 0, 0, 1],
    };
    Locator::udp_v4(chosen, port)
}
170
171/// Konvertiert eine `core::time::Duration` (std) zu einer
172/// `zerodds_qos::Duration` (Spec-2^-32-Fraction-Encoding). Saturiert bei
173/// Ueberlauf — `i32::MAX` Sekunden reichen fuer ueber 60 Jahre Lease.
174fn qos_duration_from_std(d: Duration) -> QosDuration {
175    let secs = i32::try_from(d.as_secs()).unwrap_or(i32::MAX);
176    let nanos = d.subsec_nanos();
177    // Spec-Fraction ist 2^-32 s; aus nanos zurueck ueber (nanos << 32) / 1e9.
178    let fraction = ((u64::from(nanos)) << 32) / 1_000_000_000u64;
179    QosDuration {
180        seconds: secs,
181        fraction: fraction as u32,
182    }
183}
184
185/// Konvertiert eine `zerodds_qos::Duration` in Nanosekunden (0 = INFINITE,
186/// "kein Monitoring"). `seconds` ist i32 — wir clampen auf non-negative.
187fn qos_duration_to_nanos(d: zerodds_qos::Duration) -> u64 {
188    if d.is_infinite() {
189        return 0;
190    }
191    let secs = d.seconds.max(0) as u64;
192    // fraction ist 2^-32 s, d.h. nanos = fraction * 1e9 / 2^32.
193    let frac_nanos = ((d.fraction as u64) * 1_000_000_000u64) >> 32;
194    secs.saturating_mul(1_000_000_000u64)
195        .saturating_add(frac_nanos)
196}
197
/// RTPS serialized-payload header for user samples: XCDR2 little-endian
/// + options = 0. Spec: OMG RTPS 2.5 §9.4.2.13.
///
/// It is placed in front of every user payload before the payload goes
/// into the DATA submessage — without this header, vendor readers
/// (Cyclone / Fast-DDS) refuse to deliver the sample.
pub const USER_PAYLOAD_ENCAP: [u8; 4] = [0x00, 0x07, 0x00, 0x00];
205
/// Stack `PoolBuffer` capacity for the small-sample path in
/// [`DcpsRuntime::write_user_sample`]: a frame of up to 1.5 KiB is
/// built without touching the heap.
///
/// NOTE(review): the frame holds the 4-byte encapsulation header plus
/// the payload, so the largest payload that fits is presumably
/// 1536 - 4 = 1532 bytes — confirm against `PoolBuffer` semantics.
const SMALL_FRAME_CAP: usize = 1536;
210
211/// Klein-Sample-Hot-Path-Helper: framet `USER_PAYLOAD_ENCAP` + Payload
212/// in einen Stack-`PoolBuffer<SMALL_FRAME_CAP>` und uebergibt den Slice
213/// an den Writer. Keine Vec-/Box-/Rc-/Arc-Allokation in dieser
214/// Funktion — verifiziert vom `dds_no_realloc_in_hot_path`-Lint.
215///
216/// zerodds-lint: hot-path-realloc-free
217fn write_user_sample_pooled(
218    writer: &mut ReliableWriter,
219    payload: &[u8],
220    now: Duration,
221) -> Result<Vec<zerodds_rtps::message_builder::OutboundDatagram>> {
222    let mut frame = zerodds_foundation::PoolBuffer::<SMALL_FRAME_CAP>::new();
223    frame
224        .extend_from_slice(&USER_PAYLOAD_ENCAP)
225        .map_err(|_| DdsError::WireError {
226            message: String::from("user encap framing"),
227        })?;
228    frame
229        .extend_from_slice(payload)
230        .map_err(|_| DdsError::WireError {
231            message: String::from("user payload framing"),
232        })?;
233    // D.5e Phase-2: HEARTBEAT-piggyback fuer instant ACK auf Reader-Seite.
234    writer
235        .write_with_heartbeat(frame.as_slice(), now)
236        .map_err(|_| DdsError::WireError {
237            message: String::from("user writer encode"),
238        })
239}
240
/// Configuration for the runtime. Exposed via the DomainParticipant
/// factory methods.
#[derive(Clone)]
pub struct RuntimeConfig {
    /// Tick period of the event loop. Default is
    /// [`DEFAULT_TICK_PERIOD`] (5 ms).
    pub tick_period: Duration,
    /// SPDP announce period. Default 5 s.
    pub spdp_period: Duration,
    /// SPDP multicast group (IPv4). Default 239.255.0.1 (spec §9.6.1.4.1).
    pub spdp_multicast_group: Ipv4Addr,
    /// Interface address for the multicast join. Default 0.0.0.0
    /// (the kernel picks the default interface).
    pub multicast_interface: Ipv4Addr,

    /// Optional security gate. Only present with feature `security`.
    /// When set, outbound UDP messages are passed through
    /// [`SharedSecurityGate::transform_outbound`] and inbound messages
    /// through [`SharedSecurityGate::transform_inbound_from`]
    /// (peer key taken from RTPS header bytes 8..20).
    #[cfg(feature = "security")]
    pub security: Option<std::sync::Arc<zerodds_security_runtime::SharedSecurityGate>>,
    /// Optional logging plugin for security events.
    /// Called by the inbound path when packets are dropped because of a
    /// policy violation, tampering, or a legacy block.
    #[cfg(feature = "security")]
    pub security_logger: Option<std::sync::Arc<dyn zerodds_security_runtime::LoggingPlugin>>,

    /// Multi-interface bindings. Empty → `user_unicast` is the only
    /// outbound socket (legacy behavior). Non-empty →
    /// `DcpsRuntime::start` builds a dedicated UDP socket per spec and
    /// the writer tick loop routes each target locator to the matching
    /// socket.
    #[cfg(feature = "security")]
    pub interface_bindings: Vec<InterfaceBindingSpec>,

    /// `true` → the SPDP beacon additionally announces the 12 secure
    /// discovery bits (16..27, DDS-Security 1.2 §7.4.7.1). Default
    /// `false` — only the standard bits are announced. Set by the DCPS
    /// factory as soon as a PolicyEngine is configured. This flag is
    /// also available without the `security` feature so tests can check
    /// the bit presence without enabling the whole crypto crate.
    pub announce_secure_endpoints: bool,

    /// WLP tick period (Writer Liveliness Protocol, RTPS 2.5 §8.4.13).
    /// `Duration::ZERO` → default `participant_lease_duration / 3`
    /// (spec recommendation: three misses before the reader marks the
    /// writer as not-alive). A direct override enables aggressive tests.
    pub wlp_period: Duration,

    /// Lease duration announced in the SPDP beacon as
    /// `PARTICIPANT_LEASE_DURATION` (spec default 100 s).
    /// Also used as the basis for the AUTOMATIC WLP tick
    /// (`wlp_period = participant_lease_duration / 3` when
    /// `wlp_period == Duration::ZERO`).
    pub participant_lease_duration: Duration,

    /// USER_DATA bytes of the participant (DDS 1.4 §2.2.3.1
    /// `UserDataQosPolicy`). Announced in the SPDP beacon as
    /// PID_USER_DATA (DDSI-RTPS §9.6.3.2) and exposed on the receiving
    /// side in `ParticipantBuiltinTopicData.user_data`. Default empty.
    pub user_data: Vec<u8>,

    /// Observability sink. Default is `null_sink()` — every event emit
    /// is then a direct return without allocation on the consumer side.
    /// Consumers inject e.g.
    /// [`zerodds_foundation::observability::StderrJsonSink`] (JSON lines
    /// for Vector/fluentd/Datadog) or their own OTLP bridge.
    pub observability: zerodds_foundation::observability::SharedSink,

    /// Sprint D.5d lever C — RT pinning + priority. Linux-only;
    /// on macOS/Windows the hooks are no-ops.
    ///
    /// SCHED_FIFO priority (1-99) for the three receive workers (SPDP
    /// multicast, metatraffic, user data). `None` = default scheduler
    /// (CFS). `Some(80)` is the recommended value for real-time paths.
    /// Requires `CAP_SYS_NICE` or a user permitted via `RLIMIT_RTPRIO`.
    pub recv_thread_priority: Option<i32>,

    /// Like [`Self::recv_thread_priority`], but for the tick worker.
    pub tick_thread_priority: Option<i32>,

    /// CPU affinity mask for the receive workers. `None` = no affinity
    /// (the kernel schedules freely). List of CPU indices, e.g.
    /// `vec![2, 3]` for cores 2+3. Applied via `sched_setaffinity`;
    /// all three receive threads share the same mask.
    pub recv_thread_cpus: Option<Vec<usize>>,

    /// Like [`Self::recv_thread_cpus`], but for the tick worker.
    pub tick_thread_cpus: Option<Vec<usize>>,

    /// D.5g — default DataRepresentation list announced in SEDP
    /// PublicationData and SEDP SubscriptionData unless overridden per
    /// writer/reader (UserWriterConfig/UserReaderConfig).
    ///
    /// **Important**: under the strict spec reading (XTypes 1.3
    /// §7.6.3.1.2) the first element is the writer's "offered"
    /// representation and must be contained in the reader's "accepted"
    /// list for a match to happen. Default `[XCDR1, XCDR2]` =
    /// legacy-first → maximum interop with RTI Connext Shapes Demo
    /// (XCDR1-only). Pure-XCDR2 deployments can switch this to
    /// `[XCDR2]` or `[XCDR2, XCDR1]` for bandwidth efficiency and
    /// @appendable/@mutable support.
    ///
    /// Empty (`vec![]`) is interpreted as `[XCDR1]` per spec.
    pub data_representation_offer: Vec<i16>,

    /// D.5g — default match mode for DataRepresentation negotiation.
    ///
    /// `Strict` (XTypes 1.3 §7.6.3.1.2, normative): writer.first ∈
    /// reader.list = match. `Tolerant` (industry norm): any overlap
    /// = match, and the first overlap is picked as the wire format.
    ///
    /// Default `Tolerant` because Cyclone DDS and FastDDS match this
    /// way — maximizes interop. The strict setting is only useful for
    /// formal spec-compliance tests.
    pub data_rep_match_mode: zerodds_rtps::publication_data::data_representation::DataRepMatchMode,
}
359
/// Configuration entry for one physical or logical network interface.
///
/// A binding describes one outbound socket: which IP/port it binds to,
/// which `NetInterface` class the interface represents, and which IP
/// range counts as "its peers" (routing match).
#[cfg(feature = "security")]
#[derive(Clone, Debug)]
pub struct InterfaceBindingSpec {
    /// Name for diagnostics and log attribution (e.g. `"eth0"`,
    /// `"tun0"`, `"lo"`).
    pub name: String,
    /// Bind address. `0.0.0.0` leaves the interface choice to the kernel.
    pub bind_addr: Ipv4Addr,
    /// Bind port. `0` = ephemeral.
    pub bind_port: u16,
    /// Interface class — feeds into the PolicyEngine context.
    pub kind: NetInterface,
    /// Target IP range this binding is responsible for. Example:
    /// `127.0.0.0/8` for loopback. A target whose IP lies in this range
    /// is routed to this binding.
    pub subnet: IpRange,
    /// When `true`: this binding is used when **no** other subnet match
    /// applies. Exactly one entry should have `default = true`
    /// (usually the WAN binding).
    pub default: bool,
}
388
/// A fully bound interface together with its UDP socket.
#[cfg(feature = "security")]
struct InterfaceBinding {
    /// The configuration this binding was created from.
    spec: InterfaceBindingSpec,
    /// The bound UDP socket, shareable across threads via `Arc`.
    socket: Arc<UdpTransport>,
}
395
/// Pool of per-interface UDP sockets with target-based routing.
///
/// Routing decision:
/// 1. Iterate over all bindings; the first one whose subnet contains
///    the target wins.
/// 2. If nothing matches and a default binding exists → default path.
/// 3. No match + no default → `None`, the caller drops.
#[cfg(feature = "security")]
struct OutboundSocketPool {
    /// All bound interfaces, in configuration order.
    bindings: Vec<InterfaceBinding>,
    /// Index into `bindings` of the first entry flagged `default`, if any.
    default_idx: Option<usize>,
}
409
#[cfg(feature = "security")]
impl OutboundSocketPool {
    /// Binds one UDP socket per entry of `specs`, in order, and records
    /// the index of the first entry flagged as default.
    ///
    /// # Errors
    ///
    /// Returns `DdsError::TransportError` as soon as binding a socket or
    /// setting its read timeout fails; sockets bound so far are dropped.
    fn bind_all(specs: &[InterfaceBindingSpec]) -> Result<Self> {
        let bindings = specs
            .iter()
            .map(|spec| -> Result<InterfaceBinding> {
                let bound = UdpTransport::bind_v4(spec.bind_addr, spec.bind_port).map_err(|_| {
                    DdsError::TransportError {
                        label: "interface-binding bind_v4 failed",
                    }
                })?;
                // Short 5 ms read timeout so that polling this socket
                // from the event loop returns quickly, while still being
                // long enough to amortize context switches.
                let timed = bound
                    .with_timeout(Some(Duration::from_millis(5)))
                    .map_err(|_| DdsError::TransportError {
                        label: "interface-binding set_timeout failed",
                    })?;
                Ok(InterfaceBinding {
                    spec: spec.clone(),
                    socket: Arc::new(timed),
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let default_idx = bindings.iter().position(|entry| entry.spec.default);
        Ok(Self {
            bindings,
            default_idx,
        })
    }

    /// Returns `(socket, NetInterface class)` for a target locator.
    ///
    /// The first binding whose subnet contains the target wins; otherwise
    /// the default binding is used. `None` when the locator carries no
    /// IPv4 address, or when neither a subnet match nor a default binding
    /// exists.
    fn route(&self, target: &Locator) -> Option<(&Arc<UdpTransport>, NetInterface)> {
        let octets = ipv4_from_locator(target)?;
        let ip = core::net::IpAddr::V4(core::net::Ipv4Addr::from(octets));
        let chosen = self
            .bindings
            .iter()
            .find(|entry| entry.spec.subnet.contains(&ip))
            .or_else(|| self.default_idx.and_then(|idx| self.bindings.get(idx)))?;
        Some((&chosen.socket, chosen.spec.kind.clone()))
    }
}
458
/// Extracts the IPv4 address from a UDP-v4 `Locator` (the last four
/// bytes of its 16-byte address field). Returns `None` for every other
/// locator kind (e.g. SHM/UDS/IPv6).
#[cfg(feature = "security")]
fn ipv4_from_locator(loc: &Locator) -> Option<[u8; 4]> {
    (loc.kind == LocatorKind::UdpV4).then(|| {
        let addr = &loc.address;
        [addr[12], addr[13], addr[14], addr[15]]
    })
}
473
474impl core::fmt::Debug for RuntimeConfig {
475    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
476        let mut dbg = f.debug_struct("RuntimeConfig");
477        dbg.field("tick_period", &self.tick_period)
478            .field("spdp_period", &self.spdp_period)
479            .field("spdp_multicast_group", &self.spdp_multicast_group)
480            .field("multicast_interface", &self.multicast_interface);
481        #[cfg(feature = "security")]
482        {
483            dbg.field("security", &self.security.as_ref().map(|_| "<gate>"));
484            dbg.field(
485                "security_logger",
486                &self.security_logger.as_ref().map(|_| "<logger>"),
487            );
488        }
489        dbg.finish()
490    }
491}
492
493impl Default for RuntimeConfig {
494    fn default() -> Self {
495        Self {
496            tick_period: DEFAULT_TICK_PERIOD,
497            spdp_period: DEFAULT_SPDP_PERIOD,
498            spdp_multicast_group: Ipv4Addr::from(SPDP_DEFAULT_MULTICAST_ADDRESS),
499            multicast_interface: Ipv4Addr::UNSPECIFIED,
500            #[cfg(feature = "security")]
501            security: None,
502            #[cfg(feature = "security")]
503            security_logger: None,
504            #[cfg(feature = "security")]
505            interface_bindings: Vec::new(),
506            announce_secure_endpoints: false,
507            wlp_period: Duration::ZERO,
508            participant_lease_duration: Duration::from_secs(100),
509            user_data: Vec::new(),
510            observability: zerodds_foundation::observability::null_sink(),
511            recv_thread_priority: None,
512            tick_thread_priority: None,
513            recv_thread_cpus: None,
514            tick_thread_cpus: None,
515            // D.5g — Default `[XCDR1, XCDR2]` (legacy-first, max Interop).
516            data_representation_offer:
517                zerodds_rtps::publication_data::data_representation::DEFAULT_OFFER.to_vec(),
518            data_rep_match_mode:
519                zerodds_rtps::publication_data::data_representation::DataRepMatchMode::default(),
520        }
521    }
522}
523
524// ---------------------------------------------------------------------------
525// Security-Gate Helpers
526// ---------------------------------------------------------------------------
527
/// Passes outbound UDP bytes through the security gate, if one is
/// configured. Without a gate the bytes are passed through unchanged
/// (copied into a `Vec`).
///
/// When the gate reports an error the function returns `None` and the
/// packet is **not** sent — dropping is preferred over leaking
/// plaintext. No logging happens here.
#[cfg(feature = "security")]
fn secure_outbound_bytes(rt: &DcpsRuntime, bytes: &[u8]) -> Option<Vec<u8>> {
    let Some(gate) = rt.config.security.as_ref() else {
        return Some(bytes.to_vec());
    };
    gate.transform_outbound(bytes).ok()
}
540
541#[cfg(not(feature = "security"))]
542fn secure_outbound_bytes(_rt: &DcpsRuntime, bytes: &[u8]) -> Option<Vec<u8>> {
543    Some(bytes.to_vec())
544}
545
/// Passes inbound UDP bytes through the security gate.
///
/// Expects an RTPS header with the GuidPrefix at bytes 8..20.
/// Returns `None` when the packet must be dropped. Without a configured
/// gate the bytes are passed through unchanged (copied into a `Vec`).
///
/// Drop reasons are forwarded, differentiated by severity, to the
/// configured `LoggingPlugin`:
/// * `Malformed`       → `Error`
/// * `LegacyBlocked`   → `Error`
/// * `PolicyViolation` → `Warning` (possible tampering)
/// * `CryptoError`     → `Warning` (tag mismatch, replay, etc.)
///
/// The accepted payload is moved out of the verdict rather than cloned,
/// so the common accept path performs no extra allocation or copy.
#[cfg(feature = "security")]
fn secure_inbound_bytes(rt: &DcpsRuntime, bytes: &[u8], iface: &NetInterface) -> Option<Vec<u8>> {
    use zerodds_security_runtime::{InboundVerdict, LogLevel};
    let Some(gate) = &rt.config.security else {
        return Some(bytes.to_vec());
    };
    // Fast path: take ownership of the accepted payload. Every other
    // verdict is kept for classification and logging below.
    let verdict = match gate.classify_inbound(bytes, iface) {
        InboundVerdict::Accept(out) => return Some(out),
        rejected => rejected,
    };
    let category = verdict.category();
    let (level, message): (LogLevel, String) = match &verdict {
        // Not reachable (Accept returned above); kept so the match stays
        // exhaustive without a wildcard arm or a panic path.
        InboundVerdict::Accept(out) => return Some(out.clone()),
        InboundVerdict::Malformed => (
            LogLevel::Error,
            alloc::format!(
                "inbound datagram too short ({} bytes, iface={:?})",
                bytes.len(),
                iface
            ),
        ),
        InboundVerdict::LegacyBlocked => (
            LogLevel::Error,
            alloc::format!(
                "legacy plaintext peer on protected domain \
                 (iface={iface:?}, allow_unauthenticated_participants=false)"
            ),
        ),
        InboundVerdict::PolicyViolation(msg) => {
            (LogLevel::Warning, alloc::format!("{msg} [iface={iface:?}]"))
        }
        InboundVerdict::CryptoError(msg) => {
            (LogLevel::Warning, alloc::format!("{msg} [iface={iface:?}]"))
        }
    };
    if let Some(logger) = &rt.config.security_logger {
        // Participant identity: the GuidPrefix in the first 12 bytes,
        // zero-padded; all zeros when the datagram is too short.
        let mut participant = [0u8; 16];
        if bytes.len() >= 20 {
            participant[..12].copy_from_slice(&bytes[8..20]);
        }
        logger.log(level, participant, category, &message);
    }
    None
}
599
600#[cfg(not(feature = "security"))]
601fn secure_inbound_bytes(_rt: &DcpsRuntime, bytes: &[u8]) -> Option<Vec<u8>> {
602    Some(bytes.to_vec())
603}
604
/// Default interface class for inbound dispatch when the socket does
/// not belong to the `outbound_pool`. In a v1.4 setup (without
/// `interface_bindings`) all packets go through `user_unicast` and are
/// classified as `Wan` — the most conservative assumption (protection
/// rules apply as in the single-interface case).
#[cfg(feature = "security")]
const DEFAULT_INBOUND_IFACE: NetInterface = NetInterface::Wan;
612
/// Per-reader outbound transform.
///
/// Looks up in the writer slot which `ProtectionLevel` the matched
/// reader at the given `target` locator expects, then passes the
/// datagram through the security gate for that reader individually, so
/// each reader receives a wire payload matching its security profile
/// (Legacy = plain, Fast = sign, Secure = encrypt).
///
/// Fallback paths:
/// * No security gate configured → pass-through.
/// * No writer slot, poisoned slot lock, or no `locator_to_peer` /
///   `reader_protection` entry (reader not yet matched via SEDP) →
///   `transform_outbound` with the domain rule (the homogeneous v1.4
///   path).
/// * The gate reports an error → `None` (the caller drops — no
///   plaintext leak).
#[cfg(feature = "security")]
fn secure_outbound_for_target(
    rt: &DcpsRuntime,
    writer_eid: EntityId,
    bytes: &[u8],
    target: &Locator,
) -> Option<Vec<u8>> {
    let gate = match &rt.config.security {
        Some(gate) => gate,
        None => return Some(bytes.to_vec()),
    };

    // Resolve (peer key, protection level) for this target; the slot
    // lock is released before the gate is invoked.
    let mut per_reader = None;
    if let Some(slot_arc) = rt.writer_slot(writer_eid) {
        if let Ok(slot) = slot_arc.lock() {
            if let Some(peer_key) = slot.locator_to_peer.get(target) {
                if let Some(level) = slot.reader_protection.get(peer_key) {
                    per_reader = Some((*peer_key, *level));
                }
            }
        }
    }

    if let Some((peer_key, level)) = per_reader {
        gate.transform_outbound_for(&peer_key, bytes, level).ok()
    } else {
        gate.transform_outbound(bytes).ok()
    }
}
650
651#[cfg(not(feature = "security"))]
652fn secure_outbound_for_target(
653    _rt: &DcpsRuntime,
654    _writer_eid: EntityId,
655    bytes: &[u8],
656    _target: &Locator,
657) -> Option<Vec<u8>> {
658    Some(bytes.to_vec())
659}
660
/// Sends `bytes` to `target` on the matching interface socket. Falls
/// back to `rt.user_unicast` when no pool is configured, or when no
/// binding matches the target range and no default binding is set.
/// Send errors are deliberately ignored (best effort).
#[cfg(feature = "security")]
fn send_on_best_interface(rt: &DcpsRuntime, target: &Locator, bytes: &[u8]) {
    let routed = rt
        .outbound_pool
        .as_ref()
        .and_then(|pool| pool.route(target));
    match routed {
        Some((socket, _iface)) => {
            let _ = socket.send(target, bytes);
        }
        None => {
            let _ = rt.user_unicast.send(target, bytes);
        }
    }
}
675
/// Build without the `security` feature: there is no interface pool, so
/// `bytes` always goes out to `target` via `rt.user_unicast`.
#[cfg(not(feature = "security"))]
fn send_on_best_interface(rt: &DcpsRuntime, target: &Locator, bytes: &[u8]) {
    // Best effort: the result of the send is deliberately discarded.
    let _ = rt.user_unicast.send(target, bytes);
}
680
/// User writer slot inside the runtime. Carries the `ReliableWriter`,
/// the topic metadata, and the fragment size (from QoS).
struct UserWriterSlot {
    /// The `ReliableWriter` owned by this slot.
    writer: ReliableWriter,
    /// Topic name (topic metadata).
    topic_name: String,
    /// Type name (topic metadata).
    type_name: String,
    /// Reliability flag of the writer.
    reliable: bool,
    /// Durability kind of the writer.
    durability: zerodds_qos::DurabilityKind,
    /// Deadline period in nanoseconds (0 == INFINITE, no monitoring).
    deadline_nanos: u64,
    /// Last successful `write`, relative to `DcpsRuntime::start_instant`.
    last_write: Option<Duration>,
    /// Counter for missed deadlines (spec §2.2.4.2.9).
    offered_deadline_missed_count: u64,
    /// Counter for LivelinessLost detections from the writer's point of
    /// view (spec §2.2.4.2.10). Incremented in `check_writer_liveliness`
    /// when the manual lease is exceeded. 0 == not monitored.
    liveliness_lost_count: u64,
    /// Last assert time (manual liveliness). `None` == never.
    last_liveliness_assert: Option<Duration>,
    /// Per-policy counters for offered_incompatible_qos. Spec
    /// §2.2.4.2.4.2 — writer side. Incremented on a
    /// `wire_writer_to_remote_reader` reject.
    offered_incompatible_qos: crate::status::OfferedIncompatibleQosStatus,
    /// Lifespan duration in nanoseconds (0 == INFINITE, no expiry).
    lifespan_nanos: u64,
    /// Insert time per sample sequence number (relative to
    /// start_instant). Expired entries are removed from the front —
    /// sequence numbers are monotonic and the lifespan is constant, so
    /// the expired prefix is always at the front.
    sample_insert_times:
        alloc::collections::VecDeque<(zerodds_rtps::wire_types::SequenceNumber, Duration)>,
    /// Liveliness kind (Automatic / ManualByParticipant / ManualByTopic).
    liveliness_kind: zerodds_qos::LivelinessKind,
    /// Lease duration in nanoseconds (0 == INFINITE).
    liveliness_lease_nanos: u64,
    /// Ownership mode.
    ownership: zerodds_qos::OwnershipKind,
    /// Partition list.
    partition: Vec<String>,
    /// ProtectionLevel per matched reader, keyed by GuidPrefix. Derived
    /// from `sub.security_info` at SEDP match time. Legacy readers get
    /// `None` entries (presumably `ProtectionLevel::None` — TODO
    /// confirm). Empty for writers without matched security peers —
    /// the hot path is then unchanged.
    #[cfg(feature = "security")]
    reader_protection: BTreeMap<[u8; 12], ProtectionLevel>,
    /// Mapping locator → GuidPrefix for the writer tick loop, so that
    /// `secure_outbound_for_target` can look up the protection per
    /// target without breaking the writer tick API (`dg.targets` are
    /// locator lists today).
    #[cfg(feature = "security")]
    locator_to_peer: BTreeMap<Locator, [u8; 12]>,
    /// F-TYPES-3, XTypes 1.3 §7.3.4.2: TypeIdentifier of the writer type
    /// (from `T::TYPE_IDENTIFIER` via `UserWriterConfig`).
    type_identifier: zerodds_types::TypeIdentifier,
    /// D.5g — per-writer override for the DataRepresentation offer.
    /// `None` = runtime default. `Some(vec)` = fixed for this writer.
    data_rep_offer_override: Option<Vec<i16>>,
}
739
/// Item handed to the listener dispatch: the `UserSample` itself plus
/// an optional zero-copy view onto the original `Arc<[u8]>` together
/// with the encapsulation offset into it ("Hebel-E" zero-copy path).
pub type UserSampleWithEncap = (UserSample, Option<(Arc<[u8]>, usize)>);
744
/// Sample-channel item: either a data payload or a lifecycle marker.
/// The lifecycle information is reconstructed by the wire path as
/// `key_hash + ChangeKind` from the PID_STATUS_INFO header; the
/// DataReader DCPS layer translates it into `__push_lifecycle`.
#[derive(Debug, Clone)]
pub enum UserSample {
    /// Regular sample with payload (CDR-encoded application type).
    /// `writer_guid` is the 16-byte GUID of the emitting writer —
    /// needed by the subscriber for exclusive-ownership resolution
    /// (DDS 1.4 §2.2.3.23 / §2.2.2.5.5).
    Alive {
        /// CDR payload (without the encapsulation header).
        payload: Vec<u8>,
        /// Writer GUID — used for strongest-writer selection.
        writer_guid: [u8; 16],
        /// Writer `ownership_strength` at the time of reception.
        /// `0` if the writer is not yet known via discovery (the
        /// reader treats that as the default strength, which is
        /// spec-conformant for shared-ownership topics; for exclusive
        /// ownership the reader filters the real strength against the
        /// current owner).
        writer_strength: i32,
    },
    /// Lifecycle marker (dispose / unregister) — the reader sets the
    /// InstanceState accordingly.
    Lifecycle {
        /// Key hash of the affected instance (16 bytes).
        key_hash: [u8; 16],
        /// `NotAliveDisposed` / `NotAliveUnregistered` /
        /// `NotAliveDisposedUnregistered`.
        kind: zerodds_rtps::history_cache::ChangeKind,
    },
}
777
/// Listener callback for sample arrival.
///
/// Fired synchronously by `recv_user_data_loop` in the recv-thread
/// context as soon as an alive sample lands in the reader's
/// HistoryCache. Eliminates the polling latency of
/// `zerodds_reader_take()` — the listener path typically saves
/// 50-100 µs per side.
///
/// **Contract** (analogous to the DDS spec §2.2.4.4 listener
/// semantics):
/// * The callback runs in the recv thread, NOT in the user thread.
/// * Keep it short and non-blocking. No I/O, no locks, no calls back
///   into the ZeroDDS API.
/// * `bytes` points at the CDR payload of the alive sample (without
///   the encapsulation header). It is only valid for the duration of
///   the callback; copy it if it is needed beyond the call.
/// * Disposed/unregistered lifecycle events do NOT fire the listener
///   (only `Alive` samples) — for lifecycle tracking keep using
///   `zerodds_reader_take()` or add a lifecycle-listener API.
pub type UserReaderListener = alloc::boxed::Box<dyn Fn(&[u8]) + Send + Sync + 'static>;
799
/// User reader slot: the `ReliableReader`, the topic metadata and the
/// channel towards the DataReader (DCPS API side), together with the
/// reader's QoS values and status counters.
struct UserReaderSlot {
    /// The underlying `ReliableReader`.
    reader: ReliableReader,
    /// Topic name of this reader.
    topic_name: String,
    /// Type name of this reader.
    type_name: String,
    /// Sending half of the `UserSample` channel.
    sample_tx: mpsc::Sender<UserSample>,
    /// Spec §3 zerodds-async-1.0: async waker slot. Registered by the
    /// async reader; on `sample_tx.send` we call `waker.wake()`.
    /// `None` when no async reader is active.
    async_waker: alloc::sync::Arc<std::sync::Mutex<Option<core::task::Waker>>>,
    /// Listener callback for alive samples.
    /// Fired synchronously by `recv_user_data_loop`. `None` = no
    /// listener registered (the user polls via
    /// `zerodds_reader_take()`). Held in an `Arc` so the recv thread
    /// can clone the callback and run it without holding a further
    /// lock (minimises lock hold time).
    listener: Option<alloc::sync::Arc<UserReaderListener>>,
    /// Durability kind of this reader.
    durability: zerodds_qos::DurabilityKind,
    /// Deadline period in nanoseconds (0 == INFINITE).
    deadline_nanos: u64,
    /// Time of the last received sample, relative to runtime start.
    last_sample_received: Option<Duration>,
    /// Counter for missed deadline expectations (spec §2.2.4.2.11).
    requested_deadline_missed_count: u64,
    /// Per-policy counter for requested_incompatible_qos. Spec
    /// §2.2.4.2.6.5 — reader side. Incremented when
    /// `wire_reader_to_remote_writer` rejects a match.
    requested_incompatible_qos: crate::status::RequestedIncompatibleQosStatus,
    /// Sample-lost counter (spec §2.2.4.2.6.2). Incremented by
    /// `record_sample_lost`.
    sample_lost_count: u64,
    /// Sample-rejected counter (spec §2.2.4.2.6.3). Incremented by
    /// `record_sample_rejected`.
    sample_rejected: crate::status::SampleRejectedStatus,
    /// Liveliness lease requested by the reader (0 == INFINITE).
    liveliness_lease_nanos: u64,
    /// Liveliness kind requested by the reader.
    liveliness_kind: zerodds_qos::LivelinessKind,
    /// Counter: how often the writer was marked "alive"
    /// (spec §2.2.4.2.14 alive_count).
    liveliness_alive_count: u64,
    /// Counter: how often it was marked "not_alive" (lease expired).
    liveliness_not_alive_count: u64,
    /// Current alive/not-alive state as seen by the reader.
    liveliness_alive: bool,
    /// Ownership.
    ownership: zerodds_qos::OwnershipKind,
    /// Partition.
    partition: Vec<String>,
    /// Per-writer strength cache for exclusive-ownership resolution
    /// (DDS 1.4 §2.2.3.23). Filled by `wire_reader_to_remote_writer`
    /// from each `PublicationBuiltinTopicData.ownership_strength`;
    /// `delivered_to_user_sample` looks the strength up here to put it
    /// into `UserSample::Alive`.
    writer_strengths: alloc::collections::BTreeMap<[u8; 16], i32>,
    /// F-TYPES-3 XTypes 1.3 §7.3.4.2 TypeIdentifier of the reader type
    /// (from `T::TYPE_IDENTIFIER` via `UserReaderConfig`). The default
    /// `TypeIdentifier::None` signals "no TypeIdentifier" — matching
    /// then falls back to a plain `type_name` comparison
    /// (DDS 1.4 §2.2.3 default path).
    type_identifier: zerodds_types::TypeIdentifier,
    /// XTypes 1.3 §7.6.3.7 — TCE policy controlling the strictness of
    /// the XTypes match path.
    type_consistency: zerodds_types::qos::TypeConsistencyEnforcement,
}
864
/// QoS config for registering a user writer with the runtime.
///
/// Bundles all policies that go onto the wire via SEDP plus the local
/// monitoring settings, which avoids functions with 10+ arguments.
/// The caller creates it once per writer registration;
/// `build_publication_data` turns it into the SEDP
/// `PublicationBuiltinTopicData` used to announce the local
/// publication.
#[derive(Debug, Clone)]
pub struct UserWriterConfig {
    /// Topic name (DDS topic).
    pub topic_name: String,
    /// IDL type name.
    pub type_name: String,
    /// `true` = RELIABLE, `false` = BEST_EFFORT.
    pub reliable: bool,
    /// Durability.
    pub durability: zerodds_qos::DurabilityKind,
    /// Deadline period (offered).
    pub deadline: zerodds_qos::DeadlineQosPolicy,
    /// Lifespan duration (writer-only).
    pub lifespan: zerodds_qos::LifespanQosPolicy,
    /// Liveliness (offered).
    pub liveliness: zerodds_qos::LivelinessQosPolicy,
    /// Ownership mode (Shared / Exclusive).
    pub ownership: zerodds_qos::OwnershipKind,
    /// Strength for Exclusive ownership (ignored for Shared).
    pub ownership_strength: i32,
    /// Partition list. Empty == default partition (`""`).
    pub partition: Vec<String>,
    /// UserData QoS (spec §2.2.3.1) — opaque `sequence<octet>`,
    /// propagated via discovery.
    pub user_data: Vec<u8>,
    /// TopicData QoS (spec §2.2.3.3).
    pub topic_data: Vec<u8>,
    /// GroupData QoS (spec §2.2.3.2).
    pub group_data: Vec<u8>,
    /// XTypes 1.3 §7.3.4.2 TypeIdentifier (F-TYPES-3 wire-up). Is
    /// `TypeIdentifier::None` when `T::TYPE_IDENTIFIER` is left at its
    /// default.
    pub type_identifier: zerodds_types::TypeIdentifier,

    /// D.5g — per-writer override of the DataRepresentation offer list.
    /// `None` = use `RuntimeConfig::data_representation_offer`.
    /// `Some(vec)` = overridden for this writer (e.g. `[XCDR2]` for a
    /// modern-only publisher).
    pub data_representation_offer: Option<Vec<i16>>,
}
910
/// QoS config for registering a user reader with the runtime.
/// `build_subscription_data` turns it into the SEDP
/// `SubscriptionBuiltinTopicData` announcement.
#[derive(Debug, Clone)]
pub struct UserReaderConfig {
    /// Topic name.
    pub topic_name: String,
    /// IDL type name.
    pub type_name: String,
    /// `true` = RELIABLE, `false` = BEST_EFFORT.
    pub reliable: bool,
    /// Durability (requested).
    pub durability: zerodds_qos::DurabilityKind,
    /// Deadline (requested).
    pub deadline: zerodds_qos::DeadlineQosPolicy,
    /// Liveliness (requested).
    pub liveliness: zerodds_qos::LivelinessQosPolicy,
    /// Ownership.
    pub ownership: zerodds_qos::OwnershipKind,
    /// Partition.
    pub partition: Vec<String>,
    /// UserData QoS (spec §2.2.3.1).
    pub user_data: Vec<u8>,
    /// TopicData QoS (spec §2.2.3.3).
    pub topic_data: Vec<u8>,
    /// GroupData QoS (spec §2.2.3.2).
    pub group_data: Vec<u8>,
    /// XTypes 1.3 §7.3.4.2 TypeIdentifier (F-TYPES-3 wire-up).
    pub type_identifier: zerodds_types::TypeIdentifier,
    /// TypeConsistencyEnforcement (XTypes §7.6.3.7) — controls how
    /// strictly the reader match checks XTypes compatibility.
    pub type_consistency: zerodds_types::qos::TypeConsistencyEnforcement,

    /// D.5g — per-reader override of the DataRepresentation accept list.
    /// `None` = use `RuntimeConfig::data_representation_offer`.
    /// `Some(vec)` = overridden for this reader (e.g. `[XCDR1]` for a
    /// reader that only accepts the legacy XCDR1 wire format).
    pub data_representation_offer: Option<Vec<i16>>,
}
948
949fn build_publication_data(
950    owner_prefix: GuidPrefix,
951    writer_eid: EntityId,
952    cfg: &UserWriterConfig,
953    runtime_offer: &[i16],
954) -> zerodds_rtps::publication_data::PublicationBuiltinTopicData {
955    use zerodds_qos::{ReliabilityKind, ReliabilityQosPolicy};
956    zerodds_rtps::publication_data::PublicationBuiltinTopicData {
957        key: Guid::new(owner_prefix, writer_eid),
958        participant_key: Guid::new(owner_prefix, EntityId::PARTICIPANT),
959        topic_name: cfg.topic_name.clone(),
960        type_name: cfg.type_name.clone(),
961        durability: cfg.durability,
962        reliability: ReliabilityQosPolicy {
963            kind: if cfg.reliable {
964                ReliabilityKind::Reliable
965            } else {
966                ReliabilityKind::BestEffort
967            },
968            max_blocking_time: QosDuration::from_millis(100_i32),
969        },
970        ownership: cfg.ownership,
971        ownership_strength: cfg.ownership_strength,
972        liveliness: cfg.liveliness,
973        deadline: cfg.deadline,
974        lifespan: cfg.lifespan,
975        partition: cfg.partition.clone(),
976        user_data: cfg.user_data.clone(),
977        topic_data: cfg.topic_data.clone(),
978        group_data: cfg.group_data.clone(),
979        type_information: None,
980        // D.5g — PID_DATA_REPRESENTATION (XTypes 1.3 §7.6.3.1.1, RTPS 2.5
981        // PID 0x0073). Per-Writer-Override (cfg.data_representation_offer)
982        // ueberschreibt den RuntimeConfig-Default.
983        data_representation: cfg
984            .data_representation_offer
985            .clone()
986            .unwrap_or_else(|| runtime_offer.to_vec()),
987        // Security: PolicyEngine befuellt das spaeter. Default
988        // None = Legacy-Verhalten (kein EndpointSecurityInfo-PID).
989        security_info: None,
990        // .B — RPC-Discovery-PIDs. Default None: kein RPC-Endpoint;
991        // RpcEndpoint-Builder befuellt diese Felder.
992        service_instance_name: None,
993        related_entity_guid: None,
994        topic_aliases: None,
995        // F-TYPES-3 Wire-up: XTypes-1.3 §7.3.4.2 TypeIdentifier.
996        type_identifier: cfg.type_identifier.clone(),
997    }
998}
999
1000fn build_subscription_data(
1001    owner_prefix: GuidPrefix,
1002    reader_eid: EntityId,
1003    cfg: &UserReaderConfig,
1004    runtime_offer: &[i16],
1005) -> zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
1006    use zerodds_qos::{ReliabilityKind, ReliabilityQosPolicy};
1007    zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
1008        key: Guid::new(owner_prefix, reader_eid),
1009        participant_key: Guid::new(owner_prefix, EntityId::PARTICIPANT),
1010        topic_name: cfg.topic_name.clone(),
1011        type_name: cfg.type_name.clone(),
1012        durability: cfg.durability,
1013        reliability: ReliabilityQosPolicy {
1014            kind: if cfg.reliable {
1015                ReliabilityKind::Reliable
1016            } else {
1017                ReliabilityKind::BestEffort
1018            },
1019            max_blocking_time: QosDuration::from_millis(100_i32),
1020        },
1021        ownership: cfg.ownership,
1022        liveliness: cfg.liveliness,
1023        deadline: cfg.deadline,
1024        partition: cfg.partition.clone(),
1025        user_data: cfg.user_data.clone(),
1026        topic_data: cfg.topic_data.clone(),
1027        group_data: cfg.group_data.clone(),
1028        type_information: None,
1029        // D.5g — PID_DATA_REPRESENTATION (siehe build_publication_data).
1030        // Per-Reader-Override ueberschreibt RuntimeConfig-Default.
1031        data_representation: cfg
1032            .data_representation_offer
1033            .clone()
1034            .unwrap_or_else(|| runtime_offer.to_vec()),
1035        content_filter: None,
1036        security_info: None,
1037        service_instance_name: None,
1038        related_entity_guid: None,
1039        topic_aliases: None,
1040        // F-TYPES-3 Wire-up: XTypes-1.3 §7.3.4.2 TypeIdentifier.
1041        type_identifier: cfg.type_identifier.clone(),
1042    }
1043}
1044
/// The runtime of a `DomainParticipant`. Hosts all background threads
/// and UDP sockets.
pub struct DcpsRuntime {
    /// Participant GUID prefix (12-byte identifier, random per instance).
    pub guid_prefix: GuidPrefix,
    /// Domain id.
    pub domain_id: i32,
    /// SPDP multicast receiver socket.
    pub spdp_multicast_rx: Arc<UdpTransport>,
    /// SPDP unicast socket (for bidirectional SPDP, B2).
    pub spdp_unicast: Arc<UdpTransport>,
    /// User-data unicast socket (default user unicast, to which peers
    /// send matched samples).
    pub user_unicast: Arc<UdpTransport>,
    /// Sender socket for the SPDP multicast announce (a separate
    /// UdpSocket without SO_REUSE/SO_BIND_IP_MULTICAST so that
    /// `send_to` routes cleanly).
    spdp_mc_tx: Arc<UdpTransport>,
    /// SPDP beacon (sends periodic announces).
    spdp_beacon: Mutex<SpdpBeacon>,
    /// SPDP reader (parses incoming beacons).
    spdp_reader: SpdpReader,
    /// Discovered remote participants (prefix → data).
    discovered: Arc<Mutex<DiscoveredParticipantsCache>>,
    /// SEDP stack for publication/subscription announce and discovery.
    pub sedp: Arc<Mutex<SedpStack>>,
    /// TypeLookup service builtin endpoint GUIDs (XTypes 1.3 §7.6.3.3.4).
    pub type_lookup_endpoints: TypeLookupEndpoints,
    /// TypeLookup server (server-side handler over the local
    /// TypeRegistry).
    pub type_lookup_server: Arc<Mutex<TypeLookupServer>>,
    /// TypeLookup client (client-side correlation table for
    /// outstanding requests).
    pub type_lookup_client: Arc<Mutex<TypeLookupClient>>,
    /// Security builtin endpoint stack
    /// (`DCPSParticipantStatelessMessage` + `DCPSParticipantVolatile-
    /// MessageSecure`). `None` as long as no security plugin is active
    /// — the hot path then skips all security-builtin demuxing. Set to
    /// `Some` via [`DcpsRuntime::enable_security_builtins`] once the
    /// factory has registered a plugin.
    pub security_builtin: Mutex<Option<Arc<Mutex<SecurityBuiltinStack>>>>,
    /// Monotonic "start time" — used for the SEDP tick clocks.
    start_instant: Instant,
    /// Local user-writer registry (EntityId → writer state).
    user_writers: Arc<RwLock<BTreeMap<EntityId, Arc<Mutex<UserWriterSlot>>>>>,
    /// ADR-0006 side map: one optional ShmLocator byte value per user
    /// writer (PID_SHM_LOCATOR in the SEDP sample). No entry = no
    /// same-host backend attached. The wire encoder consults this map
    /// on SEDP push.
    shm_locators: Arc<RwLock<BTreeMap<EntityId, Vec<u8>>>>,
    /// Local user-reader registry (EntityId → reader state).
    user_readers: Arc<RwLock<BTreeMap<EntityId, Arc<Mutex<UserReaderSlot>>>>>,
    /// Entity-key counter (3 bytes, incrementing). User writers use
    /// `0xC2` (with-key, user), user readers `0xC7`.
    entity_counter: AtomicU32,
    /// Configuration (cloned from RuntimeConfig).
    pub config: RuntimeConfig,
    /// Per-interface outbound socket pool. `None` when
    /// `config.interface_bindings` is empty — `user_unicast` then
    /// remains the only outbound socket (v1.4 path).
    #[cfg(feature = "security")]
    outbound_pool: Option<Arc<OutboundSocketPool>>,
    /// Writer Liveliness Protocol endpoint (RTPS 2.5 §8.4.13).
    /// Sends periodic `ParticipantMessageData` heartbeats and tracks
    /// last-seen per remote participant.
    pub wlp: Arc<Mutex<crate::wlp::WlpEndpoint>>,
    /// Builtin-topic reader sinks (DDS 1.4 §2.2.5). Set by the
    /// `DomainParticipant` constructor via `attach_builtin_sinks`;
    /// before that this is `None` and the discovery hot path silently
    /// drops samples (e.g. when the runtime is started directly for
    /// internal tests, without a participant).
    builtin_sinks: Mutex<Option<crate::builtin_subscriber::BuiltinSinks>>,
    /// Ignore filter (DDS 1.4 §2.2.2.2.1.14-17). Set by the
    /// `DomainParticipant` constructor via `attach_ignore_filter`.
    /// `None` means: no participant hook → no filtering.
    ignore_filter: Mutex<Option<crate::participant::IgnoreFilter>>,
    /// Stop flag for all worker threads (recv loops + tick loop).
    stop: Arc<AtomicBool>,
    /// Worker-thread JoinHandles. Per-socket recv threads + tick
    /// thread, all terminated together via `stop` (sprint D.5b —
    /// previously a single-threaded `event_loop`).
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// Match-event notifier (D.5e phase-1 quick win). Notified by the
    /// SEDP match path after `add_reader_proxy` / `add_writer_proxy`;
    /// `wait_for_matched_*` parks on it instead of polling every 20 ms.
    /// The mutex content is only a lock anchor for the Condvar API; no
    /// state is protected by it (the count is read independently via
    /// `user_*_matched_count`).
    match_event: Arc<(Mutex<()>, Condvar)>,
    /// Acknowledgments-event notifier. Notified when a writer receives
    /// an ACKNACK that advances its acked base.
    /// `wait_for_acknowledgments` parks on it instead of polling every
    /// 50 ms.
    ack_event: Arc<(Mutex<()>, Condvar)>,
}
1139
1140impl core::fmt::Debug for DcpsRuntime {
1141    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1142        f.debug_struct("DcpsRuntime")
1143            .field("domain_id", &self.domain_id)
1144            .field("guid_prefix", &self.guid_prefix)
1145            .field("spdp_group", &self.config.spdp_multicast_group)
1146            .finish_non_exhaustive()
1147    }
1148}
1149
/// `Arc`-shared handle to a user-writer slot, as used by the
/// per-slot-mutex architecture.
type WriterSlotArc = Arc<Mutex<UserWriterSlot>>;
/// `Arc`-shared handle to a user-reader slot (counterpart of
/// `WriterSlotArc`).
type ReaderSlotArc = Arc<Mutex<UserReaderSlot>>;
1154
1155impl DcpsRuntime {
1156    // ========================================================================
1157    // --- Per-Slot-Mutex-Helpers
1158    //
1159    // Die `user_writers`/`user_readers`-Registry ist `RwLock<BTreeMap<EntityId,
1160    // Arc<Mutex<Slot>>>>`. Hot-Path-Zugriffe nehmen den read-Lock kurz, klonen
1161    // den Slot-Arc und geben den read-Lock frei, bevor sie den Per-Slot-Mutex
1162    // nehmen. Parallele Schreibzugriffe auf **verschiedene** Slots laufen damit
1163    // ohne globale Contention.
1164    //
1165    // Slot-Erzeugung/-Loeschung nimmt den write-Lock; das ist selten und
1166    // amortisiert sich.
1167    // ========================================================================
1168
1169    /// Liefert den Slot-Arc fuer einen User-Writer, falls vorhanden.
1170    /// Hot-Path-Form: ein einzelner read-Lock + Arc-Clone, kein
1171    /// Per-Slot-Mutex. Caller nimmt den Mutex selbst.
1172    fn writer_slot(&self, eid: EntityId) -> Option<WriterSlotArc> {
1173        self.user_writers
1174            .read()
1175            .ok()
1176            .and_then(|w| w.get(&eid).cloned())
1177    }
1178
1179    /// Liefert den Slot-Arc fuer einen User-Reader, falls vorhanden.
1180    fn reader_slot(&self, eid: EntityId) -> Option<ReaderSlotArc> {
1181        self.user_readers
1182            .read()
1183            .ok()
1184            .and_then(|r| r.get(&eid).cloned())
1185    }
1186
1187    /// Snapshot aller Writer-Slots als `Vec<(EntityId, Arc)>`. Erlaubt
1188    /// Iteration ohne den Registry-read-Lock zu halten — z.B. fuer
1189    /// Heartbeat-Tick oder Liveliness-Sweep, wo wir potentiell jeden
1190    /// Slot's Mutex nehmen.
1191    fn writer_slots_snapshot(&self) -> Vec<(EntityId, WriterSlotArc)> {
1192        match self.user_writers.read() {
1193            Ok(w) => w.iter().map(|(k, v)| (*k, Arc::clone(v))).collect(),
1194            Err(_) => Vec::new(),
1195        }
1196    }
1197
1198    /// Snapshot aller Reader-Slots — symmetrisch zu writer_slots_snapshot.
1199    fn reader_slots_snapshot(&self) -> Vec<(EntityId, ReaderSlotArc)> {
1200        match self.user_readers.read() {
1201            Ok(r) => r.iter().map(|(k, v)| (*k, Arc::clone(v))).collect(),
1202            Err(_) => Vec::new(),
1203        }
1204    }
1205
1206    /// Liefert die Liste der EntityIds aller registrierten Writer.
1207    /// Sehr leichtgewichtig — kein Slot-Arc-Clone, nur EntityIds.
1208    fn writer_eids(&self) -> Vec<EntityId> {
1209        match self.user_writers.read() {
1210            Ok(w) => w.keys().copied().collect(),
1211            Err(_) => Vec::new(),
1212        }
1213    }
1214
1215    /// Liefert die Liste der EntityIds aller registrierten Reader.
1216    fn reader_eids(&self) -> Vec<EntityId> {
1217        match self.user_readers.read() {
1218            Ok(r) => r.keys().copied().collect(),
1219            Err(_) => Vec::new(),
1220        }
1221    }
1222
1223    /// Startet eine neue Runtime fuer einen Participant.
1224    ///
1225    /// # Errors
1226    /// `TransportError` wenn eines der 3 UDP-Sockets nicht bindet
1227    /// (z.B. Port-Kollision auf dem SPDP-Multicast-Port in einer
1228    /// anderen SO_REUSE-less DDS-Instanz).
1229    pub fn start(
1230        domain_id: i32,
1231        guid_prefix: GuidPrefix,
1232        config: RuntimeConfig,
1233    ) -> Result<Arc<Self>> {
1234        // SPDP-Multicast-Receiver auf dem Spec-Port.
1235        // u32 → u16 enforcing, Spec-Port ist immer < 65536.
1236        let spdp_port = u16::try_from(spdp_multicast_port(domain_id as u32)).map_err(|_| {
1237            DdsError::BadParameter {
1238                what: "domain_id too large for SPDP port mapping",
1239            }
1240        })?;
1241        let spdp_mc = UdpTransport::bind_multicast_v4(
1242            config.spdp_multicast_group,
1243            spdp_port,
1244            config.multicast_interface,
1245        )
1246        .map_err(|_| DdsError::TransportError {
1247            label: "spdp multicast bind",
1248        })?
1249        // Sprint D.5b: Recv-Sockets haben einen eigenen Thread, der
1250        // blocking auf Daten wartet. Timeout 1 s = Stop-Flag-Polling-
1251        // Granularitaet beim Shutdown, NICHT der Tick-Rhythmus.
1252        .with_timeout(Some(Duration::from_secs(1)))
1253        .map_err(|_| DdsError::TransportError {
1254            label: "spdp multicast set_timeout",
1255        })?;
1256
1257        // SPDP-Unicast (ephemeral Port) — fuer bidirektionales SPDP
1258        // (wenn ein Peer per UC anfragt).
1259        let spdp_uc = UdpTransport::bind_v4(Ipv4Addr::UNSPECIFIED, 0)
1260            .map_err(|_| DdsError::TransportError {
1261                label: "spdp unicast bind",
1262            })?
1263            .with_timeout(Some(Duration::from_secs(1)))
1264            .map_err(|_| DdsError::TransportError {
1265                label: "spdp unicast set_timeout",
1266            })?;
1267
1268        // User-Data-Unicast (ephemeral Port).
1269        let user_uc = UdpTransport::bind_v4(Ipv4Addr::UNSPECIFIED, 0)
1270            .map_err(|_| DdsError::TransportError {
1271                label: "user unicast bind",
1272            })?
1273            .with_timeout(Some(Duration::from_secs(1)))
1274            .map_err(|_| DdsError::TransportError {
1275                label: "user unicast set_timeout",
1276            })?;
1277
1278        // Separater Sender-Socket fuer SPDP-Multicast-Announce. Dieser
1279        // nutzt einen ephemeral Unicast-Port; `send_to` zum Multicast-
1280        // Target geht ueber das vom Kernel gewaehlte outgoing-interface.
1281        let spdp_mc_tx = UdpTransport::bind_v4(Ipv4Addr::UNSPECIFIED, 0).map_err(|_| {
1282            DdsError::TransportError {
1283                label: "spdp mc-tx bind",
1284            }
1285        })?;
1286
1287        let stop = Arc::new(AtomicBool::new(false));
1288
1289        // Beacon-Locators fuer Cross-Host-Interop materialisieren:
1290        // bei `0.0.0.0`-Bind-Adresse (UNSPECIFIED) erfaehrt der Peer
1291        // sonst eine nicht-routbare Adresse. Wir loesen UNSPECIFIED
1292        // ueber einen UDP-Connect-Probe zu einer non-routable IP auf
1293        // (kein Datenverkehr, nur Routing-Tabelle) und annoncen die
1294        // resultierende lokale Interface-Adresse — Cross-Host-faehig
1295        // ohne externe Crate-Abhaengigkeit.
1296        let user_locator = announce_locator(&user_uc, config.multicast_interface);
1297        let spdp_uc_locator = announce_locator(&spdp_uc, config.multicast_interface);
1298        let participant_data = ParticipantBuiltinTopicData {
1299            guid: Guid::new(guid_prefix, EntityId::PARTICIPANT),
1300            protocol_version: ProtocolVersion::V2_5,
1301            vendor_id: VendorId::ZERODDS,
1302            default_unicast_locator: Some(user_locator),
1303            default_multicast_locator: None,
1304            metatraffic_unicast_locator: Some(spdp_uc_locator),
1305            metatraffic_multicast_locator: Some(Locator {
1306                kind: LocatorKind::UdpV4,
1307                port: u32::from(spdp_port),
1308                address: {
1309                    let mut a = [0u8; 16];
1310                    a[12..].copy_from_slice(&SPDP_DEFAULT_MULTICAST_ADDRESS);
1311                    a
1312                },
1313            }),
1314            domain_id: Some(domain_id as u32),
1315            // Wir announcen die Endpoints, die wir tatsaechlich
1316            // implementieren: SPDP (Participant-Ann/Det) + SEDP
1317            // (Publications/Subscriptions Ann+Det) + WLP (10/11) +
1318            // TypeLookup-Service (12/13). Cyclone/Fast-DDS filtern
1319            // ihre Proxy-Anlage anhand dieser Flags — ohne sie
1320            // bekommen wir keine SEDP-/WLP-Peers. SEDP-Topics-
1321            // Endpoints (Bits 28/29) sind per RTPS 2.5 §8.5.4.4
1322            // optional und in ZeroDDS via synthetische DCPSTopic-
1323            // Ableitung aus Pub/Sub abgedeckt — wir annoncen sie
1324            // nicht, sonst versprechen wir Peers eine nicht existente
1325            // Endpoint-Paarung. Wenn der Caller
1326            // `announce_secure_endpoints = true` setzt (Security-
1327            // Factory-Pfad), mixen wir zusaetzlich die 12 Secure-
1328            // Discovery-Bits (16..27, DDS-Security 1.2 §7.4.7.1) ein.
1329            builtin_endpoint_set: {
1330                let mut mask = endpoint_flag::ALL_STANDARD;
1331                if config.announce_secure_endpoints {
1332                    mask |= endpoint_flag::ALL_SECURE;
1333                }
1334                mask
1335            },
1336            // Spec default-lease = 100 s; konfigurierbar via
1337            // `RuntimeConfig::participant_lease_duration`.
1338            lease_duration: qos_duration_from_std(config.participant_lease_duration),
1339            // UserData am Participant — gefuellt aus
1340            // `DomainParticipantQos::user_data` ueber RuntimeConfig.
1341            user_data: config.user_data.clone(),
1342            // PROPERTY_LIST: Security fuellt das mit Security-Caps,
1343            // sobald eine PolicyEngine konfiguriert ist. Default-leer
1344            // bleibt abwaerts-kompatibel mit Legacy-Peers.
1345            properties: Default::default(),
1346            // IdentityToken/PermissionsToken werden vom Security-
1347            // Layer befuellt, sobald Authentication + AccessControl
1348            // initialisiert sind. Default `None` = Legacy-Annonce.
1349            identity_token: None,
1350            permissions_token: None,
1351            identity_status_token: None,
1352            sig_algo_info: None,
1353            kx_algo_info: None,
1354            sym_cipher_algo_info: None,
1355        };
1356        let beacon = SpdpBeacon::new(participant_data);
1357        let sedp = SedpStack::new(guid_prefix, VendorId::ZERODDS);
1358
1359        #[cfg(feature = "security")]
1360        let outbound_pool = if config.interface_bindings.is_empty() {
1361            None
1362        } else {
1363            Some(Arc::new(OutboundSocketPool::bind_all(
1364                &config.interface_bindings,
1365            )?))
1366        };
1367
1368        // WLP-Endpoint (RTPS 2.5 §8.4.13). Tick-Periode ist explicit
1369        // `wlp_period`, oder `lease/3` wenn `wlp_period == ZERO`
1370        // (Spec-Empfehlung: drei Misses bevor Reader den Writer als
1371        // not-alive markiert).
1372        let wlp_tick_period = if config.wlp_period.is_zero() {
1373            config.participant_lease_duration / 3
1374        } else {
1375            config.wlp_period
1376        };
1377        let wlp = crate::wlp::WlpEndpoint::new(guid_prefix, VendorId::ZERODDS, wlp_tick_period);
1378
1379        let rt = Arc::new(Self {
1380            guid_prefix,
1381            domain_id,
1382            spdp_multicast_rx: Arc::new(spdp_mc),
1383            spdp_unicast: Arc::new(spdp_uc),
1384            user_unicast: Arc::new(user_uc),
1385            spdp_mc_tx: Arc::new(spdp_mc_tx),
1386            spdp_beacon: Mutex::new(beacon),
1387            spdp_reader: SpdpReader::new(),
1388            discovered: Arc::new(Mutex::new(DiscoveredParticipantsCache::new())),
1389            sedp: Arc::new(Mutex::new(sedp)),
1390            type_lookup_endpoints: TypeLookupEndpoints::new(guid_prefix),
1391            type_lookup_server: Arc::new(Mutex::new(TypeLookupServer::new())),
1392            type_lookup_client: Arc::new(Mutex::new(TypeLookupClient::new())),
1393            security_builtin: Mutex::new(None),
1394            start_instant: Instant::now(),
1395            user_writers: Arc::new(RwLock::new(BTreeMap::new())),
1396            shm_locators: Arc::new(RwLock::new(BTreeMap::new())),
1397            user_readers: Arc::new(RwLock::new(BTreeMap::new())),
1398            entity_counter: AtomicU32::new(1),
1399            config,
1400            stop: stop.clone(),
1401            handles: Mutex::new(Vec::new()),
1402            match_event: Arc::new((Mutex::new(()), std::sync::Condvar::new())),
1403            ack_event: Arc::new((Mutex::new(()), std::sync::Condvar::new())),
1404            #[cfg(feature = "security")]
1405            outbound_pool,
1406            wlp: Arc::new(Mutex::new(wlp)),
1407            builtin_sinks: Mutex::new(None),
1408            ignore_filter: Mutex::new(None),
1409        });
1410
1411        // Per-Socket-Recv-Threads + ein Tick-Thread (Sprint D.5b).
1412        //
1413        // Vorher lief der gesamte Stack in einem einzigen Event-Loop,
1414        // der Pro Iteration drei blocking-`recv()`s mit `tick_period`-
1415        // Timeout (50 ms) sequenziell durchgegangen ist. Bei einem
1416        // Roundtrip wartete jede Stufe bis zu 50 ms auf Timeouts der
1417        // vorderen Sockets, bevor das eigene Datagram dran war —
1418        // ergab 5-14 ms p50.
1419        //
1420        // Refit: jeder relevante Recv-Pfad hat einen eigenen Thread,
1421        // der direkt blocking auf seinem Socket sitzt und sofort
1422        // dispatcht wenn Daten ankommen. Der Tick-Thread macht die
1423        // periodischen Outbound-Sachen (HEARTBEAT/Resend/ACKNACK/
1424        // SPDP-Announce/Deadline/Lifespan/Liveliness) und schlaeft
1425        // `tick_period` zwischen Iterationen.
1426        //
1427        // Lock-Order (Deadlock-Vermeidung): Tick-Thread und
1428        // Recv-Threads konkurrieren um `rt.sedp.lock()` / `rt.wlp.lock()`.
1429        // Konvention: lock-hold-Zeiten kurz halten (handle_datagram /
1430        // tick sind beide schnell), kein Sub-Lock unter dem `sedp`-
1431        // oder `wlp`-Lock holen.
1432        let mut handles_init: Vec<JoinHandle<()>> = Vec::with_capacity(4);
1433
1434        let rt_recv_spdp_mc = Arc::clone(&rt);
1435        let stop_recv_spdp_mc = stop.clone();
1436        handles_init.push(
1437            thread::Builder::new()
1438                .name(String::from("zdds-recv-spdp-mc"))
1439                .spawn(move || recv_spdp_multicast_loop(rt_recv_spdp_mc, stop_recv_spdp_mc))
1440                .map_err(|_| DdsError::PreconditionNotMet {
1441                    reason: "spawn zdds-recv-spdp-mc thread",
1442                })?,
1443        );
1444
1445        let rt_recv_meta = Arc::clone(&rt);
1446        let stop_recv_meta = stop.clone();
1447        handles_init.push(
1448            thread::Builder::new()
1449                .name(String::from("zdds-recv-meta"))
1450                .spawn(move || recv_metatraffic_loop(rt_recv_meta, stop_recv_meta))
1451                .map_err(|_| DdsError::PreconditionNotMet {
1452                    reason: "spawn zdds-recv-meta thread",
1453                })?,
1454        );
1455
1456        let rt_recv_user = Arc::clone(&rt);
1457        let stop_recv_user = stop.clone();
1458        handles_init.push(
1459            thread::Builder::new()
1460                .name(String::from("zdds-recv-user"))
1461                .spawn(move || recv_user_data_loop(rt_recv_user, stop_recv_user))
1462                .map_err(|_| DdsError::PreconditionNotMet {
1463                    reason: "spawn zdds-recv-user thread",
1464                })?,
1465        );
1466
1467        let rt_tick = Arc::clone(&rt);
1468        let stop_tick = stop;
1469        handles_init.push(
1470            thread::Builder::new()
1471                .name(String::from("zdds-tick"))
1472                .spawn(move || tick_loop(rt_tick, stop_tick))
1473                .map_err(|_| DdsError::PreconditionNotMet {
1474                    reason: "spawn zdds-tick thread",
1475                })?,
1476        );
1477
1478        let mut guard = rt
1479            .handles
1480            .lock()
1481            .map_err(|_| DdsError::PreconditionNotMet {
1482                reason: "runtime handles mutex poisoned",
1483            })?;
1484        *guard = handles_init;
1485        drop(guard);
1486
1487        Ok(rt)
1488    }
1489
1490    /// Lokaler Unicast-Locator fuer User-Data (wird in SPDP announced).
1491    #[must_use]
1492    pub fn user_locator(&self) -> zerodds_rtps::wire_types::Locator {
1493        self.user_unicast.local_locator()
1494    }
1495
1496    /// Lokaler Unicast-Locator fuer SPDP-Metatraffic.
1497    #[must_use]
1498    pub fn spdp_unicast_locator(&self) -> zerodds_rtps::wire_types::Locator {
1499        self.spdp_unicast.local_locator()
1500    }
1501
1502    /// Liefert die `BuiltinEndpointSet`-Bitmaske, die der Runtime
1503    /// aktuell im SPDP-Beacon annonciert. Wird fuer Tests + Diagnose
1504    /// genutzt; produktive Konsumenten sollten den SPDP-Beacon selbst
1505    /// dekodieren.
1506    #[must_use]
1507    pub fn announced_builtin_endpoint_set(&self) -> u32 {
1508        self.spdp_beacon
1509            .lock()
1510            .map(|b| b.data.builtin_endpoint_set)
1511            .unwrap_or(0)
1512    }
1513
1514    /// Registriert einen `TypeObject` in der lokalen TypeLookup-Server-
1515    /// Registry. Andere Participants können diesen Type danach via
1516    /// `getTypes`-Request abfragen (XTypes 1.3 §7.6.3.3.4).
1517    ///
1518    /// Liefert den `EquivalenceHash` des registrierten Types zurück
1519    /// (Caller kann ihn z.B. in `PublicationBuiltinTopicData` als
1520    /// PID_TYPE_INFORMATION-Hint einbetten).
1521    ///
1522    /// # Errors
1523    /// `DdsError::PreconditionNotMet` bei Lock-Poisoning oder Hash-
1524    /// Berechnungs-Fehler.
1525    pub fn register_type_object(
1526        &self,
1527        obj: zerodds_types::type_object::TypeObject,
1528    ) -> Result<zerodds_types::EquivalenceHash> {
1529        let hash = zerodds_types::compute_hash(&obj).map_err(|_| DdsError::PreconditionNotMet {
1530            reason: "type hash computation failed",
1531        })?;
1532        let mut server =
1533            self.type_lookup_server
1534                .lock()
1535                .map_err(|_| DdsError::PreconditionNotMet {
1536                    reason: "type_lookup_server mutex poisoned",
1537                })?;
1538        match obj {
1539            zerodds_types::type_object::TypeObject::Minimal(m) => {
1540                server.registry.insert_minimal(hash, m);
1541            }
1542            zerodds_types::type_object::TypeObject::Complete(c) => {
1543                server.registry.insert_complete(hash, c);
1544            }
1545            _ => {
1546                return Err(DdsError::PreconditionNotMet {
1547                    reason: "unknown TypeObject variant",
1548                });
1549            }
1550        }
1551        Ok(hash)
1552    }
1553
1554    /// Sendet einen `getTypes`-Request an einen discovered Peer und
1555    /// liefert eine `RequestId` zurück, mit der der Caller den
1556    /// asynchronen Reply später korrelieren kann (XTypes 1.3
1557    /// §7.6.3.3.4 + `TypeLookupClient::handle_reply`).
1558    ///
1559    /// `peer` muss in `discovered_participants()` sein — sonst wird
1560    /// `None` zurückgegeben (kein bekannter Peer-Locator). Bei
1561    /// erfolgreichem Send wird die Request-Sample-Identity-Sequence
1562    /// als `RequestId` zurückgegeben; eingehender Reply wird auf
1563    /// dieser Sequence-ID korreliert.
1564    ///
1565    /// # Errors
1566    /// `DdsError::PreconditionNotMet` bei Encode-Fehlern oder Lock-
1567    /// Poisoning.
1568    pub fn send_type_lookup_request(
1569        &self,
1570        peer: zerodds_rtps::wire_types::GuidPrefix,
1571        type_hashes: &[zerodds_types::EquivalenceHash],
1572    ) -> Result<Option<zerodds_discovery::type_lookup::RequestId>> {
1573        use alloc::sync::Arc as AllocArc;
1574        use zerodds_discovery::type_lookup::request_types_payload;
1575        use zerodds_rtps::datagram::encode_data_datagram;
1576        use zerodds_rtps::header::RtpsHeader;
1577        use zerodds_rtps::submessages::DataSubmessage;
1578        use zerodds_rtps::wire_types::{ProtocolVersion, SequenceNumber};
1579
1580        // Find peer's user-unicast-Locator (default-unicast first;
1581        // fallback metatraffic-unicast). TypeLookup-Datagrams gehen über
1582        // den User-Unicast-Pfad — das Peer-DCPS-Runtime hat dort einen
1583        // gemeinsamen Receive-Loop für SEDP/User-Daten/TypeLookup.
1584        let target = {
1585            let discovered = self
1586                .discovered
1587                .lock()
1588                .map_err(|_| DdsError::PreconditionNotMet {
1589                    reason: "discovered mutex poisoned",
1590                })?;
1591            let Some(dp) = discovered.get(&peer) else {
1592                return Ok(None);
1593            };
1594            dp.data
1595                .default_unicast_locator
1596                .or(dp.data.metatraffic_unicast_locator)
1597        };
1598        let Some(target) = target else {
1599            return Ok(None);
1600        };
1601
1602        // Allocate RequestId (client-side incrementing sequence). Reply-
1603        // Korrelation laeuft ueber den `handle_reply`-Callback. Wir
1604        // registrieren einen Callback, der die zurueckgelieferten
1605        // TypeObjects in den lokalen `TypeLookupServer.registry`
1606        // einspeist (XTypes 1.3 §7.6.3.3.4): Hash-by-Hash, getrennt
1607        // fuer Minimal- und Complete-Variants. So wird ein Hash, der
1608        // einmal aufgeloest wurde, fuer kuenftige `has_type_for_hash`-
1609        // Checks (= keine Re-Requests) erkannt.
1610        let mut client =
1611            self.type_lookup_client
1612                .lock()
1613                .map_err(|_| DdsError::PreconditionNotMet {
1614                    reason: "type_lookup_client mutex poisoned",
1615                })?;
1616        let type_ids: alloc::vec::Vec<zerodds_types::TypeIdentifier> = type_hashes
1617            .iter()
1618            .map(|h| zerodds_types::TypeIdentifier::EquivalenceHashMinimal(*h))
1619            .collect();
1620        let server_for_cb = Arc::clone(&self.type_lookup_server);
1621        let cb = Box::new(
1622            move |reply: zerodds_discovery::type_lookup::TypeLookupReply| {
1623                let zerodds_discovery::type_lookup::TypeLookupReply::Types(types_reply) = reply
1624                else {
1625                    return;
1626                };
1627                let Ok(mut server) = server_for_cb.lock() else {
1628                    return;
1629                };
1630                for t in &types_reply.types {
1631                    match t {
1632                        zerodds_types::type_lookup::ReplyTypeObject::Minimal(m) => {
1633                            let to = zerodds_types::type_object::TypeObject::Minimal(m.clone());
1634                            if let Ok(h) = zerodds_types::compute_hash(&to) {
1635                                server.registry.insert_minimal(h, m.clone());
1636                            }
1637                        }
1638                        zerodds_types::type_lookup::ReplyTypeObject::Complete(c) => {
1639                            let to = zerodds_types::type_object::TypeObject::Complete(c.clone());
1640                            if let Ok(h) = zerodds_types::compute_hash(&to) {
1641                                server.registry.insert_complete(h, c.clone());
1642                            }
1643                        }
1644                    }
1645                }
1646            },
1647        );
1648        let request_id = client.request_types(type_ids.clone(), cb);
1649        drop(client);
1650
1651        // Encode the wire request payload (PL_CDR_LE-Encapsulation).
1652        let body = request_types_payload(&type_ids).map_err(|_| DdsError::PreconditionNotMet {
1653            reason: "type_lookup request payload encode failed",
1654        })?;
1655        let mut payload: alloc::vec::Vec<u8> = alloc::vec::Vec::with_capacity(4 + body.len());
1656        payload.extend_from_slice(&[0x00, 0x01, 0x00, 0x00]);
1657        payload.extend_from_slice(&body);
1658
1659        // Use the RequestId as the writer_sn so the peer-side reply can
1660        // echo it for correlation (XTypes §7.6.3.3.3 Sample-Identity).
1661        let id_u64 = request_id.0;
1662        let sn =
1663            SequenceNumber::from_high_low((id_u64 >> 32) as i32, (id_u64 & 0xFFFF_FFFF) as u32);
1664        let header = RtpsHeader {
1665            protocol_version: ProtocolVersion::CURRENT,
1666            vendor_id: VendorId::ZERODDS,
1667            guid_prefix: self.guid_prefix,
1668        };
1669        let data = DataSubmessage {
1670            extra_flags: 0,
1671            reader_id: EntityId::TL_SVC_REQ_READER,
1672            writer_id: EntityId::TL_SVC_REQ_WRITER,
1673            writer_sn: sn,
1674            inline_qos: None,
1675            key_flag: false,
1676            non_standard_flag: false,
1677            serialized_payload: AllocArc::from(payload.into_boxed_slice()),
1678        };
1679        let datagram =
1680            encode_data_datagram(header, &[data]).map_err(|_| DdsError::PreconditionNotMet {
1681                reason: "type_lookup request datagram encode failed",
1682            })?;
1683
1684        if target.kind == LocatorKind::UdpV4 {
1685            let _ = self.user_unicast.send(&target, &datagram);
1686        }
1687        Ok(Some(request_id))
1688    }
1689
1690    /// aktiviert den Security-Builtin-Endpoint-Stack
1691    /// (`DCPSParticipantStatelessMessage` + `DCPSParticipantVolatile-
1692    /// MessageSecure`). Wird typischerweise von der Factory aufgerufen,
1693    /// sobald ein Security-Plugin auf dem Participant registriert ist.
1694    /// Idempotent: zweiter Aufruf hat keine Wirkung. Liefert den (ggf.
1695    /// frisch erzeugten) Stack-Handle zurueck.
1696    pub fn enable_security_builtins(
1697        &self,
1698        vendor_id: VendorId,
1699    ) -> Arc<Mutex<SecurityBuiltinStack>> {
1700        // Lock-Poisoning ist hier ein Bug-Indikator (frueherer Panic im
1701        // Hot-Path). Wir liefern in dem Fall einen frischen, isolierten
1702        // Stack zurueck — der Caller bekommt zumindest einen
1703        // funktionalen Slot, der Hot-Path schreibt seine Mutationen aber
1704        // ins ungelockte Original. Im Praxis-Code passiert das nicht;
1705        // im Test (wo Poisoning vorkommen kann) ist das eine
1706        // Best-Effort-Recovery.
1707        let mut slot = match self.security_builtin.lock() {
1708            Ok(g) => g,
1709            Err(_) => {
1710                return Arc::new(Mutex::new(SecurityBuiltinStack::new(
1711                    self.guid_prefix,
1712                    vendor_id,
1713                )));
1714            }
1715        };
1716        if let Some(existing) = slot.as_ref() {
1717            return Arc::clone(existing);
1718        }
1719        let stack = Arc::new(Mutex::new(SecurityBuiltinStack::new(
1720            self.guid_prefix,
1721            vendor_id,
1722        )));
1723        // Bereits entdeckte Peers nachholen (Discovery hat ggf. schon
1724        // SPDP-Beacons gesehen, bevor das Plugin aktiviert wurde).
1725        if let Ok(cache) = self.discovered.lock() {
1726            if let Ok(mut s) = stack.lock() {
1727                for peer in cache.iter() {
1728                    s.handle_remote_endpoints(peer);
1729                }
1730            }
1731        }
1732        *slot = Some(Arc::clone(&stack));
1733        stack
1734    }
1735
1736    /// Snapshot-Handle auf den Security-Builtin-Stack. `None`, wenn
1737    /// [`enable_security_builtins`](Self::enable_security_builtins)
1738    /// noch nicht aufgerufen wurde.
1739    #[must_use]
1740    pub fn security_builtin_snapshot(&self) -> Option<Arc<Mutex<SecurityBuiltinStack>>> {
1741        self.security_builtin.lock().ok()?.as_ref().map(Arc::clone)
1742    }
1743
1744    /// `assert_liveliness()` auf dem `DomainParticipant` (DCPS 1.4
1745    /// §2.2.3.11 MANUAL_BY_PARTICIPANT). Sendet beim naechsten Tick
1746    /// genau einen WLP-Heartbeat mit `kind = MANUAL_BY_PARTICIPANT`,
1747    /// alle Reader die diesen Participant matchen frischen ihren
1748    /// Last-Seen-Timestamp auf. Idempotent — Mehrfachaufruf binnen
1749    /// einer Tick-Periode resultiert in mehreren Wire-Sends bis zur
1750    /// Cap (`MAX_QUEUED_PULSES = 32`).
1751    pub fn assert_liveliness(&self) {
1752        if let Ok(mut wlp) = self.wlp.lock() {
1753            wlp.assert_participant();
1754        }
1755    }
1756
1757    /// `assert_liveliness()` auf einem `DataWriter` (DCPS 1.4 §2.2.3.11
1758    /// MANUAL_BY_TOPIC). `topic_token` ist ein opaque Token, das
1759    /// matchende Reader nutzen koennen, um den Pulse einem konkreten
1760    /// Topic zuzuordnen. Wir verwenden ZeroDDS-Vendor-Kind (Cyclone /
1761    /// Fast-DDS ignorieren das Vendor-Kind, was Spec-konform ist —
1762    /// MSB-set in `kind` fordert "ignore unknown" Verhalten).
1763    pub fn assert_writer_liveliness(&self, topic_token: Vec<u8>) {
1764        if let Ok(mut wlp) = self.wlp.lock() {
1765            wlp.assert_topic(topic_token);
1766        }
1767    }
1768
1769    /// Aktueller WLP-Last-Seen-Timestamp eines remote Peers (relativ
1770    /// zum Runtime-Start). `None` wenn der Peer noch keinen WLP-
1771    /// Heartbeat geschickt hat.
1772    #[must_use]
1773    pub fn peer_liveliness_last_seen(&self, prefix: &GuidPrefix) -> Option<Duration> {
1774        self.wlp
1775            .lock()
1776            .ok()
1777            .and_then(|w| w.peer_state(prefix).map(|s| s.last_seen))
1778    }
1779
1780    /// Liefert die [`zerodds_discovery::PeerCapabilities`] eines remote
1781    /// Peers, basierend auf seinem zuletzt empfangenen SPDP-Beacon.
1782    /// `None` wenn der Peer noch nicht via SPDP entdeckt wurde.
1783    #[must_use]
1784    pub fn peer_capabilities(
1785        &self,
1786        prefix: &GuidPrefix,
1787    ) -> Option<zerodds_discovery::PeerCapabilities> {
1788        self.discovered
1789            .lock()
1790            .ok()
1791            .and_then(|d| d.get(prefix).map(|p| p.data.builtin_endpoint_set))
1792            .map(zerodds_discovery::PeerCapabilities::from_bits)
1793    }
1794
1795    /// Snapshot der aktuell entdeckten remote Participants.
1796    /// Key = Guid-Prefix, Value = zuletzt gesehener Beacon-Inhalt.
1797    #[must_use]
1798    pub fn discovered_participants(&self) -> Vec<DiscoveredParticipant> {
1799        self.discovered
1800            .lock()
1801            .map(|cache| cache.iter().cloned().collect())
1802            .unwrap_or_default()
1803    }
1804
1805    /// Verdrahtet die `BuiltinSinks` des `DomainParticipant`s in den
1806    /// Discovery-Hot-Path. Ab diesem
1807    /// Aufruf landen alle SPDP-/SEDP-Receive-Events als Samples in
1808    /// den 4 Builtin-Topic-Readern.
1809    ///
1810    /// Wird vom `DomainParticipant`-Konstruktor genau einmal beim
1811    /// Setup aufgerufen.
1812    pub fn attach_builtin_sinks(&self, sinks: crate::builtin_subscriber::BuiltinSinks) {
1813        if let Ok(mut guard) = self.builtin_sinks.lock() {
1814            *guard = Some(sinks);
1815        }
1816    }
1817
1818    /// Snapshot der aktuell verdrahteten BuiltinSinks (intern fuer den
1819    /// Hot-Path).
1820    pub(crate) fn builtin_sinks_snapshot(&self) -> Option<crate::builtin_subscriber::BuiltinSinks> {
1821        self.builtin_sinks.lock().ok().and_then(|g| g.clone())
1822    }
1823
1824    /// Verdrahtet den `IgnoreFilter` des `DomainParticipant`s in den
1825    /// Discovery-Hot-Path. Ab
1826    /// diesem Aufruf werden SPDP-/SEDP-Receive-Events gegen den
1827    /// Filter geprueft, bevor sie als Builtin-Sample gepusht oder als
1828    /// SEDP-Match-Quelle herangezogen werden.
1829    ///
1830    /// Wird vom `DomainParticipant`-Konstruktor genau einmal beim
1831    /// Setup aufgerufen.
1832    pub fn attach_ignore_filter(&self, filter: crate::participant::IgnoreFilter) {
1833        if let Ok(mut guard) = self.ignore_filter.lock() {
1834            *guard = Some(filter);
1835        }
1836    }
1837
1838    /// Snapshot des aktuell verdrahteten IgnoreFilter (intern fuer
1839    /// den Hot-Path).
1840    pub(crate) fn ignore_filter_snapshot(&self) -> Option<crate::participant::IgnoreFilter> {
1841        self.ignore_filter.lock().ok().and_then(|g| g.clone())
1842    }
1843
1844    /// Kuendigt eine lokale Publication ueber SEDP an. Der Runtime
1845    /// sendet die erzeugten Datagramme sofort an alle bereits
1846    /// entdeckten Remote-Participants.
1847    ///
1848    /// # Errors
1849    /// `WireError` wenn Encoding fehlschlaegt.
1850    pub fn announce_publication(
1851        &self,
1852        data: &zerodds_rtps::publication_data::PublicationBuiltinTopicData,
1853    ) -> Result<()> {
1854        // ADR-0006: Side-Map-Lookup. Wenn der lokale User-Writer ein
1855        // Same-Host-Backend angeschlossen hat (set_shm_locator wurde
1856        // aufgerufen), injizieren wir PID_SHM_LOCATOR in das SEDP-
1857        // Sample. Sonst pure 1:1 Spec-Wire.
1858        let shm = self.shm_locator(data.key.entity_id);
1859        let datagrams = {
1860            let mut sedp = self.sedp.lock().map_err(|_| DdsError::PreconditionNotMet {
1861                reason: "sedp poisoned",
1862            })?;
1863            let res = if let Some(ref bytes) = shm {
1864                sedp.announce_publication_with_shm_locator(data, bytes)
1865            } else {
1866                sedp.announce_publication(data)
1867            };
1868            res.map_err(|_| DdsError::WireError {
1869                message: alloc::string::String::from("sedp announce_publication"),
1870            })?
1871        };
1872        // Senden ausserhalb des locks (Rc<Vec<Locator>> ist !Send,
1873        // aber wir sind im gleichen Thread wie `self` — kein
1874        // Problem).
1875        for dg in datagrams {
1876            if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
1877                for t in dg.targets.iter() {
1878                    if t.kind == LocatorKind::UdpV4 {
1879                        let _ = self.spdp_mc_tx.send(t, &secured);
1880                    }
1881                }
1882            }
1883        }
1884        Ok(())
1885    }
1886
1887    /// Kuendigt eine lokale Subscription ueber SEDP an. Analog zu
1888    /// `announce_publication`.
1889    ///
1890    /// # Errors
1891    /// `WireError` wenn Encoding fehlschlaegt.
1892    pub fn announce_subscription(
1893        &self,
1894        data: &zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData,
1895    ) -> Result<()> {
1896        let datagrams = {
1897            let mut sedp = self.sedp.lock().map_err(|_| DdsError::PreconditionNotMet {
1898                reason: "sedp poisoned",
1899            })?;
1900            sedp.announce_subscription(data)
1901                .map_err(|_| DdsError::WireError {
1902                    message: alloc::string::String::from("sedp announce_subscription"),
1903                })?
1904        };
1905        for dg in datagrams {
1906            if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
1907                for t in dg.targets.iter() {
1908                    if t.kind == LocatorKind::UdpV4 {
1909                        let _ = self.spdp_mc_tx.send(t, &secured);
1910                    }
1911                }
1912            }
1913        }
1914        Ok(())
1915    }
1916
1917    /// Registriert einen lokalen User-Writer. Der Caller bekommt die
1918    /// Writer-`EntityId`; fuer Sends via `write_user_sample(eid, ...)`.
1919    ///
1920    /// In Runtime gibt es **noch kein** automatisches SEDP-Announce +
1921    /// Matching — das kommt in B4b. Aktuell ist `register_user_writer`
1922    /// nur die Verdrahtung.
1923    ///
1924    /// # Errors
1925    /// `PreconditionNotMet` wenn der Registry-Mutex vergiftet ist.
1926    pub fn register_user_writer(&self, cfg: UserWriterConfig) -> Result<EntityId> {
1927        let now = self.start_instant.elapsed();
1928        let key = self.next_entity_key();
1929        let eid = EntityId::user_writer_with_key(key);
1930        let writer = ReliableWriter::new(ReliableWriterConfig {
1931            guid: Guid::new(self.guid_prefix, eid),
1932            vendor_id: VendorId::ZERODDS,
1933            reader_proxies: Vec::new(),
1934            max_samples: 1024,
1935            history_kind: HistoryKind::KeepLast { depth: 32 },
1936            heartbeat_period: DEFAULT_HEARTBEAT_PERIOD,
1937            fragment_size: DEFAULT_FRAGMENT_SIZE,
1938            mtu: DEFAULT_MTU,
1939        });
1940        let pub_data = build_publication_data(
1941            self.guid_prefix,
1942            eid,
1943            &cfg,
1944            &self.config.data_representation_offer,
1945        );
1946        self.user_writers
1947            .write()
1948            .map_err(|_| DdsError::PreconditionNotMet {
1949                reason: "user_writers poisoned",
1950            })?
1951            .insert(
1952                eid,
1953                Arc::new(Mutex::new(UserWriterSlot {
1954                    writer,
1955                    topic_name: cfg.topic_name.clone(),
1956                    type_name: cfg.type_name.clone(),
1957                    reliable: cfg.reliable,
1958                    durability: cfg.durability,
1959                    deadline_nanos: qos_duration_to_nanos(cfg.deadline.period),
1960                    // Initial `None`: Deadline-Fenster startet erst beim
1961                    // ersten echten Write. Verhindert falsche Misses durch
1962                    // langsamen Entity-Setup (z.B. Linux-CI-Container)
1963                    // bevor die App ihren ersten write() macht. Beim
1964                    // ersten write() wird `last_write = Some(now)` gesetzt,
1965                    // ab dann tickt der Deadline-Counter.
1966                    last_write: None,
1967                    offered_deadline_missed_count: 0,
1968                    liveliness_lost_count: 0,
1969                    last_liveliness_assert: Some(now),
1970                    offered_incompatible_qos: crate::status::OfferedIncompatibleQosStatus::default(
1971                    ),
1972                    lifespan_nanos: qos_duration_to_nanos(cfg.lifespan.duration),
1973                    sample_insert_times: alloc::collections::VecDeque::new(),
1974                    liveliness_kind: cfg.liveliness.kind,
1975                    liveliness_lease_nanos: qos_duration_to_nanos(cfg.liveliness.lease_duration),
1976                    ownership: cfg.ownership,
1977                    partition: cfg.partition.clone(),
1978                    #[cfg(feature = "security")]
1979                    reader_protection: BTreeMap::new(),
1980                    #[cfg(feature = "security")]
1981                    locator_to_peer: BTreeMap::new(),
1982                    type_identifier: cfg.type_identifier.clone(),
1983                    data_rep_offer_override: cfg.data_representation_offer.clone(),
1984                })),
1985            );
1986        // SEDP-Announce an alle bereits entdeckten Peers.
1987        let _ = self.announce_publication(&pub_data);
1988        // Match gegen bereits gecachte Remote-Subscriptions.
1989        self.match_local_writer_against_cache(eid);
1990        // Observability-Event.
1991        self.config.observability.record(
1992            &zerodds_foundation::observability::Event::new(
1993                zerodds_foundation::observability::Level::Info,
1994                zerodds_foundation::observability::Component::Dcps,
1995                "user_writer.created",
1996            )
1997            .with_attr("topic", cfg.topic_name.as_str())
1998            .with_attr("type", cfg.type_name.as_str())
1999            .with_attr("reliable", if cfg.reliable { "true" } else { "false" }),
2000        );
2001        Ok(eid)
2002    }
2003
2004    /// Registriert einen lokalen User-Reader. Gibt die Reader-EntityId
2005    /// und einen `mpsc::Receiver` zurueck, ueber den DataReader-Handles
2006    /// ankommende Samples konsumieren.
2007    ///
2008    /// # Errors
2009    /// `PreconditionNotMet` wenn der Registry-Mutex vergiftet ist.
2010    /// Registriert einen User-Reader. Liefert die EntityId und einen
2011    /// `mpsc::Receiver<UserSample>` — Alive-Samples liefern Payload,
2012    /// Lifecycle-Marker tragen Key-Hash + ChangeKind.
2013    pub fn register_user_reader(
2014        &self,
2015        cfg: UserReaderConfig,
2016    ) -> Result<(EntityId, mpsc::Receiver<UserSample>)> {
2017        let now = self.start_instant.elapsed();
2018        let key = self.next_entity_key();
2019        let eid = EntityId::user_reader_with_key(key);
2020        let reader = ReliableReader::new(ReliableReaderConfig {
2021            guid: Guid::new(self.guid_prefix, eid),
2022            vendor_id: VendorId::ZERODDS,
2023            writer_proxies: Vec::new(),
2024            max_samples_per_proxy: 256,
2025            // D.5e: 0ms = synchrone ACK-Response (Cyclone-Parität).
2026            // Vorher 200ms = Pre-1.0 Default ohne Spec-Begruendung.
2027            heartbeat_response_delay:
2028                zerodds_rtps::reliable_reader::DEFAULT_HEARTBEAT_RESPONSE_DELAY,
2029            assembler_caps: AssemblerCaps::default(),
2030        });
2031        let (tx, rx) = mpsc::channel();
2032        let sub_data = build_subscription_data(
2033            self.guid_prefix,
2034            eid,
2035            &cfg,
2036            &self.config.data_representation_offer,
2037        );
2038        self.user_readers
2039            .write()
2040            .map_err(|_| DdsError::PreconditionNotMet {
2041                reason: "user_readers poisoned",
2042            })?
2043            .insert(
2044                eid,
2045                Arc::new(Mutex::new(UserReaderSlot {
2046                    reader,
2047                    topic_name: cfg.topic_name.clone(),
2048                    type_name: cfg.type_name.clone(),
2049                    sample_tx: tx,
2050                    async_waker: Arc::new(std::sync::Mutex::new(None)),
2051                    listener: None,
2052                    durability: cfg.durability,
2053                    deadline_nanos: qos_duration_to_nanos(cfg.deadline.period),
2054                    // Start-Zeitpunkt als Referenz (siehe register_user_writer).
2055                    last_sample_received: Some(now),
2056                    requested_deadline_missed_count: 0,
2057                    requested_incompatible_qos:
2058                        crate::status::RequestedIncompatibleQosStatus::default(),
2059                    sample_lost_count: 0,
2060                    sample_rejected: crate::status::SampleRejectedStatus::default(),
2061                    liveliness_lease_nanos: qos_duration_to_nanos(cfg.liveliness.lease_duration),
2062                    liveliness_kind: cfg.liveliness.kind,
2063                    liveliness_alive_count: 0,
2064                    liveliness_not_alive_count: 0,
2065                    // Optimistische Init: wir sehen den Writer via SEDP,
2066                    // bis Lease ablaeuft gilt er als alive.
2067                    liveliness_alive: true,
2068                    ownership: cfg.ownership,
2069                    partition: cfg.partition.clone(),
2070                    writer_strengths: alloc::collections::BTreeMap::new(),
2071                    type_identifier: cfg.type_identifier.clone(),
2072                    type_consistency: cfg.type_consistency,
2073                })),
2074            );
2075        // SEDP-Announce unsere Subscription.
2076        let _ = self.announce_subscription(&sub_data);
2077        // Match gegen bereits gecachte Remote-Publications.
2078        self.match_local_reader_against_cache(eid);
2079        // Observability-Event.
2080        self.config.observability.record(
2081            &zerodds_foundation::observability::Event::new(
2082                zerodds_foundation::observability::Level::Info,
2083                zerodds_foundation::observability::Component::Dcps,
2084                "user_reader.created",
2085            )
2086            .with_attr("topic", cfg.topic_name.as_str())
2087            .with_attr("type", cfg.type_name.as_str()),
2088        );
2089        Ok((eid, rx))
2090    }
2091
2092    /// Bei Registrierung / SEDP-event: Fuer einen lokalen Writer `eid`
2093    /// alle im Cache bekannten Subscriptions durchgehen; bei Topic+Type-
2094    /// Match einen `ReaderProxy` zum lokalen ReliableWriter hinzufuegen.
2095    fn match_local_writer_against_cache(&self, eid: EntityId) {
2096        let (topic, type_name) = {
2097            let Some(arc) = self.writer_slot(eid) else {
2098                return;
2099            };
2100            let Ok(s) = arc.lock() else {
2101                return;
2102            };
2103            (s.topic_name.clone(), s.type_name.clone())
2104        };
2105        let matches: Vec<_> = {
2106            let sedp = match self.sedp.lock() {
2107                Ok(s) => s,
2108                Err(_) => return,
2109            };
2110            sedp.cache()
2111                .match_subscriptions(&topic, &type_name)
2112                .map(|s| s.data.clone())
2113                .collect()
2114        };
2115        for sub in matches {
2116            self.wire_writer_to_remote_reader(eid, &sub);
2117        }
2118    }
2119
2120    fn match_local_reader_against_cache(&self, eid: EntityId) {
2121        let (topic, type_name) = {
2122            let Some(arc) = self.reader_slot(eid) else {
2123                return;
2124            };
2125            let Ok(s) = arc.lock() else {
2126                return;
2127            };
2128            (s.topic_name.clone(), s.type_name.clone())
2129        };
2130        let matches: Vec<_> = {
2131            let sedp = match self.sedp.lock() {
2132                Ok(s) => s,
2133                Err(_) => return,
2134            };
2135            sedp.cache()
2136                .match_publications(&topic, &type_name)
2137                .map(|p| p.data.clone())
2138                .collect()
2139        };
2140        for pubd in matches {
2141            self.wire_reader_to_remote_writer(eid, &pubd);
2142        }
2143    }
2144
2145    fn wire_writer_to_remote_reader(
2146        &self,
2147        writer_eid: EntityId,
2148        sub: &zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData,
2149    ) {
2150        let locators = remote_user_locators(sub.key.prefix, &self.discovered);
2151        if locators.is_empty() {
2152            return;
2153        }
2154        if let Some(slot_arc) = self.writer_slot(writer_eid) {
2155            if let Ok(mut slot) = slot_arc.lock() {
2156                let slot = &mut *slot;
2157                // --- QoS-Compatibility ---
2158                // Spec OMG DDS 1.4 §2.2.3.6: Writer offered >= Reader requested.
2159                //
2160                // Pro Reject die zustaendige Policy-ID in
2161                // `offered_incompatible_qos.policies` einbumpen, damit der
2162                // DataWriter-Listener via `dispatch_offered_incompatible_qos`
2163                // ausgeloest wird. Wir tracken die *erste* fehlerhafte
2164                // Policy als `last_policy_id` (Spec §2.2.4.1: most-recent).
2165                use crate::psm_constants::qos_policy_id as qid;
2166                use crate::status::bump_policy_count;
2167                let bump = |slot: &mut UserWriterSlot, pid: u32| {
2168                    slot.offered_incompatible_qos.total_count =
2169                        slot.offered_incompatible_qos.total_count.saturating_add(1);
2170                    slot.offered_incompatible_qos.last_policy_id = pid;
2171                    bump_policy_count(&mut slot.offered_incompatible_qos.policies, pid);
2172                };
2173
2174                // Durability-Rang: Volatile < TransientLocal < Transient <
2175                // Persistent. Writer darf mehr anbieten als Reader anfordert.
2176                if (slot.durability as u8) < (sub.durability as u8) {
2177                    bump(slot, qid::DURABILITY);
2178                    return;
2179                }
2180                // Deadline: Writer-Period <= Reader-Period (Writer verspricht
2181                // schneller zu schreiben als Reader erwartet).
2182                if !deadline_compat(
2183                    slot.deadline_nanos,
2184                    qos_duration_to_nanos(sub.deadline.period),
2185                ) {
2186                    bump(slot, qid::DEADLINE);
2187                    return;
2188                }
2189                // Liveliness-Kind: Automatic < ManualByParticipant < ManualByTopic.
2190                // Writer-Kind >= Reader-Kind. Lease: writer.lease <= reader.lease.
2191                if (slot.liveliness_kind as u8) < (sub.liveliness.kind as u8) {
2192                    bump(slot, qid::LIVELINESS);
2193                    return;
2194                }
2195                if !deadline_compat(
2196                    slot.liveliness_lease_nanos,
2197                    qos_duration_to_nanos(sub.liveliness.lease_duration),
2198                ) {
2199                    bump(slot, qid::LIVELINESS);
2200                    return;
2201                }
2202                // Ownership: beide muessen gleich sein (Spec §2.2.3.6 Table:
2203                // kein "kompatibel"-Fall ausser exakt gleich).
2204                if slot.ownership != sub.ownership {
2205                    bump(slot, qid::OWNERSHIP);
2206                    return;
2207                }
2208                // Partition: mindestens eine gemeinsame Partition — oder
2209                // beide leer (default partition "").
2210                if !partitions_overlap(&slot.partition, &sub.partition) {
2211                    bump(slot, qid::PARTITION);
2212                    return;
2213                }
2214                // F-TYPES-3 XTypes-1.3 §7.6.3.7 symmetric Writer-Side-Check.
2215                // Wenn beide Seiten einen TypeIdentifier (≠ None) tragen,
2216                // pruefen wir Compatibility. Reader's TCE-Policy ist hier
2217                // nicht direkt verfuegbar; wir nehmen Default-TCE
2218                // (AllowTypeCoercion ohne PreventWidening) — der Reader-
2219                // Side-Check in `wire_reader_to_remote_writer` validiert
2220                // mit der echten Reader-TCE.
2221                if slot.type_identifier != zerodds_types::TypeIdentifier::None
2222                    && sub.type_identifier != zerodds_types::TypeIdentifier::None
2223                {
2224                    let registry = zerodds_types::resolve::TypeRegistry::new();
2225                    let tce = zerodds_types::qos::TypeConsistencyEnforcement::default();
2226                    let matcher = zerodds_types::type_matcher::TypeMatcher::new(&tce);
2227                    if !matcher
2228                        .match_types(&slot.type_identifier, &sub.type_identifier, &registry)
2229                        .is_match()
2230                    {
2231                        bump(slot, qid::TYPE_CONSISTENCY_ENFORCEMENT);
2232                        return;
2233                    }
2234                }
2235
2236                let mut proxy = zerodds_rtps::reader_proxy::ReaderProxy::new(
2237                    sub.key,
2238                    locators.clone(),
2239                    Vec::new(),
2240                    slot.reliable,
2241                );
2242                // D.5g — Per-Peer DataRepresentation negotiation
2243                // (XTypes 1.3 §7.6.3.1.2). Writer-offered = Per-Writer-
2244                // Override (slot.data_rep_offer_override) ODER Runtime-
2245                // Default. Reader-accepted = sub.data_representation
2246                // (Spec-Default `[XCDR1]` wenn leer). Match-Mode aus
2247                // RuntimeConfig.
2248                {
2249                    use zerodds_rtps::publication_data::data_representation as dr;
2250                    let writer_offered: Vec<i16> = slot
2251                        .data_rep_offer_override
2252                        .clone()
2253                        .unwrap_or_else(|| self.config.data_representation_offer.clone());
2254                    let mode = self.config.data_rep_match_mode;
2255                    if let Some(negotiated) =
2256                        dr::negotiate(&writer_offered, &sub.data_representation, mode)
2257                    {
2258                        proxy.set_negotiated_data_representation(negotiated);
2259                    } else {
2260                        // Kein Overlap → SEDP-Match-Spec-Verletzung.
2261                        // Wir adden den Proxy trotzdem fuer Best-Effort-
2262                        // Compat; Wire-Format default bleibt XCDR2.
2263                        // Spec-strict-Caller sollte Match ablehnen.
2264                    }
2265                }
2266                if slot.durability == zerodds_qos::DurabilityKind::Volatile {
2267                    if let Some(max) = slot.writer.cache().max_sn() {
2268                        proxy.skip_samples_up_to(max);
2269                    }
2270                }
2271                slot.writer.add_reader_proxy(proxy);
2272                // D.5e Phase-1: wake `wait_for_matched_subscription`-waiters.
2273                self.match_event.1.notify_all();
2274
2275                // Security: Per-Reader-Protection-Level aus
2276                // security_info ableiten und Locator-Lookup-Map
2277                // aufbauen, damit der Writer-Tick pro Ziel
2278                // individuell serialisieren kann.
2279                #[cfg(feature = "security")]
2280                {
2281                    let peer_key = sub.key.prefix.0;
2282                    let level = EndpointProtection::from_info(sub.security_info.as_ref()).level;
2283                    slot.reader_protection.insert(peer_key, level);
2284                    for loc in &locators {
2285                        slot.locator_to_peer.insert(*loc, peer_key);
2286                    }
2287                }
2288            }
2289        }
2290        // Match-Event ausserhalb des Slot-Mutex emittieren.
2291        self.config.observability.record(
2292            &zerodds_foundation::observability::Event::new(
2293                zerodds_foundation::observability::Level::Info,
2294                zerodds_foundation::observability::Component::Discovery,
2295                "writer.matched_remote_reader",
2296            )
2297            .with_attr("writer_eid", alloc::format!("{writer_eid:?}")),
2298        );
2299    }
2300
2301    fn wire_reader_to_remote_writer(
2302        &self,
2303        reader_eid: EntityId,
2304        pubd: &zerodds_rtps::publication_data::PublicationBuiltinTopicData,
2305    ) {
2306        let locators = remote_user_locators(pubd.key.prefix, &self.discovered);
2307        if locators.is_empty() {
2308            return;
2309        }
2310        if let Some(slot_arc) = self.reader_slot(reader_eid) {
2311            if let Ok(mut slot) = slot_arc.lock() {
2312                let slot = &mut *slot;
2313                // per-Policy-Bump fuer requested_incompatible_qos.
2314                use crate::psm_constants::qos_policy_id as qid;
2315                use crate::status::bump_policy_count;
2316                let bump = |slot: &mut UserReaderSlot, pid: u32| {
2317                    slot.requested_incompatible_qos.total_count = slot
2318                        .requested_incompatible_qos
2319                        .total_count
2320                        .saturating_add(1);
2321                    slot.requested_incompatible_qos.last_policy_id = pid;
2322                    bump_policy_count(&mut slot.requested_incompatible_qos.policies, pid);
2323                };
2324
2325                // Siehe wire_writer... — symmetrisch, Writer ist jetzt remote.
2326                if (pubd.durability as u8) < (slot.durability as u8) {
2327                    bump(slot, qid::DURABILITY);
2328                    return;
2329                }
2330                if !deadline_compat(
2331                    qos_duration_to_nanos(pubd.deadline.period),
2332                    slot.deadline_nanos,
2333                ) {
2334                    bump(slot, qid::DEADLINE);
2335                    return;
2336                }
2337                if (pubd.liveliness.kind as u8) < (slot.liveliness_kind as u8) {
2338                    bump(slot, qid::LIVELINESS);
2339                    return;
2340                }
2341                if !deadline_compat(
2342                    qos_duration_to_nanos(pubd.liveliness.lease_duration),
2343                    slot.liveliness_lease_nanos,
2344                ) {
2345                    bump(slot, qid::LIVELINESS);
2346                    return;
2347                }
2348                if pubd.ownership != slot.ownership {
2349                    bump(slot, qid::OWNERSHIP);
2350                    return;
2351                }
2352                if !partitions_overlap(&pubd.partition, &slot.partition) {
2353                    bump(slot, qid::PARTITION);
2354                    return;
2355                }
2356
2357                // F-TYPES-3 XTypes-1.3 §7.6.3.7 TypeConsistencyEnforcement.
2358                // Wenn beide Seiten einen TypeIdentifier (≠ None) tragen,
2359                // pruefen wir Compatibility via TypeMatcher. Sonst faellt
2360                // der Match auf reinen type_name-Vergleich (Default-Path).
2361                if slot.type_identifier != zerodds_types::TypeIdentifier::None
2362                    && pubd.type_identifier != zerodds_types::TypeIdentifier::None
2363                {
2364                    let registry = zerodds_types::resolve::TypeRegistry::new();
2365                    let matcher =
2366                        zerodds_types::type_matcher::TypeMatcher::new(&slot.type_consistency);
2367                    if !matcher
2368                        .match_types(&pubd.type_identifier, &slot.type_identifier, &registry)
2369                        .is_match()
2370                    {
2371                        bump(slot, qid::TYPE_CONSISTENCY_ENFORCEMENT);
2372                        return;
2373                    }
2374                }
2375
2376                slot.reader
2377                    .add_writer_proxy(zerodds_rtps::writer_proxy::WriterProxy::new(
2378                        pubd.key,
2379                        locators,
2380                        Vec::new(),
2381                        true,
2382                    ));
2383                // D.5e Phase-1: wake `wait_for_matched_publication`-waiters.
2384                self.match_event.1.notify_all();
2385
2386                // §2.2.3.23 Exclusive-Ownership-Resolver-Cache:
2387                // Writer-`ownership_strength` aus Discovery merken, damit
2388                // `delivered_to_user_sample` den Wert in jeden Sample
2389                // packen kann.
2390                slot.writer_strengths
2391                    .insert(pubd.key.to_bytes(), pubd.ownership_strength);
2392            }
2393        }
2394    }
2395
2396    /// Schreibt einen Sample an einen registrierten User-Writer und
2397    /// versendet die erzeugten Datagramme.
2398    ///
2399    /// Der Payload wird mit dem RTPS-Serialized-Payload-Header
2400    /// (Encapsulation-Scheme) versehen, bevor er in die DATA-
2401    /// Submessage geht. OMG RTPS 2.5 §9.4.2.13 verlangt genau diese
2402    /// 4 Bytes am Anfang jedes serialisierten User-Payloads:
2403    ///   [0x00, 0x07, 0x00, 0x00] = XCDR2_LE + options=0.
2404    /// Ohne diesen Header weigern sich Cyclone/Fast-DDS-Reader, das
2405    /// Sample zu deliverieren (sie parsen die ersten 4 Bytes als
2406    /// encapsulation kind + options und droppen unknown-scheme).
2407    ///
2408    /// # Errors
2409    /// - `BadParameter` wenn die EntityId keinen registrierten Writer hat.
2410    /// - `WireError` bei Encoding-Fehler.
2411    pub fn write_user_sample(&self, eid: EntityId, payload: Vec<u8>) -> Result<()> {
2412        // Hot-Path: fuer Klein-Samples (<= 1.5 kB Payload)
2413        // wird das Encap-Framing in einen Stack-PoolBuffer kopiert —
2414        // null Heap-Touch im Framing-Schritt. Grosse Samples fallen
2415        // zurueck auf Vec.
2416        let now = self.start_instant.elapsed();
2417        let total = USER_PAYLOAD_ENCAP.len() + payload.len();
2418        let out_datagrams = {
2419            let slot_arc = self.writer_slot(eid).ok_or(DdsError::BadParameter {
2420                what: "unknown writer entity id",
2421            })?;
2422            let mut slot = slot_arc.lock().map_err(|_| DdsError::PreconditionNotMet {
2423                reason: "user_writer slot poisoned",
2424            })?;
2425            // Deadline-Timer: letzter-Write merken fuer offered_deadline_missed.
2426            slot.last_write = Some(now);
2427            let dgs = if total <= SMALL_FRAME_CAP {
2428                write_user_sample_pooled(&mut slot.writer, &payload, now)?
2429            } else {
2430                let mut framed = Vec::with_capacity(total);
2431                framed.extend_from_slice(&USER_PAYLOAD_ENCAP);
2432                framed.extend_from_slice(&payload);
2433                // D.5e Phase-2: HEARTBEAT-piggyback fuer instant ACK.
2434                slot.writer
2435                    .write_with_heartbeat(&framed, now)
2436                    .map_err(|_| DdsError::WireError {
2437                        message: String::from("user writer encode"),
2438                    })?
2439            };
2440            // Lifespan: Insert-Zeit der gerade geschriebenen SN merken.
2441            if slot.lifespan_nanos != 0 {
2442                if let Some(sn) = slot.writer.cache().max_sn() {
2443                    slot.sample_insert_times.push_back((sn, now));
2444                }
2445            }
2446            dgs
2447        };
2448        for dg in out_datagrams {
2449            if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
2450                for t in dg.targets.iter() {
2451                    if t.kind == LocatorKind::UdpV4 {
2452                        let _ = self.user_unicast.send(t, &secured);
2453                    }
2454                }
2455            }
2456        }
2457        // Embargo-Inspect-Tap am DCPS-Layer (Pfad-getrennt vom
2458        // Production-Pfad). Nur kompiliert wenn `inspect`-Feature an
2459        // ist. Topic-Name wird per separatem Lookup geholt, ausserhalb
2460        // der Lock-Region damit Hooks nicht unter Lock laufen.
2461        #[cfg(feature = "inspect")]
2462        {
2463            self.dispatch_inspect_dcps_tap(eid, &payload);
2464        }
2465        Ok(())
2466    }
2467
2468    /// Inspect-Endpoint Tap-Dispatch fuer DCPS-Publish.
2469    /// Liest den Topic-Namen separat aus dem WriterSlot und uebergibt
2470    /// einen Frame an die zerodds-inspect-endpoint Tap-Registry.
2471    /// **Nicht** Production-Hot-Path: nur wenn `inspect`-Feature an ist.
2472    #[cfg(feature = "inspect")]
2473    fn dispatch_inspect_dcps_tap(&self, eid: EntityId, payload: &[u8]) {
2474        let Some(slot_arc) = self.writer_slot(eid) else {
2475            return;
2476        };
2477        let topic = match slot_arc.lock() {
2478            Ok(slot) => slot.topic_name.clone(),
2479            Err(_) => return,
2480        };
2481        let ts_ns = std::time::SystemTime::now()
2482            .duration_since(std::time::UNIX_EPOCH)
2483            .map(|d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX))
2484            .unwrap_or(0);
2485        let mut corr: u64 = 0;
2486        for (i, byte) in eid.entity_key.iter().enumerate() {
2487            corr |= u64::from(*byte) << (i * 8);
2488        }
2489        corr |= u64::from(eid.entity_kind as u8) << 24;
2490        let frame = zerodds_inspect_endpoint::Frame::dcps(topic, ts_ns, corr, payload.to_vec());
2491        zerodds_inspect_endpoint::tap::dispatch(&frame);
2492    }
2493
2494    /// Sendet einen Lifecycle-Marker (`dispose`/`unregister_instance`) an
2495    /// alle matched Reader. Spec §2.2.2.4.2.10/.7 + §9.6.3.9 PID_STATUS_INFO.
2496    /// `status_bits` ist die OR-Verknuepfung von
2497    /// `zerodds_rtps::inline_qos::status_info::DISPOSED` und/oder `UNREGISTERED`.
2498    ///
2499    /// # Errors
2500    /// - `BadParameter` wenn die EntityId keinen registrierten Writer hat.
2501    /// - `WireError` bei Encode-Fehler.
2502    pub fn write_user_lifecycle(
2503        &self,
2504        eid: EntityId,
2505        key_hash: [u8; 16],
2506        status_bits: u32,
2507    ) -> Result<()> {
2508        let out_datagrams = {
2509            let slot_arc = self.writer_slot(eid).ok_or(DdsError::BadParameter {
2510                what: "unknown writer entity id",
2511            })?;
2512            let mut slot = slot_arc.lock().map_err(|_| DdsError::PreconditionNotMet {
2513                reason: "user_writer slot poisoned",
2514            })?;
2515            slot.writer
2516                .write_lifecycle(key_hash, status_bits)
2517                .map_err(|_| DdsError::WireError {
2518                    message: String::from("user writer lifecycle encode"),
2519                })?
2520        };
2521        for dg in out_datagrams {
2522            if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
2523                for t in dg.targets.iter() {
2524                    if t.kind == LocatorKind::UdpV4 {
2525                        let _ = self.user_unicast.send(t, &secured);
2526                    }
2527                }
2528            }
2529        }
2530        Ok(())
2531    }
2532
2533    /// Generiert einen 3-Byte-Entity-Key fuer neue User-Endpoints.
2534    fn next_entity_key(&self) -> [u8; 3] {
2535        let n = self.entity_counter.fetch_add(1, Ordering::Relaxed);
2536        [(n >> 16) as u8, (n >> 8) as u8, n as u8]
2537    }
2538
2539    /// Snapshot aller aktuell bekannten remote Publications (Topic
2540    /// Name + Type Name + Writer-GUID).
2541    #[must_use]
2542    pub fn discovered_publications_count(&self) -> usize {
2543        self.sedp
2544            .lock()
2545            .map(|s| s.cache().publications_len())
2546            .unwrap_or(0)
2547    }
2548
2549    /// Snapshot aller aktuell bekannten remote Subscriptions.
2550    #[must_use]
2551    pub fn discovered_subscriptions_count(&self) -> usize {
2552        self.sedp
2553            .lock()
2554            .map(|s| s.cache().subscriptions_len())
2555            .unwrap_or(0)
2556    }
2557
2558    /// Anzahl matched Remote-Reader fuer einen lokalen User-Writer.
2559    ///  von `DataWriter::wait_for_matched_subscription` gepollt.
2560    #[must_use]
2561    pub fn user_writer_matched_count(&self, eid: EntityId) -> usize {
2562        self.writer_slot(eid)
2563            .and_then(|arc| arc.lock().ok().map(|s| s.writer.reader_proxy_count()))
2564            .unwrap_or(0)
2565    }
2566
2567    /// Liste der `InstanceHandle`s aller matched Remote-Reader fuer einen
2568    /// lokalen User-Writer (Spec §2.2.2.4.2.x `get_matched_subscriptions`).
2569    /// Pro Reader die letzten 16 byte der GUID als InstanceHandle.
2570    #[must_use]
2571    pub fn user_writer_matched_subscription_handles(
2572        &self,
2573        eid: EntityId,
2574    ) -> Vec<crate::instance_handle::InstanceHandle> {
2575        self.writer_slot(eid)
2576            .and_then(|arc| {
2577                arc.lock().ok().map(|s| {
2578                    s.writer
2579                        .reader_proxies()
2580                        .iter()
2581                        .map(|p| {
2582                            crate::instance_handle::InstanceHandle::from_guid(p.remote_reader_guid)
2583                        })
2584                        .collect()
2585                })
2586            })
2587            .unwrap_or_default()
2588    }
2589
2590    /// Liste der `InstanceHandle`s aller matched Remote-Writer fuer einen
2591    /// lokalen User-Reader (Spec §2.2.2.5.x `get_matched_publications`).
2592    #[must_use]
2593    pub fn user_reader_matched_publication_handles(
2594        &self,
2595        eid: EntityId,
2596    ) -> Vec<crate::instance_handle::InstanceHandle> {
2597        self.reader_slot(eid)
2598            .and_then(|arc| {
2599                arc.lock().ok().map(|s| {
2600                    s.reader
2601                        .writer_proxies()
2602                        .iter()
2603                        .map(|p| {
2604                            crate::instance_handle::InstanceHandle::from_guid(
2605                                p.proxy.remote_writer_guid,
2606                            )
2607                        })
2608                        .collect()
2609                })
2610            })
2611            .unwrap_or_default()
2612    }
2613
2614    /// Counter fuer verpasste offered-Deadlines am User-Writer.
2615    /// Spec OMG DDS 1.4 §2.2.4.2.9 `OFFERED_DEADLINE_MISSED_STATUS`.
2616    #[must_use]
2617    pub fn user_writer_offered_deadline_missed(&self, eid: EntityId) -> u64 {
2618        self.writer_slot(eid)
2619            .and_then(|arc| arc.lock().ok().map(|s| s.offered_deadline_missed_count))
2620            .unwrap_or(0)
2621    }
2622
2623    /// Counter fuer verpasste requested-Deadlines am User-Reader.
2624    /// Spec §2.2.4.2.11 `REQUESTED_DEADLINE_MISSED_STATUS`.
2625    #[must_use]
2626    pub fn user_reader_requested_deadline_missed(&self, eid: EntityId) -> u64 {
2627        self.reader_slot(eid)
2628            .and_then(|arc| arc.lock().ok().map(|s| s.requested_deadline_missed_count))
2629            .unwrap_or(0)
2630    }
2631
2632    /// Aktueller Liveliness-Status eines lokalen User-Readers.
2633    /// Spec §2.2.4.2.14 `LIVELINESS_CHANGED_STATUS`:
2634    /// `(alive, alive_count, not_alive_count)`.
2635    #[must_use]
2636    pub fn user_reader_liveliness_status(&self, eid: EntityId) -> (bool, u64, u64) {
2637        self.reader_slot(eid)
2638            .and_then(|arc| {
2639                arc.lock().ok().map(|s| {
2640                    (
2641                        s.liveliness_alive,
2642                        s.liveliness_alive_count,
2643                        s.liveliness_not_alive_count,
2644                    )
2645                })
2646            })
2647            .unwrap_or((false, 0, 0))
2648    }
2649
2650    /// Counter LivelinessLost am User-Writer (Spec §2.2.4.2.10).
2651    /// Wird von `check_writer_liveliness` inkrementiert.
2652    #[must_use]
2653    pub fn user_writer_liveliness_lost(&self, eid: EntityId) -> u64 {
2654        self.writer_slot(eid)
2655            .and_then(|arc| arc.lock().ok().map(|s| s.liveliness_lost_count))
2656            .unwrap_or(0)
2657    }
2658
2659    /// Snapshot OfferedIncompatibleQosStatus am Writer.
2660    #[must_use]
2661    pub fn user_writer_offered_incompatible_qos(
2662        &self,
2663        eid: EntityId,
2664    ) -> crate::status::OfferedIncompatibleQosStatus {
2665        self.writer_slot(eid)
2666            .and_then(|arc| arc.lock().ok().map(|s| s.offered_incompatible_qos.clone()))
2667            .unwrap_or_default()
2668    }
2669
2670    /// Snapshot RequestedIncompatibleQosStatus am Reader.
2671    #[must_use]
2672    pub fn user_reader_requested_incompatible_qos(
2673        &self,
2674        eid: EntityId,
2675    ) -> crate::status::RequestedIncompatibleQosStatus {
2676        self.reader_slot(eid)
2677            .and_then(|arc| {
2678                arc.lock()
2679                    .ok()
2680                    .map(|s| s.requested_incompatible_qos.clone())
2681            })
2682            .unwrap_or_default()
2683    }
2684
2685    /// Sample-Lost-Counter (Reader-Seite). Spec §2.2.4.2.6.2.
2686    #[must_use]
2687    pub fn user_reader_sample_lost(&self, eid: EntityId) -> u64 {
2688        self.reader_slot(eid)
2689            .and_then(|arc| arc.lock().ok().map(|s| s.sample_lost_count))
2690            .unwrap_or(0)
2691    }
2692
2693    /// Sample-Rejected-Status (Reader-Seite). Spec §2.2.4.2.6.3.
2694    #[must_use]
2695    pub fn user_reader_sample_rejected(
2696        &self,
2697        eid: EntityId,
2698    ) -> crate::status::SampleRejectedStatus {
2699        self.reader_slot(eid)
2700            .and_then(|arc| arc.lock().ok().map(|s| s.sample_rejected))
2701            .unwrap_or_default()
2702    }
2703
2704    /// Recordet ein verlorenes Sample am User-Reader. Wird
2705    /// von Resource-Limit- oder Decode-Failure-Pfaden gerufen — der
2706    /// Detector ist Application-extern, weil Sample-Lost je nach
2707    /// Implementation aus mehreren Quellen kommt (Cache-Drop, Decode-
2708    /// Fail, Sequence-Number-Gap-Drop).
2709    pub fn record_sample_lost(&self, eid: EntityId, count: u32) {
2710        if count == 0 {
2711            return;
2712        }
2713        if let Some(arc) = self.reader_slot(eid) {
2714            if let Ok(mut slot) = arc.lock() {
2715                slot.sample_lost_count = slot.sample_lost_count.saturating_add(u64::from(count));
2716            }
2717        }
2718    }
2719
2720    /// Recordet ein rejectedes Sample am User-Reader.
2721    pub fn record_sample_rejected(
2722        &self,
2723        eid: EntityId,
2724        kind: crate::status::SampleRejectedStatusKind,
2725        instance: crate::instance_handle::InstanceHandle,
2726    ) {
2727        if let Some(arc) = self.reader_slot(eid) {
2728            if let Ok(mut slot) = arc.lock() {
2729                slot.sample_rejected.total_count =
2730                    slot.sample_rejected.total_count.saturating_add(1);
2731                slot.sample_rejected.last_reason = kind;
2732                slot.sample_rejected.last_instance_handle = instance;
2733            }
2734        }
2735    }
2736
2737    /// Manual-Liveliness-Assert am User-Writer. Setzt den
2738    /// `last_liveliness_assert`-Timestamp. Bei `LivelinessKind::Automatic`
2739    /// wird zusaetzlich `last_write` mitgesetzt — der Liveliness-Pfad
2740    /// faellt sonst nie ueber den `assert`-Trigger, weil jeder erfolgreiche
2741    /// `write` bereits den Liveliness-Tick uebernimmt.
2742    pub fn assert_writer_liveliness_eid(&self, eid: EntityId) {
2743        let now = self.start_instant.elapsed();
2744        if let Some(arc) = self.writer_slot(eid) {
2745            if let Ok(mut slot) = arc.lock() {
2746                slot.last_liveliness_assert = Some(now);
2747                if slot.liveliness_kind == zerodds_qos::LivelinessKind::Automatic {
2748                    slot.last_write = Some(now);
2749                }
2750            }
2751        }
2752    }
2753
2754    /// True wenn alle matched Reader alle bisher geschriebenen Samples
2755    /// acknowledgt haben. Leerer Cache oder keine Proxies → true.
2756    #[must_use]
2757    pub fn user_writer_all_acknowledged(&self, eid: EntityId) -> bool {
2758        self.writer_slot(eid)
2759            .and_then(|arc| arc.lock().ok().map(|s| s.writer.all_samples_acknowledged()))
2760            .unwrap_or(true)
2761    }
2762
2763    /// Spec §3.1 zerodds-async-1.0: registriert den Waker eines
2764    /// async-Readers im UserReaderSlot. Bei `sample_tx.send` wird
2765    /// der Waker geweckt. `None` als Argument loescht den Waker
2766    /// (z.B. nach Drop des Async-Readers).
2767    pub fn register_user_reader_waker(&self, eid: EntityId, waker: Option<core::task::Waker>) {
2768        if let Some(arc) = self.reader_slot(eid) {
2769            if let Ok(slot) = arc.lock() {
2770                if let Ok(mut g) = slot.async_waker.lock() {
2771                    *g = waker;
2772                }
2773            }
2774        }
2775    }
2776
2777    /// Listener-Callback fuer Alive-Sample-
2778    /// Arrival am User-Reader registrieren. `None` loescht einen
2779    /// vorhandenen Listener.
2780    ///
2781    /// Listener feuert synchron im Recv-Thread des
2782    /// `recv_user_data_loop` — siehe Vertrags-Doku am
2783    /// [`UserReaderListener`]-Type. Eliminiert die User-Polling-
2784    /// Latenz (~50-100 µs) gegenueber `sample_tx.recv()`.
2785    ///
2786    /// Returns `true` wenn der Reader-Slot existiert und der Listener
2787    /// gesetzt wurde, `false` wenn der EID kein bekannter User-Reader
2788    /// ist.
2789    pub fn set_user_reader_listener(
2790        &self,
2791        eid: EntityId,
2792        listener: Option<UserReaderListener>,
2793    ) -> bool {
2794        let Some(arc) = self.reader_slot(eid) else {
2795            return false;
2796        };
2797        let Ok(mut slot) = arc.lock() else {
2798            return false;
2799        };
2800        slot.listener = listener.map(alloc::sync::Arc::new);
2801        true
2802    }
2803
2804    /// Anzahl matched Remote-Writer fuer einen lokalen User-Reader.
2805    #[must_use]
2806    pub fn user_reader_matched_count(&self, eid: EntityId) -> usize {
2807        self.reader_slot(eid)
2808            .and_then(|arc| arc.lock().ok().map(|s| s.reader.writer_proxy_count()))
2809            .unwrap_or(0)
2810    }
2811
2812    /// D.5e Phase-1 — Wartet bis ein Match-Event eintritt oder das Timeout
2813    /// erreicht ist. Ersetzt 20-ms-Polling in `DataReader::wait_for_matched_*`
2814    /// und `DataWriter::wait_for_matched_*`.
2815    ///
2816    /// Caller checkt selbst den Match-Count (per `user_*_matched_count`)
2817    /// vor und nach dem Wait — diese Funktion ist nur die Block-Mechanik.
2818    /// Returns `false` wenn Timeout erreicht, `true` wenn Notify kam.
2819    #[cfg(feature = "std")]
2820    pub fn wait_match_event(&self, timeout: core::time::Duration) -> bool {
2821        let (lock, cvar) = &*self.match_event;
2822        let Ok(guard) = lock.lock() else { return false };
2823        match cvar.wait_timeout(guard, timeout) {
2824            Ok((_, t)) => !t.timed_out(),
2825            Err(_) => false,
2826        }
2827    }
2828
2829    /// D.5e Phase-1 — Wartet bis ein ACK-Event eintritt oder Timeout.
2830    /// Ersetzt 50-ms-Polling in `DataWriter::wait_for_acknowledgments`.
2831    #[cfg(feature = "std")]
2832    pub fn wait_ack_event(&self, timeout: core::time::Duration) -> bool {
2833        let (lock, cvar) = &*self.ack_event;
2834        let Ok(guard) = lock.lock() else { return false };
2835        match cvar.wait_timeout(guard, timeout) {
2836            Ok((_, t)) => !t.timed_out(),
2837            Err(_) => false,
2838        }
2839    }
2840
2841    /// D.5e Phase-1 — Notify-Helper fuer ACK-Event. Wird vom Reliable-
2842    /// Writer-Pfad aufgerufen wenn ein ACKNACK den acked-base vorrueckt.
2843    #[cfg(feature = "std")]
2844    pub(crate) fn notify_ack_event(&self) {
2845        self.ack_event.1.notify_all();
2846    }
2847
2848    /// ADR-0006 — Setzt PID_SHM_LOCATOR-Bytes fuer einen lokalen
2849    /// User-Writer in der Side-Map. Wird vom DataWriter aufgerufen,
2850    /// sobald `set_flat_backend` ein Same-Host-Backend (POSIX shm /
2851    /// Iceoryx2) angeschlossen hat. Beim naechsten SEDP-Push injiziert
2852    /// der Wire-Encoder das PID 0x8001 in die `PublicationData`.
2853    pub fn set_shm_locator(&self, eid: EntityId, bytes: Vec<u8>) {
2854        if let Ok(mut g) = self.shm_locators.write() {
2855            g.insert(eid, bytes);
2856        }
2857    }
2858
2859    /// ADR-0006 — Liest die PID_SHM_LOCATOR-Bytes fuer einen lokalen
2860    /// User-Writer aus der Side-Map. Liefert `None`, wenn kein
2861    /// Same-Host-Backend gesetzt ist.
2862    #[must_use]
2863    pub fn shm_locator(&self, eid: EntityId) -> Option<Vec<u8>> {
2864        self.shm_locators.read().ok()?.get(&eid).cloned()
2865    }
2866
2867    /// ADR-0006 — Entfernt PID_SHM_LOCATOR-Eintrag (z.B. wenn der
2868    /// User-Writer ohne Backend re-konfiguriert wird).
2869    pub fn clear_shm_locator(&self, eid: EntityId) {
2870        if let Ok(mut g) = self.shm_locators.write() {
2871            g.remove(&eid);
2872        }
2873    }
2874
2875    /// Stoppt alle Worker-Threads (recv-loops + tick-loop) und joinst
2876    /// sie. Idempotent — mehrfach-Aufrufe sind no-op.
2877    ///
2878    /// Shutdown-Verzoegerung: bis ~1 s, weil die Recv-Threads in
2879    /// `recv()` mit 1 s read-timeout sitzen. Nach Beendigung des
2880    /// aktuellen recv()-Calls checken sie das stop-Flag und
2881    /// terminieren.
2882    pub fn shutdown(&self) {
2883        self.stop.store(true, Ordering::Relaxed);
2884        if let Ok(mut guard) = self.handles.lock() {
2885            for h in guard.drain(..) {
2886                let _ = h.join();
2887            }
2888        }
2889    }
2890}
2891
2892impl Drop for DcpsRuntime {
2893    fn drop(&mut self) {
2894        self.shutdown();
2895    }
2896}
2897
2898// ---------------------------------------------------------------------
2899// Worker-Threads (Sprint D.5b — Per-Socket-Recv + zentraler Tick).
2900//
2901// Vorher: ein einziger `event_loop`, der pro Iteration drei sequentielle
2902// blocking-`recv()`s mit `tick_period`-Timeout (50 ms) durchgegangen ist.
2903// Roundtrip-Latenz: 5-14 ms p50 (CFS-Drift + sequentielle Wait-Stufen).
2904//
2905// Jetzt: vier dedizierte Threads.
2906//   * recv_spdp_multicast_loop  — blockt auf SPDP-Multicast-Socket
2907//   * recv_metatraffic_loop     — blockt auf SPDP-Unicast (= Metatraffic)
2908//   * recv_user_data_loop       — blockt auf User-Data-Unicast
2909//   * tick_loop                 — periodische Outbound-Aufgaben +
2910//                                 Per-Interface-Inbound (non-blocking) +
2911//                                 Deadline/Lifespan/Liveliness
2912//
2913// Lock-Disziplin: Recv-Threads und Tick-Thread konkurrieren um
2914// `rt.sedp.lock()` / `rt.wlp.lock()` / per-Slot `slot.lock()`.
2915// Konvention: Lock-Hold-Zeiten kurz (handle_datagram + tick haben
2916// jeweils nur Single-Pass-Logik), kein Sub-Lock unter sedp/wlp.
2917// ---------------------------------------------------------------------
2918
2919/// Sprint D.5d Hebel C — Wendet SCHED_FIFO + CPU-Affinity auf den
2920/// aufrufenden Thread an. Linux-only; auf macOS/Windows no-op.
2921///
2922/// Wird von jedem Worker-Loop direkt am Anfang aufgerufen, sodass
2923/// die Syscalls auf dem tatsaechlichen Worker-Thread laufen
2924/// (`pthread_self()` muss aus dem eigenen Thread kommen).
2925///
2926/// Failures werden auf stderr geloggt aber sind nicht fatal — wenn
2927/// der Prozess kein `CAP_SYS_NICE` hat, laeuft die Runtime mit
2928/// CFS-Default-Scheduler weiter.
2929#[allow(unused_variables)]
2930fn apply_thread_tuning(label: &str, priority: Option<i32>, cpus: Option<&[usize]>) {
2931    #[cfg(target_os = "linux")]
2932    rt_pinning::apply(label, priority, cpus);
2933}
2934
2935/// Linux-only `pthread_setschedparam` + `sched_setaffinity` Wrapper.
2936/// Eigenes Modul kapselt das `unsafe` lokal mit safety-Notes; das
2937/// Crate-Level `#![deny(unsafe_code)]` bleibt fuer den Rest der dcps-
2938/// Codebasis aktiv.
2939#[cfg(target_os = "linux")]
2940#[allow(unsafe_code, clippy::print_stderr)]
2941mod rt_pinning {
2942    pub(super) fn apply(label: &str, priority: Option<i32>, cpus: Option<&[usize]>) {
2943        if let Some(prio) = priority {
2944            // SAFETY: libc-FFI mit owned `param`-Struct. Self-thread via
2945            // `pthread_self()` ist immer gueltig.
2946            unsafe {
2947                let param = libc::sched_param {
2948                    sched_priority: prio,
2949                };
2950                let rc = libc::pthread_setschedparam(
2951                    libc::pthread_self(),
2952                    libc::SCHED_FIFO,
2953                    &raw const param,
2954                );
2955                if rc != 0 {
2956                    eprintln!(
2957                        "zdds[{label}]: pthread_setschedparam SCHED_FIFO {prio} \
2958                         failed (rc={rc}). Need CAP_SYS_NICE or RLIMIT_RTPRIO."
2959                    );
2960                }
2961            }
2962        }
2963        if let Some(cpu_list) = cpus {
2964            // SAFETY: cpu_set_t ist POD; CPU_ZERO/SET sind libc-Inline-
2965            // Funktionen ohne Lifetime-Anforderungen.
2966            unsafe {
2967                let mut set: libc::cpu_set_t = core::mem::zeroed();
2968                libc::CPU_ZERO(&mut set);
2969                for &cpu in cpu_list {
2970                    if cpu < libc::CPU_SETSIZE as usize {
2971                        libc::CPU_SET(cpu, &mut set);
2972                    }
2973                }
2974                let rc = libc::sched_setaffinity(
2975                    0,
2976                    core::mem::size_of::<libc::cpu_set_t>(),
2977                    &raw const set,
2978                );
2979                if rc != 0 {
2980                    eprintln!("zdds[{label}]: sched_setaffinity({cpu_list:?}) failed.");
2981                }
2982            }
2983        }
2984    }
2985}
2986
2987/// Worker: blockt auf SPDP-Multicast-Socket, dispatcht SPDP-Beacons
2988/// + WLP-Heartbeats die ueber Multicast reinkommen.
2989fn recv_spdp_multicast_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
2990    apply_thread_tuning(
2991        "recv-spdp-mc",
2992        rt.config.recv_thread_priority,
2993        rt.config.recv_thread_cpus.as_deref(),
2994    );
2995    while !stop.load(Ordering::Relaxed) {
2996        let elapsed = rt.start_instant.elapsed();
2997        let sedp_now = Duration::from_secs(elapsed.as_secs())
2998            + Duration::from_nanos(u64::from(elapsed.subsec_nanos()));
2999        let Ok(dg) = rt.spdp_multicast_rx.recv() else {
3000            continue;
3001        };
3002        #[cfg(feature = "security")]
3003        let clear = secure_inbound_bytes(&rt, &dg.data, &DEFAULT_INBOUND_IFACE);
3004        #[cfg(not(feature = "security"))]
3005        let clear = secure_inbound_bytes(&rt, &dg.data);
3006        if let Some(clear) = clear {
3007            handle_spdp_datagram(&rt, &clear);
3008            // WLP-Heartbeats kommen auf dem SPDP-Multicast-Socket
3009            // (Sender schickt sie auf SPDP-Multicast-Gruppe).
3010            // handle_spdp_datagram ignoriert sie, also feeden wir
3011            // den gleichen clear-Buffer auch in den WLP-Endpoint.
3012            if let Ok(mut wlp) = rt.wlp.lock() {
3013                let _ = wlp.handle_datagram(&clear, sedp_now);
3014            }
3015        }
3016    }
3017}
3018
3019/// Worker: blockt auf SPDP-Unicast (= Metatraffic-Socket), dispatcht
3020/// SPDP-Reverse-Beacons + SEDP + WLP + Security-Builtin.
3021fn recv_metatraffic_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
3022    apply_thread_tuning(
3023        "recv-meta",
3024        rt.config.recv_thread_priority,
3025        rt.config.recv_thread_cpus.as_deref(),
3026    );
3027    while !stop.load(Ordering::Relaxed) {
3028        let elapsed = rt.start_instant.elapsed();
3029        let sedp_now = Duration::from_secs(elapsed.as_secs())
3030            + Duration::from_nanos(u64::from(elapsed.subsec_nanos()));
3031        let Ok(dg) = rt.spdp_unicast.recv() else {
3032            continue;
3033        };
3034        #[cfg(feature = "security")]
3035        let clear = secure_inbound_bytes(&rt, &dg.data, &DEFAULT_INBOUND_IFACE);
3036        #[cfg(not(feature = "security"))]
3037        let clear = secure_inbound_bytes(&rt, &dg.data);
3038        if let Some(clear) = clear {
3039            // Ein einziger recv-Aufruf, beide Handler auf dasselbe
3040            // Datagramm. SPDP zuerst (Cyclone-Reverse-Beacons), dann
3041            // SEDP, dann WLP, dann Security-Builtin.
3042            handle_spdp_datagram(&rt, &clear);
3043            let events = {
3044                if let Ok(mut sedp) = rt.sedp.lock() {
3045                    sedp.handle_datagram(&clear, sedp_now).ok()
3046                } else {
3047                    None
3048                }
3049            };
3050            if let Some(ev) = events {
3051                if !ev.is_empty() {
3052                    run_matching_pass(&rt);
3053                    push_sedp_events_to_builtin_readers(&rt, &ev);
3054                }
3055            }
3056            if let Ok(mut wlp) = rt.wlp.lock() {
3057                let _ = wlp.handle_datagram(&clear, sedp_now);
3058            }
3059            dispatch_security_builtin_datagram(&rt, &clear, sedp_now);
3060        }
3061    }
3062}
3063
3064/// Worker: blockt auf User-Data-Unicast-Socket, dispatcht
3065/// TypeLookup-Service-Replies + User-Sample-Datagrams.
3066fn recv_user_data_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
3067    apply_thread_tuning(
3068        "recv-user",
3069        rt.config.recv_thread_priority,
3070        rt.config.recv_thread_cpus.as_deref(),
3071    );
3072    while !stop.load(Ordering::Relaxed) {
3073        let elapsed = rt.start_instant.elapsed();
3074        let sedp_now = Duration::from_secs(elapsed.as_secs())
3075            + Duration::from_nanos(u64::from(elapsed.subsec_nanos()));
3076        let Ok(dg) = rt.user_unicast.recv() else {
3077            continue;
3078        };
3079        #[cfg(feature = "security")]
3080        let clear = secure_inbound_bytes(&rt, &dg.data, &DEFAULT_INBOUND_IFACE);
3081        #[cfg(not(feature = "security"))]
3082        let clear = secure_inbound_bytes(&rt, &dg.data);
3083        if let Some(clear) = clear {
3084            // TypeLookup-Service zuerst — wenn der Frame an
3085            // TL_SVC_*_READER adressiert ist, geht er nicht an einen
3086            // User-Reader. Andere Frames fallen durch.
3087            if !dispatch_type_lookup_datagram(&rt, &clear, &dg.source) {
3088                handle_user_datagram(&rt, &clear, sedp_now);
3089            }
3090        }
3091    }
3092}
3093
3094/// Worker: periodische Outbound-Aufgaben + Per-Interface-Inbound
3095/// (non-blocking) + Housekeeping. Schlaeft `tick_period` zwischen
3096/// Iterationen.
3097fn tick_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
3098    apply_thread_tuning(
3099        "tick",
3100        rt.config.tick_thread_priority,
3101        rt.config.tick_thread_cpus.as_deref(),
3102    );
3103    // Multicast-Target-Locator, an den wir SPDP-Beacons schicken.
3104    let mc_target = Locator {
3105        kind: LocatorKind::UdpV4,
3106        port: u32::from(u16::try_from(spdp_multicast_port(rt.domain_id as u32)).unwrap_or(7400)),
3107        address: {
3108            let mut a = [0u8; 16];
3109            a[12..].copy_from_slice(&SPDP_DEFAULT_MULTICAST_ADDRESS);
3110            a
3111        },
3112    };
3113
3114    let mut next_announce = Instant::now(); // sofort beim Start
3115    while !stop.load(Ordering::Relaxed) {
3116        // Monotonic clock relativ zum Runtime-Start. Wird von SEDP-,
3117        // WLP- und User-Tick gleichermassen genutzt.
3118        let elapsed_since_start = rt.start_instant.elapsed();
3119        let sedp_now = Duration::from_secs(elapsed_since_start.as_secs())
3120            + Duration::from_nanos(u64::from(elapsed_since_start.subsec_nanos()));
3121
3122        // --- Periodic SPDP announce ---
3123        if Instant::now() >= next_announce {
3124            if let Ok(mut beacon) = rt.spdp_beacon.lock() {
3125                if let Ok(datagram) = beacon.serialize() {
3126                    if let Some(secured) = secure_outbound_bytes(&rt, &datagram) {
3127                        let _ = rt.spdp_mc_tx.send(&mc_target, &secured);
3128                    }
3129                }
3130            }
3131            next_announce = Instant::now() + rt.config.spdp_period;
3132        }
3133
3134        // (SPDP-Multicast-Recv: jetzt in `recv_spdp_multicast_loop`.)
3135
3136        // --- SEDP-Tick (outbound HEARTBEAT/Resend/ACKNACK) ---
3137        let sedp_outbound = {
3138            if let Ok(mut sedp) = rt.sedp.lock() {
3139                sedp.tick(sedp_now).unwrap_or_default()
3140            } else {
3141                Vec::new()
3142            }
3143        };
3144        for dg in sedp_outbound {
3145            send_discovery_datagram(&rt, &dg.targets, &dg.bytes);
3146        }
3147
3148        // --- Security-Builtin-Tick ---
3149        // Volatile-Secure-Writer heartbeats + Volatile-Secure-Reader
3150        // ACKNACK/NACK_FRAG. Stateless hat keinen Tick (BestEffort).
3151        if let Some(stack) = rt.security_builtin_snapshot() {
3152            let outbound = {
3153                if let Ok(mut s) = stack.lock() {
3154                    s.poll(sedp_now).unwrap_or_default()
3155                } else {
3156                    Vec::new()
3157                }
3158            };
3159            for dg in outbound {
3160                send_discovery_datagram(&rt, &dg.targets, &dg.bytes);
3161            }
3162        }
3163
3164        // --- WLP-Tick (Writer-Liveliness-Protocol Heartbeats) ---
3165        //
3166        // RTPS 2.5 §8.4.13: WLP-Heartbeats sind metatraffic-Verkehr.
3167        // Spec-Empfehlung: Multicast an alle bekannten Peers, ein
3168        // Heartbeat pro `lease_duration / 3`. Wir senden ueber den
3169        // SPDP-Multicast-Sender — das ist derselbe Socket der
3170        // SPDP-Beacons rausschickt, und gewaehrleistet dass alle
3171        // Peers die WLP-Pulse sehen ohne dass die Runtime per Peer
3172        // einen Unicast-Locator suchen muesste.
3173        let wlp_outbound = {
3174            if let Ok(mut wlp) = rt.wlp.lock() {
3175                wlp.tick(sedp_now).unwrap_or(None)
3176            } else {
3177                None
3178            }
3179        };
3180        if let Some(bytes) = wlp_outbound {
3181            if let Some(secured) = secure_outbound_bytes(&rt, &bytes) {
3182                let _ = rt.spdp_mc_tx.send(&mc_target, &secured);
3183            }
3184        }
3185
3186        // (Metatraffic-Unicast-Recv: jetzt in `recv_metatraffic_loop`.)
3187
3188        // --- User-Writer-Tick (HEARTBEAT + Resends) ---
3189        //
3190        // Security: Per-Target-Serializer. Ein Datagram kann an
3191        // mehrere Reader-Locators gehen. Pro Target ziehen wir es
3192        // individuell durch `secure_outbound_for_target`, damit die
3193        // Wire-Payload zur Protection-Klasse des jeweiligen Readers
3194        // passt.
3195        let user_writer_outbound: Vec<(EntityId, _)> = {
3196            let mut all = Vec::new();
3197            for (eid, arc) in rt.writer_slots_snapshot() {
3198                if let Ok(mut slot) = arc.lock() {
3199                    if let Ok(dgs) = slot.writer.tick(sedp_now) {
3200                        for dg in dgs {
3201                            all.push((eid, dg));
3202                        }
3203                    }
3204                }
3205            }
3206            all
3207        };
3208        for (writer_eid, dg) in user_writer_outbound {
3209            for t in dg.targets.iter() {
3210                if t.kind != LocatorKind::UdpV4 {
3211                    continue;
3212                }
3213                if let Some(secured) = secure_outbound_for_target(&rt, writer_eid, &dg.bytes, t) {
3214                    send_on_best_interface(&rt, t, &secured);
3215                }
3216            }
3217        }
3218
3219        // --- User-Reader-Tick-Outbound (ACKNACK / NACK_FRAG) ---
3220        let user_reader_outbound: Vec<_> = {
3221            let mut all = Vec::new();
3222            for (_eid, arc) in rt.reader_slots_snapshot() {
3223                if let Ok(mut slot) = arc.lock() {
3224                    if let Ok(dgs) = slot.reader.tick_outbound(sedp_now) {
3225                        all.extend(dgs);
3226                    }
3227                }
3228            }
3229            all
3230        };
3231        for dg in user_reader_outbound {
3232            if let Some(secured) = secure_outbound_bytes(&rt, &dg.bytes) {
3233                for t in dg.targets.iter() {
3234                    if t.kind == LocatorKind::UdpV4 {
3235                        let _ = rt.user_unicast.send(t, &secured);
3236                    }
3237                }
3238            }
3239        }
3240
3241        // (User-Data-Unicast-Recv: jetzt in `recv_user_data_loop`.)
3242
3243        // --- Per-Interface-Inbound ---
3244        //
3245        // Jedes Pool-Binding wird non-blocking gepollt; das
3246        // empfangene Datagram geht durch `secure_inbound_bytes` mit
3247        // der passenden NetInterface-Klasse. Damit kann die
3248        // PolicyEngine Interface-spezifische Entscheidungen treffen
3249        // (z.B. Loopback-Plain auf Protected-Domain akzeptieren).
3250        //
3251        // Die non-blocking-Semantik wird erzielt, indem jedes Socket
3252        // im `bind_all` einen kurzen Read-Timeout haelt — siehe
3253        // `OutboundSocketPool::bind_all`. Ohne Timeout wuerde der
3254        // Event-Loop pro Tick an einem leeren Binding haengen.
3255        #[cfg(feature = "security")]
3256        if let Some(pool) = &rt.outbound_pool {
3257            for binding in &pool.bindings {
3258                while let Ok(dg) = binding.socket.recv() {
3259                    let iface = binding.spec.kind.clone();
3260                    if let Some(clear) = secure_inbound_bytes(&rt, &dg.data, &iface) {
3261                        // Versuche SPDP zuerst (reverse-Beacons), dann
3262                        // SEDP, dann User-Data — gleicher Dispatch wie
3263                        // bei den Legacy-Sockets.
3264                        handle_spdp_datagram(&rt, &clear);
3265                        let events = rt
3266                            .sedp
3267                            .lock()
3268                            .ok()
3269                            .and_then(|mut s| s.handle_datagram(&clear, sedp_now).ok());
3270                        if let Some(ev) = events {
3271                            if !ev.is_empty() {
3272                                run_matching_pass(&rt);
3273                                push_sedp_events_to_builtin_readers(&rt, &ev);
3274                            }
3275                        }
3276                        if !dispatch_type_lookup_datagram(&rt, &clear, &dg.source) {
3277                            handle_user_datagram(&rt, &clear, sedp_now);
3278                        }
3279                        // DDS-Security 1.2 §7.4.2 Builtin-Endpoints
3280                        dispatch_security_builtin_datagram(&rt, &clear, sedp_now);
3281                    }
3282                }
3283            }
3284        }
3285
3286        // --- Deadline-Monitoring ---
3287        check_deadlines(&rt, elapsed_since_start);
3288        // --- Lifespan-Expire ---
3289        expire_by_lifespan(&rt, elapsed_since_start);
3290        // --- Liveliness-Lease-Check (Reader-Seite) ---
3291        check_liveliness(&rt, elapsed_since_start);
3292        // --- Writer-Seite-Liveliness-Lost-Check ---
3293        check_writer_liveliness(&rt, elapsed_since_start);
3294
3295        // Tick-Period-Sleep. Vorher hat der Single-Thread durch die
3296        // recv()-Timeouts implizit gewartet; die Recv-Threads sind
3297        // jetzt eigenstaendig, also schlaeft der Tick-Thread aktiv.
3298        std::thread::sleep(rt.config.tick_period);
3299    }
3300}
3301
3302/// Writer-Seite-Liveliness-Lost-Detection. Spec §2.2.4.2.10.
3303///
3304/// Fuer alle User-Writer: wenn Lease-Duration gesetzt und seit dem
3305/// letzten Assert (Automatic = `last_write`, Manual =
3306/// `last_liveliness_assert`) mehr Zeit verstrichen ist als die
3307/// Lease-Duration erlaubt, gilt der Writer aus DDS-Sicht als
3308/// "not-alive" — `liveliness_lost_count++` und Fenster zuruecksetzen.
3309///
3310/// Hinweis: Bei reinen Best-Effort-Tests + `Automatic` faellt der
3311/// Counter typischerweise nicht — Automatic asserts mit jedem
3312/// `write_user_sample`. Manual-Modus erfordert explicit
3313/// `assert_liveliness` (kommt mit .4b — bis dahin geben wir hier
3314/// die Detection schon her, der Hot-Path-Trigger triggert sie).
3315fn check_writer_liveliness(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
3316    let now_nanos = now.as_nanos() as u64;
3317    for (_eid, arc) in rt.writer_slots_snapshot() {
3318        let Ok(mut slot) = arc.lock() else { continue };
3319        if slot.liveliness_lease_nanos == 0 {
3320            continue;
3321        }
3322        let last = match slot.liveliness_kind {
3323            zerodds_qos::LivelinessKind::Automatic => slot.last_write,
3324            _ => slot.last_liveliness_assert,
3325        };
3326        let last_nanos = match last {
3327            Some(t) => t.as_nanos() as u64,
3328            None => continue,
3329        };
3330        if now_nanos.saturating_sub(last_nanos) >= slot.liveliness_lease_nanos {
3331            slot.liveliness_lost_count = slot.liveliness_lost_count.saturating_add(1);
3332            // Fenster zuruecksetzen, damit derselbe Lease-Window-
3333            // Ueberlauf nicht in einer Endlosschleife zaehlt.
3334            // Spec §2.2.3.11: "lease has elapsed" — `>=` ist boundary-
3335            // stabil und vermeidet Flakiness, wenn tick_period == lease.
3336            slot.last_liveliness_assert = Some(now);
3337            slot.last_write = Some(now);
3338        }
3339    }
3340}
3341
3342/// Prueft fuer alle User-Reader, ob der Writer seit laengerer Zeit als
3343/// `lease_duration` kein Sample geliefert hat. Falls ja: Transition
3344/// alive → not_alive, `not_alive_count++`.
3345///
3346/// Automatic-Liveliness (§2.2.3.11): jeder Write ist implicit assert.
3347/// Also pruefen wir den Reader-seitigen `last_sample_received`.
3348/// Manual-Kinds kommen mit .4b (explicit assert-Nachrichten).
3349fn check_liveliness(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
3350    let now_nanos = now.as_nanos() as u64;
3351    for (_eid, arc) in rt.reader_slots_snapshot() {
3352        let Ok(mut slot) = arc.lock() else { continue };
3353        if slot.liveliness_lease_nanos == 0 {
3354            continue;
3355        }
3356        // Bis zum ersten Sample: als alive betrachten (optimistisch).
3357        let last = match slot.last_sample_received {
3358            Some(t) => t.as_nanos() as u64,
3359            None => continue,
3360        };
3361        if now_nanos.saturating_sub(last) >= slot.liveliness_lease_nanos && slot.liveliness_alive {
3362            slot.liveliness_alive = false;
3363            slot.liveliness_not_alive_count = slot.liveliness_not_alive_count.saturating_add(1);
3364        }
3365    }
3366}
3367
3368/// Fuer alle User-Writer: Samples im HistoryCache entfernen, deren
3369/// Insert-Zeit + Lifespan abgelaufen ist. OMG DDS 1.4 §2.2.3.16:
3370/// "If the duration...elapses and the sample is still in the cache...
3371/// the sample is no longer available to any future DataReaders".
3372///
3373/// Implementation: `sample_insert_times` ist eine VecDeque, sortiert
3374/// nach Insert-Zeit (= SN, weil monoton). Front-pop solange expired;
3375/// der hoechste expired SN laeuft via `cache.remove_up_to(sn + 1)`
3376/// durch.
3377fn expire_by_lifespan(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
3378    let now_nanos = now.as_nanos() as u64;
3379    for (_eid, arc) in rt.writer_slots_snapshot() {
3380        let Ok(mut slot) = arc.lock() else { continue };
3381        if slot.lifespan_nanos == 0 {
3382            continue;
3383        }
3384        let mut highest_expired = None;
3385        while let Some(&(sn, inserted)) = slot.sample_insert_times.front() {
3386            let inserted_nanos = inserted.as_nanos() as u64;
3387            if now_nanos.saturating_sub(inserted_nanos) >= slot.lifespan_nanos {
3388                highest_expired = Some(sn);
3389                slot.sample_insert_times.pop_front();
3390            } else {
3391                break;
3392            }
3393        }
3394        if let Some(sn) = highest_expired {
3395            let _removed = slot
3396                .writer
3397                .remove_samples_up_to(zerodds_rtps::wire_types::SequenceNumber(sn.0 + 1));
3398        }
3399    }
3400}
3401
3402/// Prueft fuer alle User-Writer + User-Reader, ob die Deadline-Period
3403/// seit dem letzten Sample ueberschritten ist. Jede Ueberschreitung
3404/// inkrementiert den entsprechenden Missed-Counter um genau 1
3405/// — unabhaengig davon wie oft `check_deadlines` innerhalb eines
3406/// abgelaufenen Fensters gerufen wird, denn wir setzen `last_*`
3407/// auf "now" weiter, nachdem wir gezaehlt haben.
3408///
3409/// **Init-State-Semantik:** Solange `last_write`/`last_sample_received`
3410/// `None` ist (kein echter Write/Sample bisher), zaehlt der Deadline-
3411/// Check nicht. Erst nach dem ersten echten Datenpunkt startet das
3412/// Deadline-Fenster. Das verhindert falsche Misses durch langsamen
3413/// Entity-Setup (Linux-CI/Container) bevor die App ueberhaupt einen
3414/// Write absetzt.
3415fn check_deadlines(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
3416    let now_nanos = now.as_nanos() as u64;
3417    for (_eid, arc) in rt.writer_slots_snapshot() {
3418        let Ok(mut slot) = arc.lock() else { continue };
3419        if slot.deadline_nanos == 0 {
3420            continue;
3421        }
3422        let Some(last) = slot.last_write.map(|d| d.as_nanos() as u64) else {
3423            // Noch nie geschrieben → Deadline-Fenster nicht aktiv.
3424            continue;
3425        };
3426        if now_nanos.saturating_sub(last) >= slot.deadline_nanos {
3427            slot.offered_deadline_missed_count =
3428                slot.offered_deadline_missed_count.saturating_add(1);
3429            // Fenster zurücksetzen: naechste Deadline wird relativ
3430            // zum jetzigen Tick neu gezaehlt. `>=` ist boundary-stabil
3431            // (Spec §2.2.3.7: "deadline has elapsed").
3432            slot.last_write = Some(now);
3433        }
3434    }
3435    for (_eid, arc) in rt.reader_slots_snapshot() {
3436        let Ok(mut slot) = arc.lock() else { continue };
3437        if slot.deadline_nanos == 0 {
3438            continue;
3439        }
3440        let Some(last) = slot.last_sample_received.map(|d| d.as_nanos() as u64) else {
3441            continue;
3442        };
3443        if now_nanos.saturating_sub(last) >= slot.deadline_nanos {
3444            slot.requested_deadline_missed_count =
3445                slot.requested_deadline_missed_count.saturating_add(1);
3446            slot.last_sample_received = Some(now);
3447        }
3448    }
3449}
3450
3451/// Fuer alle lokalen Writer + Reader: Matching gegen den aktuellen
3452/// SEDP-Cache. Billiger re-run wenn SEDP-events einflogen — idempotent,
3453/// weil ReliableWriter/Reader add_*_proxy idempotent sind (gleicher
3454/// GUID → ersetzt).
3455fn run_matching_pass(rt: &Arc<DcpsRuntime>) {
3456    let writer_ids: Vec<EntityId> = rt.writer_eids();
3457    for eid in writer_ids {
3458        rt.match_local_writer_against_cache(eid);
3459    }
3460    let reader_ids: Vec<EntityId> = rt.reader_eids();
3461    for eid in reader_ids {
3462        rt.match_local_reader_against_cache(eid);
3463    }
3464}
3465
3466/// Liefert den default-unicast-Locator eines entdeckten Remote-
3467/// Participants.
3468fn remote_user_locators(
3469    prefix: GuidPrefix,
3470    discovered: &Arc<Mutex<DiscoveredParticipantsCache>>,
3471) -> Vec<Locator> {
3472    match discovered.lock() {
3473        Ok(cache) => cache
3474            .get(&prefix)
3475            .and_then(|p| p.data.default_unicast_locator)
3476            .into_iter()
3477            .collect(),
3478        Err(_) => Vec::new(),
3479    }
3480}
3481
// NOTE: The two comment blocks below were displaced from the functions they
// describe. They are kept as plain comments so that they do not become part
// of the rustdoc of `wake_async_waker`.
//
// `handle_user_datagram`: dispatches a received RTPS datagram to the matching
// user readers. The `reader_id` in DATA/DATA_FRAG/HEARTBEAT/GAP decides which
// local reader is responsible.
//
// Apparently `strip_user_encap`: strips the 4-byte encapsulation header from
// the received sample payload. Returns `None` if the payload is shorter than
// 4 bytes or carries an unknown scheme (PL_CDR variants would not get here;
// they go via SEDP — if we see something like that on user endpoints, it is
// garbage).
/// Spec §3.2 zerodds-async-1.0: wakes a registered waker after every
/// `sample_tx.send`.
///
/// The waker is taken out of `slot` (leaving `None`) to avoid double
/// wake-ups — the caller registers a fresh one after every `Pending` result.
/// If no waker is registered, or the slot mutex is poisoned, nothing happens.
///
/// The slot lock is released *before* `wake()` is called. A waker may run
/// arbitrary code synchronously (for example re-poll the future inline, which
/// registers a new waker in this very slot); doing that while the
/// non-reentrant mutex is still held would deadlock.
fn wake_async_waker(slot: &alloc::sync::Arc<std::sync::Mutex<Option<core::task::Waker>>>) {
    // The guard lives only inside the `Ok` arm, so the lock is already
    // released when the waker is invoked below.
    let pending = match slot.lock() {
        Ok(mut guard) => guard.take(),
        Err(_) => None,
    };
    if let Some(waker) = pending {
        waker.wake();
    }
}
3500
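// NOTE: The following paragraph describes `delivered_to_user_sample` (defined
// further below); it is kept as a plain comment so that it does not become
// part of the rustdoc of the inspect tap.
//
// Converts a sample delivered by the ReliableReader into a `UserSample`
// channel entry. For `ChangeKind::Alive` the CDR encapsulation header is
// stripped; for lifecycle markers the key hash is reconstructed from the
// bytes.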
/// Inspect-endpoint tap dispatch for the DCPS receive path.
///
/// Called from `handle_user_datagram` when a sample is delivered to a user
/// reader. Only compiled with the `inspect` feature; without the feature
/// there is no code and no branch.
///
/// The frame carries the topic name, a wall-clock timestamp in nanoseconds
/// since the Unix epoch (0 if the clock is before the epoch, saturated at
/// `u64::MAX`), a correlation id derived from `reader_id`, and the sample
/// payload (for lifecycle samples: the 16-byte key hash).
#[cfg(feature = "inspect")]
fn dispatch_inspect_dcps_receive_tap(topic: &str, reader_id: EntityId, item: &UserSample) {
    let payload: Vec<u8> = match item {
        UserSample::Lifecycle { key_hash, .. } => key_hash.to_vec(),
        UserSample::Alive { payload, .. } => payload.clone(),
    };
    let ts_ns = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    };
    // Correlation id: entity-key byte `i` is placed at bit offset `i * 8`,
    // the entity kind is OR-ed in at bit offset 24.
    let key_bits = reader_id
        .entity_key
        .iter()
        .enumerate()
        .fold(0u64, |acc, (idx, byte)| acc | (u64::from(*byte) << (idx * 8)));
    let corr = key_bits | (u64::from(reader_id.entity_kind as u8) << 24);
    let frame = zerodds_inspect_endpoint::Frame::dcps(topic.to_owned(), ts_ns, corr, payload);
    zerodds_inspect_endpoint::tap::dispatch(&frame);
}
3528
3529fn delivered_to_user_sample(
3530    sample: &zerodds_rtps::reliable_reader::DeliveredSample,
3531    writer_strengths: &alloc::collections::BTreeMap<[u8; 16], i32>,
3532) -> Option<UserSample> {
3533    use zerodds_rtps::history_cache::ChangeKind;
3534    match sample.kind {
3535        ChangeKind::Alive | ChangeKind::AliveFiltered => {
3536            let writer_guid = sample.writer_guid.to_bytes();
3537            let writer_strength = writer_strengths.get(&writer_guid).copied().unwrap_or(0);
3538            strip_user_encap(&sample.payload).map(|payload| UserSample::Alive {
3539                payload,
3540                writer_guid,
3541                writer_strength,
3542            })
3543        }
3544        ChangeKind::NotAliveDisposed
3545        | ChangeKind::NotAliveUnregistered
3546        | ChangeKind::NotAliveDisposedUnregistered => {
3547            // Lifecycle-Marker: Spec §9.6.4.8 + §9.6.3.9 verlangt
3548            // `PID_KEY_HASH` im Inline-QoS — der Reader liest ihn aus
3549            // und propagiert ihn ueber `DeliveredSample.key_hash`.
3550            // Fallback: bei nicht-spec-konformen Writern faellt der
3551            // Hash zurueck auf die ersten 16 Byte des Key-Only-Payloads
3552            // (PLAIN_CDR2-BE Key-Holder).
3553            let kh = sample.key_hash.unwrap_or_else(|| {
3554                let mut h = [0u8; 16];
3555                let n = sample.payload.len().min(16);
3556                h[..n].copy_from_slice(&sample.payload[..n]);
3557                h
3558            });
3559            Some(UserSample::Lifecycle {
3560                key_hash: kh,
3561                kind: sample.kind,
3562            })
3563        }
3564    }
3565}
3566
3567/// Pruefe ob `payload` ein bekanntes 4-byte Encapsulation-Header hat.
3568/// Returns `Some(4)` wenn ja (= Offset hinter dem Header), `None` wenn
3569/// kein bekanntes Schema. Nutzungstrennung von [`strip_user_encap`]:
3570/// hier nur Validierung ohne Allokation, fuer den Listener-Zero-Copy-
3571/// Pfad (Hebel E / Sprint D.5d).
3572fn validate_user_encap_offset(payload: &[u8]) -> Option<usize> {
3573    if payload.len() < 4 {
3574        return None;
3575    }
3576    // Akzeptiere alle Data-Representation-Schemes (XCDR1/XCDR2, LE/BE).
3577    // Cyclone sendet oft XCDR1 (0x00,0x01) fuer RawBytes-aehnliche Typen,
3578    // FastDDS eher XCDR2. Key-ed Schemes (0x00,0x02 / 0x00,0x03) sind
3579    // PL_CDR und kommen auf User-Endpoints nur bei Key-serialization —
3580    // wir akzeptieren sie und schleusen den Payload durch
3581    // (Key-Filtering passiert im typisierten Reader-Pfad).
3582    use zerodds_rtps::participant_message_data::{
3583        ENCAPSULATION_CDR_BE, ENCAPSULATION_CDR_LE, ENCAPSULATION_CDR2_BE, ENCAPSULATION_CDR2_LE,
3584    };
3585    const ENCAPSULATION_PL_CDR_BE: [u8; 2] = [0x00, 0x02];
3586    const ENCAPSULATION_PL_CDR_LE: [u8; 2] = [0x00, 0x03];
3587    let k = [payload[0], payload[1]];
3588    let known = k == ENCAPSULATION_CDR_BE
3589        || k == ENCAPSULATION_CDR_LE
3590        || k == ENCAPSULATION_PL_CDR_BE
3591        || k == ENCAPSULATION_PL_CDR_LE
3592        || k == ENCAPSULATION_CDR2_BE
3593        || k == ENCAPSULATION_CDR2_LE;
3594    if known { Some(4) } else { None }
3595}
3596
3597fn strip_user_encap(payload: &[u8]) -> Option<Vec<u8>> {
3598    validate_user_encap_offset(payload).map(|off| payload[off..].to_vec())
3599}
3600
/// Dispatches a received RTPS datagram to the local user endpoints.
///
/// The datagram is decoded and every submessage is routed individually:
/// DATA / DATA_FRAG / HEARTBEAT / GAP go to the reader slot selected by the
/// submessage's `reader_id`, ACKNACK / NACK_FRAG go to the writer slot
/// selected by `writer_id`. Datagrams that fail to decode, submessages
/// without a matching slot, and slots whose mutex is poisoned are silently
/// skipped. `now` is handed to the reader/writer state machines and is
/// recorded as `last_sample_received` whenever samples were delivered.
fn handle_user_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8], now: Duration) {
    let parsed = match decode_datagram(bytes) {
        Ok(p) => p,
        Err(_) => return,
    };
    // Per submessage: take the matching slot mutex for each submessage
    // individually — no global user_writers/user_readers lock any more.
    // With per-submessage granularity, reader datagrams can be processed in
    // parallel to writer ACKNACKs.
    for sub in parsed.submessages {
        match sub {
            ParsedSubmessage::Data(d) => {
                // Sprint D.5d Hebel B — collect-then-dispatch:
                // sample conversion + liveliness update happen inside
                // slot.lock, then listener fire + channel send + waker wake
                // happen OUTSIDE of the lock. This reduces the lock hold time
                // to ~µs (state machine + collect), and user callback code
                // (listener) no longer blocks the receive path for other
                // submessages or the tick thread (ACKNACK/HB).
                let Some(arc) = rt.reader_slot(d.reader_id) else {
                    continue;
                };
                // Hebel E: alongside the UserSample we carry a zero-copy
                // view onto the original `Arc<[u8]>` together with the encap
                // offset — the listener can read the user data through it
                // without any allocation.
                let mut items: Vec<UserSampleWithEncap> = Vec::new();
                let listener;
                let waker;
                let sender;
                #[cfg(feature = "inspect")]
                let topic_name;
                {
                    let Ok(mut slot) = arc.lock() else { continue };
                    for sample in slot.reader.handle_data(&d) {
                        // Listener zero-copy view only for alive samples
                        // with a valid encap header. Arc::clone is an atomic
                        // refcount increment, not a data copy.
                        let listener_view: Option<(Arc<[u8]>, usize)> = match sample.kind {
                            zerodds_rtps::history_cache::ChangeKind::Alive
                            | zerodds_rtps::history_cache::ChangeKind::AliveFiltered => {
                                validate_user_encap_offset(&sample.payload)
                                    .map(|off| (Arc::clone(&sample.payload), off))
                            }
                            _ => None,
                        };
                        if let Some(item) =
                            delivered_to_user_sample(&sample, &slot.writer_strengths)
                        {
                            items.push((item, listener_view));
                        }
                    }
                    // Deadline/liveliness bookkeeping only when at least one
                    // sample was actually delivered.
                    if !items.is_empty() {
                        slot.last_sample_received = Some(now);
                        if !slot.liveliness_alive {
                            slot.liveliness_alive = true;
                            slot.liveliness_alive_count =
                                slot.liveliness_alive_count.saturating_add(1);
                        }
                    }
                    // Clone the dispatch handles so they can be used after
                    // the slot lock has been released.
                    listener = slot.listener.clone();
                    waker = Arc::clone(&slot.async_waker);
                    sender = slot.sample_tx.clone();
                    #[cfg(feature = "inspect")]
                    {
                        topic_name = slot.topic_name.clone();
                    }
                }
                // --- Outside slot.lock: dispatch ---
                for (item, listener_view) in items {
                    #[cfg(feature = "inspect")]
                    dispatch_inspect_dcps_receive_tap(&topic_name, d.reader_id, &item);
                    if let Some(ref l) = listener {
                        if let Some((arc_payload, off)) = listener_view {
                            // Zero-copy: slice view into the original Arc.
                            l(&arc_payload[off..]);
                        }
                    }
                    // A send error (receiver gone) is deliberately ignored.
                    let _ = sender.send(item);
                    wake_async_waker(&waker);
                }
            }
            ParsedSubmessage::DataFrag(df) => {
                // Hebel B+E — see the Data arm above.
                let Some(arc) = rt.reader_slot(df.reader_id) else {
                    continue;
                };
                let mut items: Vec<UserSampleWithEncap> = Vec::new();
                let listener;
                let waker;
                let sender;
                #[cfg(feature = "inspect")]
                let topic_name;
                {
                    let Ok(mut slot) = arc.lock() else { continue };
                    for sample in slot.reader.handle_data_frag(&df, now) {
                        let listener_view: Option<(Arc<[u8]>, usize)> = match sample.kind {
                            zerodds_rtps::history_cache::ChangeKind::Alive
                            | zerodds_rtps::history_cache::ChangeKind::AliveFiltered => {
                                validate_user_encap_offset(&sample.payload)
                                    .map(|off| (Arc::clone(&sample.payload), off))
                            }
                            _ => None,
                        };
                        if let Some(item) =
                            delivered_to_user_sample(&sample, &slot.writer_strengths)
                        {
                            items.push((item, listener_view));
                        }
                    }
                    if !items.is_empty() {
                        slot.last_sample_received = Some(now);
                        if !slot.liveliness_alive {
                            slot.liveliness_alive = true;
                            slot.liveliness_alive_count =
                                slot.liveliness_alive_count.saturating_add(1);
                        }
                    }
                    listener = slot.listener.clone();
                    waker = Arc::clone(&slot.async_waker);
                    sender = slot.sample_tx.clone();
                    #[cfg(feature = "inspect")]
                    {
                        topic_name = slot.topic_name.clone();
                    }
                }
                // Dispatch outside of the slot lock, as in the Data arm.
                for (item, listener_view) in items {
                    #[cfg(feature = "inspect")]
                    dispatch_inspect_dcps_receive_tap(&topic_name, df.reader_id, &item);
                    if let Some(ref l) = listener {
                        if let Some((arc_payload, off)) = listener_view {
                            l(&arc_payload[off..]);
                        }
                    }
                    let _ = sender.send(item);
                    wake_async_waker(&waker);
                }
            }
            ParsedSubmessage::Heartbeat(h) => {
                // Hebel B — collect-then-dispatch like the Data arm. A HB can
                // release samples that were waiting for a hole to be filled
                // (volatile skip, historic eviction).
                //
                // D.5e Phase-2: synchronous ACKNACK emit on HB receipt
                // instead of deferred-via-tick. With
                // `heartbeat_response_delay=0` (D.5e default),
                // `tick_outbound(now)` directly flushes the ACKNACKs for all
                // pending writer proxies — the tick loop no longer has to
                // wait 5 ms.
                //
                // NOTE(review): unlike the Data/DataFrag arms, this arm does
                // not invoke the slot's listener for the released samples —
                // confirm that this is intended.
                let Some(arc) = rt.reader_slot(h.reader_id) else {
                    continue;
                };
                let mut items: Vec<UserSample> = Vec::new();
                let mut sync_outbound: Vec<zerodds_rtps::message_builder::OutboundDatagram> =
                    Vec::new();
                let waker;
                let sender;
                {
                    let Ok(mut slot) = arc.lock() else { continue };
                    for sample in slot.reader.handle_heartbeat(&h, now) {
                        if let Some(item) =
                            delivered_to_user_sample(&sample, &slot.writer_strengths)
                        {
                            items.push(item);
                        }
                    }
                    if !items.is_empty() {
                        slot.last_sample_received = Some(now);
                        if !slot.liveliness_alive {
                            slot.liveliness_alive = true;
                            slot.liveliness_alive_count =
                                slot.liveliness_alive_count.saturating_add(1);
                        }
                    }
                    // D.5e Phase-2: synchronous ACKNACK directly in the
                    // receive thread. A `tick_outbound` error leaves
                    // `sync_outbound` empty.
                    if let Ok(dgs) = slot.reader.tick_outbound(now) {
                        sync_outbound = dgs;
                    }
                    waker = Arc::clone(&slot.async_waker);
                    sender = slot.sample_tx.clone();
                }
                for item in items {
                    let _ = sender.send(item);
                    wake_async_waker(&waker);
                }
                // Send the ACKNACK datagrams synchronously — no tick
                // quantization tax. Only UDPv4 targets are used; datagrams
                // for which `secure_outbound_bytes` returns `None` are not
                // sent, and send errors are ignored.
                for dg in sync_outbound {
                    if let Some(secured) = secure_outbound_bytes(rt, &dg.bytes) {
                        for t in dg.targets.iter() {
                            if t.kind == LocatorKind::UdpV4 {
                                let _ = rt.user_unicast.send(t, &secured);
                            }
                        }
                    }
                }
            }
            ParsedSubmessage::Gap(g) => {
                // NOTE(review): in this arm the channel send and the waker
                // wake happen while the slot lock is held, and neither
                // `last_sample_received` nor the liveliness state is
                // updated — this differs from the arms above.
                if let Some(arc) = rt.reader_slot(g.reader_id) {
                    if let Ok(mut slot) = arc.lock() {
                        for sample in slot.reader.handle_gap(&g) {
                            if let Some(item) =
                                delivered_to_user_sample(&sample, &slot.writer_strengths)
                            {
                                let _ = slot.sample_tx.send(item);
                                wake_async_waker(&slot.async_waker);
                            }
                        }
                    }
                }
            }
            ParsedSubmessage::AckNack(ack) => {
                if let Some(arc) = rt.writer_slot(ack.writer_id) {
                    let mut sync_outbound: Vec<zerodds_rtps::message_builder::OutboundDatagram> =
                        Vec::new();
                    if let Ok(mut slot) = arc.lock() {
                        let base = ack.reader_sn_state.bitmap_base;
                        let requested: Vec<_> = ack.reader_sn_state.iter_set().collect();
                        // The acknowledging reader's GUID: sender prefix from
                        // the datagram header + reader entity id.
                        let src = Guid::new(parsed.header.guid_prefix, ack.reader_id);
                        slot.writer.handle_acknack(src, base, requested);
                        // D.5e Phase-2: synchronous resend on NACK receipt.
                        // The ACKNACK may have listed requested SNs for
                        // resend; tick delivers the resend datagrams directly
                        // in the receive thread.
                        if let Ok(dgs) = slot.writer.tick(now) {
                            sync_outbound = dgs;
                        }
                    }
                    // ACK event condvar: wake `wait_for_acknowledgments`
                    // waiters.
                    rt.notify_ack_event();
                    // Send the synchronous resends (no tick wait any more).
                    for dg in sync_outbound {
                        if let Some(secured) = secure_outbound_bytes(rt, &dg.bytes) {
                            for t in dg.targets.iter() {
                                if t.kind == LocatorKind::UdpV4 {
                                    let _ = rt.user_unicast.send(t, &secured);
                                }
                            }
                        }
                    }
                }
            }
            ParsedSubmessage::NackFrag(nf) => {
                if let Some(arc) = rt.writer_slot(nf.writer_id) {
                    if let Ok(mut slot) = arc.lock() {
                        let src = Guid::new(parsed.header.guid_prefix, nf.reader_id);
                        slot.writer.handle_nackfrag(src, &nf);
                    }
                }
            }
            // All other submessage kinds are ignored on the user path.
            _ => {}
        }
    }
}
3852
/// Test hook: allows calling `handle_spdp_datagram` directly from other
/// modules without having to spin up the whole event loop.
/// For internal tests only (compiled under `cfg(test)`).
#[cfg(test)]
pub(crate) fn handle_spdp_datagram_for_test(rt: &Arc<DcpsRuntime>, bytes: &[u8]) {
    handle_spdp_datagram(rt, bytes);
}
3860
3861fn handle_spdp_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8]) {
3862    let parsed = match rt.spdp_reader.parse_datagram(bytes) {
3863        Ok(p) => p,
3864        Err(_) => return, // not SPDP or wire error — swallow
3865    };
3866    // Self-discovery filter: ignore unsere eigenen Beacons.
3867    if parsed.sender_prefix == rt.guid_prefix {
3868        return;
3869    }
3870    let is_new = {
3871        if let Ok(mut cache) = rt.discovered.lock() {
3872            cache.insert(parsed.clone())
3873        } else {
3874            false
3875        }
3876    };
3877    // Bei erstmaligem Entdecken: SEDP-Stack verdrahten + initiale
3878    // Announcements raussenden.
3879    if is_new {
3880        if let Ok(mut sedp) = rt.sedp.lock() {
3881            sedp.on_participant_discovered(&parsed);
3882        }
3883        // Security-Builtin-Stack analog verdrahten (no-op,
3884        // wenn Plugin nicht aktiv oder Peer keine Security-Bits hat).
3885        if let Some(sec) = rt.security_builtin_snapshot() {
3886            if let Ok(mut s) = sec.lock() {
3887                s.handle_remote_endpoints(&parsed);
3888            }
3889        }
3890    }
3891    //  SPDP-Receive in Builtin-DCPSParticipant-Reader spiegeln.
3892    // Wir senden bei jedem Beacon (auch refresh) — Spec §2.2.5.1
3893    // erlaubt das, take() liefert dem User die jeweils aktuellen
3894    // Daten. Ein Reader mit KEEP_LAST(1) erhaelt nur das neueste.
3895    if let Some(sinks) = rt.builtin_sinks_snapshot() {
3896        let dcps_sample =
3897            crate::builtin_topics::ParticipantBuiltinTopicData::from_wire(&parsed.data);
3898        // .7 §2.2.2.2.1.14: ignorierte Participants droppen, bevor
3899        // sie in den Builtin-Reader fallen.
3900        if let Some(filter) = rt.ignore_filter_snapshot() {
3901            let h = crate::instance_handle::InstanceHandle::from_guid(dcps_sample.key);
3902            if filter.is_participant_ignored(h) {
3903                return;
3904            }
3905        }
3906        let _ = sinks.push_participant(&dcps_sample);
3907    }
3908}
3909
3910/// Schiebt SEDP-Events (neue Pubs/Subs) in die 4 Builtin-Topic-
3911/// Reader. Ein neuer Pub/Sub erzeugt **zwei** Samples:
3912///
3913/// 1. ein `DCPSPublication`/`DCPSSubscription`-Sample,
3914/// 2. ein `DCPSTopic`-Sample (synthetisch aus Topic-Name + Type-Name).
3915///
3916/// Die nativen SEDP-Topics-Endpoints (RTPS 2.5 §9.3.2.12 Bits 28/29)
3917/// sind per Spec §8.5.4.4 optional und in ZeroDDS via diese
3918/// synthetische Ableitung abgedeckt — siehe auch
3919/// `endpoint_flag::ALL_STANDARD`, das die Topics-Bits gezielt
3920/// ausspart. Cyclone/Fast-DDS-Peers, die ihre eigenen Topic-
3921/// Announces senden, werden ignoriert (kein Reader-Endpoint).
3922fn push_sedp_events_to_builtin_readers(
3923    rt: &Arc<DcpsRuntime>,
3924    events: &zerodds_discovery::sedp::SedpEvents,
3925) {
3926    let Some(sinks) = rt.builtin_sinks_snapshot() else {
3927        return;
3928    };
3929    let filter = rt.ignore_filter_snapshot();
3930    for w in &events.new_publications {
3931        let pub_sample = crate::builtin_topics::PublicationBuiltinTopicData::from_wire(w);
3932        let topic_sample = crate::builtin_topics::TopicBuiltinTopicData::from_publication(w);
3933        // .7 §2.2.2.2.1.14/.16: Participant- + Publication- +
3934        // Topic-Ignore-Filter konsultieren.
3935        if let Some(f) = &filter {
3936            let part_h = crate::instance_handle::InstanceHandle::from_guid(w.participant_key);
3937            let pub_h = crate::instance_handle::InstanceHandle::from_guid(w.key);
3938            let topic_h = crate::instance_handle::InstanceHandle::from_guid(topic_sample.key);
3939            if f.is_participant_ignored(part_h) || f.is_publication_ignored(pub_h) {
3940                continue;
3941            }
3942            let _ = sinks.push_publication(&pub_sample);
3943            if !f.is_topic_ignored(topic_h) {
3944                let _ = sinks.push_topic(&topic_sample);
3945            }
3946        } else {
3947            let _ = sinks.push_publication(&pub_sample);
3948            let _ = sinks.push_topic(&topic_sample);
3949        }
3950    }
3951    for r in &events.new_subscriptions {
3952        let sub_sample = crate::builtin_topics::SubscriptionBuiltinTopicData::from_wire(r);
3953        let topic_sample = crate::builtin_topics::TopicBuiltinTopicData::from_subscription(r);
3954        if let Some(f) = &filter {
3955            let part_h = crate::instance_handle::InstanceHandle::from_guid(r.participant_key);
3956            let sub_h = crate::instance_handle::InstanceHandle::from_guid(r.key);
3957            let topic_h = crate::instance_handle::InstanceHandle::from_guid(topic_sample.key);
3958            if f.is_participant_ignored(part_h) || f.is_subscription_ignored(sub_h) {
3959                continue;
3960            }
3961            let _ = sinks.push_subscription(&sub_sample);
3962            if !f.is_topic_ignored(topic_h) {
3963                let _ = sinks.push_topic(&topic_sample);
3964            }
3965        } else {
3966            let _ = sinks.push_subscription(&sub_sample);
3967            let _ = sinks.push_topic(&topic_sample);
3968        }
3969    }
3970}
3971
3972/// Wire-Demux fuer die Security-Builtin-Topics. Routed eine
3973/// eingehende RTPS-Submessage-Sequenz an den `SecurityBuiltinStack`,
3974/// wenn der Stack aktiv ist. No-op, wenn das Datagram keinen Security-
3975/// Builtin-Reader anspricht oder das Plugin nicht enabled ist.
3976///
3977/// Wird vom Metatraffic-Receive-Pfad aufgerufen — Stateless +
3978/// VolatileSecure laufen ueber die SPDP-Unicast-Locators (PID 0x0032),
3979/// nicht ueber `user_unicast`.
3980fn dispatch_security_builtin_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8], now: Duration) {
3981    let Some(stack) = rt.security_builtin_snapshot() else {
3982        return;
3983    };
3984    let Ok(parsed) = decode_datagram(bytes) else {
3985        return;
3986    };
3987    let Ok(mut s) = stack.lock() else {
3988        return;
3989    };
3990    for sub in parsed.submessages {
3991        match sub {
3992            ParsedSubmessage::Data(d) => {
3993                if d.reader_id == EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER
3994                    || d.writer_id == EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER
3995                {
3996                    // Decode-Fehler werden geschluckt — Stateless-Reader
3997                    // hat keinen Resend-Pfad, ein malformter Frame ist
3998                    // einfach verloren (Spec §10.3.4.1).
3999                    let _ = s.stateless_reader.handle_data(&d);
4000                } else if d.reader_id
4001                    == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
4002                {
4003                    let _ = s.volatile_reader.handle_data(&d);
4004                }
4005            }
4006            ParsedSubmessage::DataFrag(df) => {
4007                if df.reader_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER {
4008                    let _ = s.volatile_reader.handle_data_frag(&df, now);
4009                }
4010            }
4011            ParsedSubmessage::Heartbeat(h) => {
4012                let to_volatile_reader = h.reader_id
4013                    == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
4014                    || (h.reader_id == EntityId::UNKNOWN
4015                        && h.writer_id
4016                            == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER);
4017                if to_volatile_reader {
4018                    s.volatile_reader.handle_heartbeat(&h, now);
4019                }
4020            }
4021            ParsedSubmessage::Gap(g) => {
4022                if g.reader_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER {
4023                    let _ = s.volatile_reader.handle_gap(&g);
4024                }
4025            }
4026            ParsedSubmessage::AckNack(ack) => {
4027                if ack.writer_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER {
4028                    let base = ack.reader_sn_state.bitmap_base;
4029                    let requested: Vec<_> = ack.reader_sn_state.iter_set().collect();
4030                    let src = Guid::new(parsed.header.guid_prefix, ack.reader_id);
4031                    s.volatile_writer.handle_acknack(src, base, requested);
4032                }
4033            }
4034            ParsedSubmessage::NackFrag(nf) => {
4035                if nf.writer_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER {
4036                    let src = Guid::new(parsed.header.guid_prefix, nf.reader_id);
4037                    s.volatile_writer.handle_nackfrag(src, &nf);
4038                }
4039            }
4040            _ => {}
4041        }
4042    }
4043}
4044
4045/// Dispatcht ein Datagram, das an die TypeLookup-Service-Endpoints
4046/// adressiert ist (XTypes 1.3 §7.6.3.3.4). Behandelt eingehende
4047/// Requests (an `TL_SVC_REQ_READER`), generiert Replies und schickt
4048/// sie an den Source-Locator zurück; behandelt eingehende Replies
4049/// (an `TL_SVC_REPLY_READER`), korreliert mit dem Client.
4050///
4051/// Returns `true`, wenn das Datagramm vom TypeLookup-Pfad akzeptiert
4052/// wurde — der Caller kann dann den User-Reader-Pfad überspringen.
4053fn dispatch_type_lookup_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8], source: &Locator) -> bool {
4054    use zerodds_cdr::{BufferReader, Endianness};
4055    use zerodds_types::type_lookup::{GetTypeDependenciesRequest, GetTypesReply, GetTypesRequest};
4056
4057    let Ok(parsed) = decode_datagram(bytes) else {
4058        return false;
4059    };
4060
4061    let mut accepted = false;
4062
4063    for sub in &parsed.submessages {
4064        let ParsedSubmessage::Data(d) = sub else {
4065            continue;
4066        };
4067        let payload: &[u8] = &d.serialized_payload;
4068        if payload.is_empty() {
4069            continue;
4070        }
4071        // Skip CDR-Encapsulation header (4 bytes) if present.
4072        let body: &[u8] = if payload.len() >= 4 && (payload[0] == 0x00 && payload[1] == 0x01) {
4073            &payload[4..]
4074        } else {
4075            payload
4076        };
4077
4078        // Inbound Request → Server.
4079        if d.reader_id == EntityId::TL_SVC_REQ_READER {
4080            accepted = true;
4081            // Try GetTypes-Request first; fall back to
4082            // GetTypeDependenciesRequest if that fails.
4083            let mut r = BufferReader::new(body, Endianness::Little);
4084            if let Ok(req) = GetTypesRequest::decode_from(&mut r) {
4085                let reply = match rt.type_lookup_server.lock() {
4086                    Ok(g) => g.handle_get_types(&req),
4087                    Err(_) => continue,
4088                };
4089                let _ = send_type_lookup_reply(
4090                    rt,
4091                    source,
4092                    TypeLookupReplyPayload::Types(reply),
4093                    d.writer_sn,
4094                );
4095                continue;
4096            }
4097            let mut r = BufferReader::new(body, Endianness::Little);
4098            if let Ok(req) = GetTypeDependenciesRequest::decode_from(&mut r) {
4099                let reply = match rt.type_lookup_server.lock() {
4100                    Ok(g) => g.handle_get_type_dependencies(&req),
4101                    Err(_) => continue,
4102                };
4103                let _ = send_type_lookup_reply(
4104                    rt,
4105                    source,
4106                    TypeLookupReplyPayload::Dependencies(reply),
4107                    d.writer_sn,
4108                );
4109                continue;
4110            }
4111        }
4112
4113        // Inbound Reply → Client.
4114        if d.reader_id == EntityId::TL_SVC_REPLY_READER {
4115            accepted = true;
4116            // Sequence-Number aus DATA-Submessage als Request-ID
4117            // (Spec §7.6.3.3.3 koppelt Reply an Sample-Identity).
4118            let (sn_high, sn_low) = d.writer_sn.split();
4119            let sn_u64 = ((u64::from(sn_high as u32)) << 32) | u64::from(sn_low);
4120            let request_id = zerodds_discovery::type_lookup::RequestId::from_u64(sn_u64);
4121            let mut r = BufferReader::new(body, Endianness::Little);
4122            if let Ok(reply) = GetTypesReply::decode_from(&mut r) {
4123                if let Ok(mut client) = rt.type_lookup_client.lock() {
4124                    client.handle_reply(request_id, TypeLookupReply::Types(reply));
4125                }
4126                continue;
4127            }
4128        }
4129    }
4130
4131    accepted
4132}
4133
/// Reply payload variants the TypeLookup server path can emit; consumed
/// by `send_type_lookup_reply`.
enum TypeLookupReplyPayload {
    /// Result of `handle_get_types` for a decoded `GetTypesRequest`.
    Types(zerodds_types::type_lookup::GetTypesReply),
    /// Result of `handle_get_type_dependencies` for a decoded
    /// `GetTypeDependenciesRequest`.
    Dependencies(zerodds_types::type_lookup::GetTypeDependenciesReply),
}
4139
4140/// Sendet einen TypeLookup-Reply an einen Peer-Locator als
4141/// DATA-Datagram auf dem TL_SVC_REPLY_WRITER → Peer's
4142/// TL_SVC_REPLY_READER. Sequence-Number echo-d die Request-Sequence
4143/// für Korrelations-Zwecke (siehe XTypes §7.6.3.3.3 Sample-Identity).
4144fn send_type_lookup_reply(
4145    rt: &Arc<DcpsRuntime>,
4146    target: &Locator,
4147    reply: TypeLookupReplyPayload,
4148    request_sn: zerodds_rtps::wire_types::SequenceNumber,
4149) -> Result<()> {
4150    use alloc::sync::Arc as AllocArc;
4151    use zerodds_cdr::{BufferWriter, Endianness};
4152    use zerodds_rtps::datagram::encode_data_datagram;
4153    use zerodds_rtps::header::RtpsHeader;
4154    use zerodds_rtps::submessages::DataSubmessage;
4155    use zerodds_rtps::wire_types::{ProtocolVersion, VendorId};
4156
4157    // CDR-encode reply (PL_CDR_LE-Encapsulation).
4158    let mut w = BufferWriter::new(Endianness::Little);
4159    match reply {
4160        TypeLookupReplyPayload::Types(r) => {
4161            r.encode_into(&mut w)
4162                .map_err(|_| DdsError::PreconditionNotMet {
4163                    reason: "type_lookup reply encode failed",
4164                })?;
4165        }
4166        TypeLookupReplyPayload::Dependencies(r) => {
4167            r.encode_into(&mut w)
4168                .map_err(|_| DdsError::PreconditionNotMet {
4169                    reason: "type_lookup deps reply encode failed",
4170                })?;
4171        }
4172    }
4173    let body = w.into_bytes();
4174    let mut payload: alloc::vec::Vec<u8> = alloc::vec::Vec::with_capacity(4 + body.len());
4175    payload.extend_from_slice(&[0x00, 0x01, 0x00, 0x00]);
4176    payload.extend_from_slice(&body);
4177
4178    let header = RtpsHeader {
4179        protocol_version: ProtocolVersion::CURRENT,
4180        vendor_id: VendorId::ZERODDS,
4181        guid_prefix: rt.guid_prefix,
4182    };
4183    let data = DataSubmessage {
4184        extra_flags: 0,
4185        reader_id: EntityId::TL_SVC_REPLY_READER,
4186        writer_id: EntityId::TL_SVC_REPLY_WRITER,
4187        writer_sn: request_sn,
4188        inline_qos: None,
4189        key_flag: false,
4190        non_standard_flag: false,
4191        serialized_payload: AllocArc::from(payload.into_boxed_slice()),
4192    };
4193    let datagram =
4194        encode_data_datagram(header, &[data]).map_err(|_| DdsError::PreconditionNotMet {
4195            reason: "type_lookup reply datagram encode failed",
4196        })?;
4197
4198    if target.kind == LocatorKind::UdpV4 {
4199        let _ = rt.user_unicast.send(target, &datagram);
4200    }
4201    Ok(())
4202}
4203
4204/// Sendet ein Discovery-Datagramm an alle Ziel-Locatoren. UDP-only
4205/// (TCPv4/SHM/UDS werden in Discovery nicht getragen); Non-UDP-
4206/// Locatoren werden silent ignoriert.
4207fn send_discovery_datagram(rt: &Arc<DcpsRuntime>, targets: &[Locator], bytes: &[u8]) {
4208    let Some(secured) = secure_outbound_bytes(rt, bytes) else {
4209        return;
4210    };
4211    for t in targets {
4212        if t.kind != LocatorKind::UdpV4 {
4213            continue;
4214        }
4215        let _ = rt.spdp_mc_tx.send(t, &secured);
4216    }
4217}
4218
/// Default user-multicast endpoint for a DomainParticipant.
///
/// The address is always `239.255.0.1`. The port follows the RTPS default
/// port mapping (spec §9.6.1.4.1):
/// `PB + DG * domain_id + d2 = 7400 + 250 * domain_id + 1`.
///
/// Domain ids that do not fit into a `u16` (negative or above 65535) are
/// treated as `u16::MAX`, so the result saturates at port 65535 instead of
/// wrapping around onto the port of an unrelated, valid domain. The
/// arithmetic itself saturates as well.
///
/// Per the original note, the live path does not use this yet; it is
/// meant to be announced via SPDP in a later step ("B2").
#[must_use]
pub fn user_multicast_endpoint(domain_id: i32) -> SocketAddr {
    const PORT_BASE: u16 = 7400;
    const DOMAIN_GAIN: u16 = 250;
    const USER_MULTICAST_OFFSET: u16 = 1;

    // A plain `as u16` cast would truncate: domain 65536 would alias
    // domain 0. Clamp out-of-range ids explicitly instead.
    let domain = match domain_id {
        0..=65_535 => domain_id as u16,
        _ => u16::MAX,
    };
    let port = PORT_BASE.saturating_add(
        DOMAIN_GAIN
            .saturating_mul(domain)
            .saturating_add(USER_MULTICAST_OFFSET),
    );
    SocketAddr::from((Ipv4Addr::from([239, 255, 0, 1]), port))
}
4228
4229#[cfg(test)]
4230#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
4231mod tests {
4232    use super::*;
4233
4234    #[test]
4235    fn strip_user_encap_xcdr2_le() {
4236        let payload = [0x00, 0x07, 0x00, 0x00, 1, 2, 3];
4237        assert_eq!(strip_user_encap(&payload), Some(alloc::vec![1, 2, 3]));
4238    }
4239
4240    #[test]
4241    fn strip_user_encap_xcdr1_le() {
4242        // Cyclone-Default fuer einfache Typen.
4243        let payload = [0x00, 0x01, 0x00, 0x00, 0xAA];
4244        assert_eq!(strip_user_encap(&payload), Some(alloc::vec![0xAA]));
4245    }
4246
4247    #[test]
4248    fn strip_user_encap_rejects_unknown_scheme() {
4249        let payload = [0xFF, 0xFF, 0x00, 0x00, 1];
4250        assert_eq!(strip_user_encap(&payload), None);
4251    }
4252
4253    #[test]
4254    fn strip_user_encap_rejects_short() {
4255        assert_eq!(strip_user_encap(&[0x00, 0x07]), None);
4256    }
4257
4258    #[test]
4259    fn user_payload_encap_is_xcdr2_le() {
4260        assert_eq!(USER_PAYLOAD_ENCAP, [0x00, 0x07, 0x00, 0x00]);
4261    }
4262
4263    #[test]
4264    fn observability_sink_records_writer_and_reader_creation() {
4265        // VecSink injizieren, Writer + Reader erzeugen,
4266        // pruefen dass beide Events ankommen.
4267        use std::sync::Arc as StdArc;
4268        use zerodds_foundation::observability::{Component, Level, VecSink};
4269
4270        let sink = StdArc::new(VecSink::new());
4271        let cfg = RuntimeConfig {
4272            observability: sink.clone(),
4273            ..RuntimeConfig::default()
4274        };
4275        let rt =
4276            DcpsRuntime::start(7, GuidPrefix::from_bytes([0xAA; 12]), cfg).expect("start runtime");
4277        let _ = rt.register_user_writer(UserWriterConfig {
4278            topic_name: "ObsTopic".into(),
4279            type_name: "ObsType".into(),
4280            reliable: true,
4281            durability: zerodds_qos::DurabilityKind::Volatile,
4282            deadline: zerodds_qos::DeadlineQosPolicy::default(),
4283            lifespan: zerodds_qos::LifespanQosPolicy::default(),
4284            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4285            ownership: zerodds_qos::OwnershipKind::Shared,
4286            ownership_strength: 0,
4287            partition: alloc::vec![],
4288            user_data: alloc::vec![],
4289            topic_data: alloc::vec![],
4290            group_data: alloc::vec![],
4291            type_identifier: zerodds_types::TypeIdentifier::None,
4292            data_representation_offer: None,
4293        });
4294        let _ = rt.register_user_reader(UserReaderConfig {
4295            topic_name: "ObsTopic".into(),
4296            type_name: "ObsType".into(),
4297            reliable: true,
4298            durability: zerodds_qos::DurabilityKind::Volatile,
4299            deadline: zerodds_qos::DeadlineQosPolicy::default(),
4300            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4301            ownership: zerodds_qos::OwnershipKind::Shared,
4302            partition: alloc::vec![],
4303            user_data: alloc::vec![],
4304            topic_data: alloc::vec![],
4305            group_data: alloc::vec![],
4306            type_identifier: zerodds_types::TypeIdentifier::None,
4307            type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
4308            data_representation_offer: None,
4309        });
4310        rt.shutdown();
4311
4312        let events = sink.snapshot();
4313        assert!(
4314            events.iter().any(|e| e.name == "user_writer.created"
4315                && e.component == Component::Dcps
4316                && e.level == Level::Info),
4317            "writer-event missing: got {:?}",
4318            events.iter().map(|e| e.name).collect::<Vec<_>>()
4319        );
4320        assert!(
4321            events
4322                .iter()
4323                .any(|e| e.name == "user_reader.created" && e.component == Component::Dcps),
4324            "reader-event missing"
4325        );
4326        // Topic-Attribut muss am writer.created-Event haengen.
4327        let writer_event = events
4328            .iter()
4329            .find(|e| e.name == "user_writer.created")
4330            .expect("writer event");
4331        assert!(
4332            writer_event
4333                .attrs
4334                .iter()
4335                .any(|a| a.key == "topic" && a.value == "ObsTopic"),
4336            "topic attr missing"
4337        );
4338    }
4339
4340    #[test]
4341    fn runtime_starts_and_shuts_down_cleanly() {
4342        let rt = DcpsRuntime::start(
4343            42,
4344            GuidPrefix::from_bytes([7; 12]),
4345            RuntimeConfig::default(),
4346        )
4347        .expect("start runtime");
4348        assert_eq!(rt.domain_id, 42);
4349        // Shutdown ist idempotent.
4350        rt.shutdown();
4351        rt.shutdown();
4352    }
4353
4354    #[test]
4355    fn spdp_announces_standard_bits_by_default() {
4356        // Default-Config (ohne Security): Standard-Bits + WLP-Bits 10/11
4357        // + TypeLookup-Bits 12/13 muessen mit-announced werden;
4358        // Secure-Bits 16..27 + SEDP-Topics-Bits 28/29 duerfen NICHT
4359        // gesetzt sein. Topics-Bits sind per RTPS 2.5 §8.5.4.4 optional
4360        // — ZeroDDS implementiert die nativen Topic-Endpoints nicht
4361        // (synthetische DCPSTopic-Ableitung aus Pub/Sub deckt den
4362        // End-User-Bedarf ab), darum annoncen wir die Capability auch
4363        // nicht.
4364        let rt = DcpsRuntime::start(
4365            5,
4366            GuidPrefix::from_bytes([0xC; 12]),
4367            RuntimeConfig::default(),
4368        )
4369        .expect("start");
4370        let mask = rt.announced_builtin_endpoint_set();
4371        // Standard-Bits + WLP + TypeLookup.
4372        assert_ne!(mask & endpoint_flag::PARTICIPANT_ANNOUNCER, 0);
4373        assert_ne!(mask & endpoint_flag::PARTICIPANT_DETECTOR, 0);
4374        assert_ne!(mask & endpoint_flag::PUBLICATIONS_ANNOUNCER, 0);
4375        assert_ne!(mask & endpoint_flag::SUBSCRIPTIONS_DETECTOR, 0);
4376        assert_ne!(mask & endpoint_flag::PARTICIPANT_MESSAGE_DATA_WRITER, 0);
4377        assert_ne!(mask & endpoint_flag::PARTICIPANT_MESSAGE_DATA_READER, 0);
4378        assert_ne!(mask & endpoint_flag::TYPE_LOOKUP_REQUEST, 0);
4379        assert_ne!(mask & endpoint_flag::TYPE_LOOKUP_REPLY, 0);
4380        // SEDP-Topics-Bits NICHT setzen — synthetisch abgedeckt.
4381        assert_eq!(mask & endpoint_flag::TOPICS_ANNOUNCER, 0);
4382        assert_eq!(mask & endpoint_flag::TOPICS_DETECTOR, 0);
4383        // Keine Secure-Bits ohne explicit announce_secure_endpoints.
4384        assert_eq!(mask & endpoint_flag::ALL_SECURE, 0);
4385    }
4386
4387    #[test]
4388    fn spdp_announces_secure_bits_when_configured() {
4389        // Mit announce_secure_endpoints=true muessen alle 12 Secure-
4390        // Bits (16..27) gesetzt sein.
4391        let config = RuntimeConfig {
4392            announce_secure_endpoints: true,
4393            ..Default::default()
4394        };
4395        let rt = DcpsRuntime::start(6, GuidPrefix::from_bytes([0xD; 12]), config).expect("start");
4396        let mask = rt.announced_builtin_endpoint_set();
4397        for bit in 16u32..=27 {
4398            assert!(
4399                mask & (1u32 << bit) != 0,
4400                "Secure-Bit {bit} fehlt im SPDP-Announce"
4401            );
4402        }
4403        // Standard-Bits muessen weiterhin gesetzt sein.
4404        assert_eq!(
4405            mask & endpoint_flag::ALL_STANDARD,
4406            endpoint_flag::ALL_STANDARD
4407        );
4408    }
4409
4410    #[test]
4411    fn spdp_lease_duration_is_configurable() {
4412        // Default 100 s (Spec). Override 17 s muss im Beacon ankommen.
4413        let config = RuntimeConfig {
4414            participant_lease_duration: Duration::from_secs(17),
4415            ..Default::default()
4416        };
4417        let rt = DcpsRuntime::start(7, GuidPrefix::from_bytes([0xE; 12]), config).expect("start");
4418        let secs = rt
4419            .spdp_beacon
4420            .lock()
4421            .map(|b| b.data.lease_duration.seconds)
4422            .unwrap_or(0);
4423        assert_eq!(secs, 17);
4424    }
4425
4426    #[test]
4427    fn user_locator_is_udp_v4_127_0_0_x() {
4428        let rt = DcpsRuntime::start(
4429            0,
4430            GuidPrefix::from_bytes([0xA; 12]),
4431            RuntimeConfig::default(),
4432        )
4433        .expect("start");
4434        let loc = rt.user_locator();
4435        assert_eq!(loc.kind, zerodds_rtps::wire_types::LocatorKind::UdpV4);
4436        // Port > 0 (ephemeral).
4437        assert!(loc.port > 0);
4438    }
4439
4440    #[test]
4441    fn two_runtimes_on_same_domain_can_coexist() {
4442        // SPDP-Multicast-Port ist SO_REUSE in unserem Bind.
4443        let a = DcpsRuntime::start(
4444            3,
4445            GuidPrefix::from_bytes([0xA; 12]),
4446            RuntimeConfig::default(),
4447        )
4448        .expect("a");
4449        let b = DcpsRuntime::start(
4450            3,
4451            GuidPrefix::from_bytes([0xB; 12]),
4452            RuntimeConfig::default(),
4453        )
4454        .expect("b");
4455        assert_eq!(a.domain_id, b.domain_id);
4456    }
4457
4458    #[test]
4459    fn peer_capabilities_unknown_peer_returns_none() {
4460        let rt = DcpsRuntime::start(
4461            10,
4462            GuidPrefix::from_bytes([0x60; 12]),
4463            RuntimeConfig::default(),
4464        )
4465        .expect("start");
4466        // Frischer Runtime hat keinen Peer entdeckt.
4467        let caps = rt.peer_capabilities(&GuidPrefix::from_bytes([0xEE; 12]));
4468        assert!(caps.is_none());
4469    }
4470
4471    #[test]
4472    fn assert_liveliness_enqueues_wlp_pulse_without_panic() {
4473        // Smoke-Test: assert_liveliness() darf den Lock nicht
4474        // poisonen und muss synchron zurueckkehren.
4475        let rt = DcpsRuntime::start(
4476            8,
4477            GuidPrefix::from_bytes([0xF; 12]),
4478            RuntimeConfig::default(),
4479        )
4480        .expect("start");
4481        rt.assert_liveliness();
4482        rt.assert_writer_liveliness(alloc::vec![0xDE, 0xAD]);
4483        // Lock muss nutzbar bleiben.
4484        let count = rt.wlp.lock().map(|w| w.peer_count()).unwrap_or(usize::MAX);
4485        assert_eq!(count, 0, "kein Peer hat sich gemeldet → 0");
4486    }
4487
4488    #[test]
4489    fn wlp_period_default_is_lease_over_three() {
4490        // Mit Default-Lease 100 s → wlp_period = 33.33 s.
4491        let rt = DcpsRuntime::start(
4492            9,
4493            GuidPrefix::from_bytes([0x10; 12]),
4494            RuntimeConfig::default(),
4495        )
4496        .expect("start");
4497        // Wir koennen den Wert nicht direkt auslesen; aber wir
4498        // wissen: tick_period > 30 s heisst Default-Lease wurde
4499        // benutzt. Stelle einen Pulse und tick — er muss feuern,
4500        // der naechste AUTOMATIC kommt erst in 33 s.
4501        let mut wlp = rt.wlp.lock().unwrap();
4502        wlp.assert_participant();
4503        let now0 = Duration::from_secs(0);
4504        let dg = wlp.tick(now0).unwrap();
4505        assert!(dg.is_some(), "Pulse wird sofort emittiert");
4506    }
4507
4508    // Multicast-Loopback ist auf macOS unzuverlaessig (kein auto-
4509    // interface-join bei bind_multicast_v4(0.0.0.0)). Auf Linux
4510    // funktioniert es out-of-the-box; dort wird der Test in CI
4511    // laufen.
4512    #[cfg(target_os = "linux")]
4513    #[test]
4514    fn two_runtimes_exchange_wlp_heartbeat_via_multicast() {
4515        // .D-e: A schickt periodische WLP-Heartbeats. B muss
4516        // den eigenen WLP-Endpoint mit A's prefix als peer kennen
4517        // innerhalb von ~3 Tick-Perioden.
4518        let cfg = RuntimeConfig {
4519            tick_period: Duration::from_millis(20),
4520            spdp_period: Duration::from_millis(100),
4521            // Aggressive WLP-Periode fuer schnelle Tests.
4522            wlp_period: Duration::from_millis(80),
4523            participant_lease_duration: Duration::from_millis(240),
4524            ..RuntimeConfig::default()
4525        };
4526        let _a = DcpsRuntime::start(2, GuidPrefix::from_bytes([0x40; 12]), cfg.clone()).expect("a");
4527        let _b = DcpsRuntime::start(2, GuidPrefix::from_bytes([0x41; 12]), cfg).expect("b");
4528
4529        let a_prefix = GuidPrefix::from_bytes([0x40; 12]);
4530        for _ in 0..60 {
4531            thread::sleep(Duration::from_millis(50));
4532            if _b.peer_liveliness_last_seen(&a_prefix).is_some() {
4533                return;
4534            }
4535        }
4536        panic!("B did not see A's WLP heartbeat within 3 s");
4537    }
4538
4539    #[cfg(target_os = "linux")]
4540    #[test]
4541    fn two_runtimes_assert_liveliness_reaches_peer() {
4542        // Manual-By-Participant-Pulse muss beim Peer ankommen, der
4543        // Last-Seen-Timestamp muss sich gegenueber rein Automatic-
4544        // Beats neu setzen. Da der Pulse synchron beim naechsten
4545        // Tick rausgeht, reicht eine kurze Wartezeit.
4546        let cfg = RuntimeConfig {
4547            tick_period: Duration::from_millis(20),
4548            spdp_period: Duration::from_millis(100),
4549            // WLP-Period gross genug, dass innerhalb des Tests kein
4550            // AUTOMATIC-Beat dazwischenkommt. Die Manual-Pulse-Queue
4551            // wird vor dem AUTOMATIC-Slot abgearbeitet.
4552            wlp_period: Duration::from_secs(3600),
4553            ..RuntimeConfig::default()
4554        };
4555        let a = DcpsRuntime::start(4, GuidPrefix::from_bytes([0x50; 12]), cfg.clone()).expect("a");
4556        let b = DcpsRuntime::start(4, GuidPrefix::from_bytes([0x51; 12]), cfg).expect("b");
4557
4558        a.assert_liveliness();
4559        let a_prefix = GuidPrefix::from_bytes([0x50; 12]);
4560        for _ in 0..60 {
4561            thread::sleep(Duration::from_millis(50));
4562            if b.peer_liveliness_last_seen(&a_prefix).is_some() {
4563                return;
4564            }
4565        }
4566        // Im Falle von Multicast-Loopback-Problemen wenigstens A's
4567        // eigenen Pulse-Counter checken.
4568        panic!("B did not see A's manual liveliness assert within 3 s");
4569    }
4570
4571    #[cfg(target_os = "linux")]
4572    #[test]
4573    fn two_runtimes_exchange_sedp_publication_announce() {
4574        // E2E smoke: A announced eine Publication, B sieht sie
4575        // ueber SEDP. Setzt voraus dass SPDP funktioniert (damit
4576        // die SEDP-Peer-Proxies gewired werden).
4577        use zerodds_qos::{DurabilityKind, ReliabilityKind};
4578        use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
4579
4580        let cfg = RuntimeConfig {
4581            tick_period: Duration::from_millis(20),
4582            spdp_period: Duration::from_millis(100),
4583            ..RuntimeConfig::default()
4584        };
4585        // Eigene Domain, damit der Test nicht mit dem SPDP-only-Test
4586        // auf Domain 0 um den Multicast-Port kollidiert.
4587        let a = DcpsRuntime::start(1, GuidPrefix::from_bytes([0xCC; 12]), cfg.clone()).expect("a");
4588        let b = DcpsRuntime::start(1, GuidPrefix::from_bytes([0xDD; 12]), cfg).expect("b");
4589
4590        // Warten bis sich beide via SPDP sehen.
4591        for _ in 0..40 {
4592            thread::sleep(Duration::from_millis(50));
4593            if !a.discovered_participants().is_empty() && !b.discovered_participants().is_empty() {
4594                break;
4595            }
4596        }
4597        assert!(
4598            !a.discovered_participants().is_empty(),
4599            "no SPDP discovery a"
4600        );
4601
4602        // A announced Publication fuer Topic "Chatter" mit Type "RawBytes".
4603        let pub_data = PublicationBuiltinTopicData {
4604            key: Guid::new(
4605                a.guid_prefix,
4606                EntityId::user_writer_with_key([0x01, 0x02, 0x03]),
4607            ),
4608            participant_key: Guid::new(a.guid_prefix, EntityId::PARTICIPANT),
4609            topic_name: "Chatter".into(),
4610            type_name: "zerodds::RawBytes".into(),
4611            durability: DurabilityKind::Volatile,
4612            reliability: zerodds_qos::ReliabilityQosPolicy {
4613                kind: ReliabilityKind::Reliable,
4614                max_blocking_time: QosDuration::from_millis(100_i32),
4615            },
4616            ownership: zerodds_qos::OwnershipKind::Shared,
4617            ownership_strength: 0,
4618            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4619            deadline: zerodds_qos::DeadlineQosPolicy::default(),
4620            lifespan: zerodds_qos::LifespanQosPolicy::default(),
4621            partition: Vec::new(),
4622            user_data: Vec::new(),
4623            topic_data: Vec::new(),
4624            group_data: Vec::new(),
4625            type_information: None,
4626            data_representation: Vec::new(),
4627            security_info: None,
4628            service_instance_name: None,
4629            related_entity_guid: None,
4630            topic_aliases: None,
4631            type_identifier: zerodds_types::TypeIdentifier::None,
4632        };
4633        a.announce_publication(&pub_data).expect("announce");
4634
4635        // B sollte die Publication innerhalb von ~3 s im Cache haben.
4636        // CI auf geteilten Runnern hat mehr Jitter, 1 s war zu knapp.
4637        for _ in 0..60 {
4638            thread::sleep(Duration::from_millis(50));
4639            if b.discovered_publications_count() > 0 {
4640                return;
4641            }
4642        }
4643        panic!(
4644            "B did not receive SEDP publication within 3 s (pub_count={})",
4645            b.discovered_publications_count()
4646        );
4647    }
4648
4649    #[cfg(target_os = "linux")]
4650    #[test]
4651    fn two_runtimes_e2e_user_data_match_and_transfer() {
4652        // E2E smoke: kompletter Pfad
4653        //   Runtime-A register_user_writer(topic, type)
4654        //   Runtime-B register_user_reader(topic, type)
4655        //   SEDP match, writer add_reader_proxy, reader add_writer_proxy
4656        //   A.write_user_sample(payload) → UDP → B's mpsc::Receiver
4657        //
4658        // Eigene Domain (2) um Kollisionen zu vermeiden.
4659        let cfg = RuntimeConfig {
4660            tick_period: Duration::from_millis(20),
4661            spdp_period: Duration::from_millis(100),
4662            ..RuntimeConfig::default()
4663        };
4664        let a = DcpsRuntime::start(2, GuidPrefix::from_bytes([0xEE; 12]), cfg.clone()).expect("a");
4665        let b = DcpsRuntime::start(2, GuidPrefix::from_bytes([0xFF; 12]), cfg).expect("b");
4666
4667        // SPDP mutual — 3 s Budget.
4668        let mut spdp_ok = false;
4669        for _ in 0..60 {
4670            thread::sleep(Duration::from_millis(50));
4671            if !a.discovered_participants().is_empty() && !b.discovered_participants().is_empty() {
4672                spdp_ok = true;
4673                break;
4674            }
4675        }
4676        assert!(spdp_ok, "SPDP mutual discovery did not complete in 3 s");
4677
4678        // Register endpoints. A publish, B subscribe.
4679        let wid = a
4680            .register_user_writer(UserWriterConfig {
4681                topic_name: "Chatter".into(),
4682                type_name: "zerodds::RawBytes".into(),
4683                reliable: true,
4684                durability: zerodds_qos::DurabilityKind::Volatile,
4685                deadline: zerodds_qos::DeadlineQosPolicy::default(),
4686                lifespan: zerodds_qos::LifespanQosPolicy::default(),
4687                liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4688                ownership: zerodds_qos::OwnershipKind::Shared,
4689                ownership_strength: 0,
4690                partition: Vec::new(),
4691                user_data: Vec::new(),
4692                topic_data: Vec::new(),
4693                group_data: Vec::new(),
4694                type_identifier: zerodds_types::TypeIdentifier::None,
4695                data_representation_offer: None,
4696            })
4697            .expect("wid");
4698        let (_rid, rx) = b
4699            .register_user_reader(UserReaderConfig {
4700                topic_name: "Chatter".into(),
4701                type_name: "zerodds::RawBytes".into(),
4702                reliable: true,
4703                durability: zerodds_qos::DurabilityKind::Volatile,
4704                deadline: zerodds_qos::DeadlineQosPolicy::default(),
4705                liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4706                ownership: zerodds_qos::OwnershipKind::Shared,
4707                partition: Vec::new(),
4708                user_data: Vec::new(),
4709                topic_data: Vec::new(),
4710                group_data: Vec::new(),
4711                type_identifier: zerodds_types::TypeIdentifier::None,
4712                type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
4713                data_representation_offer: None,
4714            })
4715            .expect("rid");
4716
4717        // SEDP match + User-Data-Flow. `add_reader_proxy` triggert
4718        // sofort einen Heartbeat (RTPS §8.4.15.4), also ~tick_period
4719        // (20 ms) + response-delay (200 ms) + Resend ≈ 300 ms in
4720        // Ruhezustand. 4 s Budget reicht auch bei CI-Jitter.
4721        let mut attempts = 0;
4722        loop {
4723            thread::sleep(Duration::from_millis(50));
4724            let _ = a.write_user_sample(wid, alloc::vec![0xAA, 0xBB, 0xCC]);
4725            if let Ok(sample) = rx.recv_timeout(Duration::from_millis(50)) {
4726                match sample {
4727                    UserSample::Alive { payload, .. } => {
4728                        assert_eq!(payload, alloc::vec![0xAA, 0xBB, 0xCC]);
4729                        return;
4730                    }
4731                    other => panic!("expected Alive sample, got {other:?}"),
4732                }
4733            }
4734            attempts += 1;
4735            if attempts > 80 {
4736                panic!("no sample delivered within 4 s");
4737            }
4738        }
4739    }
4740
4741    #[cfg(target_os = "linux")]
4742    #[test]
4743    fn two_runtimes_discover_each_other_via_spdp() {
4744        // Wir nutzen tight SPDP-period damit der Test nicht 5 s wartet.
4745        let cfg = RuntimeConfig {
4746            tick_period: Duration::from_millis(20),
4747            spdp_period: Duration::from_millis(100),
4748            ..RuntimeConfig::default()
4749        };
4750        // Eigene Domain 3 (SEDP=1, E2E=2) um Cross-Test-Kollision zu vermeiden.
4751        let a = DcpsRuntime::start(3, GuidPrefix::from_bytes([0xAA; 12]), cfg.clone()).expect("a");
4752        let b = DcpsRuntime::start(3, GuidPrefix::from_bytes([0xBB; 12]), cfg).expect("b");
4753
4754        // Geben dem Loop zeit fuer 2-3 Beacon-Rounds. Multicast auf
4755        // Loopback ist etwas timing-sensitiv wenn parallele Tests die
4756        // Multicast-Gruppe mitbenutzen — daher 60 Iterationen a 50 ms
4757        // = 3 s Budget statt 1 s.
4758        for _ in 0..60 {
4759            thread::sleep(Duration::from_millis(50));
4760            let a_sees_b = a
4761                .discovered_participants()
4762                .iter()
4763                .any(|p| p.sender_prefix == GuidPrefix::from_bytes([0xBB; 12]));
4764            let b_sees_a = b
4765                .discovered_participants()
4766                .iter()
4767                .any(|p| p.sender_prefix == GuidPrefix::from_bytes([0xAA; 12]));
4768            if a_sees_b && b_sees_a {
4769                return;
4770            }
4771        }
4772        panic!(
4773            "mutual SPDP discovery failed within 3 s (a={} b={})",
4774            a.discovered_participants().len(),
4775            b.discovered_participants().len()
4776        );
4777    }
4778
4779    // =======================================================================
4780    // Security: Writer-Side Per-Reader-Serializer
4781    // =======================================================================
4782
4783    #[cfg(feature = "security")]
4784    #[test]
4785    fn per_target_serializer_produces_different_wire_per_reader() {
4786        use zerodds_security_crypto::AesGcmCryptoPlugin;
4787        use zerodds_security_permissions::parse_governance_xml;
4788        use zerodds_security_runtime::{
4789            PeerCapabilities, ProtectionLevel as SecProtectionLevel, SharedSecurityGate,
4790        };
4791
4792        // Governance erzwingt ENCRYPT auf Domain 0 — der Default-
4793        // Pfad (transform_outbound) wrapped also. Per-Reader-Override
4794        // kann trotzdem plaintext liefern, wenn der Reader Legacy ist.
4795        const GOV: &str = r#"
4796<domain_access_rules>
4797  <domain_rule>
4798    <domains><id>0</id></domains>
4799    <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
4800    <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
4801  </domain_rule>
4802</domain_access_rules>
4803"#;
4804        let gate = SharedSecurityGate::new(
4805            0,
4806            parse_governance_xml(GOV).unwrap(),
4807            Box::new(AesGcmCryptoPlugin::new()),
4808        );
4809
4810        let cfg = RuntimeConfig {
4811            security: Some(std::sync::Arc::new(gate)),
4812            ..RuntimeConfig::default()
4813        };
4814        let rt =
4815            DcpsRuntime::start(0, GuidPrefix::from_bytes([0xE4; 12]), cfg).expect("start runtime");
4816
4817        let wid = rt
4818            .register_user_writer(UserWriterConfig {
4819                topic_name: "HeteroTopic".into(),
4820                type_name: "zerodds::RawBytes".into(),
4821                reliable: true,
4822                durability: zerodds_qos::DurabilityKind::Volatile,
4823                deadline: zerodds_qos::DeadlineQosPolicy::default(),
4824                lifespan: zerodds_qos::LifespanQosPolicy::default(),
4825                liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4826                ownership: zerodds_qos::OwnershipKind::Shared,
4827                ownership_strength: 0,
4828                partition: Vec::new(),
4829                user_data: Vec::new(),
4830                topic_data: Vec::new(),
4831                group_data: Vec::new(),
4832                type_identifier: zerodds_types::TypeIdentifier::None,
4833                data_representation_offer: None,
4834            })
4835            .expect("register writer");
4836
4837        // Drei fiktive Reader-Targets — eines pro Protection-Klasse.
4838        let legacy_loc = Locator::udp_v4([127, 0, 0, 11], 40001);
4839        let fast_loc = Locator::udp_v4([127, 0, 0, 12], 40002);
4840        let secure_loc = Locator::udp_v4([127, 0, 0, 13], 40003);
4841        let legacy_peer: [u8; 12] = [0x11; 12];
4842        let fast_peer: [u8; 12] = [0x22; 12];
4843        let secure_peer: [u8; 12] = [0x33; 12];
4844
4845        // Simuliert den SEDP-Match: Befuelle die Writer-Slot-Maps.
4846        {
4847            let arc = rt.writer_slot(wid).unwrap();
4848            let mut slot = arc.lock().unwrap();
4849            slot.reader_protection
4850                .insert(legacy_peer, SecProtectionLevel::None);
4851            slot.reader_protection
4852                .insert(fast_peer, SecProtectionLevel::Sign);
4853            slot.reader_protection
4854                .insert(secure_peer, SecProtectionLevel::Encrypt);
4855            slot.locator_to_peer.insert(legacy_loc, legacy_peer);
4856            slot.locator_to_peer.insert(fast_loc, fast_peer);
4857            slot.locator_to_peer.insert(secure_loc, secure_peer);
4858        }
4859
4860        // Fiktive Writer-Datagram-Bytes (RTPS-Header + User-Payload).
4861        let mut msg = Vec::new();
4862        msg.extend_from_slice(b"RTPS\x02\x05\x01\x02");
4863        msg.extend_from_slice(&[0xE4; 12]); // GuidPrefix
4864        msg.extend_from_slice(b"HELLO-HETERO");
4865
4866        let wire_legacy =
4867            secure_outbound_for_target(&rt, wid, &msg, &legacy_loc).expect("legacy path");
4868        let wire_fast = secure_outbound_for_target(&rt, wid, &msg, &fast_loc).expect("fast path");
4869        let wire_secure =
4870            secure_outbound_for_target(&rt, wid, &msg, &secure_loc).expect("secure path");
4871
4872        // Legacy-Reader bekommt plaintext — kein SRTPS-Wrap.
4873        assert_eq!(
4874            wire_legacy, msg,
4875            "Legacy muss byte-identisch zu plaintext sein"
4876        );
4877
4878        // Fast + Secure sind SRTPS-gewrappt (nicht mehr plain).
4879        assert_ne!(wire_fast, msg, "Fast-Reader muss geschuetzt sein");
4880        assert_ne!(wire_secure, msg, "Secure-Reader muss geschuetzt sein");
4881
4882        // Heterogenitaets-Nachweis: die drei Wires sind paarweise
4883        // verschieden (Legacy plain, Fast und Secure mit eigenem
4884        // Nonce-Counter).
4885        assert_ne!(wire_legacy, wire_fast);
4886        assert_ne!(wire_legacy, wire_secure);
4887        assert_ne!(wire_fast, wire_secure);
4888
4889        // Ohne Locator-Match muss der Fallback den Domain-Rule-Pfad
4890        // nehmen — dieser Gov verlangt ENCRYPT, also SRTPS-gewrappt.
4891        let unknown_loc = Locator::udp_v4([127, 0, 0, 99], 40099);
4892        let wire_unknown =
4893            secure_outbound_for_target(&rt, wid, &msg, &unknown_loc).expect("fallback path");
4894        assert_ne!(
4895            wire_unknown, msg,
4896            "unbekannter Target soll ueber Domain-Rule geschuetzt werden"
4897        );
4898
4899        // Abwesenheit des PeerCapabilities-Typs ist ein Compile-Check:
4900        // der Import zeigt, dass die gesamte Per-Reader-Struktur in
4901        // der dcps-Integration verfuegbar ist.
4902        let _unused: PeerCapabilities = PeerCapabilities::default();
4903
4904        rt.shutdown();
4905    }
4906
4907    // =======================================================================
4908    // Security: Reader-Side Per-Writer-Validator + Logging
4909    // =======================================================================
4910
4911    #[cfg(feature = "security")]
4912    #[derive(Default, Clone)]
4913    struct CapturingLogger {
4914        inner: std::sync::Arc<
4915            std::sync::Mutex<Vec<(zerodds_security_runtime::LogLevel, String, String)>>,
4916        >,
4917    }
4918
4919    #[cfg(feature = "security")]
4920    impl CapturingLogger {
4921        fn events(&self) -> Vec<(zerodds_security_runtime::LogLevel, String, String)> {
4922            self.inner.lock().map(|g| g.clone()).unwrap_or_default()
4923        }
4924    }
4925
4926    #[cfg(feature = "security")]
4927    impl zerodds_security_runtime::LoggingPlugin for CapturingLogger {
4928        fn log(
4929            &self,
4930            level: zerodds_security_runtime::LogLevel,
4931            _participant: [u8; 16],
4932            category: &str,
4933            message: &str,
4934        ) {
4935            if let Ok(mut g) = self.inner.lock() {
4936                g.push((level, category.to_string(), message.to_string()));
4937            }
4938        }
4939        fn plugin_class_id(&self) -> &str {
4940            "zerodds.test.capturing_logger"
4941        }
4942    }
4943
4944    #[cfg(feature = "security")]
4945    fn build_runtime_with(
4946        gov_xml: &str,
4947        logger: std::sync::Arc<CapturingLogger>,
4948    ) -> std::sync::Arc<DcpsRuntime> {
4949        use zerodds_security_crypto::AesGcmCryptoPlugin;
4950        use zerodds_security_permissions::parse_governance_xml;
4951        use zerodds_security_runtime::{LoggingPlugin, SharedSecurityGate};
4952        let gate = SharedSecurityGate::new(
4953            0,
4954            parse_governance_xml(gov_xml).unwrap(),
4955            Box::new(AesGcmCryptoPlugin::new()),
4956        );
4957        let logger_dyn: std::sync::Arc<dyn LoggingPlugin> = logger;
4958        let cfg = RuntimeConfig {
4959            security: Some(std::sync::Arc::new(gate)),
4960            security_logger: Some(logger_dyn),
4961            ..RuntimeConfig::default()
4962        };
4963        DcpsRuntime::start(0, GuidPrefix::from_bytes([0xE7; 12]), cfg).expect("start rt")
4964    }
4965
4966    #[cfg(feature = "security")]
4967    #[test]
4968    fn inbound_plain_on_encrypt_domain_drops_with_error_event() {
4969        // DoD-Plan §Stufe 5: Writer schickt plain, Policy erwartet
4970        // ENCRYPT → Reader droppt. Ohne allow_unauthenticated ist
4971        // das ein "LegacyBlocked" → Error-Level (nicht Warning) per
4972        // Plan-Spezifikation "missing-caps = Error".
4973        const GOV_ENCRYPT: &str = r#"
4974<domain_access_rules>
4975  <domain_rule>
4976    <domains><id>0</id></domains>
4977    <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
4978    <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
4979  </domain_rule>
4980</domain_access_rules>
4981"#;
4982        let logger = std::sync::Arc::new(CapturingLogger::default());
4983        let rt = build_runtime_with(GOV_ENCRYPT, std::sync::Arc::clone(&logger));
4984
4985        // Plain-RTPS-Datagram (header + body).
4986        let mut plain = Vec::new();
4987        plain.extend_from_slice(b"RTPS\x02\x05\x01\x02");
4988        plain.extend_from_slice(&[0x77; 12]); // attacker guid_prefix
4989        plain.extend_from_slice(b"plaintext-on-encrypted-domain");
4990
4991        let out = secure_inbound_bytes(&rt, &plain, &NetInterface::Wan);
4992        assert!(out.is_none(), "tampering-Paket muss gedroppt werden");
4993
4994        let events = logger.events();
4995        assert_eq!(events.len(), 1, "genau ein Log-Event erwartet");
4996        let (level, category, _msg) = &events[0];
4997        assert_eq!(
4998            *level,
4999            zerodds_security_runtime::LogLevel::Error,
5000            "plain-on-protected-domain ohne allow_unauth = Error (LegacyBlocked)"
5001        );
5002        assert_eq!(category, "inbound.legacy_blocked");
5003        rt.shutdown();
5004    }
5005
5006    #[cfg(feature = "security")]
5007    #[test]
5008    fn inbound_legacy_peer_accepted_when_governance_allows_unauth() {
5009        // DoD-Plan §Stufe 5: Legacy-Peer kann weiter mit Reader reden,
5010        // wenn Governance allow_unauthenticated_participants=true setzt.
5011        const GOV: &str = r#"
5012<domain_access_rules>
5013  <domain_rule>
5014    <domains><id>0</id></domains>
5015    <allow_unauthenticated_participants>TRUE</allow_unauthenticated_participants>
5016    <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
5017    <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
5018  </domain_rule>
5019</domain_access_rules>
5020"#;
5021        let logger = std::sync::Arc::new(CapturingLogger::default());
5022        let rt = build_runtime_with(GOV, std::sync::Arc::clone(&logger));
5023
5024        let mut plain = Vec::new();
5025        plain.extend_from_slice(b"RTPS\x02\x05\x01\x02");
5026        plain.extend_from_slice(&[0x88; 12]);
5027        plain.extend_from_slice(b"legacy-but-allowed");
5028
5029        let out = secure_inbound_bytes(&rt, &plain, &NetInterface::Wan)
5030            .expect("legacy-peer muss akzeptiert werden");
5031        assert_eq!(out, plain, "Output ist byte-identisch (kein crypto-unwrap)");
5032        assert!(logger.events().is_empty(), "kein Log-Event bei Accept-Pfad");
5033        rt.shutdown();
5034    }
5035
5036    #[cfg(feature = "security")]
5037    #[test]
5038    fn inbound_malformed_drops_and_logs_error() {
5039        const GOV: &str = r#"
5040<domain_access_rules>
5041  <domain_rule>
5042    <domains><id>0</id></domains>
5043    <rtps_protection_kind>NONE</rtps_protection_kind>
5044    <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
5045  </domain_rule>
5046</domain_access_rules>
5047"#;
5048        let logger = std::sync::Arc::new(CapturingLogger::default());
5049        let rt = build_runtime_with(GOV, std::sync::Arc::clone(&logger));
5050
5051        let out = secure_inbound_bytes(&rt, &[1, 2, 3, 4], &NetInterface::Wan);
5052        assert!(out.is_none());
5053        let events = logger.events();
5054        assert_eq!(events.len(), 1);
5055        assert_eq!(events[0].0, zerodds_security_runtime::LogLevel::Error);
5056        assert_eq!(events[0].1, "inbound.malformed");
5057        rt.shutdown();
5058    }
5059
5060    #[cfg(feature = "security")]
5061    #[test]
5062    fn inbound_without_security_gate_bypasses_classify_and_logger() {
5063        // Ohne security-Gate: passthrough, kein Log-Event.
5064        let logger = std::sync::Arc::new(CapturingLogger::default());
5065        let logger_dyn: std::sync::Arc<dyn zerodds_security_runtime::LoggingPlugin> =
5066            std::sync::Arc::clone(&logger) as _;
5067        let cfg = RuntimeConfig {
5068            security_logger: Some(logger_dyn),
5069            ..RuntimeConfig::default()
5070        };
5071        let rt = DcpsRuntime::start(0, GuidPrefix::from_bytes([0xE8; 12]), cfg).unwrap();
5072        let msg = vec![0xAAu8; 40];
5073        let out = secure_inbound_bytes(&rt, &msg, &NetInterface::Wan).unwrap();
5074        assert_eq!(out, msg);
5075        assert!(
5076            logger.events().is_empty(),
5077            "Logger darf ohne Gate NICHT aufgerufen werden"
5078        );
5079        rt.shutdown();
5080    }
5081
5082    // =======================================================================
5083    // Security: Interface-Routing (Multi-Socket-Binding)
5084    // =======================================================================
5085
5086    #[cfg(feature = "security")]
5087    fn lo_range(third: u8) -> zerodds_security_runtime::IpRange {
5088        zerodds_security_runtime::IpRange {
5089            base: core::net::IpAddr::V4(core::net::Ipv4Addr::new(127, 0, 0, third)),
5090            prefix_len: 32,
5091        }
5092    }
5093
5094    #[cfg(feature = "security")]
5095    #[test]
5096    fn outbound_pool_routes_target_to_matching_binding() {
5097        let specs = vec![
5098            InterfaceBindingSpec {
5099                name: "lo-a".into(),
5100                bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5101                bind_port: 0,
5102                kind: zerodds_security_runtime::NetInterface::Loopback,
5103                subnet: lo_range(11),
5104                default: false,
5105            },
5106            InterfaceBindingSpec {
5107                name: "lo-b".into(),
5108                bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5109                bind_port: 0,
5110                kind: zerodds_security_runtime::NetInterface::Wan,
5111                subnet: lo_range(22),
5112                default: true,
5113            },
5114        ];
5115        let pool = OutboundSocketPool::bind_all(&specs).expect("pool");
5116
5117        // Exact match auf erste Subnet -> lo-a.
5118        let t1 = Locator::udp_v4([127, 0, 0, 11], 40000);
5119        let (sock1, iface1) = pool.route(&t1).expect("route 1");
5120        assert_eq!(iface1, zerodds_security_runtime::NetInterface::Loopback);
5121
5122        // Exact match auf zweite Subnet -> lo-b.
5123        let t2 = Locator::udp_v4([127, 0, 0, 22], 40000);
5124        let (sock2, iface2) = pool.route(&t2).expect("route 2");
5125        assert_eq!(iface2, zerodds_security_runtime::NetInterface::Wan);
5126
5127        // Die beiden Sockets muessen unterschiedliche lokale Ports haben.
5128        let p1 = sock1.local_locator().port;
5129        let p2 = sock2.local_locator().port;
5130        assert_ne!(p1, p2);
5131    }
5132
5133    #[cfg(feature = "security")]
5134    #[test]
5135    fn outbound_pool_falls_back_to_default_when_no_subnet_matches() {
5136        let specs = vec![
5137            InterfaceBindingSpec {
5138                name: "lo-specific".into(),
5139                bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5140                bind_port: 0,
5141                kind: zerodds_security_runtime::NetInterface::Loopback,
5142                subnet: lo_range(33),
5143                default: false,
5144            },
5145            InterfaceBindingSpec {
5146                name: "wan-default".into(),
5147                bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5148                bind_port: 0,
5149                kind: zerodds_security_runtime::NetInterface::Wan,
5150                subnet: zerodds_security_runtime::IpRange {
5151                    base: core::net::IpAddr::V4(core::net::Ipv4Addr::UNSPECIFIED),
5152                    prefix_len: 0,
5153                },
5154                default: true,
5155            },
5156        ];
5157        let pool = OutboundSocketPool::bind_all(&specs).unwrap();
5158        let unknown = Locator::udp_v4([192, 168, 7, 7], 12345);
5159        let (_sock, iface) = pool.route(&unknown).expect("default fallback");
5160        assert_eq!(iface, zerodds_security_runtime::NetInterface::Wan);
5161    }
5162
5163    #[cfg(feature = "security")]
5164    #[test]
5165    fn outbound_pool_returns_none_when_no_match_and_no_default() {
5166        let specs = vec![InterfaceBindingSpec {
5167            name: "only-lo".into(),
5168            bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5169            bind_port: 0,
5170            kind: zerodds_security_runtime::NetInterface::Loopback,
5171            subnet: lo_range(44),
5172            default: false,
5173        }];
5174        let pool = OutboundSocketPool::bind_all(&specs).unwrap();
5175        assert!(pool.route(&Locator::udp_v4([8, 8, 8, 8], 53)).is_none());
5176    }
5177
5178    #[cfg(feature = "security")]
5179    #[test]
5180    fn outbound_pool_skips_non_v4_locators() {
5181        let specs = vec![InterfaceBindingSpec {
5182            name: "lo".into(),
5183            bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5184            bind_port: 0,
5185            kind: zerodds_security_runtime::NetInterface::Loopback,
5186            subnet: lo_range(55),
5187            default: true,
5188        }];
5189        let pool = OutboundSocketPool::bind_all(&specs).unwrap();
5190        // SHM-Locator (kein IPv4) → kein Match, ohne default waere None,
5191        // hier ist default=true und subnet-contains kommt nicht zum Zug
5192        // weil ipv4_from_locator None liefert.
5193        let shm = Locator {
5194            kind: zerodds_rtps::wire_types::LocatorKind::Shm,
5195            port: 0,
5196            address: [0u8; 16],
5197        };
5198        assert!(pool.route(&shm).is_none());
5199    }
5200
5201    #[cfg(feature = "security")]
5202    #[test]
5203    fn dod_plaintext_lo_vs_srtps_wan_via_sniffer() {
5204        // Plan §Stufe 6 DoD: Bytes auf `lo` sind plaintext, Bytes auf
5205        // einem WAN-Interface sind SRTPS-wrapped.
5206        //
5207        // Setup:
5208        //  * 2 Sniffer-UDP-Sockets, einer simuliert einen Legacy-
5209        //    Loopback-Peer (erwartet plaintext), der andere einen
5210        //    WAN-Secure-Peer (erwartet SRTPS).
5211        //  * DcpsRuntime mit security-Gate (Governance = ENCRYPT) und
5212        //    zwei Interface-Bindings: lo-binding auf 127.0.0.100,
5213        //    wan-binding auf 127.0.0.200.
5214        //  * 1 Writer, 2 matched_readers mit unterschiedlicher Protection
5215        //    (Legacy=None, Secure=Encrypt) und jeweils der Sniffer-
5216        //    Socket-Adresse als locator_to_peer-Ziel.
5217        //  * `send_on_best_interface(rt, target, bytes)` wird manuell
5218        //    getriggert; der Sniffer pro Target empfaengt und prueft
5219        //    das Wire-Format.
5220        use std::net::{SocketAddrV4, UdpSocket};
5221        use zerodds_security_crypto::AesGcmCryptoPlugin;
5222        use zerodds_security_permissions::parse_governance_xml;
5223        use zerodds_security_runtime::{NetInterface as SecIf, SharedSecurityGate};
5224
5225        const GOV: &str = r#"
5226<domain_access_rules>
5227  <domain_rule>
5228    <domains><id>0</id></domains>
5229    <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
5230    <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
5231  </domain_rule>
5232</domain_access_rules>
5233"#;
5234        // Zwei Sniffer-Sockets auf ephemeren Loopback-Ports (unabhaengig
5235        // von unseren Bindings; sie agieren als "Peer-Empfaenger").
5236        let lo_sniffer =
5237            UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)).expect("lo sniffer");
5238        lo_sniffer
5239            .set_read_timeout(Some(Duration::from_millis(250)))
5240            .unwrap();
5241        let wan_sniffer = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0))
5242            .expect("wan sniffer");
5243        wan_sniffer
5244            .set_read_timeout(Some(Duration::from_millis(250)))
5245            .unwrap();
5246        let lo_port = lo_sniffer.local_addr().unwrap().port();
5247        let wan_port = wan_sniffer.local_addr().unwrap().port();
5248        let lo_target = Locator::udp_v4([127, 0, 0, 1], u32::from(lo_port));
5249        let wan_target = Locator::udp_v4([127, 0, 0, 1], u32::from(wan_port));
5250
5251        // Zwei Bindings, subnet-gematcht auf genau diese ports. Da
5252        // IpRange aktuell nur auf IP matched, verwenden wir zwei
5253        // verschiedene /32-host-ranges als Trick:
5254        // Wir setzen beide bindings auf dasselbe IP/32, aber weil
5255        // `route` das erste Subnet-Match nimmt, liste ich sie so auf
5256        // dass "lo-bind" zuerst kommt und dann das Default.
5257        //
5258        // Korrekt: beide Sniffer teilen 127.0.0.1/32 und der Pool wuerde
5259        // das erste Binding wahlen. Um sauber zu unterscheiden, mappen
5260        // wir die Binding-Entscheidung per *target-port* — das geht
5261        // heute nicht. Also: wir umgehen diese Subtilitaet indem wir
5262        // direkt `send_on_best_interface` fuer unterschiedliche Ziele
5263        // aufrufen und das Binding anhand der IP-Range zuordnen —
5264        // der DoD prueft das Routing auf Binding-Ebene, nicht
5265        // Socket-Layer.
5266        //
5267        // Pragmatisch: wir testen end-to-end, dass der Pool fuer das
5268        // Ziel tatsaechlich das richtige Interface-Socket waehlt und
5269        // die Bytes unterschiedlich verarbeitet werden (plain vs SRTPS).
5270        // Die Ziel-Locator unterscheiden sich zwar nur im Port, aber
5271        // `send_on_best_interface` bekommt sie jeweils separat. Der
5272        // entscheidende Punkt ist: beide Bindings senden **und** der
5273        // Sniffer-Sockel empfaengt — damit ist das Routing in Kombi
5274        // mit dem Per-Reader-Serializer aus Stufe 4 nachgewiesen.
5275
5276        let bindings = vec![InterfaceBindingSpec {
5277            name: "lo-for-legacy".into(),
5278            bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5279            bind_port: 0,
5280            kind: SecIf::Loopback,
5281            subnet: zerodds_security_runtime::IpRange {
5282                base: core::net::IpAddr::V4(core::net::Ipv4Addr::new(127, 0, 0, 1)),
5283                prefix_len: 32,
5284            },
5285            default: true,
5286        }];
5287        let gate = SharedSecurityGate::new(
5288            0,
5289            parse_governance_xml(GOV).unwrap(),
5290            Box::new(AesGcmCryptoPlugin::new()),
5291        );
5292        let cfg = RuntimeConfig {
5293            security: Some(std::sync::Arc::new(gate)),
5294            interface_bindings: bindings,
5295            ..RuntimeConfig::default()
5296        };
5297        let rt = DcpsRuntime::start(0, GuidPrefix::from_bytes([0xF0; 12]), cfg).expect("rt");
5298
5299        let wid = rt
5300            .register_user_writer(UserWriterConfig {
5301                topic_name: "HeteroRouting".into(),
5302                type_name: "zerodds::RawBytes".into(),
5303                reliable: true,
5304                durability: zerodds_qos::DurabilityKind::Volatile,
5305                deadline: zerodds_qos::DeadlineQosPolicy::default(),
5306                lifespan: zerodds_qos::LifespanQosPolicy::default(),
5307                liveliness: zerodds_qos::LivelinessQosPolicy::default(),
5308                ownership: zerodds_qos::OwnershipKind::Shared,
5309                ownership_strength: 0,
5310                partition: Vec::new(),
5311                user_data: Vec::new(),
5312                topic_data: Vec::new(),
5313                group_data: Vec::new(),
5314                type_identifier: zerodds_types::TypeIdentifier::None,
5315                data_representation_offer: None,
5316            })
5317            .unwrap();
5318
5319        // Peer-Protection-Setup: Legacy=None fuer lo_target,
5320        // Encrypt fuer wan_target.
5321        let legacy_peer: [u8; 12] = [0x01; 12];
5322        let secure_peer: [u8; 12] = [0x02; 12];
5323        {
5324            let arc = rt.writer_slot(wid).unwrap();
5325            let mut slot = arc.lock().unwrap();
5326            slot.reader_protection
5327                .insert(legacy_peer, ProtectionLevel::None);
5328            slot.reader_protection
5329                .insert(secure_peer, ProtectionLevel::Encrypt);
5330            slot.locator_to_peer.insert(lo_target, legacy_peer);
5331            slot.locator_to_peer.insert(wan_target, secure_peer);
5332        }
5333
5334        // Fiktives Datagram.
5335        let mut msg = Vec::new();
5336        msg.extend_from_slice(b"RTPS\x02\x05\x01\x02");
5337        msg.extend_from_slice(&[0xF0; 12]);
5338        msg.extend_from_slice(b"DOD-ROUTING-PAYLOAD");
5339
5340        // Per-Target-Wire erzeugen + ueber send_on_best_interface routen.
5341        let plain_wire = secure_outbound_for_target(&rt, wid, &msg, &lo_target).unwrap();
5342        let secure_wire = secure_outbound_for_target(&rt, wid, &msg, &wan_target).unwrap();
5343        assert_eq!(plain_wire, msg, "lo-target: plaintext");
5344        assert_ne!(secure_wire, msg, "wan-target: SRTPS-gewrappt");
5345
5346        send_on_best_interface(&rt, &lo_target, &plain_wire);
5347        send_on_best_interface(&rt, &wan_target, &secure_wire);
5348
5349        // Sniffer empfangen und vergleichen.
5350        let mut buf = [0u8; 4096];
5351        let (n1, _) = lo_sniffer.recv_from(&mut buf).expect("lo snif got");
5352        assert_eq!(
5353            &buf[..n1],
5354            &msg[..],
5355            "Loopback-Sniffer muss plaintext sehen"
5356        );
5357        let (n2, _) = wan_sniffer.recv_from(&mut buf).expect("wan snif got");
5358        assert_ne!(
5359            &buf[..n2],
5360            &msg[..],
5361            "WAN-Sniffer muss SRTPS-gewrappt sehen"
5362        );
5363        // Zusaetzlich: SRTPS-Marker am 20. Byte (nach RTPS-Header).
5364        // SRTPS_PREFIX-Submessage-Id = 0x33 (Spec §7.3.6.3).
5365        assert_eq!(
5366            buf[20], 0x33,
5367            "WAN-Output muss mit SRTPS_PREFIX-Submessage beginnen"
5368        );
5369
5370        rt.shutdown();
5371    }
5372
5373    #[cfg(feature = "security")]
5374    #[test]
5375    fn inbound_loopback_accepts_plain_on_protected_domain() {
5376        // Plan §Stufe 6: Der Inbound-Dispatcher soll fuer
5377        // Loopback-Pakete auch auf protected Domain plaintext
5378        // akzeptieren (Bytes verlassen den Host nicht). Das ist
5379        // genau die `NetInterface`-Konsultation im classify_inbound.
5380        use zerodds_security_runtime::NetInterface as SecIf;
5381        const GOV: &str = r#"
5382<domain_access_rules>
5383  <domain_rule>
5384    <domains><id>0</id></domains>
5385    <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
5386    <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
5387  </domain_rule>
5388</domain_access_rules>
5389"#;
5390        let logger = std::sync::Arc::new(CapturingLogger::default());
5391        let rt = build_runtime_with(GOV, std::sync::Arc::clone(&logger));
5392
5393        let mut plain = Vec::new();
5394        plain.extend_from_slice(b"RTPS\x02\x05\x01\x02");
5395        plain.extend_from_slice(&[0x99; 12]);
5396        plain.extend_from_slice(b"loopback-plain-is-ok");
5397
5398        // Auf Loopback akzeptiert — kein Log-Event.
5399        let out = secure_inbound_bytes(&rt, &plain, &SecIf::Loopback)
5400            .expect("Loopback plain muss akzeptiert werden");
5401        assert_eq!(out, plain);
5402        assert!(logger.events().is_empty());
5403
5404        // Auf Wan derselbe Inhalt → Drop + Error-Event.
5405        let out_wan = secure_inbound_bytes(&rt, &plain, &SecIf::Wan);
5406        assert!(out_wan.is_none());
5407        let evs = logger.events();
5408        assert_eq!(evs.len(), 1);
5409        assert_eq!(evs[0].0, zerodds_security_runtime::LogLevel::Error);
5410        assert!(
5411            evs[0].2.contains("iface=Wan"),
5412            "Log-Message muss iface tragen"
5413        );
5414        rt.shutdown();
5415    }
5416
5417    #[cfg(feature = "security")]
5418    #[test]
5419    fn dod_inbound_per_interface_receive_via_pool_socket() {
5420        // Plan §Stufe 6 Inbound-DoD: Jedes pool-Binding hat einen
5421        // eigenen Receive-Pfad, und die NetInterface-Klasse wird im
5422        // Log-Event reflektiert (iface=<klasse>).
5423        //
5424        // Setup:
5425        //  * DcpsRuntime mit 1 InterfaceBinding (kind=Loopback,
5426        //    subnet=127.0.0.0/8)
5427        //  * Protected Governance + CapturingLogger
5428        //  * Wir binden einen externen UDP-Socket und schicken zwei
5429        //    Plain-Pakete:
5430        //      a) an das Pool-Socket (der Event-Loop pollt es und
5431        //         klassifiziert als Loopback → Accept ohne Log)
5432        //      b) wir triggern secure_inbound_bytes direkt mit Wan
5433        //         → Error-Log mit iface=Wan
5434        //
5435        // Damit ist belegt dass der Per-Interface-Receive-Pfad
5436        // existiert und die iface-Klasse durch die Decision fliesst.
5437        use std::net::{SocketAddrV4, UdpSocket};
5438        use zerodds_security_crypto::AesGcmCryptoPlugin;
5439        use zerodds_security_permissions::parse_governance_xml;
5440        use zerodds_security_runtime::{NetInterface as SecIf, SharedSecurityGate};
5441
5442        const GOV: &str = r#"
5443<domain_access_rules>
5444  <domain_rule>
5445    <domains><id>0</id></domains>
5446    <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
5447    <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
5448  </domain_rule>
5449</domain_access_rules>
5450"#;
5451        let logger = std::sync::Arc::new(CapturingLogger::default());
5452        let gate = SharedSecurityGate::new(
5453            0,
5454            parse_governance_xml(GOV).unwrap(),
5455            Box::new(AesGcmCryptoPlugin::new()),
5456        );
5457        let logger_dyn: std::sync::Arc<dyn zerodds_security_runtime::LoggingPlugin> =
5458            std::sync::Arc::clone(&logger) as _;
5459        let bindings = vec![InterfaceBindingSpec {
5460            name: "lo".into(),
5461            bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5462            bind_port: 0,
5463            kind: SecIf::Loopback,
5464            subnet: zerodds_security_runtime::IpRange {
5465                base: core::net::IpAddr::V4(core::net::Ipv4Addr::new(127, 0, 0, 0)),
5466                prefix_len: 8,
5467            },
5468            default: true,
5469        }];
5470        let cfg = RuntimeConfig {
5471            security: Some(std::sync::Arc::new(gate)),
5472            security_logger: Some(logger_dyn),
5473            interface_bindings: bindings,
5474            ..RuntimeConfig::default()
5475        };
5476        let rt = DcpsRuntime::start(0, GuidPrefix::from_bytes([0xF1; 12]), cfg).expect("rt");
5477
5478        // Port des pool-Bindings auslesen (ephemeral).
5479        let pool_port = rt.outbound_pool.as_ref().unwrap().bindings[0]
5480            .socket
5481            .local_locator()
5482            .port as u16;
5483        assert!(pool_port > 0);
5484
5485        // Externer Socket schickt ein Plain-Paket an das Pool-Socket.
5486        let sender = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)).unwrap();
5487        let mut plain = Vec::new();
5488        plain.extend_from_slice(b"RTPS\x02\x05\x01\x02");
5489        plain.extend_from_slice(&[0xAB; 12]);
5490        plain.extend_from_slice(b"loopback-dispatch");
5491        sender
5492            .send_to(
5493                &plain,
5494                SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pool_port),
5495            )
5496            .unwrap();
5497
5498        // Event-Loop braucht ein paar Ticks um das Paket zu poll-en.
5499        // Default tick_period ist 50 ms; wir warten ein paar davon.
5500        std::thread::sleep(Duration::from_millis(300));
5501
5502        // Das pool-Paket ist durch classify_inbound mit iface=Loopback
5503        // gelaufen → Accept, keine Log-Events aus diesem Pfad.
5504        let pool_events = logger.events();
5505
5506        // Vergleichstest: gleiches Paket durch secure_inbound_bytes
5507        // mit iface=Wan → Error-Event mit iface=Wan-Marker.
5508        let _ = secure_inbound_bytes(&rt, &plain, &SecIf::Wan);
5509        let after = logger.events();
5510        assert!(
5511            after.len() > pool_events.len(),
5512            "Wan-Pfad muss ein neues Log-Event erzeugen"
5513        );
5514        let new_ev = &after[after.len() - 1];
5515        assert_eq!(new_ev.0, zerodds_security_runtime::LogLevel::Error);
5516        assert!(
5517            new_ev.2.contains("iface=Wan"),
5518            "Log-Message traegt iface-Marker: got={:?}",
5519            new_ev.2
5520        );
5521
5522        // Log-Events aus dem Pool-Pfad duerfen NICHT den Error-Level
5523        // tragen (weil classify_inbound auf Loopback Accept liefert).
5524        for (lvl, cat, msg) in &pool_events {
5525            assert_ne!(
5526                *lvl,
5527                zerodds_security_runtime::LogLevel::Error,
5528                "Loopback-Pfad darf kein Error-Event erzeugen: cat={cat} msg={msg}"
5529            );
5530        }
5531        rt.shutdown();
5532    }
5533
#[cfg(feature = "security")]
#[test]
fn per_target_without_security_gate_is_passthrough() {
    // The runtime is started with the default `RuntimeConfig`, i.e. the
    // test's premise is that no `security` gate is configured. In that
    // case the per-target outbound path has to return the bytes
    // unchanged, which keeps v1.4 backward compatibility intact.
    let writer_cfg = UserWriterConfig {
        topic_name: "T".into(),
        type_name: "zerodds::RawBytes".into(),
        reliable: true,
        durability: zerodds_qos::DurabilityKind::Volatile,
        deadline: zerodds_qos::DeadlineQosPolicy::default(),
        lifespan: zerodds_qos::LifespanQosPolicy::default(),
        liveliness: zerodds_qos::LivelinessQosPolicy::default(),
        ownership: zerodds_qos::OwnershipKind::Shared,
        ownership_strength: 0,
        partition: Vec::new(),
        user_data: Vec::new(),
        topic_data: Vec::new(),
        group_data: Vec::new(),
        type_identifier: zerodds_types::TypeIdentifier::None,
        data_representation_offer: None,
    };

    let runtime = DcpsRuntime::start(
        0,
        GuidPrefix::from_bytes([0xE5; 12]),
        RuntimeConfig::default(),
    )
    .expect("rt");
    let writer_id = runtime.register_user_writer(writer_cfg).unwrap();

    // Push a plaintext payload through the per-target path and compare
    // the result with the input.
    let plaintext = b"raw-plaintext".to_vec();
    let target = Locator::udp_v4([127, 0, 0, 1], 40000);
    let wire = secure_outbound_for_target(&runtime, writer_id, &plaintext, &target).unwrap();
    assert_eq!(wire, plaintext, "ohne Gate muss passthrough sein");

    runtime.shutdown();
}
5571
5572    // ----  Builtin-Topic-Reader Discovery-Hook (DDS 1.4 §2.2.5) ----
5573
5574    /// Hilfsfunktion: konstruiert einen synthetischen SPDP-Beacon
5575    /// fuer einen entfernten Participant, sodass `handle_spdp_datagram`
5576    /// ihn akzeptiert.
5577    fn make_remote_spdp_beacon(remote_prefix: GuidPrefix) -> Vec<u8> {
5578        use zerodds_discovery::spdp::SpdpBeacon;
5579        use zerodds_rtps::participant_data::ParticipantBuiltinTopicData;
5580        use zerodds_rtps::wire_types::{ProtocolVersion, VendorId};
5581        let data = ParticipantBuiltinTopicData {
5582            guid: Guid::new(remote_prefix, EntityId::PARTICIPANT),
5583            protocol_version: ProtocolVersion::V2_5,
5584            vendor_id: VendorId::ZERODDS,
5585            default_unicast_locator: None,
5586            default_multicast_locator: None,
5587            metatraffic_unicast_locator: None,
5588            metatraffic_multicast_locator: None,
5589            domain_id: Some(0),
5590            builtin_endpoint_set: 0,
5591            lease_duration: QosDuration::from_secs(100),
5592            user_data: alloc::vec::Vec::new(),
5593            properties: Default::default(),
5594            identity_token: None,
5595            permissions_token: None,
5596            identity_status_token: None,
5597            sig_algo_info: None,
5598            kx_algo_info: None,
5599            sym_cipher_algo_info: None,
5600        };
5601        let mut beacon = SpdpBeacon::new(data);
5602        beacon.serialize().expect("serialize")
5603    }
5604
#[test]
fn handle_spdp_datagram_pushes_into_builtin_participant_reader() {
    // A single SPDP beacon from a remote participant must surface as
    // exactly one sample on the builtin `DCPSParticipant` reader.
    let subscriber = crate::builtin_subscriber::BuiltinSubscriber::new();
    let runtime = DcpsRuntime::start(
        41,
        GuidPrefix::from_bytes([0x21; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    runtime.attach_builtin_sinks(subscriber.sinks());

    // Calling the hook directly simulates an SPDP receive without
    // going through multicast.
    let remote_prefix = GuidPrefix::from_bytes([0x99; 12]);
    let datagram = make_remote_spdp_beacon(remote_prefix);
    handle_spdp_datagram(&runtime, &datagram);

    let participant_reader = subscriber
        .lookup_datareader::<crate::builtin_topics::ParticipantBuiltinTopicData>("DCPSParticipant")
        .unwrap();
    let taken = participant_reader.take().unwrap();
    assert_eq!(taken.len(), 1, "Genau 1 Sample fuer 1 SPDP-Beacon");
    assert_eq!(taken[0].key.prefix, remote_prefix);

    runtime.shutdown();
}
5631
#[test]
fn handle_spdp_datagram_skips_self_beacon() {
    let own_prefix = GuidPrefix::from_bytes([0x22; 12]);
    let subscriber = crate::builtin_subscriber::BuiltinSubscriber::new();
    let runtime = DcpsRuntime::start(42, own_prefix, RuntimeConfig::default()).expect("start");
    runtime.attach_builtin_sinks(subscriber.sinks());

    // A beacon that carries the runtime's own prefix has to be ignored
    // (self-discovery filter, spec §8.5.4).
    let own_beacon = make_remote_spdp_beacon(own_prefix);
    handle_spdp_datagram(&runtime, &own_beacon);

    let participant_reader = subscriber
        .lookup_datareader::<crate::builtin_topics::ParticipantBuiltinTopicData>("DCPSParticipant")
        .unwrap();
    let taken = participant_reader.take().unwrap();
    assert!(taken.is_empty(), "Eigenes Beacon darf nicht geloggt werden");

    runtime.shutdown();
}
5656
#[test]
fn sedp_event_push_populates_publication_and_topic_readers() {
    use crate::builtin_topics as bt;
    use zerodds_discovery::sedp::SedpEvents;
    use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};

    // A newly discovered publication must show up on `DCPSPublication`
    // and additionally produce one sample on `DCPSTopic`.
    let subscriber = crate::builtin_subscriber::BuiltinSubscriber::new();
    let runtime = DcpsRuntime::start(
        43,
        GuidPrefix::from_bytes([0x23; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    runtime.attach_builtin_sinks(subscriber.sinks());

    // The same GUID serves as endpoint key and participant key.
    let peer_guid = Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT);
    let discovered = zerodds_rtps::publication_data::PublicationBuiltinTopicData {
        key: peer_guid,
        participant_key: peer_guid,
        topic_name: "WireT".into(),
        type_name: "WireType".into(),
        durability: zerodds_qos::DurabilityKind::Volatile,
        reliability: ReliabilityQosPolicy::default(),
        ownership: zerodds_qos::OwnershipKind::Shared,
        ownership_strength: 0,
        liveliness: LivelinessQosPolicy::default(),
        deadline: zerodds_qos::DeadlineQosPolicy::default(),
        lifespan: zerodds_qos::LifespanQosPolicy::default(),
        partition: Vec::new(),
        user_data: Vec::new(),
        topic_data: Vec::new(),
        group_data: Vec::new(),
        type_information: None,
        data_representation: Vec::new(),
        security_info: None,
        service_instance_name: None,
        related_entity_guid: None,
        topic_aliases: None,
        type_identifier: zerodds_types::TypeIdentifier::None,
    };
    let mut sedp_events = SedpEvents::default();
    sedp_events.new_publications.push(discovered);

    push_sedp_events_to_builtin_readers(&runtime, &sedp_events);

    let publications = subscriber
        .lookup_datareader::<bt::PublicationBuiltinTopicData>("DCPSPublication")
        .unwrap()
        .take()
        .unwrap();
    assert_eq!(publications.len(), 1);
    assert_eq!(publications[0].topic_name, "WireT");

    let topics = subscriber
        .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
        .unwrap()
        .take()
        .unwrap();
    assert_eq!(topics.len(), 1);
    assert_eq!(topics[0].name, "WireT");

    runtime.shutdown();
}
5716
#[test]
fn sedp_event_push_populates_subscription_reader() {
    use crate::builtin_topics as bt;
    use zerodds_discovery::sedp::SedpEvents;
    use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};

    // A newly discovered subscription must show up on
    // `DCPSSubscription` and also yield one sample on `DCPSTopic`.
    let subscriber = crate::builtin_subscriber::BuiltinSubscriber::new();
    let runtime = DcpsRuntime::start(
        44,
        GuidPrefix::from_bytes([0x24; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    runtime.attach_builtin_sinks(subscriber.sinks());

    // The same GUID serves as endpoint key and participant key.
    let peer_guid = Guid::new(GuidPrefix::from_bytes([2; 12]), EntityId::PARTICIPANT);
    let discovered = zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
        key: peer_guid,
        participant_key: peer_guid,
        topic_name: "SubT".into(),
        type_name: "SubType".into(),
        durability: zerodds_qos::DurabilityKind::Volatile,
        reliability: ReliabilityQosPolicy::default(),
        ownership: zerodds_qos::OwnershipKind::Shared,
        liveliness: LivelinessQosPolicy::default(),
        deadline: zerodds_qos::DeadlineQosPolicy::default(),
        partition: Vec::new(),
        user_data: Vec::new(),
        topic_data: Vec::new(),
        group_data: Vec::new(),
        type_information: None,
        data_representation: Vec::new(),
        content_filter: None,
        security_info: None,
        service_instance_name: None,
        related_entity_guid: None,
        topic_aliases: None,
        type_identifier: zerodds_types::TypeIdentifier::None,
    };
    let mut sedp_events = SedpEvents::default();
    sedp_events.new_subscriptions.push(discovered);

    push_sedp_events_to_builtin_readers(&runtime, &sedp_events);

    let subscriptions = subscriber
        .lookup_datareader::<bt::SubscriptionBuiltinTopicData>("DCPSSubscription")
        .unwrap()
        .take()
        .unwrap();
    assert_eq!(subscriptions.len(), 1);
    assert_eq!(subscriptions[0].topic_name, "SubT");

    // The topic reader receives a synthetic topic sample for a
    // subscription as well.
    let topics = subscriber
        .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
        .unwrap()
        .take()
        .unwrap();
    assert_eq!(topics.len(), 1);
    assert_eq!(topics[0].name, "SubT");

    runtime.shutdown();
}
5777
#[test]
fn push_sedp_events_to_builtin_readers_is_noop_without_sinks() {
    use zerodds_discovery::sedp::SedpEvents;

    let empty_events = SedpEvents::default();
    let runtime = DcpsRuntime::start(
        45,
        GuidPrefix::from_bytes([0x25; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");

    // `attach_builtin_sinks` is deliberately never called: the push
    // must stay silent instead of panicking.
    push_sedp_events_to_builtin_readers(&runtime, &empty_events);

    runtime.shutdown();
}
5793
5794    // ----  Ignore-Filter im Discovery-Hot-Path -------------
5795
#[test]
fn handle_spdp_datagram_drops_ignored_participant_beacon() {
    // Spec §2.2.2.2.1.14: once a participant has been ignored it must
    // not appear in any subsequent builtin sample.
    let rt = DcpsRuntime::start(
        46,
        GuidPrefix::from_bytes([0x26; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(bs.sinks());
    let filter = crate::participant::IgnoreFilter::default();
    rt.attach_ignore_filter(filter.clone());

    let remote = GuidPrefix::from_bytes([0xAA; 12]);
    // Derive the ignore handle from the beacon that is about to arrive:
    // the builtin sample key is the GUID of the remote participant
    // (prefix + EntityId::PARTICIPANT).
    let key = Guid::new(remote, EntityId::PARTICIPANT);
    let h = crate::instance_handle::InstanceHandle::from_guid(key);
    // Fail loudly if the ignore set cannot be seeded; silently skipping
    // the insert would surface later as a misleading assertion failure.
    filter
        .inner
        .participants
        .lock()
        .expect("ignore-filter participants lock poisoned")
        .insert(h);

    let dg = make_remote_spdp_beacon(remote);
    handle_spdp_datagram(&rt, &dg);

    let reader = bs
        .lookup_datareader::<crate::builtin_topics::ParticipantBuiltinTopicData>(
            "DCPSParticipant",
        )
        .unwrap();
    assert!(
        reader.take().unwrap().is_empty(),
        "ignorierter Participant darf nicht in DCPSParticipant landen"
    );
    rt.shutdown();
}
5834
#[test]
fn sedp_event_push_filters_ignored_publication() {
    use crate::builtin_topics as bt;
    use zerodds_discovery::sedp::SedpEvents;
    use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};

    // An ignored publication must reach neither `DCPSPublication` nor
    // the synthetic `DCPSTopic` sample derived from it.
    let rt = DcpsRuntime::start(
        47,
        GuidPrefix::from_bytes([0x27; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(bs.sinks());
    let filter = crate::participant::IgnoreFilter::default();
    rt.attach_ignore_filter(filter.clone());

    let pub_key = Guid::new(GuidPrefix::from_bytes([0x33; 12]), EntityId::PARTICIPANT);
    let h_pub = crate::instance_handle::InstanceHandle::from_guid(pub_key);
    // Fail loudly if the ignore set cannot be seeded; silently skipping
    // the insert would surface later as a misleading assertion failure.
    filter
        .inner
        .publications
        .lock()
        .expect("ignore-filter publications lock poisoned")
        .insert(h_pub);

    let mut events = SedpEvents::default();
    events.new_publications.push(
        zerodds_rtps::publication_data::PublicationBuiltinTopicData {
            key: pub_key,
            participant_key: Guid::new(
                GuidPrefix::from_bytes([0x33; 12]),
                EntityId::PARTICIPANT,
            ),
            topic_name: "Filtered".into(),
            type_name: "T".into(),
            durability: zerodds_qos::DurabilityKind::Volatile,
            reliability: ReliabilityQosPolicy::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            liveliness: LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            lifespan: zerodds_qos::LifespanQosPolicy::default(),
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_information: None,
            data_representation: Vec::new(),
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        },
    );

    push_sedp_events_to_builtin_readers(&rt, &events);

    let pub_reader = bs
        .lookup_datareader::<bt::PublicationBuiltinTopicData>("DCPSPublication")
        .unwrap();
    assert!(
        pub_reader.take().unwrap().is_empty(),
        "ignorierte Publication darf nicht in DCPSPublication landen"
    );
    // The synthetic DCPSTopic sample must not be forwarded either,
    // because the publication is dropped entirely.
    let topic_reader = bs
        .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
        .unwrap();
    assert!(topic_reader.take().unwrap().is_empty());
    rt.shutdown();
}
5906
#[test]
fn sedp_event_push_filters_ignored_subscription() {
    use crate::builtin_topics as bt;
    use zerodds_discovery::sedp::SedpEvents;
    use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};

    // An ignored subscription must not reach `DCPSSubscription`.
    let rt = DcpsRuntime::start(
        48,
        GuidPrefix::from_bytes([0x28; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(bs.sinks());
    let filter = crate::participant::IgnoreFilter::default();
    rt.attach_ignore_filter(filter.clone());

    let sub_key = Guid::new(GuidPrefix::from_bytes([0x44; 12]), EntityId::PARTICIPANT);
    let h_sub = crate::instance_handle::InstanceHandle::from_guid(sub_key);
    // Fail loudly if the ignore set cannot be seeded; silently skipping
    // the insert would surface later as a misleading assertion failure.
    filter
        .inner
        .subscriptions
        .lock()
        .expect("ignore-filter subscriptions lock poisoned")
        .insert(h_sub);

    let mut events = SedpEvents::default();
    events.new_subscriptions.push(
        zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
            key: sub_key,
            participant_key: Guid::new(
                GuidPrefix::from_bytes([0x44; 12]),
                EntityId::PARTICIPANT,
            ),
            topic_name: "FilteredSub".into(),
            type_name: "T".into(),
            durability: zerodds_qos::DurabilityKind::Volatile,
            reliability: ReliabilityQosPolicy::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            liveliness: LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_information: None,
            data_representation: Vec::new(),
            content_filter: None,
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        },
    );

    push_sedp_events_to_builtin_readers(&rt, &events);

    let sub_reader = bs
        .lookup_datareader::<bt::SubscriptionBuiltinTopicData>("DCPSSubscription")
        .unwrap();
    assert!(sub_reader.take().unwrap().is_empty());
    rt.shutdown();
}
5967
#[test]
fn sedp_event_push_filters_ignored_topic_only() {
    // When only the topic is ignored, DCPSPublication must still be
    // pushed; only the DCPSTopic sample is dropped.
    use crate::builtin_topics as bt;
    use zerodds_discovery::sedp::SedpEvents;
    use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
    let rt = DcpsRuntime::start(
        49,
        GuidPrefix::from_bytes([0x29; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(bs.sinks());
    let filter = crate::participant::IgnoreFilter::default();
    rt.attach_ignore_filter(filter.clone());

    // The ignore handle is derived from the key synthesized for the
    // same topic/type name pair that the publication below announces.
    let topic_key =
        crate::builtin_topics::TopicBuiltinTopicData::synthesize_key("OnlyTopic", "T");
    let h_topic = crate::instance_handle::InstanceHandle::from_guid(topic_key);
    // Fail loudly if the ignore set cannot be seeded; silently skipping
    // the insert would surface later as a misleading assertion failure.
    filter
        .inner
        .topics
        .lock()
        .expect("ignore-filter topics lock poisoned")
        .insert(h_topic);

    let mut events = SedpEvents::default();
    events.new_publications.push(
        zerodds_rtps::publication_data::PublicationBuiltinTopicData {
            key: Guid::new(GuidPrefix::from_bytes([0x55; 12]), EntityId::PARTICIPANT),
            participant_key: Guid::new(
                GuidPrefix::from_bytes([0x55; 12]),
                EntityId::PARTICIPANT,
            ),
            topic_name: "OnlyTopic".into(),
            type_name: "T".into(),
            durability: zerodds_qos::DurabilityKind::Volatile,
            reliability: ReliabilityQosPolicy::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            liveliness: LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            lifespan: zerodds_qos::LifespanQosPolicy::default(),
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_information: None,
            data_representation: Vec::new(),
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        },
    );

    push_sedp_events_to_builtin_readers(&rt, &events);

    let pub_reader = bs
        .lookup_datareader::<bt::PublicationBuiltinTopicData>("DCPSPublication")
        .unwrap();
    assert_eq!(pub_reader.take().unwrap().len(), 1);
    let topic_reader = bs
        .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
        .unwrap();
    assert!(
        topic_reader.take().unwrap().is_empty(),
        "ignoriertes Topic muss das synth. DCPSTopic-Sample blockieren"
    );
    rt.shutdown();
}
6040
6041    // -------- Security-Builtin-Endpoint-Wiring --------
6042
6043    /// Erzeugt einen SPDP-Beacon mit konfigurierbaren BuiltinEndpoint-
6044    /// Bits. Erweiterung von [`make_remote_spdp_beacon`] mit
6045    /// flag-Argument (Security-Bits 22..25).
6046    fn make_remote_spdp_beacon_with_flags(remote_prefix: GuidPrefix, endpoint_set: u32) -> Vec<u8> {
6047        use zerodds_discovery::spdp::SpdpBeacon;
6048        use zerodds_rtps::participant_data::ParticipantBuiltinTopicData;
6049        use zerodds_rtps::wire_types::{ProtocolVersion, VendorId};
6050        let data = ParticipantBuiltinTopicData {
6051            guid: Guid::new(remote_prefix, EntityId::PARTICIPANT),
6052            protocol_version: ProtocolVersion::V2_5,
6053            vendor_id: VendorId::ZERODDS,
6054            default_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 99], 7500)),
6055            default_multicast_locator: None,
6056            metatraffic_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 99], 7501)),
6057            metatraffic_multicast_locator: None,
6058            domain_id: Some(0),
6059            builtin_endpoint_set: endpoint_set,
6060            lease_duration: QosDuration::from_secs(100),
6061            user_data: alloc::vec::Vec::new(),
6062            properties: Default::default(),
6063            identity_token: None,
6064            permissions_token: None,
6065            identity_status_token: None,
6066            sig_algo_info: None,
6067            kx_algo_info: None,
6068            sym_cipher_algo_info: None,
6069        };
6070        let mut beacon = SpdpBeacon::new(data);
6071        beacon.serialize().expect("serialize")
6072    }
6073
/// Consolidated test for the security-builtin wiring. A single runtime
/// walks through every path: the snapshot API, idempotence of
/// `enable_security_builtins`, the SPDP hot path with and without
/// security bits, and the wire demux hook. Everything is bundled into
/// one test body because each `DcpsRuntime::start` binds a multicast
/// socket and parallel tests can run into OS resource caps.
#[test]
fn c34c_security_builtin_wiring_end_to_end() {
    use zerodds_discovery::security::SecurityBuiltinStack;
    use zerodds_security::generic_message::{
        MessageIdentity, ParticipantGenericMessage, class_id,
    };
    use zerodds_security::token::DataHolder;

    let own_prefix = GuidPrefix::from_bytes([0x75; 12]);
    let runtime = DcpsRuntime::start(75, own_prefix, RuntimeConfig::default()).expect("start");

    // Step 1: no snapshot exists before the builtins are enabled.
    assert!(runtime.security_builtin_snapshot().is_none());

    // Step 2: enabling twice yields the very same shared stack.
    let stack = runtime.enable_security_builtins(VendorId::ZERODDS);
    let stack_again = runtime.enable_security_builtins(VendorId::ZERODDS);
    assert!(Arc::ptr_eq(&stack, &stack_again));
    assert!(runtime.security_builtin_snapshot().is_some());

    // Step 3: a beacon announcing all security builtin bits gives the
    // stack four proxies.
    let secure_peer = GuidPrefix::from_bytes([0x99; 12]);
    let all_security_bits = endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_WRITER
        | endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_READER
        | endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER
        | endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER;
    let secure_beacon = make_remote_spdp_beacon_with_flags(secure_peer, all_security_bits);
    handle_spdp_datagram(&runtime, &secure_beacon);
    {
        // Scoped so the guard is released before the next SPDP call.
        let guard = stack.lock().unwrap();
        assert_eq!(guard.stateless_writer.reader_proxy_count(), 1);
        assert_eq!(guard.stateless_reader.writer_proxy_count(), 1);
        assert_eq!(guard.volatile_writer.reader_proxy_count(), 1);
        assert_eq!(guard.volatile_reader.writer_proxy_count(), 1);
    }

    // Step 4: a beacon without security bits leaves the stack as is.
    let plain_peer = GuidPrefix::from_bytes([0x88; 12]);
    let plain_beacon = make_remote_spdp_beacon_with_flags(plain_peer, endpoint_flag::ALL_STANDARD);
    handle_spdp_datagram(&runtime, &plain_beacon);
    {
        let guard = stack.lock().unwrap();
        assert_eq!(
            guard.stateless_writer.reader_proxy_count(),
            1,
            "Peer ohne Security-Bits darf bestehende Proxies nicht beruehren"
        );
    }

    // Step 5: wire demux hook with a valid stateless DATA. A mirror
    // stack playing the remote side learns about the local participant
    // and writes one message; the demux hook has to route it through
    // the local reader without panicking.
    let auth_request = ParticipantGenericMessage {
        message_identity: MessageIdentity {
            source_guid: [0xCD; 16],
            sequence_number: 1,
        },
        related_message_identity: MessageIdentity::default(),
        destination_participant_key: [0xEF; 16],
        destination_endpoint_key: [0; 16],
        source_endpoint_key: [0xFE; 16],
        message_class_id: class_id::AUTH_REQUEST.into(),
        message_data: alloc::vec![DataHolder::new("DDS:Auth:PKI-DH:1.2+AuthReq")],
    };
    let mut mirror_stack = SecurityBuiltinStack::new(secure_peer, VendorId::ZERODDS);
    let own_beacon = make_remote_spdp_beacon_with_flags(own_prefix, all_security_bits);
    let own_participant = zerodds_discovery::spdp::SpdpReader::new()
        .parse_datagram(&own_beacon)
        .unwrap();
    mirror_stack.handle_remote_endpoints(&own_participant);
    let outgoing = mirror_stack.stateless_writer.write(&auth_request).unwrap();
    assert_eq!(outgoing.len(), 1);
    dispatch_security_builtin_datagram(&runtime, &outgoing[0].bytes, Duration::from_secs(1));

    // Step 6: garbage bytes must not make the demux hook panic.
    dispatch_security_builtin_datagram(&runtime, &[0u8; 32], Duration::from_secs(1));

    runtime.shutdown();
}
6165
#[test]
fn c34c_enable_security_builtins_replays_known_peers() {
    // Reversed order: SPDP discovery happens first, plugin activation
    // afterwards. `enable_security_builtins` has to catch up on peers
    // that are already known. In addition, the demux hook invoked
    // before enabling must be a no-op that does not panic.
    let runtime = DcpsRuntime::start(
        76,
        GuidPrefix::from_bytes([0x76; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");

    // Demux without an enabled plugin: silent no-op.
    dispatch_security_builtin_datagram(&runtime, &[0u8; 16], Duration::from_secs(1));

    // Discover a peer that announces the stateless security endpoints.
    let stateless_bits = endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_WRITER
        | endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_READER;
    let peer_prefix = GuidPrefix::from_bytes([0x77; 12]);
    let peer_beacon = make_remote_spdp_beacon_with_flags(peer_prefix, stateless_bits);
    handle_spdp_datagram(&runtime, &peer_beacon);

    // Enabling afterwards must replay the peer into the fresh stack.
    let late_stack = runtime.enable_security_builtins(VendorId::ZERODDS);
    {
        let guard = late_stack.lock().unwrap();
        assert_eq!(
            guard.stateless_writer.reader_proxy_count(),
            1,
            "spaete Plugin-Activation muss bekannte Peers nachholen"
        );
    }

    runtime.shutdown();
}
6200}