Skip to main content

zerodds_discovery/security/
stack.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! `SecurityBuiltinStack` — bundelt die zwei Security-Builtin-Topic-
4//! Endpoint-Paare in einer Struktur.
5//!
6//! - `DCPSParticipantStatelessMessage` (Auth-Handshake, BestEffort).
7//! - `DCPSParticipantVolatileMessageSecure` (Crypto-KeyExchange, Reliable).
8//!
9//! Wird vom Participant-Wiring (DCPS-Layer) instanziiert, sobald ein
10//! Security-Plugin registriert ist und die Discovery-Bits 22..25 im
11//! `BuiltinEndpointSet` annonciert werden. Der Stack pflegt die Reader/
12//! Writer-Proxies pro Remote-Participant — `handle_remote_endpoints`
13//! wird vom SPDP-Hot-Path aufgerufen, sobald ein Peer mit den
14//! entsprechenden Bits entdeckt wurde.
15
16extern crate alloc;
17use alloc::vec::Vec;
18use core::time::Duration;
19
20use zerodds_rtps::error::WireError;
21use zerodds_rtps::message_builder::OutboundDatagram;
22use zerodds_rtps::reader_proxy::ReaderProxy;
23use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, Locator, VendorId};
24use zerodds_rtps::writer_proxy::WriterProxy;
25
26use crate::capabilities::PeerCapabilities;
27use crate::security::stateless::{StatelessMessageReader, StatelessMessageWriter};
28use crate::security::volatile_secure::{VolatileSecureMessageReader, VolatileSecureMessageWriter};
29use crate::spdp::DiscoveredParticipant;
30
31/// Bundle aus den vier Security-Builtin-Endpoints.
32#[derive(Debug)]
33pub struct SecurityBuiltinStack {
34    local_prefix: GuidPrefix,
35    /// Stateless-Auth-Writer (Spec §7.4.4).
36    pub stateless_writer: StatelessMessageWriter,
37    /// Stateless-Auth-Reader.
38    pub stateless_reader: StatelessMessageReader,
39    /// Volatile-Secure-Writer (Spec §7.4.5).
40    pub volatile_writer: VolatileSecureMessageWriter,
41    /// Volatile-Secure-Reader.
42    pub volatile_reader: VolatileSecureMessageReader,
43}
44
45impl SecurityBuiltinStack {
46    /// Erzeugt einen frischen Stack ohne Remote-Proxies.
47    #[must_use]
48    pub fn new(local_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
49        Self {
50            local_prefix,
51            stateless_writer: StatelessMessageWriter::new(local_prefix, vendor_id),
52            stateless_reader: StatelessMessageReader::new(local_prefix, vendor_id),
53            volatile_writer: VolatileSecureMessageWriter::new(local_prefix, vendor_id),
54            volatile_reader: VolatileSecureMessageReader::new(local_prefix, vendor_id),
55        }
56    }
57
58    /// Lokaler GuidPrefix.
59    #[must_use]
60    pub fn local_prefix(&self) -> GuidPrefix {
61        self.local_prefix
62    }
63
64    /// Verdrahtet Reader-/Writer-Proxies auf Basis der vom Peer
65    /// annoncierten BuiltinEndpointSet-Bits (Spec §7.4.7.1):
66    ///
67    /// - Bits 22+23 (`PARTICIPANT_STATELESS_MESSAGE_*`) → Stateless-Slot
68    /// - Bits 24+25 (`PARTICIPANT_VOLATILE_MESSAGE_SECURE_*`) → Volatile-Slot
69    ///
70    /// Wir routen ueber `metatraffic_unicast_locator` (PID 0x0032),
71    /// fallback auf `default_unicast_locator`. Selbst-Discovery
72    /// (`peer.sender_prefix == self.local_prefix`) wird ignoriert.
73    pub fn handle_remote_endpoints(&mut self, peer: &DiscoveredParticipant) {
74        if peer.sender_prefix == self.local_prefix {
75            return;
76        }
77        let caps = PeerCapabilities::from_bits(peer.data.builtin_endpoint_set);
78        if !caps.has_stateless_auth && !caps.has_volatile_secure {
79            return;
80        }
81        let unicast: Vec<Locator> = peer
82            .data
83            .metatraffic_unicast_locator
84            .or(peer.data.default_unicast_locator)
85            .into_iter()
86            .collect();
87        let remote_prefix = peer.sender_prefix;
88
89        if caps.has_stateless_auth {
90            self.stateless_writer.add_reader_proxy(ReaderProxy::new(
91                Guid::new(
92                    remote_prefix,
93                    EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
94                ),
95                unicast.clone(),
96                Vec::new(),
97                false,
98            ));
99            self.stateless_reader.add_writer_proxy(WriterProxy::new(
100                Guid::new(
101                    remote_prefix,
102                    EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
103                ),
104                unicast.clone(),
105                Vec::new(),
106                false,
107            ));
108        }
109
110        if caps.has_volatile_secure {
111            self.volatile_writer.add_reader_proxy(ReaderProxy::new(
112                Guid::new(
113                    remote_prefix,
114                    EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
115                ),
116                unicast.clone(),
117                Vec::new(),
118                true,
119            ));
120            self.volatile_reader.add_writer_proxy(WriterProxy::new(
121                Guid::new(
122                    remote_prefix,
123                    EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
124                ),
125                unicast,
126                Vec::new(),
127                true,
128            ));
129        }
130    }
131
132    /// Cleanup nach SPDP-Lease-Timeout: alle Proxies dieses Prefixes
133    /// entfernen. Liefert `(stateless_pairs_removed,
134    /// volatile_pairs_removed)`.
135    pub fn on_participant_lost(&mut self, prefix: GuidPrefix) -> (usize, usize) {
136        let mut stateless = 0usize;
137        let mut volatile = 0usize;
138        if self
139            .stateless_writer
140            .remove_reader_proxy(Guid::new(
141                prefix,
142                EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
143            ))
144            .is_some()
145        {
146            stateless += 1;
147        }
148        self.stateless_reader.remove_writer_proxy(Guid::new(
149            prefix,
150            EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
151        ));
152        if self
153            .volatile_writer
154            .remove_reader_proxy(Guid::new(
155                prefix,
156                EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
157            ))
158            .is_some()
159        {
160            volatile += 1;
161        }
162        self.volatile_reader.remove_writer_proxy(Guid::new(
163            prefix,
164            EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
165        ));
166        (stateless, volatile)
167    }
168
169    /// Tick ueber alle Endpoints. Liefert HEARTBEATs/Resends vom
170    /// Volatile-Writer plus ACKNACK/NACK_FRAG vom Volatile-Reader.
171    /// Stateless hat keinen Tick (BestEffort, kein Resend-State).
172    ///
173    /// # Errors
174    /// Wire-Encode-Fehler aus dem Reliable-Layer.
175    pub fn poll(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
176        let mut out = Vec::new();
177        out.extend(self.volatile_writer.tick(now)?);
178        out.extend(self.volatile_reader.tick_outbound(now)?);
179        Ok(out)
180    }
181}
182
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use zerodds_rtps::participant_data::{
        Duration as DdsDuration, ParticipantBuiltinTopicData, endpoint_flag,
    };
    use zerodds_rtps::wire_types::ProtocolVersion;
    use zerodds_security::generic_message::{MessageIdentity, ParticipantGenericMessage, class_id};
    use zerodds_security::token::DataHolder;

    /// Prefix used for the "local" participant in these tests.
    fn local_prefix() -> GuidPrefix {
        GuidPrefix::from_bytes([1; 12])
    }

    /// Prefix used for the "remote" participant in these tests.
    fn remote_prefix() -> GuidPrefix {
        GuidPrefix::from_bytes([2; 12])
    }

    /// Fresh stack owned by `prefix`, using the ZeroDDS vendor id.
    fn fresh_stack(prefix: GuidPrefix) -> SecurityBuiltinStack {
        SecurityBuiltinStack::new(prefix, VendorId::ZERODDS)
    }

    /// Both stateless-message endpoint bits.
    fn stateless_flags() -> u32 {
        endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_WRITER
            | endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_READER
    }

    /// Both volatile-secure endpoint bits.
    fn volatile_flags() -> u32 {
        endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER
            | endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
    }

    /// Discovered participant with the given prefix (used for both the
    /// sender prefix and the participant GUID) announcing `flags`. Only a
    /// default unicast locator is set; no metatraffic locator.
    fn remote_with_prefix(prefix: GuidPrefix, flags: u32) -> DiscoveredParticipant {
        DiscoveredParticipant {
            sender_prefix: prefix,
            sender_vendor: VendorId::ZERODDS,
            data: ParticipantBuiltinTopicData {
                guid: Guid::new(prefix, EntityId::PARTICIPANT),
                protocol_version: ProtocolVersion::V2_5,
                vendor_id: VendorId::ZERODDS,
                default_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 99], 7411)),
                default_multicast_locator: None,
                metatraffic_unicast_locator: None,
                metatraffic_multicast_locator: None,
                domain_id: None,
                builtin_endpoint_set: flags,
                lease_duration: DdsDuration::from_secs(30),
                user_data: Vec::new(),
                properties: Default::default(),
                identity_token: None,
                permissions_token: None,
                identity_status_token: None,
                sig_algo_info: None,
                kx_algo_info: None,
                sym_cipher_algo_info: None,
            },
        }
    }

    /// Discovered participant carrying `remote_prefix()`.
    fn remote_with(flags: u32) -> DiscoveredParticipant {
        remote_with_prefix(remote_prefix(), flags)
    }

    /// Generic message with an auth-request class id and one data holder.
    fn sample_stateless_msg() -> ParticipantGenericMessage {
        let message_identity = MessageIdentity {
            source_guid: [0xAA; 16],
            sequence_number: 1,
        };
        ParticipantGenericMessage {
            message_identity,
            related_message_identity: MessageIdentity::default(),
            destination_participant_key: [0xBB; 16],
            destination_endpoint_key: [0; 16],
            source_endpoint_key: [0xCC; 16],
            message_class_id: class_id::AUTH_REQUEST.into(),
            message_data: alloc::vec![DataHolder::new("DDS:Auth:PKI-DH:1.2+AuthReq")],
        }
    }

    #[test]
    fn new_stack_has_zero_proxies_everywhere() {
        let stack = fresh_stack(local_prefix());
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 0);
        assert_eq!(stack.stateless_reader.writer_proxy_count(), 0);
        assert_eq!(stack.volatile_writer.reader_proxy_count(), 0);
        assert_eq!(stack.volatile_reader.writer_proxy_count(), 0);
        assert_eq!(stack.local_prefix(), local_prefix());
    }

    #[test]
    fn handle_remote_endpoints_with_all_bits_wires_all_four() {
        let mut stack = fresh_stack(local_prefix());
        let peer = remote_with(stateless_flags() | volatile_flags());
        stack.handle_remote_endpoints(&peer);
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 1);
        assert_eq!(stack.stateless_reader.writer_proxy_count(), 1);
        assert_eq!(stack.volatile_writer.reader_proxy_count(), 1);
        assert_eq!(stack.volatile_reader.writer_proxy_count(), 1);
    }

    #[test]
    fn handle_remote_endpoints_with_only_stateless_bits_skips_volatile() {
        let mut stack = fresh_stack(local_prefix());
        let peer = remote_with(stateless_flags());
        stack.handle_remote_endpoints(&peer);
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 1);
        assert_eq!(stack.stateless_reader.writer_proxy_count(), 1);
        assert_eq!(stack.volatile_writer.reader_proxy_count(), 0);
        assert_eq!(stack.volatile_reader.writer_proxy_count(), 0);
    }

    #[test]
    fn handle_remote_endpoints_with_no_security_bits_is_noop() {
        let mut stack = fresh_stack(local_prefix());
        let peer = remote_with(endpoint_flag::ALL_STANDARD);
        stack.handle_remote_endpoints(&peer);
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 0);
        assert_eq!(stack.volatile_writer.reader_proxy_count(), 0);
    }

    #[test]
    fn self_discovery_is_ignored() {
        let mut stack = fresh_stack(local_prefix());
        // Only the sender prefix is switched to the local one; the
        // announced participant data stays that of the remote.
        let peer = DiscoveredParticipant {
            sender_prefix: local_prefix(),
            ..remote_with(endpoint_flag::ALL_SECURE)
        };
        stack.handle_remote_endpoints(&peer);
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 0);
    }

    #[test]
    fn handle_remote_endpoints_is_idempotent_on_repeat_announcement() {
        let mut stack = fresh_stack(local_prefix());
        // Announce the same peer twice; the proxy count must stay at one.
        for _ in 0..2 {
            stack.handle_remote_endpoints(&remote_with(stateless_flags()));
        }
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 1);
    }

    #[test]
    fn on_participant_lost_clears_proxies() {
        let mut stack = fresh_stack(local_prefix());
        let peer = remote_with(stateless_flags() | volatile_flags());
        stack.handle_remote_endpoints(&peer);

        let (stateless_removed, volatile_removed) = stack.on_participant_lost(remote_prefix());
        assert_eq!(stateless_removed, 1);
        assert_eq!(volatile_removed, 1);
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 0);
        assert_eq!(stack.volatile_writer.reader_proxy_count(), 0);
    }

    #[test]
    fn poll_on_empty_stack_returns_no_datagrams() {
        let mut stack = fresh_stack(local_prefix());
        let datagrams = stack.poll(Duration::from_secs(1)).unwrap();
        assert!(datagrams.is_empty());
    }

    #[test]
    fn end_to_end_stateless_message_loopback_between_stacks() {
        let mut a = fresh_stack(local_prefix());
        let mut b = fresh_stack(remote_prefix());
        let flags = stateless_flags();
        // A discovers B.
        a.handle_remote_endpoints(&remote_with_prefix(remote_prefix(), flags));
        // B discovers A.
        b.handle_remote_endpoints(&remote_with_prefix(local_prefix(), flags));

        let msg = sample_stateless_msg();
        let sent = a.stateless_writer.write(&msg).unwrap();
        assert_eq!(sent.len(), 1);

        // Hand A's datagram bytes straight to B's stateless reader.
        let received = b.stateless_reader.handle_datagram(&sent[0].bytes).unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], msg);
    }

    #[test]
    fn end_to_end_volatile_secure_handshake_via_reliable_loop() {
        use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram};

        // A sends a crypto-token message to B over the volatile-secure
        // topic; the hop is simulated by hand:
        // 1. A's volatile writer produces exactly one datagram.
        // 2. The datagram is decoded and every DATA submessage addressed
        //    to the volatile-secure reader is fed into B's reader.
        // 3. B's reader tick must emit at least one outbound datagram.
        // Feeding that datagram back into A is not exercised here.
        let mut a = fresh_stack(local_prefix());
        let mut b = fresh_stack(remote_prefix());
        let flags = volatile_flags();
        a.handle_remote_endpoints(&remote_with_prefix(remote_prefix(), flags));
        b.handle_remote_endpoints(&remote_with_prefix(local_prefix(), flags));

        let mut msg = sample_stateless_msg();
        msg.message_class_id = class_id::PARTICIPANT_CRYPTO_TOKENS.into();

        let sent = a.volatile_writer.write(&msg).unwrap();
        assert_eq!(sent.len(), 1, "ein Datagram pro Reader-Proxy");

        // Wire decode, then dispatch into B's volatile reader.
        let parsed = decode_datagram(&sent[0].bytes).unwrap();
        let mut delivered = Vec::new();
        for submessage in parsed.submessages {
            match submessage {
                ParsedSubmessage::Data(data)
                    if data.reader_id
                        == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER =>
                {
                    delivered.extend(b.volatile_reader.handle_data(&data).unwrap());
                }
                _ => {}
            }
        }
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0], msg);

        // B knows A as a writer proxy, so its tick should yield datagrams.
        let outbound = b
            .volatile_reader
            .tick_outbound(Duration::from_millis(500))
            .unwrap();
        assert!(
            !outbound.is_empty(),
            "Reader sollte initiales ACKNACK senden"
        );
    }
}