Skip to main content

zerodds_discovery/security/
stateless.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Builtin-Endpoint `DCPSParticipantStatelessMessage` — DDS-Security 1.2
4//! §7.4.4 + §10.3.4.
5//!
6//! Wire-Profil:
7//! - Reliability: BestEffort (Spec §7.5.3 — "stateless").
8//! - Durability:  Volatile.
9//! - Topic-Type:  `ParticipantGenericMessage` (Spec §7.5.5), encoded mit
10//!   4-Byte CDR-LE-Encapsulation-Header + XCDR1-Body via
11//!   [`security_runtime::builtin_topics::encode_generic_message`].
12//! - EntityIds:   `BUILTIN_PARTICIPANT_STATELESS_MESSAGE_{WRITER,READER}`.
13//!
14//! Wir nutzen **kein** [`zerodds_rtps::ReliableWriter`], weil Stateless
15//! laut Spec keinen Empfangs-Status, keine HEARTBEATs und keine
16//! AckNack-Loop hat. Stattdessen: simple Multi-Reader-Fan-out-Liste; pro
17//! `write()` wird ein DATA-Datagramm pro [`ReaderProxy`] erzeugt.
18//!
19//! C3.4-c-Scope: keine Plugin-Pipeline-Logik im Reader. Der Reader
20//! dekodiert die `ParticipantGenericMessage` und reicht sie an den
21//! Caller — der Auth-Plugin-Hook (Spec §10.3.4.1) wird vom DCPS-Layer
22//! oben drauf gesetzt.
23
24extern crate alloc;
25
26use alloc::rc::Rc;
27use alloc::vec::Vec;
28
29use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram, encode_data_datagram};
30use zerodds_rtps::error::WireError;
31use zerodds_rtps::header::RtpsHeader;
32use zerodds_rtps::message_builder::OutboundDatagram;
33use zerodds_rtps::reader_proxy::ReaderProxy;
34use zerodds_rtps::submessages::DataSubmessage;
35use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, SequenceNumber, VendorId};
36use zerodds_rtps::writer_proxy::WriterProxy;
37
38use zerodds_security::error::{SecurityError, SecurityErrorKind, SecurityResult};
39use zerodds_security::generic_message::ParticipantGenericMessage;
40
41use crate::security::codec::{decode_generic_message, encode_generic_message};
42
43/// Stateless-Message-Writer (Spec §7.4.4 + §10.3.4).
44///
45/// Pflegt Multi-Reader-Fan-out fuer den
46/// `BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER`-Endpoint. Jedes
47/// `write()` erzeugt pro registriertem [`ReaderProxy`] ein
48/// [`OutboundDatagram`] — kein Cache, kein Resend, kein HEARTBEAT
49/// (Stateless = kein Empfangs-Status).
50#[derive(Debug)]
51pub struct StatelessMessageWriter {
52    guid: Guid,
53    vendor_id: VendorId,
54    next_sn: i64,
55    reader_proxies: Vec<ReaderProxy>,
56}
57
58impl StatelessMessageWriter {
59    /// Erzeugt einen Writer fuer den lokalen Participant.
60    #[must_use]
61    pub fn new(participant_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
62        Self {
63            guid: Guid::new(
64                participant_prefix,
65                EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
66            ),
67            vendor_id,
68            next_sn: 1,
69            reader_proxies: Vec::new(),
70        }
71    }
72
73    /// GUID des Writers.
74    #[must_use]
75    pub fn guid(&self) -> Guid {
76        self.guid
77    }
78
79    /// Read-only-Slice der registrierten Reader-Proxies.
80    #[must_use]
81    pub fn reader_proxies(&self) -> &[ReaderProxy] {
82        &self.reader_proxies
83    }
84
85    /// Anzahl registrierter Reader-Proxies.
86    #[must_use]
87    pub fn reader_proxy_count(&self) -> usize {
88        self.reader_proxies.len()
89    }
90
91    /// Fuegt einen Reader-Proxy hinzu (idempotent: gleiche GUID
92    /// ueberschreibt).
93    pub fn add_reader_proxy(&mut self, proxy: ReaderProxy) {
94        let guid = proxy.remote_reader_guid;
95        if let Some(idx) = self
96            .reader_proxies
97            .iter()
98            .position(|p| p.remote_reader_guid == guid)
99        {
100            self.reader_proxies[idx] = proxy;
101        } else {
102            self.reader_proxies.push(proxy);
103        }
104    }
105
106    /// Entfernt einen Reader-Proxy. Liefert ihn zurueck, falls
107    /// vorhanden.
108    pub fn remove_reader_proxy(&mut self, guid: Guid) -> Option<ReaderProxy> {
109        let idx = self
110            .reader_proxies
111            .iter()
112            .position(|p| p.remote_reader_guid == guid)?;
113        Some(self.reader_proxies.remove(idx))
114    }
115
116    /// Sendet eine `ParticipantGenericMessage` an alle Reader-Proxies.
117    ///
118    /// Liefert ein Datagram pro Proxy (oder leer, wenn keine
119    /// registriert sind).
120    ///
121    /// # Errors
122    /// `WireError::ValueOutOfRange` bei Sequence-Number-Overflow oder
123    /// `WireError::*` aus der DATA-Encoding-Pipeline.
124    pub fn write(
125        &mut self,
126        msg: &ParticipantGenericMessage,
127    ) -> Result<Vec<OutboundDatagram>, WireError> {
128        if self.reader_proxies.is_empty() {
129            return Ok(Vec::new());
130        }
131        let payload = encode_generic_message(msg);
132        let sn = SequenceNumber(self.next_sn);
133        self.next_sn = self
134            .next_sn
135            .checked_add(1)
136            .ok_or(WireError::ValueOutOfRange {
137                message: "stateless writer sequence overflow",
138            })?;
139
140        let mut out = Vec::with_capacity(self.reader_proxies.len());
141        for proxy in &self.reader_proxies {
142            let data = DataSubmessage {
143                extra_flags: 0,
144                reader_id: proxy.remote_reader_guid.entity_id,
145                writer_id: self.guid.entity_id,
146                writer_sn: sn,
147                inline_qos: None,
148                key_flag: false,
149                non_standard_flag: false,
150                serialized_payload: payload.clone().into(),
151            };
152            let header = RtpsHeader::new(self.vendor_id, self.guid.prefix);
153            let bytes = encode_data_datagram(header, &[data])?;
154            // Ziel = Unicast-Locators des Proxies (Multicast ignorieren
155            // wir bei Stateless: Auth-Handshake ist immer Punkt-zu-Punkt).
156            let targets = Rc::new(proxy.unicast_locators.clone());
157            out.push(OutboundDatagram { bytes, targets });
158        }
159        Ok(out)
160    }
161}
162
163/// Stateless-Message-Reader (Spec §7.4.4 + §10.3.4).
164///
165/// Decoded eingehende DATA-Submessages, die an den
166/// `BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER` adressiert sind.
167/// Stateless: kein History-Cache, kein ACKNACK, kein Heartbeat-State.
168/// Ein bekannter Writer-Proxy ist fuer Source-Authenticity-Checks
169/// optional — der Reader liefert die Message immer, der Auth-Plugin-
170/// Hook entscheidet nach dem `source_guid`-Feld.
171#[derive(Debug)]
172pub struct StatelessMessageReader {
173    guid: Guid,
174    #[allow(dead_code)]
175    vendor_id: VendorId,
176    /// Bekannte Writer-Proxies. Der Reader nutzt sie nicht fuers
177    /// Filtering (Stateless akzeptiert von jedem Writer), sondern fuer
178    /// Caller-Diagnose (`writer_proxy_count`).
179    writer_proxies: Vec<WriterProxy>,
180}
181
182impl StatelessMessageReader {
183    /// Erzeugt einen Reader fuer den lokalen Participant.
184    #[must_use]
185    pub fn new(participant_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
186        Self {
187            guid: Guid::new(
188                participant_prefix,
189                EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
190            ),
191            vendor_id,
192            writer_proxies: Vec::new(),
193        }
194    }
195
196    /// GUID des Readers.
197    #[must_use]
198    pub fn guid(&self) -> Guid {
199        self.guid
200    }
201
202    /// Anzahl registrierter Writer-Proxies.
203    #[must_use]
204    pub fn writer_proxy_count(&self) -> usize {
205        self.writer_proxies.len()
206    }
207
208    /// Read-only-Slice der registrierten Writer-Proxies.
209    #[must_use]
210    pub fn writer_proxies(&self) -> &[WriterProxy] {
211        &self.writer_proxies
212    }
213
214    /// Fuegt einen Writer-Proxy hinzu (idempotent).
215    pub fn add_writer_proxy(&mut self, proxy: WriterProxy) {
216        let guid = proxy.remote_writer_guid;
217        if let Some(idx) = self
218            .writer_proxies
219            .iter()
220            .position(|p| p.remote_writer_guid == guid)
221        {
222            self.writer_proxies[idx] = proxy;
223        } else {
224            self.writer_proxies.push(proxy);
225        }
226    }
227
228    /// Entfernt einen Writer-Proxy.
229    pub fn remove_writer_proxy(&mut self, guid: Guid) -> Option<WriterProxy> {
230        let idx = self
231            .writer_proxies
232            .iter()
233            .position(|p| p.remote_writer_guid == guid)?;
234        Some(self.writer_proxies.remove(idx))
235    }
236
237    /// Verarbeitet eine eingehende DATA-Submessage und decoded sie zu
238    /// einer `ParticipantGenericMessage`.
239    ///
240    /// # Errors
241    /// `BadArgument` wenn die Encapsulation/CDR-Decode scheitert.
242    pub fn handle_data(
243        &mut self,
244        data: &DataSubmessage,
245    ) -> SecurityResult<ParticipantGenericMessage> {
246        decode_generic_message(&data.serialized_payload)
247    }
248
249    /// Verarbeitet ein vollstaendiges RTPS-Datagram. Liefert alle
250    /// dekodierten Stateless-Messages aus diesem Datagramm.
251    ///
252    /// # Errors
253    /// - `BadArgument` wenn das Datagram nicht parst (Wire-Decoder-
254    ///   Fehler) oder eine relevante DATA-Submessage einen kaputten
255    ///   Generic-Message-Body hat.
256    pub fn handle_datagram(
257        &mut self,
258        datagram: &[u8],
259    ) -> SecurityResult<Vec<ParticipantGenericMessage>> {
260        let parsed = decode_datagram(datagram).map_err(|_| {
261            SecurityError::new(
262                SecurityErrorKind::BadArgument,
263                "stateless reader: wire decode failed",
264            )
265        })?;
266        let mut out = Vec::new();
267        for sub in parsed.submessages {
268            if let ParsedSubmessage::Data(d) = sub {
269                if d.reader_id == self.guid.entity_id
270                    || d.writer_id == EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER
271                {
272                    out.push(decode_generic_message(&d.serialized_payload)?);
273                }
274            }
275        }
276        Ok(out)
277    }
278}
279
280#[cfg(test)]
281#[allow(
282    clippy::expect_used,
283    clippy::unwrap_used,
284    clippy::panic,
285    clippy::unreachable
286)]
287mod tests {
288    use super::*;
289    use zerodds_rtps::wire_types::Locator;
290    use zerodds_security::generic_message::{MessageIdentity, class_id};
291    use zerodds_security::token::DataHolder;
292
293    fn sample_msg(seq: i64) -> ParticipantGenericMessage {
294        ParticipantGenericMessage {
295            message_identity: MessageIdentity {
296                source_guid: [0xAA; 16],
297                sequence_number: seq,
298            },
299            related_message_identity: MessageIdentity::default(),
300            destination_participant_key: [0xBB; 16],
301            destination_endpoint_key: [0; 16],
302            source_endpoint_key: [0xCC; 16],
303            message_class_id: class_id::AUTH_REQUEST.into(),
304            message_data: alloc::vec![DataHolder::new("DDS:Auth:PKI-DH:1.2+AuthReq")],
305        }
306    }
307
308    fn local_prefix() -> GuidPrefix {
309        GuidPrefix::from_bytes([1; 12])
310    }
311
312    fn remote_prefix() -> GuidPrefix {
313        GuidPrefix::from_bytes([2; 12])
314    }
315
316    #[test]
317    fn writer_has_expected_entity_id() {
318        let w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
319        assert_eq!(
320            w.guid().entity_id,
321            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER
322        );
323        assert_eq!(w.guid().prefix, local_prefix());
324    }
325
326    #[test]
327    fn reader_has_expected_entity_id() {
328        let r = StatelessMessageReader::new(local_prefix(), VendorId::ZERODDS);
329        assert_eq!(
330            r.guid().entity_id,
331            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER
332        );
333    }
334
335    #[test]
336    fn write_without_proxies_returns_empty() {
337        let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
338        let dgs = w.write(&sample_msg(1)).unwrap();
339        assert!(dgs.is_empty(), "no proxies → no fan-out");
340    }
341
342    #[test]
343    fn write_to_one_proxy_produces_one_datagram() {
344        let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
345        let remote = Guid::new(
346            remote_prefix(),
347            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
348        );
349        w.add_reader_proxy(ReaderProxy::new(
350            remote,
351            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
352            alloc::vec![],
353            false,
354        ));
355        let dgs = w.write(&sample_msg(1)).unwrap();
356        assert_eq!(dgs.len(), 1);
357        assert_eq!(dgs[0].targets.len(), 1);
358    }
359
360    #[test]
361    fn write_to_two_proxies_produces_two_datagrams() {
362        let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
363        let remote_a = Guid::new(
364            GuidPrefix::from_bytes([2; 12]),
365            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
366        );
367        let remote_b = Guid::new(
368            GuidPrefix::from_bytes([3; 12]),
369            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
370        );
371        w.add_reader_proxy(ReaderProxy::new(
372            remote_a,
373            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
374            alloc::vec![],
375            false,
376        ));
377        w.add_reader_proxy(ReaderProxy::new(
378            remote_b,
379            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7412)],
380            alloc::vec![],
381            false,
382        ));
383        assert_eq!(w.reader_proxy_count(), 2);
384        let dgs = w.write(&sample_msg(1)).unwrap();
385        assert_eq!(dgs.len(), 2);
386    }
387
388    #[test]
389    fn add_reader_proxy_is_idempotent() {
390        let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
391        let remote = Guid::new(
392            remote_prefix(),
393            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
394        );
395        w.add_reader_proxy(ReaderProxy::new(
396            remote,
397            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
398            alloc::vec![],
399            false,
400        ));
401        w.add_reader_proxy(ReaderProxy::new(
402            remote,
403            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
404            alloc::vec![],
405            false,
406        ));
407        assert_eq!(w.reader_proxy_count(), 1);
408    }
409
410    #[test]
411    fn remove_reader_proxy_returns_proxy() {
412        let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
413        let remote = Guid::new(
414            remote_prefix(),
415            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
416        );
417        w.add_reader_proxy(ReaderProxy::new(
418            remote,
419            alloc::vec![],
420            alloc::vec![],
421            false,
422        ));
423        let removed = w.remove_reader_proxy(remote);
424        assert!(removed.is_some());
425        assert_eq!(w.reader_proxy_count(), 0);
426        assert!(w.remove_reader_proxy(remote).is_none());
427    }
428
429    #[test]
430    fn write_increments_sequence_number() {
431        let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
432        let remote = Guid::new(
433            remote_prefix(),
434            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
435        );
436        w.add_reader_proxy(ReaderProxy::new(
437            remote,
438            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
439            alloc::vec![],
440            false,
441        ));
442        let dg1 = w.write(&sample_msg(1)).unwrap()[0].clone();
443        let dg2 = w.write(&sample_msg(2)).unwrap()[0].clone();
444        // Decoded SN aus den Wire-Bytes
445        let p1 = decode_datagram(&dg1.bytes).unwrap();
446        let p2 = decode_datagram(&dg2.bytes).unwrap();
447        let sn1 = match &p1.submessages[0] {
448            ParsedSubmessage::Data(d) => d.writer_sn,
449            _ => unreachable!(),
450        };
451        let sn2 = match &p2.submessages[0] {
452            ParsedSubmessage::Data(d) => d.writer_sn,
453            _ => unreachable!(),
454        };
455        assert_eq!(sn1, SequenceNumber(1));
456        assert_eq!(sn2, SequenceNumber(2));
457    }
458
459    #[test]
460    fn write_carries_writer_entity_id_on_wire() {
461        let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
462        let remote = Guid::new(
463            remote_prefix(),
464            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
465        );
466        w.add_reader_proxy(ReaderProxy::new(
467            remote,
468            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
469            alloc::vec![],
470            false,
471        ));
472        let dgs = w.write(&sample_msg(1)).unwrap();
473        let parsed = decode_datagram(&dgs[0].bytes).unwrap();
474        match &parsed.submessages[0] {
475            ParsedSubmessage::Data(d) => {
476                assert_eq!(
477                    d.writer_id,
478                    EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER
479                );
480                assert_eq!(
481                    d.reader_id,
482                    EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER
483                );
484            }
485            _ => panic!("expected DATA"),
486        }
487    }
488
489    #[test]
490    fn reader_handle_data_decodes_generic_message() {
491        let mut r = StatelessMessageReader::new(local_prefix(), VendorId::ZERODDS);
492        let msg = sample_msg(42);
493        let payload = encode_generic_message(&msg);
494        let data = DataSubmessage {
495            extra_flags: 0,
496            reader_id: EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
497            writer_id: EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
498            writer_sn: SequenceNumber(1),
499            inline_qos: None,
500            key_flag: false,
501            non_standard_flag: false,
502            serialized_payload: payload.into(),
503        };
504        let decoded = r.handle_data(&data).unwrap();
505        assert_eq!(decoded, msg);
506    }
507
508    #[test]
509    fn reader_handle_data_rejects_corrupt_payload() {
510        let mut r = StatelessMessageReader::new(local_prefix(), VendorId::ZERODDS);
511        let data = DataSubmessage {
512            extra_flags: 0,
513            reader_id: EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
514            writer_id: EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
515            writer_sn: SequenceNumber(1),
516            inline_qos: None,
517            key_flag: false,
518            non_standard_flag: false,
519            serialized_payload: alloc::vec![0x00, 0x99, 0, 0].into(),
520        };
521        let err = r.handle_data(&data).unwrap_err();
522        assert_eq!(err.kind, SecurityErrorKind::BadArgument);
523    }
524
525    #[test]
526    fn reader_writer_proxy_management() {
527        let mut r = StatelessMessageReader::new(local_prefix(), VendorId::ZERODDS);
528        let remote = Guid::new(
529            remote_prefix(),
530            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
531        );
532        r.add_writer_proxy(WriterProxy::new(
533            remote,
534            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
535            alloc::vec![],
536            false,
537        ));
538        // Idempotenz
539        r.add_writer_proxy(WriterProxy::new(
540            remote,
541            alloc::vec![],
542            alloc::vec![],
543            false,
544        ));
545        assert_eq!(r.writer_proxy_count(), 1);
546        assert!(r.remove_writer_proxy(remote).is_some());
547        assert_eq!(r.writer_proxy_count(), 0);
548    }
549
550    #[test]
551    fn end_to_end_writer_to_reader_loopback() {
552        // Writer baut Datagram, Reader dekodiert es zurueck.
553        let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
554        let mut r = StatelessMessageReader::new(remote_prefix(), VendorId::ZERODDS);
555        let remote_reader_guid = Guid::new(
556            remote_prefix(),
557            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
558        );
559        w.add_reader_proxy(ReaderProxy::new(
560            remote_reader_guid,
561            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
562            alloc::vec![],
563            false,
564        ));
565        let msg = sample_msg(7);
566        let dgs = w.write(&msg).unwrap();
567        let decoded = r.handle_datagram(&dgs[0].bytes).unwrap();
568        assert_eq!(decoded.len(), 1);
569        assert_eq!(decoded[0], msg);
570    }
571
572    #[test]
573    fn reader_handle_datagram_rejects_invalid_magic() {
574        let mut r = StatelessMessageReader::new(local_prefix(), VendorId::ZERODDS);
575        let err = r.handle_datagram(&[0u8; 24]).unwrap_err();
576        assert_eq!(err.kind, SecurityErrorKind::BadArgument);
577    }
578}