1extern crate alloc;
18use alloc::vec::Vec;
19use core::time::Duration;
20
21use zerodds_rtps::error::WireError;
22use zerodds_rtps::fragment_assembler::AssemblerCaps;
23use zerodds_rtps::history_cache::HistoryKind;
24use zerodds_rtps::message_builder::{DEFAULT_MTU, OutboundDatagram};
25use zerodds_rtps::reader_proxy::ReaderProxy;
26use zerodds_rtps::reliable_reader::{
27 DEFAULT_HEARTBEAT_RESPONSE_DELAY, ReliableReader, ReliableReaderConfig,
28};
29use zerodds_rtps::reliable_writer::{DEFAULT_FRAGMENT_SIZE, ReliableWriter, ReliableWriterConfig};
30use zerodds_rtps::submessages::{
31 DataFragSubmessage, DataSubmessage, GapSubmessage, HeartbeatSubmessage, NackFragSubmessage,
32};
33use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, SequenceNumber, VendorId};
34use zerodds_rtps::writer_proxy::WriterProxy;
35
36use zerodds_security::error::{SecurityError, SecurityErrorKind, SecurityResult};
37use zerodds_security::generic_message::ParticipantGenericMessage;
38
39use crate::security::codec::{decode_generic_message, encode_generic_message};
40
41pub const VOLATILE_SECURE_DEFAULT_DEPTH: usize = 16;
45
46pub const VOLATILE_SECURE_HEARTBEAT_PERIOD: Duration = Duration::from_millis(250);
49
50pub const VOLATILE_SECURE_READER_CAPACITY: usize = 64;
52
53#[derive(Debug)]
55pub struct VolatileSecureMessageWriter {
56 inner: ReliableWriter,
57}
58
59impl VolatileSecureMessageWriter {
60 #[must_use]
62 pub fn new(participant_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
63 let guid = Guid::new(
64 participant_prefix,
65 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
66 );
67 Self {
68 inner: ReliableWriter::new(ReliableWriterConfig {
69 guid,
70 vendor_id,
71 reader_proxies: Vec::new(),
72 max_samples: VOLATILE_SECURE_DEFAULT_DEPTH,
73 history_kind: HistoryKind::KeepLast {
74 depth: VOLATILE_SECURE_DEFAULT_DEPTH,
75 },
76 heartbeat_period: VOLATILE_SECURE_HEARTBEAT_PERIOD,
77 fragment_size: DEFAULT_FRAGMENT_SIZE,
78 mtu: DEFAULT_MTU,
79 }),
80 }
81 }
82
83 #[must_use]
85 pub fn guid(&self) -> Guid {
86 self.inner.guid()
87 }
88
89 #[must_use]
91 pub fn reader_proxy_count(&self) -> usize {
92 self.inner.reader_proxy_count()
93 }
94
95 #[must_use]
97 pub fn inner(&self) -> &ReliableWriter {
98 &self.inner
99 }
100
101 pub fn add_reader_proxy(&mut self, proxy: ReaderProxy) {
103 self.inner.add_reader_proxy(proxy);
104 }
105
106 pub fn remove_reader_proxy(&mut self, guid: Guid) -> Option<ReaderProxy> {
108 self.inner.remove_reader_proxy(guid)
109 }
110
111 pub fn write(
118 &mut self,
119 msg: &ParticipantGenericMessage,
120 ) -> Result<Vec<OutboundDatagram>, WireError> {
121 let payload = encode_generic_message(msg);
122 self.inner.write(&payload)
123 }
124
125 pub fn tick(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
130 self.inner.tick(now)
131 }
132
133 pub fn handle_acknack(
135 &mut self,
136 src_guid: Guid,
137 base: SequenceNumber,
138 requested: impl IntoIterator<Item = SequenceNumber>,
139 ) {
140 self.inner.handle_acknack(src_guid, base, requested);
141 }
142
143 pub fn handle_nackfrag(&mut self, src_guid: Guid, nf: &NackFragSubmessage) {
145 self.inner.handle_nackfrag(src_guid, nf);
146 }
147}
148
149#[derive(Debug)]
151pub struct VolatileSecureMessageReader {
152 inner: ReliableReader,
153}
154
155impl VolatileSecureMessageReader {
156 #[must_use]
158 pub fn new(participant_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
159 let guid = Guid::new(
160 participant_prefix,
161 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
162 );
163 Self {
164 inner: ReliableReader::new(ReliableReaderConfig {
165 guid,
166 vendor_id,
167 writer_proxies: Vec::new(),
168 max_samples_per_proxy: VOLATILE_SECURE_READER_CAPACITY,
169 heartbeat_response_delay: DEFAULT_HEARTBEAT_RESPONSE_DELAY,
170 assembler_caps: AssemblerCaps::default(),
171 }),
172 }
173 }
174
175 #[must_use]
177 pub fn guid(&self) -> Guid {
178 self.inner.guid()
179 }
180
181 #[must_use]
183 pub fn writer_proxy_count(&self) -> usize {
184 self.inner.writer_proxy_count()
185 }
186
187 #[must_use]
189 pub fn inner(&self) -> &ReliableReader {
190 &self.inner
191 }
192
193 pub fn add_writer_proxy(&mut self, proxy: WriterProxy) {
195 self.inner.add_writer_proxy(proxy);
196 }
197
198 pub fn remove_writer_proxy(&mut self, guid: Guid) -> Option<WriterProxy> {
200 self.inner.remove_writer_proxy(guid)
201 }
202
203 pub fn handle_data(
209 &mut self,
210 data: &DataSubmessage,
211 ) -> SecurityResult<Vec<ParticipantGenericMessage>> {
212 let samples = self.inner.handle_data(data);
213 decode_samples(samples.into_iter().map(|s| s.payload))
214 }
215
216 pub fn handle_data_frag(
221 &mut self,
222 df: &DataFragSubmessage,
223 now: Duration,
224 ) -> SecurityResult<Vec<ParticipantGenericMessage>> {
225 let samples = self.inner.handle_data_frag(df, now);
226 decode_samples(samples.into_iter().map(|s| s.payload))
227 }
228
229 pub fn handle_gap(
234 &mut self,
235 gap: &GapSubmessage,
236 ) -> SecurityResult<Vec<ParticipantGenericMessage>> {
237 let samples = self.inner.handle_gap(gap);
238 decode_samples(samples.into_iter().map(|s| s.payload))
239 }
240
241 pub fn handle_heartbeat(&mut self, hb: &HeartbeatSubmessage, now: Duration) {
243 self.inner.handle_heartbeat(hb, now);
244 }
245
246 pub fn tick_outbound(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
251 self.inner.tick_outbound(now)
252 }
253}
254
255fn decode_samples<B, I>(payloads: I) -> SecurityResult<Vec<ParticipantGenericMessage>>
256where
257 B: AsRef<[u8]>,
258 I: IntoIterator<Item = B>,
259{
260 let mut out = Vec::new();
261 for p in payloads {
262 out.push(decode_generic_message(p.as_ref())?);
267 }
268 Ok(out)
269}
270
271const _: Option<SecurityErrorKind> = None;
273const _: Option<SecurityError> = None;
274
275#[cfg(test)]
276#[allow(clippy::expect_used, clippy::unwrap_used)]
277mod tests {
278 use super::*;
279 use zerodds_rtps::wire_types::Locator;
280 use zerodds_security::generic_message::{MessageIdentity, class_id};
281 use zerodds_security::token::DataHolder;
282
283 fn local_prefix() -> GuidPrefix {
284 GuidPrefix::from_bytes([1; 12])
285 }
286 fn remote_prefix() -> GuidPrefix {
287 GuidPrefix::from_bytes([2; 12])
288 }
289
290 fn sample_msg() -> ParticipantGenericMessage {
291 ParticipantGenericMessage {
292 message_identity: MessageIdentity {
293 source_guid: [0xAA; 16],
294 sequence_number: 1,
295 },
296 related_message_identity: MessageIdentity::default(),
297 destination_participant_key: [0xBB; 16],
298 destination_endpoint_key: [0; 16],
299 source_endpoint_key: [0xCC; 16],
300 message_class_id: class_id::PARTICIPANT_CRYPTO_TOKENS.into(),
301 message_data: alloc::vec![DataHolder::new("DDS:Crypto:AES-GCM-GMAC")],
302 }
303 }
304
305 #[test]
306 fn writer_has_expected_entity_id() {
307 let w = VolatileSecureMessageWriter::new(local_prefix(), VendorId::ZERODDS);
308 assert_eq!(
309 w.guid().entity_id,
310 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER
311 );
312 }
313
314 #[test]
315 fn reader_has_expected_entity_id() {
316 let r = VolatileSecureMessageReader::new(local_prefix(), VendorId::ZERODDS);
317 assert_eq!(
318 r.guid().entity_id,
319 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
320 );
321 }
322
323 #[test]
324 fn writer_starts_with_zero_proxies() {
325 let w = VolatileSecureMessageWriter::new(local_prefix(), VendorId::ZERODDS);
326 assert_eq!(w.reader_proxy_count(), 0);
327 }
328
329 #[test]
330 fn reader_starts_with_zero_proxies() {
331 let r = VolatileSecureMessageReader::new(local_prefix(), VendorId::ZERODDS);
332 assert_eq!(r.writer_proxy_count(), 0);
333 }
334
335 #[test]
336 fn write_without_proxies_returns_empty_datagrams() {
337 let mut w = VolatileSecureMessageWriter::new(local_prefix(), VendorId::ZERODDS);
338 let dgs = w.write(&sample_msg()).unwrap();
339 assert!(dgs.is_empty());
340 }
341
342 #[test]
343 fn write_with_one_proxy_produces_one_datagram() {
344 let mut w = VolatileSecureMessageWriter::new(local_prefix(), VendorId::ZERODDS);
345 let remote = Guid::new(
346 remote_prefix(),
347 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
348 );
349 w.add_reader_proxy(ReaderProxy::new(
350 remote,
351 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
352 alloc::vec![],
353 true,
354 ));
355 let dgs = w.write(&sample_msg()).unwrap();
356 assert_eq!(dgs.len(), 1);
357 }
358
359 #[test]
360 fn add_remove_reader_proxy_roundtrip() {
361 let mut w = VolatileSecureMessageWriter::new(local_prefix(), VendorId::ZERODDS);
362 let remote = Guid::new(
363 remote_prefix(),
364 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
365 );
366 w.add_reader_proxy(ReaderProxy::new(remote, alloc::vec![], alloc::vec![], true));
367 assert_eq!(w.reader_proxy_count(), 1);
368 assert!(w.remove_reader_proxy(remote).is_some());
369 assert_eq!(w.reader_proxy_count(), 0);
370 }
371
372 #[test]
373 fn add_remove_writer_proxy_roundtrip() {
374 let mut r = VolatileSecureMessageReader::new(local_prefix(), VendorId::ZERODDS);
375 let remote = Guid::new(
376 remote_prefix(),
377 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
378 );
379 r.add_writer_proxy(WriterProxy::new(
380 remote,
381 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
382 alloc::vec![],
383 true,
384 ));
385 assert_eq!(r.writer_proxy_count(), 1);
386 assert!(r.remove_writer_proxy(remote).is_some());
387 assert_eq!(r.writer_proxy_count(), 0);
388 }
389
390 #[test]
391 fn reader_decodes_data_with_known_writer() {
392 let mut r = VolatileSecureMessageReader::new(local_prefix(), VendorId::ZERODDS);
393 let remote = Guid::new(
394 remote_prefix(),
395 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
396 );
397 r.add_writer_proxy(WriterProxy::new(
398 remote,
399 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
400 alloc::vec![],
401 true,
402 ));
403 let msg = sample_msg();
404 let payload = encode_generic_message(&msg);
405 let data = DataSubmessage {
406 extra_flags: 0,
407 reader_id: EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
408 writer_id: EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
409 writer_sn: SequenceNumber(1),
410 inline_qos: None,
411 key_flag: false,
412 non_standard_flag: false,
413 serialized_payload: payload.into(),
414 };
415 let out = r.handle_data(&data).unwrap();
416 assert_eq!(out.len(), 1);
417 assert_eq!(out[0], msg);
418 }
419
420 #[test]
421 fn reader_drops_data_from_unknown_writer() {
422 let mut r = VolatileSecureMessageReader::new(local_prefix(), VendorId::ZERODDS);
423 let msg = sample_msg();
425 let payload = encode_generic_message(&msg);
426 let data = DataSubmessage {
427 extra_flags: 0,
428 reader_id: EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
429 writer_id: EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
430 writer_sn: SequenceNumber(1),
431 inline_qos: None,
432 key_flag: false,
433 non_standard_flag: false,
434 serialized_payload: payload.into(),
435 };
436 let out = r.handle_data(&data).unwrap();
437 assert!(out.is_empty());
438 }
439
440 #[test]
441 fn reader_rejects_corrupt_payload() {
442 let mut r = VolatileSecureMessageReader::new(local_prefix(), VendorId::ZERODDS);
443 let remote = Guid::new(
444 remote_prefix(),
445 EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
446 );
447 r.add_writer_proxy(WriterProxy::new(remote, alloc::vec![], alloc::vec![], true));
448 let data = DataSubmessage {
449 extra_flags: 0,
450 reader_id: EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER,
451 writer_id: EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER,
452 writer_sn: SequenceNumber(1),
453 inline_qos: None,
454 key_flag: false,
455 non_standard_flag: false,
456 serialized_payload: alloc::vec![0x00, 0x99, 0, 0].into(),
457 };
458 let err = r.handle_data(&data).unwrap_err();
459 assert_eq!(err.kind, SecurityErrorKind::BadArgument);
460 }
461}