Skip to main content

zerodds_rtps/
publication_data.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! PublicationBuiltinTopicData (DDSI-RTPS 2.5 §8.5.4.2, §9.6.2.2.3).
4//!
5//! Inhalt der SEDP-Publications-DATA-Submessage, die ein Participant
6//! sendet, um einen lokalen DataWriter bei Remote-Participants bekannt
7//! zu machen. Serialisiert als PL_CDR_LE-encoded ParameterList in der
8//! `serialized_payload` einer DATA-Submessage.
9//!
//! Carries topic_name + type_name + GUIDs plus typed QoS fields
//! (durability, reliability, ownership, ownership strength, liveliness,
//! deadline, lifespan, partition, user/topic/group data) and the optional
//! XTypes, DDS-Security and DDS-RPC discovery PIDs. PIDs this struct does
//! not model are not retained by the decoder in this file.
//!
//! **QoS types are canonical in `zerodds_qos`** — `DurabilityKind`,
//! `ReliabilityKind` and `ReliabilityQos` are only re-exported from this
//! module for backward compatibility.
17
18extern crate alloc;
19use alloc::string::String;
20use alloc::vec::Vec;
21
22use crate::endpoint_security_info::EndpointSecurityInfo;
23use crate::error::WireError;
24use crate::parameter_list::{Parameter, ParameterList, pid};
25use crate::participant_data::{Duration, ENCAPSULATION_PL_CDR_LE};
26use crate::wire_types::Guid;
27
28/// Durability-QoS Kind.
29///
30/// Canonical in [`zerodds_qos::DurabilityKind`]; RTPS re-exportiert für
31/// Abwärtskompatibilität.
32pub use zerodds_qos::DurabilityKind;
33
34/// Reliability-QoS Kind.
35///
36/// Canonical in [`zerodds_qos::ReliabilityKind`]; RTPS re-exportiert.
37pub use zerodds_qos::ReliabilityKind;
38
39/// Reliability-QoS Wert: Kind + max_blocking_time.
40///
41/// Canonical in [`zerodds_qos::ReliabilityQosPolicy`]; RTPS re-exportiert
42/// unter dem historischen Alias `ReliabilityQos`.
43pub use zerodds_qos::ReliabilityQosPolicy as ReliabilityQos;
44
/// `DataRepresentationId` — XTypes 1.3 §7.6.3.1.1 + RTPS 2.5 PID 0x0073.
///
/// Per spec this is a 16-bit signed integer; the values 0..2 are defined
/// normatively. By RTI/Cyclone/FastDDS convention, further values are
/// reserved as vendor-specific.
pub mod data_representation {
    /// XCDR1 (legacy plain CDR + PL_CDR mutable). Default when the PID is
    /// not present (spec §7.6.3.1.2).
    pub const XCDR: i16 = 0;
    /// XML (rare, for CFP profiles). Not part of our default list.
    pub const XML: i16 = 1;
    /// XCDR2 (PLAIN_CDR2 + DELIMITED_CDR2 + PL_CDR2 mutable).
    /// ZeroDDS' native encapsulation (`0x0007`/`0x0009`/`0x000B`).
    pub const XCDR2: i16 = 2;

    /// Default list announced by ZeroDDS writers and readers.
    ///
    /// **XCDR1 first** = legacy preferred (matched by strict-spec vendors
    /// such as RTI Connext + RTI Shapes Demo whose readers accept only
    /// `[XCDR1]`). **XCDR2 second** as the modern fallback for peers that
    /// cannot do XCDR1.
    ///
    /// Strictly per spec (XTypes 1.3 §7.6.3.1.2) a writer matches when its
    /// first element is in the reader's accepted list. `[XCDR1, XCDR2]`
    /// therefore matches:
    /// - Reader [XCDR1] (legacy/RTI strict): writer.first=XCDR1 ∈ {XCDR1} ✓
    /// - Reader [XCDR1, XCDR2] (ZeroDDS/Cyclone/FastDDS): ✓
    /// - Reader [XCDR2] (modern only): writer.first=XCDR1 ∉ {XCDR2} ✗
    ///   → for XCDR2-only peers the user has to reorder the list.
    ///
    /// User override: per writer/reader via QoS, or globally through
    /// `RuntimeConfig::data_representation_offer` (TBD).
    pub const DEFAULT_OFFER: [i16; 2] = [XCDR, XCDR2];

    /// `DataRepMatchMode` — selects how writer and reader data
    /// representation lists are compared.
    ///
    /// * **Strict** (XTypes 1.3 §7.6.3.1.2, normative): the writer's FIRST
    ///   element must be in the reader's list. Same as RTI Connext.
    /// * **Tolerant** (industry norm, Cyclone + FastDDS): match when the
    ///   lists overlap at all; the wire format is the first overlap.
    ///
    /// Default in ZeroDDS: `Tolerant` — maximizes interop, because our own
    /// readers then also match RTI writers whose first element does not
    /// line up exactly.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
    pub enum DataRepMatchMode {
        /// Strict-spec match: Writer.first ∈ Reader.list.
        Strict,
        /// Tolerant match: any element of Writer.list ∈ Reader.list.
        /// Industry default.
        #[default]
        Tolerant,
    }

    /// Determines the wire format negotiated between a writer and a reader.
    ///
    /// Returns `Some(id)` with the DataRepresentationId to emit on the
    /// wire, or `None` when the lists do not match under `mode`.
    ///
    /// `writer_offered` and `reader_accepted` may each hold several values
    /// (e.g. `[XCDR2, XCDR1]`). Either list may be empty, in which case the
    /// spec default `[XCDR1]` is substituted (spec §7.6.3.1.2).
    #[must_use]
    pub fn negotiate(
        writer_offered: &[i16],
        reader_accepted: &[i16],
        mode: DataRepMatchMode,
    ) -> Option<i16> {
        /// Substitutes the spec default `[XCDR1]` for an empty list.
        fn or_spec_default(list: &[i16]) -> &[i16] {
            const SPEC_DEFAULT: &[i16] = &[XCDR];
            if list.is_empty() {
                SPEC_DEFAULT
            } else {
                list
            }
        }

        let offered = or_spec_default(writer_offered);
        let accepted = or_spec_default(reader_accepted);

        match mode {
            // §7.6.3.1.2: only the writer's first element is considered.
            DataRepMatchMode::Strict => offered
                .first()
                .copied()
                .filter(|id| accepted.contains(id)),
            // Industry behavior: any overlap counts. The writer's order of
            // preference decides (first hit wins), but the WHOLE writer
            // list is scanned, not just its first element.
            DataRepMatchMode::Tolerant => offered
                .iter()
                .copied()
                .find(|id| accepted.contains(id)),
        }
    }

