Skip to main content

zerodds_rtps/
inline_qos.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Inline-QoS-Helper fuer DDS-RPC und SEDP (DDS-RPC 1.0 §7.8.2).
4//!
5//! Inline-QoS ist eine [`crate::parameter_list::ParameterList`] im Body
6//! einer DATA/DATA_FRAG-Submessage (Q-Flag, Spec §9.4.5.3). Phase 2 nutzt
7//! sie bereits fuer `PID_KEY_HASH`, `PID_STATUS_INFO` etc. — dieses Modul
8//! ergaenzt Helper fuer die RPC-spezifischen Inline-QoS-PIDs:
9//!
10//! * `PID_RELATED_SAMPLE_IDENTITY = 0x0083` (DDS-RPC 1.0 §7.8.2): wird
11//!   vom Reply-Writer in der Inline-QoS jeder Reply-DATA gesetzt; Wert
12//!   ist die `request_id` (`SampleIdentity` = 16 byte writer_guid +
13//!   8 byte sequence_number) des korrelierten Requests. Encoding:
14//!   XCDR2-Final, Alignment-Cap=4, also 24 byte ohne Padding.
15//!
16//! Wir kapseln Encode/Decode hier, damit `zerodds-rpc` ohne harte Abhaengigkeit
17//! auf RTPS-Internas wie `Parameter`-Layout arbeiten kann.
18
19use crate::error::WireError;
20use crate::parameter_list::{
21    MUST_UNDERSTAND_BIT, Parameter, ParameterList, VENDOR_SPECIFIC_BIT, pid,
22};
23
/// Size in bytes of an encoded `SampleIdentity`: a 16-byte writer GUID
/// followed by an 8-byte sequence number.
pub const SAMPLE_IDENTITY_WIRE_SIZE: usize = 16 + 8;
26
/// Byte-level model of a `SampleIdentity` (DDS-RPC 1.0 §7.5.1.1.1).
///
/// The layout mirrors `zerodds-rpc::common_types::SampleIdentity`, but the
/// type is kept as a plain byte helper here because `zerodds-rtps` must not
/// depend on `zerodds-rpc` (crate dependency direction).
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct SampleIdentityBytes {
    /// GUID of the writer, 16 raw bytes.
    pub writer_guid: [u8; 16],
    /// Sequence number as a 64-bit value.
    pub sequence_number: u64,
}
39
40impl SampleIdentityBytes {
41    /// Konstruktor.
42    #[must_use]
43    pub const fn new(writer_guid: [u8; 16], sequence_number: u64) -> Self {
44        Self {
45            writer_guid,
46            sequence_number,
47        }
48    }
49
50    /// XCDR2-Final-Encoder mit gegebener Endianness.
51    /// Layout: 16 byte GUID + 8 byte u64 (Alignment-Cap=4 ⇒ kein Padding).
52    #[must_use]
53    pub fn to_bytes(&self, little_endian: bool) -> [u8; SAMPLE_IDENTITY_WIRE_SIZE] {
54        let mut out = [0u8; SAMPLE_IDENTITY_WIRE_SIZE];
55        out[..16].copy_from_slice(&self.writer_guid);
56        let sn = if little_endian {
57            self.sequence_number.to_le_bytes()
58        } else {
59            self.sequence_number.to_be_bytes()
60        };
61        out[16..].copy_from_slice(&sn);
62        out
63    }
64
65    /// XCDR2-Final-Decoder.
66    ///
67    /// # Errors
68    /// `WireError::UnexpectedEof` wenn Buffer kuerzer als 24 byte ist.
69    pub fn from_bytes(bytes: &[u8], little_endian: bool) -> Result<Self, WireError> {
70        if bytes.len() < SAMPLE_IDENTITY_WIRE_SIZE {
71            return Err(WireError::UnexpectedEof {
72                needed: SAMPLE_IDENTITY_WIRE_SIZE,
73                offset: 0,
74            });
75        }
76        let mut writer_guid = [0u8; 16];
77        writer_guid.copy_from_slice(&bytes[..16]);
78        let mut sn = [0u8; 8];
79        sn.copy_from_slice(&bytes[16..24]);
80        let sequence_number = if little_endian {
81            u64::from_le_bytes(sn)
82        } else {
83            u64::from_be_bytes(sn)
84        };
85        Ok(Self {
86            writer_guid,
87            sequence_number,
88        })
89    }
90}
91
92/// Baut einen `Parameter` mit `PID_RELATED_SAMPLE_IDENTITY` (0x0083) und
93/// dem 24-byte XCDR2-Encoding der `SampleIdentity` als Value.
94#[must_use]
95pub fn related_sample_identity_param(id: SampleIdentityBytes, little_endian: bool) -> Parameter {
96    Parameter::new(
97        pid::RELATED_SAMPLE_IDENTITY,
98        id.to_bytes(little_endian).to_vec(),
99    )
100}
101
102/// Baut eine Inline-QoS-`ParameterList`, die nur den
103/// `PID_RELATED_SAMPLE_IDENTITY` traegt — gut genug fuer die Reply-DATA
104/// einer einfachen RPC-Operation. Caller koennen weitere Parameter
105/// (`PID_KEY_HASH` etc.) per `push` ergaenzen.
106#[must_use]
107pub fn reply_inline_qos(id: SampleIdentityBytes, little_endian: bool) -> ParameterList {
108    let mut pl = ParameterList::new();
109    pl.push(related_sample_identity_param(id, little_endian));
110    pl
111}
112
/// Bit flags of the `PID_STATUS_INFO` status word (spec §9.6.3.9).
pub mod status_info {
    /// Bit 0: the sample was marked NOT_ALIVE_DISPOSED via `dispose`.
    pub const DISPOSED: u32 = 1 << 0;
    /// Bit 1: the sample was marked NOT_ALIVE_NO_WRITERS via
    /// `unregister_instance`.
    pub const UNREGISTERED: u32 = 1 << 1;
    /// Bit 2: the sample was filtered out by the writer's content filter.
    pub const FILTERED: u32 = 1 << 2;
}
123
124/// Baut einen `Parameter` mit `PID_STATUS_INFO` (0x0071). Wert ist ein
125/// 4-byte Statusword (Bits per [`status_info`]). Spec verlangt
126/// **Big-Endian**-Encoding unabhaengig vom RTPS-Header-Endianess
127/// (DDSI-RTPS 2.5 §9.6.3.9).
128#[must_use]
129pub fn status_info_param(bits: u32) -> Parameter {
130    Parameter::new(pid::STATUS_INFO, bits.to_be_bytes().to_vec())
131}
132
133/// Liest `PID_STATUS_INFO` aus einer Inline-QoS-Liste. Liefert das
134/// 4-byte Statusword, oder `None` wenn die PID fehlt / das Value
135/// nicht 4 byte ist.
136#[must_use]
137pub fn find_status_info(pl: &ParameterList) -> Option<u32> {
138    let p = pl.find(pid::STATUS_INFO)?;
139    if p.value.len() != 4 {
140        return None;
141    }
142    let mut b = [0u8; 4];
143    b.copy_from_slice(&p.value);
144    Some(u32::from_be_bytes(b))
145}
146
147/// Liest `PID_KEY_HASH` aus einer Inline-QoS-Liste (Spec §9.6.4.8 +
148/// XTypes 1.3 §7.6.8). Liefert die 16-byte Identitaet der Instanz,
149/// oder `None` wenn die PID fehlt / das Value eine unzulaessige
150/// Laenge hat.
151#[must_use]
152pub fn find_key_hash(pl: &ParameterList) -> Option<[u8; 16]> {
153    let p = pl.find(pid::KEY_HASH)?;
154    if p.value.len() != 16 {
155        return None;
156    }
157    let mut b = [0u8; 16];
158    b.copy_from_slice(&p.value);
159    Some(b)
160}
161
162/// Baut eine Inline-QoS-`ParameterList` fuer einen Lifecycle-Marker —
163/// `PID_KEY_HASH` (16 byte) + `PID_STATUS_INFO` (4 byte). Wird vom
164/// Writer beim `dispose`/`unregister_instance` gesendet.
165#[must_use]
166pub fn lifecycle_inline_qos(key_hash: [u8; 16], status_bits: u32) -> ParameterList {
167    let mut pl = ParameterList::new();
168    pl.push(Parameter::new(pid::KEY_HASH, key_hash.to_vec()));
169    pl.push(status_info_param(status_bits));
170    pl
171}
172
173/// Spec §8.7.9 — `PID_ORIGINAL_WRITER_INFO` als Inline-QoS. 24-byte
174/// Wert: 16 byte original GUID + 8 byte SequenceNumber (signed i64).
175/// Vom Persistence-Service gesetzt, wenn er ein historisches Sample
176/// im Auftrag eines anderen Writers weiterleitet.
177#[must_use]
178pub fn original_writer_info_param(
179    original_guid: [u8; 16],
180    original_sn: i64,
181    little_endian: bool,
182) -> Parameter {
183    let mut value = alloc::vec::Vec::with_capacity(24);
184    value.extend_from_slice(&original_guid);
185    let sn_bytes = if little_endian {
186        original_sn.to_le_bytes()
187    } else {
188        original_sn.to_be_bytes()
189    };
190    value.extend_from_slice(&sn_bytes);
191    Parameter::new(pid::ORIGINAL_WRITER_INFO, value)
192}
193
194/// Liest `PID_ORIGINAL_WRITER_INFO` aus einer Inline-QoS-Liste.
195///
196/// # Errors
197/// `WireError::UnexpectedEof` wenn der Value-Slice unter 24 byte liegt.
198pub fn find_original_writer_info(
199    pl: &ParameterList,
200    little_endian: bool,
201) -> Result<Option<([u8; 16], i64)>, WireError> {
202    let target = pid::ORIGINAL_WRITER_INFO;
203    for p in &pl.parameters {
204        let masked = p.id & !(MUST_UNDERSTAND_BIT | VENDOR_SPECIFIC_BIT);
205        if masked == target {
206            if p.value.len() < 24 {
207                return Err(WireError::UnexpectedEof {
208                    needed: 24,
209                    offset: 0,
210                });
211            }
212            let mut g = [0u8; 16];
213            g.copy_from_slice(&p.value[..16]);
214            let mut s = [0u8; 8];
215            s.copy_from_slice(&p.value[16..24]);
216            let sn = if little_endian {
217                i64::from_le_bytes(s)
218            } else {
219                i64::from_be_bytes(s)
220            };
221            return Ok(Some((g, sn)));
222        }
223    }
224    Ok(None)
225}
226
227/// Spec §8.7.7 / §9.6.2.2.5 — `PID_DIRECTED_WRITE` als Inline-QoS.
228/// Markiert ein Sample als Punkt-zu-Punkt-Send an einen einzigen
229/// Ziel-Reader (16 byte GUID). Andere Reader, die das Sample
230/// empfangen (z.B. via Multicast), MUESSEN es verwerfen.
231#[must_use]
232pub fn directed_write_param(target_reader_guid: [u8; 16]) -> Parameter {
233    Parameter::new(pid::DIRECTED_WRITE, target_reader_guid.to_vec())
234}
235
236/// Liest `PID_DIRECTED_WRITE` aus einer Inline-QoS-Liste.
237///
238/// Liefert `Ok(None)` wenn die PID nicht gesetzt ist (Sample ist nicht
239/// directed). Ansonsten liefert die 16-byte Ziel-GUID.
240///
241/// # Errors
242/// `WireError::UnexpectedEof` wenn der Value-Slice unter 16 byte liegt.
243pub fn find_directed_write(pl: &ParameterList) -> Result<Option<[u8; 16]>, WireError> {
244    let target = pid::DIRECTED_WRITE;
245    for p in &pl.parameters {
246        let masked = p.id & !(MUST_UNDERSTAND_BIT | VENDOR_SPECIFIC_BIT);
247        if masked == target {
248            if p.value.len() < 16 {
249                return Err(WireError::UnexpectedEof {
250                    needed: 16,
251                    offset: 0,
252                });
253            }
254            let mut g = [0u8; 16];
255            g.copy_from_slice(&p.value[..16]);
256            return Ok(Some(g));
257        }
258    }
259    Ok(None)
260}
261
262/// Spec §8.7.7 — Receiver-side Filter: liefert `true`, wenn das Sample
263/// an den Reader mit `own_reader_guid` adressiert ist (oder kein
264/// Directed-Write gesetzt ist). `false` wenn ein PID_DIRECTED_WRITE
265/// vorhanden ist und der Wert NICHT mit own_guid uebereinstimmt — der
266/// Caller MUSS dann das Sample verwerfen.
267///
268/// # Errors
269/// Wire-Decoding-Fehler aus `find_directed_write`.
270pub fn directed_write_matches_reader(
271    pl: &ParameterList,
272    own_reader_guid: [u8; 16],
273) -> Result<bool, WireError> {
274    Ok(match find_directed_write(pl)? {
275        None => true,
276        Some(target) => target == own_reader_guid,
277    })
278}
279
280/// Liest `PID_RELATED_SAMPLE_IDENTITY` aus einer Inline-QoS-Liste.
281///
282/// Maskiert Must-Understand-Bit + Vendor-Bit. Liefert `Ok(None)` wenn
283/// die PID nicht vorhanden ist (Inline-QoS ohne RPC-Korrelation —
284/// z.B. ein User-Sample-DATA das nichts mit RPC zu tun hat).
285///
286/// # Errors
287/// `WireError::UnexpectedEof` wenn der Value-Slice unter 24 byte liegt.
288pub fn find_related_sample_identity(
289    pl: &ParameterList,
290    little_endian: bool,
291) -> Result<Option<SampleIdentityBytes>, WireError> {
292    let target = pid::RELATED_SAMPLE_IDENTITY;
293    for p in &pl.parameters {
294        let masked = p.id & !(MUST_UNDERSTAND_BIT | VENDOR_SPECIFIC_BIT);
295        if masked == target {
296            return Ok(Some(SampleIdentityBytes::from_bytes(
297                &p.value,
298                little_endian,
299            )?));
300        }
301    }
302    Ok(None)
303}
304
// Unit tests for the inline-QoS helpers: SampleIdentity encode/decode,
// PID_RELATED_SAMPLE_IDENTITY lookup, directed-write filtering,
// OriginalWriterInfo round-trips and the vendor trace-context PID.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use alloc::sync::Arc;

    // Shared fixture: GUID of 16 x 0xAB and a sequence number whose bytes are
    // all distinct, so byte-order mistakes show up in assertions.
    fn sample_id() -> SampleIdentityBytes {
        SampleIdentityBytes::new([0xAB; 16], 0x0102_0304_0506_0708)
    }

    #[test]
    fn sample_identity_layout_is_24_bytes() {
        let id = sample_id();
        let bytes_le = id.to_bytes(true);
        let bytes_be = id.to_bytes(false);
        assert_eq!(bytes_le.len(), SAMPLE_IDENTITY_WIRE_SIZE);
        assert_eq!(bytes_be.len(), SAMPLE_IDENTITY_WIRE_SIZE);
        // First 16 bytes = GUID (independent of endianness).
        assert_eq!(&bytes_le[..16], &[0xAB; 16]);
        assert_eq!(&bytes_be[..16], &[0xAB; 16]);
        // Last 8 bytes = sequence number, which differs per endianness.
        assert_eq!(&bytes_le[16..], &0x0102_0304_0506_0708u64.to_le_bytes());
        assert_eq!(&bytes_be[16..], &0x0102_0304_0506_0708u64.to_be_bytes());
    }

    #[test]
    fn sample_identity_roundtrip_le() {
        let id = sample_id();
        let bytes = id.to_bytes(true);
        assert_eq!(SampleIdentityBytes::from_bytes(&bytes, true).unwrap(), id);
    }

    #[test]
    fn sample_identity_roundtrip_be() {
        let id = sample_id();
        let bytes = id.to_bytes(false);
        assert_eq!(SampleIdentityBytes::from_bytes(&bytes, false).unwrap(), id);
    }

    #[test]
    fn sample_identity_too_short_is_eof_error() {
        let res = SampleIdentityBytes::from_bytes(&[0u8; 10], true);
        assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
    }

    #[test]
    fn related_sample_identity_param_uses_pid_0x0083() {
        let p = related_sample_identity_param(sample_id(), true);
        assert_eq!(p.id, 0x0083);
        assert_eq!(p.id, pid::RELATED_SAMPLE_IDENTITY);
        assert_eq!(p.value.len(), SAMPLE_IDENTITY_WIRE_SIZE);
    }

    #[test]
    fn reply_inline_qos_contains_only_pid_0x0083() {
        let pl = reply_inline_qos(sample_id(), true);
        assert_eq!(pl.parameters.len(), 1);
        assert_eq!(pl.parameters[0].id, pid::RELATED_SAMPLE_IDENTITY);
    }

    #[test]
    fn find_related_sample_identity_finds_pid() {
        let id = sample_id();
        let pl = reply_inline_qos(id, true);
        let got = find_related_sample_identity(&pl, true).unwrap();
        assert_eq!(got, Some(id));
    }

    #[test]
    fn find_related_sample_identity_missing_returns_none() {
        let pl = ParameterList::new();
        let got = find_related_sample_identity(&pl, true).unwrap();
        assert_eq!(got, None);
    }

    #[test]
    fn find_related_sample_identity_with_must_understand_bit_works() {
        // Cyclone tends to set the MU bit on inline-QoS PIDs; the masking
        // has to handle that.
        let id = sample_id();
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(
            MUST_UNDERSTAND_BIT | pid::RELATED_SAMPLE_IDENTITY,
            id.to_bytes(true).to_vec(),
        ));
        let got = find_related_sample_identity(&pl, true).unwrap();
        assert_eq!(got, Some(id));
    }

    #[test]
    fn find_related_sample_identity_truncated_value_is_error() {
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(
            pid::RELATED_SAMPLE_IDENTITY,
            alloc::vec![0u8; 8],
        ));
        let res = find_related_sample_identity(&pl, true);
        assert!(matches!(res, Err(WireError::UnexpectedEof { .. })));
    }

    #[test]
    fn roundtrip_in_data_submessage_inline_qos() {
        // End-to-end: DataSubmessage with Q-flag + PID 0x0083 in the inline
        // QoS, then a round-trip through write_body / read_body_with_flags.
        use crate::submessages::{DATA_FLAG_INLINE_QOS, DataSubmessage};
        use crate::wire_types::{EntityId, SequenceNumber};

        let id = sample_id();
        let inline = reply_inline_qos(id, true);
        let payload: Arc<[u8]> = Arc::from(alloc::vec![1u8, 2, 3, 4].as_slice());
        let data = DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::user_reader_with_key([1, 2, 3]),
            writer_id: EntityId::user_writer_with_key([4, 5, 6]),
            writer_sn: SequenceNumber(42),
            inline_qos: Some(inline),
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: payload.clone(),
        };
        let (body, flags) = data.write_body(true);
        assert_ne!(flags & DATA_FLAG_INLINE_QOS, 0);

        let decoded = DataSubmessage::read_body_with_flags(&body, true, flags).unwrap();
        assert!(decoded.inline_qos.is_some());
        let pl = decoded.inline_qos.unwrap();
        let back = find_related_sample_identity(&pl, true).unwrap();
        assert_eq!(back, Some(id));
        // Payload is byte-identical.
        assert_eq!(&decoded.serialized_payload[..], &[1, 2, 3, 4]);
    }

    // ---- Spec §8.7.7 Directed Write ----

    #[test]
    fn directed_write_param_carries_target_guid() {
        let target = [0xCA; 16];
        let p = directed_write_param(target);
        assert_eq!(p.id, pid::DIRECTED_WRITE);
        assert_eq!(p.value, target.to_vec());
    }

    #[test]
    fn find_directed_write_returns_target_when_present() {
        let mut pl = ParameterList::new();
        let target = [0xAB; 16];
        pl.push(directed_write_param(target));
        assert_eq!(find_directed_write(&pl).unwrap(), Some(target));
    }

    #[test]
    fn find_directed_write_returns_none_when_absent() {
        let pl = ParameterList::new();
        assert_eq!(find_directed_write(&pl).unwrap(), None);
    }

    #[test]
    fn find_directed_write_rejects_truncated_value() {
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(pid::DIRECTED_WRITE, alloc::vec![0; 8]));
        let r = find_directed_write(&pl);
        assert!(matches!(r, Err(WireError::UnexpectedEof { .. })));
    }

    #[test]
    fn directed_write_matches_reader_returns_true_for_matching_target() {
        let mut pl = ParameterList::new();
        let me = [0xAB; 16];
        pl.push(directed_write_param(me));
        assert!(directed_write_matches_reader(&pl, me).unwrap());
    }

    #[test]
    fn directed_write_matches_reader_returns_false_for_other_target() {
        let mut pl = ParameterList::new();
        pl.push(directed_write_param([0xAB; 16]));
        let other = [0xCD; 16];
        assert!(!directed_write_matches_reader(&pl, other).unwrap());
    }

    #[test]
    fn directed_write_matches_reader_returns_true_when_no_directed_write() {
        let pl = ParameterList::new();
        // No directed write → every reader matches.
        assert!(directed_write_matches_reader(&pl, [0; 16]).unwrap());
    }

    // ---- Spec §8.7.9 OriginalWriterInfo ----

    #[test]
    fn original_writer_info_param_24_byte_layout_le() {
        let p = original_writer_info_param([0xAB; 16], 0x0102_0304_0506_0708, true);
        assert_eq!(p.id, pid::ORIGINAL_WRITER_INFO);
        assert_eq!(p.value.len(), 24);
        assert_eq!(&p.value[..16], &[0xAB; 16]);
        assert_eq!(&p.value[16..], &0x0102_0304_0506_0708i64.to_le_bytes());
    }

    #[test]
    fn original_writer_info_roundtrip_le() {
        let mut pl = ParameterList::new();
        pl.push(original_writer_info_param([0xCD; 16], 42, true));
        let back = find_original_writer_info(&pl, true).unwrap();
        assert_eq!(back, Some(([0xCD; 16], 42)));
    }

    #[test]
    fn original_writer_info_roundtrip_be() {
        let mut pl = ParameterList::new();
        pl.push(original_writer_info_param([0xCD; 16], 42, false));
        let back = find_original_writer_info(&pl, false).unwrap();
        assert_eq!(back, Some(([0xCD; 16], 42)));
    }

    #[test]
    fn find_original_writer_info_returns_none_when_absent() {
        let pl = ParameterList::new();
        assert_eq!(find_original_writer_info(&pl, true).unwrap(), None);
    }

    #[test]
    fn find_original_writer_info_rejects_truncated() {
        let mut pl = ParameterList::new();
        pl.push(Parameter::new(
            pid::ORIGINAL_WRITER_INFO,
            alloc::vec![0; 16],
        ));
        let r = find_original_writer_info(&pl, true);
        assert!(matches!(r, Err(WireError::UnexpectedEof { .. })));
    }

    // Pins the numeric value of the vendor trace-context PID.
    #[test]
    fn vendor_trace_context_pid_constant() {
        assert_eq!(pid::VENDOR_TRACE_CONTEXT, 0x0D00);
    }

    // Serializes a list holding the vendor PID and checks that the value
    // survives `to_bytes` / `from_bytes` unchanged.
    #[test]
    fn vendor_trace_context_param_roundtrip() {
        let mut pl = ParameterList::new();
        let payload = alloc::vec![1, 2, 3, 4, 5, 6, 7, 8];
        pl.push(Parameter::new(pid::VENDOR_TRACE_CONTEXT, payload.clone()));
        let bytes = pl.to_bytes(true);
        let pl2 = ParameterList::from_bytes(&bytes, true).expect("decode");
        let p = pl2.find(pid::VENDOR_TRACE_CONTEXT).expect("present");
        assert_eq!(p.value, payload);
    }
}
551}