zerodds_discovery/security/
stack.rs1extern crate alloc;
17use alloc::vec::Vec;
18use core::time::Duration;
19
20use zerodds_rtps::error::WireError;
21use zerodds_rtps::message_builder::OutboundDatagram;
22use zerodds_rtps::reader_proxy::ReaderProxy;
23use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, Locator, VendorId};
24use zerodds_rtps::writer_proxy::WriterProxy;
25
26use crate::capabilities::PeerCapabilities;
27use crate::security::stateless::{StatelessMessageReader, StatelessMessageWriter};
28use crate::security::volatile_secure::{VolatileSecureMessageReader, VolatileSecureMessageWriter};
29use crate::spdp::DiscoveredParticipant;
30
31#[derive(Debug)]
33pub struct SecurityBuiltinStack {
34 local_prefix: GuidPrefix,
35 pub stateless_writer: StatelessMessageWriter,
37 pub stateless_reader: StatelessMessageReader,
39 pub volatile_writer: VolatileSecureMessageWriter,
41 pub volatile_reader: VolatileSecureMessageReader,
43}
44
45impl SecurityBuiltinStack {
46 #[must_use]
48 pub fn new(local_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
49 Self {
50 local_prefix,
51 stateless_writer: StatelessMessageWriter::new(local_prefix, vendor_id),
52 stateless_reader: StatelessMessageReader::new(local_prefix, vendor_id),
53 volatile_writer: VolatileSecureMessageWriter::new(local_prefix, vendor_id),
54 volatile_reader: VolatileSecureMessageReader::new(local_prefix, vendor_id),
55 }
56 }
57
58 #[must_use]
60 pub fn local_prefix(&self) -> GuidPrefix {
61 self.local_prefix
62 }
63
64 pub fn handle_remote_endpoints(&mut self, peer: &DiscoveredParticipant) {
74 if peer.sender_prefix == self.local_prefix {
75 return;
76 }
77 let caps = PeerCapabilities::from_bits(peer.data.builtin_endpoint_set);
78 if !caps.has_stateless_auth && !caps.has_volatile_secure {
79 return;
80 }
81 let unicast: Vec<Locator> = peer
82 .data
83 .metatraffic_unicast_locator
84 .or(peer.data.default_unicast_locator)
85 .into_iter()
86 .collect();
87 let remote_prefix = peer.sender_prefix;
88
89 if caps.has_stateless_auth {
90 self.stateless_writer.add_reader_proxy(ReaderProxy::new(
91 Guid::new(
92 remote_prefix,
93 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
94 ),
95 unicast.clone(),
96 Vec::new(),
97 false,
98 ));
99 self.stateless_reader.add_writer_proxy(WriterProxy::new(
100 Guid::new(
101 remote_prefix,
102 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
103 ),
104 unicast.clone(),
105 Vec::new(),
106 false,
107 ));
108 }
109
110 if caps.has_volatile_secure {
111 self.volatile_writer.add_reader_proxy(ReaderProxy::new(
112 Guid::new(
113 remote_prefix,
114 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
115 ),
116 unicast.clone(),
117 Vec::new(),
118 true,
119 ));
120 self.volatile_reader.add_writer_proxy(WriterProxy::new(
121 Guid::new(
122 remote_prefix,
123 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
124 ),
125 unicast,
126 Vec::new(),
127 true,
128 ));
129 }
130 }
131
132 pub fn on_participant_lost(&mut self, prefix: GuidPrefix) -> (usize, usize) {
136 let mut stateless = 0usize;
137 let mut volatile = 0usize;
138 if self
139 .stateless_writer
140 .remove_reader_proxy(Guid::new(
141 prefix,
142 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
143 ))
144 .is_some()
145 {
146 stateless += 1;
147 }
148 self.stateless_reader.remove_writer_proxy(Guid::new(
149 prefix,
150 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
151 ));
152 if self
153 .volatile_writer
154 .remove_reader_proxy(Guid::new(
155 prefix,
156 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
157 ))
158 .is_some()
159 {
160 volatile += 1;
161 }
162 self.volatile_reader.remove_writer_proxy(Guid::new(
163 prefix,
164 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
165 ));
166 (stateless, volatile)
167 }
168
169 pub fn poll(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
176 let mut out = Vec::new();
177 out.extend(self.volatile_writer.tick(now)?);
178 out.extend(self.volatile_reader.tick_outbound(now)?);
179 Ok(out)
180 }
181}
182
// Unit tests for `SecurityBuiltinStack`: proxy wiring on discovery, cleanup on
// participant loss, polling, and two in-memory end-to-end exchanges.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use zerodds_rtps::participant_data::{
        Duration as DdsDuration, ParticipantBuiltinTopicData, endpoint_flag,
    };
    use zerodds_rtps::wire_types::ProtocolVersion;
    use zerodds_security::generic_message::{MessageIdentity, ParticipantGenericMessage, class_id};
    use zerodds_security::token::DataHolder;

    /// GUID prefix used for the stack under test.
    fn local_prefix() -> GuidPrefix {
        GuidPrefix::from_bytes([1; 12])
    }

    /// GUID prefix used for the simulated peer.
    fn remote_prefix() -> GuidPrefix {
        GuidPrefix::from_bytes([2; 12])
    }

    /// Fresh stack with the ZeroDDS vendor id.
    fn new_stack(prefix: GuidPrefix) -> SecurityBuiltinStack {
        SecurityBuiltinStack::new(prefix, VendorId::ZERODDS)
    }

    /// Endpoint bits announcing the stateless-message writer and reader.
    fn stateless_flags() -> u32 {
        endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_WRITER
            | endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_READER
    }

    /// Endpoint bits announcing the volatile-secure writer and reader.
    fn volatile_flags() -> u32 {
        endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER
            | endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
    }

    /// Endpoint bits announcing all four security endpoints.
    fn all_security_flags() -> u32 {
        endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_WRITER
            | endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_READER
            | endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER
            | endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
    }

    /// Discovered participant with `remote_prefix()`, a default unicast
    /// locator only, and the given built-in endpoint bits.
    fn remote_with(flags: u32) -> DiscoveredParticipant {
        let data = ParticipantBuiltinTopicData {
            guid: Guid::new(remote_prefix(), EntityId::PARTICIPANT),
            protocol_version: ProtocolVersion::V2_5,
            vendor_id: VendorId::ZERODDS,
            default_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 99], 7411)),
            default_multicast_locator: None,
            metatraffic_unicast_locator: None,
            metatraffic_multicast_locator: None,
            domain_id: None,
            builtin_endpoint_set: flags,
            lease_duration: DdsDuration::from_secs(30),
            user_data: Vec::new(),
            properties: Default::default(),
            identity_token: None,
            permissions_token: None,
            identity_status_token: None,
            sig_algo_info: None,
            kx_algo_info: None,
            sym_cipher_algo_info: None,
        };
        DiscoveredParticipant {
            sender_prefix: remote_prefix(),
            sender_vendor: VendorId::ZERODDS,
            data,
        }
    }

    /// Same as `remote_with`, but announced under an arbitrary prefix.
    fn remote_with_prefix(prefix: GuidPrefix, flags: u32) -> DiscoveredParticipant {
        let mut peer = remote_with(flags);
        peer.sender_prefix = prefix;
        peer.data.guid = Guid::new(prefix, EntityId::PARTICIPANT);
        peer
    }

    /// Generic message with fixed identities and the auth-request class id.
    fn sample_stateless_msg() -> ParticipantGenericMessage {
        let message_identity = MessageIdentity {
            source_guid: [0xAA; 16],
            sequence_number: 1,
        };
        ParticipantGenericMessage {
            message_identity,
            related_message_identity: MessageIdentity::default(),
            destination_participant_key: [0xBB; 16],
            destination_endpoint_key: [0; 16],
            source_endpoint_key: [0xCC; 16],
            message_class_id: class_id::AUTH_REQUEST.into(),
            message_data: alloc::vec![DataHolder::new("DDS:Auth:PKI-DH:1.2+AuthReq")],
        }
    }

    #[test]
    fn new_stack_has_zero_proxies_everywhere() {
        let stack = new_stack(local_prefix());
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 0);
        assert_eq!(stack.stateless_reader.writer_proxy_count(), 0);
        assert_eq!(stack.volatile_writer.reader_proxy_count(), 0);
        assert_eq!(stack.volatile_reader.writer_proxy_count(), 0);
        assert_eq!(stack.local_prefix(), local_prefix());
    }

    #[test]
    fn handle_remote_endpoints_with_all_bits_wires_all_four() {
        let mut stack = new_stack(local_prefix());
        stack.handle_remote_endpoints(&remote_with(all_security_flags()));
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 1);
        assert_eq!(stack.stateless_reader.writer_proxy_count(), 1);
        assert_eq!(stack.volatile_writer.reader_proxy_count(), 1);
        assert_eq!(stack.volatile_reader.writer_proxy_count(), 1);
    }

    #[test]
    fn handle_remote_endpoints_with_only_stateless_bits_skips_volatile() {
        let mut stack = new_stack(local_prefix());
        stack.handle_remote_endpoints(&remote_with(stateless_flags()));
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 1);
        assert_eq!(stack.stateless_reader.writer_proxy_count(), 1);
        assert_eq!(stack.volatile_writer.reader_proxy_count(), 0);
        assert_eq!(stack.volatile_reader.writer_proxy_count(), 0);
    }

    #[test]
    fn handle_remote_endpoints_with_no_security_bits_is_noop() {
        let mut stack = new_stack(local_prefix());
        stack.handle_remote_endpoints(&remote_with(endpoint_flag::ALL_STANDARD));
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 0);
        assert_eq!(stack.volatile_writer.reader_proxy_count(), 0);
    }

    #[test]
    fn self_discovery_is_ignored() {
        let mut stack = new_stack(local_prefix());
        // Announcement that claims to come from our own prefix.
        let mut own_announcement = remote_with(endpoint_flag::ALL_SECURE);
        own_announcement.sender_prefix = local_prefix();
        stack.handle_remote_endpoints(&own_announcement);
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 0);
    }

    #[test]
    fn handle_remote_endpoints_is_idempotent_on_repeat_announcement() {
        let mut stack = new_stack(local_prefix());
        for _ in 0..2 {
            stack.handle_remote_endpoints(&remote_with(stateless_flags()));
        }
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 1);
    }

    #[test]
    fn on_participant_lost_clears_proxies() {
        let mut stack = new_stack(local_prefix());
        stack.handle_remote_endpoints(&remote_with(all_security_flags()));
        let (stateless_removed, volatile_removed) = stack.on_participant_lost(remote_prefix());
        assert_eq!(stateless_removed, 1);
        assert_eq!(volatile_removed, 1);
        assert_eq!(stack.stateless_writer.reader_proxy_count(), 0);
        assert_eq!(stack.volatile_writer.reader_proxy_count(), 0);
    }

    #[test]
    fn poll_on_empty_stack_returns_no_datagrams() {
        let mut stack = new_stack(local_prefix());
        let datagrams = stack.poll(Duration::from_secs(1)).unwrap();
        assert!(datagrams.is_empty());
    }

    #[test]
    fn end_to_end_stateless_message_loopback_between_stacks() {
        let mut alice = new_stack(local_prefix());
        let mut bob = new_stack(remote_prefix());
        // Each side discovers the other.
        alice.handle_remote_endpoints(&remote_with_prefix(remote_prefix(), stateless_flags()));
        bob.handle_remote_endpoints(&remote_with_prefix(local_prefix(), stateless_flags()));

        let msg = sample_stateless_msg();
        let datagrams = alice.stateless_writer.write(&msg).unwrap();
        assert_eq!(datagrams.len(), 1);

        // Feed the raw bytes straight into the peer's reader.
        let received = bob
            .stateless_reader
            .handle_datagram(&datagrams[0].bytes)
            .unwrap();
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], msg);
    }

    #[test]
    fn end_to_end_volatile_secure_handshake_via_reliable_loop() {
        use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram};

        let mut alice = new_stack(local_prefix());
        let mut bob = new_stack(remote_prefix());
        alice.handle_remote_endpoints(&remote_with_prefix(remote_prefix(), volatile_flags()));
        bob.handle_remote_endpoints(&remote_with_prefix(local_prefix(), volatile_flags()));

        let mut msg = sample_stateless_msg();
        msg.message_class_id = class_id::PARTICIPANT_CRYPTO_TOKENS.into();

        let datagrams = alice.volatile_writer.write(&msg).unwrap();
        assert_eq!(datagrams.len(), 1, "ein Datagram pro Reader-Proxy");

        // Decode the datagram and hand every DATA submessage addressed to the
        // volatile-secure reader to the peer.
        let parsed = decode_datagram(&datagrams[0].bytes).unwrap();
        let mut received_msgs = Vec::new();
        for sub in parsed.submessages {
            match sub {
                ParsedSubmessage::Data(d)
                    if d.reader_id
                        == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER =>
                {
                    received_msgs.extend(bob.volatile_reader.handle_data(&d).unwrap());
                }
                _ => {}
            }
        }
        assert_eq!(received_msgs.len(), 1);
        assert_eq!(received_msgs[0], msg);

        // After receiving data the reader is expected to produce outbound traffic.
        let outbound = bob
            .volatile_reader
            .tick_outbound(Duration::from_millis(500))
            .unwrap();
        assert!(
            !outbound.is_empty(),
            "Reader sollte initiales ACKNACK senden"
        );
    }
}