Skip to main content

zerodds_discovery/security/
volatile_secure.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
//! Builtin endpoint `DCPSParticipantVolatileMessageSecure` — DDS-Security
//! 1.2 §7.4.5 + §10.5.4.
//!
//! Wire profile:
//! - Reliability: Reliable (spec §7.5.4, Tab. 20).
//! - Durability:  Volatile.
//! - History:     KEEP_LAST 16 (KEEP_LAST 1 according to the spec; we
//!   conservatively keep 16 samples as a re-send window).
//! - Topic type:  `ParticipantGenericMessage` (spec §7.5.5).
//! - EntityIds:   `BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_{WRITER,READER}`.
//!
//! Wrapper around [`zerodds_rtps::reliable_writer::ReliableWriter`] and
//! [`zerodds_rtps::reliable_reader::ReliableReader`] with fixed EntityIds —
//! analogous to the SEDP endpoints, only with a different topic-type codec.
16
17extern crate alloc;
18use alloc::vec::Vec;
19use core::time::Duration;
20
21use zerodds_rtps::error::WireError;
22use zerodds_rtps::fragment_assembler::AssemblerCaps;
23use zerodds_rtps::history_cache::HistoryKind;
24use zerodds_rtps::message_builder::{DEFAULT_MTU, OutboundDatagram};
25use zerodds_rtps::reader_proxy::ReaderProxy;
26use zerodds_rtps::reliable_reader::{
27    DEFAULT_HEARTBEAT_RESPONSE_DELAY, ReliableReader, ReliableReaderConfig,
28};
29use zerodds_rtps::reliable_writer::{DEFAULT_FRAGMENT_SIZE, ReliableWriter, ReliableWriterConfig};
30use zerodds_rtps::submessages::{
31    DataFragSubmessage, DataSubmessage, GapSubmessage, HeartbeatSubmessage, NackFragSubmessage,
32};
33use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, SequenceNumber, VendorId};
34use zerodds_rtps::writer_proxy::WriterProxy;
35
36use zerodds_security::error::{SecurityError, SecurityErrorKind, SecurityResult};
37use zerodds_security::generic_message::ParticipantGenericMessage;
38
39use crate::security::codec::{decode_generic_message, encode_generic_message};
40
/// Default history depth of the volatile-secure writer.
///
/// The spec asks for KEEP_LAST 1; 16 is used conservatively so that short
/// bursts of crypto tokens during onboarding do not drop individual tokens.
pub const VOLATILE_SECURE_DEFAULT_DEPTH: usize = 16;
45
/// Default HEARTBEAT period of the volatile-secure writer.
///
/// Shorter than the SEDP default, because crypto tokens should be delivered
/// quickly during authentication onboarding.
pub const VOLATILE_SECURE_HEARTBEAT_PERIOD: Duration = Duration::from_millis(250);
49
/// Reader cache depth, used as the per-writer-proxy sample limit of the
/// volatile-secure reader.
///
/// NOTE(review): the original comment calls this "analogous to the SEDP
/// reader's 256", but the value here is 64 — confirm which one is intended.
pub const VOLATILE_SECURE_READER_CAPACITY: usize = 64;
52
53/// Writer fuer `DCPSParticipantVolatileMessageSecure`.
54#[derive(Debug)]
55pub struct VolatileSecureMessageWriter {
56    inner: ReliableWriter,
57}
58
59impl VolatileSecureMessageWriter {
60    /// Erzeugt einen Writer fuer den lokalen Participant.
61    #[must_use]
62    pub fn new(participant_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
63        let guid = Guid::new(
64            participant_prefix,
65            EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
66        );
67        Self {
68            inner: ReliableWriter::new(ReliableWriterConfig {
69                guid,
70                vendor_id,
71                reader_proxies: Vec::new(),
72                max_samples: VOLATILE_SECURE_DEFAULT_DEPTH,
73                history_kind: HistoryKind::KeepLast {
74                    depth: VOLATILE_SECURE_DEFAULT_DEPTH,
75                },
76                heartbeat_period: VOLATILE_SECURE_HEARTBEAT_PERIOD,
77                fragment_size: DEFAULT_FRAGMENT_SIZE,
78                mtu: DEFAULT_MTU,
79            }),
80        }
81    }
82
83    /// GUID des Writers.
84    #[must_use]
85    pub fn guid(&self) -> Guid {
86        self.inner.guid()
87    }
88
89    /// Anzahl registrierter Reader-Proxies.
90    #[must_use]
91    pub fn reader_proxy_count(&self) -> usize {
92        self.inner.reader_proxy_count()
93    }
94
95    /// Read-only-Zugriff auf den ReliableWriter (Tests/Diagnose).
96    #[must_use]
97    pub fn inner(&self) -> &ReliableWriter {
98        &self.inner
99    }
100
101    /// Fuegt einen Reader-Proxy hinzu.
102    pub fn add_reader_proxy(&mut self, proxy: ReaderProxy) {
103        self.inner.add_reader_proxy(proxy);
104    }
105
106    /// Entfernt einen Reader-Proxy.
107    pub fn remove_reader_proxy(&mut self, guid: Guid) -> Option<ReaderProxy> {
108        self.inner.remove_reader_proxy(guid)
109    }
110
111    /// Sendet eine `ParticipantGenericMessage`. Liefert pro Reader-
112    /// Proxy ein Datagramm.
113    ///
114    /// # Errors
115    /// `WireError` aus dem Reliable-Writer (Cache-Overflow bei
116    /// `KeepAll`, Sequence-Overflow).
117    pub fn write(
118        &mut self,
119        msg: &ParticipantGenericMessage,
120    ) -> Result<Vec<OutboundDatagram>, WireError> {
121        let payload = encode_generic_message(msg);
122        self.inner.write(&payload)
123    }
124
125    /// Tick (HEARTBEAT + Resends).
126    ///
127    /// # Errors
128    /// Wire-Encode-Fehler.
129    pub fn tick(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
130        self.inner.tick(now)
131    }
132
133    /// Dispatch eines ACKNACK vom Remote-Reader.
134    pub fn handle_acknack(
135        &mut self,
136        src_guid: Guid,
137        base: SequenceNumber,
138        requested: impl IntoIterator<Item = SequenceNumber>,
139    ) {
140        self.inner.handle_acknack(src_guid, base, requested);
141    }
142
143    /// Dispatch eines NACK_FRAG vom Remote-Reader.
144    pub fn handle_nackfrag(&mut self, src_guid: Guid, nf: &NackFragSubmessage) {
145        self.inner.handle_nackfrag(src_guid, nf);
146    }
147}
148
149/// Reader fuer `DCPSParticipantVolatileMessageSecure`.
150#[derive(Debug)]
151pub struct VolatileSecureMessageReader {
152    inner: ReliableReader,
153}
154
155impl VolatileSecureMessageReader {
156    /// Erzeugt einen Reader fuer den lokalen Participant.
157    #[must_use]
158    pub fn new(participant_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
159        let guid = Guid::new(
160            participant_prefix,
161            EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
162        );
163        Self {
164            inner: ReliableReader::new(ReliableReaderConfig {
165                guid,
166                vendor_id,
167                writer_proxies: Vec::new(),
168                max_samples_per_proxy: VOLATILE_SECURE_READER_CAPACITY,
169                heartbeat_response_delay: DEFAULT_HEARTBEAT_RESPONSE_DELAY,
170                assembler_caps: AssemblerCaps::default(),
171            }),
172        }
173    }
174
175    /// GUID des Readers.
176    #[must_use]
177    pub fn guid(&self) -> Guid {
178        self.inner.guid()
179    }
180
181    /// Anzahl registrierter Writer-Proxies.
182    #[must_use]
183    pub fn writer_proxy_count(&self) -> usize {
184        self.inner.writer_proxy_count()
185    }
186
187    /// Read-only-Zugriff auf den ReliableReader (Tests/Diagnose).
188    #[must_use]
189    pub fn inner(&self) -> &ReliableReader {
190        &self.inner
191    }
192
193    /// Fuegt einen Writer-Proxy hinzu.
194    pub fn add_writer_proxy(&mut self, proxy: WriterProxy) {
195        self.inner.add_writer_proxy(proxy);
196    }
197
198    /// Entfernt einen Writer-Proxy.
199    pub fn remove_writer_proxy(&mut self, guid: Guid) -> Option<WriterProxy> {
200        self.inner.remove_writer_proxy(guid)
201    }
202
203    /// Verarbeitet eine eingehende DATA-Submessage und liefert
204    /// dekodierte Generic-Messages.
205    ///
206    /// # Errors
207    /// `BadArgument` wenn die Encapsulation/CDR-Decode scheitert.
208    pub fn handle_data(
209        &mut self,
210        data: &DataSubmessage,
211    ) -> SecurityResult<Vec<ParticipantGenericMessage>> {
212        let samples = self.inner.handle_data(data);
213        decode_samples(samples.into_iter().map(|s| s.payload))
214    }
215
216    /// DATA_FRAG verarbeiten.
217    ///
218    /// # Errors
219    /// siehe [`handle_data`](Self::handle_data).
220    pub fn handle_data_frag(
221        &mut self,
222        df: &DataFragSubmessage,
223        now: Duration,
224    ) -> SecurityResult<Vec<ParticipantGenericMessage>> {
225        let samples = self.inner.handle_data_frag(df, now);
226        decode_samples(samples.into_iter().map(|s| s.payload))
227    }
228
229    /// GAP verarbeiten.
230    ///
231    /// # Errors
232    /// siehe [`handle_data`](Self::handle_data).
233    pub fn handle_gap(
234        &mut self,
235        gap: &GapSubmessage,
236    ) -> SecurityResult<Vec<ParticipantGenericMessage>> {
237        let samples = self.inner.handle_gap(gap);
238        decode_samples(samples.into_iter().map(|s| s.payload))
239    }
240
241    /// HEARTBEAT verarbeiten.
242    pub fn handle_heartbeat(&mut self, hb: &HeartbeatSubmessage, now: Duration) {
243        self.inner.handle_heartbeat(hb, now);
244    }
245
246    /// Tick (ACKNACK / NACK_FRAG-Outbound).
247    ///
248    /// # Errors
249    /// Wire-Encode-Fehler.
250    pub fn tick_outbound(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
251        self.inner.tick_outbound(now)
252    }
253}
254
255fn decode_samples<B, I>(payloads: I) -> SecurityResult<Vec<ParticipantGenericMessage>>
256where
257    B: AsRef<[u8]>,
258    I: IntoIterator<Item = B>,
259{
260    let mut out = Vec::new();
261    for p in payloads {
262        // Original-SecurityError direkt durchreichen — der Detail-String
263        // im Codec ist bereits aussagekraeftig genug, kein Topic-spezi-
264        // fischer Wrapper noetig (Spec §7.5.5 macht keine Unterscheidung
265        // pro Topic auf der Codec-Ebene).
266        out.push(decode_generic_message(p.as_ref())?);
267    }
268    Ok(out)
269}
270
271// Marker, dass beide Imports oben dauerhaft genutzt werden.
272const _: Option<SecurityErrorKind> = None;
273const _: Option<SecurityError> = None;
274
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use zerodds_rtps::wire_types::Locator;
    use zerodds_security::generic_message::{MessageIdentity, class_id};
    use zerodds_security::token::DataHolder;

    /// GUID prefix standing in for the local participant.
    fn local_prefix() -> GuidPrefix {
        GuidPrefix::from_bytes([1; 12])
    }

    /// GUID prefix standing in for the remote participant.
    fn remote_prefix() -> GuidPrefix {
        GuidPrefix::from_bytes([2; 12])
    }

    /// GUID of the remote participant's volatile-secure writer.
    fn remote_writer_guid() -> Guid {
        Guid::new(
            remote_prefix(),
            EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
        )
    }

    /// GUID of the remote participant's volatile-secure reader.
    fn remote_reader_guid() -> Guid {
        Guid::new(
            remote_prefix(),
            EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
        )
    }

    /// Writer under test, owned by the local participant.
    fn local_writer() -> VolatileSecureMessageWriter {
        VolatileSecureMessageWriter::new(local_prefix(), VendorId::ZERODDS)
    }

    /// Reader under test, owned by the local participant.
    fn local_reader() -> VolatileSecureMessageReader {
        VolatileSecureMessageReader::new(local_prefix(), VendorId::ZERODDS)
    }

    /// Proxy for the remote writer with a single loopback locator.
    fn remote_writer_proxy_with_locator() -> WriterProxy {
        WriterProxy::new(
            remote_writer_guid(),
            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
            alloc::vec![],
            true,
        )
    }

    /// Crypto-token message used as the payload in the tests.
    fn sample_msg() -> ParticipantGenericMessage {
        ParticipantGenericMessage {
            message_identity: MessageIdentity {
                source_guid: [0xAA; 16],
                sequence_number: 1,
            },
            related_message_identity: MessageIdentity::default(),
            destination_participant_key: [0xBB; 16],
            destination_endpoint_key: [0; 16],
            source_endpoint_key: [0xCC; 16],
            message_class_id: class_id::PARTICIPANT_CRYPTO_TOKENS.into(),
            message_data: alloc::vec![DataHolder::new("DDS:Crypto:AES-GCM-GMAC")],
        }
    }

    /// DATA submessage with sequence number 1, addressed from the
    /// volatile-secure writer to the volatile-secure reader, carrying the
    /// encoded `msg`.
    fn data_carrying(msg: &ParticipantGenericMessage) -> DataSubmessage {
        DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
            writer_id: EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
            writer_sn: SequenceNumber(1),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: encode_generic_message(msg).into(),
        }
    }

    #[test]
    fn writer_has_expected_entity_id() {
        let writer = local_writer();
        assert_eq!(
            writer.guid().entity_id,
            EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER
        );
    }

    #[test]
    fn reader_has_expected_entity_id() {
        let reader = local_reader();
        assert_eq!(
            reader.guid().entity_id,
            EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
        );
    }

    #[test]
    fn writer_starts_with_zero_proxies() {
        assert_eq!(local_writer().reader_proxy_count(), 0);
    }

    #[test]
    fn reader_starts_with_zero_proxies() {
        assert_eq!(local_reader().writer_proxy_count(), 0);
    }

    #[test]
    fn write_without_proxies_returns_empty_datagrams() {
        let mut writer = local_writer();
        let datagrams = writer.write(&sample_msg()).unwrap();
        assert!(datagrams.is_empty());
    }

    #[test]
    fn write_with_one_proxy_produces_one_datagram() {
        let mut writer = local_writer();
        writer.add_reader_proxy(ReaderProxy::new(
            remote_reader_guid(),
            alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
            alloc::vec![],
            true,
        ));
        let datagrams = writer.write(&sample_msg()).unwrap();
        assert_eq!(datagrams.len(), 1);
    }

    #[test]
    fn add_remove_reader_proxy_roundtrip() {
        let mut writer = local_writer();
        let peer = remote_reader_guid();
        writer.add_reader_proxy(ReaderProxy::new(peer, alloc::vec![], alloc::vec![], true));
        assert_eq!(writer.reader_proxy_count(), 1);
        assert!(writer.remove_reader_proxy(peer).is_some());
        assert_eq!(writer.reader_proxy_count(), 0);
    }

    #[test]
    fn add_remove_writer_proxy_roundtrip() {
        let mut reader = local_reader();
        reader.add_writer_proxy(remote_writer_proxy_with_locator());
        assert_eq!(reader.writer_proxy_count(), 1);
        assert!(reader.remove_writer_proxy(remote_writer_guid()).is_some());
        assert_eq!(reader.writer_proxy_count(), 0);
    }

    #[test]
    fn reader_decodes_data_with_known_writer() {
        let mut reader = local_reader();
        reader.add_writer_proxy(remote_writer_proxy_with_locator());
        let msg = sample_msg();
        let decoded = reader.handle_data(&data_carrying(&msg)).unwrap();
        // Exactly one message comes out, and it equals the one sent.
        assert_eq!(decoded, alloc::vec![msg]);
    }

    #[test]
    fn reader_drops_data_from_unknown_writer() {
        let mut reader = local_reader();
        // No writer proxy registered, so handle_data must deliver nothing.
        let decoded = reader.handle_data(&data_carrying(&sample_msg())).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn reader_rejects_corrupt_payload() {
        let mut reader = local_reader();
        reader.add_writer_proxy(WriterProxy::new(
            remote_writer_guid(),
            alloc::vec![],
            alloc::vec![],
            true,
        ));
        // Same header fields as a valid submessage, but with a payload the
        // codec cannot decode.
        let corrupt = DataSubmessage {
            serialized_payload: alloc::vec![0x00, 0x99, 0, 0].into(),
            ..data_carrying(&sample_msg())
        };
        let err = reader.handle_data(&corrupt).unwrap_err();
        assert_eq!(err.kind, SecurityErrorKind::BadArgument);
    }
}
461}