1extern crate alloc;
28use alloc::collections::BTreeMap;
29use alloc::string::String;
30use alloc::sync::Arc;
31use alloc::vec::Vec;
32use core::time::Duration;
33use std::net::{Ipv4Addr, SocketAddr};
34use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
35use std::sync::mpsc;
36use std::sync::{Condvar, Mutex, RwLock};
37use std::thread::{self, JoinHandle};
38use std::time::Instant;
39
40use zerodds_discovery::security::SecurityBuiltinStack;
41use zerodds_discovery::sedp::SedpStack;
42use zerodds_discovery::spdp::{
43 DiscoveredParticipant, DiscoveredParticipantsCache, SpdpBeacon, SpdpReader,
44};
45use zerodds_discovery::type_lookup::{
46 TypeLookupClient, TypeLookupEndpoints, TypeLookupReply, TypeLookupServer,
47};
48use zerodds_qos::Duration as QosDuration;
49use zerodds_rtps::EntityId;
50use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram};
51use zerodds_rtps::fragment_assembler::AssemblerCaps;
52use zerodds_rtps::history_cache::HistoryKind;
53use zerodds_rtps::message_builder::DEFAULT_MTU;
54use zerodds_rtps::participant_data::{ParticipantBuiltinTopicData, endpoint_flag};
55use zerodds_rtps::reliable_reader::{ReliableReader, ReliableReaderConfig};
56use zerodds_rtps::reliable_writer::{
57 DEFAULT_FRAGMENT_SIZE, DEFAULT_HEARTBEAT_PERIOD, ReliableWriter, ReliableWriterConfig,
58};
59use zerodds_rtps::wire_types::{
60 Guid, GuidPrefix, Locator, LocatorKind, ProtocolVersion, SPDP_DEFAULT_MULTICAST_ADDRESS,
61 VendorId, spdp_multicast_port,
62};
63use zerodds_transport::Transport;
64use zerodds_transport_udp::UdpTransport;
65
66#[cfg(feature = "security")]
67use zerodds_security_runtime::{EndpointProtection, IpRange, NetInterface, ProtectionLevel};
68
69use crate::error::{DdsError, Result};
70
/// Default value of [`RuntimeConfig::tick_period`]: 5 milliseconds.
pub const DEFAULT_TICK_PERIOD: Duration = Duration::from_millis(5);

/// Default value of [`RuntimeConfig::spdp_period`]: 5 seconds.
pub const DEFAULT_SPDP_PERIOD: Duration = Duration::from_secs(5);
86
/// Decides whether an offered deadline period satisfies a requested one.
///
/// Both arguments are periods in nanoseconds. A period of `0` on either side
/// makes the pair compatible unconditionally; otherwise the offered period
/// must be less than or equal to the requested period.
fn deadline_compat(offered_nanos: u64, requested_nanos: u64) -> bool {
    match (offered_nanos, requested_nanos) {
        // A zero period on either side never blocks a match.
        (0, _) | (_, 0) => true,
        (offered, requested) => offered <= requested,
    }
}
98
/// Reports whether an offered and a requested partition list share a partition.
///
/// An empty list matches another empty list, and also matches any list that
/// contains the empty string `""`. Two non-empty lists match when at least
/// one name appears in both (exact string comparison).
fn partitions_overlap(offered: &[String], requested: &[String]) -> bool {
    match (offered.is_empty(), requested.is_empty()) {
        // Neither side names a partition.
        (true, true) => true,
        // Only one side is empty: the other must explicitly list "".
        (true, false) => requested.iter().any(|name| name.is_empty()),
        (false, true) => offered.iter().any(|name| name.is_empty()),
        // Both sides name partitions: look for a common name.
        (false, false) => offered.iter().any(|name| requested.contains(name)),
    }
}
117
/// Chooses the locator to announce for the unicast socket `uc`.
///
/// Candidates are tried in this order; the first usable one wins:
/// 1. the socket's own local locator, if its IPv4 address is not `0.0.0.0`;
/// 2. the local address reported by a throw-away UDP socket after it has been
///    connected to `192.0.2.1:7`;
/// 3. `hint`, if it is not `0.0.0.0`;
/// 4. `127.0.0.1`.
///
/// In cases 2-4 the port is taken from `uc`'s local locator.
#[cfg(feature = "std")]
fn announce_locator(uc: &UdpTransport, hint: Ipv4Addr) -> Locator {
    let bound = uc.local_locator();
    let port = bound.port;
    // The IPv4 address occupies the last four bytes of the locator address.
    let bound_ip = Ipv4Addr::new(
        bound.address[12],
        bound.address[13],
        bound.address[14],
        bound.address[15],
    );
    if !bound_ip.is_unspecified() {
        return bound;
    }

    // Ask the OS which local address it would pick for an outbound route by
    // connecting a scratch socket and reading back its local address.
    let routed = std::net::UdpSocket::bind(std::net::SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
        .ok()
        .and_then(|probe| {
            probe
                .connect(std::net::SocketAddrV4::new(Ipv4Addr::new(192, 0, 2, 1), 7))
                .ok()?;
            match probe.local_addr() {
                Ok(std::net::SocketAddr::V4(local)) => Some(*local.ip()),
                _ => None,
            }
        })
        .filter(|ip| !ip.is_unspecified());

    let chosen = match routed {
        Some(ip) => ip,
        None if !hint.is_unspecified() => hint,
        None => Ipv4Addr::new(127, 0, 0, 1),
    };
    Locator::udp_v4(chosen.octets(), port)
}
170
171fn qos_duration_from_std(d: Duration) -> QosDuration {
175 let secs = i32::try_from(d.as_secs()).unwrap_or(i32::MAX);
176 let nanos = d.subsec_nanos();
177 let fraction = ((u64::from(nanos)) << 32) / 1_000_000_000u64;
179 QosDuration {
180 seconds: secs,
181 fraction: fraction as u32,
182 }
183}
184
185fn qos_duration_to_nanos(d: zerodds_qos::Duration) -> u64 {
188 if d.is_infinite() {
189 return 0;
190 }
191 let secs = d.seconds.max(0) as u64;
192 let frac_nanos = ((d.fraction as u64) * 1_000_000_000u64) >> 32;
194 secs.saturating_mul(1_000_000_000u64)
195 .saturating_add(frac_nanos)
196}
197
/// Four-byte header that `write_user_sample_pooled` places in front of every
/// user payload before handing it to the writer.
///
/// NOTE(review): `0x0007` looks like the RTPS encapsulation id for XCDR2
/// little-endian (CDR2_LE) followed by two zero option bytes — confirm.
pub const USER_PAYLOAD_ENCAP: [u8; 4] = [0x00, 0x07, 0x00, 0x00];

/// Capacity in bytes of the pooled frame buffer used by
/// `write_user_sample_pooled`; header plus payload must fit into it.
const SMALL_FRAME_CAP: usize = 1536;
210
211fn write_user_sample_pooled(
218 writer: &mut ReliableWriter,
219 payload: &[u8],
220 now: Duration,
221) -> Result<Vec<zerodds_rtps::message_builder::OutboundDatagram>> {
222 let mut frame = zerodds_foundation::PoolBuffer::<SMALL_FRAME_CAP>::new();
223 frame
224 .extend_from_slice(&USER_PAYLOAD_ENCAP)
225 .map_err(|_| DdsError::WireError {
226 message: String::from("user encap framing"),
227 })?;
228 frame
229 .extend_from_slice(payload)
230 .map_err(|_| DdsError::WireError {
231 message: String::from("user payload framing"),
232 })?;
233 writer
235 .write_with_heartbeat(frame.as_slice(), now)
236 .map_err(|_| DdsError::WireError {
237 message: String::from("user writer encode"),
238 })
239}
240
/// Settings consumed by [`DcpsRuntime::start`].
///
/// `RuntimeConfig::default()` provides a value for every field; see the
/// `Default` impl for the concrete defaults.
#[derive(Clone)]
pub struct RuntimeConfig {
    /// Defaults to [`DEFAULT_TICK_PERIOD`]. Presumably the interval of the
    /// tick thread — the consumer is not visible in this part of the file.
    pub tick_period: Duration,
    /// Defaults to [`DEFAULT_SPDP_PERIOD`]. Presumably the SPDP announcement
    /// interval — the consumer is not visible in this part of the file.
    pub spdp_period: Duration,
    /// Multicast group the SPDP receive socket is bound to in `start`.
    pub spdp_multicast_group: Ipv4Addr,
    /// Interface address passed to the multicast bind in `start`; also used as
    /// a fallback address hint by `announce_locator`.
    pub multicast_interface: Ipv4Addr,

    /// Optional security gate. When `None`, the `secure_*` helpers pass bytes
    /// through unchanged.
    #[cfg(feature = "security")]
    pub security: Option<std::sync::Arc<zerodds_security_runtime::SharedSecurityGate>>,
    /// Optional logging plugin; `secure_inbound_bytes` reports every rejected
    /// inbound datagram to it.
    #[cfg(feature = "security")]
    pub security_logger: Option<std::sync::Arc<dyn zerodds_security_runtime::LoggingPlugin>>,

    /// Per-interface socket specs. When non-empty, `start` binds one socket
    /// per entry into the outbound socket pool.
    #[cfg(feature = "security")]
    pub interface_bindings: Vec<InterfaceBindingSpec>,

    /// When `true`, `start` adds `endpoint_flag::ALL_SECURE` to the announced
    /// builtin endpoint set.
    pub announce_secure_endpoints: bool,

    /// Period handed to the WLP endpoint. A zero value makes `start` use one
    /// third of `participant_lease_duration` instead.
    pub wlp_period: Duration,

    /// Lease duration announced in the participant data built by `start`.
    pub participant_lease_duration: Duration,

    /// Opaque bytes copied into the participant data built by `start`.
    pub user_data: Vec<u8>,

    /// Observability sink; the default is `null_sink()`.
    pub observability: zerodds_foundation::observability::SharedSink,

    /// NOTE(review): presumably a scheduling priority for the receive threads;
    /// the consumer is not visible in this part of the file.
    pub recv_thread_priority: Option<i32>,

    /// NOTE(review): presumably a scheduling priority for the tick thread;
    /// the consumer is not visible in this part of the file.
    pub tick_thread_priority: Option<i32>,

    /// NOTE(review): presumably a CPU-affinity list for the receive threads —
    /// confirm against the thread setup code.
    pub recv_thread_cpus: Option<Vec<usize>>,

    /// NOTE(review): presumably a CPU-affinity list for the tick thread —
    /// confirm against the thread setup code.
    pub tick_thread_cpus: Option<Vec<usize>>,

    /// Data-representation ids; the default is `DEFAULT_OFFER`. Presumably the
    /// runtime-wide offer used when an endpoint config has no override.
    pub data_representation_offer: Vec<i16>,

    /// Data-representation matching mode; the default is
    /// `DataRepMatchMode::default()`.
    pub data_rep_match_mode: zerodds_rtps::publication_data::data_representation::DataRepMatchMode,
}
359
/// Description of one outbound socket that `OutboundSocketPool::bind_all`
/// creates.
#[cfg(feature = "security")]
#[derive(Clone, Debug)]
pub struct InterfaceBindingSpec {
    /// Name of the binding. Not read in this part of the file.
    pub name: String,
    /// Local IPv4 address the socket is bound to.
    pub bind_addr: Ipv4Addr,
    /// Local UDP port the socket is bound to.
    pub bind_port: u16,
    /// Interface classification returned by `OutboundSocketPool::route` for
    /// targets sent through this binding.
    pub kind: NetInterface,
    /// Address range; a target inside it is routed through this binding.
    pub subnet: IpRange,
    /// Marks the fallback binding. The first spec with `default == true` is
    /// used when no subnet contains the target.
    pub default: bool,
}
388
/// A bound socket together with the spec it was created from.
#[cfg(feature = "security")]
struct InterfaceBinding {
    /// The spec this socket was bound from (cloned in `bind_all`).
    spec: InterfaceBindingSpec,
    /// The bound UDP transport.
    socket: Arc<UdpTransport>,
}
395
/// Set of per-interface sockets with an optional fallback entry, used to
/// pick the sending socket for a target locator.
#[cfg(feature = "security")]
struct OutboundSocketPool {
    /// All bound sockets, in the order of the specs passed to `bind_all`.
    bindings: Vec<InterfaceBinding>,
    /// Index into `bindings` of the first spec flagged `default`, if any.
    default_idx: Option<usize>,
}
409
#[cfg(feature = "security")]
impl OutboundSocketPool {
    /// Binds one UDP socket per spec (in order) and applies a 5 ms timeout to
    /// each.
    ///
    /// # Errors
    /// Returns `DdsError::TransportError` as soon as a bind or a timeout
    /// configuration fails; sockets bound so far are dropped.
    fn bind_all(specs: &[InterfaceBindingSpec]) -> Result<Self> {
        let bindings = specs
            .iter()
            .map(|spec| -> Result<InterfaceBinding> {
                let bound = match UdpTransport::bind_v4(spec.bind_addr, spec.bind_port) {
                    Ok(sock) => sock,
                    Err(_) => {
                        return Err(DdsError::TransportError {
                            label: "interface-binding bind_v4 failed",
                        });
                    }
                };
                let timed = match bound.with_timeout(Some(Duration::from_millis(5))) {
                    Ok(sock) => sock,
                    Err(_) => {
                        return Err(DdsError::TransportError {
                            label: "interface-binding set_timeout failed",
                        });
                    }
                };
                Ok(InterfaceBinding {
                    spec: spec.clone(),
                    socket: Arc::new(timed),
                })
            })
            .collect::<Result<Vec<InterfaceBinding>>>()?;

        // The first binding flagged `default` becomes the fallback route.
        let default_idx = bindings.iter().position(|entry| entry.spec.default);
        Ok(Self {
            bindings,
            default_idx,
        })
    }

    /// Selects the socket for `target`.
    ///
    /// Returns the first binding whose subnet contains the target's IPv4
    /// address, otherwise the default binding. Returns `None` when the target
    /// is not a UDPv4 locator or when nothing matches and no default exists.
    fn route(&self, target: &Locator) -> Option<(&Arc<UdpTransport>, NetInterface)> {
        let octets = ipv4_from_locator(target)?;
        let addr = core::net::IpAddr::V4(core::net::Ipv4Addr::from(octets));
        let by_subnet = self
            .bindings
            .iter()
            .find(|entry| entry.spec.subnet.contains(&addr));
        let chosen = match by_subnet {
            Some(entry) => entry,
            None => self.bindings.get(self.default_idx?)?,
        };
        Some((&chosen.socket, chosen.spec.kind.clone()))
    }
}
458
/// Extracts the IPv4 octets from a UDPv4 locator.
///
/// Returns `None` for any other locator kind. The octets are the last four
/// bytes of the locator's address field.
#[cfg(feature = "security")]
fn ipv4_from_locator(loc: &Locator) -> Option<[u8; 4]> {
    let is_udp_v4 = loc.kind == LocatorKind::UdpV4;
    is_udp_v4.then(|| {
        let addr = &loc.address;
        [addr[12], addr[13], addr[14], addr[15]]
    })
}
473
474impl core::fmt::Debug for RuntimeConfig {
475 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
476 let mut dbg = f.debug_struct("RuntimeConfig");
477 dbg.field("tick_period", &self.tick_period)
478 .field("spdp_period", &self.spdp_period)
479 .field("spdp_multicast_group", &self.spdp_multicast_group)
480 .field("multicast_interface", &self.multicast_interface);
481 #[cfg(feature = "security")]
482 {
483 dbg.field("security", &self.security.as_ref().map(|_| "<gate>"));
484 dbg.field(
485 "security_logger",
486 &self.security_logger.as_ref().map(|_| "<logger>"),
487 );
488 }
489 dbg.finish()
490 }
491}
492
impl Default for RuntimeConfig {
    /// Builds the default configuration: default tick/SPDP periods, the
    /// default SPDP multicast group on the unspecified interface, no security
    /// plugins or interface bindings, a 100 s participant lease, and no
    /// thread priority/CPU settings.
    fn default() -> Self {
        Self {
            tick_period: DEFAULT_TICK_PERIOD,
            spdp_period: DEFAULT_SPDP_PERIOD,
            spdp_multicast_group: Ipv4Addr::from(SPDP_DEFAULT_MULTICAST_ADDRESS),
            multicast_interface: Ipv4Addr::UNSPECIFIED,
            #[cfg(feature = "security")]
            security: None,
            #[cfg(feature = "security")]
            security_logger: None,
            #[cfg(feature = "security")]
            interface_bindings: Vec::new(),
            announce_secure_endpoints: false,
            // Zero makes `DcpsRuntime::start` derive the WLP period from the
            // participant lease duration (lease / 3).
            wlp_period: Duration::ZERO,
            participant_lease_duration: Duration::from_secs(100),
            user_data: Vec::new(),
            observability: zerodds_foundation::observability::null_sink(),
            recv_thread_priority: None,
            tick_thread_priority: None,
            recv_thread_cpus: None,
            tick_thread_cpus: None,
            data_representation_offer:
                zerodds_rtps::publication_data::data_representation::DEFAULT_OFFER.to_vec(),
            data_rep_match_mode:
                zerodds_rtps::publication_data::data_representation::DataRepMatchMode::default(),
        }
    }
}
523
/// Runs outbound bytes through the configured security gate.
///
/// Without a gate the bytes are copied unchanged. With a gate, `None` is
/// returned when the transformation fails.
#[cfg(feature = "security")]
fn secure_outbound_bytes(rt: &DcpsRuntime, bytes: &[u8]) -> Option<Vec<u8>> {
    let Some(gate) = rt.config.security.as_ref() else {
        return Some(bytes.to_vec());
    };
    gate.transform_outbound(bytes).ok()
}
540
541#[cfg(not(feature = "security"))]
542fn secure_outbound_bytes(_rt: &DcpsRuntime, bytes: &[u8]) -> Option<Vec<u8>> {
543 Some(bytes.to_vec())
544}
545
/// Classifies an inbound datagram through the security gate.
///
/// Returns the accepted bytes, or `None` when the gate rejects the datagram.
/// Without a configured gate the input is copied unchanged. Every rejection
/// is reported to the configured security logger, if there is one.
#[cfg(feature = "security")]
fn secure_inbound_bytes(rt: &DcpsRuntime, bytes: &[u8], iface: &NetInterface) -> Option<Vec<u8>> {
    use zerodds_security_runtime::{InboundVerdict, LogLevel};

    let gate = match &rt.config.security {
        Some(gate) => gate,
        None => return Some(bytes.to_vec()),
    };
    let verdict = gate.classify_inbound(bytes, iface);
    let category = verdict.category();

    // Accepted datagrams leave here; every other verdict becomes a log record.
    let (level, message): (LogLevel, String) = match &verdict {
        InboundVerdict::Accept(out) => return Some(out.clone()),
        InboundVerdict::Malformed => {
            let text = alloc::format!(
                "inbound datagram too short ({} bytes, iface={:?})",
                bytes.len(),
                iface
            );
            (LogLevel::Error, text)
        }
        InboundVerdict::LegacyBlocked => {
            let text = alloc::format!(
                "legacy plaintext peer on protected domain \
                 (iface={iface:?}, allow_unauthenticated_participants=false)"
            );
            (LogLevel::Error, text)
        }
        InboundVerdict::PolicyViolation(msg) => {
            (LogLevel::Warning, alloc::format!("{msg} [iface={iface:?}]"))
        }
        InboundVerdict::CryptoError(msg) => {
            (LogLevel::Warning, alloc::format!("{msg} [iface={iface:?}]"))
        }
    };

    if let Some(logger) = rt.config.security_logger.as_ref() {
        // Bytes 8..20 of the datagram go into the first 12 bytes of the
        // participant id; the rest stays zero, as does everything when the
        // datagram is shorter than 20 bytes.
        let mut participant = [0u8; 16];
        if let Some(prefix) = bytes.get(8..20) {
            participant[..12].copy_from_slice(prefix);
        }
        logger.log(level, participant, category, &message);
    }
    None
}
599
600#[cfg(not(feature = "security"))]
601fn secure_inbound_bytes(_rt: &DcpsRuntime, bytes: &[u8]) -> Option<Vec<u8>> {
602 Some(bytes.to_vec())
603}
604
/// Interface classification constant, set to `NetInterface::Wan`.
///
/// NOTE(review): presumably used for inbound datagrams whose receiving
/// interface is unknown — the use site is not in this part of the file.
#[cfg(feature = "security")]
const DEFAULT_INBOUND_IFACE: NetInterface = NetInterface::Wan;
612
/// Secures outbound bytes for one specific target locator.
///
/// Looks up the peer and protection level recorded for `target` in the slot
/// of writer `writer_eid`. If both are known, the peer-specific transform is
/// used; otherwise the generic outbound transform is applied. Without a
/// configured gate the bytes are copied unchanged. Returns `None` when the
/// gate's transform fails.
#[cfg(feature = "security")]
fn secure_outbound_for_target(
    rt: &DcpsRuntime,
    writer_eid: EntityId,
    bytes: &[u8],
    target: &Locator,
) -> Option<Vec<u8>> {
    let gate = match &rt.config.security {
        Some(gate) => gate,
        None => return Some(bytes.to_vec()),
    };

    // Any missing piece (no slot, poisoned lock, unknown locator or peer)
    // falls back to the generic transform below.
    let per_peer = (|| {
        let slot_arc = rt.writer_slot(writer_eid)?;
        let slot = slot_arc.lock().ok()?;
        let peer_key = *slot.locator_to_peer.get(target)?;
        let level = *slot.reader_protection.get(&peer_key)?;
        Some((peer_key, level))
    })();

    if let Some((peer_key, level)) = per_peer {
        gate.transform_outbound_for(&peer_key, bytes, level).ok()
    } else {
        gate.transform_outbound(bytes).ok()
    }
}
650
651#[cfg(not(feature = "security"))]
652fn secure_outbound_for_target(
653 _rt: &DcpsRuntime,
654 _writer_eid: EntityId,
655 bytes: &[u8],
656 _target: &Locator,
657) -> Option<Vec<u8>> {
658 Some(bytes.to_vec())
659}
660
/// Sends `bytes` to `target` on the pool socket routed for that target, or on
/// the user unicast socket when there is no pool or no route.
///
/// Send errors are deliberately ignored (best effort).
#[cfg(feature = "security")]
fn send_on_best_interface(rt: &DcpsRuntime, target: &Locator, bytes: &[u8]) {
    let routed = match &rt.outbound_pool {
        Some(pool) => pool.route(target),
        None => None,
    };
    match routed {
        Some((socket, _iface)) => {
            let _ = socket.send(target, bytes);
        }
        None => {
            let _ = rt.user_unicast.send(target, bytes);
        }
    }
}
675
676#[cfg(not(feature = "security"))]
677fn send_on_best_interface(rt: &DcpsRuntime, target: &Locator, bytes: &[u8]) {
678 let _ = rt.user_unicast.send(target, bytes);
679}
680
/// Per-writer runtime state, stored behind `Arc<Mutex<..>>` in
/// `DcpsRuntime::user_writers`.
///
/// NOTE(review): most fields are not read in this part of the file; the
/// field notes marked "presumably" are inferred from names and types and
/// should be confirmed against the code that maintains them.
struct UserWriterSlot {
    /// The RTPS reliable writer for this endpoint.
    writer: ReliableWriter,
    /// Topic name of the writer.
    topic_name: String,
    /// Type name of the writer.
    type_name: String,
    /// Presumably `true` for RELIABLE, `false` for BEST_EFFORT reliability.
    reliable: bool,
    /// Durability kind of the writer.
    durability: zerodds_qos::DurabilityKind,
    /// Presumably the offered deadline period in nanoseconds.
    deadline_nanos: u64,
    /// Presumably the runtime-relative time of the last write.
    last_write: Option<Duration>,
    /// Presumably the running count of offered-deadline-missed events.
    offered_deadline_missed_count: u64,
    /// Presumably the running count of liveliness-lost events.
    liveliness_lost_count: u64,
    /// Presumably the runtime-relative time of the last liveliness assertion.
    last_liveliness_assert: Option<Duration>,
    /// Offered-incompatible-QoS status of this writer.
    offered_incompatible_qos: crate::status::OfferedIncompatibleQosStatus,
    /// Presumably the lifespan in nanoseconds.
    lifespan_nanos: u64,
    /// Queue of (sequence number, time) pairs; presumably the insert time of
    /// each cached sample, kept for lifespan expiry.
    sample_insert_times:
        alloc::collections::VecDeque<(zerodds_rtps::wire_types::SequenceNumber, Duration)>,
    /// Liveliness kind of the writer.
    liveliness_kind: zerodds_qos::LivelinessKind,
    /// Presumably the liveliness lease duration in nanoseconds.
    liveliness_lease_nanos: u64,
    /// Ownership kind of the writer.
    ownership: zerodds_qos::OwnershipKind,
    /// Partition names of the writer.
    partition: Vec<String>,
    /// Protection level per peer, keyed by a 12-byte peer key; read by
    /// `secure_outbound_for_target`.
    #[cfg(feature = "security")]
    reader_protection: BTreeMap<[u8; 12], ProtectionLevel>,
    /// Maps a target locator to the 12-byte peer key; read by
    /// `secure_outbound_for_target`.
    #[cfg(feature = "security")]
    locator_to_peer: BTreeMap<Locator, [u8; 12]>,
    /// Type identifier of the writer's data type.
    type_identifier: zerodds_types::TypeIdentifier,
    /// Presumably a per-writer override of the runtime data-representation
    /// offer.
    data_rep_offer_override: Option<Vec<i16>>,
}
739
/// A [`UserSample`] paired with an optional shared byte buffer and a `usize`.
///
/// NOTE(review): presumably the buffer is the encapsulated wire payload and
/// the `usize` an offset or length into it — confirm against the producer.
pub type UserSampleWithEncap = (UserSample, Option<(Arc<[u8]>, usize)>);
744
/// Item delivered to a user reader through its sample channel.
#[derive(Debug, Clone)]
pub enum UserSample {
    /// A data sample.
    Alive {
        /// Sample payload bytes.
        payload: Vec<u8>,
        /// 16-byte GUID of the writer that produced the sample.
        writer_guid: [u8; 16],
        /// Presumably the writer's ownership strength — confirm.
        writer_strength: i32,
    },
    /// An instance lifecycle change that carries no payload.
    Lifecycle {
        /// 16-byte key hash of the affected instance.
        key_hash: [u8; 16],
        /// Kind of change, as defined by the history cache.
        kind: zerodds_rtps::history_cache::ChangeKind,
    },
}
777
/// Boxed, thread-safe callback that receives a byte slice.
///
/// NOTE(review): presumably invoked with the payload of each received sample;
/// the call site is not in this part of the file.
pub type UserReaderListener = alloc::boxed::Box<dyn Fn(&[u8]) + Send + Sync + 'static>;
799
/// Per-reader runtime state, stored behind `Arc<Mutex<..>>` in
/// `DcpsRuntime::user_readers`.
///
/// NOTE(review): the fields are not read in this part of the file; the field
/// notes marked "presumably" are inferred from names and types and should be
/// confirmed against the code that maintains them.
struct UserReaderSlot {
    /// The RTPS reliable reader for this endpoint.
    reader: ReliableReader,
    /// Topic name of the reader.
    topic_name: String,
    /// Type name of the reader.
    type_name: String,
    /// Sending half of the channel that carries samples to the user.
    sample_tx: mpsc::Sender<UserSample>,
    /// Optional task waker; presumably woken when a sample arrives.
    async_waker: alloc::sync::Arc<std::sync::Mutex<Option<core::task::Waker>>>,
    /// Optional listener callback.
    listener: Option<alloc::sync::Arc<UserReaderListener>>,
    /// Durability kind of the reader.
    durability: zerodds_qos::DurabilityKind,
    /// Presumably the requested deadline period in nanoseconds.
    deadline_nanos: u64,
    /// Presumably the runtime-relative time of the last received sample.
    last_sample_received: Option<Duration>,
    /// Presumably the running count of requested-deadline-missed events.
    requested_deadline_missed_count: u64,
    /// Requested-incompatible-QoS status of this reader.
    requested_incompatible_qos: crate::status::RequestedIncompatibleQosStatus,
    /// Presumably the running count of lost samples.
    sample_lost_count: u64,
    /// Sample-rejected status of this reader.
    sample_rejected: crate::status::SampleRejectedStatus,
    /// Presumably the liveliness lease duration in nanoseconds.
    liveliness_lease_nanos: u64,
    /// Liveliness kind of the reader.
    liveliness_kind: zerodds_qos::LivelinessKind,
    /// Presumably the count of matched writers currently alive.
    liveliness_alive_count: u64,
    /// Presumably the count of matched writers currently not alive.
    liveliness_not_alive_count: u64,
    /// Presumably the last observed liveliness state.
    liveliness_alive: bool,
    /// Ownership kind of the reader.
    ownership: zerodds_qos::OwnershipKind,
    /// Partition names of the reader.
    partition: Vec<String>,
    /// Presumably the ownership strength per writer, keyed by the 16-byte
    /// writer GUID.
    writer_strengths: alloc::collections::BTreeMap<[u8; 16], i32>,
    /// Type identifier of the reader's data type.
    type_identifier: zerodds_types::TypeIdentifier,
    /// Type-consistency enforcement settings of the reader.
    type_consistency: zerodds_types::qos::TypeConsistencyEnforcement,
}
864
/// Description of a user data writer; `build_publication_data` turns it into
/// the announced publication data.
#[derive(Debug, Clone)]
pub struct UserWriterConfig {
    /// Topic name announced for the writer.
    pub topic_name: String,
    /// Type name announced for the writer.
    pub type_name: String,
    /// `true` announces RELIABLE reliability, `false` BEST_EFFORT.
    pub reliable: bool,
    /// Durability kind announced for the writer.
    pub durability: zerodds_qos::DurabilityKind,
    /// Deadline policy announced for the writer.
    pub deadline: zerodds_qos::DeadlineQosPolicy,
    /// Lifespan policy announced for the writer.
    pub lifespan: zerodds_qos::LifespanQosPolicy,
    /// Liveliness policy announced for the writer.
    pub liveliness: zerodds_qos::LivelinessQosPolicy,
    /// Ownership kind announced for the writer.
    pub ownership: zerodds_qos::OwnershipKind,
    /// Ownership strength announced for the writer.
    pub ownership_strength: i32,
    /// Partition names announced for the writer.
    pub partition: Vec<String>,
    /// Opaque user data announced with the writer.
    pub user_data: Vec<u8>,
    /// Opaque topic data announced with the writer.
    pub topic_data: Vec<u8>,
    /// Opaque group data announced with the writer.
    pub group_data: Vec<u8>,
    /// Type identifier announced for the writer's data type.
    pub type_identifier: zerodds_types::TypeIdentifier,

