Skip to main content

zerodds_rtps/
subscription_data.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
//! SubscriptionBuiltinTopicData (DDSI-RTPS 2.5 §8.5.4.3, §9.6.2.2.4).
//!
//! Payload of the SEDP subscriptions DATA submessage. Analogous to
//! [`crate::publication_data::PublicationBuiltinTopicData`]: the same
//! wire layout, minus writer-specific fields such as
//! Lifespan/Ownership-Strength, plus reader-specific ones such as
//! Time-Based-Filter.
//!
//! Core fields: GUIDs + topic/type name + Durability + Reliability.
13
14extern crate alloc;
15use alloc::string::String;
16use alloc::vec::Vec;
17
18use crate::endpoint_security_info::EndpointSecurityInfo;
19use crate::error::WireError;
20use crate::parameter_list::{Parameter, ParameterList, pid};
21use crate::participant_data::{Duration, ENCAPSULATION_PL_CDR_LE};
22use crate::publication_data::{
23    DurabilityKind, ReliabilityKind, ReliabilityQos, decode_cdr_string, decode_duration,
24    decode_i32, decode_liveliness, decode_partition, decode_u32, encode_cdr_string_le,
25    encode_duration_le, encode_liveliness_le, encode_partition_le, encode_u32_le, guid_from_param,
26};
27use crate::wire_types::Guid;
28
29/// Discovered Subscription / lokaler DataReader — Subset.
30///
31/// Wire-identisch zu [`PublicationBuiltinTopicData`] in Phase 1;
32/// getrennter Typ, damit SEDP-Publications- und Subscriptions-
33/// Caches klar unterschiedbar bleiben und Erweiterungen
34/// (z.B. `expects_inline_qos`, `time_based_filter`) nicht ueber
35/// den Publication-Typ gezogen werden.
36#[derive(Debug, Clone, PartialEq, Eq)]
37pub struct SubscriptionBuiltinTopicData {
38    /// Endpoint-GUID (= Reader-GUID).
39    pub key: Guid,
40    /// GUID des Participants, dem der Reader gehoert.
41    pub participant_key: Guid,
42    /// Topic-Name.
43    pub topic_name: String,
44    /// IDL-Type-Name.
45    pub type_name: String,
46    /// Durability-QoS.
47    pub durability: DurabilityKind,
48    /// Reliability-QoS.
49    pub reliability: ReliabilityQos,
50    /// Ownership-QoS (Spec §2.2.3.23). Default Shared.
51    pub ownership: zerodds_qos::OwnershipKind,
52    /// Liveliness-QoS (Spec §2.2.3.11).
53    pub liveliness: zerodds_qos::LivelinessQosPolicy,
54    /// Deadline-QoS (Spec §2.2.3.7).
55    pub deadline: zerodds_qos::DeadlineQosPolicy,
56    /// Partition-QoS (Spec §2.2.3.13).
57    pub partition: Vec<String>,
58    /// UserData-QoS (Spec §2.2.3.1) — opaque sequence<octet>.
59    pub user_data: Vec<u8>,
60    /// TopicData-QoS (Spec §2.2.3.3) — opaque sequence<octet>.
61    pub topic_data: Vec<u8>,
62    /// GroupData-QoS (Spec §2.2.3.2) — opaque sequence<octet>.
63    pub group_data: Vec<u8>,
64    /// Type-Information als opaque bytes (XTypes §7.6.3.2.2).
65    pub type_information: Option<alloc::vec::Vec<u8>>,
66    /// Akzeptierte Data-Representations.
67    pub data_representation: alloc::vec::Vec<i16>,
68    /// Content-Filter-Property (DDSI-RTPS §9.6.3.4 Table 9.14). Nur
69    /// gesetzt wenn der Reader als `ContentFilteredTopic` erstellt
70    /// wurde.
71    pub content_filter: Option<ContentFilterProperty>,
72    /// Endpoint-Security-Info (PID 0x1004, DDS-Security 1.1 §7.4.1.5).
73    /// `None` bei Legacy-Peers. WP 4H-c-Matching prueft Writer/Reader-
74    /// Paare auf Protection-Kompatibilitaet.
75    pub security_info: Option<EndpointSecurityInfo>,
76    /// PID_SERVICE_INSTANCE_NAME (DDS-RPC 1.0 §7.8.2) — logischer
77    /// Service-Instance-Name eines RPC-Endpoints.
78    pub service_instance_name: Option<String>,
79    /// PID_RELATED_ENTITY_GUID (DDS-RPC 1.0 §7.8.2) — GUID des
80    /// Pendant-Endpoints (z.B. Reply-Reader → Reply-Writer).
81    pub related_entity_guid: Option<Guid>,
82    /// PID_TOPIC_ALIASES (DDS-RPC 1.0 §7.8.2).
83    pub topic_aliases: Option<Vec<String>>,
84    /// PID_ZERODDS_TYPE_ID (Vendor-PID 0x8002) — Reader-Type-Identifier
85    /// für XTypes-aware Match (siehe `PublicationBuiltinTopicData`).
86    pub type_identifier: zerodds_types::TypeIdentifier,
87}
88
/// Content filter property — projects a content-filtered-topic reference
/// over SEDP.
///
/// Spec: OMG DDS 1.4 §9.6.3.4 Table 9.14 / DDSI-RTPS 2.5 §9.6.3.4.
/// Four strings plus one string sequence, all CDR-serialized.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ContentFilterProperty {
    /// Name of the content-filtered topic (derivable from the reader
    /// identifier).
    pub content_filtered_topic_name: String,
    /// Name of the underlying topic (same as `topic_name` in
    /// `SubscriptionBuiltinTopicData`).
    pub related_topic_name: String,
    /// Filter class, e.g. `"DDSSQL"`. For the name constants see
    /// [`filter_class`].
    pub filter_class_name: String,
    /// Filter expression (SQL subset).
    pub filter_expression: String,
    /// Positional parameters (`%0`, `%1`, ...).
    pub expression_parameters: Vec<String>,
}
110
/// Standard filter class names.
pub mod filter_class {
    /// OMG SQL filter (DDSSQL) — the only standardized kind.
    pub const DDSSQL: &str = "DDSSQL";
}
116
117/// Encoded ContentFilterProperty als PL-Value (fuenf CDR-Strings +
118/// Sequence<String>).
119///
120/// # Errors
121/// `ValueOutOfRange` wenn ein String ueberlange ist.
122pub fn encode_content_filter_property_le(
123    cfp: &ContentFilterProperty,
124) -> Result<Vec<u8>, WireError> {
125    let mut out = Vec::new();
126    out.extend_from_slice(&encode_cdr_string_le(&cfp.content_filtered_topic_name)?);
127    out.extend_from_slice(&encode_cdr_string_le(&cfp.related_topic_name)?);
128    out.extend_from_slice(&encode_cdr_string_le(&cfp.filter_class_name)?);
129    out.extend_from_slice(&encode_cdr_string_le(&cfp.filter_expression)?);
130    out.extend_from_slice(&encode_partition_le(&cfp.expression_parameters)?);
131    Ok(out)
132}
133
134/// Decoded ContentFilterProperty aus PL-Value. `None` bei Wire-Fehler.
135pub fn decode_content_filter_property(
136    value: &[u8],
137    little_endian: bool,
138) -> Option<ContentFilterProperty> {
139    let (s1, rest1) = take_cdr_string(value, little_endian)?;
140    let (s2, rest2) = take_cdr_string(rest1, little_endian)?;
141    let (s3, rest3) = take_cdr_string(rest2, little_endian)?;
142    let (s4, rest4) = take_cdr_string(rest3, little_endian)?;
143    let params = decode_partition(rest4, little_endian)?;
144    Some(ContentFilterProperty {
145        content_filtered_topic_name: s1,
146        related_topic_name: s2,
147        filter_class_name: s3,
148        filter_expression: s4,
149        expression_parameters: params,
150    })
151}
152
153/// Hilfsfunktion: liest **einen** CDR-String aus einem Byte-Slice und
154/// liefert den Rest nach 4-Byte-Align-Padding zurueck.
155fn take_cdr_string(bytes: &[u8], little_endian: bool) -> Option<(String, &[u8])> {
156    if bytes.len() < 4 {
157        return None;
158    }
159    let mut lb = [0u8; 4];
160    lb.copy_from_slice(&bytes[..4]);
161    let len = if little_endian {
162        u32::from_le_bytes(lb)
163    } else {
164        u32::from_be_bytes(lb)
165    } as usize;
166    let consumed_raw = 4 + len;
167    if consumed_raw > bytes.len() {
168        return None;
169    }
170    let s = decode_cdr_string(&bytes[..consumed_raw], little_endian).ok()?;
171    // 4-Byte-Align zum naechsten Element.
172    let padded = (consumed_raw + 3) & !3;
173    let next = padded.min(bytes.len());
174    Some((s, &bytes[next..]))
175}
176
177impl SubscriptionBuiltinTopicData {
178    /// Encoded zu PL_CDR_LE-Bytes (mit 4-byte Encapsulation-Header).
179    ///
180    /// Direkt implementiert (nicht ueber [`PublicationBuiltinTopicData`]
181    /// delegiert), damit Erweiterungs-PIDs, die writer-only sind (z.B.
182    /// `PID_LIFESPAN`, `PID_OWNERSHIP_STRENGTH`), nicht versehentlich
183    /// in den Subscription-Payload wandern.
184    ///
185    /// # Errors
186    /// `ValueOutOfRange` wenn ein String laenger als u32::MAX ist.
187    pub fn to_pl_cdr_le(&self) -> Result<Vec<u8>, WireError> {
188        let mut params = ParameterList::new();
189
190        params.push(Parameter::new(
191            pid::PARTICIPANT_GUID,
192            self.participant_key.to_bytes().to_vec(),
193        ));
194        params.push(Parameter::new(
195            pid::ENDPOINT_GUID,
196            self.key.to_bytes().to_vec(),
197        ));
198        params.push(Parameter::new(
199            pid::TOPIC_NAME,
200            encode_cdr_string_le(&self.topic_name)?,
201        ));
202        params.push(Parameter::new(
203            pid::TYPE_NAME,
204            encode_cdr_string_le(&self.type_name)?,
205        ));
206        params.push(Parameter::new(
207            pid::DURABILITY,
208            (self.durability as u32).to_le_bytes().to_vec(),
209        ));
210
211        let mut rel = Vec::with_capacity(12);
212        rel.extend_from_slice(&(self.reliability.kind as u32).to_le_bytes());
213        rel.extend_from_slice(&self.reliability.max_blocking_time.to_bytes_le());
214        params.push(Parameter::new(pid::RELIABILITY, rel));
215
216        // OWNERSHIP (reader-side: "ich akzeptiere nur diesen Ownership-Modus").
217        params.push(Parameter::new(
218            pid::OWNERSHIP,
219            encode_u32_le(self.ownership as u32).to_vec(),
220        ));
221
222        // LIVELINESS — requested lease.
223        params.push(Parameter::new(
224            pid::LIVELINESS,
225            encode_liveliness_le(self.liveliness),
226        ));
227
228        // DEADLINE — requested period.
229        params.push(Parameter::new(
230            pid::DEADLINE,
231            encode_duration_le(self.deadline.period).to_vec(),
232        ));
233
234        // PARTITION.
235        if !self.partition.is_empty() {
236            params.push(Parameter::new(
237                pid::PARTITION,
238                encode_partition_le(&self.partition)?,
239            ));
240        }
241
242        // USER_DATA / TOPIC_DATA / GROUP_DATA — opaque sequence<octet>.
243        if !self.user_data.is_empty() {
244            params.push(Parameter::new(
245                pid::USER_DATA,
246                crate::publication_data::encode_octet_seq_le(&self.user_data)?,
247            ));
248        }
249        if !self.topic_data.is_empty() {
250            params.push(Parameter::new(
251                pid::TOPIC_DATA,
252                crate::publication_data::encode_octet_seq_le(&self.topic_data)?,
253            ));
254        }
255        if !self.group_data.is_empty() {
256            params.push(Parameter::new(
257                pid::GROUP_DATA,
258                crate::publication_data::encode_octet_seq_le(&self.group_data)?,
259            ));
260        }
261
262        if let Some(ti) = &self.type_information {
263            params.push(Parameter::new(pid::TYPE_INFORMATION, ti.clone()));
264        }
265
266        if let Some(cfp) = &self.content_filter {
267            params.push(Parameter::new(
268                pid::CONTENT_FILTER_PROPERTY,
269                encode_content_filter_property_le(cfp)?,
270            ));
271        }
272
273        if let Some(info) = self.security_info {
274            params.push(Parameter::new(
275                pid::ENDPOINT_SECURITY_INFO,
276                info.to_bytes(true).to_vec(),
277            ));
278        }
279
280        // ----------------------------------------------------------------
281        // DDS-RPC 1.0 Discovery-PIDs (§7.8.2) — nur wenn gesetzt.
282        // ----------------------------------------------------------------
283        if let Some(name) = &self.service_instance_name {
284            params.push(Parameter::new(
285                pid::SERVICE_INSTANCE_NAME,
286                encode_cdr_string_le(name)?,
287            ));
288        }
289        if let Some(guid) = self.related_entity_guid {
290            params.push(Parameter::new(
291                pid::RELATED_ENTITY_GUID,
292                guid.to_bytes().to_vec(),
293            ));
294        }
295        if let Some(aliases) = &self.topic_aliases {
296            params.push(Parameter::new(
297                pid::TOPIC_ALIASES,
298                encode_partition_le(aliases)?,
299            ));
300        }
301
302        // PID_ZERODDS_TYPE_ID (F-TYPES-3 Wire-up).
303        if self.type_identifier != zerodds_types::TypeIdentifier::None {
304            let mut w = zerodds_cdr::BufferWriter::new(zerodds_cdr::Endianness::Little);
305            self.type_identifier
306                .encode_into(&mut w)
307                .map_err(|_| WireError::ValueOutOfRange {
308                    message: "type_identifier encoding failed",
309                })?;
310            params.push(Parameter::new(pid::ZERODDS_TYPE_ID, w.into_bytes()));
311        }
312
313        if !self.data_representation.is_empty() {
314            let mut dr = Vec::with_capacity(4 + 2 * self.data_representation.len());
315            let len = u32::try_from(self.data_representation.len()).map_err(|_| {
316                WireError::ValueOutOfRange {
317                    message: "data_representation length exceeds u32::MAX",
318                }
319            })?;
320            dr.extend_from_slice(&len.to_le_bytes());
321            for rep in &self.data_representation {
322                dr.extend_from_slice(&rep.to_le_bytes());
323            }
324            params.push(Parameter::new(pid::DATA_REPRESENTATION, dr));
325        }
326
327        let mut out = Vec::with_capacity(params.parameters.len() * 24 + 16);
328        out.extend_from_slice(&ENCAPSULATION_PL_CDR_LE);
329        out.extend_from_slice(&[0, 0]); // options
330        out.extend_from_slice(&params.to_bytes(true));
331        Ok(out)
332    }
333
334    /// Decoded aus PL_CDR_LE-Bytes (mit Encapsulation-Header).
335    ///
336    /// # Errors
337    /// `UnexpectedEof` bei zu kurzen Bytes,
338    /// `UnsupportedEncapsulation` bei unbekanntem Encoding,
339    /// `ValueOutOfRange` wenn Pflicht-PIDs fehlen.
340    pub fn from_pl_cdr_le(bytes: &[u8]) -> Result<Self, WireError> {
341        if bytes.len() < 4 {
342            return Err(WireError::UnexpectedEof {
343                needed: 4,
344                offset: 0,
345            });
346        }
347        let little_endian = match &bytes[..2] {
348            b if b == ENCAPSULATION_PL_CDR_LE => true,
349            [0x00, 0x02] => false,
350            other => {
351                return Err(WireError::UnsupportedEncapsulation {
352                    kind: [other[0], other[1]],
353                });
354            }
355        };
356        let pl = ParameterList::from_bytes(&bytes[4..], little_endian)?;
357
358        let key = pl
359            .find(pid::ENDPOINT_GUID)
360            .and_then(guid_from_param)
361            .ok_or(WireError::ValueOutOfRange {
362                message: "ENDPOINT_GUID missing or wrong length",
363            })?;
364        let participant_key = pl
365            .find(pid::PARTICIPANT_GUID)
366            .and_then(guid_from_param)
367            .unwrap_or_else(|| Guid::new(key.prefix, crate::wire_types::EntityId::PARTICIPANT));
368        let topic_name = pl
369            .find(pid::TOPIC_NAME)
370            .map(|p| decode_cdr_string(&p.value, little_endian))
371            .transpose()?
372            .ok_or(WireError::ValueOutOfRange {
373                message: "TOPIC_NAME missing",
374            })?;
375        let type_name = pl
376            .find(pid::TYPE_NAME)
377            .map(|p| decode_cdr_string(&p.value, little_endian))
378            .transpose()?
379            .ok_or(WireError::ValueOutOfRange {
380                message: "TYPE_NAME missing",
381            })?;
382
383        let durability = pl
384            .find(pid::DURABILITY)
385            .and_then(|p| {
386                if p.value.len() >= 4 {
387                    let mut b = [0u8; 4];
388                    b.copy_from_slice(&p.value[..4]);
389                    Some(DurabilityKind::from_u32(if little_endian {
390                        u32::from_le_bytes(b)
391                    } else {
392                        u32::from_be_bytes(b)
393                    }))
394                } else {
395                    None
396                }
397            })
398            .unwrap_or_default();
399
400        let reliability = pl
401            .find(pid::RELIABILITY)
402            .and_then(|p| {
403                if p.value.len() >= 12 {
404                    let mut k = [0u8; 4];
405                    k.copy_from_slice(&p.value[..4]);
406                    let kind = ReliabilityKind::from_u32(if little_endian {
407                        u32::from_le_bytes(k)
408                    } else {
409                        u32::from_be_bytes(k)
410                    });
411                    let mut d = [0u8; 8];
412                    d.copy_from_slice(&p.value[4..12]);
413                    let max_blocking_time = if little_endian {
414                        Duration::from_bytes_le(d)
415                    } else {
416                        let mut s = [0u8; 4];
417                        s.copy_from_slice(&d[..4]);
418                        let mut f = [0u8; 4];
419                        f.copy_from_slice(&d[4..]);
420                        Duration {
421                            seconds: i32::from_be_bytes(s),
422                            fraction: u32::from_be_bytes(f),
423                        }
424                    };
425                    Some(ReliabilityQos {
426                        kind,
427                        max_blocking_time,
428                    })
429                } else {
430                    None
431                }
432            })
433            .unwrap_or_default();
434
435        let ownership = pl
436            .find(pid::OWNERSHIP)
437            .and_then(|p| decode_u32(&p.value, little_endian))
438            .map(zerodds_qos::OwnershipKind::from_u32)
439            .unwrap_or_default();
440        // OWNERSHIP_STRENGTH ignoriert (writer-only).
441        let _ = decode_i32;
442
443        let liveliness = pl
444            .find(pid::LIVELINESS)
445            .and_then(|p| decode_liveliness(&p.value, little_endian))
446            .unwrap_or_default();
447
448        let deadline = pl
449            .find(pid::DEADLINE)
450            .and_then(|p| decode_duration(&p.value, little_endian))
451            .map(|period| zerodds_qos::DeadlineQosPolicy { period })
452            .unwrap_or_default();
453
454        let partition = pl
455            .find(pid::PARTITION)
456            .and_then(|p| decode_partition(&p.value, little_endian))
457            .unwrap_or_default();
458
459        let user_data = pl
460            .find(pid::USER_DATA)
461            .and_then(|p| crate::publication_data::decode_octet_seq(&p.value, little_endian))
462            .unwrap_or_default();
463        let topic_data = pl
464            .find(pid::TOPIC_DATA)
465            .and_then(|p| crate::publication_data::decode_octet_seq(&p.value, little_endian))
466            .unwrap_or_default();
467        let group_data = pl
468            .find(pid::GROUP_DATA)
469            .and_then(|p| crate::publication_data::decode_octet_seq(&p.value, little_endian))
470            .unwrap_or_default();
471
472        let type_information = pl.find(pid::TYPE_INFORMATION).map(|p| p.value.clone());
473        let content_filter = pl
474            .find(pid::CONTENT_FILTER_PROPERTY)
475            .and_then(|p| decode_content_filter_property(&p.value, little_endian));
476
477        let security_info = pl
478            .find(pid::ENDPOINT_SECURITY_INFO)
479            .and_then(|p| EndpointSecurityInfo::from_bytes(&p.value, little_endian).ok());
480
481        let service_instance_name = pl
482            .find(pid::SERVICE_INSTANCE_NAME)
483            .map(|p| decode_cdr_string(&p.value, little_endian))
484            .transpose()
485            .ok()
486            .flatten();
487        let related_entity_guid = pl.find(pid::RELATED_ENTITY_GUID).and_then(guid_from_param);
488        let topic_aliases = pl
489            .find(pid::TOPIC_ALIASES)
490            .and_then(|p| decode_partition(&p.value, little_endian));
491
492        let type_identifier = pl
493            .find(pid::ZERODDS_TYPE_ID)
494            .and_then(|p| {
495                let mut r =
496                    zerodds_cdr::BufferReader::new(&p.value, zerodds_cdr::Endianness::Little);
497                zerodds_types::TypeIdentifier::decode_from(&mut r).ok()
498            })
499            .unwrap_or_default();
500
501        let data_representation = pl
502            .find(pid::DATA_REPRESENTATION)
503            .map(|p| {
504                let v = &p.value;
505                if v.len() < 4 {
506                    return Vec::new();
507                }
508                let mut n_bytes = [0u8; 4];
509                n_bytes.copy_from_slice(&v[..4]);
510                let n = if little_endian {
511                    u32::from_le_bytes(n_bytes)
512                } else {
513                    u32::from_be_bytes(n_bytes)
514                } as usize;
515                let cap = n.min(v.len().saturating_sub(4) / 2);
516                let mut reps = Vec::with_capacity(cap);
517                for i in 0..n {
518                    let off = 4 + i * 2;
519                    if off + 2 > v.len() {
520                        break;
521                    }
522                    let mut b = [0u8; 2];
523                    b.copy_from_slice(&v[off..off + 2]);
524                    reps.push(if little_endian {
525                        i16::from_le_bytes(b)
526                    } else {
527                        i16::from_be_bytes(b)
528                    });
529                }
530                reps
531            })
532            .unwrap_or_default();
533
534        Ok(Self {
535            key,
536            participant_key,
537            topic_name,
538            type_name,
539            durability,
540            reliability,
541            ownership,
542            liveliness,
543            deadline,
544            partition,
545            user_data,
546            topic_data,
547            group_data,
548            type_information,
549            data_representation,
550            content_filter,
551            security_info,
552            service_instance_name,
553            related_entity_guid,
554            topic_aliases,
555            type_identifier,
556        })
557    }
558}
559
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::participant_data::Duration;
    use crate::publication_data::ReliabilityKind;
    use crate::wire_types::{EntityId, GuidPrefix};

    /// Baseline reader description with default QoS and no optional
    /// parameters; the tests override only the fields they care about.
    fn sample_sub() -> SubscriptionBuiltinTopicData {
        SubscriptionBuiltinTopicData {
            key: Guid::new(
                GuidPrefix::from_bytes([5; 12]),
                EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
            ),
            participant_key: Guid::new(GuidPrefix::from_bytes([5; 12]), EntityId::PARTICIPANT),
            topic_name: "Calc_Reply".into(),
            type_name: "Calc::Reply".into(),
            durability: DurabilityKind::Volatile,
            reliability: ReliabilityQos::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            partition: alloc::vec::Vec::new(),
            user_data: alloc::vec::Vec::new(),
            topic_data: alloc::vec::Vec::new(),
            group_data: alloc::vec::Vec::new(),
            type_information: None,
            data_representation: alloc::vec::Vec::new(),
            content_filter: None,
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        }
    }

    /// Encodes `sub` and decodes the resulting bytes again.
    fn roundtrip(sub: &SubscriptionBuiltinTopicData) -> SubscriptionBuiltinTopicData {
        let bytes = sub.to_pl_cdr_le().unwrap();
        SubscriptionBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap()
    }

    /// A description with non-default durability/reliability survives an
    /// encode/decode cycle unchanged.
    #[test]
    fn roundtrip_le() {
        let s = SubscriptionBuiltinTopicData {
            key: Guid::new(
                GuidPrefix::from_bytes([2; 12]),
                EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
            ),
            participant_key: Guid::new(GuidPrefix::from_bytes([2; 12]), EntityId::PARTICIPANT),
            topic_name: "ChatterTopic".into(),
            type_name: "std_msgs::String".into(),
            durability: DurabilityKind::TransientLocal,
            reliability: ReliabilityQos {
                kind: ReliabilityKind::Reliable,
                max_blocking_time: Duration::from_secs(5),
            },
            ..sample_sub()
        };
        assert_eq!(roundtrip(&s), s);
    }

    /// The endpoint security info is preserved by encode + decode.
    #[test]
    fn security_info_roundtrip() {
        use crate::endpoint_security_info::{EndpointSecurityInfo, attrs, plugin_attrs};
        let s = SubscriptionBuiltinTopicData {
            key: Guid::new(
                GuidPrefix::from_bytes([4; 12]),
                EntityId::user_reader_with_key([0x10, 0x20, 0x30]),
            ),
            participant_key: Guid::new(GuidPrefix::from_bytes([4; 12]), EntityId::PARTICIPANT),
            topic_name: "ST".into(),
            type_name: "Foo".into(),
            security_info: Some(EndpointSecurityInfo {
                endpoint_security_attributes: attrs::IS_VALID | attrs::IS_PAYLOAD_PROTECTED,
                plugin_endpoint_security_attributes: plugin_attrs::IS_VALID
                    | plugin_attrs::IS_PAYLOAD_ENCRYPTED,
            }),
            ..sample_sub()
        };
        let decoded = roundtrip(&s);
        assert_eq!(decoded.security_info, s.security_info);
    }

    /// A stand-alone content filter property round-trips through its own
    /// encoder/decoder pair.
    #[test]
    fn content_filter_property_roundtrip_le() {
        let cfp = ContentFilterProperty {
            content_filtered_topic_name: "FilteredShapes".into(),
            related_topic_name: "Square".into(),
            filter_class_name: filter_class::DDSSQL.into(),
            filter_expression: "color = %0 AND x > %1".into(),
            expression_parameters: alloc::vec!["'RED'".into(), "50".into()],
        };
        let bytes = encode_content_filter_property_le(&cfp).unwrap();
        let decoded = decode_content_filter_property(&bytes, true).unwrap();
        assert_eq!(decoded, cfp);
    }

    /// A content filter embedded in a full subscription description is
    /// preserved by encode + decode.
    #[test]
    fn subscription_with_content_filter_roundtrip_le() {
        let cfp = ContentFilterProperty {
            content_filtered_topic_name: "Filt".into(),
            related_topic_name: "Square".into(),
            filter_class_name: "DDSSQL".into(),
            filter_expression: "x > %0".into(),
            expression_parameters: alloc::vec!["42".into()],
        };
        let s = SubscriptionBuiltinTopicData {
            key: Guid::new(
                GuidPrefix::from_bytes([3; 12]),
                EntityId::user_reader_with_key([1, 2, 3]),
            ),
            participant_key: Guid::new(GuidPrefix::from_bytes([3; 12]), EntityId::PARTICIPANT),
            topic_name: "Square".into(),
            type_name: "ShapeType".into(),
            content_filter: Some(cfp.clone()),
            ..sample_sub()
        };
        let decoded = roundtrip(&s);
        assert_eq!(decoded.content_filter, Some(cfp));
    }

    /// The three DDS-RPC discovery parameters round-trip when set.
    #[test]
    fn rpc_discovery_pids_roundtrip_subscription() {
        let s = SubscriptionBuiltinTopicData {
            service_instance_name: Some("SrvInst".into()),
            related_entity_guid: Some(Guid::new(
                GuidPrefix::from_bytes([5; 12]),
                EntityId::user_writer_with_key([1, 2, 3]),
            )),
            topic_aliases: Some(alloc::vec!["Alias1".into(), "Alias2".into()]),
            ..sample_sub()
        };
        let decoded = roundtrip(&s);
        assert_eq!(decoded.service_instance_name, s.service_instance_name);
        assert_eq!(decoded.related_entity_guid, s.related_entity_guid);
        assert_eq!(decoded.topic_aliases, s.topic_aliases);
    }

    /// A payload without the DDS-RPC parameters still parses and leaves
    /// the corresponding fields unset.
    #[test]
    fn rpc_pids_optional_legacy_subscription_parses_ok() {
        let decoded = roundtrip(&sample_sub());
        assert!(decoded.service_instance_name.is_none());
        assert!(decoded.related_entity_guid.is_none());
        assert!(decoded.topic_aliases.is_none());
    }
}