    /// Encapsulation header (4 bytes) for @final structs under the given
    /// data representation. @appendable/@mutable types need a different
    /// encapsulation code — see `encap_for_extensibility`.
    ///
    /// Result layout: `[byte0, byte1, byte2, byte3]`, where byte0 is
    /// always `0x00` and byte1 is the representation id per RTPS 2.5
    /// §10.5. Every id other than `XCDR2` maps to CDR_LE (`0x01`).
    #[must_use]
    pub fn encap_for_final_le(id: i16) -> [u8; 4] {
        // 0x07 = PLAIN_CDR2_LE, 0x01 = CDR_LE (XCDR1, also the fallback).
        let repr_id = if id == XCDR2 { 0x07 } else { 0x01 };
        [0x00, repr_id, 0x00, 0x00]
    }
}
164
/// Discovered publication / local DataWriter — subset of the builtin
/// topic data that this crate encodes and decodes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublicationBuiltinTopicData {
    /// Endpoint GUID (= writer GUID).
    pub key: Guid,
    /// GUID of the participant that owns the writer.
    pub participant_key: Guid,
    /// Topic name (DDS topic, e.g. "ChatterTopic").
    pub topic_name: String,
    /// IDL type name (e.g. "std_msgs::String").
    pub type_name: String,
    /// Durability QoS.
    pub durability: DurabilityKind,
    /// Reliability QoS.
    pub reliability: ReliabilityQos,
    /// Ownership QoS (spec §2.2.3.23). Default is Shared.
    pub ownership: zerodds_qos::OwnershipKind,
    /// Ownership strength (spec §2.2.3.24). Only relevant when
    /// `ownership == Exclusive`; default 0.
    pub ownership_strength: i32,
    /// Liveliness QoS (spec §2.2.3.11).
    pub liveliness: zerodds_qos::LivelinessQosPolicy,
    /// Deadline QoS (spec §2.2.3.7).
    pub deadline: zerodds_qos::DeadlineQosPolicy,
    /// Lifespan QoS (spec §2.2.3.16) — writer-only.
    pub lifespan: zerodds_qos::LifespanQosPolicy,
    /// Partition QoS (spec §2.2.3.13). An empty list means the
    /// "default partition" ("").
    pub partition: Vec<String>,
    /// UserData QoS (spec §2.2.3.1) — opaque sequence<octet>, propagated
    /// through discovery. An empty Vec means "not set".
    pub user_data: Vec<u8>,
    /// TopicData QoS (spec §2.2.3.3) — opaque sequence<octet>, propagated
    /// from the topic via publication discovery.
    pub topic_data: Vec<u8>,
    /// GroupData QoS (spec §2.2.3.2) — opaque sequence<octet>, propagated
    /// from the publisher via publication discovery.
    pub group_data: Vec<u8>,
    /// Type information (TypeIdentifier hashes + dependencies, XTypes
    /// §7.6.3.2.2). Opaque bytes: the structure lives in `zerodds-types`,
    /// but only the serialized blob is transported here to avoid circular
    /// crate dependencies.
    pub type_information: Option<Vec<u8>>,
    /// Data representations (0=XCDR1, 1=XML, 2=XCDR2, ...).
    /// Spec: XTypes 1.3 §7.6.3.1.1 / RTPS 2.5 PID 0x0073.
    /// Per XTypes §7.6.3.1.2 an empty list means `[XCDR1]`. The encoder
    /// `to_pl_cdr_le` writes the PID only when this list is non-empty, so
    /// the list has to be populated for the PID to be announced
    /// explicitly — which strict vendors such as RTI 7.7.0 need in order
    /// to SEDP-match.
    pub data_representation: Vec<i16>,
    /// Endpoint security info (PID 0x1004, DDS-Security 1.1 §7.4.1.5).
    /// `None` for legacy peers without the security PID. WP 4H-c matches
    /// on it: writer/reader pairs with incompatible protection levels are
    /// rejected.
    pub security_info: Option<EndpointSecurityInfo>,
    /// PID_SERVICE_INSTANCE_NAME (DDS-RPC 1.0 §7.8.2) — logical service
    /// instance name of an RPC endpoint. `None` for ordinary pub/sub
    /// topics.
    pub service_instance_name: Option<String>,
    /// PID_RELATED_ENTITY_GUID (DDS-RPC 1.0 §7.8.2) — GUID of the
    /// counterpart endpoint in an RPC endpoint pair. For a request writer
    /// this points to the reply reader of the same requester; for a reply
    /// writer it points to the request reader of the same replier.
    pub related_entity_guid: Option<Guid>,
    /// PID_TOPIC_ALIASES (DDS-RPC 1.0 §7.8.2) — alternative topic names
    /// for routing/compat layers. Order is significant.
    pub topic_aliases: Option<Vec<String>>,
    /// PID_ZERODDS_TYPE_ID (vendor PID 0x8002) — XTypes 1.3 §7.3.4.2
    /// TypeIdentifier of the writer type, used for XTypes-aware reader
    /// matching (XTypes §7.6.3.7 + DDS 1.4 §2.2.3
    /// TypeConsistencyEnforcement).
    pub type_identifier: zerodds_types::TypeIdentifier,
}
236
237impl PublicationBuiltinTopicData {
238    /// Encoded zu PL_CDR_LE-Bytes (mit 4-byte Encapsulation-Header).
239    /// Output ist direkt als `serialized_payload` einer DATA-
240    /// Submessage verwendbar.
241    ///
242    /// # Errors
243    /// `ValueOutOfRange` wenn ein String laenger als u32::MAX ist.
244    pub fn to_pl_cdr_le(&self) -> Result<Vec<u8>, WireError> {
245        let mut params = ParameterList::new();
246
247        // PARTICIPANT_GUID: 16 Byte
248        params.push(Parameter::new(
249            pid::PARTICIPANT_GUID,
250            self.participant_key.to_bytes().to_vec(),
251        ));
252
253        // ENDPOINT_GUID: 16 Byte
254        params.push(Parameter::new(
255            pid::ENDPOINT_GUID,
256            self.key.to_bytes().to_vec(),
257        ));
258
259        // TOPIC_NAME: CDR-String (4 byte len + UTF-8 + null)
260        params.push(Parameter::new(
261            pid::TOPIC_NAME,
262            encode_cdr_string_le(&self.topic_name)?,
263        ));
264
265        // TYPE_NAME: CDR-String
266        params.push(Parameter::new(
267            pid::TYPE_NAME,
268            encode_cdr_string_le(&self.type_name)?,
269        ));
270
271        // DURABILITY: 4 Byte u32
272        params.push(Parameter::new(
273            pid::DURABILITY,
274            (self.durability as u32).to_le_bytes().to_vec(),
275        ));
276
277        // RELIABILITY: 4 Byte kind + 8 Byte max_blocking_time
278        let mut rel = Vec::with_capacity(12);
279        rel.extend_from_slice(&(self.reliability.kind as u32).to_le_bytes());
280        rel.extend_from_slice(&self.reliability.max_blocking_time.to_bytes_le());
281        params.push(Parameter::new(pid::RELIABILITY, rel));
282
283        // OWNERSHIP: 4 Byte u32 kind
284        params.push(Parameter::new(
285            pid::OWNERSHIP,
286            encode_u32_le(self.ownership as u32).to_vec(),
287        ));
288
289        // OWNERSHIP_STRENGTH: 4 Byte int32 (nur sinnvoll bei Exclusive,
290        // aber wir schicken's immer — Reader ignoriert bei Shared).
291        params.push(Parameter::new(
292            pid::OWNERSHIP_STRENGTH,
293            encode_u32_le(self.ownership_strength as u32).to_vec(),
294        ));
295
296        // LIVELINESS: 4 Byte kind + 8 Byte lease_duration
297        params.push(Parameter::new(
298            pid::LIVELINESS,
299            encode_liveliness_le(self.liveliness),
300        ));
301
302        // DEADLINE: 8 Byte Duration_t
303        params.push(Parameter::new(
304            pid::DEADLINE,
305            encode_duration_le(self.deadline.period).to_vec(),
306        ));
307
308        // LIFESPAN: 8 Byte Duration_t
309        params.push(Parameter::new(
310            pid::LIFESPAN,
311            encode_duration_le(self.lifespan.duration).to_vec(),
312        ));
313
314        // PARTITION: nur wenn non-empty — leere Liste = Default (= "").
315        if !self.partition.is_empty() {
316            params.push(Parameter::new(
317                pid::PARTITION,
318                encode_partition_le(&self.partition)?,
319            ));
320        }
321
322        // USER_DATA / TOPIC_DATA / GROUP_DATA: opaque sequence<octet>.
323        // Wire = 4 byte u32 length + N byte data. Nur wenn gesetzt
324        // (leerer Vec = Default, lassen wir aus dem ParameterList).
325        if !self.user_data.is_empty() {
326            params.push(Parameter::new(
327                pid::USER_DATA,
328                encode_octet_seq_le(&self.user_data)?,
329            ));
330        }
331        if !self.topic_data.is_empty() {
332            params.push(Parameter::new(
333                pid::TOPIC_DATA,
334                encode_octet_seq_le(&self.topic_data)?,
335            ));
336        }
337        if !self.group_data.is_empty() {
338            params.push(Parameter::new(
339                pid::GROUP_DATA,
340                encode_octet_seq_le(&self.group_data)?,
341            ));
342        }
343
344        // TYPE_INFORMATION: serialisierter TypeInformation-Blob
345        // (optional, XTypes §7.6.3.2.2).
346        if let Some(ti) = &self.type_information {
347            params.push(Parameter::new(pid::TYPE_INFORMATION, ti.clone()));
348        }
349
350        // ENDPOINT_SECURITY_INFO: 2x u32 masks (§7.4.1.5). Nur wenn gesetzt,
351        // sonst Legacy-Verhalten (Cyclone/Fast-DDS ohne Security lassen
352        // die PID weg).
353        if let Some(info) = self.security_info {
354            params.push(Parameter::new(
355                pid::ENDPOINT_SECURITY_INFO,
356                info.to_bytes(true).to_vec(),
357            ));
358        }
359
360        // ----------------------------------------------------------------
361        // DDS-RPC 1.0 Discovery-PIDs (§7.8.2) — nur wenn gesetzt.
362        // ----------------------------------------------------------------
363        if let Some(name) = &self.service_instance_name {
364            params.push(Parameter::new(
365                pid::SERVICE_INSTANCE_NAME,
366                encode_cdr_string_le(name)?,
367            ));
368        }
369        if let Some(guid) = self.related_entity_guid {
370            params.push(Parameter::new(
371                pid::RELATED_ENTITY_GUID,
372                guid.to_bytes().to_vec(),
373            ));
374        }
375        if let Some(aliases) = &self.topic_aliases {
376            params.push(Parameter::new(
377                pid::TOPIC_ALIASES,
378                encode_partition_le(aliases)?,
379            ));
380        }
381
382        // PID_ZERODDS_TYPE_ID (F-TYPES-3 Wire-up).
383        if self.type_identifier != zerodds_types::TypeIdentifier::None {
384            let mut w = zerodds_cdr::BufferWriter::new(zerodds_cdr::Endianness::Little);
385            self.type_identifier
386                .encode_into(&mut w)
387                .map_err(|_| WireError::ValueOutOfRange {
388                    message: "type_identifier encoding failed",
389                })?;
390            params.push(Parameter::new(pid::ZERODDS_TYPE_ID, w.into_bytes()));
391        }
392
393        // DATA_REPRESENTATION: sequence<int16> — u32 Laenge + 2*N bytes.
394        if !self.data_representation.is_empty() {
395            let mut dr = Vec::with_capacity(4 + 2 * self.data_representation.len());
396            let len = u32::try_from(self.data_representation.len()).map_err(|_| {
397                WireError::ValueOutOfRange {
398                    message: "data_representation length exceeds u32::MAX",
399                }
400            })?;
401            dr.extend_from_slice(&len.to_le_bytes());
402            for rep in &self.data_representation {
403                dr.extend_from_slice(&rep.to_le_bytes());
404            }
405            params.push(Parameter::new(pid::DATA_REPRESENTATION, dr));
406        }
407
408        let mut out = Vec::with_capacity(params.parameters.len() * 24 + 16);
409        out.extend_from_slice(&ENCAPSULATION_PL_CDR_LE);
410        out.extend_from_slice(&[0, 0]); // options
411        out.extend_from_slice(&params.to_bytes(true));
412        Ok(out)
413    }
414
415    /// Decoded aus PL_CDR_LE-Bytes (mit Encapsulation-Header).
416    ///
417    /// # Errors
418    /// `UnexpectedEof` bei zu kurzen Bytes,
419    /// `UnsupportedEncapsulation` bei unbekanntem Encoding,
420    /// `ValueOutOfRange` wenn Pflicht-PIDs fehlen oder Werte
421    /// falsche Laenge haben.
422    pub fn from_pl_cdr_le(bytes: &[u8]) -> Result<Self, WireError> {
423        if bytes.len() < 4 {
424            return Err(WireError::UnexpectedEof {
425                needed: 4,
426                offset: 0,
427            });
428        }
429        let little_endian = match &bytes[..2] {
430            b if b == ENCAPSULATION_PL_CDR_LE => true,
431            [0x00, 0x02] => false,
432            other => {
433                return Err(WireError::UnsupportedEncapsulation {
434                    kind: [other[0], other[1]],
435                });
436            }
437        };
438        let pl = ParameterList::from_bytes(&bytes[4..], little_endian)?;
439
440        let key = pl
441            .find(pid::ENDPOINT_GUID)
442            .and_then(guid_from_param)
443            .ok_or(WireError::ValueOutOfRange {
444                message: "ENDPOINT_GUID missing or wrong length",
445            })?;
446
447        // PARTICIPANT_GUID ist technisch optional (kann aus ENDPOINT_GUID-
448        // Prefix abgeleitet werden), aber wir verlangen es, wenn es da ist.
449        let participant_key = pl
450            .find(pid::PARTICIPANT_GUID)
451            .and_then(guid_from_param)
452            .unwrap_or_else(|| {
453                // Fallback: Participant = ENDPOINT_GUID.prefix + PARTICIPANT-EntityId
454                Guid::new(key.prefix, crate::wire_types::EntityId::PARTICIPANT)
455            });
456
457        let topic_name = pl
458            .find(pid::TOPIC_NAME)
459            .map(|p| decode_cdr_string(&p.value, little_endian))
460            .transpose()?
461            .ok_or(WireError::ValueOutOfRange {
462                message: "TOPIC_NAME missing",
463            })?;
464
465        let type_name = pl
466            .find(pid::TYPE_NAME)
467            .map(|p| decode_cdr_string(&p.value, little_endian))
468            .transpose()?
469            .ok_or(WireError::ValueOutOfRange {
470                message: "TYPE_NAME missing",
471            })?;
472
473        let durability = pl
474            .find(pid::DURABILITY)
475            .and_then(|p| {
476                if p.value.len() >= 4 {
477                    let mut b = [0u8; 4];
478                    b.copy_from_slice(&p.value[..4]);
479                    Some(DurabilityKind::from_u32(if little_endian {
480                        u32::from_le_bytes(b)
481                    } else {
482                        u32::from_be_bytes(b)
483                    }))
484                } else {
485                    None
486                }
487            })
488            .unwrap_or_default();
489
490        let reliability = pl
491            .find(pid::RELIABILITY)
492            .and_then(|p| {
493                if p.value.len() >= 12 {
494                    let mut k = [0u8; 4];
495                    k.copy_from_slice(&p.value[..4]);
496                    let kind = ReliabilityKind::from_u32(if little_endian {
497                        u32::from_le_bytes(k)
498                    } else {
499                        u32::from_be_bytes(k)
500                    });
501                    let mut d = [0u8; 8];
502                    d.copy_from_slice(&p.value[4..12]);
503                    let max_blocking_time = if little_endian {
504                        Duration::from_bytes_le(d)
505                    } else {
506                        // BE-Decoding: seconds+fraction als BE interpretieren
507                        let mut s = [0u8; 4];
508                        s.copy_from_slice(&d[..4]);
509                        let mut f = [0u8; 4];
510                        f.copy_from_slice(&d[4..]);
511                        Duration {
512                            seconds: i32::from_be_bytes(s),
513                            fraction: u32::from_be_bytes(f),
514                        }
515                    };
516                    Some(ReliabilityQos {
517                        kind,
518                        max_blocking_time,
519                    })
520                } else {
521                    None
522                }
523            })
524            .unwrap_or_default();
525
526        let ownership = pl
527            .find(pid::OWNERSHIP)
528            .and_then(|p| decode_u32(&p.value, little_endian))
529            .map(zerodds_qos::OwnershipKind::from_u32)
530            .unwrap_or_default();
531
532        let ownership_strength = pl
533            .find(pid::OWNERSHIP_STRENGTH)
534            .and_then(|p| decode_i32(&p.value, little_endian))
535            .unwrap_or(0);
536
537        let liveliness = pl
538            .find(pid::LIVELINESS)
539            .and_then(|p| decode_liveliness(&p.value, little_endian))
540            .unwrap_or_default();
541
542        let deadline = pl
543            .find(pid::DEADLINE)
544            .and_then(|p| decode_duration(&p.value, little_endian))
545            .map(|period| zerodds_qos::DeadlineQosPolicy { period })
546            .unwrap_or_default();
547
548        let lifespan = pl
549            .find(pid::LIFESPAN)
550            .and_then(|p| decode_duration(&p.value, little_endian))
551            .map(|duration| zerodds_qos::LifespanQosPolicy { duration })
552            .unwrap_or_default();
553
554        let partition = pl
555            .find(pid::PARTITION)
556            .and_then(|p| decode_partition(&p.value, little_endian))
557            .unwrap_or_default();
558
559        let user_data = pl
560            .find(pid::USER_DATA)
561            .and_then(|p| decode_octet_seq(&p.value, little_endian))
562            .unwrap_or_default();
563        let topic_data = pl
564            .find(pid::TOPIC_DATA)
565            .and_then(|p| decode_octet_seq(&p.value, little_endian))
566            .unwrap_or_default();
567        let group_data = pl
568            .find(pid::GROUP_DATA)
569            .and_then(|p| decode_octet_seq(&p.value, little_endian))
570            .unwrap_or_default();
571
572        let type_information = pl.find(pid::TYPE_INFORMATION).map(|p| p.value.clone());
573
574        let security_info = pl
575            .find(pid::ENDPOINT_SECURITY_INFO)
576            .and_then(|p| EndpointSecurityInfo::from_bytes(&p.value, little_endian).ok());
577
578        let service_instance_name = pl
579            .find(pid::SERVICE_INSTANCE_NAME)
580            .map(|p| decode_cdr_string(&p.value, little_endian))
581            .transpose()
582            .ok()
583            .flatten();
584        let related_entity_guid = pl.find(pid::RELATED_ENTITY_GUID).and_then(guid_from_param);
585        let topic_aliases = pl
586            .find(pid::TOPIC_ALIASES)
587            .and_then(|p| decode_partition(&p.value, little_endian));
588
589        let type_identifier = pl
590            .find(pid::ZERODDS_TYPE_ID)
591            .and_then(|p| {
592                let mut r =
593                    zerodds_cdr::BufferReader::new(&p.value, zerodds_cdr::Endianness::Little);
594                zerodds_types::TypeIdentifier::decode_from(&mut r).ok()
595            })
596            .unwrap_or_default();
597
598        let data_representation = pl
599            .find(pid::DATA_REPRESENTATION)
600            .map(|p| {
601                let v = &p.value;
602                if v.len() < 4 {
603                    return Vec::new();
604                }
605                let mut n_bytes = [0u8; 4];
606                n_bytes.copy_from_slice(&v[..4]);
607                let n = if little_endian {
608                    u32::from_le_bytes(n_bytes)
609                } else {
610                    u32::from_be_bytes(n_bytes)
611                } as usize;
612                // DoS-Cap: Vec::with_capacity(n) koennte bei n=u32::MAX/2
613                // ca. 4 GB reservieren. Kappen auf tatsaechlich lesbare
614                // Elemente: (v.len()-4)/2 i16-Elemente.
615                let cap = n.min(v.len().saturating_sub(4) / 2);
616                let mut reps = Vec::with_capacity(cap);
617                for i in 0..n {
618                    let off = 4 + i * 2;
619                    if off + 2 > v.len() {
620                        break;
621                    }
622                    let mut b = [0u8; 2];
623                    b.copy_from_slice(&v[off..off + 2]);
624                    reps.push(if little_endian {
625                        i16::from_le_bytes(b)
626                    } else {
627                        i16::from_be_bytes(b)
628                    });
629                }
630                reps
631            })
632            .unwrap_or_default();
633
634        Ok(Self {
635            key,
636            participant_key,
637            topic_name,
638            type_name,
639            durability,
640            reliability,
641            ownership,
642            ownership_strength,
643            liveliness,
644            deadline,
645            lifespan,
646            partition,
647            user_data,
648            topic_data,
649            group_data,
650            type_information,
651            data_representation,
652            security_info,
653            service_instance_name,
654            related_entity_guid,
655            topic_aliases,
656            type_identifier,
657        })
658    }
659}
660
661// ============================================================================
662// Helpers
663// ============================================================================
664
665/// ADR-0006 / zerodds-flatdata-1.0 §3.1: injiziert PID_SHM_LOCATOR
666/// (Vendor-PID 0x8001) in eine bereits PL-CDR-LE-encodierte
667/// `PublicationBuiltinTopicData` Bytes-Sequenz. Das Vendor-PID
668/// traegt KEIN MUST_UNDERSTAND-Bit — fremde Vendoren ignorieren
669/// es safe, ZeroDDS-Reader auf demselben Host attachen an SHM.
670///
671/// Side-Map-Pattern: das Feld wandert nicht in den Wire-Struct
672/// (sonst 21+ Construction-Sites cross-workspace), sondern liegt
673/// als `BTreeMap<EntityId, Vec<u8>>` in `DcpsRuntime` und wird
674/// via diese Helper am Wire-Encode-Ende eingebracht.
675///
676/// Der Inhalt von `locator_bytes` ist die bereits gepackte
677/// SHM-Locator-Struktur (siehe zerodds-flatdata-1.0 §3.1.2:
678/// `u32 hostname_hash` plus `u32 uid` plus `u32 slot_count` plus
679/// `u32 slot_size` plus CDR-String `segment_path`). Der Caller
680/// serialisiert das vor dem Aufruf.
681///
682/// # Errors
683/// `ValueOutOfRange` wenn `bytes` keinen Sentinel-Trailer
684/// (`0x01 0x00 0x00 0x00`) am Ende hat oder zu kurz ist.
685pub fn inject_pid_shm_locator(bytes: &mut Vec<u8>, locator_bytes: &[u8]) -> Result<(), WireError> {
686    use crate::parameter_list::pid;
687    if bytes.len() < 4 {
688        return Err(WireError::ValueOutOfRange {
689            message: "inject_pid_shm_locator: bytes too short",
690        });
691    }
692    let sentinel_pos = bytes.len() - 4;
693    // Sentinel-Tag = 0x01 0x00 (PID_SENTINEL) + 0x00 0x00 (length).
694    if bytes[sentinel_pos..] != [0x01, 0x00, 0x00, 0x00] {
695        return Err(WireError::ValueOutOfRange {
696            message: "inject_pid_shm_locator: missing PID_SENTINEL trailer",
697        });
698    }
699    // Padded auf 4-Byte-Boundary fuer ParameterList-Konformitaet.
700    let padded_len = (locator_bytes.len() + 3) & !3;
701    if padded_len > u16::MAX as usize {
702        return Err(WireError::ValueOutOfRange {
703            message: "inject_pid_shm_locator: locator > u16::MAX",
704        });
705    }
706    let mut inject = Vec::with_capacity(4 + padded_len + 4);
707    inject.extend_from_slice(&pid::SHM_LOCATOR.to_le_bytes());
708    inject.extend_from_slice(&(padded_len as u16).to_le_bytes());
709    inject.extend_from_slice(locator_bytes);
710    // Zero-Pad auf 4-Byte-Boundary.
711    inject.resize(inject.len() + (padded_len - locator_bytes.len()), 0);
712    // Append den (entfernten) Sentinel-Trailer.
713    inject.extend_from_slice(&bytes[sentinel_pos..]);
714    bytes.truncate(sentinel_pos);
715    bytes.extend_from_slice(&inject);
716    Ok(())
717}
718
719pub(crate) fn guid_from_param(p: &Parameter) -> Option<Guid> {
720    if p.value.len() == 16 {
721        let mut g = [0u8; 16];
722        g.copy_from_slice(&p.value);
723        Some(Guid::from_bytes(g))
724    } else {
725        None
726    }
727}
728
729/// CDR-String als Value-Bytes (inkl. 4-Byte-Length-Prefix + Null-
730/// Terminator, plus evtl. Padding auf 4-Byte-Boundary). LE only —
731/// ParameterList-Values werden in der Endianness der Submessage
732/// geschrieben, die wir auf LE festgelegt haben.
733/// Encoded 8 Byte LE Duration_t. Kein trailing padding (aus-aligned).
734pub(crate) fn encode_duration_le(d: Duration) -> [u8; 8] {
735    let mut out = [0u8; 8];
736    out[..4].copy_from_slice(&d.seconds.to_le_bytes());
737    out[4..].copy_from_slice(&d.fraction.to_le_bytes());
738    out
739}
740
741pub(crate) fn decode_duration(value: &[u8], little_endian: bool) -> Option<Duration> {
742    if value.len() < 8 {
743        return None;
744    }
745    let mut s = [0u8; 4];
746    s.copy_from_slice(&value[..4]);
747    let mut f = [0u8; 4];
748    f.copy_from_slice(&value[4..8]);
749    if little_endian {
750        Some(Duration {
751            seconds: i32::from_le_bytes(s),
752            fraction: u32::from_le_bytes(f),
753        })
754    } else {
755        Some(Duration {
756            seconds: i32::from_be_bytes(s),
757            fraction: u32::from_be_bytes(f),
758        })
759    }
760}
761
/// Returns the 4-byte little-endian representation of `v`.
pub(crate) fn encode_u32_le(v: u32) -> [u8; 4] {
    u32::to_le_bytes(v)
}
766
/// Decodes a `u32` from the first 4 bytes of `value`, in the byte order
/// selected by `little_endian`. Bytes beyond the first 4 are ignored.
///
/// Returns `None` when `value` holds fewer than 4 bytes.
pub(crate) fn decode_u32(value: &[u8], little_endian: bool) -> Option<u32> {
    match *value {
        [b0, b1, b2, b3, ..] => {
            let raw = [b0, b1, b2, b3];
            Some(if little_endian {
                u32::from_le_bytes(raw)
            } else {
                u32::from_be_bytes(raw)
            })
        }
        _ => None,
    }
}
779
780pub(crate) fn decode_i32(value: &[u8], little_endian: bool) -> Option<i32> {
781    decode_u32(value, little_endian).map(|u| u as i32)
782}
783
784/// LivelinessQos encoden: 4 Byte kind + 8 Byte lease_duration = 12 Byte.
785pub(crate) fn encode_liveliness_le(l: zerodds_qos::LivelinessQosPolicy) -> Vec<u8> {
786    let mut out = Vec::with_capacity(12);
787    out.extend_from_slice(&(l.kind as u32).to_le_bytes());
788    out.extend_from_slice(&encode_duration_le(l.lease_duration));
789    out
790}
791
792pub(crate) fn decode_liveliness(
793    value: &[u8],
794    little_endian: bool,
795) -> Option<zerodds_qos::LivelinessQosPolicy> {
796    if value.len() < 12 {
797        return None;
798    }
799    let kind_u = decode_u32(&value[..4], little_endian)?;
800    let lease = decode_duration(&value[4..12], little_endian)?;
801    Some(zerodds_qos::LivelinessQosPolicy {
802        kind: zerodds_qos::LivelinessKind::from_u32(kind_u),
803        lease_duration: lease,
804    })
805}
806
807/// Partition = sequence<string>. CDR-Layout: u32 count + N × CDR-String
808/// (jeder CDR-String mit eigenem Alignment-Padding).
809/// Encoded eine opaque `sequence<octet>` als `u32 length + N byte data`,
810/// gepaddet auf 4-Byte-Boundary. DDS QoS UserData/TopicData/GroupData.
811pub fn encode_octet_seq_le(data: &[u8]) -> Result<Vec<u8>, WireError> {
812    let len = u32::try_from(data.len()).map_err(|_| WireError::ValueOutOfRange {
813        message: "octet sequence length exceeds u32::MAX",
814    })?;
815    let mut out = Vec::with_capacity(4 + data.len() + 3);
816    out.extend_from_slice(&len.to_le_bytes());
817    out.extend_from_slice(data);
818    while out.len() % 4 != 0 {
819        out.push(0);
820    }
821    Ok(out)
822}
823
824/// Decoded eine opaque `sequence<octet>` aus dem PID-Value.
825pub fn decode_octet_seq(value: &[u8], little_endian: bool) -> Option<Vec<u8>> {
826    let n = decode_u32(value, little_endian)? as usize;
827    if 4 + n > value.len() {
828        return None;
829    }
830    Some(value[4..4 + n].to_vec())
831}
832
833pub(crate) fn encode_partition_le(partitions: &[String]) -> Result<Vec<u8>, WireError> {
834    let mut out = Vec::new();
835    let len = u32::try_from(partitions.len()).map_err(|_| WireError::ValueOutOfRange {
836        message: "partition count exceeds u32::MAX",
837    })?;
838    out.extend_from_slice(&len.to_le_bytes());
839    for p in partitions {
840        // Jeder nested String beginnt auf 4-Byte-Grenze relativ zum
841        // Start der PARTITION-Value. Outer-count ist genau 4 Byte, also
842        // ist der erste String schon aligned. Nach jedem String padden
843        // wir erneut auf 4.
844        out.extend_from_slice(&encode_cdr_string_le(p)?);
845    }
846    Ok(out)
847}
848
849pub(crate) fn decode_partition(value: &[u8], little_endian: bool) -> Option<Vec<String>> {
850    let n = decode_u32(value, little_endian)? as usize;
851    // DoS-Cap: maximal so viele Strings wie bei minimalem 1-Byte-String
852    // + 4-byte-length + 4-byte-pad = 12 bytes pro Eintrag im Puffer
853    // Platz haetten. Bei n=u32::MAX sonst 4 GB reservation.
854    let cap = n.min(value.len().saturating_sub(4) / 5);
855    let mut out = Vec::with_capacity(cap);
856    let mut pos = 4;
857    for _ in 0..n {
858        if pos + 4 > value.len() {
859            return None;
860        }
861        let mut lb = [0u8; 4];
862        lb.copy_from_slice(&value[pos..pos + 4]);
863        let slen = if little_endian {
864            u32::from_le_bytes(lb)
865        } else {
866            u32::from_be_bytes(lb)
867        } as usize;
868        let next_raw_end = pos + 4 + slen;
869        if next_raw_end > value.len() {
870            return None;
871        }
872        let s =
873            decode_cdr_string(&value[pos..next_raw_end.min(value.len())], little_endian).ok()?;
874        out.push(s);
875        // Pad auf 4-Byte-Boundary.
876        let padded_end = (next_raw_end + 3) & !3;
877        pos = padded_end;
878    }
879    Some(out)
880}
881
882pub(crate) fn encode_cdr_string_le(s: &str) -> Result<Vec<u8>, WireError> {
883    let bytes = s.as_bytes();
884    let len =
885        u32::try_from(bytes.len().saturating_add(1)).map_err(|_| WireError::ValueOutOfRange {
886            message: "CDR string length exceeds u32::MAX",
887        })?;
888    let mut out = Vec::with_capacity(4 + bytes.len() + 4);
889    out.extend_from_slice(&len.to_le_bytes());
890    out.extend_from_slice(bytes);
891    out.push(0); // null-terminator
892    // Padding auf 4-Byte-Boundary (pro ParameterList-Wert gefordert)
893    while out.len() % 4 != 0 {
894        out.push(0);
895    }
896    Ok(out)
897}
898
899/// CDR-String aus Value-Bytes dekodieren. Ignoriert Trailing-Padding.
900pub(crate) fn decode_cdr_string(value: &[u8], little_endian: bool) -> Result<String, WireError> {
901    if value.len() < 4 {
902        return Err(WireError::UnexpectedEof {
903            needed: 4,
904            offset: 0,
905        });
906    }
907    let mut lb = [0u8; 4];
908    lb.copy_from_slice(&value[..4]);
909    let len = if little_endian {
910        u32::from_le_bytes(lb)
911    } else {
912        u32::from_be_bytes(lb)
913    } as usize;
914    if len == 0 {
915        return Err(WireError::ValueOutOfRange {
916            message: "CDR string length 0 (missing null terminator)",
917        });
918    }
919    if value.len() < 4 + len {
920        return Err(WireError::UnexpectedEof {
921            needed: 4 + len,
922            offset: 0,
923        });
924    }
925    let raw = &value[4..4 + len];
926    if raw[len - 1] != 0 {
927        return Err(WireError::ValueOutOfRange {
928            message: "CDR string missing null terminator",
929        });
930    }
931    String::from_utf8(raw[..len - 1].to_vec()).map_err(|_| WireError::ValueOutOfRange {
932        message: "CDR string is not valid UTF-8",
933    })
934}
935
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::wire_types::{EntityId, GuidPrefix};
    use alloc::vec;

    /// Writer announcement used as the baseline by most tests: reliable,
    /// volatile, with every optional field left empty.
    fn sample_data() -> PublicationBuiltinTopicData {
        let key = Guid::new(
            GuidPrefix::from_bytes([1; 12]),
            EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
        );
        let participant_key = Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT);
        let reliability = ReliabilityQos {
            kind: ReliabilityKind::Reliable,
            max_blocking_time: Duration::from_secs(10),
        };
        PublicationBuiltinTopicData {
            key,
            participant_key,
            topic_name: "ChatterTopic".into(),
            type_name: "std_msgs::String".into(),
            durability: DurabilityKind::Volatile,
            reliability,
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            lifespan: zerodds_qos::LifespanQosPolicy::default(),
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_information: None,
            data_representation: Vec::new(),
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        }
    }

    /// Serializes `d` and parses the result back.
    fn reencode(d: &PublicationBuiltinTopicData) -> PublicationBuiltinTopicData {
        let bytes = d.to_pl_cdr_le().unwrap();
        PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap()
    }

    /// Wraps a parameter list into a payload: PL_CDR_LE encapsulation
    /// header, two zero bytes, then the serialized list.
    fn pl_cdr_le_frame(pl: ParameterList) -> Vec<u8> {
        let mut framed = Vec::new();
        framed.extend_from_slice(&ENCAPSULATION_PL_CDR_LE);
        framed.extend_from_slice(&[0, 0]);
        framed.extend_from_slice(&pl.to_bytes(true));
        framed
    }

    /// Asserts that decoding `pl` fails with a `ValueOutOfRange` error
    /// whose message mentions `field`.
    fn assert_rejected_mentioning(pl: ParameterList, field: &str) {
        let res = PublicationBuiltinTopicData::from_pl_cdr_le(&pl_cdr_le_frame(pl));
        assert!(
            matches!(res, Err(WireError::ValueOutOfRange { message }) if message.contains(field))
        );
    }

    #[test]
    fn durability_try_from_u32_rejects_unknown() {
        let cases = [
            (0, Some(DurabilityKind::Volatile)),
            (1, Some(DurabilityKind::TransientLocal)),
            (3, Some(DurabilityKind::Persistent)),
            (99, None),
        ];
        for (raw, expected) in cases {
            assert_eq!(DurabilityKind::try_from_u32(raw), expected);
        }
    }

    #[test]
    fn reliability_try_from_u32_rejects_unknown() {
        let cases = [
            (1, Some(ReliabilityKind::BestEffort)),
            (2, Some(ReliabilityKind::Reliable)),
            // 0 is not a valid reliability wire value.
            (0, None),
            (42, None),
        ];
        for (raw, expected) in cases {
            assert_eq!(ReliabilityKind::try_from_u32(raw), expected);
        }
    }

    #[test]
    fn legacy_from_u32_still_defaults_for_sedp_forward_compat() {
        // Forward-compat path: unknown value -> default (the SEDP parser
        // relies on this).
        assert_eq!(DurabilityKind::from_u32(99), DurabilityKind::Volatile);
        assert_eq!(ReliabilityKind::from_u32(99), ReliabilityKind::BestEffort);
    }

    #[test]
    fn roundtrip_le() {
        let original = sample_data();
        let bytes = original.to_pl_cdr_le().unwrap();
        // The payload starts with the PL_CDR_LE encapsulation id.
        assert_eq!(&bytes[..2], &[0x00, 0x03]);
        let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn security_info_roundtrip() {
        use crate::endpoint_security_info::{EndpointSecurityInfo, attrs, plugin_attrs};
        let mut original = sample_data();
        original.security_info = Some(EndpointSecurityInfo {
            endpoint_security_attributes: attrs::IS_VALID | attrs::IS_SUBMESSAGE_PROTECTED,
            plugin_endpoint_security_attributes: plugin_attrs::IS_VALID
                | plugin_attrs::IS_SUBMESSAGE_ENCRYPTED,
        });
        let decoded = reencode(&original);
        assert_eq!(decoded.security_info, original.security_info);
    }

    #[test]
    fn legacy_peer_without_security_info_parses_ok() {
        let original = sample_data();
        assert!(original.security_info.is_none());
        let decoded = reencode(&original);
        assert!(decoded.security_info.is_none());
    }

    #[test]
    fn roundtrip_utf8_topic_name() {
        let mut original = sample_data();
        original.topic_name = "Zählung".into();
        let decoded = reencode(&original);
        assert_eq!(decoded.topic_name, "Zählung");
    }

    #[test]
    fn decode_rejects_unknown_encapsulation() {
        // Header 0xFFFF + two option bytes, followed by 16 zero bytes.
        let mut bytes = vec![0u8; 20];
        bytes[0] = 0xFF;
        bytes[1] = 0xFF;
        let res = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes);
        assert!(matches!(
            res,
            Err(WireError::UnsupportedEncapsulation { .. })
        ));
    }

    #[test]
    fn decode_rejects_missing_topic_name() {
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(pid::ENDPOINT_GUID, vec![0u8; 16]));
        pl.push(Parameter::new(
            pid::TYPE_NAME,
            encode_cdr_string_le("T").unwrap(),
        ));
        assert_rejected_mentioning(pl, "TOPIC_NAME");
    }

    #[test]
    fn decode_rejects_missing_type_name() {
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(pid::ENDPOINT_GUID, vec![0u8; 16]));
        pl.push(Parameter::new(
            pid::TOPIC_NAME,
            encode_cdr_string_le("T").unwrap(),
        ));
        assert_rejected_mentioning(pl, "TYPE_NAME");
    }

    #[test]
    fn decode_rejects_missing_endpoint_guid() {
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(
            pid::TOPIC_NAME,
            encode_cdr_string_le("T").unwrap(),
        ));
        pl.push(Parameter::new(
            pid::TYPE_NAME,
            encode_cdr_string_le("U").unwrap(),
        ));
        assert_rejected_mentioning(pl, "ENDPOINT_GUID");
    }

    #[test]
    fn unknown_pids_are_skipped() {
        let mut bytes = sample_data().to_pl_cdr_le().unwrap();
        // Insert a new unknown PID (0x7FFF, 4 bytes) in front of the
        // sentinel. The sentinel parameter is the last 4 bytes.
        let sentinel = bytes.split_off(bytes.len() - 4);
        bytes.extend_from_slice(&[0xFFu8, 0x7F, 4, 0, 0xDE, 0xAD, 0xBE, 0xEF]);
        bytes.extend_from_slice(&sentinel);
        let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
        assert_eq!(decoded, sample_data());
    }

    #[test]
    fn inject_pid_shm_locator_appends_before_sentinel() {
        // Locator body: four u32 fields (16 bytes) followed by the path
        // as a CDR string (length, bytes, null terminator, padding).
        let path = "/dev/shm/zd-1";
        let mut locator = Vec::new();
        locator.extend_from_slice(&0xDEAD_BEEFu32.to_le_bytes()); // hostname_hash
        locator.extend_from_slice(&1000u32.to_le_bytes()); // uid
        locator.extend_from_slice(&64u32.to_le_bytes()); // slot_count
        locator.extend_from_slice(&4096u32.to_le_bytes()); // slot_size
        locator.extend_from_slice(&((path.len() as u32) + 1).to_le_bytes());
        locator.extend_from_slice(path.as_bytes());
        locator.push(0);
        // Pad to a 4-byte boundary.
        while locator.len() % 4 != 0 {
            locator.push(0);
        }

        let mut bytes = sample_data().to_pl_cdr_le().unwrap();
        let len_before = bytes.len();
        super::inject_pid_shm_locator(&mut bytes, &locator).unwrap();
        // The payload grew (PID header 4 + locator).
        assert!(bytes.len() > len_before);
        // It still decodes: the vendor PID is ignored as unknown (no
        // MUST_UNDERSTAND bit) and everything else stays identical.
        let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
        assert_eq!(decoded, sample_data());
        // Sanity: PID 0x8001 really is part of the payload.
        let needle = 0x8001u16.to_le_bytes();
        let pid_found = bytes.windows(2).any(|w| w == needle);
        assert!(pid_found, "PID_SHM_LOCATOR should appear in bytes");
    }

    #[test]
    fn inject_pid_shm_locator_rejects_missing_sentinel() {
        let mut bytes = vec![0u8; 8];
        assert!(super::inject_pid_shm_locator(&mut bytes, &[0u8; 16]).is_err());
    }

    #[test]
    fn inject_pid_shm_locator_rejects_too_short() {
        let mut bytes = vec![0u8, 1u8];
        assert!(super::inject_pid_shm_locator(&mut bytes, &[0u8; 16]).is_err());
    }

    #[test]
    fn participant_key_fallback_when_pid_missing() {
        // Build a parameter list without PARTICIPANT_GUID: the decoder is
        // expected to derive participant_key from the ENDPOINT_GUID
        // prefix plus the PARTICIPANT entity id.
        let original = sample_data();
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(
            pid::ENDPOINT_GUID,
            original.key.to_bytes().to_vec(),
        ));
        pl.push(Parameter::new(
            pid::TOPIC_NAME,
            encode_cdr_string_le(&original.topic_name).unwrap(),
        ));
        pl.push(Parameter::new(
            pid::TYPE_NAME,
            encode_cdr_string_le(&original.type_name).unwrap(),
        ));
        let decoded =
            PublicationBuiltinTopicData::from_pl_cdr_le(&pl_cdr_le_frame(pl)).unwrap();
        assert_eq!(decoded.participant_key.prefix, original.key.prefix);
        assert_eq!(decoded.participant_key.entity_id, EntityId::PARTICIPANT);
    }

    #[test]
    fn durability_kind_from_u32_unknown_defaults_volatile() {
        let cases = [
            (0, DurabilityKind::Volatile),
            (1, DurabilityKind::TransientLocal),
            (999, DurabilityKind::Volatile),
        ];
        for (raw, expected) in cases {
            assert_eq!(DurabilityKind::from_u32(raw), expected);
        }
    }

    #[test]
    fn rpc_discovery_pids_roundtrip() {
        let mut original = sample_data();
        original.service_instance_name = Some("CalcInstance-1".into());
        original.related_entity_guid = Some(Guid::new(
            GuidPrefix::from_bytes([7; 12]),
            EntityId::user_reader_with_key([0xAA, 0xBB, 0xCC]),
        ));
        original.topic_aliases = Some(vec!["LegacyCalc_Request".into(), "v2_Req".into()]);

        let decoded = reencode(&original);
        assert_eq!(decoded.service_instance_name, original.service_instance_name);
        assert_eq!(decoded.related_entity_guid, original.related_entity_guid);
        assert_eq!(decoded.topic_aliases, original.topic_aliases);
    }

    #[test]
    fn rpc_pids_optional_legacy_peer_parses_ok() {
        let original = sample_data();
        assert!(original.service_instance_name.is_none());
        assert!(original.related_entity_guid.is_none());
        assert!(original.topic_aliases.is_none());
        let decoded = reencode(&original);
        assert!(decoded.service_instance_name.is_none());
        assert!(decoded.related_entity_guid.is_none());
        assert!(decoded.topic_aliases.is_none());
    }

    #[test]
    fn rpc_pid_constants_in_emitted_bytes() {
        // Cyclone compatibility snapshot: the RPC PIDs have to show up
        // byte-exact in the stream, otherwise a Cyclone reader cannot
        // dispatch them. This test checks for 0x0080, 0x0081 and 0x0082.
        let mut d = sample_data();
        d.service_instance_name = Some("X".into());
        d.related_entity_guid = Some(Guid::new(
            GuidPrefix::from_bytes([1; 12]),
            EntityId::PARTICIPANT,
        ));
        d.topic_aliases = Some(vec!["A".into()]);
        let bytes = d.to_pl_cdr_le().unwrap();
        // PIDs are 2 bytes, little-endian.
        let expected_pids = [[0x80u8, 0x00], [0x81, 0x00], [0x82, 0x00]];
        assert!(
            expected_pids
                .iter()
                .all(|pid_bytes| bytes.windows(2).any(|w| w == *pid_bytes))
        );
    }

    #[test]
    fn reliability_kind_from_u32_unknown_defaults_best_effort() {
        let cases = [
            (1, ReliabilityKind::BestEffort),
            (2, ReliabilityKind::Reliable),
            (999, ReliabilityKind::BestEffort),
        ];
        for (raw, expected) in cases {
            assert_eq!(ReliabilityKind::from_u32(raw), expected);
        }
    }
}