1extern crate alloc;
25
26use alloc::rc::Rc;
27use alloc::vec::Vec;
28
29use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram, encode_data_datagram};
30use zerodds_rtps::error::WireError;
31use zerodds_rtps::header::RtpsHeader;
32use zerodds_rtps::message_builder::OutboundDatagram;
33use zerodds_rtps::reader_proxy::ReaderProxy;
34use zerodds_rtps::submessages::DataSubmessage;
35use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, SequenceNumber, VendorId};
36use zerodds_rtps::writer_proxy::WriterProxy;
37
38use zerodds_security::error::{SecurityError, SecurityErrorKind, SecurityResult};
39use zerodds_security::generic_message::ParticipantGenericMessage;
40
41use crate::security::codec::{decode_generic_message, encode_generic_message};
42
43#[derive(Debug)]
51pub struct StatelessMessageWriter {
52 guid: Guid,
53 vendor_id: VendorId,
54 next_sn: i64,
55 reader_proxies: Vec<ReaderProxy>,
56}
57
58impl StatelessMessageWriter {
59 #[must_use]
61 pub fn new(participant_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
62 Self {
63 guid: Guid::new(
64 participant_prefix,
65 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
66 ),
67 vendor_id,
68 next_sn: 1,
69 reader_proxies: Vec::new(),
70 }
71 }
72
73 #[must_use]
75 pub fn guid(&self) -> Guid {
76 self.guid
77 }
78
79 #[must_use]
81 pub fn reader_proxies(&self) -> &[ReaderProxy] {
82 &self.reader_proxies
83 }
84
85 #[must_use]
87 pub fn reader_proxy_count(&self) -> usize {
88 self.reader_proxies.len()
89 }
90
91 pub fn add_reader_proxy(&mut self, proxy: ReaderProxy) {
94 let guid = proxy.remote_reader_guid;
95 if let Some(idx) = self
96 .reader_proxies
97 .iter()
98 .position(|p| p.remote_reader_guid == guid)
99 {
100 self.reader_proxies[idx] = proxy;
101 } else {
102 self.reader_proxies.push(proxy);
103 }
104 }
105
106 pub fn remove_reader_proxy(&mut self, guid: Guid) -> Option<ReaderProxy> {
109 let idx = self
110 .reader_proxies
111 .iter()
112 .position(|p| p.remote_reader_guid == guid)?;
113 Some(self.reader_proxies.remove(idx))
114 }
115
116 pub fn write(
125 &mut self,
126 msg: &ParticipantGenericMessage,
127 ) -> Result<Vec<OutboundDatagram>, WireError> {
128 if self.reader_proxies.is_empty() {
129 return Ok(Vec::new());
130 }
131 let payload = encode_generic_message(msg);
132 let sn = SequenceNumber(self.next_sn);
133 self.next_sn = self
134 .next_sn
135 .checked_add(1)
136 .ok_or(WireError::ValueOutOfRange {
137 message: "stateless writer sequence overflow",
138 })?;
139
140 let mut out = Vec::with_capacity(self.reader_proxies.len());
141 for proxy in &self.reader_proxies {
142 let data = DataSubmessage {
143 extra_flags: 0,
144 reader_id: proxy.remote_reader_guid.entity_id,
145 writer_id: self.guid.entity_id,
146 writer_sn: sn,
147 inline_qos: None,
148 key_flag: false,
149 non_standard_flag: false,
150 serialized_payload: payload.clone().into(),
151 };
152 let header = RtpsHeader::new(self.vendor_id, self.guid.prefix);
153 let bytes = encode_data_datagram(header, &[data])?;
154 let targets = Rc::new(proxy.unicast_locators.clone());
157 out.push(OutboundDatagram { bytes, targets });
158 }
159 Ok(out)
160 }
161}
162
163#[derive(Debug)]
172pub struct StatelessMessageReader {
173 guid: Guid,
174 #[allow(dead_code)]
175 vendor_id: VendorId,
176 writer_proxies: Vec<WriterProxy>,
180}
181
182impl StatelessMessageReader {
183 #[must_use]
185 pub fn new(participant_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
186 Self {
187 guid: Guid::new(
188 participant_prefix,
189 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
190 ),
191 vendor_id,
192 writer_proxies: Vec::new(),
193 }
194 }
195
196 #[must_use]
198 pub fn guid(&self) -> Guid {
199 self.guid
200 }
201
202 #[must_use]
204 pub fn writer_proxy_count(&self) -> usize {
205 self.writer_proxies.len()
206 }
207
208 #[must_use]
210 pub fn writer_proxies(&self) -> &[WriterProxy] {
211 &self.writer_proxies
212 }
213
214 pub fn add_writer_proxy(&mut self, proxy: WriterProxy) {
216 let guid = proxy.remote_writer_guid;
217 if let Some(idx) = self
218 .writer_proxies
219 .iter()
220 .position(|p| p.remote_writer_guid == guid)
221 {
222 self.writer_proxies[idx] = proxy;
223 } else {
224 self.writer_proxies.push(proxy);
225 }
226 }
227
228 pub fn remove_writer_proxy(&mut self, guid: Guid) -> Option<WriterProxy> {
230 let idx = self
231 .writer_proxies
232 .iter()
233 .position(|p| p.remote_writer_guid == guid)?;
234 Some(self.writer_proxies.remove(idx))
235 }
236
237 pub fn handle_data(
243 &mut self,
244 data: &DataSubmessage,
245 ) -> SecurityResult<ParticipantGenericMessage> {
246 decode_generic_message(&data.serialized_payload)
247 }
248
249 pub fn handle_datagram(
257 &mut self,
258 datagram: &[u8],
259 ) -> SecurityResult<Vec<ParticipantGenericMessage>> {
260 let parsed = decode_datagram(datagram).map_err(|_| {
261 SecurityError::new(
262 SecurityErrorKind::BadArgument,
263 "stateless reader: wire decode failed",
264 )
265 })?;
266 let mut out = Vec::new();
267 for sub in parsed.submessages {
268 if let ParsedSubmessage::Data(d) = sub {
269 if d.reader_id == self.guid.entity_id
270 || d.writer_id == EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER
271 {
272 out.push(decode_generic_message(&d.serialized_payload)?);
273 }
274 }
275 }
276 Ok(out)
277 }
278}
279
280#[cfg(test)]
281#[allow(
282 clippy::expect_used,
283 clippy::unwrap_used,
284 clippy::panic,
285 clippy::unreachable
286)]
287mod tests {
288 use super::*;
289 use zerodds_rtps::wire_types::Locator;
290 use zerodds_security::generic_message::{MessageIdentity, class_id};
291 use zerodds_security::token::DataHolder;
292
293 fn sample_msg(seq: i64) -> ParticipantGenericMessage {
294 ParticipantGenericMessage {
295 message_identity: MessageIdentity {
296 source_guid: [0xAA; 16],
297 sequence_number: seq,
298 },
299 related_message_identity: MessageIdentity::default(),
300 destination_participant_key: [0xBB; 16],
301 destination_endpoint_key: [0; 16],
302 source_endpoint_key: [0xCC; 16],
303 message_class_id: class_id::AUTH_REQUEST.into(),
304 message_data: alloc::vec![DataHolder::new("DDS:Auth:PKI-DH:1.2+AuthReq")],
305 }
306 }
307
308 fn local_prefix() -> GuidPrefix {
309 GuidPrefix::from_bytes([1; 12])
310 }
311
312 fn remote_prefix() -> GuidPrefix {
313 GuidPrefix::from_bytes([2; 12])
314 }
315
316 #[test]
317 fn writer_has_expected_entity_id() {
318 let w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
319 assert_eq!(
320 w.guid().entity_id,
321 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER
322 );
323 assert_eq!(w.guid().prefix, local_prefix());
324 }
325
326 #[test]
327 fn reader_has_expected_entity_id() {
328 let r = StatelessMessageReader::new(local_prefix(), VendorId::ZERODDS);
329 assert_eq!(
330 r.guid().entity_id,
331 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER
332 );
333 }
334
335 #[test]
336 fn write_without_proxies_returns_empty() {
337 let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
338 let dgs = w.write(&sample_msg(1)).unwrap();
339 assert!(dgs.is_empty(), "no proxies → no fan-out");
340 }
341
342 #[test]
343 fn write_to_one_proxy_produces_one_datagram() {
344 let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
345 let remote = Guid::new(
346 remote_prefix(),
347 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
348 );
349 w.add_reader_proxy(ReaderProxy::new(
350 remote,
351 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
352 alloc::vec![],
353 false,
354 ));
355 let dgs = w.write(&sample_msg(1)).unwrap();
356 assert_eq!(dgs.len(), 1);
357 assert_eq!(dgs[0].targets.len(), 1);
358 }
359
360 #[test]
361 fn write_to_two_proxies_produces_two_datagrams() {
362 let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
363 let remote_a = Guid::new(
364 GuidPrefix::from_bytes([2; 12]),
365 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
366 );
367 let remote_b = Guid::new(
368 GuidPrefix::from_bytes([3; 12]),
369 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
370 );
371 w.add_reader_proxy(ReaderProxy::new(
372 remote_a,
373 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
374 alloc::vec![],
375 false,
376 ));
377 w.add_reader_proxy(ReaderProxy::new(
378 remote_b,
379 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7412)],
380 alloc::vec![],
381 false,
382 ));
383 assert_eq!(w.reader_proxy_count(), 2);
384 let dgs = w.write(&sample_msg(1)).unwrap();
385 assert_eq!(dgs.len(), 2);
386 }
387
388 #[test]
389 fn add_reader_proxy_is_idempotent() {
390 let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
391 let remote = Guid::new(
392 remote_prefix(),
393 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
394 );
395 w.add_reader_proxy(ReaderProxy::new(
396 remote,
397 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
398 alloc::vec![],
399 false,
400 ));
401 w.add_reader_proxy(ReaderProxy::new(
402 remote,
403 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
404 alloc::vec![],
405 false,
406 ));
407 assert_eq!(w.reader_proxy_count(), 1);
408 }
409
410 #[test]
411 fn remove_reader_proxy_returns_proxy() {
412 let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
413 let remote = Guid::new(
414 remote_prefix(),
415 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
416 );
417 w.add_reader_proxy(ReaderProxy::new(
418 remote,
419 alloc::vec![],
420 alloc::vec![],
421 false,
422 ));
423 let removed = w.remove_reader_proxy(remote);
424 assert!(removed.is_some());
425 assert_eq!(w.reader_proxy_count(), 0);
426 assert!(w.remove_reader_proxy(remote).is_none());
427 }
428
429 #[test]
430 fn write_increments_sequence_number() {
431 let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
432 let remote = Guid::new(
433 remote_prefix(),
434 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
435 );
436 w.add_reader_proxy(ReaderProxy::new(
437 remote,
438 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
439 alloc::vec![],
440 false,
441 ));
442 let dg1 = w.write(&sample_msg(1)).unwrap()[0].clone();
443 let dg2 = w.write(&sample_msg(2)).unwrap()[0].clone();
444 let p1 = decode_datagram(&dg1.bytes).unwrap();
446 let p2 = decode_datagram(&dg2.bytes).unwrap();
447 let sn1 = match &p1.submessages[0] {
448 ParsedSubmessage::Data(d) => d.writer_sn,
449 _ => unreachable!(),
450 };
451 let sn2 = match &p2.submessages[0] {
452 ParsedSubmessage::Data(d) => d.writer_sn,
453 _ => unreachable!(),
454 };
455 assert_eq!(sn1, SequenceNumber(1));
456 assert_eq!(sn2, SequenceNumber(2));
457 }
458
459 #[test]
460 fn write_carries_writer_entity_id_on_wire() {
461 let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
462 let remote = Guid::new(
463 remote_prefix(),
464 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
465 );
466 w.add_reader_proxy(ReaderProxy::new(
467 remote,
468 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
469 alloc::vec![],
470 false,
471 ));
472 let dgs = w.write(&sample_msg(1)).unwrap();
473 let parsed = decode_datagram(&dgs[0].bytes).unwrap();
474 match &parsed.submessages[0] {
475 ParsedSubmessage::Data(d) => {
476 assert_eq!(
477 d.writer_id,
478 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER
479 );
480 assert_eq!(
481 d.reader_id,
482 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER
483 );
484 }
485 _ => panic!("expected DATA"),
486 }
487 }
488
489 #[test]
490 fn reader_handle_data_decodes_generic_message() {
491 let mut r = StatelessMessageReader::new(local_prefix(), VendorId::ZERODDS);
492 let msg = sample_msg(42);
493 let payload = encode_generic_message(&msg);
494 let data = DataSubmessage {
495 extra_flags: 0,
496 reader_id: EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
497 writer_id: EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
498 writer_sn: SequenceNumber(1),
499 inline_qos: None,
500 key_flag: false,
501 non_standard_flag: false,
502 serialized_payload: payload.into(),
503 };
504 let decoded = r.handle_data(&data).unwrap();
505 assert_eq!(decoded, msg);
506 }
507
508 #[test]
509 fn reader_handle_data_rejects_corrupt_payload() {
510 let mut r = StatelessMessageReader::new(local_prefix(), VendorId::ZERODDS);
511 let data = DataSubmessage {
512 extra_flags: 0,
513 reader_id: EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
514 writer_id: EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
515 writer_sn: SequenceNumber(1),
516 inline_qos: None,
517 key_flag: false,
518 non_standard_flag: false,
519 serialized_payload: alloc::vec![0x00, 0x99, 0, 0].into(),
520 };
521 let err = r.handle_data(&data).unwrap_err();
522 assert_eq!(err.kind, SecurityErrorKind::BadArgument);
523 }
524
525 #[test]
526 fn reader_writer_proxy_management() {
527 let mut r = StatelessMessageReader::new(local_prefix(), VendorId::ZERODDS);
528 let remote = Guid::new(
529 remote_prefix(),
530 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER,
531 );
532 r.add_writer_proxy(WriterProxy::new(
533 remote,
534 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
535 alloc::vec![],
536 false,
537 ));
538 r.add_writer_proxy(WriterProxy::new(
540 remote,
541 alloc::vec![],
542 alloc::vec![],
543 false,
544 ));
545 assert_eq!(r.writer_proxy_count(), 1);
546 assert!(r.remove_writer_proxy(remote).is_some());
547 assert_eq!(r.writer_proxy_count(), 0);
548 }
549
550 #[test]
551 fn end_to_end_writer_to_reader_loopback() {
552 let mut w = StatelessMessageWriter::new(local_prefix(), VendorId::ZERODDS);
554 let mut r = StatelessMessageReader::new(remote_prefix(), VendorId::ZERODDS);
555 let remote_reader_guid = Guid::new(
556 remote_prefix(),
557 EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
558 );
559 w.add_reader_proxy(ReaderProxy::new(
560 remote_reader_guid,
561 alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
562 alloc::vec![],
563 false,
564 ));
565 let msg = sample_msg(7);
566 let dgs = w.write(&msg).unwrap();
567 let decoded = r.handle_datagram(&dgs[0].bytes).unwrap();
568 assert_eq!(decoded.len(), 1);
569 assert_eq!(decoded[0], msg);
570 }
571
572 #[test]
573 fn reader_handle_datagram_rejects_invalid_magic() {
574 let mut r = StatelessMessageReader::new(local_prefix(), VendorId::ZERODDS);
575 let err = r.handle_datagram(&[0u8; 24]).unwrap_err();
576 assert_eq!(err.kind, SecurityErrorKind::BadArgument);
577 }
578}