    /// Per-writer data-representation offer. When `None`, the runtime offer
    /// passed to `build_publication_data` is announced instead.
    pub data_representation_offer: Option<Vec<i16>>,
}
910
/// Description of a user data reader; `build_subscription_data` turns it into
/// the announced subscription data.
#[derive(Debug, Clone)]
pub struct UserReaderConfig {
    /// Topic name announced for the reader.
    pub topic_name: String,
    /// Type name announced for the reader.
    pub type_name: String,
    /// `true` announces RELIABLE reliability, `false` BEST_EFFORT.
    pub reliable: bool,
    /// Durability kind announced for the reader.
    pub durability: zerodds_qos::DurabilityKind,
    /// Deadline policy announced for the reader.
    pub deadline: zerodds_qos::DeadlineQosPolicy,
    /// Liveliness policy announced for the reader.
    pub liveliness: zerodds_qos::LivelinessQosPolicy,
    /// Ownership kind announced for the reader.
    pub ownership: zerodds_qos::OwnershipKind,
    /// Partition names announced for the reader.
    pub partition: Vec<String>,
    /// Opaque user data announced with the reader.
    pub user_data: Vec<u8>,
    /// Opaque topic data announced with the reader.
    pub topic_data: Vec<u8>,
    /// Opaque group data announced with the reader.
    pub group_data: Vec<u8>,
    /// Type identifier announced for the reader's data type.
    pub type_identifier: zerodds_types::TypeIdentifier,
    /// Type-consistency enforcement settings. Not used by
    /// `build_subscription_data`.
    pub type_consistency: zerodds_types::qos::TypeConsistencyEnforcement,

    /// Per-reader data-representation list. When `None`, the runtime offer
    /// passed to `build_subscription_data` is announced instead.
    pub data_representation_offer: Option<Vec<i16>>,
}
948
949fn build_publication_data(
950 owner_prefix: GuidPrefix,
951 writer_eid: EntityId,
952 cfg: &UserWriterConfig,
953 runtime_offer: &[i16],
954) -> zerodds_rtps::publication_data::PublicationBuiltinTopicData {
955 use zerodds_qos::{ReliabilityKind, ReliabilityQosPolicy};
956 zerodds_rtps::publication_data::PublicationBuiltinTopicData {
957 key: Guid::new(owner_prefix, writer_eid),
958 participant_key: Guid::new(owner_prefix, EntityId::PARTICIPANT),
959 topic_name: cfg.topic_name.clone(),
960 type_name: cfg.type_name.clone(),
961 durability: cfg.durability,
962 reliability: ReliabilityQosPolicy {
963 kind: if cfg.reliable {
964 ReliabilityKind::Reliable
965 } else {
966 ReliabilityKind::BestEffort
967 },
968 max_blocking_time: QosDuration::from_millis(100_i32),
969 },
970 ownership: cfg.ownership,
971 ownership_strength: cfg.ownership_strength,
972 liveliness: cfg.liveliness,
973 deadline: cfg.deadline,
974 lifespan: cfg.lifespan,
975 partition: cfg.partition.clone(),
976 user_data: cfg.user_data.clone(),
977 topic_data: cfg.topic_data.clone(),
978 group_data: cfg.group_data.clone(),
979 type_information: None,
980 data_representation: cfg
984 .data_representation_offer
985 .clone()
986 .unwrap_or_else(|| runtime_offer.to_vec()),
987 security_info: None,
990 service_instance_name: None,
993 related_entity_guid: None,
994 topic_aliases: None,
995 type_identifier: cfg.type_identifier.clone(),
997 }
998}
999
1000fn build_subscription_data(
1001 owner_prefix: GuidPrefix,
1002 reader_eid: EntityId,
1003 cfg: &UserReaderConfig,
1004 runtime_offer: &[i16],
1005) -> zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
1006 use zerodds_qos::{ReliabilityKind, ReliabilityQosPolicy};
1007 zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
1008 key: Guid::new(owner_prefix, reader_eid),
1009 participant_key: Guid::new(owner_prefix, EntityId::PARTICIPANT),
1010 topic_name: cfg.topic_name.clone(),
1011 type_name: cfg.type_name.clone(),
1012 durability: cfg.durability,
1013 reliability: ReliabilityQosPolicy {
1014 kind: if cfg.reliable {
1015 ReliabilityKind::Reliable
1016 } else {
1017 ReliabilityKind::BestEffort
1018 },
1019 max_blocking_time: QosDuration::from_millis(100_i32),
1020 },
1021 ownership: cfg.ownership,
1022 liveliness: cfg.liveliness,
1023 deadline: cfg.deadline,
1024 partition: cfg.partition.clone(),
1025 user_data: cfg.user_data.clone(),
1026 topic_data: cfg.topic_data.clone(),
1027 group_data: cfg.group_data.clone(),
1028 type_information: None,
1029 data_representation: cfg
1032 .data_representation_offer
1033 .clone()
1034 .unwrap_or_else(|| runtime_offer.to_vec()),
1035 content_filter: None,
1036 security_info: None,
1037 service_instance_name: None,
1038 related_entity_guid: None,
1039 topic_aliases: None,
1040 type_identifier: cfg.type_identifier.clone(),
1042 }
1043}
1044
/// Per-participant runtime: owns the sockets, the discovery state, the user
/// endpoints and the worker threads. Created by [`DcpsRuntime::start`].
pub struct DcpsRuntime {
    /// GUID prefix of this participant.
    pub guid_prefix: GuidPrefix,
    /// DDS domain id this runtime was started for.
    pub domain_id: i32,
    /// Socket bound to the SPDP multicast group and port.
    pub spdp_multicast_rx: Arc<UdpTransport>,
    /// Unicast socket announced as the metatraffic unicast locator.
    pub spdp_unicast: Arc<UdpTransport>,
    /// Unicast socket announced as the default unicast locator.
    pub user_unicast: Arc<UdpTransport>,
    /// Separate unicast-bound socket; presumably used to send the SPDP
    /// multicast announcements (bind label "spdp mc-tx").
    spdp_mc_tx: Arc<UdpTransport>,
    /// SPDP beacon holding this participant's announced data.
    spdp_beacon: Mutex<SpdpBeacon>,
    /// SPDP reader.
    spdp_reader: SpdpReader,
    /// Cache of discovered remote participants.
    discovered: Arc<Mutex<DiscoveredParticipantsCache>>,
    /// SEDP stack.
    pub sedp: Arc<Mutex<SedpStack>>,
    /// Type-lookup endpoints derived from the GUID prefix.
    pub type_lookup_endpoints: TypeLookupEndpoints,
    /// Type-lookup server; holds the registry filled by
    /// `register_type_object`.
    pub type_lookup_server: Arc<Mutex<TypeLookupServer>>,
    /// Type-lookup client used for outgoing type-lookup requests.
    pub type_lookup_client: Arc<Mutex<TypeLookupClient>>,
    /// Optional security builtin stack; `None` right after `start`.
    pub security_builtin: Mutex<Option<Arc<Mutex<SecurityBuiltinStack>>>>,
    /// Instant captured when the runtime was created.
    start_instant: Instant,
    /// User writers keyed by entity id.
    user_writers: Arc<RwLock<BTreeMap<EntityId, Arc<Mutex<UserWriterSlot>>>>>,
    /// Byte blobs keyed by entity id; presumably serialized shared-memory
    /// locators — confirm.
    shm_locators: Arc<RwLock<BTreeMap<EntityId, Vec<u8>>>>,
    /// User readers keyed by entity id.
    user_readers: Arc<RwLock<BTreeMap<EntityId, Arc<Mutex<UserReaderSlot>>>>>,
    /// Counter initialised to 1; presumably used to allocate entity ids.
    entity_counter: AtomicU32,
    /// Configuration the runtime was started with.
    pub config: RuntimeConfig,
    /// Per-interface outbound sockets; `None` when no interface bindings are
    /// configured.
    #[cfg(feature = "security")]
    outbound_pool: Option<Arc<OutboundSocketPool>>,
    /// WLP endpoint.
    pub wlp: Arc<Mutex<crate::wlp::WlpEndpoint>>,
    /// Optional builtin-subscriber sinks; `None` right after `start`.
    builtin_sinks: Mutex<Option<crate::builtin_subscriber::BuiltinSinks>>,
    /// Optional ignore filter; `None` right after `start`.
    ignore_filter: Mutex<Option<crate::participant::IgnoreFilter>>,
    /// Stop flag shared with the worker threads.
    stop: Arc<AtomicBool>,
    /// Join handles of the worker threads spawned by `start`.
    handles: Mutex<Vec<JoinHandle<()>>>,
    /// Mutex/condvar pair; presumably signalled on endpoint matches.
    match_event: Arc<(Mutex<()>, Condvar)>,
    /// Mutex/condvar pair; presumably signalled on acknowledgements.
    ack_event: Arc<(Mutex<()>, Condvar)>,
}
1139
1140impl core::fmt::Debug for DcpsRuntime {
1141 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1142 f.debug_struct("DcpsRuntime")
1143 .field("domain_id", &self.domain_id)
1144 .field("guid_prefix", &self.guid_prefix)
1145 .field("spdp_group", &self.config.spdp_multicast_group)
1146 .finish_non_exhaustive()
1147 }
1148}
1149
/// Shared handle to one writer slot.
type WriterSlotArc = Arc<Mutex<UserWriterSlot>>;
/// Shared handle to one reader slot.
type ReaderSlotArc = Arc<Mutex<UserReaderSlot>>;
1154
1155impl DcpsRuntime {
1156 fn writer_slot(&self, eid: EntityId) -> Option<WriterSlotArc> {
1173 self.user_writers
1174 .read()
1175 .ok()
1176 .and_then(|w| w.get(&eid).cloned())
1177 }
1178
1179 fn reader_slot(&self, eid: EntityId) -> Option<ReaderSlotArc> {
1181 self.user_readers
1182 .read()
1183 .ok()
1184 .and_then(|r| r.get(&eid).cloned())
1185 }
1186
1187 fn writer_slots_snapshot(&self) -> Vec<(EntityId, WriterSlotArc)> {
1192 match self.user_writers.read() {
1193 Ok(w) => w.iter().map(|(k, v)| (*k, Arc::clone(v))).collect(),
1194 Err(_) => Vec::new(),
1195 }
1196 }
1197
1198 fn reader_slots_snapshot(&self) -> Vec<(EntityId, ReaderSlotArc)> {
1200 match self.user_readers.read() {
1201 Ok(r) => r.iter().map(|(k, v)| (*k, Arc::clone(v))).collect(),
1202 Err(_) => Vec::new(),
1203 }
1204 }
1205
1206 fn writer_eids(&self) -> Vec<EntityId> {
1209 match self.user_writers.read() {
1210 Ok(w) => w.keys().copied().collect(),
1211 Err(_) => Vec::new(),
1212 }
1213 }
1214
1215 fn reader_eids(&self) -> Vec<EntityId> {
1217 match self.user_readers.read() {
1218 Ok(r) => r.keys().copied().collect(),
1219 Err(_) => Vec::new(),
1220 }
1221 }
1222
1223 pub fn start(
1230 domain_id: i32,
1231 guid_prefix: GuidPrefix,
1232 config: RuntimeConfig,
1233 ) -> Result<Arc<Self>> {
1234 let spdp_port = u16::try_from(spdp_multicast_port(domain_id as u32)).map_err(|_| {
1237 DdsError::BadParameter {
1238 what: "domain_id too large for SPDP port mapping",
1239 }
1240 })?;
1241 let spdp_mc = UdpTransport::bind_multicast_v4(
1242 config.spdp_multicast_group,
1243 spdp_port,
1244 config.multicast_interface,
1245 )
1246 .map_err(|_| DdsError::TransportError {
1247 label: "spdp multicast bind",
1248 })?
1249 .with_timeout(Some(Duration::from_secs(1)))
1253 .map_err(|_| DdsError::TransportError {
1254 label: "spdp multicast set_timeout",
1255 })?;
1256
1257 let spdp_uc = UdpTransport::bind_v4(Ipv4Addr::UNSPECIFIED, 0)
1260 .map_err(|_| DdsError::TransportError {
1261 label: "spdp unicast bind",
1262 })?
1263 .with_timeout(Some(Duration::from_secs(1)))
1264 .map_err(|_| DdsError::TransportError {
1265 label: "spdp unicast set_timeout",
1266 })?;
1267
1268 let user_uc = UdpTransport::bind_v4(Ipv4Addr::UNSPECIFIED, 0)
1270 .map_err(|_| DdsError::TransportError {
1271 label: "user unicast bind",
1272 })?
1273 .with_timeout(Some(Duration::from_secs(1)))
1274 .map_err(|_| DdsError::TransportError {
1275 label: "user unicast set_timeout",
1276 })?;
1277
1278 let spdp_mc_tx = UdpTransport::bind_v4(Ipv4Addr::UNSPECIFIED, 0).map_err(|_| {
1282 DdsError::TransportError {
1283 label: "spdp mc-tx bind",
1284 }
1285 })?;
1286
1287 let stop = Arc::new(AtomicBool::new(false));
1288
1289 let user_locator = announce_locator(&user_uc, config.multicast_interface);
1297 let spdp_uc_locator = announce_locator(&spdp_uc, config.multicast_interface);
1298 let participant_data = ParticipantBuiltinTopicData {
1299 guid: Guid::new(guid_prefix, EntityId::PARTICIPANT),
1300 protocol_version: ProtocolVersion::V2_5,
1301 vendor_id: VendorId::ZERODDS,
1302 default_unicast_locator: Some(user_locator),
1303 default_multicast_locator: None,
1304 metatraffic_unicast_locator: Some(spdp_uc_locator),
1305 metatraffic_multicast_locator: Some(Locator {
1306 kind: LocatorKind::UdpV4,
1307 port: u32::from(spdp_port),
1308 address: {
1309 let mut a = [0u8; 16];
1310 a[12..].copy_from_slice(&SPDP_DEFAULT_MULTICAST_ADDRESS);
1311 a
1312 },
1313 }),
1314 domain_id: Some(domain_id as u32),
1315 builtin_endpoint_set: {
1330 let mut mask = endpoint_flag::ALL_STANDARD;
1331 if config.announce_secure_endpoints {
1332 mask |= endpoint_flag::ALL_SECURE;
1333 }
1334 mask
1335 },
1336 lease_duration: qos_duration_from_std(config.participant_lease_duration),
1339 user_data: config.user_data.clone(),
1342 properties: Default::default(),
1346 identity_token: None,
1350 permissions_token: None,
1351 identity_status_token: None,
1352 sig_algo_info: None,
1353 kx_algo_info: None,
1354 sym_cipher_algo_info: None,
1355 };
1356 let beacon = SpdpBeacon::new(participant_data);
1357 let sedp = SedpStack::new(guid_prefix, VendorId::ZERODDS);
1358
1359 #[cfg(feature = "security")]
1360 let outbound_pool = if config.interface_bindings.is_empty() {
1361 None
1362 } else {
1363 Some(Arc::new(OutboundSocketPool::bind_all(
1364 &config.interface_bindings,
1365 )?))
1366 };
1367
1368 let wlp_tick_period = if config.wlp_period.is_zero() {
1373 config.participant_lease_duration / 3
1374 } else {
1375 config.wlp_period
1376 };
1377 let wlp = crate::wlp::WlpEndpoint::new(guid_prefix, VendorId::ZERODDS, wlp_tick_period);
1378
1379 let rt = Arc::new(Self {
1380 guid_prefix,
1381 domain_id,
1382 spdp_multicast_rx: Arc::new(spdp_mc),
1383 spdp_unicast: Arc::new(spdp_uc),
1384 user_unicast: Arc::new(user_uc),
1385 spdp_mc_tx: Arc::new(spdp_mc_tx),
1386 spdp_beacon: Mutex::new(beacon),
1387 spdp_reader: SpdpReader::new(),
1388 discovered: Arc::new(Mutex::new(DiscoveredParticipantsCache::new())),
1389 sedp: Arc::new(Mutex::new(sedp)),
1390 type_lookup_endpoints: TypeLookupEndpoints::new(guid_prefix),
1391 type_lookup_server: Arc::new(Mutex::new(TypeLookupServer::new())),
1392 type_lookup_client: Arc::new(Mutex::new(TypeLookupClient::new())),
1393 security_builtin: Mutex::new(None),
1394 start_instant: Instant::now(),
1395 user_writers: Arc::new(RwLock::new(BTreeMap::new())),
1396 shm_locators: Arc::new(RwLock::new(BTreeMap::new())),
1397 user_readers: Arc::new(RwLock::new(BTreeMap::new())),
1398 entity_counter: AtomicU32::new(1),
1399 config,
1400 stop: stop.clone(),
1401 handles: Mutex::new(Vec::new()),
1402 match_event: Arc::new((Mutex::new(()), std::sync::Condvar::new())),
1403 ack_event: Arc::new((Mutex::new(()), std::sync::Condvar::new())),
1404 #[cfg(feature = "security")]
1405 outbound_pool,
1406 wlp: Arc::new(Mutex::new(wlp)),
1407 builtin_sinks: Mutex::new(None),
1408 ignore_filter: Mutex::new(None),
1409 });
1410
1411 let mut handles_init: Vec<JoinHandle<()>> = Vec::with_capacity(4);
1433
1434 let rt_recv_spdp_mc = Arc::clone(&rt);
1435 let stop_recv_spdp_mc = stop.clone();
1436 handles_init.push(
1437 thread::Builder::new()
1438 .name(String::from("zdds-recv-spdp-mc"))
1439 .spawn(move || recv_spdp_multicast_loop(rt_recv_spdp_mc, stop_recv_spdp_mc))
1440 .map_err(|_| DdsError::PreconditionNotMet {
1441 reason: "spawn zdds-recv-spdp-mc thread",
1442 })?,
1443 );
1444
1445 let rt_recv_meta = Arc::clone(&rt);
1446 let stop_recv_meta = stop.clone();
1447 handles_init.push(
1448 thread::Builder::new()
1449 .name(String::from("zdds-recv-meta"))
1450 .spawn(move || recv_metatraffic_loop(rt_recv_meta, stop_recv_meta))
1451 .map_err(|_| DdsError::PreconditionNotMet {
1452 reason: "spawn zdds-recv-meta thread",
1453 })?,
1454 );
1455
1456 let rt_recv_user = Arc::clone(&rt);
1457 let stop_recv_user = stop.clone();
1458 handles_init.push(
1459 thread::Builder::new()
1460 .name(String::from("zdds-recv-user"))
1461 .spawn(move || recv_user_data_loop(rt_recv_user, stop_recv_user))
1462 .map_err(|_| DdsError::PreconditionNotMet {
1463 reason: "spawn zdds-recv-user thread",
1464 })?,
1465 );
1466
1467 let rt_tick = Arc::clone(&rt);
1468 let stop_tick = stop;
1469 handles_init.push(
1470 thread::Builder::new()
1471 .name(String::from("zdds-tick"))
1472 .spawn(move || tick_loop(rt_tick, stop_tick))
1473 .map_err(|_| DdsError::PreconditionNotMet {
1474 reason: "spawn zdds-tick thread",
1475 })?,
1476 );
1477
1478 let mut guard = rt
1479 .handles
1480 .lock()
1481 .map_err(|_| DdsError::PreconditionNotMet {
1482 reason: "runtime handles mutex poisoned",
1483 })?;
1484 *guard = handles_init;
1485 drop(guard);
1486
1487 Ok(rt)
1488 }
1489
1490 #[must_use]
1492 pub fn user_locator(&self) -> zerodds_rtps::wire_types::Locator {
1493 self.user_unicast.local_locator()
1494 }
1495
1496 #[must_use]
1498 pub fn spdp_unicast_locator(&self) -> zerodds_rtps::wire_types::Locator {
1499 self.spdp_unicast.local_locator()
1500 }
1501
1502 #[must_use]
1507 pub fn announced_builtin_endpoint_set(&self) -> u32 {
1508 self.spdp_beacon
1509 .lock()
1510 .map(|b| b.data.builtin_endpoint_set)
1511 .unwrap_or(0)
1512 }
1513
1514 pub fn register_type_object(
1526 &self,
1527 obj: zerodds_types::type_object::TypeObject,
1528 ) -> Result<zerodds_types::EquivalenceHash> {
1529 let hash = zerodds_types::compute_hash(&obj).map_err(|_| DdsError::PreconditionNotMet {
1530 reason: "type hash computation failed",
1531 })?;
1532 let mut server =
1533 self.type_lookup_server
1534 .lock()
1535 .map_err(|_| DdsError::PreconditionNotMet {
1536 reason: "type_lookup_server mutex poisoned",
1537 })?;
1538 match obj {
1539 zerodds_types::type_object::TypeObject::Minimal(m) => {
1540 server.registry.insert_minimal(hash, m);
1541 }
1542 zerodds_types::type_object::TypeObject::Complete(c) => {
1543 server.registry.insert_complete(hash, c);
1544 }
1545 _ => {
1546 return Err(DdsError::PreconditionNotMet {
1547 reason: "unknown TypeObject variant",
1548 });
1549 }
1550 }
1551 Ok(hash)
1552 }
1553
1554 pub fn send_type_lookup_request(
1569 &self,
1570 peer: zerodds_rtps::wire_types::GuidPrefix,
1571 type_hashes: &[zerodds_types::EquivalenceHash],
1572 ) -> Result<Option<zerodds_discovery::type_lookup::RequestId>> {
1573 use alloc::sync::Arc as AllocArc;
1574 use zerodds_discovery::type_lookup::request_types_payload;
1575 use zerodds_rtps::datagram::encode_data_datagram;
1576 use zerodds_rtps::header::RtpsHeader;
1577 use zerodds_rtps::submessages::DataSubmessage;
1578 use zerodds_rtps::wire_types::{ProtocolVersion, SequenceNumber};
1579
1580 let target = {
1585 let discovered = self
1586 .discovered
1587 .lock()
1588 .map_err(|_| DdsError::PreconditionNotMet {
1589 reason: "discovered mutex poisoned",
1590 })?;
1591 let Some(dp) = discovered.get(&peer) else {
1592 return Ok(None);
1593 };
1594 dp.data
1595 .default_unicast_locator
1596 .or(dp.data.metatraffic_unicast_locator)
1597 };
1598 let Some(target) = target else {
1599 return Ok(None);
1600 };
1601
1602 let mut client =
1611 self.type_lookup_client
1612 .lock()
1613 .map_err(|_| DdsError::PreconditionNotMet {
1614 reason: "type_lookup_client mutex poisoned",
1615 })?;
1616 let type_ids: alloc::vec::Vec<zerodds_types::TypeIdentifier> = type_hashes
1617 .iter()
1618 .map(|h| zerodds_types::TypeIdentifier::EquivalenceHashMinimal(*h))
1619 .collect();
1620 let server_for_cb = Arc::clone(&self.type_lookup_server);
1621 let cb = Box::new(
1622 move |reply: zerodds_discovery::type_lookup::TypeLookupReply| {
1623 let zerodds_discovery::type_lookup::TypeLookupReply::Types(types_reply) = reply
1624 else {
1625 return;
1626 };
1627 let Ok(mut server) = server_for_cb.lock() else {
1628 return;
1629 };
1630 for t in &types_reply.types {
1631 match t {
1632 zerodds_types::type_lookup::ReplyTypeObject::Minimal(m) => {
1633 let to = zerodds_types::type_object::TypeObject::Minimal(m.clone());
1634 if let Ok(h) = zerodds_types::compute_hash(&to) {
1635 server.registry.insert_minimal(h, m.clone());
1636 }
1637 }
1638 zerodds_types::type_lookup::ReplyTypeObject::Complete(c) => {
1639 let to = zerodds_types::type_object::TypeObject::Complete(c.clone());
1640 if let Ok(h) = zerodds_types::compute_hash(&to) {
1641 server.registry.insert_complete(h, c.clone());
1642 }
1643 }
1644 }
1645 }
1646 },
1647 );
1648 let request_id = client.request_types(type_ids.clone(), cb);
1649 drop(client);
1650
1651 let body = request_types_payload(&type_ids).map_err(|_| DdsError::PreconditionNotMet {
1653 reason: "type_lookup request payload encode failed",
1654 })?;
1655 let mut payload: alloc::vec::Vec<u8> = alloc::vec::Vec::with_capacity(4 + body.len());
1656 payload.extend_from_slice(&[0x00, 0x01, 0x00, 0x00]);
1657 payload.extend_from_slice(&body);
1658
1659 let id_u64 = request_id.0;
1662 let sn =
1663 SequenceNumber::from_high_low((id_u64 >> 32) as i32, (id_u64 & 0xFFFF_FFFF) as u32);
1664 let header = RtpsHeader {
1665 protocol_version: ProtocolVersion::CURRENT,
1666 vendor_id: VendorId::ZERODDS,
1667 guid_prefix: self.guid_prefix,
1668 };
1669 let data = DataSubmessage {
1670 extra_flags: 0,
1671 reader_id: EntityId::TL_SVC_REQ_READER,
1672 writer_id: EntityId::TL_SVC_REQ_WRITER,
1673 writer_sn: sn,
1674 inline_qos: None,
1675 key_flag: false,
1676 non_standard_flag: false,
1677 serialized_payload: AllocArc::from(payload.into_boxed_slice()),
1678 };
1679 let datagram =
1680 encode_data_datagram(header, &[data]).map_err(|_| DdsError::PreconditionNotMet {
1681 reason: "type_lookup request datagram encode failed",
1682 })?;
1683
1684 if target.kind == LocatorKind::UdpV4 {
1685 let _ = self.user_unicast.send(&target, &datagram);
1686 }
1687 Ok(Some(request_id))
1688 }
1689
1690 pub fn enable_security_builtins(
1697 &self,
1698 vendor_id: VendorId,
1699 ) -> Arc<Mutex<SecurityBuiltinStack>> {
1700 let mut slot = match self.security_builtin.lock() {
1708 Ok(g) => g,
1709 Err(_) => {
1710 return Arc::new(Mutex::new(SecurityBuiltinStack::new(
1711 self.guid_prefix,
1712 vendor_id,
1713 )));
1714 }
1715 };
1716 if let Some(existing) = slot.as_ref() {
1717 return Arc::clone(existing);
1718 }
1719 let stack = Arc::new(Mutex::new(SecurityBuiltinStack::new(
1720 self.guid_prefix,
1721 vendor_id,
1722 )));
1723 if let Ok(cache) = self.discovered.lock() {
1726 if let Ok(mut s) = stack.lock() {
1727 for peer in cache.iter() {
1728 s.handle_remote_endpoints(peer);
1729 }
1730 }
1731 }
1732 *slot = Some(Arc::clone(&stack));
1733 stack
1734 }
1735
1736 #[must_use]
1740 pub fn security_builtin_snapshot(&self) -> Option<Arc<Mutex<SecurityBuiltinStack>>> {
1741 self.security_builtin.lock().ok()?.as_ref().map(Arc::clone)
1742 }
1743
1744 pub fn assert_liveliness(&self) {
1752 if let Ok(mut wlp) = self.wlp.lock() {
1753 wlp.assert_participant();
1754 }
1755 }
1756
1757 pub fn assert_writer_liveliness(&self, topic_token: Vec<u8>) {
1764 if let Ok(mut wlp) = self.wlp.lock() {
1765 wlp.assert_topic(topic_token);
1766 }
1767 }
1768
1769 #[must_use]
1773 pub fn peer_liveliness_last_seen(&self, prefix: &GuidPrefix) -> Option<Duration> {
1774 self.wlp
1775 .lock()
1776 .ok()
1777 .and_then(|w| w.peer_state(prefix).map(|s| s.last_seen))
1778 }
1779
1780 #[must_use]
1784 pub fn peer_capabilities(
1785 &self,
1786 prefix: &GuidPrefix,
1787 ) -> Option<zerodds_discovery::PeerCapabilities> {
1788 self.discovered
1789 .lock()
1790 .ok()
1791 .and_then(|d| d.get(prefix).map(|p| p.data.builtin_endpoint_set))
1792 .map(zerodds_discovery::PeerCapabilities::from_bits)
1793 }
1794
1795 #[must_use]
1798 pub fn discovered_participants(&self) -> Vec<DiscoveredParticipant> {
1799 self.discovered
1800 .lock()
1801 .map(|cache| cache.iter().cloned().collect())
1802 .unwrap_or_default()
1803 }
1804
1805 pub fn attach_builtin_sinks(&self, sinks: crate::builtin_subscriber::BuiltinSinks) {
1813 if let Ok(mut guard) = self.builtin_sinks.lock() {
1814 *guard = Some(sinks);
1815 }
1816 }
1817
1818 pub(crate) fn builtin_sinks_snapshot(&self) -> Option<crate::builtin_subscriber::BuiltinSinks> {
1821 self.builtin_sinks.lock().ok().and_then(|g| g.clone())
1822 }
1823
1824 pub fn attach_ignore_filter(&self, filter: crate::participant::IgnoreFilter) {
1833 if let Ok(mut guard) = self.ignore_filter.lock() {
1834 *guard = Some(filter);
1835 }
1836 }
1837
1838 pub(crate) fn ignore_filter_snapshot(&self) -> Option<crate::participant::IgnoreFilter> {
1841 self.ignore_filter.lock().ok().and_then(|g| g.clone())
1842 }
1843
1844 pub fn announce_publication(
1851 &self,
1852 data: &zerodds_rtps::publication_data::PublicationBuiltinTopicData,
1853 ) -> Result<()> {
1854 let shm = self.shm_locator(data.key.entity_id);
1859 let datagrams = {
1860 let mut sedp = self.sedp.lock().map_err(|_| DdsError::PreconditionNotMet {
1861 reason: "sedp poisoned",
1862 })?;
1863 let res = if let Some(ref bytes) = shm {
1864 sedp.announce_publication_with_shm_locator(data, bytes)
1865 } else {
1866 sedp.announce_publication(data)
1867 };
1868 res.map_err(|_| DdsError::WireError {
1869 message: alloc::string::String::from("sedp announce_publication"),
1870 })?
1871 };
1872 for dg in datagrams {
1876 if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
1877 for t in dg.targets.iter() {
1878 if t.kind == LocatorKind::UdpV4 {
1879 let _ = self.spdp_mc_tx.send(t, &secured);
1880 }
1881 }
1882 }
1883 }
1884 Ok(())
1885 }
1886
1887 pub fn announce_subscription(
1893 &self,
1894 data: &zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData,
1895 ) -> Result<()> {
1896 let datagrams = {
1897 let mut sedp = self.sedp.lock().map_err(|_| DdsError::PreconditionNotMet {
1898 reason: "sedp poisoned",
1899 })?;
1900 sedp.announce_subscription(data)
1901 .map_err(|_| DdsError::WireError {
1902 message: alloc::string::String::from("sedp announce_subscription"),
1903 })?
1904 };
1905 for dg in datagrams {
1906 if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
1907 for t in dg.targets.iter() {
1908 if t.kind == LocatorKind::UdpV4 {
1909 let _ = self.spdp_mc_tx.send(t, &secured);
1910 }
1911 }
1912 }
1913 }
1914 Ok(())
1915 }
1916
1917 pub fn register_user_writer(&self, cfg: UserWriterConfig) -> Result<EntityId> {
1927 let now = self.start_instant.elapsed();
1928 let key = self.next_entity_key();
1929 let eid = EntityId::user_writer_with_key(key);
1930 let writer = ReliableWriter::new(ReliableWriterConfig {
1931 guid: Guid::new(self.guid_prefix, eid),
1932 vendor_id: VendorId::ZERODDS,
1933 reader_proxies: Vec::new(),
1934 max_samples: 1024,
1935 history_kind: HistoryKind::KeepLast { depth: 32 },
1936 heartbeat_period: DEFAULT_HEARTBEAT_PERIOD,
1937 fragment_size: DEFAULT_FRAGMENT_SIZE,
1938 mtu: DEFAULT_MTU,
1939 });
1940 let pub_data = build_publication_data(
1941 self.guid_prefix,
1942 eid,
1943 &cfg,
1944 &self.config.data_representation_offer,
1945 );
1946 self.user_writers
1947 .write()
1948 .map_err(|_| DdsError::PreconditionNotMet {
1949 reason: "user_writers poisoned",
1950 })?
1951 .insert(
1952 eid,
1953 Arc::new(Mutex::new(UserWriterSlot {
1954 writer,
1955 topic_name: cfg.topic_name.clone(),
1956 type_name: cfg.type_name.clone(),
1957 reliable: cfg.reliable,
1958 durability: cfg.durability,
1959 deadline_nanos: qos_duration_to_nanos(cfg.deadline.period),
1960 last_write: None,
1967 offered_deadline_missed_count: 0,
1968 liveliness_lost_count: 0,
1969 last_liveliness_assert: Some(now),
1970 offered_incompatible_qos: crate::status::OfferedIncompatibleQosStatus::default(
1971 ),
1972 lifespan_nanos: qos_duration_to_nanos(cfg.lifespan.duration),
1973 sample_insert_times: alloc::collections::VecDeque::new(),
1974 liveliness_kind: cfg.liveliness.kind,
1975 liveliness_lease_nanos: qos_duration_to_nanos(cfg.liveliness.lease_duration),
1976 ownership: cfg.ownership,
1977 partition: cfg.partition.clone(),
1978 #[cfg(feature = "security")]
1979 reader_protection: BTreeMap::new(),
1980 #[cfg(feature = "security")]
1981 locator_to_peer: BTreeMap::new(),
1982 type_identifier: cfg.type_identifier.clone(),
1983 data_rep_offer_override: cfg.data_representation_offer.clone(),
1984 })),
1985 );
1986 let _ = self.announce_publication(&pub_data);
1988 self.match_local_writer_against_cache(eid);
1990 self.config.observability.record(
1992 &zerodds_foundation::observability::Event::new(
1993 zerodds_foundation::observability::Level::Info,
1994 zerodds_foundation::observability::Component::Dcps,
1995 "user_writer.created",
1996 )
1997 .with_attr("topic", cfg.topic_name.as_str())
1998 .with_attr("type", cfg.type_name.as_str())
1999 .with_attr("reliable", if cfg.reliable { "true" } else { "false" }),
2000 );
2001 Ok(eid)
2002 }
2003
2004 pub fn register_user_reader(
2014 &self,
2015 cfg: UserReaderConfig,
2016 ) -> Result<(EntityId, mpsc::Receiver<UserSample>)> {
2017 let now = self.start_instant.elapsed();
2018 let key = self.next_entity_key();
2019 let eid = EntityId::user_reader_with_key(key);
2020 let reader = ReliableReader::new(ReliableReaderConfig {
2021 guid: Guid::new(self.guid_prefix, eid),
2022 vendor_id: VendorId::ZERODDS,
2023 writer_proxies: Vec::new(),
2024 max_samples_per_proxy: 256,
2025 heartbeat_response_delay:
2028 zerodds_rtps::reliable_reader::DEFAULT_HEARTBEAT_RESPONSE_DELAY,
2029 assembler_caps: AssemblerCaps::default(),
2030 });
2031 let (tx, rx) = mpsc::channel();
2032 let sub_data = build_subscription_data(
2033 self.guid_prefix,
2034 eid,
2035 &cfg,
2036 &self.config.data_representation_offer,
2037 );
2038 self.user_readers
2039 .write()
2040 .map_err(|_| DdsError::PreconditionNotMet {
2041 reason: "user_readers poisoned",
2042 })?
2043 .insert(
2044 eid,
2045 Arc::new(Mutex::new(UserReaderSlot {
2046 reader,
2047 topic_name: cfg.topic_name.clone(),
2048 type_name: cfg.type_name.clone(),
2049 sample_tx: tx,
2050 async_waker: Arc::new(std::sync::Mutex::new(None)),
2051 listener: None,
2052 durability: cfg.durability,
2053 deadline_nanos: qos_duration_to_nanos(cfg.deadline.period),
2054 last_sample_received: Some(now),
2056 requested_deadline_missed_count: 0,
2057 requested_incompatible_qos:
2058 crate::status::RequestedIncompatibleQosStatus::default(),
2059 sample_lost_count: 0,
2060 sample_rejected: crate::status::SampleRejectedStatus::default(),
2061 liveliness_lease_nanos: qos_duration_to_nanos(cfg.liveliness.lease_duration),
2062 liveliness_kind: cfg.liveliness.kind,
2063 liveliness_alive_count: 0,
2064 liveliness_not_alive_count: 0,
2065 liveliness_alive: true,
2068 ownership: cfg.ownership,
2069 partition: cfg.partition.clone(),
2070 writer_strengths: alloc::collections::BTreeMap::new(),
2071 type_identifier: cfg.type_identifier.clone(),
2072 type_consistency: cfg.type_consistency,
2073 })),
2074 );
2075 let _ = self.announce_subscription(&sub_data);
2077 self.match_local_reader_against_cache(eid);
2079 self.config.observability.record(
2081 &zerodds_foundation::observability::Event::new(
2082 zerodds_foundation::observability::Level::Info,
2083 zerodds_foundation::observability::Component::Dcps,
2084 "user_reader.created",
2085 )
2086 .with_attr("topic", cfg.topic_name.as_str())
2087 .with_attr("type", cfg.type_name.as_str()),
2088 );
2089 Ok((eid, rx))
2090 }
2091
2092 fn match_local_writer_against_cache(&self, eid: EntityId) {
2096 let (topic, type_name) = {
2097 let Some(arc) = self.writer_slot(eid) else {
2098 return;
2099 };
2100 let Ok(s) = arc.lock() else {
2101 return;
2102 };
2103 (s.topic_name.clone(), s.type_name.clone())
2104 };
2105 let matches: Vec<_> = {
2106 let sedp = match self.sedp.lock() {
2107 Ok(s) => s,
2108 Err(_) => return,
2109 };
2110 sedp.cache()
2111 .match_subscriptions(&topic, &type_name)
2112 .map(|s| s.data.clone())
2113 .collect()
2114 };
2115 for sub in matches {
2116 self.wire_writer_to_remote_reader(eid, &sub);
2117 }
2118 }
2119
2120 fn match_local_reader_against_cache(&self, eid: EntityId) {
2121 let (topic, type_name) = {
2122 let Some(arc) = self.reader_slot(eid) else {
2123 return;
2124 };
2125 let Ok(s) = arc.lock() else {
2126 return;
2127 };
2128 (s.topic_name.clone(), s.type_name.clone())
2129 };
2130 let matches: Vec<_> = {
2131 let sedp = match self.sedp.lock() {
2132 Ok(s) => s,
2133 Err(_) => return,
2134 };
2135 sedp.cache()
2136 .match_publications(&topic, &type_name)
2137 .map(|p| p.data.clone())
2138 .collect()
2139 };
2140 for pubd in matches {
2141 self.wire_reader_to_remote_writer(eid, &pubd);
2142 }
2143 }
2144
2145 fn wire_writer_to_remote_reader(
2146 &self,
2147 writer_eid: EntityId,
2148 sub: &zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData,
2149 ) {
2150 let locators = remote_user_locators(sub.key.prefix, &self.discovered);
2151 if locators.is_empty() {
2152 return;
2153 }
2154 if let Some(slot_arc) = self.writer_slot(writer_eid) {
2155 if let Ok(mut slot) = slot_arc.lock() {
2156 let slot = &mut *slot;
2157 use crate::psm_constants::qos_policy_id as qid;
2166 use crate::status::bump_policy_count;
2167 let bump = |slot: &mut UserWriterSlot, pid: u32| {
2168 slot.offered_incompatible_qos.total_count =
2169 slot.offered_incompatible_qos.total_count.saturating_add(1);
2170 slot.offered_incompatible_qos.last_policy_id = pid;
2171 bump_policy_count(&mut slot.offered_incompatible_qos.policies, pid);
2172 };
2173
2174 if (slot.durability as u8) < (sub.durability as u8) {
2177 bump(slot, qid::DURABILITY);
2178 return;
2179 }
2180 if !deadline_compat(
2183 slot.deadline_nanos,
2184 qos_duration_to_nanos(sub.deadline.period),
2185 ) {
2186 bump(slot, qid::DEADLINE);
2187 return;
2188 }
2189 if (slot.liveliness_kind as u8) < (sub.liveliness.kind as u8) {
2192 bump(slot, qid::LIVELINESS);
2193 return;
2194 }
2195 if !deadline_compat(
2196 slot.liveliness_lease_nanos,
2197 qos_duration_to_nanos(sub.liveliness.lease_duration),
2198 ) {
2199 bump(slot, qid::LIVELINESS);
2200 return;
2201 }
2202 if slot.ownership != sub.ownership {
2205 bump(slot, qid::OWNERSHIP);
2206 return;
2207 }
2208 if !partitions_overlap(&slot.partition, &sub.partition) {
2211 bump(slot, qid::PARTITION);
2212 return;
2213 }
2214 if slot.type_identifier != zerodds_types::TypeIdentifier::None
2222 && sub.type_identifier != zerodds_types::TypeIdentifier::None
2223 {
2224 let registry = zerodds_types::resolve::TypeRegistry::new();
2225 let tce = zerodds_types::qos::TypeConsistencyEnforcement::default();
2226 let matcher = zerodds_types::type_matcher::TypeMatcher::new(&tce);
2227 if !matcher
2228 .match_types(&slot.type_identifier, &sub.type_identifier, ®istry)
2229 .is_match()
2230 {
2231 bump(slot, qid::TYPE_CONSISTENCY_ENFORCEMENT);
2232 return;
2233 }
2234 }
2235
2236 let mut proxy = zerodds_rtps::reader_proxy::ReaderProxy::new(
2237 sub.key,
2238 locators.clone(),
2239 Vec::new(),
2240 slot.reliable,
2241 );
2242 {
2249 use zerodds_rtps::publication_data::data_representation as dr;
2250 let writer_offered: Vec<i16> = slot
2251 .data_rep_offer_override
2252 .clone()
2253 .unwrap_or_else(|| self.config.data_representation_offer.clone());
2254 let mode = self.config.data_rep_match_mode;
2255 if let Some(negotiated) =
2256 dr::negotiate(&writer_offered, &sub.data_representation, mode)
2257 {
2258 proxy.set_negotiated_data_representation(negotiated);
2259 } else {
2260 }
2265 }
2266 if slot.durability == zerodds_qos::DurabilityKind::Volatile {
2267 if let Some(max) = slot.writer.cache().max_sn() {
2268 proxy.skip_samples_up_to(max);
2269 }
2270 }
2271 slot.writer.add_reader_proxy(proxy);
2272 self.match_event.1.notify_all();
2274
2275 #[cfg(feature = "security")]
2280 {
2281 let peer_key = sub.key.prefix.0;
2282 let level = EndpointProtection::from_info(sub.security_info.as_ref()).level;
2283 slot.reader_protection.insert(peer_key, level);
2284 for loc in &locators {
2285 slot.locator_to_peer.insert(*loc, peer_key);
2286 }
2287 }
2288 }
2289 }
2290 self.config.observability.record(
2292 &zerodds_foundation::observability::Event::new(
2293 zerodds_foundation::observability::Level::Info,
2294 zerodds_foundation::observability::Component::Discovery,
2295 "writer.matched_remote_reader",
2296 )
2297 .with_attr("writer_eid", alloc::format!("{writer_eid:?}")),
2298 );
2299 }
2300
2301 fn wire_reader_to_remote_writer(
2302 &self,
2303 reader_eid: EntityId,
2304 pubd: &zerodds_rtps::publication_data::PublicationBuiltinTopicData,
2305 ) {
2306 let locators = remote_user_locators(pubd.key.prefix, &self.discovered);
2307 if locators.is_empty() {
2308 return;
2309 }
2310 if let Some(slot_arc) = self.reader_slot(reader_eid) {
2311 if let Ok(mut slot) = slot_arc.lock() {
2312 let slot = &mut *slot;
2313 use crate::psm_constants::qos_policy_id as qid;
2315 use crate::status::bump_policy_count;
2316 let bump = |slot: &mut UserReaderSlot, pid: u32| {
2317 slot.requested_incompatible_qos.total_count = slot
2318 .requested_incompatible_qos
2319 .total_count
2320 .saturating_add(1);
2321 slot.requested_incompatible_qos.last_policy_id = pid;
2322 bump_policy_count(&mut slot.requested_incompatible_qos.policies, pid);
2323 };
2324
2325 if (pubd.durability as u8) < (slot.durability as u8) {
2327 bump(slot, qid::DURABILITY);
2328 return;
2329 }
2330 if !deadline_compat(
2331 qos_duration_to_nanos(pubd.deadline.period),
2332 slot.deadline_nanos,
2333 ) {
2334 bump(slot, qid::DEADLINE);
2335 return;
2336 }
2337 if (pubd.liveliness.kind as u8) < (slot.liveliness_kind as u8) {
2338 bump(slot, qid::LIVELINESS);
2339 return;
2340 }
2341 if !deadline_compat(
2342 qos_duration_to_nanos(pubd.liveliness.lease_duration),
2343 slot.liveliness_lease_nanos,
2344 ) {
2345 bump(slot, qid::LIVELINESS);
2346 return;
2347 }
2348 if pubd.ownership != slot.ownership {
2349 bump(slot, qid::OWNERSHIP);
2350 return;
2351 }
2352 if !partitions_overlap(&pubd.partition, &slot.partition) {
2353 bump(slot, qid::PARTITION);
2354 return;
2355 }
2356
2357 if slot.type_identifier != zerodds_types::TypeIdentifier::None
2362 && pubd.type_identifier != zerodds_types::TypeIdentifier::None
2363 {
2364 let registry = zerodds_types::resolve::TypeRegistry::new();
2365 let matcher =
2366 zerodds_types::type_matcher::TypeMatcher::new(&slot.type_consistency);
2367 if !matcher
2368 .match_types(&pubd.type_identifier, &slot.type_identifier, ®istry)
2369 .is_match()
2370 {
2371 bump(slot, qid::TYPE_CONSISTENCY_ENFORCEMENT);
2372 return;
2373 }
2374 }
2375
2376 slot.reader
2377 .add_writer_proxy(zerodds_rtps::writer_proxy::WriterProxy::new(
2378 pubd.key,
2379 locators,
2380 Vec::new(),
2381 true,
2382 ));
2383 self.match_event.1.notify_all();
2385
2386 slot.writer_strengths
2391 .insert(pubd.key.to_bytes(), pubd.ownership_strength);
2392 }
2393 }
2394 }
2395
2396 pub fn write_user_sample(&self, eid: EntityId, payload: Vec<u8>) -> Result<()> {
2412 let now = self.start_instant.elapsed();
2417 let total = USER_PAYLOAD_ENCAP.len() + payload.len();
2418 let out_datagrams = {
2419 let slot_arc = self.writer_slot(eid).ok_or(DdsError::BadParameter {
2420 what: "unknown writer entity id",
2421 })?;
2422 let mut slot = slot_arc.lock().map_err(|_| DdsError::PreconditionNotMet {
2423 reason: "user_writer slot poisoned",
2424 })?;
2425 slot.last_write = Some(now);
2427 let dgs = if total <= SMALL_FRAME_CAP {
2428 write_user_sample_pooled(&mut slot.writer, &payload, now)?
2429 } else {
2430 let mut framed = Vec::with_capacity(total);
2431 framed.extend_from_slice(&USER_PAYLOAD_ENCAP);
2432 framed.extend_from_slice(&payload);
2433 slot.writer
2435 .write_with_heartbeat(&framed, now)
2436 .map_err(|_| DdsError::WireError {
2437 message: String::from("user writer encode"),
2438 })?
2439 };
2440 if slot.lifespan_nanos != 0 {
2442 if let Some(sn) = slot.writer.cache().max_sn() {
2443 slot.sample_insert_times.push_back((sn, now));
2444 }
2445 }
2446 dgs
2447 };
2448 for dg in out_datagrams {
2449 if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
2450 for t in dg.targets.iter() {
2451 if t.kind == LocatorKind::UdpV4 {
2452 let _ = self.user_unicast.send(t, &secured);
2453 }
2454 }
2455 }
2456 }
2457 #[cfg(feature = "inspect")]
2462 {
2463 self.dispatch_inspect_dcps_tap(eid, &payload);
2464 }
2465 Ok(())
2466 }
2467
/// Forwards a written payload to the inspect endpoint tap as a DCPS frame.
///
/// The frame carries the writer's topic name, the current wall-clock time
/// in nanoseconds since the Unix epoch (0 if the clock is before the epoch,
/// `u64::MAX` if it does not fit) and a correlation id derived from `eid`.
/// Does nothing when the writer is unknown or its mutex is poisoned.
#[cfg(feature = "inspect")]
fn dispatch_inspect_dcps_tap(&self, eid: EntityId, payload: &[u8]) {
    let Some(slot_arc) = self.writer_slot(eid) else {
        return;
    };
    let Ok(slot) = slot_arc.lock() else {
        return;
    };
    let topic = slot.topic_name.clone();
    // Release the slot before calling into the tap.
    drop(slot);

    let ts_ns = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
        });

    // Correlation id: entity-key byte `i` is OR-ed in at bit `8 * i`, the
    // entity kind at bit 24.
    let key_bits = eid
        .entity_key
        .iter()
        .enumerate()
        .fold(0_u64, |acc, (i, byte)| acc | (u64::from(*byte) << (i * 8)));
    let corr = key_bits | (u64::from(eid.entity_kind as u8) << 24);

    let frame = zerodds_inspect_endpoint::Frame::dcps(topic, ts_ns, corr, payload.to_vec());
    zerodds_inspect_endpoint::tap::dispatch(&frame);
}
2493
2494 pub fn write_user_lifecycle(
2503 &self,
2504 eid: EntityId,
2505 key_hash: [u8; 16],
2506 status_bits: u32,
2507 ) -> Result<()> {
2508 let out_datagrams = {
2509 let slot_arc = self.writer_slot(eid).ok_or(DdsError::BadParameter {
2510 what: "unknown writer entity id",
2511 })?;
2512 let mut slot = slot_arc.lock().map_err(|_| DdsError::PreconditionNotMet {
2513 reason: "user_writer slot poisoned",
2514 })?;
2515 slot.writer
2516 .write_lifecycle(key_hash, status_bits)
2517 .map_err(|_| DdsError::WireError {
2518 message: String::from("user writer lifecycle encode"),
2519 })?
2520 };
2521 for dg in out_datagrams {
2522 if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
2523 for t in dg.targets.iter() {
2524 if t.kind == LocatorKind::UdpV4 {
2525 let _ = self.user_unicast.send(t, &secured);
2526 }
2527 }
2528 }
2529 }
2530 Ok(())
2531 }
2532
2533 fn next_entity_key(&self) -> [u8; 3] {
2535 let n = self.entity_counter.fetch_add(1, Ordering::Relaxed);
2536 [(n >> 16) as u8, (n >> 8) as u8, n as u8]
2537 }
2538
2539 #[must_use]
2542 pub fn discovered_publications_count(&self) -> usize {
2543 self.sedp
2544 .lock()
2545 .map(|s| s.cache().publications_len())
2546 .unwrap_or(0)
2547 }
2548
2549 #[must_use]
2551 pub fn discovered_subscriptions_count(&self) -> usize {
2552 self.sedp
2553 .lock()
2554 .map(|s| s.cache().subscriptions_len())
2555 .unwrap_or(0)
2556 }
2557
2558 #[must_use]
2561 pub fn user_writer_matched_count(&self, eid: EntityId) -> usize {
2562 self.writer_slot(eid)
2563 .and_then(|arc| arc.lock().ok().map(|s| s.writer.reader_proxy_count()))
2564 .unwrap_or(0)
2565 }
2566
2567 #[must_use]
2571 pub fn user_writer_matched_subscription_handles(
2572 &self,
2573 eid: EntityId,
2574 ) -> Vec<crate::instance_handle::InstanceHandle> {
2575 self.writer_slot(eid)
2576 .and_then(|arc| {
2577 arc.lock().ok().map(|s| {
2578 s.writer
2579 .reader_proxies()
2580 .iter()
2581 .map(|p| {
2582 crate::instance_handle::InstanceHandle::from_guid(p.remote_reader_guid)
2583 })
2584 .collect()
2585 })
2586 })
2587 .unwrap_or_default()
2588 }
2589
2590 #[must_use]
2593 pub fn user_reader_matched_publication_handles(
2594 &self,
2595 eid: EntityId,
2596 ) -> Vec<crate::instance_handle::InstanceHandle> {
2597 self.reader_slot(eid)
2598 .and_then(|arc| {
2599 arc.lock().ok().map(|s| {
2600 s.reader
2601 .writer_proxies()
2602 .iter()
2603 .map(|p| {
2604 crate::instance_handle::InstanceHandle::from_guid(
2605 p.proxy.remote_writer_guid,
2606 )
2607 })
2608 .collect()
2609 })
2610 })
2611 .unwrap_or_default()
2612 }
2613
2614 #[must_use]
2617 pub fn user_writer_offered_deadline_missed(&self, eid: EntityId) -> u64 {
2618 self.writer_slot(eid)
2619 .and_then(|arc| arc.lock().ok().map(|s| s.offered_deadline_missed_count))
2620 .unwrap_or(0)
2621 }
2622
2623 #[must_use]
2626 pub fn user_reader_requested_deadline_missed(&self, eid: EntityId) -> u64 {
2627 self.reader_slot(eid)
2628 .and_then(|arc| arc.lock().ok().map(|s| s.requested_deadline_missed_count))
2629 .unwrap_or(0)
2630 }
2631
2632 #[must_use]
2636 pub fn user_reader_liveliness_status(&self, eid: EntityId) -> (bool, u64, u64) {
2637 self.reader_slot(eid)
2638 .and_then(|arc| {
2639 arc.lock().ok().map(|s| {
2640 (
2641 s.liveliness_alive,
2642 s.liveliness_alive_count,
2643 s.liveliness_not_alive_count,
2644 )
2645 })
2646 })
2647 .unwrap_or((false, 0, 0))
2648 }
2649
2650 #[must_use]
2653 pub fn user_writer_liveliness_lost(&self, eid: EntityId) -> u64 {
2654 self.writer_slot(eid)
2655 .and_then(|arc| arc.lock().ok().map(|s| s.liveliness_lost_count))
2656 .unwrap_or(0)
2657 }
2658
2659 #[must_use]
2661 pub fn user_writer_offered_incompatible_qos(
2662 &self,
2663 eid: EntityId,
2664 ) -> crate::status::OfferedIncompatibleQosStatus {
2665 self.writer_slot(eid)
2666 .and_then(|arc| arc.lock().ok().map(|s| s.offered_incompatible_qos.clone()))
2667 .unwrap_or_default()
2668 }
2669
2670 #[must_use]
2672 pub fn user_reader_requested_incompatible_qos(
2673 &self,
2674 eid: EntityId,
2675 ) -> crate::status::RequestedIncompatibleQosStatus {
2676 self.reader_slot(eid)
2677 .and_then(|arc| {
2678 arc.lock()
2679 .ok()
2680 .map(|s| s.requested_incompatible_qos.clone())
2681 })
2682 .unwrap_or_default()
2683 }
2684
2685 #[must_use]
2687 pub fn user_reader_sample_lost(&self, eid: EntityId) -> u64 {
2688 self.reader_slot(eid)
2689 .and_then(|arc| arc.lock().ok().map(|s| s.sample_lost_count))
2690 .unwrap_or(0)
2691 }
2692
2693 #[must_use]
2695 pub fn user_reader_sample_rejected(
2696 &self,
2697 eid: EntityId,
2698 ) -> crate::status::SampleRejectedStatus {
2699 self.reader_slot(eid)
2700 .and_then(|arc| arc.lock().ok().map(|s| s.sample_rejected))
2701 .unwrap_or_default()
2702 }
2703
2704 pub fn record_sample_lost(&self, eid: EntityId, count: u32) {
2710 if count == 0 {
2711 return;
2712 }
2713 if let Some(arc) = self.reader_slot(eid) {
2714 if let Ok(mut slot) = arc.lock() {
2715 slot.sample_lost_count = slot.sample_lost_count.saturating_add(u64::from(count));
2716 }
2717 }
2718 }
2719
2720 pub fn record_sample_rejected(
2722 &self,
2723 eid: EntityId,
2724 kind: crate::status::SampleRejectedStatusKind,
2725 instance: crate::instance_handle::InstanceHandle,
2726 ) {
2727 if let Some(arc) = self.reader_slot(eid) {
2728 if let Ok(mut slot) = arc.lock() {
2729 slot.sample_rejected.total_count =
2730 slot.sample_rejected.total_count.saturating_add(1);
2731 slot.sample_rejected.last_reason = kind;
2732 slot.sample_rejected.last_instance_handle = instance;
2733 }
2734 }
2735 }
2736
2737 pub fn assert_writer_liveliness_eid(&self, eid: EntityId) {
2743 let now = self.start_instant.elapsed();
2744 if let Some(arc) = self.writer_slot(eid) {
2745 if let Ok(mut slot) = arc.lock() {
2746 slot.last_liveliness_assert = Some(now);
2747 if slot.liveliness_kind == zerodds_qos::LivelinessKind::Automatic {
2748 slot.last_write = Some(now);
2749 }
2750 }
2751 }
2752 }
2753
2754 #[must_use]
2757 pub fn user_writer_all_acknowledged(&self, eid: EntityId) -> bool {
2758 self.writer_slot(eid)
2759 .and_then(|arc| arc.lock().ok().map(|s| s.writer.all_samples_acknowledged()))
2760 .unwrap_or(true)
2761 }
2762
2763 pub fn register_user_reader_waker(&self, eid: EntityId, waker: Option<core::task::Waker>) {
2768 if let Some(arc) = self.reader_slot(eid) {
2769 if let Ok(slot) = arc.lock() {
2770 if let Ok(mut g) = slot.async_waker.lock() {
2771 *g = waker;
2772 }
2773 }
2774 }
2775 }
2776
2777 pub fn set_user_reader_listener(
2790 &self,
2791 eid: EntityId,
2792 listener: Option<UserReaderListener>,
2793 ) -> bool {
2794 let Some(arc) = self.reader_slot(eid) else {
2795 return false;
2796 };
2797 let Ok(mut slot) = arc.lock() else {
2798 return false;
2799 };
2800 slot.listener = listener.map(alloc::sync::Arc::new);
2801 true
2802 }
2803
2804 #[must_use]
2806 pub fn user_reader_matched_count(&self, eid: EntityId) -> usize {
2807 self.reader_slot(eid)
2808 .and_then(|arc| arc.lock().ok().map(|s| s.reader.writer_proxy_count()))
2809 .unwrap_or(0)
2810 }
2811
/// Blocks until the match condition variable is signalled or `timeout`
/// elapses.
///
/// # Returns
/// `true` when the wait ended before the timeout, `false` on timeout or
/// when the associated mutex is poisoned.
///
/// NOTE(review): the wait uses no predicate, so a notification sent while
/// nobody is waiting is not remembered and a spurious wake-up also yields
/// `true`. Callers should re-check the state they are interested in.
#[cfg(feature = "std")]
pub fn wait_match_event(&self, timeout: core::time::Duration) -> bool {
    let (mutex, condvar) = &*self.match_event;
    let Ok(guard) = mutex.lock() else {
        return false;
    };
    matches!(
        condvar.wait_timeout(guard, timeout),
        Ok((_, outcome)) if !outcome.timed_out()
    )
}
2828
/// Blocks until the ack condition variable is signalled or `timeout`
/// elapses.
///
/// # Returns
/// `true` when the wait ended before the timeout, `false` on timeout or
/// when the associated mutex is poisoned.
///
/// NOTE(review): the wait uses no predicate, so a notification sent while
/// nobody is waiting is not remembered and a spurious wake-up also yields
/// `true`. Callers should re-check the state they are interested in.
#[cfg(feature = "std")]
pub fn wait_ack_event(&self, timeout: core::time::Duration) -> bool {
    let (mutex, condvar) = &*self.ack_event;
    let Ok(guard) = mutex.lock() else {
        return false;
    };
    matches!(
        condvar.wait_timeout(guard, timeout),
        Ok((_, outcome)) if !outcome.timed_out()
    )
}
2840
/// Wakes every thread currently blocked in `wait_ack_event`.
#[cfg(feature = "std")]
pub(crate) fn notify_ack_event(&self) {
    let (_, condvar) = &*self.ack_event;
    condvar.notify_all();
}
2847
2848 pub fn set_shm_locator(&self, eid: EntityId, bytes: Vec<u8>) {
2854 if let Ok(mut g) = self.shm_locators.write() {
2855 g.insert(eid, bytes);
2856 }
2857 }
2858
2859 #[must_use]
2863 pub fn shm_locator(&self, eid: EntityId) -> Option<Vec<u8>> {
2864 self.shm_locators.read().ok()?.get(&eid).cloned()
2865 }
2866
2867 pub fn clear_shm_locator(&self, eid: EntityId) {
2870 if let Ok(mut g) = self.shm_locators.write() {
2871 g.remove(&eid);
2872 }
2873 }
2874
2875 pub fn shutdown(&self) {
2883 self.stop.store(true, Ordering::Relaxed);
2884 if let Ok(mut guard) = self.handles.lock() {
2885 for h in guard.drain(..) {
2886 let _ = h.join();
2887 }
2888 }
2889 }
2890}
2891
2892impl Drop for DcpsRuntime {
2893 fn drop(&mut self) {
2894 self.shutdown();
2895 }
2896}
2897
2898#[allow(unused_variables)]
2930fn apply_thread_tuning(label: &str, priority: Option<i32>, cpus: Option<&[usize]>) {
2931 #[cfg(target_os = "linux")]
2932 rt_pinning::apply(label, priority, cpus);
2933}
2934
2935#[cfg(target_os = "linux")]
2940#[allow(unsafe_code, clippy::print_stderr)]
2941mod rt_pinning {
2942 pub(super) fn apply(label: &str, priority: Option<i32>, cpus: Option<&[usize]>) {
2943 if let Some(prio) = priority {
2944 unsafe {
2947 let param = libc::sched_param {
2948 sched_priority: prio,
2949 };
2950 let rc = libc::pthread_setschedparam(
2951 libc::pthread_self(),
2952 libc::SCHED_FIFO,
2953 &raw const param,
2954 );
2955 if rc != 0 {
2956 eprintln!(
2957 "zdds[{label}]: pthread_setschedparam SCHED_FIFO {prio} \
2958 failed (rc={rc}). Need CAP_SYS_NICE or RLIMIT_RTPRIO."
2959 );
2960 }
2961 }
2962 }
2963 if let Some(cpu_list) = cpus {
2964 unsafe {
2967 let mut set: libc::cpu_set_t = core::mem::zeroed();
2968 libc::CPU_ZERO(&mut set);
2969 for &cpu in cpu_list {
2970 if cpu < libc::CPU_SETSIZE as usize {
2971 libc::CPU_SET(cpu, &mut set);
2972 }
2973 }
2974 let rc = libc::sched_setaffinity(
2975 0,
2976 core::mem::size_of::<libc::cpu_set_t>(),
2977 &raw const set,
2978 );
2979 if rc != 0 {
2980 eprintln!("zdds[{label}]: sched_setaffinity({cpu_list:?}) failed.");
2981 }
2982 }
2983 }
2984 }
2985}
2986
2987fn recv_spdp_multicast_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
2990 apply_thread_tuning(
2991 "recv-spdp-mc",
2992 rt.config.recv_thread_priority,
2993 rt.config.recv_thread_cpus.as_deref(),
2994 );
2995 while !stop.load(Ordering::Relaxed) {
2996 let elapsed = rt.start_instant.elapsed();
2997 let sedp_now = Duration::from_secs(elapsed.as_secs())
2998 + Duration::from_nanos(u64::from(elapsed.subsec_nanos()));
2999 let Ok(dg) = rt.spdp_multicast_rx.recv() else {
3000 continue;
3001 };
3002 #[cfg(feature = "security")]
3003 let clear = secure_inbound_bytes(&rt, &dg.data, &DEFAULT_INBOUND_IFACE);
3004 #[cfg(not(feature = "security"))]
3005 let clear = secure_inbound_bytes(&rt, &dg.data);
3006 if let Some(clear) = clear {
3007 handle_spdp_datagram(&rt, &clear);
3008 if let Ok(mut wlp) = rt.wlp.lock() {
3013 let _ = wlp.handle_datagram(&clear, sedp_now);
3014 }
3015 }
3016 }
3017}
3018
3019fn recv_metatraffic_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
3022 apply_thread_tuning(
3023 "recv-meta",
3024 rt.config.recv_thread_priority,
3025 rt.config.recv_thread_cpus.as_deref(),
3026 );
3027 while !stop.load(Ordering::Relaxed) {
3028 let elapsed = rt.start_instant.elapsed();
3029 let sedp_now = Duration::from_secs(elapsed.as_secs())
3030 + Duration::from_nanos(u64::from(elapsed.subsec_nanos()));
3031 let Ok(dg) = rt.spdp_unicast.recv() else {
3032 continue;
3033 };
3034 #[cfg(feature = "security")]
3035 let clear = secure_inbound_bytes(&rt, &dg.data, &DEFAULT_INBOUND_IFACE);
3036 #[cfg(not(feature = "security"))]
3037 let clear = secure_inbound_bytes(&rt, &dg.data);
3038 if let Some(clear) = clear {
3039 handle_spdp_datagram(&rt, &clear);
3043 let events = {
3044 if let Ok(mut sedp) = rt.sedp.lock() {
3045 sedp.handle_datagram(&clear, sedp_now).ok()
3046 } else {
3047 None
3048 }
3049 };
3050 if let Some(ev) = events {
3051 if !ev.is_empty() {
3052 run_matching_pass(&rt);
3053 push_sedp_events_to_builtin_readers(&rt, &ev);
3054 }
3055 }
3056 if let Ok(mut wlp) = rt.wlp.lock() {
3057 let _ = wlp.handle_datagram(&clear, sedp_now);
3058 }
3059 dispatch_security_builtin_datagram(&rt, &clear, sedp_now);
3060 }
3061 }
3062}
3063
3064fn recv_user_data_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
3067 apply_thread_tuning(
3068 "recv-user",
3069 rt.config.recv_thread_priority,
3070 rt.config.recv_thread_cpus.as_deref(),
3071 );
3072 while !stop.load(Ordering::Relaxed) {
3073 let elapsed = rt.start_instant.elapsed();
3074 let sedp_now = Duration::from_secs(elapsed.as_secs())
3075 + Duration::from_nanos(u64::from(elapsed.subsec_nanos()));
3076 let Ok(dg) = rt.user_unicast.recv() else {
3077 continue;
3078 };
3079 #[cfg(feature = "security")]
3080 let clear = secure_inbound_bytes(&rt, &dg.data, &DEFAULT_INBOUND_IFACE);
3081 #[cfg(not(feature = "security"))]
3082 let clear = secure_inbound_bytes(&rt, &dg.data);
3083 if let Some(clear) = clear {
3084 if !dispatch_type_lookup_datagram(&rt, &clear, &dg.source) {
3088 handle_user_datagram(&rt, &clear, sedp_now);
3089 }
3090 }
3091 }
3092}
3093
/// Periodic housekeeping loop of the runtime.
///
/// Runs until `stop` is set. Each iteration, in order:
/// 1. sends the SPDP beacon to the SPDP multicast locator once
///    `rt.config.spdp_period` has passed since the previous announcement,
/// 2. ticks SEDP and sends its outbound datagrams,
/// 3. polls the security builtin stack (if present) and sends its datagrams,
/// 4. ticks WLP and sends its datagram (if any) to the multicast locator,
/// 5. ticks every user writer and user reader and sends their datagrams,
/// 6. with the `security` feature: drains all sockets of `rt.outbound_pool`,
/// 7. runs the deadline, lifespan and liveliness checks,
/// 8. sleeps for `rt.config.tick_period`.
///
/// Poisoned locks and send errors are skipped silently throughout.
fn tick_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
    apply_thread_tuning(
        "tick",
        rt.config.tick_thread_priority,
        rt.config.tick_thread_cpus.as_deref(),
    );
    // Multicast destination used for the SPDP beacon and for WLP traffic.
    let mc_target = Locator {
        kind: LocatorKind::UdpV4,
        // Falls back to 7400 if the computed port does not fit in a u16.
        port: u32::from(u16::try_from(spdp_multicast_port(rt.domain_id as u32)).unwrap_or(7400)),
        address: {
            // The multicast address occupies the last bytes of the 16-byte field.
            let mut a = [0u8; 16];
            a[12..].copy_from_slice(&SPDP_DEFAULT_MULTICAST_ADDRESS);
            a
        },
    };

    // Initialized to "now" so the first iteration announces immediately.
    let mut next_announce = Instant::now();
    while !stop.load(Ordering::Relaxed) {
        // Time since runtime start; used as "now" for every stack in this tick.
        let elapsed_since_start = rt.start_instant.elapsed();
        let sedp_now = Duration::from_secs(elapsed_since_start.as_secs())
            + Duration::from_nanos(u64::from(elapsed_since_start.subsec_nanos()));

        // --- SPDP beacon ---
        if Instant::now() >= next_announce {
            if let Ok(mut beacon) = rt.spdp_beacon.lock() {
                if let Ok(datagram) = beacon.serialize() {
                    if let Some(secured) = secure_outbound_bytes(&rt, &datagram) {
                        let _ = rt.spdp_mc_tx.send(&mc_target, &secured);
                    }
                }
            }
            // Scheduled relative to the current time, not the previous deadline.
            next_announce = Instant::now() + rt.config.spdp_period;
        }

        // --- SEDP: collect under the lock, send after releasing it ---
        let sedp_outbound = {
            if let Ok(mut sedp) = rt.sedp.lock() {
                sedp.tick(sedp_now).unwrap_or_default()
            } else {
                Vec::new()
            }
        };
        for dg in sedp_outbound {
            send_discovery_datagram(&rt, &dg.targets, &dg.bytes);
        }

        // --- Security builtin stack (only when one is present) ---
        if let Some(stack) = rt.security_builtin_snapshot() {
            let outbound = {
                if let Ok(mut s) = stack.lock() {
                    s.poll(sedp_now).unwrap_or_default()
                } else {
                    Vec::new()
                }
            };
            for dg in outbound {
                send_discovery_datagram(&rt, &dg.targets, &dg.bytes);
            }
        }

        // --- WLP: at most one datagram per tick, sent to the multicast locator ---
        let wlp_outbound = {
            if let Ok(mut wlp) = rt.wlp.lock() {
                wlp.tick(sedp_now).unwrap_or(None)
            } else {
                None
            }
        };
        if let Some(bytes) = wlp_outbound {
            if let Some(secured) = secure_outbound_bytes(&rt, &bytes) {
                let _ = rt.spdp_mc_tx.send(&mc_target, &secured);
            }
        }

        // --- User writers: gather all datagrams first, each slot locked only
        // while it is ticked ---
        let user_writer_outbound: Vec<(EntityId, _)> = {
            let mut all = Vec::new();
            for (eid, arc) in rt.writer_slots_snapshot() {
                if let Ok(mut slot) = arc.lock() {
                    if let Ok(dgs) = slot.writer.tick(sedp_now) {
                        for dg in dgs {
                            all.push((eid, dg));
                        }
                    }
                }
            }
            all
        };
        for (writer_eid, dg) in user_writer_outbound {
            for t in dg.targets.iter() {
                // Only UDPv4 targets are sent to.
                if t.kind != LocatorKind::UdpV4 {
                    continue;
                }
                // Protection is applied per (writer, target) pair.
                if let Some(secured) = secure_outbound_for_target(&rt, writer_eid, &dg.bytes, t) {
                    send_on_best_interface(&rt, t, &secured);
                }
            }
        }

        // --- User readers: outbound datagrams produced by `tick_outbound` ---
        let user_reader_outbound: Vec<_> = {
            let mut all = Vec::new();
            for (_eid, arc) in rt.reader_slots_snapshot() {
                if let Ok(mut slot) = arc.lock() {
                    if let Ok(dgs) = slot.reader.tick_outbound(sedp_now) {
                        all.extend(dgs);
                    }
                }
            }
            all
        };
        for dg in user_reader_outbound {
            if let Some(secured) = secure_outbound_bytes(&rt, &dg.bytes) {
                for t in dg.targets.iter() {
                    if t.kind == LocatorKind::UdpV4 {
                        let _ = rt.user_unicast.send(t, &secured);
                    }
                }
            }
        }

        // --- Security feature: drain every socket of the outbound pool and run
        // SPDP, SEDP, type-lookup/user and security-builtin dispatch on each
        // cleartext datagram ---
        #[cfg(feature = "security")]
        if let Some(pool) = &rt.outbound_pool {
            for binding in &pool.bindings {
                // Keep reading until `recv()` reports an error.
                while let Ok(dg) = binding.socket.recv() {
                    let iface = binding.spec.kind.clone();
                    if let Some(clear) = secure_inbound_bytes(&rt, &dg.data, &iface) {
                        handle_spdp_datagram(&rt, &clear);
                        let events = rt
                            .sedp
                            .lock()
                            .ok()
                            .and_then(|mut s| s.handle_datagram(&clear, sedp_now).ok());
                        if let Some(ev) = events {
                            if !ev.is_empty() {
                                run_matching_pass(&rt);
                                push_sedp_events_to_builtin_readers(&rt, &ev);
                            }
                        }
                        if !dispatch_type_lookup_datagram(&rt, &clear, &dg.source) {
                            handle_user_datagram(&rt, &clear, sedp_now);
                        }
                        dispatch_security_builtin_datagram(&rt, &clear, sedp_now);
                    }
                }
            }
        }

        // --- QoS timers, all evaluated against the time sampled at tick start ---
        check_deadlines(&rt, elapsed_since_start);
        expire_by_lifespan(&rt, elapsed_since_start);
        check_liveliness(&rt, elapsed_since_start);
        check_writer_liveliness(&rt, elapsed_since_start);

        std::thread::sleep(rt.config.tick_period);
    }
}
3301
3302fn check_writer_liveliness(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
3316 let now_nanos = now.as_nanos() as u64;
3317 for (_eid, arc) in rt.writer_slots_snapshot() {
3318 let Ok(mut slot) = arc.lock() else { continue };
3319 if slot.liveliness_lease_nanos == 0 {
3320 continue;
3321 }
3322 let last = match slot.liveliness_kind {
3323 zerodds_qos::LivelinessKind::Automatic => slot.last_write,
3324 _ => slot.last_liveliness_assert,
3325 };
3326 let last_nanos = match last {
3327 Some(t) => t.as_nanos() as u64,
3328 None => continue,
3329 };
3330 if now_nanos.saturating_sub(last_nanos) >= slot.liveliness_lease_nanos {
3331 slot.liveliness_lost_count = slot.liveliness_lost_count.saturating_add(1);
3332 slot.last_liveliness_assert = Some(now);
3337 slot.last_write = Some(now);
3338 }
3339 }
3340}
3341
3342fn check_liveliness(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
3350 let now_nanos = now.as_nanos() as u64;
3351 for (_eid, arc) in rt.reader_slots_snapshot() {
3352 let Ok(mut slot) = arc.lock() else { continue };
3353 if slot.liveliness_lease_nanos == 0 {
3354 continue;
3355 }
3356 let last = match slot.last_sample_received {
3358 Some(t) => t.as_nanos() as u64,
3359 None => continue,
3360 };
3361 if now_nanos.saturating_sub(last) >= slot.liveliness_lease_nanos && slot.liveliness_alive {
3362 slot.liveliness_alive = false;
3363 slot.liveliness_not_alive_count = slot.liveliness_not_alive_count.saturating_add(1);
3364 }
3365 }
3366}
3367
3368fn expire_by_lifespan(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
3378 let now_nanos = now.as_nanos() as u64;
3379 for (_eid, arc) in rt.writer_slots_snapshot() {
3380 let Ok(mut slot) = arc.lock() else { continue };
3381 if slot.lifespan_nanos == 0 {
3382 continue;
3383 }
3384 let mut highest_expired = None;
3385 while let Some(&(sn, inserted)) = slot.sample_insert_times.front() {
3386 let inserted_nanos = inserted.as_nanos() as u64;
3387 if now_nanos.saturating_sub(inserted_nanos) >= slot.lifespan_nanos {
3388 highest_expired = Some(sn);
3389 slot.sample_insert_times.pop_front();
3390 } else {
3391 break;
3392 }
3393 }
3394 if let Some(sn) = highest_expired {
3395 let _removed = slot
3396 .writer
3397 .remove_samples_up_to(zerodds_rtps::wire_types::SequenceNumber(sn.0 + 1));
3398 }
3399 }
3400}
3401
3402fn check_deadlines(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
3416 let now_nanos = now.as_nanos() as u64;
3417 for (_eid, arc) in rt.writer_slots_snapshot() {
3418 let Ok(mut slot) = arc.lock() else { continue };
3419 if slot.deadline_nanos == 0 {
3420 continue;
3421 }
3422 let Some(last) = slot.last_write.map(|d| d.as_nanos() as u64) else {
3423 continue;
3425 };
3426 if now_nanos.saturating_sub(last) >= slot.deadline_nanos {
3427 slot.offered_deadline_missed_count =
3428 slot.offered_deadline_missed_count.saturating_add(1);
3429 slot.last_write = Some(now);
3433 }
3434 }
3435 for (_eid, arc) in rt.reader_slots_snapshot() {
3436 let Ok(mut slot) = arc.lock() else { continue };
3437 if slot.deadline_nanos == 0 {
3438 continue;
3439 }
3440 let Some(last) = slot.last_sample_received.map(|d| d.as_nanos() as u64) else {
3441 continue;
3442 };
3443 if now_nanos.saturating_sub(last) >= slot.deadline_nanos {
3444 slot.requested_deadline_missed_count =
3445 slot.requested_deadline_missed_count.saturating_add(1);
3446 slot.last_sample_received = Some(now);
3447 }
3448 }
3449}
3450
3451fn run_matching_pass(rt: &Arc<DcpsRuntime>) {
3456 let writer_ids: Vec<EntityId> = rt.writer_eids();
3457 for eid in writer_ids {
3458 rt.match_local_writer_against_cache(eid);
3459 }
3460 let reader_ids: Vec<EntityId> = rt.reader_eids();
3461 for eid in reader_ids {
3462 rt.match_local_reader_against_cache(eid);
3463 }
3464}
3465
3466fn remote_user_locators(
3469 prefix: GuidPrefix,
3470 discovered: &Arc<Mutex<DiscoveredParticipantsCache>>,
3471) -> Vec<Locator> {
3472 match discovered.lock() {
3473 Ok(cache) => cache
3474 .get(&prefix)
3475 .and_then(|p| p.data.default_unicast_locator)
3476 .into_iter()
3477 .collect(),
3478 Err(_) => Vec::new(),
3479 }
3480}
3481
/// Takes the waker registered in `slot` (if any) and wakes it.
///
/// The slot is left empty afterwards, so each registered waker is woken at most
/// once. A poisoned slot mutex is treated like an empty slot.
///
/// The mutex guard is released *before* `wake()` is called: a waker may run
/// arbitrary code, including code that locks this same slot to register a new
/// waker, and doing that while the guard is held would block on the mutex.
fn wake_async_waker(slot: &alloc::sync::Arc<std::sync::Mutex<Option<core::task::Waker>>>) {
    // The guard lives only inside the match arm, so the lock is already
    // released when the `let` statement ends.
    let pending = match slot.lock() {
        Ok(mut guard) => guard.take(),
        Err(_) => None,
    };
    if let Some(waker) = pending {
        waker.wake();
    }
}
3500
/// Publishes a received sample to the inspect tap as a DCPS frame.
///
/// The frame payload is the sample payload for `Alive` samples and the key
/// hash bytes for `Lifecycle` samples. The timestamp is wall-clock nanoseconds
/// since the Unix epoch (0 if the clock is before the epoch, `u64::MAX` if the
/// value does not fit). The correlation id packs the reader's entity key bytes
/// into the low bits (byte `i` shifted by `8 * i`) and the entity kind at bit 24.
#[cfg(feature = "inspect")]
fn dispatch_inspect_dcps_receive_tap(topic: &str, reader_id: EntityId, item: &UserSample) {
    let payload: Vec<u8> = match item {
        UserSample::Alive { payload, .. } => payload.clone(),
        UserSample::Lifecycle { key_hash, .. } => key_hash.to_vec(),
    };
    let ts_ns = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    };
    let key_bits = reader_id
        .entity_key
        .iter()
        .enumerate()
        .fold(0u64, |acc, (i, byte)| acc | (u64::from(*byte) << (i * 8)));
    let corr: u64 = key_bits | (u64::from(reader_id.entity_kind as u8) << 24);
    let frame = zerodds_inspect_endpoint::Frame::dcps(topic.to_owned(), ts_ns, corr, payload);
    zerodds_inspect_endpoint::tap::dispatch(&frame);
}
3528
3529fn delivered_to_user_sample(
3530 sample: &zerodds_rtps::reliable_reader::DeliveredSample,
3531 writer_strengths: &alloc::collections::BTreeMap<[u8; 16], i32>,
3532) -> Option<UserSample> {
3533 use zerodds_rtps::history_cache::ChangeKind;
3534 match sample.kind {
3535 ChangeKind::Alive | ChangeKind::AliveFiltered => {
3536 let writer_guid = sample.writer_guid.to_bytes();
3537 let writer_strength = writer_strengths.get(&writer_guid).copied().unwrap_or(0);
3538 strip_user_encap(&sample.payload).map(|payload| UserSample::Alive {
3539 payload,
3540 writer_guid,
3541 writer_strength,
3542 })
3543 }
3544 ChangeKind::NotAliveDisposed
3545 | ChangeKind::NotAliveUnregistered
3546 | ChangeKind::NotAliveDisposedUnregistered => {
3547 let kh = sample.key_hash.unwrap_or_else(|| {
3554 let mut h = [0u8; 16];
3555 let n = sample.payload.len().min(16);
3556 h[..n].copy_from_slice(&sample.payload[..n]);
3557 h
3558 });
3559 Some(UserSample::Lifecycle {
3560 key_hash: kh,
3561 kind: sample.kind,
3562 })
3563 }
3564 }
3565}
3566
3567fn validate_user_encap_offset(payload: &[u8]) -> Option<usize> {
3573 if payload.len() < 4 {
3574 return None;
3575 }
3576 use zerodds_rtps::participant_message_data::{
3583 ENCAPSULATION_CDR_BE, ENCAPSULATION_CDR_LE, ENCAPSULATION_CDR2_BE, ENCAPSULATION_CDR2_LE,
3584 };
3585 const ENCAPSULATION_PL_CDR_BE: [u8; 2] = [0x00, 0x02];
3586 const ENCAPSULATION_PL_CDR_LE: [u8; 2] = [0x00, 0x03];
3587 let k = [payload[0], payload[1]];
3588 let known = k == ENCAPSULATION_CDR_BE
3589 || k == ENCAPSULATION_CDR_LE
3590 || k == ENCAPSULATION_PL_CDR_BE
3591 || k == ENCAPSULATION_PL_CDR_LE
3592 || k == ENCAPSULATION_CDR2_BE
3593 || k == ENCAPSULATION_CDR2_LE;
3594 if known { Some(4) } else { None }
3595}
3596
3597fn strip_user_encap(payload: &[u8]) -> Option<Vec<u8>> {
3598 validate_user_encap_offset(payload).map(|off| payload[off..].to_vec())
3599}
3600
/// Decodes one user-traffic datagram and routes each submessage to the local
/// reader or writer slot it addresses.
///
/// * `Data` / `DataFrag`: delivered samples are collected under the reader slot
///   lock; listener calls, channel sends and waker notifications happen after
///   the lock is released.
/// * `Heartbeat`: like `Data` but without listener calls; the reader's outbound
///   datagrams are produced in the same lock scope and sent right away.
/// * `Gap`: released samples are sent to the channel while the slot lock is
///   still held.
/// * `AckNack`: updates the writer, ticks it immediately and sends the result.
/// * `NackFrag`: forwarded to the writer.
///
/// Undecodable datagrams, unknown endpoints and poisoned locks are skipped
/// silently. `now` is the timestamp handed to the reader/writer state machines
/// and stored as `last_sample_received`.
fn handle_user_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8], now: Duration) {
    let parsed = match decode_datagram(bytes) {
        Ok(p) => p,
        Err(_) => return,
    };
    for sub in parsed.submessages {
        match sub {
            ParsedSubmessage::Data(d) => {
                let Some(arc) = rt.reader_slot(d.reader_id) else {
                    continue;
                };
                // Each entry pairs the user sample with an optional view on the
                // original payload (shared buffer + offset behind the header).
                let mut items: Vec<UserSampleWithEncap> = Vec::new();
                // Assigned inside the lock scope below, used after it.
                let listener;
                let waker;
                let sender;
                #[cfg(feature = "inspect")]
                let topic_name;
                {
                    let Ok(mut slot) = arc.lock() else { continue };
                    for sample in slot.reader.handle_data(&d) {
                        // Only alive samples with a valid header get a listener view.
                        let listener_view: Option<(Arc<[u8]>, usize)> = match sample.kind {
                            zerodds_rtps::history_cache::ChangeKind::Alive
                            | zerodds_rtps::history_cache::ChangeKind::AliveFiltered => {
                                validate_user_encap_offset(&sample.payload)
                                    .map(|off| (Arc::clone(&sample.payload), off))
                            }
                            _ => None,
                        };
                        if let Some(item) =
                            delivered_to_user_sample(&sample, &slot.writer_strengths)
                        {
                            items.push((item, listener_view));
                        }
                    }
                    if !items.is_empty() {
                        // A delivered sample refreshes the timers and revives liveliness.
                        slot.last_sample_received = Some(now);
                        if !slot.liveliness_alive {
                            slot.liveliness_alive = true;
                            slot.liveliness_alive_count =
                                slot.liveliness_alive_count.saturating_add(1);
                        }
                    }
                    listener = slot.listener.clone();
                    waker = Arc::clone(&slot.async_waker);
                    sender = slot.sample_tx.clone();
                    #[cfg(feature = "inspect")]
                    {
                        topic_name = slot.topic_name.clone();
                    }
                }
                // Slot lock released: notify tap, listener, channel and waker.
                for (item, listener_view) in items {
                    #[cfg(feature = "inspect")]
                    dispatch_inspect_dcps_receive_tap(&topic_name, d.reader_id, &item);
                    if let Some(ref l) = listener {
                        if let Some((arc_payload, off)) = listener_view {
                            // The listener sees the payload without the header.
                            l(&arc_payload[off..]);
                        }
                    }
                    let _ = sender.send(item);
                    wake_async_waker(&waker);
                }
            }
            ParsedSubmessage::DataFrag(df) => {
                // Same flow as `Data`, fed by `handle_data_frag`.
                let Some(arc) = rt.reader_slot(df.reader_id) else {
                    continue;
                };
                let mut items: Vec<UserSampleWithEncap> = Vec::new();
                let listener;
                let waker;
                let sender;
                #[cfg(feature = "inspect")]
                let topic_name;
                {
                    let Ok(mut slot) = arc.lock() else { continue };
                    for sample in slot.reader.handle_data_frag(&df, now) {
                        let listener_view: Option<(Arc<[u8]>, usize)> = match sample.kind {
                            zerodds_rtps::history_cache::ChangeKind::Alive
                            | zerodds_rtps::history_cache::ChangeKind::AliveFiltered => {
                                validate_user_encap_offset(&sample.payload)
                                    .map(|off| (Arc::clone(&sample.payload), off))
                            }
                            _ => None,
                        };
                        if let Some(item) =
                            delivered_to_user_sample(&sample, &slot.writer_strengths)
                        {
                            items.push((item, listener_view));
                        }
                    }
                    if !items.is_empty() {
                        slot.last_sample_received = Some(now);
                        if !slot.liveliness_alive {
                            slot.liveliness_alive = true;
                            slot.liveliness_alive_count =
                                slot.liveliness_alive_count.saturating_add(1);
                        }
                    }
                    listener = slot.listener.clone();
                    waker = Arc::clone(&slot.async_waker);
                    sender = slot.sample_tx.clone();
                    #[cfg(feature = "inspect")]
                    {
                        topic_name = slot.topic_name.clone();
                    }
                }
                for (item, listener_view) in items {
                    #[cfg(feature = "inspect")]
                    dispatch_inspect_dcps_receive_tap(&topic_name, df.reader_id, &item);
                    if let Some(ref l) = listener {
                        if let Some((arc_payload, off)) = listener_view {
                            l(&arc_payload[off..]);
                        }
                    }
                    let _ = sender.send(item);
                    wake_async_waker(&waker);
                }
            }
            ParsedSubmessage::Heartbeat(h) => {
                let Some(arc) = rt.reader_slot(h.reader_id) else {
                    continue;
                };
                let mut items: Vec<UserSample> = Vec::new();
                // Reader responses produced right after the heartbeat was processed.
                let mut sync_outbound: Vec<zerodds_rtps::message_builder::OutboundDatagram> =
                    Vec::new();
                let waker;
                let sender;
                {
                    let Ok(mut slot) = arc.lock() else { continue };
                    for sample in slot.reader.handle_heartbeat(&h, now) {
                        if let Some(item) =
                            delivered_to_user_sample(&sample, &slot.writer_strengths)
                        {
                            items.push(item);
                        }
                    }
                    if !items.is_empty() {
                        slot.last_sample_received = Some(now);
                        if !slot.liveliness_alive {
                            slot.liveliness_alive = true;
                            slot.liveliness_alive_count =
                                slot.liveliness_alive_count.saturating_add(1);
                        }
                    }
                    // Tick within the same lock scope instead of waiting for the
                    // next periodic tick.
                    if let Ok(dgs) = slot.reader.tick_outbound(now) {
                        sync_outbound = dgs;
                    }
                    waker = Arc::clone(&slot.async_waker);
                    sender = slot.sample_tx.clone();
                }
                // NOTE(review): this path calls neither the listener nor the
                // inspect tap, unlike `Data`/`DataFrag` — confirm that is intended.
                for item in items {
                    let _ = sender.send(item);
                    wake_async_waker(&waker);
                }
                for dg in sync_outbound {
                    if let Some(secured) = secure_outbound_bytes(rt, &dg.bytes) {
                        for t in dg.targets.iter() {
                            if t.kind == LocatorKind::UdpV4 {
                                let _ = rt.user_unicast.send(t, &secured);
                            }
                        }
                    }
                }
            }
            ParsedSubmessage::Gap(g) => {
                if let Some(arc) = rt.reader_slot(g.reader_id) {
                    if let Ok(mut slot) = arc.lock() {
                        // Samples released by the gap are forwarded while the
                        // slot lock is held; the liveliness fields are untouched.
                        for sample in slot.reader.handle_gap(&g) {
                            if let Some(item) =
                                delivered_to_user_sample(&sample, &slot.writer_strengths)
                            {
                                let _ = slot.sample_tx.send(item);
                                wake_async_waker(&slot.async_waker);
                            }
                        }
                    }
                }
            }
            ParsedSubmessage::AckNack(ack) => {
                if let Some(arc) = rt.writer_slot(ack.writer_id) {
                    let mut sync_outbound: Vec<zerodds_rtps::message_builder::OutboundDatagram> =
                        Vec::new();
                    if let Ok(mut slot) = arc.lock() {
                        let base = ack.reader_sn_state.bitmap_base;
                        let requested: Vec<_> = ack.reader_sn_state.iter_set().collect();
                        // The remote reader GUID is the datagram's sender prefix
                        // plus the reader entity id from the submessage.
                        let src = Guid::new(parsed.header.guid_prefix, ack.reader_id);
                        slot.writer.handle_acknack(src, base, requested);
                        // Tick right away and send the result below.
                        if let Ok(dgs) = slot.writer.tick(now) {
                            sync_outbound = dgs;
                        }
                    }
                    // Called even when the slot lock was poisoned.
                    rt.notify_ack_event();
                    for dg in sync_outbound {
                        if let Some(secured) = secure_outbound_bytes(rt, &dg.bytes) {
                            for t in dg.targets.iter() {
                                if t.kind == LocatorKind::UdpV4 {
                                    let _ = rt.user_unicast.send(t, &secured);
                                }
                            }
                        }
                    }
                }
            }
            ParsedSubmessage::NackFrag(nf) => {
                if let Some(arc) = rt.writer_slot(nf.writer_id) {
                    if let Ok(mut slot) = arc.lock() {
                        let src = Guid::new(parsed.header.guid_prefix, nf.reader_id);
                        slot.writer.handle_nackfrag(src, &nf);
                    }
                }
            }
            // Every other submessage kind is ignored here.
            _ => {}
        }
    }
}
3852
/// Test-only entry point that forwards to the private `handle_spdp_datagram`.
#[cfg(test)]
pub(crate) fn handle_spdp_datagram_for_test(rt: &Arc<DcpsRuntime>, bytes: &[u8]) {
    handle_spdp_datagram(rt, bytes);
}
3860
3861fn handle_spdp_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8]) {
3862 let parsed = match rt.spdp_reader.parse_datagram(bytes) {
3863 Ok(p) => p,
3864 Err(_) => return, };
3866 if parsed.sender_prefix == rt.guid_prefix {
3868 return;
3869 }
3870 let is_new = {
3871 if let Ok(mut cache) = rt.discovered.lock() {
3872 cache.insert(parsed.clone())
3873 } else {
3874 false
3875 }
3876 };
3877 if is_new {
3880 if let Ok(mut sedp) = rt.sedp.lock() {
3881 sedp.on_participant_discovered(&parsed);
3882 }
3883 if let Some(sec) = rt.security_builtin_snapshot() {
3886 if let Ok(mut s) = sec.lock() {
3887 s.handle_remote_endpoints(&parsed);
3888 }
3889 }
3890 }
3891 if let Some(sinks) = rt.builtin_sinks_snapshot() {
3896 let dcps_sample =
3897 crate::builtin_topics::ParticipantBuiltinTopicData::from_wire(&parsed.data);
3898 if let Some(filter) = rt.ignore_filter_snapshot() {
3901 let h = crate::instance_handle::InstanceHandle::from_guid(dcps_sample.key);
3902 if filter.is_participant_ignored(h) {
3903 return;
3904 }
3905 }
3906 let _ = sinks.push_participant(&dcps_sample);
3907 }
3908}
3909
3910fn push_sedp_events_to_builtin_readers(
3923 rt: &Arc<DcpsRuntime>,
3924 events: &zerodds_discovery::sedp::SedpEvents,
3925) {
3926 let Some(sinks) = rt.builtin_sinks_snapshot() else {
3927 return;
3928 };
3929 let filter = rt.ignore_filter_snapshot();
3930 for w in &events.new_publications {
3931 let pub_sample = crate::builtin_topics::PublicationBuiltinTopicData::from_wire(w);
3932 let topic_sample = crate::builtin_topics::TopicBuiltinTopicData::from_publication(w);
3933 if let Some(f) = &filter {
3936 let part_h = crate::instance_handle::InstanceHandle::from_guid(w.participant_key);
3937 let pub_h = crate::instance_handle::InstanceHandle::from_guid(w.key);
3938 let topic_h = crate::instance_handle::InstanceHandle::from_guid(topic_sample.key);
3939 if f.is_participant_ignored(part_h) || f.is_publication_ignored(pub_h) {
3940 continue;
3941 }
3942 let _ = sinks.push_publication(&pub_sample);
3943 if !f.is_topic_ignored(topic_h) {
3944 let _ = sinks.push_topic(&topic_sample);
3945 }
3946 } else {
3947 let _ = sinks.push_publication(&pub_sample);
3948 let _ = sinks.push_topic(&topic_sample);
3949 }
3950 }
3951 for r in &events.new_subscriptions {
3952 let sub_sample = crate::builtin_topics::SubscriptionBuiltinTopicData::from_wire(r);
3953 let topic_sample = crate::builtin_topics::TopicBuiltinTopicData::from_subscription(r);
3954 if let Some(f) = &filter {
3955 let part_h = crate::instance_handle::InstanceHandle::from_guid(r.participant_key);
3956 let sub_h = crate::instance_handle::InstanceHandle::from_guid(r.key);
3957 let topic_h = crate::instance_handle::InstanceHandle::from_guid(topic_sample.key);
3958 if f.is_participant_ignored(part_h) || f.is_subscription_ignored(sub_h) {
3959 continue;
3960 }
3961 let _ = sinks.push_subscription(&sub_sample);
3962 if !f.is_topic_ignored(topic_h) {
3963 let _ = sinks.push_topic(&topic_sample);
3964 }
3965 } else {
3966 let _ = sinks.push_subscription(&sub_sample);
3967 let _ = sinks.push_topic(&topic_sample);
3968 }
3969 }
3970}
3971
3972fn dispatch_security_builtin_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8], now: Duration) {
3981 let Some(stack) = rt.security_builtin_snapshot() else {
3982 return;
3983 };
3984 let Ok(parsed) = decode_datagram(bytes) else {
3985 return;
3986 };
3987 let Ok(mut s) = stack.lock() else {
3988 return;
3989 };
3990 for sub in parsed.submessages {
3991 match sub {
3992 ParsedSubmessage::Data(d) => {
3993 if d.reader_id == EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER
3994 || d.writer_id == EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER
3995 {
3996 let _ = s.stateless_reader.handle_data(&d);
4000 } else if d.reader_id
4001 == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
4002 {
4003 let _ = s.volatile_reader.handle_data(&d);
4004 }
4005 }
4006 ParsedSubmessage::DataFrag(df) => {
4007 if df.reader_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER {
4008 let _ = s.volatile_reader.handle_data_frag(&df, now);
4009 }
4010 }
4011 ParsedSubmessage::Heartbeat(h) => {
4012 let to_volatile_reader = h.reader_id
4013 == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
4014 || (h.reader_id == EntityId::UNKNOWN
4015 && h.writer_id
4016 == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER);
4017 if to_volatile_reader {
4018 s.volatile_reader.handle_heartbeat(&h, now);
4019 }
4020 }
4021 ParsedSubmessage::Gap(g) => {
4022 if g.reader_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER {
4023 let _ = s.volatile_reader.handle_gap(&g);
4024 }
4025 }
4026 ParsedSubmessage::AckNack(ack) => {
4027 if ack.writer_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER {
4028 let base = ack.reader_sn_state.bitmap_base;
4029 let requested: Vec<_> = ack.reader_sn_state.iter_set().collect();
4030 let src = Guid::new(parsed.header.guid_prefix, ack.reader_id);
4031 s.volatile_writer.handle_acknack(src, base, requested);
4032 }
4033 }
4034 ParsedSubmessage::NackFrag(nf) => {
4035 if nf.writer_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER {
4036 let src = Guid::new(parsed.header.guid_prefix, nf.reader_id);
4037 s.volatile_writer.handle_nackfrag(src, &nf);
4038 }
4039 }
4040 _ => {}
4041 }
4042 }
4043}
4044
4045fn dispatch_type_lookup_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8], source: &Locator) -> bool {
4054 use zerodds_cdr::{BufferReader, Endianness};
4055 use zerodds_types::type_lookup::{GetTypeDependenciesRequest, GetTypesReply, GetTypesRequest};
4056
4057 let Ok(parsed) = decode_datagram(bytes) else {
4058 return false;
4059 };
4060
4061 let mut accepted = false;
4062
4063 for sub in &parsed.submessages {
4064 let ParsedSubmessage::Data(d) = sub else {
4065 continue;
4066 };
4067 let payload: &[u8] = &d.serialized_payload;
4068 if payload.is_empty() {
4069 continue;
4070 }
4071 let body: &[u8] = if payload.len() >= 4 && (payload[0] == 0x00 && payload[1] == 0x01) {
4073 &payload[4..]
4074 } else {
4075 payload
4076 };
4077
4078 if d.reader_id == EntityId::TL_SVC_REQ_READER {
4080 accepted = true;
4081 let mut r = BufferReader::new(body, Endianness::Little);
4084 if let Ok(req) = GetTypesRequest::decode_from(&mut r) {
4085 let reply = match rt.type_lookup_server.lock() {
4086 Ok(g) => g.handle_get_types(&req),
4087 Err(_) => continue,
4088 };
4089 let _ = send_type_lookup_reply(
4090 rt,
4091 source,
4092 TypeLookupReplyPayload::Types(reply),
4093 d.writer_sn,
4094 );
4095 continue;
4096 }
4097 let mut r = BufferReader::new(body, Endianness::Little);
4098 if let Ok(req) = GetTypeDependenciesRequest::decode_from(&mut r) {
4099 let reply = match rt.type_lookup_server.lock() {
4100 Ok(g) => g.handle_get_type_dependencies(&req),
4101 Err(_) => continue,
4102 };
4103 let _ = send_type_lookup_reply(
4104 rt,
4105 source,
4106 TypeLookupReplyPayload::Dependencies(reply),
4107 d.writer_sn,
4108 );
4109 continue;
4110 }
4111 }
4112
4113 if d.reader_id == EntityId::TL_SVC_REPLY_READER {
4115 accepted = true;
4116 let (sn_high, sn_low) = d.writer_sn.split();
4119 let sn_u64 = ((u64::from(sn_high as u32)) << 32) | u64::from(sn_low);
4120 let request_id = zerodds_discovery::type_lookup::RequestId::from_u64(sn_u64);
4121 let mut r = BufferReader::new(body, Endianness::Little);
4122 if let Ok(reply) = GetTypesReply::decode_from(&mut r) {
4123 if let Ok(mut client) = rt.type_lookup_client.lock() {
4124 client.handle_reply(request_id, TypeLookupReply::Types(reply));
4125 }
4126 continue;
4127 }
4128 }
4129 }
4130
4131 accepted
4132}
4133
/// Reply bodies that `send_type_lookup_reply` can encode and send.
enum TypeLookupReplyPayload {
    /// Reply to a get-types request.
    Types(zerodds_types::type_lookup::GetTypesReply),
    /// Reply to a get-type-dependencies request.
    Dependencies(zerodds_types::type_lookup::GetTypeDependenciesReply),
}
4139
4140fn send_type_lookup_reply(
4145 rt: &Arc<DcpsRuntime>,
4146 target: &Locator,
4147 reply: TypeLookupReplyPayload,
4148 request_sn: zerodds_rtps::wire_types::SequenceNumber,
4149) -> Result<()> {
4150 use alloc::sync::Arc as AllocArc;
4151 use zerodds_cdr::{BufferWriter, Endianness};
4152 use zerodds_rtps::datagram::encode_data_datagram;
4153 use zerodds_rtps::header::RtpsHeader;
4154 use zerodds_rtps::submessages::DataSubmessage;
4155 use zerodds_rtps::wire_types::{ProtocolVersion, VendorId};
4156
4157 let mut w = BufferWriter::new(Endianness::Little);
4159 match reply {
4160 TypeLookupReplyPayload::Types(r) => {
4161 r.encode_into(&mut w)
4162 .map_err(|_| DdsError::PreconditionNotMet {
4163 reason: "type_lookup reply encode failed",
4164 })?;
4165 }
4166 TypeLookupReplyPayload::Dependencies(r) => {
4167 r.encode_into(&mut w)
4168 .map_err(|_| DdsError::PreconditionNotMet {
4169 reason: "type_lookup deps reply encode failed",
4170 })?;
4171 }
4172 }
4173 let body = w.into_bytes();
4174 let mut payload: alloc::vec::Vec<u8> = alloc::vec::Vec::with_capacity(4 + body.len());
4175 payload.extend_from_slice(&[0x00, 0x01, 0x00, 0x00]);
4176 payload.extend_from_slice(&body);
4177
4178 let header = RtpsHeader {
4179 protocol_version: ProtocolVersion::CURRENT,
4180 vendor_id: VendorId::ZERODDS,
4181 guid_prefix: rt.guid_prefix,
4182 };
4183 let data = DataSubmessage {
4184 extra_flags: 0,
4185 reader_id: EntityId::TL_SVC_REPLY_READER,
4186 writer_id: EntityId::TL_SVC_REPLY_WRITER,
4187 writer_sn: request_sn,
4188 inline_qos: None,
4189 key_flag: false,
4190 non_standard_flag: false,
4191 serialized_payload: AllocArc::from(payload.into_boxed_slice()),
4192 };
4193 let datagram =
4194 encode_data_datagram(header, &[data]).map_err(|_| DdsError::PreconditionNotMet {
4195 reason: "type_lookup reply datagram encode failed",
4196 })?;
4197
4198 if target.kind == LocatorKind::UdpV4 {
4199 let _ = rt.user_unicast.send(target, &datagram);
4200 }
4201 Ok(())
4202}
4203
4204fn send_discovery_datagram(rt: &Arc<DcpsRuntime>, targets: &[Locator], bytes: &[u8]) {
4208 let Some(secured) = secure_outbound_bytes(rt, bytes) else {
4209 return;
4210 };
4211 for t in targets {
4212 if t.kind != LocatorKind::UdpV4 {
4213 continue;
4214 }
4215 let _ = rt.spdp_mc_tx.send(t, &secured);
4216 }
4217}
4218
/// Returns the user-traffic multicast endpoint for `domain_id`.
///
/// The address is `239.255.0.1` and the port is `7400 + 250 * domain_id + 1`,
/// saturating at `u16::MAX`.
///
/// A `domain_id` that does not fit in a `u16` (negative or above 65535) also
/// saturates to the maximum port. It is not wrapped, so an out-of-range id can
/// never land on the port of a valid low-numbered domain.
#[must_use]
pub fn user_multicast_endpoint(domain_id: i32) -> SocketAddr {
    const PORT_BASE: u16 = 7400;
    const DOMAIN_GAIN: u16 = 250;
    const USER_MULTICAST_OFFSET: u16 = 1;

    // `as u16` would wrap (65536 -> 0); convert checked and saturate instead.
    let domain = u16::try_from(domain_id).unwrap_or(u16::MAX);
    let port = PORT_BASE.saturating_add(
        DOMAIN_GAIN
            .saturating_mul(domain)
            .saturating_add(USER_MULTICAST_OFFSET),
    );
    SocketAddr::from((Ipv4Addr::from([239, 255, 0, 1]), port))
}
4228
4229#[cfg(test)]
4230#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
4231mod tests {
4232 use super::*;
4233
4234 #[test]
4235 fn strip_user_encap_xcdr2_le() {
4236 let payload = [0x00, 0x07, 0x00, 0x00, 1, 2, 3];
4237 assert_eq!(strip_user_encap(&payload), Some(alloc::vec![1, 2, 3]));
4238 }
4239
4240 #[test]
4241 fn strip_user_encap_xcdr1_le() {
4242 let payload = [0x00, 0x01, 0x00, 0x00, 0xAA];
4244 assert_eq!(strip_user_encap(&payload), Some(alloc::vec![0xAA]));
4245 }
4246
4247 #[test]
4248 fn strip_user_encap_rejects_unknown_scheme() {
4249 let payload = [0xFF, 0xFF, 0x00, 0x00, 1];
4250 assert_eq!(strip_user_encap(&payload), None);
4251 }
4252
4253 #[test]
4254 fn strip_user_encap_rejects_short() {
4255 assert_eq!(strip_user_encap(&[0x00, 0x07]), None);
4256 }
4257
4258 #[test]
4259 fn user_payload_encap_is_xcdr2_le() {
4260 assert_eq!(USER_PAYLOAD_ENCAP, [0x00, 0x07, 0x00, 0x00]);
4261 }
4262
4263 #[test]
4264 fn observability_sink_records_writer_and_reader_creation() {
4265 use std::sync::Arc as StdArc;
4268 use zerodds_foundation::observability::{Component, Level, VecSink};
4269
4270 let sink = StdArc::new(VecSink::new());
4271 let cfg = RuntimeConfig {
4272 observability: sink.clone(),
4273 ..RuntimeConfig::default()
4274 };
4275 let rt =
4276 DcpsRuntime::start(7, GuidPrefix::from_bytes([0xAA; 12]), cfg).expect("start runtime");
4277 let _ = rt.register_user_writer(UserWriterConfig {
4278 topic_name: "ObsTopic".into(),
4279 type_name: "ObsType".into(),
4280 reliable: true,
4281 durability: zerodds_qos::DurabilityKind::Volatile,
4282 deadline: zerodds_qos::DeadlineQosPolicy::default(),
4283 lifespan: zerodds_qos::LifespanQosPolicy::default(),
4284 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4285 ownership: zerodds_qos::OwnershipKind::Shared,
4286 ownership_strength: 0,
4287 partition: alloc::vec![],
4288 user_data: alloc::vec![],
4289 topic_data: alloc::vec![],
4290 group_data: alloc::vec![],
4291 type_identifier: zerodds_types::TypeIdentifier::None,
4292 data_representation_offer: None,
4293 });
4294 let _ = rt.register_user_reader(UserReaderConfig {
4295 topic_name: "ObsTopic".into(),
4296 type_name: "ObsType".into(),
4297 reliable: true,
4298 durability: zerodds_qos::DurabilityKind::Volatile,
4299 deadline: zerodds_qos::DeadlineQosPolicy::default(),
4300 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4301 ownership: zerodds_qos::OwnershipKind::Shared,
4302 partition: alloc::vec![],
4303 user_data: alloc::vec![],
4304 topic_data: alloc::vec![],
4305 group_data: alloc::vec![],
4306 type_identifier: zerodds_types::TypeIdentifier::None,
4307 type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
4308 data_representation_offer: None,
4309 });
4310 rt.shutdown();
4311
4312 let events = sink.snapshot();
4313 assert!(
4314 events.iter().any(|e| e.name == "user_writer.created"
4315 && e.component == Component::Dcps
4316 && e.level == Level::Info),
4317 "writer-event missing: got {:?}",
4318 events.iter().map(|e| e.name).collect::<Vec<_>>()
4319 );
4320 assert!(
4321 events
4322 .iter()
4323 .any(|e| e.name == "user_reader.created" && e.component == Component::Dcps),
4324 "reader-event missing"
4325 );
4326 let writer_event = events
4328 .iter()
4329 .find(|e| e.name == "user_writer.created")
4330 .expect("writer event");
4331 assert!(
4332 writer_event
4333 .attrs
4334 .iter()
4335 .any(|a| a.key == "topic" && a.value == "ObsTopic"),
4336 "topic attr missing"
4337 );
4338 }
4339
4340 #[test]
4341 fn runtime_starts_and_shuts_down_cleanly() {
4342 let rt = DcpsRuntime::start(
4343 42,
4344 GuidPrefix::from_bytes([7; 12]),
4345 RuntimeConfig::default(),
4346 )
4347 .expect("start runtime");
4348 assert_eq!(rt.domain_id, 42);
4349 rt.shutdown();
4351 rt.shutdown();
4352 }
4353
4354 #[test]
4355 fn spdp_announces_standard_bits_by_default() {
4356 let rt = DcpsRuntime::start(
4365 5,
4366 GuidPrefix::from_bytes([0xC; 12]),
4367 RuntimeConfig::default(),
4368 )
4369 .expect("start");
4370 let mask = rt.announced_builtin_endpoint_set();
4371 assert_ne!(mask & endpoint_flag::PARTICIPANT_ANNOUNCER, 0);
4373 assert_ne!(mask & endpoint_flag::PARTICIPANT_DETECTOR, 0);
4374 assert_ne!(mask & endpoint_flag::PUBLICATIONS_ANNOUNCER, 0);
4375 assert_ne!(mask & endpoint_flag::SUBSCRIPTIONS_DETECTOR, 0);
4376 assert_ne!(mask & endpoint_flag::PARTICIPANT_MESSAGE_DATA_WRITER, 0);
4377 assert_ne!(mask & endpoint_flag::PARTICIPANT_MESSAGE_DATA_READER, 0);
4378 assert_ne!(mask & endpoint_flag::TYPE_LOOKUP_REQUEST, 0);
4379 assert_ne!(mask & endpoint_flag::TYPE_LOOKUP_REPLY, 0);
4380 assert_eq!(mask & endpoint_flag::TOPICS_ANNOUNCER, 0);
4382 assert_eq!(mask & endpoint_flag::TOPICS_DETECTOR, 0);
4383 assert_eq!(mask & endpoint_flag::ALL_SECURE, 0);
4385 }
4386
4387 #[test]
4388 fn spdp_announces_secure_bits_when_configured() {
4389 let config = RuntimeConfig {
4392 announce_secure_endpoints: true,
4393 ..Default::default()
4394 };
4395 let rt = DcpsRuntime::start(6, GuidPrefix::from_bytes([0xD; 12]), config).expect("start");
4396 let mask = rt.announced_builtin_endpoint_set();
4397 for bit in 16u32..=27 {
4398 assert!(
4399 mask & (1u32 << bit) != 0,
4400 "Secure-Bit {bit} fehlt im SPDP-Announce"
4401 );
4402 }
4403 assert_eq!(
4405 mask & endpoint_flag::ALL_STANDARD,
4406 endpoint_flag::ALL_STANDARD
4407 );
4408 }
4409
4410 #[test]
4411 fn spdp_lease_duration_is_configurable() {
4412 let config = RuntimeConfig {
4414 participant_lease_duration: Duration::from_secs(17),
4415 ..Default::default()
4416 };
4417 let rt = DcpsRuntime::start(7, GuidPrefix::from_bytes([0xE; 12]), config).expect("start");
4418 let secs = rt
4419 .spdp_beacon
4420 .lock()
4421 .map(|b| b.data.lease_duration.seconds)
4422 .unwrap_or(0);
4423 assert_eq!(secs, 17);
4424 }
4425
4426 #[test]
4427 fn user_locator_is_udp_v4_127_0_0_x() {
4428 let rt = DcpsRuntime::start(
4429 0,
4430 GuidPrefix::from_bytes([0xA; 12]),
4431 RuntimeConfig::default(),
4432 )
4433 .expect("start");
4434 let loc = rt.user_locator();
4435 assert_eq!(loc.kind, zerodds_rtps::wire_types::LocatorKind::UdpV4);
4436 assert!(loc.port > 0);
4438 }
4439
4440 #[test]
4441 fn two_runtimes_on_same_domain_can_coexist() {
4442 let a = DcpsRuntime::start(
4444 3,
4445 GuidPrefix::from_bytes([0xA; 12]),
4446 RuntimeConfig::default(),
4447 )
4448 .expect("a");
4449 let b = DcpsRuntime::start(
4450 3,
4451 GuidPrefix::from_bytes([0xB; 12]),
4452 RuntimeConfig::default(),
4453 )
4454 .expect("b");
4455 assert_eq!(a.domain_id, b.domain_id);
4456 }
4457
4458 #[test]
4459 fn peer_capabilities_unknown_peer_returns_none() {
4460 let rt = DcpsRuntime::start(
4461 10,
4462 GuidPrefix::from_bytes([0x60; 12]),
4463 RuntimeConfig::default(),
4464 )
4465 .expect("start");
4466 let caps = rt.peer_capabilities(&GuidPrefix::from_bytes([0xEE; 12]));
4468 assert!(caps.is_none());
4469 }
4470
4471 #[test]
4472 fn assert_liveliness_enqueues_wlp_pulse_without_panic() {
4473 let rt = DcpsRuntime::start(
4476 8,
4477 GuidPrefix::from_bytes([0xF; 12]),
4478 RuntimeConfig::default(),
4479 )
4480 .expect("start");
4481 rt.assert_liveliness();
4482 rt.assert_writer_liveliness(alloc::vec![0xDE, 0xAD]);
4483 let count = rt.wlp.lock().map(|w| w.peer_count()).unwrap_or(usize::MAX);
4485 assert_eq!(count, 0, "kein Peer hat sich gemeldet → 0");
4486 }
4487
4488 #[test]
4489 fn wlp_period_default_is_lease_over_three() {
4490 let rt = DcpsRuntime::start(
4492 9,
4493 GuidPrefix::from_bytes([0x10; 12]),
4494 RuntimeConfig::default(),
4495 )
4496 .expect("start");
4497 let mut wlp = rt.wlp.lock().unwrap();
4502 wlp.assert_participant();
4503 let now0 = Duration::from_secs(0);
4504 let dg = wlp.tick(now0).unwrap();
4505 assert!(dg.is_some(), "Pulse wird sofort emittiert");
4506 }
4507
4508 #[cfg(target_os = "linux")]
4513 #[test]
4514 fn two_runtimes_exchange_wlp_heartbeat_via_multicast() {
4515 let cfg = RuntimeConfig {
4519 tick_period: Duration::from_millis(20),
4520 spdp_period: Duration::from_millis(100),
4521 wlp_period: Duration::from_millis(80),
4523 participant_lease_duration: Duration::from_millis(240),
4524 ..RuntimeConfig::default()
4525 };
4526 let _a = DcpsRuntime::start(2, GuidPrefix::from_bytes([0x40; 12]), cfg.clone()).expect("a");
4527 let _b = DcpsRuntime::start(2, GuidPrefix::from_bytes([0x41; 12]), cfg).expect("b");
4528
4529 let a_prefix = GuidPrefix::from_bytes([0x40; 12]);
4530 for _ in 0..60 {
4531 thread::sleep(Duration::from_millis(50));
4532 if _b.peer_liveliness_last_seen(&a_prefix).is_some() {
4533 return;
4534 }
4535 }
4536 panic!("B did not see A's WLP heartbeat within 3 s");
4537 }
4538
4539 #[cfg(target_os = "linux")]
4540 #[test]
4541 fn two_runtimes_assert_liveliness_reaches_peer() {
4542 let cfg = RuntimeConfig {
4547 tick_period: Duration::from_millis(20),
4548 spdp_period: Duration::from_millis(100),
4549 wlp_period: Duration::from_secs(3600),
4553 ..RuntimeConfig::default()
4554 };
4555 let a = DcpsRuntime::start(4, GuidPrefix::from_bytes([0x50; 12]), cfg.clone()).expect("a");
4556 let b = DcpsRuntime::start(4, GuidPrefix::from_bytes([0x51; 12]), cfg).expect("b");
4557
4558 a.assert_liveliness();
4559 let a_prefix = GuidPrefix::from_bytes([0x50; 12]);
4560 for _ in 0..60 {
4561 thread::sleep(Duration::from_millis(50));
4562 if b.peer_liveliness_last_seen(&a_prefix).is_some() {
4563 return;
4564 }
4565 }
4566 panic!("B did not see A's manual liveliness assert within 3 s");
4569 }
4570
4571 #[cfg(target_os = "linux")]
4572 #[test]
4573 fn two_runtimes_exchange_sedp_publication_announce() {
4574 use zerodds_qos::{DurabilityKind, ReliabilityKind};
4578 use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
4579
4580 let cfg = RuntimeConfig {
4581 tick_period: Duration::from_millis(20),
4582 spdp_period: Duration::from_millis(100),
4583 ..RuntimeConfig::default()
4584 };
4585 let a = DcpsRuntime::start(1, GuidPrefix::from_bytes([0xCC; 12]), cfg.clone()).expect("a");
4588 let b = DcpsRuntime::start(1, GuidPrefix::from_bytes([0xDD; 12]), cfg).expect("b");
4589
4590 for _ in 0..40 {
4592 thread::sleep(Duration::from_millis(50));
4593 if !a.discovered_participants().is_empty() && !b.discovered_participants().is_empty() {
4594 break;
4595 }
4596 }
4597 assert!(
4598 !a.discovered_participants().is_empty(),
4599 "no SPDP discovery a"
4600 );
4601
4602 let pub_data = PublicationBuiltinTopicData {
4604 key: Guid::new(
4605 a.guid_prefix,
4606 EntityId::user_writer_with_key([0x01, 0x02, 0x03]),
4607 ),
4608 participant_key: Guid::new(a.guid_prefix, EntityId::PARTICIPANT),
4609 topic_name: "Chatter".into(),
4610 type_name: "zerodds::RawBytes".into(),
4611 durability: DurabilityKind::Volatile,
4612 reliability: zerodds_qos::ReliabilityQosPolicy {
4613 kind: ReliabilityKind::Reliable,
4614 max_blocking_time: QosDuration::from_millis(100_i32),
4615 },
4616 ownership: zerodds_qos::OwnershipKind::Shared,
4617 ownership_strength: 0,
4618 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4619 deadline: zerodds_qos::DeadlineQosPolicy::default(),
4620 lifespan: zerodds_qos::LifespanQosPolicy::default(),
4621 partition: Vec::new(),
4622 user_data: Vec::new(),
4623 topic_data: Vec::new(),
4624 group_data: Vec::new(),
4625 type_information: None,
4626 data_representation: Vec::new(),
4627 security_info: None,
4628 service_instance_name: None,
4629 related_entity_guid: None,
4630 topic_aliases: None,
4631 type_identifier: zerodds_types::TypeIdentifier::None,
4632 };
4633 a.announce_publication(&pub_data).expect("announce");
4634
4635 for _ in 0..60 {
4638 thread::sleep(Duration::from_millis(50));
4639 if b.discovered_publications_count() > 0 {
4640 return;
4641 }
4642 }
4643 panic!(
4644 "B did not receive SEDP publication within 3 s (pub_count={})",
4645 b.discovered_publications_count()
4646 );
4647 }
4648
4649 #[cfg(target_os = "linux")]
4650 #[test]
4651 fn two_runtimes_e2e_user_data_match_and_transfer() {
4652 let cfg = RuntimeConfig {
4660 tick_period: Duration::from_millis(20),
4661 spdp_period: Duration::from_millis(100),
4662 ..RuntimeConfig::default()
4663 };
4664 let a = DcpsRuntime::start(2, GuidPrefix::from_bytes([0xEE; 12]), cfg.clone()).expect("a");
4665 let b = DcpsRuntime::start(2, GuidPrefix::from_bytes([0xFF; 12]), cfg).expect("b");
4666
4667 let mut spdp_ok = false;
4669 for _ in 0..60 {
4670 thread::sleep(Duration::from_millis(50));
4671 if !a.discovered_participants().is_empty() && !b.discovered_participants().is_empty() {
4672 spdp_ok = true;
4673 break;
4674 }
4675 }
4676 assert!(spdp_ok, "SPDP mutual discovery did not complete in 3 s");
4677
4678 let wid = a
4680 .register_user_writer(UserWriterConfig {
4681 topic_name: "Chatter".into(),
4682 type_name: "zerodds::RawBytes".into(),
4683 reliable: true,
4684 durability: zerodds_qos::DurabilityKind::Volatile,
4685 deadline: zerodds_qos::DeadlineQosPolicy::default(),
4686 lifespan: zerodds_qos::LifespanQosPolicy::default(),
4687 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4688 ownership: zerodds_qos::OwnershipKind::Shared,
4689 ownership_strength: 0,
4690 partition: Vec::new(),
4691 user_data: Vec::new(),
4692 topic_data: Vec::new(),
4693 group_data: Vec::new(),
4694 type_identifier: zerodds_types::TypeIdentifier::None,
4695 data_representation_offer: None,
4696 })
4697 .expect("wid");
4698 let (_rid, rx) = b
4699 .register_user_reader(UserReaderConfig {
4700 topic_name: "Chatter".into(),
4701 type_name: "zerodds::RawBytes".into(),
4702 reliable: true,
4703 durability: zerodds_qos::DurabilityKind::Volatile,
4704 deadline: zerodds_qos::DeadlineQosPolicy::default(),
4705 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4706 ownership: zerodds_qos::OwnershipKind::Shared,
4707 partition: Vec::new(),
4708 user_data: Vec::new(),
4709 topic_data: Vec::new(),
4710 group_data: Vec::new(),
4711 type_identifier: zerodds_types::TypeIdentifier::None,
4712 type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
4713 data_representation_offer: None,
4714 })
4715 .expect("rid");
4716
4717 let mut attempts = 0;
4722 loop {
4723 thread::sleep(Duration::from_millis(50));
4724 let _ = a.write_user_sample(wid, alloc::vec![0xAA, 0xBB, 0xCC]);
4725 if let Ok(sample) = rx.recv_timeout(Duration::from_millis(50)) {
4726 match sample {
4727 UserSample::Alive { payload, .. } => {
4728 assert_eq!(payload, alloc::vec![0xAA, 0xBB, 0xCC]);
4729 return;
4730 }
4731 other => panic!("expected Alive sample, got {other:?}"),
4732 }
4733 }
4734 attempts += 1;
4735 if attempts > 80 {
4736 panic!("no sample delivered within 4 s");
4737 }
4738 }
4739 }
4740
4741 #[cfg(target_os = "linux")]
4742 #[test]
4743 fn two_runtimes_discover_each_other_via_spdp() {
4744 let cfg = RuntimeConfig {
4746 tick_period: Duration::from_millis(20),
4747 spdp_period: Duration::from_millis(100),
4748 ..RuntimeConfig::default()
4749 };
4750 let a = DcpsRuntime::start(3, GuidPrefix::from_bytes([0xAA; 12]), cfg.clone()).expect("a");
4752 let b = DcpsRuntime::start(3, GuidPrefix::from_bytes([0xBB; 12]), cfg).expect("b");
4753
4754 for _ in 0..60 {
4759 thread::sleep(Duration::from_millis(50));
4760 let a_sees_b = a
4761 .discovered_participants()
4762 .iter()
4763 .any(|p| p.sender_prefix == GuidPrefix::from_bytes([0xBB; 12]));
4764 let b_sees_a = b
4765 .discovered_participants()
4766 .iter()
4767 .any(|p| p.sender_prefix == GuidPrefix::from_bytes([0xAA; 12]));
4768 if a_sees_b && b_sees_a {
4769 return;
4770 }
4771 }
4772 panic!(
4773 "mutual SPDP discovery failed within 3 s (a={} b={})",
4774 a.discovered_participants().len(),
4775 b.discovered_participants().len()
4776 );
4777 }
4778
4779 #[cfg(feature = "security")]
4784 #[test]
4785 fn per_target_serializer_produces_different_wire_per_reader() {
4786 use zerodds_security_crypto::AesGcmCryptoPlugin;
4787 use zerodds_security_permissions::parse_governance_xml;
4788 use zerodds_security_runtime::{
4789 PeerCapabilities, ProtectionLevel as SecProtectionLevel, SharedSecurityGate,
4790 };
4791
4792 const GOV: &str = r#"
4796<domain_access_rules>
4797 <domain_rule>
4798 <domains><id>0</id></domains>
4799 <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
4800 <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
4801 </domain_rule>
4802</domain_access_rules>
4803"#;
4804 let gate = SharedSecurityGate::new(
4805 0,
4806 parse_governance_xml(GOV).unwrap(),
4807 Box::new(AesGcmCryptoPlugin::new()),
4808 );
4809
4810 let cfg = RuntimeConfig {
4811 security: Some(std::sync::Arc::new(gate)),
4812 ..RuntimeConfig::default()
4813 };
4814 let rt =
4815 DcpsRuntime::start(0, GuidPrefix::from_bytes([0xE4; 12]), cfg).expect("start runtime");
4816
4817 let wid = rt
4818 .register_user_writer(UserWriterConfig {
4819 topic_name: "HeteroTopic".into(),
4820 type_name: "zerodds::RawBytes".into(),
4821 reliable: true,
4822 durability: zerodds_qos::DurabilityKind::Volatile,
4823 deadline: zerodds_qos::DeadlineQosPolicy::default(),
4824 lifespan: zerodds_qos::LifespanQosPolicy::default(),
4825 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
4826 ownership: zerodds_qos::OwnershipKind::Shared,
4827 ownership_strength: 0,
4828 partition: Vec::new(),
4829 user_data: Vec::new(),
4830 topic_data: Vec::new(),
4831 group_data: Vec::new(),
4832 type_identifier: zerodds_types::TypeIdentifier::None,
4833 data_representation_offer: None,
4834 })
4835 .expect("register writer");
4836
4837 let legacy_loc = Locator::udp_v4([127, 0, 0, 11], 40001);
4839 let fast_loc = Locator::udp_v4([127, 0, 0, 12], 40002);
4840 let secure_loc = Locator::udp_v4([127, 0, 0, 13], 40003);
4841 let legacy_peer: [u8; 12] = [0x11; 12];
4842 let fast_peer: [u8; 12] = [0x22; 12];
4843 let secure_peer: [u8; 12] = [0x33; 12];
4844
4845 {
4847 let arc = rt.writer_slot(wid).unwrap();
4848 let mut slot = arc.lock().unwrap();
4849 slot.reader_protection
4850 .insert(legacy_peer, SecProtectionLevel::None);
4851 slot.reader_protection
4852 .insert(fast_peer, SecProtectionLevel::Sign);
4853 slot.reader_protection
4854 .insert(secure_peer, SecProtectionLevel::Encrypt);
4855 slot.locator_to_peer.insert(legacy_loc, legacy_peer);
4856 slot.locator_to_peer.insert(fast_loc, fast_peer);
4857 slot.locator_to_peer.insert(secure_loc, secure_peer);
4858 }
4859
4860 let mut msg = Vec::new();
4862 msg.extend_from_slice(b"RTPS\x02\x05\x01\x02");
4863 msg.extend_from_slice(&[0xE4; 12]); msg.extend_from_slice(b"HELLO-HETERO");
4865
4866 let wire_legacy =
4867 secure_outbound_for_target(&rt, wid, &msg, &legacy_loc).expect("legacy path");
4868 let wire_fast = secure_outbound_for_target(&rt, wid, &msg, &fast_loc).expect("fast path");
4869 let wire_secure =
4870 secure_outbound_for_target(&rt, wid, &msg, &secure_loc).expect("secure path");
4871
4872 assert_eq!(
4874 wire_legacy, msg,
4875 "Legacy muss byte-identisch zu plaintext sein"
4876 );
4877
4878 assert_ne!(wire_fast, msg, "Fast-Reader muss geschuetzt sein");
4880 assert_ne!(wire_secure, msg, "Secure-Reader muss geschuetzt sein");
4881
4882 assert_ne!(wire_legacy, wire_fast);
4886 assert_ne!(wire_legacy, wire_secure);
4887 assert_ne!(wire_fast, wire_secure);
4888
4889 let unknown_loc = Locator::udp_v4([127, 0, 0, 99], 40099);
4892 let wire_unknown =
4893 secure_outbound_for_target(&rt, wid, &msg, &unknown_loc).expect("fallback path");
4894 assert_ne!(
4895 wire_unknown, msg,
4896 "unbekannter Target soll ueber Domain-Rule geschuetzt werden"
4897 );
4898
4899 let _unused: PeerCapabilities = PeerCapabilities::default();
4903
4904 rt.shutdown();
4905 }
4906
4907 #[cfg(feature = "security")]
4912 #[derive(Default, Clone)]
4913 struct CapturingLogger {
4914 inner: std::sync::Arc<
4915 std::sync::Mutex<Vec<(zerodds_security_runtime::LogLevel, String, String)>>,
4916 >,
4917 }
4918
4919 #[cfg(feature = "security")]
4920 impl CapturingLogger {
4921 fn events(&self) -> Vec<(zerodds_security_runtime::LogLevel, String, String)> {
4922 self.inner.lock().map(|g| g.clone()).unwrap_or_default()
4923 }
4924 }
4925
4926 #[cfg(feature = "security")]
4927 impl zerodds_security_runtime::LoggingPlugin for CapturingLogger {
4928 fn log(
4929 &self,
4930 level: zerodds_security_runtime::LogLevel,
4931 _participant: [u8; 16],
4932 category: &str,
4933 message: &str,
4934 ) {
4935 if let Ok(mut g) = self.inner.lock() {
4936 g.push((level, category.to_string(), message.to_string()));
4937 }
4938 }
4939 fn plugin_class_id(&self) -> &str {
4940 "zerodds.test.capturing_logger"
4941 }
4942 }
4943
4944 #[cfg(feature = "security")]
4945 fn build_runtime_with(
4946 gov_xml: &str,
4947 logger: std::sync::Arc<CapturingLogger>,
4948 ) -> std::sync::Arc<DcpsRuntime> {
4949 use zerodds_security_crypto::AesGcmCryptoPlugin;
4950 use zerodds_security_permissions::parse_governance_xml;
4951 use zerodds_security_runtime::{LoggingPlugin, SharedSecurityGate};
4952 let gate = SharedSecurityGate::new(
4953 0,
4954 parse_governance_xml(gov_xml).unwrap(),
4955 Box::new(AesGcmCryptoPlugin::new()),
4956 );
4957 let logger_dyn: std::sync::Arc<dyn LoggingPlugin> = logger;
4958 let cfg = RuntimeConfig {
4959 security: Some(std::sync::Arc::new(gate)),
4960 security_logger: Some(logger_dyn),
4961 ..RuntimeConfig::default()
4962 };
4963 DcpsRuntime::start(0, GuidPrefix::from_bytes([0xE7; 12]), cfg).expect("start rt")
4964 }
4965
4966 #[cfg(feature = "security")]
4967 #[test]
4968 fn inbound_plain_on_encrypt_domain_drops_with_error_event() {
4969 const GOV_ENCRYPT: &str = r#"
4974<domain_access_rules>
4975 <domain_rule>
4976 <domains><id>0</id></domains>
4977 <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
4978 <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
4979 </domain_rule>
4980</domain_access_rules>
4981"#;
4982 let logger = std::sync::Arc::new(CapturingLogger::default());
4983 let rt = build_runtime_with(GOV_ENCRYPT, std::sync::Arc::clone(&logger));
4984
4985 let mut plain = Vec::new();
4987 plain.extend_from_slice(b"RTPS\x02\x05\x01\x02");
4988 plain.extend_from_slice(&[0x77; 12]); plain.extend_from_slice(b"plaintext-on-encrypted-domain");
4990
4991 let out = secure_inbound_bytes(&rt, &plain, &NetInterface::Wan);
4992 assert!(out.is_none(), "tampering-Paket muss gedroppt werden");
4993
4994 let events = logger.events();
4995 assert_eq!(events.len(), 1, "genau ein Log-Event erwartet");
4996 let (level, category, _msg) = &events[0];
4997 assert_eq!(
4998 *level,
4999 zerodds_security_runtime::LogLevel::Error,
5000 "plain-on-protected-domain ohne allow_unauth = Error (LegacyBlocked)"
5001 );
5002 assert_eq!(category, "inbound.legacy_blocked");
5003 rt.shutdown();
5004 }
5005
5006 #[cfg(feature = "security")]
5007 #[test]
5008 fn inbound_legacy_peer_accepted_when_governance_allows_unauth() {
5009 const GOV: &str = r#"
5012<domain_access_rules>
5013 <domain_rule>
5014 <domains><id>0</id></domains>
5015 <allow_unauthenticated_participants>TRUE</allow_unauthenticated_participants>
5016 <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
5017 <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
5018 </domain_rule>
5019</domain_access_rules>
5020"#;
5021 let logger = std::sync::Arc::new(CapturingLogger::default());
5022 let rt = build_runtime_with(GOV, std::sync::Arc::clone(&logger));
5023
5024 let mut plain = Vec::new();
5025 plain.extend_from_slice(b"RTPS\x02\x05\x01\x02");
5026 plain.extend_from_slice(&[0x88; 12]);
5027 plain.extend_from_slice(b"legacy-but-allowed");
5028
5029 let out = secure_inbound_bytes(&rt, &plain, &NetInterface::Wan)
5030 .expect("legacy-peer muss akzeptiert werden");
5031 assert_eq!(out, plain, "Output ist byte-identisch (kein crypto-unwrap)");
5032 assert!(logger.events().is_empty(), "kein Log-Event bei Accept-Pfad");
5033 rt.shutdown();
5034 }
5035
5036 #[cfg(feature = "security")]
5037 #[test]
5038 fn inbound_malformed_drops_and_logs_error() {
5039 const GOV: &str = r#"
5040<domain_access_rules>
5041 <domain_rule>
5042 <domains><id>0</id></domains>
5043 <rtps_protection_kind>NONE</rtps_protection_kind>
5044 <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
5045 </domain_rule>
5046</domain_access_rules>
5047"#;
5048 let logger = std::sync::Arc::new(CapturingLogger::default());
5049 let rt = build_runtime_with(GOV, std::sync::Arc::clone(&logger));
5050
5051 let out = secure_inbound_bytes(&rt, &[1, 2, 3, 4], &NetInterface::Wan);
5052 assert!(out.is_none());
5053 let events = logger.events();
5054 assert_eq!(events.len(), 1);
5055 assert_eq!(events[0].0, zerodds_security_runtime::LogLevel::Error);
5056 assert_eq!(events[0].1, "inbound.malformed");
5057 rt.shutdown();
5058 }
5059
5060 #[cfg(feature = "security")]
5061 #[test]
5062 fn inbound_without_security_gate_bypasses_classify_and_logger() {
5063 let logger = std::sync::Arc::new(CapturingLogger::default());
5065 let logger_dyn: std::sync::Arc<dyn zerodds_security_runtime::LoggingPlugin> =
5066 std::sync::Arc::clone(&logger) as _;
5067 let cfg = RuntimeConfig {
5068 security_logger: Some(logger_dyn),
5069 ..RuntimeConfig::default()
5070 };
5071 let rt = DcpsRuntime::start(0, GuidPrefix::from_bytes([0xE8; 12]), cfg).unwrap();
5072 let msg = vec![0xAAu8; 40];
5073 let out = secure_inbound_bytes(&rt, &msg, &NetInterface::Wan).unwrap();
5074 assert_eq!(out, msg);
5075 assert!(
5076 logger.events().is_empty(),
5077 "Logger darf ohne Gate NICHT aufgerufen werden"
5078 );
5079 rt.shutdown();
5080 }
5081
5082 #[cfg(feature = "security")]
5087 fn lo_range(third: u8) -> zerodds_security_runtime::IpRange {
5088 zerodds_security_runtime::IpRange {
5089 base: core::net::IpAddr::V4(core::net::Ipv4Addr::new(127, 0, 0, third)),
5090 prefix_len: 32,
5091 }
5092 }
5093
5094 #[cfg(feature = "security")]
5095 #[test]
5096 fn outbound_pool_routes_target_to_matching_binding() {
5097 let specs = vec![
5098 InterfaceBindingSpec {
5099 name: "lo-a".into(),
5100 bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5101 bind_port: 0,
5102 kind: zerodds_security_runtime::NetInterface::Loopback,
5103 subnet: lo_range(11),
5104 default: false,
5105 },
5106 InterfaceBindingSpec {
5107 name: "lo-b".into(),
5108 bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5109 bind_port: 0,
5110 kind: zerodds_security_runtime::NetInterface::Wan,
5111 subnet: lo_range(22),
5112 default: true,
5113 },
5114 ];
5115 let pool = OutboundSocketPool::bind_all(&specs).expect("pool");
5116
5117 let t1 = Locator::udp_v4([127, 0, 0, 11], 40000);
5119 let (sock1, iface1) = pool.route(&t1).expect("route 1");
5120 assert_eq!(iface1, zerodds_security_runtime::NetInterface::Loopback);
5121
5122 let t2 = Locator::udp_v4([127, 0, 0, 22], 40000);
5124 let (sock2, iface2) = pool.route(&t2).expect("route 2");
5125 assert_eq!(iface2, zerodds_security_runtime::NetInterface::Wan);
5126
5127 let p1 = sock1.local_locator().port;
5129 let p2 = sock2.local_locator().port;
5130 assert_ne!(p1, p2);
5131 }
5132
5133 #[cfg(feature = "security")]
5134 #[test]
5135 fn outbound_pool_falls_back_to_default_when_no_subnet_matches() {
5136 let specs = vec![
5137 InterfaceBindingSpec {
5138 name: "lo-specific".into(),
5139 bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5140 bind_port: 0,
5141 kind: zerodds_security_runtime::NetInterface::Loopback,
5142 subnet: lo_range(33),
5143 default: false,
5144 },
5145 InterfaceBindingSpec {
5146 name: "wan-default".into(),
5147 bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5148 bind_port: 0,
5149 kind: zerodds_security_runtime::NetInterface::Wan,
5150 subnet: zerodds_security_runtime::IpRange {
5151 base: core::net::IpAddr::V4(core::net::Ipv4Addr::UNSPECIFIED),
5152 prefix_len: 0,
5153 },
5154 default: true,
5155 },
5156 ];
5157 let pool = OutboundSocketPool::bind_all(&specs).unwrap();
5158 let unknown = Locator::udp_v4([192, 168, 7, 7], 12345);
5159 let (_sock, iface) = pool.route(&unknown).expect("default fallback");
5160 assert_eq!(iface, zerodds_security_runtime::NetInterface::Wan);
5161 }
5162
5163 #[cfg(feature = "security")]
5164 #[test]
5165 fn outbound_pool_returns_none_when_no_match_and_no_default() {
5166 let specs = vec![InterfaceBindingSpec {
5167 name: "only-lo".into(),
5168 bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5169 bind_port: 0,
5170 kind: zerodds_security_runtime::NetInterface::Loopback,
5171 subnet: lo_range(44),
5172 default: false,
5173 }];
5174 let pool = OutboundSocketPool::bind_all(&specs).unwrap();
5175 assert!(pool.route(&Locator::udp_v4([8, 8, 8, 8], 53)).is_none());
5176 }
5177
5178 #[cfg(feature = "security")]
5179 #[test]
5180 fn outbound_pool_skips_non_v4_locators() {
5181 let specs = vec![InterfaceBindingSpec {
5182 name: "lo".into(),
5183 bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5184 bind_port: 0,
5185 kind: zerodds_security_runtime::NetInterface::Loopback,
5186 subnet: lo_range(55),
5187 default: true,
5188 }];
5189 let pool = OutboundSocketPool::bind_all(&specs).unwrap();
5190 let shm = Locator {
5194 kind: zerodds_rtps::wire_types::LocatorKind::Shm,
5195 port: 0,
5196 address: [0u8; 16],
5197 };
5198 assert!(pool.route(&shm).is_none());
5199 }
5200
5201 #[cfg(feature = "security")]
5202 #[test]
5203 fn dod_plaintext_lo_vs_srtps_wan_via_sniffer() {
5204 use std::net::{SocketAddrV4, UdpSocket};
5221 use zerodds_security_crypto::AesGcmCryptoPlugin;
5222 use zerodds_security_permissions::parse_governance_xml;
5223 use zerodds_security_runtime::{NetInterface as SecIf, SharedSecurityGate};
5224
5225 const GOV: &str = r#"
5226<domain_access_rules>
5227 <domain_rule>
5228 <domains><id>0</id></domains>
5229 <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
5230 <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
5231 </domain_rule>
5232</domain_access_rules>
5233"#;
5234 let lo_sniffer =
5237 UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)).expect("lo sniffer");
5238 lo_sniffer
5239 .set_read_timeout(Some(Duration::from_millis(250)))
5240 .unwrap();
5241 let wan_sniffer = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0))
5242 .expect("wan sniffer");
5243 wan_sniffer
5244 .set_read_timeout(Some(Duration::from_millis(250)))
5245 .unwrap();
5246 let lo_port = lo_sniffer.local_addr().unwrap().port();
5247 let wan_port = wan_sniffer.local_addr().unwrap().port();
5248 let lo_target = Locator::udp_v4([127, 0, 0, 1], u32::from(lo_port));
5249 let wan_target = Locator::udp_v4([127, 0, 0, 1], u32::from(wan_port));
5250
5251 let bindings = vec![InterfaceBindingSpec {
5277 name: "lo-for-legacy".into(),
5278 bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5279 bind_port: 0,
5280 kind: SecIf::Loopback,
5281 subnet: zerodds_security_runtime::IpRange {
5282 base: core::net::IpAddr::V4(core::net::Ipv4Addr::new(127, 0, 0, 1)),
5283 prefix_len: 32,
5284 },
5285 default: true,
5286 }];
5287 let gate = SharedSecurityGate::new(
5288 0,
5289 parse_governance_xml(GOV).unwrap(),
5290 Box::new(AesGcmCryptoPlugin::new()),
5291 );
5292 let cfg = RuntimeConfig {
5293 security: Some(std::sync::Arc::new(gate)),
5294 interface_bindings: bindings,
5295 ..RuntimeConfig::default()
5296 };
5297 let rt = DcpsRuntime::start(0, GuidPrefix::from_bytes([0xF0; 12]), cfg).expect("rt");
5298
5299 let wid = rt
5300 .register_user_writer(UserWriterConfig {
5301 topic_name: "HeteroRouting".into(),
5302 type_name: "zerodds::RawBytes".into(),
5303 reliable: true,
5304 durability: zerodds_qos::DurabilityKind::Volatile,
5305 deadline: zerodds_qos::DeadlineQosPolicy::default(),
5306 lifespan: zerodds_qos::LifespanQosPolicy::default(),
5307 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
5308 ownership: zerodds_qos::OwnershipKind::Shared,
5309 ownership_strength: 0,
5310 partition: Vec::new(),
5311 user_data: Vec::new(),
5312 topic_data: Vec::new(),
5313 group_data: Vec::new(),
5314 type_identifier: zerodds_types::TypeIdentifier::None,
5315 data_representation_offer: None,
5316 })
5317 .unwrap();
5318
5319 let legacy_peer: [u8; 12] = [0x01; 12];
5322 let secure_peer: [u8; 12] = [0x02; 12];
5323 {
5324 let arc = rt.writer_slot(wid).unwrap();
5325 let mut slot = arc.lock().unwrap();
5326 slot.reader_protection
5327 .insert(legacy_peer, ProtectionLevel::None);
5328 slot.reader_protection
5329 .insert(secure_peer, ProtectionLevel::Encrypt);
5330 slot.locator_to_peer.insert(lo_target, legacy_peer);
5331 slot.locator_to_peer.insert(wan_target, secure_peer);
5332 }
5333
5334 let mut msg = Vec::new();
5336 msg.extend_from_slice(b"RTPS\x02\x05\x01\x02");
5337 msg.extend_from_slice(&[0xF0; 12]);
5338 msg.extend_from_slice(b"DOD-ROUTING-PAYLOAD");
5339
5340 let plain_wire = secure_outbound_for_target(&rt, wid, &msg, &lo_target).unwrap();
5342 let secure_wire = secure_outbound_for_target(&rt, wid, &msg, &wan_target).unwrap();
5343 assert_eq!(plain_wire, msg, "lo-target: plaintext");
5344 assert_ne!(secure_wire, msg, "wan-target: SRTPS-gewrappt");
5345
5346 send_on_best_interface(&rt, &lo_target, &plain_wire);
5347 send_on_best_interface(&rt, &wan_target, &secure_wire);
5348
5349 let mut buf = [0u8; 4096];
5351 let (n1, _) = lo_sniffer.recv_from(&mut buf).expect("lo snif got");
5352 assert_eq!(
5353 &buf[..n1],
5354 &msg[..],
5355 "Loopback-Sniffer muss plaintext sehen"
5356 );
5357 let (n2, _) = wan_sniffer.recv_from(&mut buf).expect("wan snif got");
5358 assert_ne!(
5359 &buf[..n2],
5360 &msg[..],
5361 "WAN-Sniffer muss SRTPS-gewrappt sehen"
5362 );
5363 assert_eq!(
5366 buf[20], 0x33,
5367 "WAN-Output muss mit SRTPS_PREFIX-Submessage beginnen"
5368 );
5369
5370 rt.shutdown();
5371 }
5372
5373 #[cfg(feature = "security")]
5374 #[test]
5375 fn inbound_loopback_accepts_plain_on_protected_domain() {
5376 use zerodds_security_runtime::NetInterface as SecIf;
5381 const GOV: &str = r#"
5382<domain_access_rules>
5383 <domain_rule>
5384 <domains><id>0</id></domains>
5385 <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
5386 <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
5387 </domain_rule>
5388</domain_access_rules>
5389"#;
5390 let logger = std::sync::Arc::new(CapturingLogger::default());
5391 let rt = build_runtime_with(GOV, std::sync::Arc::clone(&logger));
5392
5393 let mut plain = Vec::new();
5394 plain.extend_from_slice(b"RTPS\x02\x05\x01\x02");
5395 plain.extend_from_slice(&[0x99; 12]);
5396 plain.extend_from_slice(b"loopback-plain-is-ok");
5397
5398 let out = secure_inbound_bytes(&rt, &plain, &SecIf::Loopback)
5400 .expect("Loopback plain muss akzeptiert werden");
5401 assert_eq!(out, plain);
5402 assert!(logger.events().is_empty());
5403
5404 let out_wan = secure_inbound_bytes(&rt, &plain, &SecIf::Wan);
5406 assert!(out_wan.is_none());
5407 let evs = logger.events();
5408 assert_eq!(evs.len(), 1);
5409 assert_eq!(evs[0].0, zerodds_security_runtime::LogLevel::Error);
5410 assert!(
5411 evs[0].2.contains("iface=Wan"),
5412 "Log-Message muss iface tragen"
5413 );
5414 rt.shutdown();
5415 }
5416
5417 #[cfg(feature = "security")]
5418 #[test]
5419 fn dod_inbound_per_interface_receive_via_pool_socket() {
5420 use std::net::{SocketAddrV4, UdpSocket};
5438 use zerodds_security_crypto::AesGcmCryptoPlugin;
5439 use zerodds_security_permissions::parse_governance_xml;
5440 use zerodds_security_runtime::{NetInterface as SecIf, SharedSecurityGate};
5441
5442 const GOV: &str = r#"
5443<domain_access_rules>
5444 <domain_rule>
5445 <domains><id>0</id></domains>
5446 <rtps_protection_kind>ENCRYPT</rtps_protection_kind>
5447 <topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
5448 </domain_rule>
5449</domain_access_rules>
5450"#;
5451 let logger = std::sync::Arc::new(CapturingLogger::default());
5452 let gate = SharedSecurityGate::new(
5453 0,
5454 parse_governance_xml(GOV).unwrap(),
5455 Box::new(AesGcmCryptoPlugin::new()),
5456 );
5457 let logger_dyn: std::sync::Arc<dyn zerodds_security_runtime::LoggingPlugin> =
5458 std::sync::Arc::clone(&logger) as _;
5459 let bindings = vec![InterfaceBindingSpec {
5460 name: "lo".into(),
5461 bind_addr: Ipv4Addr::new(127, 0, 0, 1),
5462 bind_port: 0,
5463 kind: SecIf::Loopback,
5464 subnet: zerodds_security_runtime::IpRange {
5465 base: core::net::IpAddr::V4(core::net::Ipv4Addr::new(127, 0, 0, 0)),
5466 prefix_len: 8,
5467 },
5468 default: true,
5469 }];
5470 let cfg = RuntimeConfig {
5471 security: Some(std::sync::Arc::new(gate)),
5472 security_logger: Some(logger_dyn),
5473 interface_bindings: bindings,
5474 ..RuntimeConfig::default()
5475 };
5476 let rt = DcpsRuntime::start(0, GuidPrefix::from_bytes([0xF1; 12]), cfg).expect("rt");
5477
5478 let pool_port = rt.outbound_pool.as_ref().unwrap().bindings[0]
5480 .socket
5481 .local_locator()
5482 .port as u16;
5483 assert!(pool_port > 0);
5484
5485 let sender = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)).unwrap();
5487 let mut plain = Vec::new();
5488 plain.extend_from_slice(b"RTPS\x02\x05\x01\x02");
5489 plain.extend_from_slice(&[0xAB; 12]);
5490 plain.extend_from_slice(b"loopback-dispatch");
5491 sender
5492 .send_to(
5493 &plain,
5494 SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pool_port),
5495 )
5496 .unwrap();
5497
5498 std::thread::sleep(Duration::from_millis(300));
5501
5502 let pool_events = logger.events();
5505
5506 let _ = secure_inbound_bytes(&rt, &plain, &SecIf::Wan);
5509 let after = logger.events();
5510 assert!(
5511 after.len() > pool_events.len(),
5512 "Wan-Pfad muss ein neues Log-Event erzeugen"
5513 );
5514 let new_ev = &after[after.len() - 1];
5515 assert_eq!(new_ev.0, zerodds_security_runtime::LogLevel::Error);
5516 assert!(
5517 new_ev.2.contains("iface=Wan"),
5518 "Log-Message traegt iface-Marker: got={:?}",
5519 new_ev.2
5520 );
5521
5522 for (lvl, cat, msg) in &pool_events {
5525 assert_ne!(
5526 *lvl,
5527 zerodds_security_runtime::LogLevel::Error,
5528 "Loopback-Pfad darf kein Error-Event erzeugen: cat={cat} msg={msg}"
5529 );
5530 }
5531 rt.shutdown();
5532 }
5533
5534 #[cfg(feature = "security")]
5535 #[test]
5536 fn per_target_without_security_gate_is_passthrough() {
5537 let rt = DcpsRuntime::start(
5541 0,
5542 GuidPrefix::from_bytes([0xE5; 12]),
5543 RuntimeConfig::default(),
5544 )
5545 .expect("rt");
5546 let wid = rt
5547 .register_user_writer(UserWriterConfig {
5548 topic_name: "T".into(),
5549 type_name: "zerodds::RawBytes".into(),
5550 reliable: true,
5551 durability: zerodds_qos::DurabilityKind::Volatile,
5552 deadline: zerodds_qos::DeadlineQosPolicy::default(),
5553 lifespan: zerodds_qos::LifespanQosPolicy::default(),
5554 liveliness: zerodds_qos::LivelinessQosPolicy::default(),
5555 ownership: zerodds_qos::OwnershipKind::Shared,
5556 ownership_strength: 0,
5557 partition: Vec::new(),
5558 user_data: Vec::new(),
5559 topic_data: Vec::new(),
5560 group_data: Vec::new(),
5561 type_identifier: zerodds_types::TypeIdentifier::None,
5562 data_representation_offer: None,
5563 })
5564 .unwrap();
5565 let tgt = Locator::udp_v4([127, 0, 0, 1], 40000);
5566 let msg = b"raw-plaintext".to_vec();
5567 let out = secure_outbound_for_target(&rt, wid, &msg, &tgt).unwrap();
5568 assert_eq!(out, msg, "ohne Gate muss passthrough sein");
5569 rt.shutdown();
5570 }
5571
5572 fn make_remote_spdp_beacon(remote_prefix: GuidPrefix) -> Vec<u8> {
5578 use zerodds_discovery::spdp::SpdpBeacon;
5579 use zerodds_rtps::participant_data::ParticipantBuiltinTopicData;
5580 use zerodds_rtps::wire_types::{ProtocolVersion, VendorId};
5581 let data = ParticipantBuiltinTopicData {
5582 guid: Guid::new(remote_prefix, EntityId::PARTICIPANT),
5583 protocol_version: ProtocolVersion::V2_5,
5584 vendor_id: VendorId::ZERODDS,
5585 default_unicast_locator: None,
5586 default_multicast_locator: None,
5587 metatraffic_unicast_locator: None,
5588 metatraffic_multicast_locator: None,
5589 domain_id: Some(0),
5590 builtin_endpoint_set: 0,
5591 lease_duration: QosDuration::from_secs(100),
5592 user_data: alloc::vec::Vec::new(),
5593 properties: Default::default(),
5594 identity_token: None,
5595 permissions_token: None,
5596 identity_status_token: None,
5597 sig_algo_info: None,
5598 kx_algo_info: None,
5599 sym_cipher_algo_info: None,
5600 };
5601 let mut beacon = SpdpBeacon::new(data);
5602 beacon.serialize().expect("serialize")
5603 }
5604
5605 #[test]
5606 fn handle_spdp_datagram_pushes_into_builtin_participant_reader() {
5607 let rt = DcpsRuntime::start(
5608 41,
5609 GuidPrefix::from_bytes([0x21; 12]),
5610 RuntimeConfig::default(),
5611 )
5612 .expect("start");
5613 let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
5614 rt.attach_builtin_sinks(bs.sinks());
5615
5616 let remote = GuidPrefix::from_bytes([0x99; 12]);
5617 let dg = make_remote_spdp_beacon(remote);
5618 handle_spdp_datagram(&rt, &dg);
5620
5621 let reader = bs
5622 .lookup_datareader::<crate::builtin_topics::ParticipantBuiltinTopicData>(
5623 "DCPSParticipant",
5624 )
5625 .unwrap();
5626 let samples = reader.take().unwrap();
5627 assert_eq!(samples.len(), 1, "Genau 1 Sample fuer 1 SPDP-Beacon");
5628 assert_eq!(samples[0].key.prefix, remote);
5629 rt.shutdown();
5630 }
5631
5632 #[test]
5633 fn handle_spdp_datagram_skips_self_beacon() {
5634 let prefix = GuidPrefix::from_bytes([0x22; 12]);
5635 let rt = DcpsRuntime::start(42, prefix, RuntimeConfig::default()).expect("start");
5636 let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
5637 rt.attach_builtin_sinks(bs.sinks());
5638
5639 let dg = make_remote_spdp_beacon(prefix);
5642 handle_spdp_datagram(&rt, &dg);
5643
5644 let reader = bs
5645 .lookup_datareader::<crate::builtin_topics::ParticipantBuiltinTopicData>(
5646 "DCPSParticipant",
5647 )
5648 .unwrap();
5649 let samples = reader.take().unwrap();
5650 assert!(
5651 samples.is_empty(),
5652 "Eigenes Beacon darf nicht geloggt werden"
5653 );
5654 rt.shutdown();
5655 }
5656
5657 #[test]
5658 fn sedp_event_push_populates_publication_and_topic_readers() {
5659 use crate::builtin_topics as bt;
5660 use zerodds_discovery::sedp::SedpEvents;
5661 use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
5662 let rt = DcpsRuntime::start(
5663 43,
5664 GuidPrefix::from_bytes([0x23; 12]),
5665 RuntimeConfig::default(),
5666 )
5667 .expect("start");
5668 let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
5669 rt.attach_builtin_sinks(bs.sinks());
5670
5671 let mut events = SedpEvents::default();
5672 events.new_publications.push(
5673 zerodds_rtps::publication_data::PublicationBuiltinTopicData {
5674 key: Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT),
5675 participant_key: Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT),
5676 topic_name: "WireT".into(),
5677 type_name: "WireType".into(),
5678 durability: zerodds_qos::DurabilityKind::Volatile,
5679 reliability: ReliabilityQosPolicy::default(),
5680 ownership: zerodds_qos::OwnershipKind::Shared,
5681 ownership_strength: 0,
5682 liveliness: LivelinessQosPolicy::default(),
5683 deadline: zerodds_qos::DeadlineQosPolicy::default(),
5684 lifespan: zerodds_qos::LifespanQosPolicy::default(),
5685 partition: Vec::new(),
5686 user_data: Vec::new(),
5687 topic_data: Vec::new(),
5688 group_data: Vec::new(),
5689 type_information: None,
5690 data_representation: Vec::new(),
5691 security_info: None,
5692 service_instance_name: None,
5693 related_entity_guid: None,
5694 topic_aliases: None,
5695 type_identifier: zerodds_types::TypeIdentifier::None,
5696 },
5697 );
5698
5699 push_sedp_events_to_builtin_readers(&rt, &events);
5700
5701 let pub_reader = bs
5702 .lookup_datareader::<bt::PublicationBuiltinTopicData>("DCPSPublication")
5703 .unwrap();
5704 let pub_samples = pub_reader.take().unwrap();
5705 assert_eq!(pub_samples.len(), 1);
5706 assert_eq!(pub_samples[0].topic_name, "WireT");
5707
5708 let topic_reader = bs
5709 .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
5710 .unwrap();
5711 let topic_samples = topic_reader.take().unwrap();
5712 assert_eq!(topic_samples.len(), 1);
5713 assert_eq!(topic_samples[0].name, "WireT");
5714 rt.shutdown();
5715 }
5716
5717 #[test]
5718 fn sedp_event_push_populates_subscription_reader() {
5719 use crate::builtin_topics as bt;
5720 use zerodds_discovery::sedp::SedpEvents;
5721 use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
5722 let rt = DcpsRuntime::start(
5723 44,
5724 GuidPrefix::from_bytes([0x24; 12]),
5725 RuntimeConfig::default(),
5726 )
5727 .expect("start");
5728 let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
5729 rt.attach_builtin_sinks(bs.sinks());
5730
5731 let mut events = SedpEvents::default();
5732 events.new_subscriptions.push(
5733 zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
5734 key: Guid::new(GuidPrefix::from_bytes([2; 12]), EntityId::PARTICIPANT),
5735 participant_key: Guid::new(GuidPrefix::from_bytes([2; 12]), EntityId::PARTICIPANT),
5736 topic_name: "SubT".into(),
5737 type_name: "SubType".into(),
5738 durability: zerodds_qos::DurabilityKind::Volatile,
5739 reliability: ReliabilityQosPolicy::default(),
5740 ownership: zerodds_qos::OwnershipKind::Shared,
5741 liveliness: LivelinessQosPolicy::default(),
5742 deadline: zerodds_qos::DeadlineQosPolicy::default(),
5743 partition: Vec::new(),
5744 user_data: Vec::new(),
5745 topic_data: Vec::new(),
5746 group_data: Vec::new(),
5747 type_information: None,
5748 data_representation: Vec::new(),
5749 content_filter: None,
5750 security_info: None,
5751 service_instance_name: None,
5752 related_entity_guid: None,
5753 topic_aliases: None,
5754 type_identifier: zerodds_types::TypeIdentifier::None,
5755 },
5756 );
5757
5758 push_sedp_events_to_builtin_readers(&rt, &events);
5759
5760 let sub_reader = bs
5761 .lookup_datareader::<bt::SubscriptionBuiltinTopicData>("DCPSSubscription")
5762 .unwrap();
5763 let sub_samples = sub_reader.take().unwrap();
5764 assert_eq!(sub_samples.len(), 1);
5765 assert_eq!(sub_samples[0].topic_name, "SubT");
5766
5767 let topic_reader = bs
5770 .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
5771 .unwrap();
5772 let topic_samples = topic_reader.take().unwrap();
5773 assert_eq!(topic_samples.len(), 1);
5774 assert_eq!(topic_samples[0].name, "SubT");
5775 rt.shutdown();
5776 }
5777
5778 #[test]
5779 fn push_sedp_events_to_builtin_readers_is_noop_without_sinks() {
5780 use zerodds_discovery::sedp::SedpEvents;
5781 let rt = DcpsRuntime::start(
5782 45,
5783 GuidPrefix::from_bytes([0x25; 12]),
5784 RuntimeConfig::default(),
5785 )
5786 .expect("start");
5787 let events = SedpEvents::default();
5790 push_sedp_events_to_builtin_readers(&rt, &events);
5791 rt.shutdown();
5792 }
5793
5794 #[test]
5797 fn handle_spdp_datagram_drops_ignored_participant_beacon() {
5798 let rt = DcpsRuntime::start(
5801 46,
5802 GuidPrefix::from_bytes([0x26; 12]),
5803 RuntimeConfig::default(),
5804 )
5805 .expect("start");
5806 let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
5807 rt.attach_builtin_sinks(bs.sinks());
5808 let filter = crate::participant::IgnoreFilter::default();
5809 rt.attach_ignore_filter(filter.clone());
5810
5811 let remote = GuidPrefix::from_bytes([0xAA; 12]);
5812 let key = Guid::new(remote, EntityId::PARTICIPANT);
5816 let h = crate::instance_handle::InstanceHandle::from_guid(key);
5817 if let Ok(mut s) = filter.inner.participants.lock() {
5818 s.insert(h);
5819 }
5820 let dg = make_remote_spdp_beacon(remote);
5821 handle_spdp_datagram(&rt, &dg);
5822
5823 let reader = bs
5824 .lookup_datareader::<crate::builtin_topics::ParticipantBuiltinTopicData>(
5825 "DCPSParticipant",
5826 )
5827 .unwrap();
5828 assert!(
5829 reader.take().unwrap().is_empty(),
5830 "ignorierter Participant darf nicht in DCPSParticipant landen"
5831 );
5832 rt.shutdown();
5833 }
5834
5835 #[test]
5836 fn sedp_event_push_filters_ignored_publication() {
5837 use crate::builtin_topics as bt;
5838 use zerodds_discovery::sedp::SedpEvents;
5839 use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
5840 let rt = DcpsRuntime::start(
5841 47,
5842 GuidPrefix::from_bytes([0x27; 12]),
5843 RuntimeConfig::default(),
5844 )
5845 .expect("start");
5846 let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
5847 rt.attach_builtin_sinks(bs.sinks());
5848 let filter = crate::participant::IgnoreFilter::default();
5849 rt.attach_ignore_filter(filter.clone());
5850
5851 let pub_key = Guid::new(GuidPrefix::from_bytes([0x33; 12]), EntityId::PARTICIPANT);
5852 let h_pub = crate::instance_handle::InstanceHandle::from_guid(pub_key);
5853 if let Ok(mut s) = filter.inner.publications.lock() {
5854 s.insert(h_pub);
5855 }
5856
5857 let mut events = SedpEvents::default();
5858 events.new_publications.push(
5859 zerodds_rtps::publication_data::PublicationBuiltinTopicData {
5860 key: pub_key,
5861 participant_key: Guid::new(
5862 GuidPrefix::from_bytes([0x33; 12]),
5863 EntityId::PARTICIPANT,
5864 ),
5865 topic_name: "Filtered".into(),
5866 type_name: "T".into(),
5867 durability: zerodds_qos::DurabilityKind::Volatile,
5868 reliability: ReliabilityQosPolicy::default(),
5869 ownership: zerodds_qos::OwnershipKind::Shared,
5870 ownership_strength: 0,
5871 liveliness: LivelinessQosPolicy::default(),
5872 deadline: zerodds_qos::DeadlineQosPolicy::default(),
5873 lifespan: zerodds_qos::LifespanQosPolicy::default(),
5874 partition: Vec::new(),
5875 user_data: Vec::new(),
5876 topic_data: Vec::new(),
5877 group_data: Vec::new(),
5878 type_information: None,
5879 data_representation: Vec::new(),
5880 security_info: None,
5881 service_instance_name: None,
5882 related_entity_guid: None,
5883 topic_aliases: None,
5884 type_identifier: zerodds_types::TypeIdentifier::None,
5885 },
5886 );
5887
5888 push_sedp_events_to_builtin_readers(&rt, &events);
5889
5890 let pub_reader = bs
5891 .lookup_datareader::<bt::PublicationBuiltinTopicData>("DCPSPublication")
5892 .unwrap();
5893 assert!(
5894 pub_reader.take().unwrap().is_empty(),
5895 "ignorierte Publication darf nicht in DCPSPublication landen"
5896 );
5897 let topic_reader = bs
5901 .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
5902 .unwrap();
5903 assert!(topic_reader.take().unwrap().is_empty());
5904 rt.shutdown();
5905 }
5906
5907 #[test]
5908 fn sedp_event_push_filters_ignored_subscription() {
5909 use crate::builtin_topics as bt;
5910 use zerodds_discovery::sedp::SedpEvents;
5911 use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
5912 let rt = DcpsRuntime::start(
5913 48,
5914 GuidPrefix::from_bytes([0x28; 12]),
5915 RuntimeConfig::default(),
5916 )
5917 .expect("start");
5918 let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
5919 rt.attach_builtin_sinks(bs.sinks());
5920 let filter = crate::participant::IgnoreFilter::default();
5921 rt.attach_ignore_filter(filter.clone());
5922
5923 let sub_key = Guid::new(GuidPrefix::from_bytes([0x44; 12]), EntityId::PARTICIPANT);
5924 let h_sub = crate::instance_handle::InstanceHandle::from_guid(sub_key);
5925 if let Ok(mut s) = filter.inner.subscriptions.lock() {
5926 s.insert(h_sub);
5927 }
5928
5929 let mut events = SedpEvents::default();
5930 events.new_subscriptions.push(
5931 zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
5932 key: sub_key,
5933 participant_key: Guid::new(
5934 GuidPrefix::from_bytes([0x44; 12]),
5935 EntityId::PARTICIPANT,
5936 ),
5937 topic_name: "FilteredSub".into(),
5938 type_name: "T".into(),
5939 durability: zerodds_qos::DurabilityKind::Volatile,
5940 reliability: ReliabilityQosPolicy::default(),
5941 ownership: zerodds_qos::OwnershipKind::Shared,
5942 liveliness: LivelinessQosPolicy::default(),
5943 deadline: zerodds_qos::DeadlineQosPolicy::default(),
5944 partition: Vec::new(),
5945 user_data: Vec::new(),
5946 topic_data: Vec::new(),
5947 group_data: Vec::new(),
5948 type_information: None,
5949 data_representation: Vec::new(),
5950 content_filter: None,
5951 security_info: None,
5952 service_instance_name: None,
5953 related_entity_guid: None,
5954 topic_aliases: None,
5955 type_identifier: zerodds_types::TypeIdentifier::None,
5956 },
5957 );
5958
5959 push_sedp_events_to_builtin_readers(&rt, &events);
5960
5961 let sub_reader = bs
5962 .lookup_datareader::<bt::SubscriptionBuiltinTopicData>("DCPSSubscription")
5963 .unwrap();
5964 assert!(sub_reader.take().unwrap().is_empty());
5965 rt.shutdown();
5966 }
5967
5968 #[test]
5969 fn sedp_event_push_filters_ignored_topic_only() {
5970 use crate::builtin_topics as bt;
5974 use zerodds_discovery::sedp::SedpEvents;
5975 use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
5976 let rt = DcpsRuntime::start(
5977 49,
5978 GuidPrefix::from_bytes([0x29; 12]),
5979 RuntimeConfig::default(),
5980 )
5981 .expect("start");
5982 let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
5983 rt.attach_builtin_sinks(bs.sinks());
5984 let filter = crate::participant::IgnoreFilter::default();
5985 rt.attach_ignore_filter(filter.clone());
5986
5987 let topic_key =
5988 crate::builtin_topics::TopicBuiltinTopicData::synthesize_key("OnlyTopic", "T");
5989 let h_topic = crate::instance_handle::InstanceHandle::from_guid(topic_key);
5990 if let Ok(mut s) = filter.inner.topics.lock() {
5991 s.insert(h_topic);
5992 }
5993
5994 let mut events = SedpEvents::default();
5995 events.new_publications.push(
5996 zerodds_rtps::publication_data::PublicationBuiltinTopicData {
5997 key: Guid::new(GuidPrefix::from_bytes([0x55; 12]), EntityId::PARTICIPANT),
5998 participant_key: Guid::new(
5999 GuidPrefix::from_bytes([0x55; 12]),
6000 EntityId::PARTICIPANT,
6001 ),
6002 topic_name: "OnlyTopic".into(),
6003 type_name: "T".into(),
6004 durability: zerodds_qos::DurabilityKind::Volatile,
6005 reliability: ReliabilityQosPolicy::default(),
6006 ownership: zerodds_qos::OwnershipKind::Shared,
6007 ownership_strength: 0,
6008 liveliness: LivelinessQosPolicy::default(),
6009 deadline: zerodds_qos::DeadlineQosPolicy::default(),
6010 lifespan: zerodds_qos::LifespanQosPolicy::default(),
6011 partition: Vec::new(),
6012 user_data: Vec::new(),
6013 topic_data: Vec::new(),
6014 group_data: Vec::new(),
6015 type_information: None,
6016 data_representation: Vec::new(),
6017 security_info: None,
6018 service_instance_name: None,
6019 related_entity_guid: None,
6020 topic_aliases: None,
6021 type_identifier: zerodds_types::TypeIdentifier::None,
6022 },
6023 );
6024
6025 push_sedp_events_to_builtin_readers(&rt, &events);
6026
6027 let pub_reader = bs
6028 .lookup_datareader::<bt::PublicationBuiltinTopicData>("DCPSPublication")
6029 .unwrap();
6030 assert_eq!(pub_reader.take().unwrap().len(), 1);
6031 let topic_reader = bs
6032 .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
6033 .unwrap();
6034 assert!(
6035 topic_reader.take().unwrap().is_empty(),
6036 "ignoriertes Topic darf das synth. DCPSTopic-Sample blockieren"
6037 );
6038 rt.shutdown();
6039 }
6040
6041 fn make_remote_spdp_beacon_with_flags(remote_prefix: GuidPrefix, endpoint_set: u32) -> Vec<u8> {
6047 use zerodds_discovery::spdp::SpdpBeacon;
6048 use zerodds_rtps::participant_data::ParticipantBuiltinTopicData;
6049 use zerodds_rtps::wire_types::{ProtocolVersion, VendorId};
6050 let data = ParticipantBuiltinTopicData {
6051 guid: Guid::new(remote_prefix, EntityId::PARTICIPANT),
6052 protocol_version: ProtocolVersion::V2_5,
6053 vendor_id: VendorId::ZERODDS,
6054 default_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 99], 7500)),
6055 default_multicast_locator: None,
6056 metatraffic_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 99], 7501)),
6057 metatraffic_multicast_locator: None,
6058 domain_id: Some(0),
6059 builtin_endpoint_set: endpoint_set,
6060 lease_duration: QosDuration::from_secs(100),
6061 user_data: alloc::vec::Vec::new(),
6062 properties: Default::default(),
6063 identity_token: None,
6064 permissions_token: None,
6065 identity_status_token: None,
6066 sig_algo_info: None,
6067 kx_algo_info: None,
6068 sym_cipher_algo_info: None,
6069 };
6070 let mut beacon = SpdpBeacon::new(data);
6071 beacon.serialize().expect("serialize")
6072 }
6073
    /// Walks the security-builtin wiring of a runtime in sequence:
    /// enabling the builtins, matching a security-capable peer from its SPDP
    /// beacon, leaving proxies untouched for a peer with only standard flags,
    /// and finally dispatching security-builtin datagrams into the runtime.
    ///
    /// The two trailing `dispatch_security_builtin_datagram` calls carry no
    /// assertion; they only have to return without panicking.
    #[test]
    fn c34c_security_builtin_wiring_end_to_end() {
        use zerodds_discovery::security::SecurityBuiltinStack;
        use zerodds_security::generic_message::{
            MessageIdentity, ParticipantGenericMessage, class_id,
        };
        use zerodds_security::token::DataHolder;

        let local_prefix = GuidPrefix::from_bytes([0x75; 12]);
        let rt = DcpsRuntime::start(75, local_prefix, RuntimeConfig::default()).expect("start");

        // Nothing is enabled yet, so no snapshot is available.
        assert!(rt.security_builtin_snapshot().is_none());

        // Enabling twice must hand out the very same shared stack.
        let h1 = rt.enable_security_builtins(VendorId::ZERODDS);
        let h2 = rt.enable_security_builtins(VendorId::ZERODDS);
        assert!(Arc::ptr_eq(&h1, &h2));
        assert!(rt.security_builtin_snapshot().is_some());

        // Peer A advertises all four security message endpoints; each local
        // counterpart must end up with exactly one proxy.
        let remote_a = GuidPrefix::from_bytes([0x99; 12]);
        let flags_all = endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_WRITER
            | endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_READER
            | endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER
            | endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER;
        handle_spdp_datagram(
            &rt,
            &make_remote_spdp_beacon_with_flags(remote_a, flags_all),
        );
        {
            let s = h1.lock().unwrap();
            assert_eq!(s.stateless_writer.reader_proxy_count(), 1);
            assert_eq!(s.stateless_reader.writer_proxy_count(), 1);
            assert_eq!(s.volatile_writer.reader_proxy_count(), 1);
            assert_eq!(s.volatile_reader.writer_proxy_count(), 1);
        }

        // Peer B advertises only `ALL_STANDARD`; the proxy count stays at one.
        let remote_b = GuidPrefix::from_bytes([0x88; 12]);
        handle_spdp_datagram(
            &rt,
            &make_remote_spdp_beacon_with_flags(remote_b, endpoint_flag::ALL_STANDARD),
        );
        {
            let s = h1.lock().unwrap();
            assert_eq!(
                s.stateless_writer.reader_proxy_count(),
                1,
                "Peer ohne Security-Bits darf bestehende Proxies nicht beruehren"
            );
        }

        // Stand-alone stack playing peer A: it learns about the local
        // participant from a parsed beacon, then writes one stateless message.
        let mut remote_stack = SecurityBuiltinStack::new(remote_a, VendorId::ZERODDS);
        let local_peer = make_remote_spdp_beacon_with_flags(local_prefix, flags_all);
        let parsed_local = zerodds_discovery::spdp::SpdpReader::new()
            .parse_datagram(&local_peer)
            .unwrap();
        remote_stack.handle_remote_endpoints(&parsed_local);
        let msg = ParticipantGenericMessage {
            message_identity: MessageIdentity {
                source_guid: [0xCD; 16],
                sequence_number: 1,
            },
            related_message_identity: MessageIdentity::default(),
            destination_participant_key: [0xEF; 16],
            destination_endpoint_key: [0; 16],
            source_endpoint_key: [0xFE; 16],
            message_class_id: class_id::AUTH_REQUEST.into(),
            message_data: alloc::vec![DataHolder::new("DDS:Auth:PKI-DH:1.2+AuthReq")],
        };
        let dgs = remote_stack.stateless_writer.write(&msg).unwrap();
        assert_eq!(dgs.len(), 1);
        // Feed the datagram produced by the peer stack into the local runtime.
        dispatch_security_builtin_datagram(&rt, &dgs[0].bytes, Duration::from_secs(1));

        // 32 zero bytes: NOTE(review) presumably exercises the malformed-input
        // path - confirm against `dispatch_security_builtin_datagram`.
        dispatch_security_builtin_datagram(&rt, &[0u8; 32], Duration::from_secs(1));

        rt.shutdown();
    }
6165
6166 #[test]
6167 fn c34c_enable_security_builtins_replays_known_peers() {
6168 let rt = DcpsRuntime::start(
6173 76,
6174 GuidPrefix::from_bytes([0x76; 12]),
6175 RuntimeConfig::default(),
6176 )
6177 .expect("start");
6178
6179 dispatch_security_builtin_datagram(&rt, &[0u8; 16], Duration::from_secs(1));
6181
6182 let remote = GuidPrefix::from_bytes([0x77; 12]);
6183 let flags = endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_WRITER
6184 | endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_READER;
6185 let dg = make_remote_spdp_beacon_with_flags(remote, flags);
6186 handle_spdp_datagram(&rt, &dg);
6187
6188 let stack = rt.enable_security_builtins(VendorId::ZERODDS);
6189 {
6190 let s = stack.lock().unwrap();
6191 assert_eq!(
6192 s.stateless_writer.reader_proxy_count(),
6193 1,
6194 "spaete Plugin-Activation muss bekannte Peers nachholen"
6195 );
6196 }
6197
6198 rt.shutdown();
6199 }
6200}