extern crate alloc;
use alloc::collections::BTreeMap;
use alloc::string::String;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::time::Duration;
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::mpsc;
use std::sync::{Condvar, Mutex, RwLock};
use std::thread::{self, JoinHandle};
use std::time::Instant;
use zerodds_discovery::security::SecurityBuiltinStack;
use zerodds_discovery::sedp::SedpStack;
use zerodds_discovery::spdp::{
DiscoveredParticipant, DiscoveredParticipantsCache, SpdpBeacon, SpdpReader,
};
use zerodds_discovery::type_lookup::{
TypeLookupClient, TypeLookupEndpoints, TypeLookupReply, TypeLookupServer,
};
use zerodds_qos::Duration as QosDuration;
use zerodds_rtps::EntityId;
use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram};
use zerodds_rtps::fragment_assembler::AssemblerCaps;
use zerodds_rtps::history_cache::HistoryKind;
use zerodds_rtps::message_builder::DEFAULT_MTU;
use zerodds_rtps::participant_data::{ParticipantBuiltinTopicData, endpoint_flag};
use zerodds_rtps::reliable_reader::{ReliableReader, ReliableReaderConfig};
use zerodds_rtps::reliable_writer::{
DEFAULT_FRAGMENT_SIZE, DEFAULT_HEARTBEAT_PERIOD, ReliableWriter, ReliableWriterConfig,
};
use zerodds_rtps::wire_types::{
Guid, GuidPrefix, Locator, LocatorKind, ProtocolVersion, SPDP_DEFAULT_MULTICAST_ADDRESS,
VendorId, spdp_multicast_port,
};
use zerodds_transport::Transport;
use zerodds_transport_udp::UdpTransport;
#[cfg(feature = "security")]
use zerodds_security_runtime::{EndpointProtection, IpRange, NetInterface, ProtectionLevel};
use crate::error::{DdsError, Result};
/// Default period of the internal housekeeping tick loop (heartbeats,
/// deadline checks, SPDP re-announcement scheduling).
pub const DEFAULT_TICK_PERIOD: Duration = Duration::from_millis(5);
/// Default interval between SPDP participant announcements.
pub const DEFAULT_SPDP_PERIOD: Duration = Duration::from_secs(5);
/// DDS DEADLINE QoS compatibility check.
///
/// Both periods are in nanoseconds, with `0` encoding "infinite" (see
/// `qos_duration_to_nanos`, which maps an infinite QoS duration to 0).
///
/// Per the DDS specification the offered deadline is compatible with the
/// requested one iff `offered <= requested`, where infinite is the largest
/// possible period. Therefore:
/// * a requested period of infinite (0) accepts any offer;
/// * an offered period of infinite (0) satisfies only an infinite request.
fn deadline_compat(offered_nanos: u64, requested_nanos: u64) -> bool {
    // Requested infinite: every offer qualifies.
    if requested_nanos == 0 {
        return true;
    }
    // Offered infinite but the request is finite: incompatible. (The previous
    // code accepted this pairing, which violates the DDS rule that
    // `offered <= requested` with infinite as the maximum value.)
    if offered_nanos == 0 {
        return false;
    }
    offered_nanos <= requested_nanos
}
/// Decide whether a writer's offered partitions and a reader's requested
/// partitions share at least one partition.
///
/// An empty list denotes the default (unnamed) partition. The default
/// partition matches another empty list, and also matches an explicit
/// empty-string partition name on the other side. Otherwise, matching is
/// exact string equality between any offered/requested pair.
fn partitions_overlap(offered: &[String], requested: &[String]) -> bool {
    match (offered.is_empty(), requested.is_empty()) {
        // Both sides in the default partition.
        (true, true) => true,
        // Default partition matches an explicit "" entry on the other side.
        (true, false) => requested.iter().any(String::is_empty),
        (false, true) => offered.iter().any(String::is_empty),
        // Both sides explicit: any exact name in common.
        (false, false) => offered.iter().any(|name| requested.contains(name)),
    }
}
#[cfg(feature = "std")]
/// Pick the unicast locator to advertise in discovery for the given socket.
///
/// If the socket is bound to a concrete address it is advertised as-is.
/// Otherwise (bound to 0.0.0.0) the routable local address is probed by
/// "connecting" a throwaway UDP socket to 192.0.2.1 (TEST-NET-1; a UDP
/// connect sends no packet) and reading back the chosen source address.
/// Falls back to `hint`, then to 127.0.0.1 — always keeping the originally
/// bound port.
fn announce_locator(uc: &UdpTransport, hint: Ipv4Addr) -> Locator {
    let raw = uc.local_locator();
    let port = raw.port;
    // RTPS locators carry IPv4 in the last 4 bytes of the 16-byte address.
    let ip = Ipv4Addr::new(
        raw.address[12],
        raw.address[13],
        raw.address[14],
        raw.address[15],
    );
    if !ip.is_unspecified() {
        return raw;
    }
    // Probe which source address the OS would route from.
    if let Ok(probe) =
        std::net::UdpSocket::bind(std::net::SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0))
    {
        if probe
            .connect(std::net::SocketAddrV4::new(Ipv4Addr::new(192, 0, 2, 1), 7))
            .is_ok()
        {
            if let Ok(std::net::SocketAddr::V4(local)) = probe.local_addr() {
                let resolved = local.ip();
                if !resolved.is_unspecified() {
                    return Locator::udp_v4(resolved.octets(), port);
                }
            }
        }
    }
    // Probe failed: prefer the caller-supplied interface hint.
    if !hint.is_unspecified() {
        return Locator::udp_v4(hint.octets(), port);
    }
    // Last resort: loopback.
    Locator::udp_v4([127, 0, 0, 1], port)
}
/// Convert a `core::time::Duration` into the DDS QoS duration wire form.
///
/// The QoS fraction field is in 1/2^32-second units, so the nanosecond
/// remainder is rescaled as `nanos * 2^32 / 1e9`. Second counts that do not
/// fit in an `i32` saturate at `i32::MAX`.
fn qos_duration_from_std(d: Duration) -> QosDuration {
    let seconds = i32::try_from(d.as_secs()).unwrap_or(i32::MAX);
    let scaled = (u64::from(d.subsec_nanos()) << 32) / 1_000_000_000u64;
    QosDuration {
        seconds,
        fraction: scaled as u32,
    }
}
/// Convert a DDS QoS duration to total nanoseconds.
///
/// An infinite duration is collapsed to the sentinel value `0` (matched by
/// `deadline_compat`). Negative second counts clamp to zero, and the total
/// saturates instead of wrapping.
fn qos_duration_to_nanos(d: zerodds_qos::Duration) -> u64 {
    if d.is_infinite() {
        return 0;
    }
    let whole_ns = (d.seconds.max(0) as u64).saturating_mul(1_000_000_000u64);
    // The fraction field is in 1/2^32-second units; rescale to nanoseconds.
    let frac_ns = ((d.fraction as u64) * 1_000_000_000u64) >> 32;
    whole_ns.saturating_add(frac_ns)
}
/// Serialized-payload encapsulation header prepended to every user sample.
/// NOTE(review): [0x00, 0x07] looks like an XCDR2 little-endian
/// encapsulation identifier — confirm against the serializer the readers use.
pub const USER_PAYLOAD_ENCAP: [u8; 4] = [0x00, 0x07, 0x00, 0x00];
/// Capacity of the pooled stack buffer used to frame small user samples.
const SMALL_FRAME_CAP: usize = 1536;
/// Frame `payload` with the user encapsulation header in a pooled stack
/// buffer and submit it to the reliable writer (which also emits a
/// heartbeat).
///
/// Returns `DdsError::WireError` when the sample does not fit within
/// `SMALL_FRAME_CAP` bytes or the writer rejects the encode.
fn write_user_sample_pooled(
    writer: &mut ReliableWriter,
    payload: &[u8],
    now: Duration,
) -> Result<Vec<zerodds_rtps::message_builder::OutboundDatagram>> {
    // Single constructor for the wire-error variant used by every step.
    let wire = |text: &str| DdsError::WireError {
        message: String::from(text),
    };
    let mut frame = zerodds_foundation::PoolBuffer::<SMALL_FRAME_CAP>::new();
    frame
        .extend_from_slice(&USER_PAYLOAD_ENCAP)
        .map_err(|_| wire("user encap framing"))?;
    frame
        .extend_from_slice(payload)
        .map_err(|_| wire("user payload framing"))?;
    writer
        .write_with_heartbeat(frame.as_slice(), now)
        .map_err(|_| wire("user writer encode"))
}
#[derive(Clone)]
/// Tunable parameters for a `DcpsRuntime` instance.
pub struct RuntimeConfig {
    // Period of the housekeeping tick thread.
    pub tick_period: Duration,
    // Interval between SPDP participant announcements.
    pub spdp_period: Duration,
    // Multicast group joined for SPDP discovery traffic.
    pub spdp_multicast_group: Ipv4Addr,
    // Local interface used for multicast joins and as an announce-address hint.
    pub multicast_interface: Ipv4Addr,
    #[cfg(feature = "security")]
    // Shared gate applied to all inbound/outbound datagrams when set.
    pub security: Option<std::sync::Arc<zerodds_security_runtime::SharedSecurityGate>>,
    #[cfg(feature = "security")]
    // Sink for security-related audit/log events.
    pub security_logger: Option<std::sync::Arc<dyn zerodds_security_runtime::LoggingPlugin>>,
    #[cfg(feature = "security")]
    // Per-interface outbound socket bindings (see `OutboundSocketPool`).
    pub interface_bindings: Vec<InterfaceBindingSpec>,
    // Whether SPDP advertises the secure builtin endpoint set.
    pub announce_secure_endpoints: bool,
    // Writer-liveliness tick period; ZERO means "derive from lease duration".
    pub wlp_period: Duration,
    // Lease advertised in SPDP; peers expire us after this without traffic.
    pub participant_lease_duration: Duration,
    // Opaque USER_DATA QoS attached to the participant announcement.
    pub user_data: Vec<u8>,
    // Observability/metrics sink.
    pub observability: zerodds_foundation::observability::SharedSink,
    // Optional OS scheduling hints for the receive and tick threads.
    pub recv_thread_priority: Option<i32>,
    pub tick_thread_priority: Option<i32>,
    pub recv_thread_cpus: Option<Vec<usize>>,
    pub tick_thread_cpus: Option<Vec<usize>>,
    // Default DATA_REPRESENTATION offer used when an endpoint has no override.
    pub data_representation_offer: Vec<i16>,
    pub data_rep_match_mode: zerodds_rtps::publication_data::data_representation::DataRepMatchMode,
}
#[cfg(feature = "security")]
#[derive(Clone, Debug)]
/// Declarative description of one outbound network interface binding.
pub struct InterfaceBindingSpec {
    // Human-readable interface name (diagnostics only, per visible usage).
    pub name: String,
    // Local address/port to bind the outbound socket to.
    pub bind_addr: Ipv4Addr,
    pub bind_port: u16,
    // Interface classification handed to the security gate.
    pub kind: NetInterface,
    // Destination subnet this interface should serve.
    pub subnet: IpRange,
    // Marks the fallback interface for destinations matching no subnet.
    pub default: bool,
}
#[cfg(feature = "security")]
/// A bound outbound socket together with the spec it was created from.
struct InterfaceBinding {
    spec: InterfaceBindingSpec,
    socket: Arc<UdpTransport>,
}
#[cfg(feature = "security")]
/// Set of per-interface outbound sockets, with an optional default used for
/// destinations that match no configured subnet.
struct OutboundSocketPool {
    bindings: Vec<InterfaceBinding>,
    // Index into `bindings` of the spec flagged `default`, if any.
    default_idx: Option<usize>,
}
#[cfg(feature = "security")]
impl OutboundSocketPool {
    /// Bind one UDP socket (with a short send/recv timeout) per configured
    /// interface spec. Fails fast on the first bind or timeout error.
    fn bind_all(specs: &[InterfaceBindingSpec]) -> Result<Self> {
        let bindings = specs
            .iter()
            .map(|spec| {
                let raw = UdpTransport::bind_v4(spec.bind_addr, spec.bind_port).map_err(|_| {
                    DdsError::TransportError {
                        label: "interface-binding bind_v4 failed",
                    }
                })?;
                let socket = raw
                    .with_timeout(Some(Duration::from_millis(5)))
                    .map_err(|_| DdsError::TransportError {
                        label: "interface-binding set_timeout failed",
                    })?;
                Ok(InterfaceBinding {
                    spec: spec.clone(),
                    socket: Arc::new(socket),
                })
            })
            .collect::<Result<Vec<_>>>()?;
        let default_idx = bindings.iter().position(|b| b.spec.default);
        Ok(Self {
            bindings,
            default_idx,
        })
    }
    /// Choose the socket whose subnet covers `target`; fall back to the
    /// default binding when nothing matches (None for non-UDPv4 locators or
    /// when there is no default).
    fn route(&self, target: &Locator) -> Option<(&Arc<UdpTransport>, NetInterface)> {
        let ip = ipv4_from_locator(target)?;
        let addr = core::net::IpAddr::V4(core::net::Ipv4Addr::from(ip));
        let chosen = self
            .bindings
            .iter()
            .find(|b| b.spec.subnet.contains(&addr))
            .or_else(|| self.default_idx.and_then(|i| self.bindings.get(i)))?;
        Some((&chosen.socket, chosen.spec.kind.clone()))
    }
}
#[cfg(feature = "security")]
/// Extract the IPv4 octets from a UDPv4 locator.
///
/// RTPS stores an IPv4 address in the last 4 bytes of the 16-byte locator
/// address field; any other locator kind yields `None`.
fn ipv4_from_locator(loc: &Locator) -> Option<[u8; 4]> {
    if loc.kind != LocatorKind::UdpV4 {
        return None;
    }
    let mut octets = [0u8; 4];
    octets.copy_from_slice(&loc.address[12..16]);
    Some(octets)
}
impl core::fmt::Debug for RuntimeConfig {
    /// Manual `Debug`: the security gate and logger are opaque handles with
    /// no useful `Debug` output, so they are rendered as placeholder tags.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut dbg = f.debug_struct("RuntimeConfig");
        dbg.field("tick_period", &self.tick_period)
            .field("spdp_period", &self.spdp_period)
            .field("spdp_multicast_group", &self.spdp_multicast_group)
            .field("multicast_interface", &self.multicast_interface);
        #[cfg(feature = "security")]
        {
            // Only report presence/absence, never the contents.
            dbg.field("security", &self.security.as_ref().map(|_| "<gate>"));
            dbg.field(
                "security_logger",
                &self.security_logger.as_ref().map(|_| "<logger>"),
            );
        }
        dbg.finish()
    }
}
impl Default for RuntimeConfig {
    /// Conservative defaults: standard SPDP group/period, 100 s participant
    /// lease, security disabled, WLP period derived from the lease (ZERO
    /// sentinel), and no thread-affinity/priority hints.
    fn default() -> Self {
        Self {
            tick_period: DEFAULT_TICK_PERIOD,
            spdp_period: DEFAULT_SPDP_PERIOD,
            spdp_multicast_group: Ipv4Addr::from(SPDP_DEFAULT_MULTICAST_ADDRESS),
            multicast_interface: Ipv4Addr::UNSPECIFIED,
            #[cfg(feature = "security")]
            security: None,
            #[cfg(feature = "security")]
            security_logger: None,
            #[cfg(feature = "security")]
            interface_bindings: Vec::new(),
            announce_secure_endpoints: false,
            // ZERO means "derive WLP period from participant_lease_duration".
            wlp_period: Duration::ZERO,
            participant_lease_duration: Duration::from_secs(100),
            user_data: Vec::new(),
            observability: zerodds_foundation::observability::null_sink(),
            recv_thread_priority: None,
            tick_thread_priority: None,
            recv_thread_cpus: None,
            tick_thread_cpus: None,
            data_representation_offer:
                zerodds_rtps::publication_data::data_representation::DEFAULT_OFFER.to_vec(),
            data_rep_match_mode:
                zerodds_rtps::publication_data::data_representation::DataRepMatchMode::default(),
        }
    }
}
#[cfg(feature = "security")]
/// Apply the participant-wide outbound security transform when a gate is
/// configured; otherwise pass the bytes through unchanged. `None` means the
/// transform failed and the datagram must not be sent.
fn secure_outbound_bytes(rt: &DcpsRuntime, bytes: &[u8]) -> Option<Vec<u8>> {
    if let Some(gate) = &rt.config.security {
        gate.transform_outbound(bytes).ok()
    } else {
        Some(bytes.to_vec())
    }
}
#[cfg(not(feature = "security"))]
/// Security disabled at compile time: always pass-through.
fn secure_outbound_bytes(_rt: &DcpsRuntime, bytes: &[u8]) -> Option<Vec<u8>> {
    Some(bytes.to_vec())
}
#[cfg(feature = "security")]
/// Run an inbound datagram through the security gate.
///
/// Returns the accepted (possibly transformed) payload, or `None` when the
/// datagram is rejected. Rejections are reported to the configured security
/// logger together with the sender's GUID prefix, taken from RTPS header
/// bytes 8..20 when the datagram is long enough.
fn secure_inbound_bytes(rt: &DcpsRuntime, bytes: &[u8], iface: &NetInterface) -> Option<Vec<u8>> {
    use zerodds_security_runtime::{InboundVerdict, LogLevel};
    // No gate configured: plain pass-through.
    let Some(gate) = &rt.config.security else {
        return Some(bytes.to_vec());
    };
    let verdict = gate.classify_inbound(bytes, iface);
    let category = verdict.category();
    // Accept returns early; every other verdict maps to a log level + text.
    let (level, message): (LogLevel, String) = match &verdict {
        InboundVerdict::Accept(out) => return Some(out.clone()),
        InboundVerdict::Malformed => (
            LogLevel::Error,
            alloc::format!(
                "inbound datagram too short ({} bytes, iface={:?})",
                bytes.len(),
                iface
            ),
        ),
        InboundVerdict::LegacyBlocked => (
            LogLevel::Error,
            alloc::format!(
                "legacy plaintext peer on protected domain \
(iface={iface:?}, allow_unauthenticated_participants=false)"
            ),
        ),
        InboundVerdict::PolicyViolation(msg) => {
            (LogLevel::Warning, alloc::format!("{msg} [iface={iface:?}]"))
        }
        InboundVerdict::CryptoError(msg) => {
            (LogLevel::Warning, alloc::format!("{msg} [iface={iface:?}]"))
        }
    };
    if let Some(logger) = &rt.config.security_logger {
        // First 12 bytes of the 16-byte participant id come from the RTPS
        // header GUID prefix (header bytes 8..20); the rest stay zero.
        let mut participant = [0u8; 16];
        if bytes.len() >= 20 {
            participant[..12].copy_from_slice(&bytes[8..20]);
        }
        logger.log(level, participant, category, &message);
    }
    None
}
#[cfg(not(feature = "security"))]
/// Security disabled at compile time: always pass-through.
/// NOTE(review): this variant takes no interface argument, unlike the
/// security-enabled one — callers presumably select by cfg; confirm.
fn secure_inbound_bytes(_rt: &DcpsRuntime, bytes: &[u8]) -> Option<Vec<u8>> {
    Some(bytes.to_vec())
}
#[cfg(feature = "security")]
// Interface classification assumed when none can be determined.
const DEFAULT_INBOUND_IFACE: NetInterface = NetInterface::Wan;
#[cfg(feature = "security")]
/// Encrypt/transform `bytes` for one specific destination locator.
///
/// If the target locator maps (via the writer slot's locator→peer table) to
/// a known reader peer, the peer's recorded protection level is used;
/// otherwise the participant-wide outbound transform applies. `None` means
/// the transform failed and nothing should be sent.
fn secure_outbound_for_target(
    rt: &DcpsRuntime,
    writer_eid: EntityId,
    bytes: &[u8],
    target: &Locator,
) -> Option<Vec<u8>> {
    let Some(gate) = &rt.config.security else {
        return Some(bytes.to_vec());
    };
    // Resolve (peer key, protection level) from the writer's slot, tolerating
    // a missing slot or poisoned lock by falling back to the default path.
    let resolved = rt.writer_slot(writer_eid).and_then(|arc| {
        arc.lock().ok().and_then(|slot| {
            let pk = slot.locator_to_peer.get(target).copied()?;
            let lv = slot.reader_protection.get(&pk).copied()?;
            Some((pk, lv))
        })
    });
    match resolved {
        Some((peer_key, level)) => gate.transform_outbound_for(&peer_key, bytes, level).ok(),
        None => gate.transform_outbound(bytes).ok(),
    }
}
#[cfg(not(feature = "security"))]
/// Security disabled at compile time: always pass-through.
fn secure_outbound_for_target(
    _rt: &DcpsRuntime,
    _writer_eid: EntityId,
    bytes: &[u8],
    _target: &Locator,
) -> Option<Vec<u8>> {
    Some(bytes.to_vec())
}
#[cfg(feature = "security")]
/// Send `bytes` to `target`, preferring a routed per-interface socket from
/// the outbound pool and falling back to the shared user-unicast socket
/// when no pool exists or nothing routes. Send errors are ignored
/// (best-effort datagram delivery).
fn send_on_best_interface(rt: &DcpsRuntime, target: &Locator, bytes: &[u8]) {
    let routed = rt.outbound_pool.as_ref().and_then(|pool| pool.route(target));
    match routed {
        Some((socket, _iface)) => {
            let _ = socket.send(target, bytes);
        }
        None => {
            let _ = rt.user_unicast.send(target, bytes);
        }
    }
}
#[cfg(not(feature = "security"))]
/// No interface pool without the security feature: always use user unicast.
fn send_on_best_interface(rt: &DcpsRuntime, target: &Locator, bytes: &[u8]) {
    let _ = rt.user_unicast.send(target, bytes);
}
/// Per-writer runtime state: the RTPS reliable writer plus the QoS values
/// and status counters needed for matching and status reporting.
struct UserWriterSlot {
    writer: ReliableWriter,
    topic_name: String,
    type_name: String,
    reliable: bool,
    durability: zerodds_qos::DurabilityKind,
    // Offered deadline period in ns; 0 encodes infinite (see deadline_compat).
    deadline_nanos: u64,
    // Runtime-relative timestamp of the most recent write.
    last_write: Option<Duration>,
    offered_deadline_missed_count: u64,
    liveliness_lost_count: u64,
    last_liveliness_assert: Option<Duration>,
    offered_incompatible_qos: crate::status::OfferedIncompatibleQosStatus,
    // Lifespan in ns; samples older than this are eligible for expiry.
    lifespan_nanos: u64,
    // Insertion times per sequence number, used for lifespan expiry.
    sample_insert_times:
        alloc::collections::VecDeque<(zerodds_rtps::wire_types::SequenceNumber, Duration)>,
    liveliness_kind: zerodds_qos::LivelinessKind,
    liveliness_lease_nanos: u64,
    ownership: zerodds_qos::OwnershipKind,
    partition: Vec<String>,
    #[cfg(feature = "security")]
    // Negotiated protection level per matched reader peer (GUID prefix key).
    reader_protection: BTreeMap<[u8; 12], ProtectionLevel>,
    #[cfg(feature = "security")]
    // Reverse map from destination locator to peer, for per-target crypto.
    locator_to_peer: BTreeMap<Locator, [u8; 12]>,
    type_identifier: zerodds_types::TypeIdentifier,
    // Per-writer DATA_REPRESENTATION override of the runtime-wide offer.
    data_rep_offer_override: Option<Vec<i16>>,
}
/// A received sample optionally paired with its raw encapsulated buffer
/// (shared bytes plus a payload offset).
pub type UserSampleWithEncap = (UserSample, Option<(Arc<[u8]>, usize)>);
#[derive(Debug, Clone)]
/// A sample delivered to user readers: either live data or an instance
/// lifecycle transition (dispose/unregister).
pub enum UserSample {
    Alive {
        payload: Vec<u8>,
        writer_guid: [u8; 16],
        // OWNERSHIP_STRENGTH of the writing endpoint, for exclusive ownership.
        writer_strength: i32,
    },
    Lifecycle {
        key_hash: [u8; 16],
        kind: zerodds_rtps::history_cache::ChangeKind,
    },
}
/// Callback invoked with the raw payload of each received sample.
pub type UserReaderListener = alloc::boxed::Box<dyn Fn(&[u8]) + Send + Sync + 'static>;
/// Per-reader runtime state: the RTPS reliable reader, delivery channels,
/// QoS values and status counters used for matching and status reporting.
struct UserReaderSlot {
    reader: ReliableReader,
    topic_name: String,
    type_name: String,
    // Channel on which received samples are handed to the user-facing reader.
    sample_tx: mpsc::Sender<UserSample>,
    // Waker for async readers awaiting data.
    async_waker: alloc::sync::Arc<std::sync::Mutex<Option<core::task::Waker>>>,
    listener: Option<alloc::sync::Arc<UserReaderListener>>,
    durability: zerodds_qos::DurabilityKind,
    // Requested deadline period in ns; 0 encodes infinite.
    deadline_nanos: u64,
    last_sample_received: Option<Duration>,
    requested_deadline_missed_count: u64,
    requested_incompatible_qos: crate::status::RequestedIncompatibleQosStatus,
    sample_lost_count: u64,
    sample_rejected: crate::status::SampleRejectedStatus,
    liveliness_lease_nanos: u64,
    liveliness_kind: zerodds_qos::LivelinessKind,
    liveliness_alive_count: u64,
    liveliness_not_alive_count: u64,
    liveliness_alive: bool,
    ownership: zerodds_qos::OwnershipKind,
    partition: Vec<String>,
    // OWNERSHIP_STRENGTH per matched writer GUID (exclusive-ownership arbitration).
    writer_strengths: alloc::collections::BTreeMap<[u8; 16], i32>,
    type_identifier: zerodds_types::TypeIdentifier,
    type_consistency: zerodds_types::qos::TypeConsistencyEnforcement,
}
#[derive(Debug, Clone)]
/// QoS and topic parameters supplied when registering a user data writer.
pub struct UserWriterConfig {
    pub topic_name: String,
    pub type_name: String,
    pub reliable: bool,
    pub durability: zerodds_qos::DurabilityKind,
    pub deadline: zerodds_qos::DeadlineQosPolicy,
    pub lifespan: zerodds_qos::LifespanQosPolicy,
    pub liveliness: zerodds_qos::LivelinessQosPolicy,
    pub ownership: zerodds_qos::OwnershipKind,
    pub ownership_strength: i32,
    pub partition: Vec<String>,
    pub user_data: Vec<u8>,
    pub topic_data: Vec<u8>,
    pub group_data: Vec<u8>,
    pub type_identifier: zerodds_types::TypeIdentifier,
    // None: use the runtime-wide DATA_REPRESENTATION offer.
    pub data_representation_offer: Option<Vec<i16>>,
}
#[derive(Debug, Clone)]
/// QoS and topic parameters supplied when registering a user data reader.
pub struct UserReaderConfig {
    pub topic_name: String,
    pub type_name: String,
    pub reliable: bool,
    pub durability: zerodds_qos::DurabilityKind,
    pub deadline: zerodds_qos::DeadlineQosPolicy,
    pub liveliness: zerodds_qos::LivelinessQosPolicy,
    pub ownership: zerodds_qos::OwnershipKind,
    pub partition: Vec<String>,
    pub user_data: Vec<u8>,
    pub topic_data: Vec<u8>,
    pub group_data: Vec<u8>,
    pub type_identifier: zerodds_types::TypeIdentifier,
    pub type_consistency: zerodds_types::qos::TypeConsistencyEnforcement,
    // None: use the runtime-wide DATA_REPRESENTATION offer.
    pub data_representation_offer: Option<Vec<i16>>,
}
/// Assemble the SEDP publication (DCPSPublication) announcement for a user
/// writer from its registration config.
///
/// Reliability kind follows `cfg.reliable` with a fixed 100 ms
/// max_blocking_time; the DATA_REPRESENTATION list uses the per-writer
/// override when present, else `runtime_offer`.
fn build_publication_data(
    owner_prefix: GuidPrefix,
    writer_eid: EntityId,
    cfg: &UserWriterConfig,
    runtime_offer: &[i16],
) -> zerodds_rtps::publication_data::PublicationBuiltinTopicData {
    use zerodds_qos::{ReliabilityKind, ReliabilityQosPolicy};
    zerodds_rtps::publication_data::PublicationBuiltinTopicData {
        key: Guid::new(owner_prefix, writer_eid),
        participant_key: Guid::new(owner_prefix, EntityId::PARTICIPANT),
        topic_name: cfg.topic_name.clone(),
        type_name: cfg.type_name.clone(),
        durability: cfg.durability,
        reliability: ReliabilityQosPolicy {
            kind: if cfg.reliable {
                ReliabilityKind::Reliable
            } else {
                ReliabilityKind::BestEffort
            },
            max_blocking_time: QosDuration::from_millis(100_i32),
        },
        ownership: cfg.ownership,
        ownership_strength: cfg.ownership_strength,
        liveliness: cfg.liveliness,
        deadline: cfg.deadline,
        lifespan: cfg.lifespan,
        partition: cfg.partition.clone(),
        user_data: cfg.user_data.clone(),
        topic_data: cfg.topic_data.clone(),
        group_data: cfg.group_data.clone(),
        type_information: None,
        data_representation: cfg
            .data_representation_offer
            .clone()
            .unwrap_or_else(|| runtime_offer.to_vec()),
        security_info: None,
        service_instance_name: None,
        related_entity_guid: None,
        topic_aliases: None,
        type_identifier: cfg.type_identifier.clone(),
    }
}
/// Assemble the SEDP subscription (DCPSSubscription) announcement for a user
/// reader from its registration config — the reader-side counterpart of
/// `build_publication_data`.
fn build_subscription_data(
    owner_prefix: GuidPrefix,
    reader_eid: EntityId,
    cfg: &UserReaderConfig,
    runtime_offer: &[i16],
) -> zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
    use zerodds_qos::{ReliabilityKind, ReliabilityQosPolicy};
    zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
        key: Guid::new(owner_prefix, reader_eid),
        participant_key: Guid::new(owner_prefix, EntityId::PARTICIPANT),
        topic_name: cfg.topic_name.clone(),
        type_name: cfg.type_name.clone(),
        durability: cfg.durability,
        reliability: ReliabilityQosPolicy {
            kind: if cfg.reliable {
                ReliabilityKind::Reliable
            } else {
                ReliabilityKind::BestEffort
            },
            max_blocking_time: QosDuration::from_millis(100_i32),
        },
        ownership: cfg.ownership,
        liveliness: cfg.liveliness,
        deadline: cfg.deadline,
        partition: cfg.partition.clone(),
        user_data: cfg.user_data.clone(),
        topic_data: cfg.topic_data.clone(),
        group_data: cfg.group_data.clone(),
        type_information: None,
        data_representation: cfg
            .data_representation_offer
            .clone()
            .unwrap_or_else(|| runtime_offer.to_vec()),
        content_filter: None,
        security_info: None,
        service_instance_name: None,
        related_entity_guid: None,
        topic_aliases: None,
        type_identifier: cfg.type_identifier.clone(),
    }
}
/// Central runtime for one DDS participant: owns the discovery stacks, the
/// UDP sockets, all user reader/writer slots, and the background receive and
/// tick threads spawned by `start`.
pub struct DcpsRuntime {
    pub guid_prefix: GuidPrefix,
    pub domain_id: i32,
    // SPDP multicast receive socket (joined to the configured group).
    pub spdp_multicast_rx: Arc<UdpTransport>,
    // Metatraffic unicast socket (SEDP, WLP, type lookup).
    pub spdp_unicast: Arc<UdpTransport>,
    // User-data unicast socket.
    pub user_unicast: Arc<UdpTransport>,
    // Dedicated socket used for sending discovery/multicast traffic.
    spdp_mc_tx: Arc<UdpTransport>,
    spdp_beacon: Mutex<SpdpBeacon>,
    spdp_reader: SpdpReader,
    discovered: Arc<Mutex<DiscoveredParticipantsCache>>,
    pub sedp: Arc<Mutex<SedpStack>>,
    pub type_lookup_endpoints: TypeLookupEndpoints,
    pub type_lookup_server: Arc<Mutex<TypeLookupServer>>,
    pub type_lookup_client: Arc<Mutex<TypeLookupClient>>,
    // Lazily installed by enable_security_builtins.
    pub security_builtin: Mutex<Option<Arc<Mutex<SecurityBuiltinStack>>>>,
    // Epoch for all runtime-relative Duration timestamps.
    start_instant: Instant,
    user_writers: Arc<RwLock<BTreeMap<EntityId, Arc<Mutex<UserWriterSlot>>>>>,
    // Shared-memory locator blobs per writer entity, advertised via SEDP.
    shm_locators: Arc<RwLock<BTreeMap<EntityId, Vec<u8>>>>,
    user_readers: Arc<RwLock<BTreeMap<EntityId, Arc<Mutex<UserReaderSlot>>>>>,
    // Monotonic counter used to mint user entity keys.
    entity_counter: AtomicU32,
    pub config: RuntimeConfig,
    #[cfg(feature = "security")]
    outbound_pool: Option<Arc<OutboundSocketPool>>,
    pub wlp: Arc<Mutex<crate::wlp::WlpEndpoint>>,
    builtin_sinks: Mutex<Option<crate::builtin_subscriber::BuiltinSinks>>,
    ignore_filter: Mutex<Option<crate::participant::IgnoreFilter>>,
    // Cooperative shutdown flag observed by all background threads.
    stop: Arc<AtomicBool>,
    handles: Mutex<Vec<JoinHandle<()>>>,
    // Condvars used to wake waiters on match / acknowledgment events.
    match_event: Arc<(Mutex<()>, Condvar)>,
    ack_event: Arc<(Mutex<()>, Condvar)>,
}
impl core::fmt::Debug for DcpsRuntime {
    /// Compact Debug: only the identifying fields, with the rest elided via
    /// `finish_non_exhaustive`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let mut s = f.debug_struct("DcpsRuntime");
        s.field("domain_id", &self.domain_id);
        s.field("guid_prefix", &self.guid_prefix);
        s.field("spdp_group", &self.config.spdp_multicast_group);
        s.finish_non_exhaustive()
    }
}
// Shared, lock-protected handles to user endpoint slots.
type WriterSlotArc = Arc<Mutex<UserWriterSlot>>;
type ReaderSlotArc = Arc<Mutex<UserReaderSlot>>;
impl DcpsRuntime {
/// Look up the slot registered for a user writer entity, if any.
fn writer_slot(&self, eid: EntityId) -> Option<WriterSlotArc> {
    self.user_writers
        .read()
        .ok()
        .and_then(|map| map.get(&eid).cloned())
}
/// Look up the slot registered for a user reader entity, if any.
fn reader_slot(&self, eid: EntityId) -> Option<ReaderSlotArc> {
    self.user_readers
        .read()
        .ok()
        .and_then(|map| map.get(&eid).cloned())
}
/// Snapshot all (entity, slot) writer pairs; empty on a poisoned lock.
fn writer_slots_snapshot(&self) -> Vec<(EntityId, WriterSlotArc)> {
    self.user_writers
        .read()
        .map(|map| map.iter().map(|(k, v)| (*k, Arc::clone(v))).collect())
        .unwrap_or_default()
}
/// Snapshot all (entity, slot) reader pairs; empty on a poisoned lock.
fn reader_slots_snapshot(&self) -> Vec<(EntityId, ReaderSlotArc)> {
    self.user_readers
        .read()
        .map(|map| map.iter().map(|(k, v)| (*k, Arc::clone(v))).collect())
        .unwrap_or_default()
}
/// Entity ids of all registered user writers; empty on a poisoned lock.
fn writer_eids(&self) -> Vec<EntityId> {
    self.user_writers
        .read()
        .map(|map| map.keys().copied().collect())
        .unwrap_or_default()
}
/// Entity ids of all registered user readers; empty on a poisoned lock.
fn reader_eids(&self) -> Vec<EntityId> {
    self.user_readers
        .read()
        .map(|map| map.keys().copied().collect())
        .unwrap_or_default()
}
/// Bootstrap the participant runtime: bind all sockets, build and cache the
/// SPDP participant announcement, construct the discovery stacks, and spawn
/// the four background threads (SPDP-multicast recv, metatraffic recv,
/// user-data recv, tick).
///
/// # Errors
/// Returns `DdsError::BadParameter` for an unmappable domain id,
/// `DdsError::TransportError` for socket setup failures, and
/// `DdsError::PreconditionNotMet` for thread-spawn or lock failures.
pub fn start(
    domain_id: i32,
    guid_prefix: GuidPrefix,
    config: RuntimeConfig,
) -> Result<Arc<Self>> {
    // Well-known SPDP multicast port for this domain.
    // NOTE(review): `domain_id as u32` wraps for negative ids — presumably
    // callers validate non-negative domain ids upstream; confirm.
    let spdp_port = u16::try_from(spdp_multicast_port(domain_id as u32)).map_err(|_| {
        DdsError::BadParameter {
            what: "domain_id too large for SPDP port mapping",
        }
    })?;
    // SPDP multicast receive socket, joined to the configured group, with a
    // 1 s timeout so the recv thread can poll the stop flag.
    let spdp_mc = UdpTransport::bind_multicast_v4(
        config.spdp_multicast_group,
        spdp_port,
        config.multicast_interface,
    )
    .map_err(|_| DdsError::TransportError {
        label: "spdp multicast bind",
    })?
    .with_timeout(Some(Duration::from_secs(1)))
    .map_err(|_| DdsError::TransportError {
        label: "spdp multicast set_timeout",
    })?;
    // Metatraffic unicast socket (ephemeral port).
    let spdp_uc = UdpTransport::bind_v4(Ipv4Addr::UNSPECIFIED, 0)
        .map_err(|_| DdsError::TransportError {
            label: "spdp unicast bind",
        })?
        .with_timeout(Some(Duration::from_secs(1)))
        .map_err(|_| DdsError::TransportError {
            label: "spdp unicast set_timeout",
        })?;
    // User-data unicast socket (ephemeral port).
    let user_uc = UdpTransport::bind_v4(Ipv4Addr::UNSPECIFIED, 0)
        .map_err(|_| DdsError::TransportError {
            label: "user unicast bind",
        })?
        .with_timeout(Some(Duration::from_secs(1)))
        .map_err(|_| DdsError::TransportError {
            label: "user unicast set_timeout",
        })?;
    // Separate transmit-only socket for discovery/multicast sends.
    let spdp_mc_tx = UdpTransport::bind_v4(Ipv4Addr::UNSPECIFIED, 0).map_err(|_| {
        DdsError::TransportError {
            label: "spdp mc-tx bind",
        }
    })?;
    let stop = Arc::new(AtomicBool::new(false));
    // Resolve routable addresses to advertise for the two unicast sockets.
    let user_locator = announce_locator(&user_uc, config.multicast_interface);
    let spdp_uc_locator = announce_locator(&spdp_uc, config.multicast_interface);
    // Participant announcement carried by the SPDP beacon.
    let participant_data = ParticipantBuiltinTopicData {
        guid: Guid::new(guid_prefix, EntityId::PARTICIPANT),
        protocol_version: ProtocolVersion::V2_5,
        vendor_id: VendorId::ZERODDS,
        default_unicast_locator: Some(user_locator),
        default_multicast_locator: None,
        metatraffic_unicast_locator: Some(spdp_uc_locator),
        metatraffic_multicast_locator: Some(Locator {
            kind: LocatorKind::UdpV4,
            port: u32::from(spdp_port),
            address: {
                // IPv4 group address in the last 4 bytes of the locator.
                let mut a = [0u8; 16];
                a[12..].copy_from_slice(&SPDP_DEFAULT_MULTICAST_ADDRESS);
                a
            },
        }),
        domain_id: Some(domain_id as u32),
        builtin_endpoint_set: {
            // Advertise secure builtins only when configured to do so.
            let mut mask = endpoint_flag::ALL_STANDARD;
            if config.announce_secure_endpoints {
                mask |= endpoint_flag::ALL_SECURE;
            }
            mask
        },
        lease_duration: qos_duration_from_std(config.participant_lease_duration),
        user_data: config.user_data.clone(),
        properties: Default::default(),
        identity_token: None,
        permissions_token: None,
        identity_status_token: None,
        sig_algo_info: None,
        kx_algo_info: None,
        sym_cipher_algo_info: None,
    };
    let beacon = SpdpBeacon::new(participant_data);
    let sedp = SedpStack::new(guid_prefix, VendorId::ZERODDS);
    // Optional per-interface outbound socket pool (security builds only).
    #[cfg(feature = "security")]
    let outbound_pool = if config.interface_bindings.is_empty() {
        None
    } else {
        Some(Arc::new(OutboundSocketPool::bind_all(
            &config.interface_bindings,
        )?))
    };
    // WLP tick period: explicit value, or a third of the lease by default.
    let wlp_tick_period = if config.wlp_period.is_zero() {
        config.participant_lease_duration / 3
    } else {
        config.wlp_period
    };
    let wlp = crate::wlp::WlpEndpoint::new(guid_prefix, VendorId::ZERODDS, wlp_tick_period);
    let rt = Arc::new(Self {
        guid_prefix,
        domain_id,
        spdp_multicast_rx: Arc::new(spdp_mc),
        spdp_unicast: Arc::new(spdp_uc),
        user_unicast: Arc::new(user_uc),
        spdp_mc_tx: Arc::new(spdp_mc_tx),
        spdp_beacon: Mutex::new(beacon),
        spdp_reader: SpdpReader::new(),
        discovered: Arc::new(Mutex::new(DiscoveredParticipantsCache::new())),
        sedp: Arc::new(Mutex::new(sedp)),
        type_lookup_endpoints: TypeLookupEndpoints::new(guid_prefix),
        type_lookup_server: Arc::new(Mutex::new(TypeLookupServer::new())),
        type_lookup_client: Arc::new(Mutex::new(TypeLookupClient::new())),
        security_builtin: Mutex::new(None),
        start_instant: Instant::now(),
        user_writers: Arc::new(RwLock::new(BTreeMap::new())),
        shm_locators: Arc::new(RwLock::new(BTreeMap::new())),
        user_readers: Arc::new(RwLock::new(BTreeMap::new())),
        entity_counter: AtomicU32::new(1),
        config,
        stop: stop.clone(),
        handles: Mutex::new(Vec::new()),
        match_event: Arc::new((Mutex::new(()), std::sync::Condvar::new())),
        ack_event: Arc::new((Mutex::new(()), std::sync::Condvar::new())),
        #[cfg(feature = "security")]
        outbound_pool,
        wlp: Arc::new(Mutex::new(wlp)),
        builtin_sinks: Mutex::new(None),
        ignore_filter: Mutex::new(None),
    });
    // Spawn the four background threads, each holding a runtime Arc and the
    // shared stop flag.
    let mut handles_init: Vec<JoinHandle<()>> = Vec::with_capacity(4);
    let rt_recv_spdp_mc = Arc::clone(&rt);
    let stop_recv_spdp_mc = stop.clone();
    handles_init.push(
        thread::Builder::new()
            .name(String::from("zdds-recv-spdp-mc"))
            .spawn(move || recv_spdp_multicast_loop(rt_recv_spdp_mc, stop_recv_spdp_mc))
            .map_err(|_| DdsError::PreconditionNotMet {
                reason: "spawn zdds-recv-spdp-mc thread",
            })?,
    );
    let rt_recv_meta = Arc::clone(&rt);
    let stop_recv_meta = stop.clone();
    handles_init.push(
        thread::Builder::new()
            .name(String::from("zdds-recv-meta"))
            .spawn(move || recv_metatraffic_loop(rt_recv_meta, stop_recv_meta))
            .map_err(|_| DdsError::PreconditionNotMet {
                reason: "spawn zdds-recv-meta thread",
            })?,
    );
    let rt_recv_user = Arc::clone(&rt);
    let stop_recv_user = stop.clone();
    handles_init.push(
        thread::Builder::new()
            .name(String::from("zdds-recv-user"))
            .spawn(move || recv_user_data_loop(rt_recv_user, stop_recv_user))
            .map_err(|_| DdsError::PreconditionNotMet {
                reason: "spawn zdds-recv-user thread",
            })?,
    );
    let rt_tick = Arc::clone(&rt);
    let stop_tick = stop;
    handles_init.push(
        thread::Builder::new()
            .name(String::from("zdds-tick"))
            .spawn(move || tick_loop(rt_tick, stop_tick))
            .map_err(|_| DdsError::PreconditionNotMet {
                reason: "spawn zdds-tick thread",
            })?,
    );
    // Store the join handles on the runtime for later shutdown.
    let mut guard = rt
        .handles
        .lock()
        .map_err(|_| DdsError::PreconditionNotMet {
            reason: "runtime handles mutex poisoned",
        })?;
    *guard = handles_init;
    drop(guard);
    Ok(rt)
}
#[must_use]
/// Locator of the user-data unicast socket.
pub fn user_locator(&self) -> zerodds_rtps::wire_types::Locator {
    self.user_unicast.local_locator()
}
#[must_use]
/// Locator of the metatraffic unicast socket.
pub fn spdp_unicast_locator(&self) -> zerodds_rtps::wire_types::Locator {
    self.spdp_unicast.local_locator()
}
#[must_use]
/// Builtin endpoint mask currently advertised by the SPDP beacon
/// (0 if the beacon mutex is poisoned).
pub fn announced_builtin_endpoint_set(&self) -> u32 {
    self.spdp_beacon
        .lock()
        .map(|b| b.data.builtin_endpoint_set)
        .unwrap_or(0)
}
/// Register a TypeObject with the local type-lookup server so remote peers
/// can resolve it, returning its computed equivalence hash.
///
/// # Errors
/// `DdsError::PreconditionNotMet` when hashing fails, the server mutex is
/// poisoned, or the TypeObject variant is unrecognized.
pub fn register_type_object(
    &self,
    obj: zerodds_types::type_object::TypeObject,
) -> Result<zerodds_types::EquivalenceHash> {
    let hash = zerodds_types::compute_hash(&obj).map_err(|_| DdsError::PreconditionNotMet {
        reason: "type hash computation failed",
    })?;
    let mut server =
        self.type_lookup_server
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet {
                reason: "type_lookup_server mutex poisoned",
            })?;
    // Store under the registry matching the variant's completeness level.
    match obj {
        zerodds_types::type_object::TypeObject::Minimal(m) => {
            server.registry.insert_minimal(hash, m);
        }
        zerodds_types::type_object::TypeObject::Complete(c) => {
            server.registry.insert_complete(hash, c);
        }
        // Future/unknown variants are rejected rather than silently dropped.
        _ => {
            return Err(DdsError::PreconditionNotMet {
                reason: "unknown TypeObject variant",
            });
        }
    }
    Ok(hash)
}
/// Send a type-lookup request for `type_hashes` to a discovered peer.
///
/// Returns `Ok(None)` when the peer is unknown or advertises no usable
/// unicast locator. On success, registers a reply callback that inserts the
/// returned TypeObjects into the local registry and returns the request id.
///
/// # Errors
/// `DdsError::PreconditionNotMet` on poisoned locks or encode failures.
pub fn send_type_lookup_request(
    &self,
    peer: zerodds_rtps::wire_types::GuidPrefix,
    type_hashes: &[zerodds_types::EquivalenceHash],
) -> Result<Option<zerodds_discovery::type_lookup::RequestId>> {
    use alloc::sync::Arc as AllocArc;
    use zerodds_discovery::type_lookup::request_types_payload;
    use zerodds_rtps::datagram::encode_data_datagram;
    use zerodds_rtps::header::RtpsHeader;
    use zerodds_rtps::submessages::DataSubmessage;
    use zerodds_rtps::wire_types::{ProtocolVersion, SequenceNumber};
    // Resolve a destination locator for the peer, preferring its default
    // unicast locator over the metatraffic one.
    let target = {
        let discovered = self
            .discovered
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet {
                reason: "discovered mutex poisoned",
            })?;
        let Some(dp) = discovered.get(&peer) else {
            return Ok(None);
        };
        dp.data
            .default_unicast_locator
            .or(dp.data.metatraffic_unicast_locator)
    };
    let Some(target) = target else {
        return Ok(None);
    };
    let mut client =
        self.type_lookup_client
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet {
                reason: "type_lookup_client mutex poisoned",
            })?;
    let type_ids: alloc::vec::Vec<zerodds_types::TypeIdentifier> = type_hashes
        .iter()
        .map(|h| zerodds_types::TypeIdentifier::EquivalenceHashMinimal(*h))
        .collect();
    // Reply callback: rehash each returned TypeObject and store it in the
    // local registry (minimal and complete kept separately).
    let server_for_cb = Arc::clone(&self.type_lookup_server);
    let cb = Box::new(
        move |reply: zerodds_discovery::type_lookup::TypeLookupReply| {
            let zerodds_discovery::type_lookup::TypeLookupReply::Types(types_reply) = reply
            else {
                return;
            };
            let Ok(mut server) = server_for_cb.lock() else {
                return;
            };
            for t in &types_reply.types {
                match t {
                    zerodds_types::type_lookup::ReplyTypeObject::Minimal(m) => {
                        let to = zerodds_types::type_object::TypeObject::Minimal(m.clone());
                        if let Ok(h) = zerodds_types::compute_hash(&to) {
                            server.registry.insert_minimal(h, m.clone());
                        }
                    }
                    zerodds_types::type_lookup::ReplyTypeObject::Complete(c) => {
                        let to = zerodds_types::type_object::TypeObject::Complete(c.clone());
                        if let Ok(h) = zerodds_types::compute_hash(&to) {
                            server.registry.insert_complete(h, c.clone());
                        }
                    }
                }
            }
        },
    );
    let request_id = client.request_types(type_ids.clone(), cb);
    drop(client);
    // Encapsulate the request body (header bytes look like CDR_LE).
    let body = request_types_payload(&type_ids).map_err(|_| DdsError::PreconditionNotMet {
        reason: "type_lookup request payload encode failed",
    })?;
    let mut payload: alloc::vec::Vec<u8> = alloc::vec::Vec::with_capacity(4 + body.len());
    payload.extend_from_slice(&[0x00, 0x01, 0x00, 0x00]);
    payload.extend_from_slice(&body);
    // The 64-bit request id is smuggled into the DATA sequence number so the
    // reply can be correlated.
    let id_u64 = request_id.0;
    let sn =
        SequenceNumber::from_high_low((id_u64 >> 32) as i32, (id_u64 & 0xFFFF_FFFF) as u32);
    let header = RtpsHeader {
        protocol_version: ProtocolVersion::CURRENT,
        vendor_id: VendorId::ZERODDS,
        guid_prefix: self.guid_prefix,
    };
    let data = DataSubmessage {
        extra_flags: 0,
        reader_id: EntityId::TL_SVC_REQ_READER,
        writer_id: EntityId::TL_SVC_REQ_WRITER,
        writer_sn: sn,
        inline_qos: None,
        key_flag: false,
        non_standard_flag: false,
        serialized_payload: AllocArc::from(payload.into_boxed_slice()),
    };
    let datagram =
        encode_data_datagram(header, &[data]).map_err(|_| DdsError::PreconditionNotMet {
            reason: "type_lookup request datagram encode failed",
        })?;
    // Only UDPv4 targets are supported on this path; others are dropped.
    if target.kind == LocatorKind::UdpV4 {
        let _ = self.user_unicast.send(&target, &datagram);
    }
    Ok(Some(request_id))
}
/// Install (or return the already-installed) security builtin stack,
/// seeding it with the endpoints of every currently discovered peer.
///
/// NOTE(review): if the `security_builtin` mutex is poisoned, a fresh stack
/// is returned WITHOUT being stored on the runtime — subsequent calls would
/// mint another, unconnected stack. Presumably acceptable as a degraded
/// mode; confirm.
pub fn enable_security_builtins(
    &self,
    vendor_id: VendorId,
) -> Arc<Mutex<SecurityBuiltinStack>> {
    let mut slot = match self.security_builtin.lock() {
        Ok(g) => g,
        Err(_) => {
            // Poisoned lock: hand back a detached stack rather than panic.
            return Arc::new(Mutex::new(SecurityBuiltinStack::new(
                self.guid_prefix,
                vendor_id,
            )));
        }
    };
    // Idempotent: reuse the stack installed by an earlier call.
    if let Some(existing) = slot.as_ref() {
        return Arc::clone(existing);
    }
    let stack = Arc::new(Mutex::new(SecurityBuiltinStack::new(
        self.guid_prefix,
        vendor_id,
    )));
    // Replay already-known peers into the new stack so it starts in sync.
    if let Ok(cache) = self.discovered.lock() {
        if let Ok(mut s) = stack.lock() {
            for peer in cache.iter() {
                s.handle_remote_endpoints(peer);
            }
        }
    }
    *slot = Some(Arc::clone(&stack));
    stack
}
#[must_use]
/// Current security builtin stack, if one has been installed.
pub fn security_builtin_snapshot(&self) -> Option<Arc<Mutex<SecurityBuiltinStack>>> {
    self.security_builtin.lock().ok()?.clone()
}
/// Manually assert participant-level liveliness (ignored on poisoned lock).
pub fn assert_liveliness(&self) {
    let _ = self.wlp.lock().map(|mut w| w.assert_participant());
}
/// Manually assert liveliness for one writer topic token
/// (ignored on poisoned lock).
pub fn assert_writer_liveliness(&self, topic_token: Vec<u8>) {
    let _ = self.wlp.lock().map(|mut w| w.assert_topic(topic_token));
}
#[must_use]
/// Timestamp of the last liveliness evidence seen from a peer participant.
pub fn peer_liveliness_last_seen(&self, prefix: &GuidPrefix) -> Option<Duration> {
    let wlp = self.wlp.lock().ok()?;
    let last_seen = wlp.peer_state(prefix)?.last_seen;
    Some(last_seen)
}
#[must_use]
/// Builtin-endpoint capabilities advertised by a discovered peer.
pub fn peer_capabilities(
    &self,
    prefix: &GuidPrefix,
) -> Option<zerodds_discovery::PeerCapabilities> {
    let cache = self.discovered.lock().ok()?;
    let bits = cache.get(prefix)?.data.builtin_endpoint_set;
    Some(zerodds_discovery::PeerCapabilities::from_bits(bits))
}
#[must_use]
/// Clone of the full discovered-participant cache
/// (empty on a poisoned lock).
pub fn discovered_participants(&self) -> Vec<DiscoveredParticipant> {
    match self.discovered.lock() {
        Ok(cache) => cache.iter().cloned().collect(),
        Err(_) => Vec::new(),
    }
}
/// Install the builtin-topic delivery sinks (silently dropped on a
/// poisoned lock).
pub fn attach_builtin_sinks(&self, sinks: crate::builtin_subscriber::BuiltinSinks) {
    if let Ok(mut slot) = self.builtin_sinks.lock() {
        slot.replace(sinks);
    }
}
/// Clone of the currently installed builtin sinks, if any.
pub(crate) fn builtin_sinks_snapshot(&self) -> Option<crate::builtin_subscriber::BuiltinSinks> {
    match self.builtin_sinks.lock() {
        Ok(guard) => guard.clone(),
        Err(_) => None,
    }
}
/// Install the participant ignore filter (silently dropped on a
/// poisoned lock).
pub fn attach_ignore_filter(&self, filter: crate::participant::IgnoreFilter) {
    if let Ok(mut slot) = self.ignore_filter.lock() {
        slot.replace(filter);
    }
}
/// Clone of the currently installed ignore filter, if any.
pub(crate) fn ignore_filter_snapshot(&self) -> Option<crate::participant::IgnoreFilter> {
    match self.ignore_filter.lock() {
        Ok(guard) => guard.clone(),
        Err(_) => None,
    }
}
/// Publish a DCPSPublication (SEDP) announcement for a local writer,
/// attaching the writer's shared-memory locator when one is registered.
///
/// Each produced datagram is passed through the outbound security
/// transform and sent to every UDPv4 target; individual send failures are
/// ignored (discovery is periodic and self-healing).
///
/// # Errors
/// `DdsError::PreconditionNotMet` on a poisoned SEDP lock,
/// `DdsError::WireError` when SEDP fails to encode the announcement.
pub fn announce_publication(
    &self,
    data: &zerodds_rtps::publication_data::PublicationBuiltinTopicData,
) -> Result<()> {
    let shm = self.shm_locator(data.key.entity_id);
    // Scope the SEDP lock to just the encode step.
    let datagrams = {
        let mut sedp = self.sedp.lock().map_err(|_| DdsError::PreconditionNotMet {
            reason: "sedp poisoned",
        })?;
        let res = if let Some(ref bytes) = shm {
            sedp.announce_publication_with_shm_locator(data, bytes)
        } else {
            sedp.announce_publication(data)
        };
        res.map_err(|_| DdsError::WireError {
            message: alloc::string::String::from("sedp announce_publication"),
        })?
    };
    for dg in datagrams {
        // A failed security transform drops the datagram entirely.
        if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
            for t in dg.targets.iter() {
                if t.kind == LocatorKind::UdpV4 {
                    let _ = self.spdp_mc_tx.send(t, &secured);
                }
            }
        }
    }
    Ok(())
}
/// Announce a local subscription over SEDP.
///
/// Mirrors [`announce_publication`] without the shared-memory locator
/// path: datagrams are built under the SEDP lock, then secured and sent
/// to every UDPv4 target. Send failures are ignored (best-effort).
///
/// # Errors
/// `PreconditionNotMet` if the SEDP mutex is poisoned; `WireError` if
/// announcement serialization fails.
pub fn announce_subscription(
&self,
data: &zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData,
) -> Result<()> {
// Scope the SEDP lock to datagram construction only.
let datagrams = {
let mut sedp = self.sedp.lock().map_err(|_| DdsError::PreconditionNotMet {
reason: "sedp poisoned",
})?;
sedp.announce_subscription(data)
.map_err(|_| DdsError::WireError {
message: alloc::string::String::from("sedp announce_subscription"),
})?
};
for dg in datagrams {
if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
for t in dg.targets.iter() {
if t.kind == LocatorKind::UdpV4 {
let _ = self.spdp_mc_tx.send(t, &secured);
}
}
}
}
Ok(())
}
/// Create and register a user DataWriter.
///
/// Allocates a fresh entity id, builds a `ReliableWriter`, stores the
/// slot (with all QoS bookkeeping fields) in the writer map, announces
/// the publication via SEDP, and immediately matches it against the
/// current discovery cache. Returns the new writer's `EntityId`.
///
/// # Errors
/// `PreconditionNotMet` if the writer map's `RwLock` is poisoned.
pub fn register_user_writer(&self, cfg: UserWriterConfig) -> Result<EntityId> {
let now = self.start_instant.elapsed();
let key = self.next_entity_key();
let eid = EntityId::user_writer_with_key(key);
let writer = ReliableWriter::new(ReliableWriterConfig {
guid: Guid::new(self.guid_prefix, eid),
vendor_id: VendorId::ZERODDS,
reader_proxies: Vec::new(),
// NOTE(review): history bounds are hard-coded (1024 samples,
// KeepLast depth 32) rather than derived from cfg — confirm intended.
max_samples: 1024,
history_kind: HistoryKind::KeepLast { depth: 32 },
heartbeat_period: DEFAULT_HEARTBEAT_PERIOD,
fragment_size: DEFAULT_FRAGMENT_SIZE,
mtu: DEFAULT_MTU,
});
let pub_data = build_publication_data(
self.guid_prefix,
eid,
&cfg,
&self.config.data_representation_offer,
);
self.user_writers
.write()
.map_err(|_| DdsError::PreconditionNotMet {
reason: "user_writers poisoned",
})?
.insert(
eid,
Arc::new(Mutex::new(UserWriterSlot {
writer,
topic_name: cfg.topic_name.clone(),
type_name: cfg.type_name.clone(),
reliable: cfg.reliable,
durability: cfg.durability,
deadline_nanos: qos_duration_to_nanos(cfg.deadline.period),
last_write: None,
offered_deadline_missed_count: 0,
liveliness_lost_count: 0,
// Treat creation as the first liveliness assertion.
last_liveliness_assert: Some(now),
offered_incompatible_qos: crate::status::OfferedIncompatibleQosStatus::default(
),
lifespan_nanos: qos_duration_to_nanos(cfg.lifespan.duration),
sample_insert_times: alloc::collections::VecDeque::new(),
liveliness_kind: cfg.liveliness.kind,
liveliness_lease_nanos: qos_duration_to_nanos(cfg.liveliness.lease_duration),
ownership: cfg.ownership,
partition: cfg.partition.clone(),
#[cfg(feature = "security")]
reader_protection: BTreeMap::new(),
#[cfg(feature = "security")]
locator_to_peer: BTreeMap::new(),
type_identifier: cfg.type_identifier.clone(),
data_rep_offer_override: cfg.data_representation_offer.clone(),
})),
);
// Best-effort: a failed discovery announcement does not fail creation.
let _ = self.announce_publication(&pub_data);
self.match_local_writer_against_cache(eid);
self.config.observability.record(
&zerodds_foundation::observability::Event::new(
zerodds_foundation::observability::Level::Info,
zerodds_foundation::observability::Component::Dcps,
"user_writer.created",
)
.with_attr("topic", cfg.topic_name.as_str())
.with_attr("type", cfg.type_name.as_str())
.with_attr("reliable", if cfg.reliable { "true" } else { "false" }),
);
Ok(eid)
}
/// Create and register a user DataReader.
///
/// Allocates a fresh entity id, builds a `ReliableReader`, stores the
/// slot in the reader map, announces the subscription via SEDP, and
/// matches it against the current discovery cache. Returns the new
/// reader's `EntityId` plus the channel on which delivered samples arrive.
///
/// # Errors
/// `PreconditionNotMet` if the reader map's `RwLock` is poisoned.
pub fn register_user_reader(
&self,
cfg: UserReaderConfig,
) -> Result<(EntityId, mpsc::Receiver<UserSample>)> {
let now = self.start_instant.elapsed();
let key = self.next_entity_key();
let eid = EntityId::user_reader_with_key(key);
let reader = ReliableReader::new(ReliableReaderConfig {
guid: Guid::new(self.guid_prefix, eid),
vendor_id: VendorId::ZERODDS,
writer_proxies: Vec::new(),
max_samples_per_proxy: 256,
heartbeat_response_delay:
zerodds_rtps::reliable_reader::DEFAULT_HEARTBEAT_RESPONSE_DELAY,
assembler_caps: AssemblerCaps::default(),
});
// Samples are pushed to `tx` by the receive path; the caller owns `rx`.
let (tx, rx) = mpsc::channel();
let sub_data = build_subscription_data(
self.guid_prefix,
eid,
&cfg,
&self.config.data_representation_offer,
);
self.user_readers
.write()
.map_err(|_| DdsError::PreconditionNotMet {
reason: "user_readers poisoned",
})?
.insert(
eid,
Arc::new(Mutex::new(UserReaderSlot {
reader,
topic_name: cfg.topic_name.clone(),
type_name: cfg.type_name.clone(),
sample_tx: tx,
async_waker: Arc::new(std::sync::Mutex::new(None)),
listener: None,
durability: cfg.durability,
deadline_nanos: qos_duration_to_nanos(cfg.deadline.period),
// Seed the deadline/liveliness clocks at creation time.
last_sample_received: Some(now),
requested_deadline_missed_count: 0,
requested_incompatible_qos:
crate::status::RequestedIncompatibleQosStatus::default(),
sample_lost_count: 0,
sample_rejected: crate::status::SampleRejectedStatus::default(),
liveliness_lease_nanos: qos_duration_to_nanos(cfg.liveliness.lease_duration),
liveliness_kind: cfg.liveliness.kind,
liveliness_alive_count: 0,
liveliness_not_alive_count: 0,
liveliness_alive: true,
ownership: cfg.ownership,
partition: cfg.partition.clone(),
writer_strengths: alloc::collections::BTreeMap::new(),
type_identifier: cfg.type_identifier.clone(),
type_consistency: cfg.type_consistency,
})),
);
// Best-effort: a failed discovery announcement does not fail creation.
let _ = self.announce_subscription(&sub_data);
self.match_local_reader_against_cache(eid);
self.config.observability.record(
&zerodds_foundation::observability::Event::new(
zerodds_foundation::observability::Level::Info,
zerodds_foundation::observability::Component::Dcps,
"user_reader.created",
)
.with_attr("topic", cfg.topic_name.as_str())
.with_attr("type", cfg.type_name.as_str()),
);
Ok((eid, rx))
}
/// Re-match one local writer against all subscriptions in the SEDP cache.
fn match_local_writer_against_cache(&self, eid: EntityId) {
    // Snapshot topic/type first so no slot lock is held during matching.
    let names = self
        .writer_slot(eid)
        .and_then(|arc| arc.lock().ok().map(|s| (s.topic_name.clone(), s.type_name.clone())));
    let Some((topic, type_name)) = names else {
        return;
    };
    // Collect candidate subscriptions under the SEDP lock, then release it
    // before wiring (which acquires other locks).
    let subs: Vec<_> = {
        let Ok(sedp) = self.sedp.lock() else {
            return;
        };
        sedp.cache()
            .match_subscriptions(&topic, &type_name)
            .map(|s| s.data.clone())
            .collect()
    };
    for sub in &subs {
        self.wire_writer_to_remote_reader(eid, sub);
    }
}
/// Re-match one local reader against all publications in the SEDP cache.
fn match_local_reader_against_cache(&self, eid: EntityId) {
    // Snapshot topic/type first so no slot lock is held during matching.
    let names = self
        .reader_slot(eid)
        .and_then(|arc| arc.lock().ok().map(|s| (s.topic_name.clone(), s.type_name.clone())));
    let Some((topic, type_name)) = names else {
        return;
    };
    // Collect candidate publications under the SEDP lock, then release it
    // before wiring (which acquires other locks).
    let pubs: Vec<_> = {
        let Ok(sedp) = self.sedp.lock() else {
            return;
        };
        sedp.cache()
            .match_publications(&topic, &type_name)
            .map(|p| p.data.clone())
            .collect()
    };
    for pubd in &pubs {
        self.wire_reader_to_remote_writer(eid, pubd);
    }
}
/// QoS-match a local writer against a discovered remote reader and, on
/// success, install a `ReaderProxy` so samples flow to it.
///
/// Each request/offer check bumps the writer's offered-incompatible-QoS
/// status with the offending policy id and aborts the match. Fixes: the
/// dead empty `else {}` after data-representation negotiation is removed,
/// and the mojibake `®istry` token is restored to `&registry`.
fn wire_writer_to_remote_reader(
    &self,
    writer_eid: EntityId,
    sub: &zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData,
) {
    let locators = remote_user_locators(sub.key.prefix, &self.discovered);
    if locators.is_empty() {
        return;
    }
    if let Some(slot_arc) = self.writer_slot(writer_eid) {
        if let Ok(mut slot) = slot_arc.lock() {
            let slot = &mut *slot;
            use crate::psm_constants::qos_policy_id as qid;
            use crate::status::bump_policy_count;
            // Record one QoS incompatibility against this writer.
            let bump = |slot: &mut UserWriterSlot, pid: u32| {
                slot.offered_incompatible_qos.total_count =
                    slot.offered_incompatible_qos.total_count.saturating_add(1);
                slot.offered_incompatible_qos.last_policy_id = pid;
                bump_policy_count(&mut slot.offered_incompatible_qos.policies, pid);
            };
            // Request/offer: the writer must offer at least what the reader asks.
            if (slot.durability as u8) < (sub.durability as u8) {
                bump(slot, qid::DURABILITY);
                return;
            }
            if !deadline_compat(
                slot.deadline_nanos,
                qos_duration_to_nanos(sub.deadline.period),
            ) {
                bump(slot, qid::DEADLINE);
                return;
            }
            if (slot.liveliness_kind as u8) < (sub.liveliness.kind as u8) {
                bump(slot, qid::LIVELINESS);
                return;
            }
            if !deadline_compat(
                slot.liveliness_lease_nanos,
                qos_duration_to_nanos(sub.liveliness.lease_duration),
            ) {
                bump(slot, qid::LIVELINESS);
                return;
            }
            if slot.ownership != sub.ownership {
                bump(slot, qid::OWNERSHIP);
                return;
            }
            if !partitions_overlap(&slot.partition, &sub.partition) {
                bump(slot, qid::PARTITION);
                return;
            }
            // XTypes assignability check when both sides carry a type id.
            if slot.type_identifier != zerodds_types::TypeIdentifier::None
                && sub.type_identifier != zerodds_types::TypeIdentifier::None
            {
                let registry = zerodds_types::resolve::TypeRegistry::new();
                let tce = zerodds_types::qos::TypeConsistencyEnforcement::default();
                let matcher = zerodds_types::type_matcher::TypeMatcher::new(&tce);
                if !matcher
                    .match_types(&slot.type_identifier, &sub.type_identifier, &registry)
                    .is_match()
                {
                    bump(slot, qid::TYPE_CONSISTENCY_ENFORCEMENT);
                    return;
                }
            }
            let mut proxy = zerodds_rtps::reader_proxy::ReaderProxy::new(
                sub.key,
                locators.clone(),
                Vec::new(),
                slot.reliable,
            );
            {
                use zerodds_rtps::publication_data::data_representation as dr;
                let writer_offered: Vec<i16> = slot
                    .data_rep_offer_override
                    .clone()
                    .unwrap_or_else(|| self.config.data_representation_offer.clone());
                let mode = self.config.data_rep_match_mode;
                // A failed negotiation leaves the proxy on its default
                // representation; it does not reject the match.
                if let Some(negotiated) =
                    dr::negotiate(&writer_offered, &sub.data_representation, mode)
                {
                    proxy.set_negotiated_data_representation(negotiated);
                }
            }
            // Volatile writers never replay history to late joiners.
            if slot.durability == zerodds_qos::DurabilityKind::Volatile {
                if let Some(max) = slot.writer.cache().max_sn() {
                    proxy.skip_samples_up_to(max);
                }
            }
            slot.writer.add_reader_proxy(proxy);
            self.match_event.1.notify_all();
            #[cfg(feature = "security")]
            {
                let peer_key = sub.key.prefix.0;
                let level = EndpointProtection::from_info(sub.security_info.as_ref()).level;
                slot.reader_protection.insert(peer_key, level);
                for loc in &locators {
                    slot.locator_to_peer.insert(*loc, peer_key);
                }
            }
        }
    }
    self.config.observability.record(
        &zerodds_foundation::observability::Event::new(
            zerodds_foundation::observability::Level::Info,
            zerodds_foundation::observability::Component::Discovery,
            "writer.matched_remote_reader",
        )
        .with_attr("writer_eid", alloc::format!("{writer_eid:?}")),
    );
}
/// QoS-match a local reader against a discovered remote writer and, on
/// success, install a `WriterProxy` so inbound samples are accepted.
///
/// Each request/offer check bumps the reader's requested-incompatible-QoS
/// status with the offending policy id and aborts the match. Fix: the
/// mojibake `®istry` token is restored to `&registry`.
fn wire_reader_to_remote_writer(
    &self,
    reader_eid: EntityId,
    pubd: &zerodds_rtps::publication_data::PublicationBuiltinTopicData,
) {
    let locators = remote_user_locators(pubd.key.prefix, &self.discovered);
    if locators.is_empty() {
        return;
    }
    if let Some(slot_arc) = self.reader_slot(reader_eid) {
        if let Ok(mut slot) = slot_arc.lock() {
            let slot = &mut *slot;
            use crate::psm_constants::qos_policy_id as qid;
            use crate::status::bump_policy_count;
            // Record one QoS incompatibility against this reader.
            let bump = |slot: &mut UserReaderSlot, pid: u32| {
                slot.requested_incompatible_qos.total_count = slot
                    .requested_incompatible_qos
                    .total_count
                    .saturating_add(1);
                slot.requested_incompatible_qos.last_policy_id = pid;
                bump_policy_count(&mut slot.requested_incompatible_qos.policies, pid);
            };
            // Request/offer: the remote writer must offer at least what we ask.
            if (pubd.durability as u8) < (slot.durability as u8) {
                bump(slot, qid::DURABILITY);
                return;
            }
            if !deadline_compat(
                qos_duration_to_nanos(pubd.deadline.period),
                slot.deadline_nanos,
            ) {
                bump(slot, qid::DEADLINE);
                return;
            }
            if (pubd.liveliness.kind as u8) < (slot.liveliness_kind as u8) {
                bump(slot, qid::LIVELINESS);
                return;
            }
            if !deadline_compat(
                qos_duration_to_nanos(pubd.liveliness.lease_duration),
                slot.liveliness_lease_nanos,
            ) {
                bump(slot, qid::LIVELINESS);
                return;
            }
            if pubd.ownership != slot.ownership {
                bump(slot, qid::OWNERSHIP);
                return;
            }
            if !partitions_overlap(&pubd.partition, &slot.partition) {
                bump(slot, qid::PARTITION);
                return;
            }
            // XTypes assignability, honoring the reader's enforcement policy.
            if slot.type_identifier != zerodds_types::TypeIdentifier::None
                && pubd.type_identifier != zerodds_types::TypeIdentifier::None
            {
                let registry = zerodds_types::resolve::TypeRegistry::new();
                let matcher =
                    zerodds_types::type_matcher::TypeMatcher::new(&slot.type_consistency);
                if !matcher
                    .match_types(&pubd.type_identifier, &slot.type_identifier, &registry)
                    .is_match()
                {
                    bump(slot, qid::TYPE_CONSISTENCY_ENFORCEMENT);
                    return;
                }
            }
            slot.reader
                .add_writer_proxy(zerodds_rtps::writer_proxy::WriterProxy::new(
                    pubd.key,
                    locators,
                    Vec::new(),
                    true,
                ));
            self.match_event.1.notify_all();
            // Remember the writer's strength for exclusive-ownership arbitration.
            slot.writer_strengths
                .insert(pubd.key.to_bytes(), pubd.ownership_strength);
        }
    }
}
/// Publish one serialized sample on the given user writer.
///
/// Small payloads take a pooled encode path; larger ones are framed
/// (encapsulation header + payload) and handed to the reliable writer,
/// which may fragment them. All resulting datagrams are secured and sent
/// to every UDPv4 target; send failures are ignored.
///
/// # Errors
/// `BadParameter` for an unknown writer id, `PreconditionNotMet` on a
/// poisoned slot, `WireError` on encode failure.
pub fn write_user_sample(&self, eid: EntityId, payload: Vec<u8>) -> Result<()> {
let now = self.start_instant.elapsed();
let total = USER_PAYLOAD_ENCAP.len() + payload.len();
// Encode under the slot lock; send after releasing it.
let out_datagrams = {
let slot_arc = self.writer_slot(eid).ok_or(DdsError::BadParameter {
what: "unknown writer entity id",
})?;
let mut slot = slot_arc.lock().map_err(|_| DdsError::PreconditionNotMet {
reason: "user_writer slot poisoned",
})?;
slot.last_write = Some(now);
let dgs = if total <= SMALL_FRAME_CAP {
write_user_sample_pooled(&mut slot.writer, &payload, now)?
} else {
let mut framed = Vec::with_capacity(total);
framed.extend_from_slice(&USER_PAYLOAD_ENCAP);
framed.extend_from_slice(&payload);
slot.writer
.write_with_heartbeat(&framed, now)
.map_err(|_| DdsError::WireError {
message: String::from("user writer encode"),
})?
};
// Track the insertion time so `expire_by_lifespan` can age it out.
if slot.lifespan_nanos != 0 {
if let Some(sn) = slot.writer.cache().max_sn() {
slot.sample_insert_times.push_back((sn, now));
}
}
dgs
};
for dg in out_datagrams {
if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
for t in dg.targets.iter() {
if t.kind == LocatorKind::UdpV4 {
let _ = self.user_unicast.send(t, &secured);
}
}
}
}
#[cfg(feature = "inspect")]
{
self.dispatch_inspect_dcps_tap(eid, &payload);
}
Ok(())
}
#[cfg(feature = "inspect")]
/// Mirror an outbound user sample to the inspect tap (debug tooling).
fn dispatch_inspect_dcps_tap(&self, eid: EntityId, payload: &[u8]) {
    let Some(slot_arc) = self.writer_slot(eid) else {
        return;
    };
    let Ok(slot) = slot_arc.lock() else {
        return;
    };
    let topic = slot.topic_name.clone();
    drop(slot);
    // Wall-clock timestamp in nanoseconds; 0 if the clock is before the epoch.
    let ts_ns = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX))
        .unwrap_or(0);
    // Correlation id: entity key bytes in the low 24 bits, kind in byte 3.
    let corr = eid
        .entity_key
        .iter()
        .enumerate()
        .fold(u64::from(eid.entity_kind as u8) << 24, |acc, (i, byte)| {
            acc | (u64::from(*byte) << (i * 8))
        });
    let frame = zerodds_inspect_endpoint::Frame::dcps(topic, ts_ns, corr, payload.to_vec());
    zerodds_inspect_endpoint::tap::dispatch(&frame);
}
/// Publish an instance-lifecycle change (dispose/unregister) for a key.
///
/// Encodes the lifecycle message under the slot lock, then secures and
/// sends the resulting datagrams to every UDPv4 target; send failures
/// are ignored.
///
/// # Errors
/// `BadParameter` for an unknown writer id, `PreconditionNotMet` on a
/// poisoned slot, `WireError` on encode failure.
pub fn write_user_lifecycle(
&self,
eid: EntityId,
key_hash: [u8; 16],
status_bits: u32,
) -> Result<()> {
let out_datagrams = {
let slot_arc = self.writer_slot(eid).ok_or(DdsError::BadParameter {
what: "unknown writer entity id",
})?;
let mut slot = slot_arc.lock().map_err(|_| DdsError::PreconditionNotMet {
reason: "user_writer slot poisoned",
})?;
slot.writer
.write_lifecycle(key_hash, status_bits)
.map_err(|_| DdsError::WireError {
message: String::from("user writer lifecycle encode"),
})?
};
for dg in out_datagrams {
if let Some(secured) = secure_outbound_bytes(self, &dg.bytes) {
for t in dg.targets.iter() {
if t.kind == LocatorKind::UdpV4 {
let _ = self.user_unicast.send(t, &secured);
}
}
}
}
Ok(())
}
/// Allocate the next 3-byte entity key: the low 24 bits of a shared
/// counter, big-endian (wraps silently after 2^24 allocations).
fn next_entity_key(&self) -> [u8; 3] {
    let bytes = self
        .entity_counter
        .fetch_add(1, Ordering::Relaxed)
        .to_be_bytes();
    [bytes[1], bytes[2], bytes[3]]
}
#[must_use]
pub fn discovered_publications_count(&self) -> usize {
    // Number of remote publications in the SEDP cache; 0 on a poisoned lock.
    match self.sedp.lock() {
        Ok(sedp) => sedp.cache().publications_len(),
        Err(_) => 0,
    }
}
#[must_use]
pub fn discovered_subscriptions_count(&self) -> usize {
    // Number of remote subscriptions in the SEDP cache; 0 on a poisoned lock.
    match self.sedp.lock() {
        Ok(sedp) => sedp.cache().subscriptions_len(),
        Err(_) => 0,
    }
}
#[must_use]
pub fn user_writer_matched_count(&self, eid: EntityId) -> usize {
    // Number of remote readers wired to this writer; 0 when unknown.
    let Some(arc) = self.writer_slot(eid) else {
        return 0;
    };
    arc.lock()
        .map(|s| s.writer.reader_proxy_count())
        .unwrap_or(0)
}
#[must_use]
pub fn user_writer_matched_subscription_handles(
    &self,
    eid: EntityId,
) -> Vec<crate::instance_handle::InstanceHandle> {
    // Instance handles of every matched remote reader; empty when unknown.
    let Some(arc) = self.writer_slot(eid) else {
        return Vec::new();
    };
    let Ok(slot) = arc.lock() else {
        return Vec::new();
    };
    slot.writer
        .reader_proxies()
        .iter()
        .map(|p| crate::instance_handle::InstanceHandle::from_guid(p.remote_reader_guid))
        .collect()
}
#[must_use]
pub fn user_reader_matched_publication_handles(
    &self,
    eid: EntityId,
) -> Vec<crate::instance_handle::InstanceHandle> {
    // Instance handles of every matched remote writer; empty when unknown.
    let Some(arc) = self.reader_slot(eid) else {
        return Vec::new();
    };
    let Ok(slot) = arc.lock() else {
        return Vec::new();
    };
    slot.reader
        .writer_proxies()
        .iter()
        .map(|p| crate::instance_handle::InstanceHandle::from_guid(p.proxy.remote_writer_guid))
        .collect()
}
#[must_use]
pub fn user_writer_offered_deadline_missed(&self, eid: EntityId) -> u64 {
    let Some(arc) = self.writer_slot(eid) else {
        return 0;
    };
    arc.lock()
        .map(|s| s.offered_deadline_missed_count)
        .unwrap_or(0)
}
#[must_use]
pub fn user_reader_requested_deadline_missed(&self, eid: EntityId) -> u64 {
    let Some(arc) = self.reader_slot(eid) else {
        return 0;
    };
    arc.lock()
        .map(|s| s.requested_deadline_missed_count)
        .unwrap_or(0)
}
#[must_use]
pub fn user_reader_liveliness_status(&self, eid: EntityId) -> (bool, u64, u64) {
    // (alive?, alive-transition count, not-alive-transition count);
    // all zeros/false when the reader is unknown or the lock is poisoned.
    let Some(arc) = self.reader_slot(eid) else {
        return (false, 0, 0);
    };
    match arc.lock() {
        Ok(s) => (
            s.liveliness_alive,
            s.liveliness_alive_count,
            s.liveliness_not_alive_count,
        ),
        Err(_) => (false, 0, 0),
    }
}
#[must_use]
pub fn user_writer_liveliness_lost(&self, eid: EntityId) -> u64 {
    let Some(arc) = self.writer_slot(eid) else {
        return 0;
    };
    arc.lock().map(|s| s.liveliness_lost_count).unwrap_or(0)
}
#[must_use]
pub fn user_writer_offered_incompatible_qos(
    &self,
    eid: EntityId,
) -> crate::status::OfferedIncompatibleQosStatus {
    // Default (empty) status when the writer is unknown or poisoned.
    let Some(arc) = self.writer_slot(eid) else {
        return crate::status::OfferedIncompatibleQosStatus::default();
    };
    arc.lock()
        .map(|s| s.offered_incompatible_qos.clone())
        .unwrap_or_default()
}
#[must_use]
pub fn user_reader_requested_incompatible_qos(
    &self,
    eid: EntityId,
) -> crate::status::RequestedIncompatibleQosStatus {
    // Default (empty) status when the reader is unknown or poisoned.
    let Some(arc) = self.reader_slot(eid) else {
        return crate::status::RequestedIncompatibleQosStatus::default();
    };
    arc.lock()
        .map(|s| s.requested_incompatible_qos.clone())
        .unwrap_or_default()
}
#[must_use]
pub fn user_reader_sample_lost(&self, eid: EntityId) -> u64 {
    let Some(arc) = self.reader_slot(eid) else {
        return 0;
    };
    arc.lock().map(|s| s.sample_lost_count).unwrap_or(0)
}
#[must_use]
pub fn user_reader_sample_rejected(
    &self,
    eid: EntityId,
) -> crate::status::SampleRejectedStatus {
    // Default (empty) status when the reader is unknown or poisoned.
    let Some(arc) = self.reader_slot(eid) else {
        return crate::status::SampleRejectedStatus::default();
    };
    arc.lock().map(|s| s.sample_rejected).unwrap_or_default()
}
/// Add `count` to a reader's lost-sample counter (saturating).
pub fn record_sample_lost(&self, eid: EntityId, count: u32) {
    if count == 0 {
        return;
    }
    let Some(arc) = self.reader_slot(eid) else {
        return;
    };
    let Ok(mut slot) = arc.lock() else {
        return;
    };
    slot.sample_lost_count = slot.sample_lost_count.saturating_add(u64::from(count));
}
/// Record one rejected sample on a reader: bump the total and remember
/// the reason and the affected instance.
pub fn record_sample_rejected(
    &self,
    eid: EntityId,
    kind: crate::status::SampleRejectedStatusKind,
    instance: crate::instance_handle::InstanceHandle,
) {
    let Some(arc) = self.reader_slot(eid) else {
        return;
    };
    let Ok(mut slot) = arc.lock() else {
        return;
    };
    let status = &mut slot.sample_rejected;
    status.total_count = status.total_count.saturating_add(1);
    status.last_reason = kind;
    status.last_instance_handle = instance;
}
/// Manually assert liveliness for one writer entity.
pub fn assert_writer_liveliness_eid(&self, eid: EntityId) {
    let now = self.start_instant.elapsed();
    let Some(arc) = self.writer_slot(eid) else {
        return;
    };
    let Ok(mut slot) = arc.lock() else {
        return;
    };
    slot.last_liveliness_assert = Some(now);
    // For Automatic liveliness the assert also counts as write activity.
    if slot.liveliness_kind == zerodds_qos::LivelinessKind::Automatic {
        slot.last_write = Some(now);
    }
}
#[must_use]
pub fn user_writer_all_acknowledged(&self, eid: EntityId) -> bool {
    // Unknown or poisoned writers report "acknowledged" so waiters don't hang.
    match self.writer_slot(eid) {
        Some(arc) => match arc.lock() {
            Ok(s) => s.writer.all_samples_acknowledged(),
            Err(_) => true,
        },
        None => true,
    }
}
/// Store (or clear, with `None`) the async waker fired on sample delivery.
pub fn register_user_reader_waker(&self, eid: EntityId, waker: Option<core::task::Waker>) {
    let Some(arc) = self.reader_slot(eid) else {
        return;
    };
    let Ok(slot) = arc.lock() else {
        return;
    };
    if let Ok(mut stored) = slot.async_waker.lock() {
        *stored = waker;
    }
}
/// Install (or clear, with `None`) a reader listener.
/// Returns `true` when the slot exists and the listener was stored.
pub fn set_user_reader_listener(
    &self,
    eid: EntityId,
    listener: Option<UserReaderListener>,
) -> bool {
    if let Some(arc) = self.reader_slot(eid) {
        if let Ok(mut slot) = arc.lock() {
            slot.listener = listener.map(alloc::sync::Arc::new);
            return true;
        }
    }
    false
}
#[must_use]
pub fn user_reader_matched_count(&self, eid: EntityId) -> usize {
    // Number of remote writers wired to this reader; 0 when unknown.
    let Some(arc) = self.reader_slot(eid) else {
        return 0;
    };
    arc.lock()
        .map(|s| s.reader.writer_proxy_count())
        .unwrap_or(0)
}
#[cfg(feature = "std")]
/// Block until a match event fires or `timeout` elapses.
/// Returns `true` only when the condvar was signalled before the timeout.
pub fn wait_match_event(&self, timeout: core::time::Duration) -> bool {
    let (lock, cvar) = &*self.match_event;
    let Ok(guard) = lock.lock() else { return false };
    cvar.wait_timeout(guard, timeout)
        .map(|(_, status)| !status.timed_out())
        .unwrap_or(false)
}
#[cfg(feature = "std")]
/// Block until an acknowledgment event fires or `timeout` elapses.
/// Returns `true` only when the condvar was signalled before the timeout.
pub fn wait_ack_event(&self, timeout: core::time::Duration) -> bool {
    let (lock, cvar) = &*self.ack_event;
    let Ok(guard) = lock.lock() else { return false };
    cvar.wait_timeout(guard, timeout)
        .map(|(_, status)| !status.timed_out())
        .unwrap_or(false)
}
#[cfg(feature = "std")]
/// Wake every thread blocked in `wait_ack_event`.
pub(crate) fn notify_ack_event(&self) {
    let (_, cvar) = &*self.ack_event;
    cvar.notify_all();
}
/// Register the serialized shared-memory locator advertised for an entity.
pub fn set_shm_locator(&self, eid: EntityId, bytes: Vec<u8>) {
    let Ok(mut map) = self.shm_locators.write() else {
        return;
    };
    map.insert(eid, bytes);
}
#[must_use]
pub fn shm_locator(&self, eid: EntityId) -> Option<Vec<u8>> {
    // Clone of the registered SHM locator bytes, if any.
    let map = self.shm_locators.read().ok()?;
    map.get(&eid).cloned()
}
/// Remove any registered shared-memory locator for an entity.
pub fn clear_shm_locator(&self, eid: EntityId) {
    let Ok(mut map) = self.shm_locators.write() else {
        return;
    };
    map.remove(&eid);
}
/// Signal all background loops to exit, then join every thread handle.
/// Draining the handle list makes a second call a cheap no-op.
pub fn shutdown(&self) {
    self.stop.store(true, Ordering::Relaxed);
    let Ok(mut handles) = self.handles.lock() else {
        return;
    };
    for handle in handles.drain(..) {
        let _ = handle.join();
    }
}
}
impl Drop for DcpsRuntime {
// Stop and join the background threads when the runtime is dropped;
// `shutdown` drains the handle list, so an earlier explicit call makes
// this a no-op.
fn drop(&mut self) {
self.shutdown();
}
}
#[allow(unused_variables)]
// Apply optional realtime scheduling priority and CPU pinning to the
// calling thread. Implemented only on Linux; a no-op elsewhere (hence
// the `unused_variables` allow).
fn apply_thread_tuning(label: &str, priority: Option<i32>, cpus: Option<&[usize]>) {
#[cfg(target_os = "linux")]
rt_pinning::apply(label, priority, cpus);
}
#[cfg(target_os = "linux")]
#[allow(unsafe_code, clippy::print_stderr)]
mod rt_pinning {
/// Best-effort realtime tuning for the calling thread: SCHED_FIFO
/// priority and/or CPU affinity. Failures are reported on stderr and
/// otherwise ignored — the thread keeps running with default scheduling.
pub(super) fn apply(label: &str, priority: Option<i32>, cpus: Option<&[usize]>) {
if let Some(prio) = priority {
// SAFETY: `pthread_self()` is always valid for the calling thread,
// and `param` is a live stack value for the duration of the call;
// pthread_setschedparam only reads through the pointer.
unsafe {
let param = libc::sched_param {
sched_priority: prio,
};
let rc = libc::pthread_setschedparam(
libc::pthread_self(),
libc::SCHED_FIFO,
&raw const param,
);
if rc != 0 {
eprintln!(
"zdds[{label}]: pthread_setschedparam SCHED_FIFO {prio} \
failed (rc={rc}). Need CAP_SYS_NICE or RLIMIT_RTPRIO."
);
}
}
}
if let Some(cpu_list) = cpus {
// SAFETY: `set` is zero-initialized (valid empty cpu_set_t) and is
// only manipulated through the libc CPU_* macros; pid 0 targets the
// calling thread, and the size argument matches the set's type.
unsafe {
let mut set: libc::cpu_set_t = core::mem::zeroed();
libc::CPU_ZERO(&mut set);
for &cpu in cpu_list {
// Ignore out-of-range CPU indices rather than corrupting the set.
if cpu < libc::CPU_SETSIZE as usize {
libc::CPU_SET(cpu, &mut set);
}
}
let rc = libc::sched_setaffinity(
0,
core::mem::size_of::<libc::cpu_set_t>(),
&raw const set,
);
if rc != 0 {
eprintln!("zdds[{label}]: sched_setaffinity({cpu_list:?}) failed.");
}
}
}
}
}
/// Background thread: receive SPDP multicast datagrams until `stop` is set.
///
/// Each datagram is passed through the (feature-gated) security layer;
/// datagrams it rejects (`None`) are silently dropped. Cleartext is fed
/// to SPDP participant discovery and to the writer-liveliness protocol.
fn recv_spdp_multicast_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
apply_thread_tuning(
"recv-spdp-mc",
rt.config.recv_thread_priority,
rt.config.recv_thread_cpus.as_deref(),
);
while !stop.load(Ordering::Relaxed) {
let elapsed = rt.start_instant.elapsed();
// NOTE(review): this reconstructs `elapsed` component-wise — it appears
// equivalent to passing `elapsed` directly; possibly a core-vs-std
// Duration artifact. Confirm before simplifying.
let sedp_now = Duration::from_secs(elapsed.as_secs())
+ Duration::from_nanos(u64::from(elapsed.subsec_nanos()));
// A recv error (e.g. timeout) just re-checks the stop flag.
let Ok(dg) = rt.spdp_multicast_rx.recv() else {
continue;
};
#[cfg(feature = "security")]
let clear = secure_inbound_bytes(&rt, &dg.data, &DEFAULT_INBOUND_IFACE);
#[cfg(not(feature = "security"))]
let clear = secure_inbound_bytes(&rt, &dg.data);
if let Some(clear) = clear {
handle_spdp_datagram(&rt, &clear);
if let Ok(mut wlp) = rt.wlp.lock() {
let _ = wlp.handle_datagram(&clear, sedp_now);
}
}
}
}
/// Background thread: receive unicast metatraffic until `stop` is set.
///
/// Cleartext datagrams are dispatched, in order, to: SPDP, SEDP (whose
/// events trigger a matching pass and feed the builtin readers), the
/// writer-liveliness protocol, and the security builtin endpoints.
fn recv_metatraffic_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
apply_thread_tuning(
"recv-meta",
rt.config.recv_thread_priority,
rt.config.recv_thread_cpus.as_deref(),
);
while !stop.load(Ordering::Relaxed) {
let elapsed = rt.start_instant.elapsed();
let sedp_now = Duration::from_secs(elapsed.as_secs())
+ Duration::from_nanos(u64::from(elapsed.subsec_nanos()));
// A recv error (e.g. timeout) just re-checks the stop flag.
let Ok(dg) = rt.spdp_unicast.recv() else {
continue;
};
#[cfg(feature = "security")]
let clear = secure_inbound_bytes(&rt, &dg.data, &DEFAULT_INBOUND_IFACE);
#[cfg(not(feature = "security"))]
let clear = secure_inbound_bytes(&rt, &dg.data);
if let Some(clear) = clear {
handle_spdp_datagram(&rt, &clear);
// Scope the SEDP lock to datagram handling; matching happens after.
let events = {
if let Ok(mut sedp) = rt.sedp.lock() {
sedp.handle_datagram(&clear, sedp_now).ok()
} else {
None
}
};
if let Some(ev) = events {
if !ev.is_empty() {
run_matching_pass(&rt);
push_sedp_events_to_builtin_readers(&rt, &ev);
}
}
if let Ok(mut wlp) = rt.wlp.lock() {
let _ = wlp.handle_datagram(&clear, sedp_now);
}
dispatch_security_builtin_datagram(&rt, &clear, sedp_now);
}
}
}
/// Background thread: receive user-traffic datagrams until `stop` is set.
///
/// Type-lookup service traffic is intercepted first; everything else is
/// treated as user data and routed to the matching reader slots.
fn recv_user_data_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
apply_thread_tuning(
"recv-user",
rt.config.recv_thread_priority,
rt.config.recv_thread_cpus.as_deref(),
);
while !stop.load(Ordering::Relaxed) {
let elapsed = rt.start_instant.elapsed();
let sedp_now = Duration::from_secs(elapsed.as_secs())
+ Duration::from_nanos(u64::from(elapsed.subsec_nanos()));
// A recv error (e.g. timeout) just re-checks the stop flag.
let Ok(dg) = rt.user_unicast.recv() else {
continue;
};
#[cfg(feature = "security")]
let clear = secure_inbound_bytes(&rt, &dg.data, &DEFAULT_INBOUND_IFACE);
#[cfg(not(feature = "security"))]
let clear = secure_inbound_bytes(&rt, &dg.data);
if let Some(clear) = clear {
if !dispatch_type_lookup_datagram(&rt, &clear, &dg.source) {
handle_user_datagram(&rt, &clear, sedp_now);
}
}
}
}
/// Background thread: periodic protocol housekeeping until `stop` is set.
///
/// Each iteration: (1) SPDP beacon on its own period, (2) SEDP retransmit
/// tick, (3) security-builtin poll, (4) WLP tick, (5) user-writer ticks
/// (heartbeats/retransmissions) with per-target security, (6) user-reader
/// outbound ticks (acknacks), (7) security-feature extra interface drain,
/// (8) deadline/lifespan/liveliness timers — then sleeps `tick_period`.
fn tick_loop(rt: Arc<DcpsRuntime>, stop: Arc<AtomicBool>) {
apply_thread_tuning(
"tick",
rt.config.tick_thread_priority,
rt.config.tick_thread_cpus.as_deref(),
);
// Well-known SPDP multicast target for this domain (IPv4 mapped into the
// 16-byte RTPS locator address).
let mc_target = Locator {
kind: LocatorKind::UdpV4,
port: u32::from(u16::try_from(spdp_multicast_port(rt.domain_id as u32)).unwrap_or(7400)),
address: {
let mut a = [0u8; 16];
a[12..].copy_from_slice(&SPDP_DEFAULT_MULTICAST_ADDRESS);
a
},
};
// NOTE(review): two statements share this line — harmless but worth a rustfmt pass.
let mut next_announce = Instant::now(); while !stop.load(Ordering::Relaxed) {
let elapsed_since_start = rt.start_instant.elapsed();
let sedp_now = Duration::from_secs(elapsed_since_start.as_secs())
+ Duration::from_nanos(u64::from(elapsed_since_start.subsec_nanos()));
// (1) Periodic SPDP participant announcement.
if Instant::now() >= next_announce {
if let Ok(mut beacon) = rt.spdp_beacon.lock() {
if let Ok(datagram) = beacon.serialize() {
if let Some(secured) = secure_outbound_bytes(&rt, &datagram) {
let _ = rt.spdp_mc_tx.send(&mc_target, &secured);
}
}
}
next_announce = Instant::now() + rt.config.spdp_period;
}
// (2) SEDP retransmissions, built under the lock and sent after.
let sedp_outbound = {
if let Ok(mut sedp) = rt.sedp.lock() {
sedp.tick(sedp_now).unwrap_or_default()
} else {
Vec::new()
}
};
for dg in sedp_outbound {
send_discovery_datagram(&rt, &dg.targets, &dg.bytes);
}
// (3) Security builtin endpoints (handshakes, key exchange), if active.
if let Some(stack) = rt.security_builtin_snapshot() {
let outbound = {
if let Ok(mut s) = stack.lock() {
s.poll(sedp_now).unwrap_or_default()
} else {
Vec::new()
}
};
for dg in outbound {
send_discovery_datagram(&rt, &dg.targets, &dg.bytes);
}
}
// (4) Writer-liveliness protocol announcements.
let wlp_outbound = {
if let Ok(mut wlp) = rt.wlp.lock() {
wlp.tick(sedp_now).unwrap_or(None)
} else {
None
}
};
if let Some(bytes) = wlp_outbound {
if let Some(secured) = secure_outbound_bytes(&rt, &bytes) {
let _ = rt.spdp_mc_tx.send(&mc_target, &secured);
}
}
// (5) User writers: collect datagrams under each slot lock, send after,
// securing per target so per-reader protection levels apply.
let user_writer_outbound: Vec<(EntityId, _)> = {
let mut all = Vec::new();
for (eid, arc) in rt.writer_slots_snapshot() {
if let Ok(mut slot) = arc.lock() {
if let Ok(dgs) = slot.writer.tick(sedp_now) {
for dg in dgs {
all.push((eid, dg));
}
}
}
}
all
};
for (writer_eid, dg) in user_writer_outbound {
for t in dg.targets.iter() {
if t.kind != LocatorKind::UdpV4 {
continue;
}
if let Some(secured) = secure_outbound_for_target(&rt, writer_eid, &dg.bytes, t) {
send_on_best_interface(&rt, t, &secured);
}
}
}
// (6) User readers: outbound acknacks/nackfrags.
let user_reader_outbound: Vec<_> = {
let mut all = Vec::new();
for (_eid, arc) in rt.reader_slots_snapshot() {
if let Ok(mut slot) = arc.lock() {
if let Ok(dgs) = slot.reader.tick_outbound(sedp_now) {
all.extend(dgs);
}
}
}
all
};
for dg in user_reader_outbound {
if let Some(secured) = secure_outbound_bytes(&rt, &dg.bytes) {
for t in dg.targets.iter() {
if t.kind == LocatorKind::UdpV4 {
let _ = rt.user_unicast.send(t, &secured);
}
}
}
}
// (7) Security feature: drain per-interface sockets and dispatch their
// traffic through the full inbound pipeline.
#[cfg(feature = "security")]
if let Some(pool) = &rt.outbound_pool {
for binding in &pool.bindings {
while let Ok(dg) = binding.socket.recv() {
let iface = binding.spec.kind.clone();
if let Some(clear) = secure_inbound_bytes(&rt, &dg.data, &iface) {
handle_spdp_datagram(&rt, &clear);
let events = rt
.sedp
.lock()
.ok()
.and_then(|mut s| s.handle_datagram(&clear, sedp_now).ok());
if let Some(ev) = events {
if !ev.is_empty() {
run_matching_pass(&rt);
push_sedp_events_to_builtin_readers(&rt, &ev);
}
}
if !dispatch_type_lookup_datagram(&rt, &clear, &dg.source) {
handle_user_datagram(&rt, &clear, sedp_now);
}
dispatch_security_builtin_datagram(&rt, &clear, sedp_now);
}
}
}
}
// (8) QoS timers.
check_deadlines(&rt, elapsed_since_start);
expire_by_lifespan(&rt, elapsed_since_start);
check_liveliness(&rt, elapsed_since_start);
check_writer_liveliness(&rt, elapsed_since_start);
std::thread::sleep(rt.config.tick_period);
}
}
/// Expire writer liveliness leases and count them as "lost".
///
/// Automatic liveliness uses the last write as its reference instant;
/// manual kinds use the last explicit assert. A lease of zero disables
/// the check. After counting a loss, both reference instants are reset
/// so each lease period yields at most one increment.
fn check_writer_liveliness(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
let now_nanos = now.as_nanos() as u64;
for (_eid, arc) in rt.writer_slots_snapshot() {
let Ok(mut slot) = arc.lock() else { continue };
if slot.liveliness_lease_nanos == 0 {
continue;
}
let last = match slot.liveliness_kind {
zerodds_qos::LivelinessKind::Automatic => slot.last_write,
_ => slot.last_liveliness_assert,
};
// No activity recorded yet: nothing to expire.
let last_nanos = match last {
Some(t) => t.as_nanos() as u64,
None => continue,
};
if now_nanos.saturating_sub(last_nanos) >= slot.liveliness_lease_nanos {
slot.liveliness_lost_count = slot.liveliness_lost_count.saturating_add(1);
slot.last_liveliness_assert = Some(now);
slot.last_write = Some(now);
}
}
}
/// Mark readers not-alive when no sample arrived within their liveliness
/// lease. A lease of zero disables the check; the not-alive transition is
/// counted only once (guarded by the `liveliness_alive` flag).
fn check_liveliness(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
    let now_nanos = now.as_nanos() as u64;
    for (_eid, arc) in rt.reader_slots_snapshot() {
        let Ok(mut slot) = arc.lock() else { continue };
        if slot.liveliness_lease_nanos == 0 {
            continue;
        }
        let Some(received) = slot.last_sample_received else {
            continue;
        };
        let idle = now_nanos.saturating_sub(received.as_nanos() as u64);
        if idle >= slot.liveliness_lease_nanos && slot.liveliness_alive {
            slot.liveliness_alive = false;
            slot.liveliness_not_alive_count =
                slot.liveliness_not_alive_count.saturating_add(1);
        }
    }
}
/// Drop writer samples whose lifespan has elapsed.
///
/// `sample_insert_times` is appended in write (sequence-number) order, so
/// the scan can stop at the first unexpired entry. Everything up to and
/// including the highest expired sequence number is removed from the
/// writer's history cache in one call.
fn expire_by_lifespan(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
let now_nanos = now.as_nanos() as u64;
for (_eid, arc) in rt.writer_slots_snapshot() {
let Ok(mut slot) = arc.lock() else { continue };
// Lifespan of zero means no expiry for this writer.
if slot.lifespan_nanos == 0 {
continue;
}
let mut highest_expired = None;
while let Some(&(sn, inserted)) = slot.sample_insert_times.front() {
let inserted_nanos = inserted.as_nanos() as u64;
if now_nanos.saturating_sub(inserted_nanos) >= slot.lifespan_nanos {
highest_expired = Some(sn);
slot.sample_insert_times.pop_front();
} else {
// Entries are time-ordered; the rest are still fresh.
break;
}
}
if let Some(sn) = highest_expired {
// `remove_samples_up_to` takes an exclusive upper bound, hence +1.
let _removed = slot
.writer
.remove_samples_up_to(zerodds_rtps::wire_types::SequenceNumber(sn.0 + 1));
}
}
}
/// Count missed offered (writer) and requested (reader) deadlines.
/// A deadline of zero disables the check; on a miss the reference instant
/// is reset so each deadline period is counted at most once.
fn check_deadlines(rt: &Arc<DcpsRuntime>, now: std::time::Duration) {
    let now_nanos = now.as_nanos() as u64;
    // Writers: offered deadline against the last write.
    for (_eid, arc) in rt.writer_slots_snapshot() {
        let Ok(mut slot) = arc.lock() else { continue };
        if slot.deadline_nanos == 0 {
            continue;
        }
        let last = match slot.last_write {
            Some(d) => d.as_nanos() as u64,
            None => continue,
        };
        if now_nanos.saturating_sub(last) >= slot.deadline_nanos {
            slot.offered_deadline_missed_count =
                slot.offered_deadline_missed_count.saturating_add(1);
            slot.last_write = Some(now);
        }
    }
    // Readers: requested deadline against the last received sample.
    for (_eid, arc) in rt.reader_slots_snapshot() {
        let Ok(mut slot) = arc.lock() else { continue };
        if slot.deadline_nanos == 0 {
            continue;
        }
        let last = match slot.last_sample_received {
            Some(d) => d.as_nanos() as u64,
            None => continue,
        };
        if now_nanos.saturating_sub(last) >= slot.deadline_nanos {
            slot.requested_deadline_missed_count =
                slot.requested_deadline_missed_count.saturating_add(1);
            slot.last_sample_received = Some(now);
        }
    }
}
fn run_matching_pass(rt: &Arc<DcpsRuntime>) {
let writer_ids: Vec<EntityId> = rt.writer_eids();
for eid in writer_ids {
rt.match_local_writer_against_cache(eid);
}
let reader_ids: Vec<EntityId> = rt.reader_eids();
for eid in reader_ids {
rt.match_local_reader_against_cache(eid);
}
}
/// Default unicast locator(s) advertised by a discovered peer.
/// Empty when the peer is unknown, advertised no locator, or the cache
/// lock is poisoned.
fn remote_user_locators(
    prefix: GuidPrefix,
    discovered: &Arc<Mutex<DiscoveredParticipantsCache>>,
) -> Vec<Locator> {
    let Ok(cache) = discovered.lock() else {
        return Vec::new();
    };
    cache
        .get(&prefix)
        .and_then(|p| p.data.default_unicast_locator)
        .into_iter()
        .collect()
}
/// Fire and clear a reader's registered async waker, if one is present.
fn wake_async_waker(slot: &alloc::sync::Arc<std::sync::Mutex<Option<core::task::Waker>>>) {
    let Ok(mut guard) = slot.lock() else { return };
    if let Some(waker) = guard.take() {
        waker.wake();
    }
}
#[cfg(feature = "inspect")]
/// Mirror a delivered user sample to the inspect tap (debug tooling).
fn dispatch_inspect_dcps_receive_tap(topic: &str, reader_id: EntityId, item: &UserSample) {
    // For lifecycle samples the 16-byte key hash stands in for the payload.
    let payload: Vec<u8> = match item {
        UserSample::Alive { payload, .. } => payload.clone(),
        UserSample::Lifecycle { key_hash, .. } => key_hash.to_vec(),
    };
    // Wall-clock timestamp in nanoseconds; 0 if the clock is before the epoch.
    let ts_ns = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX))
        .unwrap_or(0);
    // Correlation id: entity key bytes in the low 24 bits, kind in byte 3.
    let corr = reader_id
        .entity_key
        .iter()
        .enumerate()
        .fold(u64::from(reader_id.entity_kind as u8) << 24, |acc, (i, byte)| {
            acc | (u64::from(*byte) << (i * 8))
        });
    let frame = zerodds_inspect_endpoint::Frame::dcps(topic.to_owned(), ts_ns, corr, payload);
    zerodds_inspect_endpoint::tap::dispatch(&frame);
}
/// Converts a reliable-reader delivery into the user-facing sample type.
///
/// Alive samples must carry a recognizable encapsulation header, otherwise
/// `None` is returned; lifecycle (NotAlive*) samples always convert, falling
/// back to the payload prefix when no explicit key hash was transmitted.
fn delivered_to_user_sample(
    sample: &zerodds_rtps::reliable_reader::DeliveredSample,
    writer_strengths: &alloc::collections::BTreeMap<[u8; 16], i32>,
) -> Option<UserSample> {
    use zerodds_rtps::history_cache::ChangeKind;
    if matches!(sample.kind, ChangeKind::Alive | ChangeKind::AliveFiltered) {
        let writer_guid = sample.writer_guid.to_bytes();
        // Unknown writers get ownership strength 0.
        let writer_strength = writer_strengths.get(&writer_guid).copied().unwrap_or(0);
        let payload = strip_user_encap(&sample.payload)?;
        return Some(UserSample::Alive {
            payload,
            writer_guid,
            writer_strength,
        });
    }
    // NotAliveDisposed / NotAliveUnregistered / NotAliveDisposedUnregistered:
    // synthesize a key hash from the payload prefix when none was provided.
    let key_hash = sample.key_hash.unwrap_or_else(|| {
        let mut h = [0u8; 16];
        let n = sample.payload.len().min(16);
        h[..n].copy_from_slice(&sample.payload[..n]);
        h
    });
    Some(UserSample::Lifecycle {
        key_hash,
        kind: sample.kind,
    })
}
/// Returns the offset past the 4-byte CDR encapsulation header when the
/// payload starts with a recognized scheme id, `None` otherwise.
fn validate_user_encap_offset(payload: &[u8]) -> Option<usize> {
    use zerodds_rtps::participant_message_data::{
        ENCAPSULATION_CDR_BE, ENCAPSULATION_CDR_LE, ENCAPSULATION_CDR2_BE, ENCAPSULATION_CDR2_LE,
    };
    const ENCAPSULATION_PL_CDR_BE: [u8; 2] = [0x00, 0x02];
    const ENCAPSULATION_PL_CDR_LE: [u8; 2] = [0x00, 0x03];
    // Need the 2-byte scheme id plus the 2-byte options field.
    if payload.len() < 4 {
        return None;
    }
    let scheme = [payload[0], payload[1]];
    let recognized = [
        ENCAPSULATION_CDR_BE,
        ENCAPSULATION_CDR_LE,
        ENCAPSULATION_PL_CDR_BE,
        ENCAPSULATION_PL_CDR_LE,
        ENCAPSULATION_CDR2_BE,
        ENCAPSULATION_CDR2_LE,
    ]
    .contains(&scheme);
    recognized.then_some(4)
}
/// Drops the 4-byte CDR encapsulation header, returning only the user data;
/// `None` when the header is missing or unrecognized.
fn strip_user_encap(payload: &[u8]) -> Option<Vec<u8>> {
    let off = validate_user_encap_offset(payload)?;
    Some(payload[off..].to_vec())
}
/// Dispatches one decoded RTPS datagram received on the user-traffic socket.
///
/// Each submessage is routed by its reader/writer entity id to the matching
/// local reader slot (DATA / DATA_FRAG / HEARTBEAT / GAP) or writer slot
/// (ACKNACK / NACKFRAG). Decapsulated samples are fanned out to the slot's
/// listener callback, its sample channel, and its async waker. Malformed
/// datagrams and unknown entity ids are dropped silently.
///
/// `now` is the runtime-relative timestamp used for fragment reassembly,
/// deadline/liveliness bookkeeping, and reliable-protocol timers.
fn handle_user_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8], now: Duration) {
    let parsed = match decode_datagram(bytes) {
        Ok(p) => p,
        Err(_) => return,
    };
    for sub in parsed.submessages {
        match sub {
            ParsedSubmessage::Data(d) => {
                let Some(arc) = rt.reader_slot(d.reader_id) else {
                    continue;
                };
                let mut items: Vec<UserSampleWithEncap> = Vec::new();
                // Deferred init: filled while the slot lock is held, consumed
                // after the guard drops so user callbacks never run under it.
                let listener;
                let waker;
                let sender;
                #[cfg(feature = "inspect")]
                let topic_name;
                {
                    let Ok(mut slot) = arc.lock() else { continue };
                    for sample in slot.reader.handle_data(&d) {
                        // Zero-copy view (payload Arc + encap offset) for the raw
                        // listener; lifecycle samples carry no payload view.
                        let listener_view: Option<(Arc<[u8]>, usize)> = match sample.kind {
                            zerodds_rtps::history_cache::ChangeKind::Alive
                            | zerodds_rtps::history_cache::ChangeKind::AliveFiltered => {
                                validate_user_encap_offset(&sample.payload)
                                    .map(|off| (Arc::clone(&sample.payload), off))
                            }
                            _ => None,
                        };
                        if let Some(item) =
                            delivered_to_user_sample(&sample, &slot.writer_strengths)
                        {
                            items.push((item, listener_view));
                        }
                    }
                    if !items.is_empty() {
                        // Feed the deadline monitor and (re)assert reader liveliness.
                        slot.last_sample_received = Some(now);
                        if !slot.liveliness_alive {
                            slot.liveliness_alive = true;
                            slot.liveliness_alive_count =
                                slot.liveliness_alive_count.saturating_add(1);
                        }
                    }
                    listener = slot.listener.clone();
                    waker = Arc::clone(&slot.async_waker);
                    sender = slot.sample_tx.clone();
                    #[cfg(feature = "inspect")]
                    {
                        topic_name = slot.topic_name.clone();
                    }
                }
                // Fan-out happens after the slot lock has been released.
                for (item, listener_view) in items {
                    #[cfg(feature = "inspect")]
                    dispatch_inspect_dcps_receive_tap(&topic_name, d.reader_id, &item);
                    if let Some(ref l) = listener {
                        if let Some((arc_payload, off)) = listener_view {
                            // Listener sees the payload minus the encap header.
                            l(&arc_payload[off..]);
                        }
                    }
                    let _ = sender.send(item);
                    wake_async_waker(&waker);
                }
            }
            ParsedSubmessage::DataFrag(df) => {
                // Same flow as DATA, but samples only emerge once the fragment
                // assembler has completed a whole payload.
                let Some(arc) = rt.reader_slot(df.reader_id) else {
                    continue;
                };
                let mut items: Vec<UserSampleWithEncap> = Vec::new();
                let listener;
                let waker;
                let sender;
                #[cfg(feature = "inspect")]
                let topic_name;
                {
                    let Ok(mut slot) = arc.lock() else { continue };
                    for sample in slot.reader.handle_data_frag(&df, now) {
                        let listener_view: Option<(Arc<[u8]>, usize)> = match sample.kind {
                            zerodds_rtps::history_cache::ChangeKind::Alive
                            | zerodds_rtps::history_cache::ChangeKind::AliveFiltered => {
                                validate_user_encap_offset(&sample.payload)
                                    .map(|off| (Arc::clone(&sample.payload), off))
                            }
                            _ => None,
                        };
                        if let Some(item) =
                            delivered_to_user_sample(&sample, &slot.writer_strengths)
                        {
                            items.push((item, listener_view));
                        }
                    }
                    if !items.is_empty() {
                        slot.last_sample_received = Some(now);
                        if !slot.liveliness_alive {
                            slot.liveliness_alive = true;
                            slot.liveliness_alive_count =
                                slot.liveliness_alive_count.saturating_add(1);
                        }
                    }
                    listener = slot.listener.clone();
                    waker = Arc::clone(&slot.async_waker);
                    sender = slot.sample_tx.clone();
                    #[cfg(feature = "inspect")]
                    {
                        topic_name = slot.topic_name.clone();
                    }
                }
                for (item, listener_view) in items {
                    #[cfg(feature = "inspect")]
                    dispatch_inspect_dcps_receive_tap(&topic_name, df.reader_id, &item);
                    if let Some(ref l) = listener {
                        if let Some((arc_payload, off)) = listener_view {
                            l(&arc_payload[off..]);
                        }
                    }
                    let _ = sender.send(item);
                    wake_async_waker(&waker);
                }
            }
            ParsedSubmessage::Heartbeat(h) => {
                let Some(arc) = rt.reader_slot(h.reader_id) else {
                    continue;
                };
                let mut items: Vec<UserSample> = Vec::new();
                let mut sync_outbound: Vec<zerodds_rtps::message_builder::OutboundDatagram> =
                    Vec::new();
                let waker;
                let sender;
                {
                    let Ok(mut slot) = arc.lock() else { continue };
                    // A heartbeat may release samples held back for ordering.
                    for sample in slot.reader.handle_heartbeat(&h, now) {
                        if let Some(item) =
                            delivered_to_user_sample(&sample, &slot.writer_strengths)
                        {
                            items.push(item);
                        }
                    }
                    if !items.is_empty() {
                        slot.last_sample_received = Some(now);
                        if !slot.liveliness_alive {
                            slot.liveliness_alive = true;
                            slot.liveliness_alive_count =
                                slot.liveliness_alive_count.saturating_add(1);
                        }
                    }
                    // Generate any ACKNACK response datagrams synchronously.
                    if let Ok(dgs) = slot.reader.tick_outbound(now) {
                        sync_outbound = dgs;
                    }
                    waker = Arc::clone(&slot.async_waker);
                    sender = slot.sample_tx.clone();
                }
                for item in items {
                    let _ = sender.send(item);
                    wake_async_waker(&waker);
                }
                // Security transform applied before send; failures drop the datagram.
                for dg in sync_outbound {
                    if let Some(secured) = secure_outbound_bytes(rt, &dg.bytes) {
                        for t in dg.targets.iter() {
                            if t.kind == LocatorKind::UdpV4 {
                                let _ = rt.user_unicast.send(t, &secured);
                            }
                        }
                    }
                }
            }
            ParsedSubmessage::Gap(g) => {
                // Gaps may also release held-back samples; fan out under the
                // lock here since no user listener callback is involved.
                if let Some(arc) = rt.reader_slot(g.reader_id) {
                    if let Ok(mut slot) = arc.lock() {
                        for sample in slot.reader.handle_gap(&g) {
                            if let Some(item) =
                                delivered_to_user_sample(&sample, &slot.writer_strengths)
                            {
                                let _ = slot.sample_tx.send(item);
                                wake_async_waker(&slot.async_waker);
                            }
                        }
                    }
                }
            }
            ParsedSubmessage::AckNack(ack) => {
                if let Some(arc) = rt.writer_slot(ack.writer_id) {
                    let mut sync_outbound: Vec<zerodds_rtps::message_builder::OutboundDatagram> =
                        Vec::new();
                    if let Ok(mut slot) = arc.lock() {
                        let base = ack.reader_sn_state.bitmap_base;
                        let requested: Vec<_> = ack.reader_sn_state.iter_set().collect();
                        let src = Guid::new(parsed.header.guid_prefix, ack.reader_id);
                        slot.writer.handle_acknack(src, base, requested);
                        // Produce retransmissions/heartbeats right away.
                        if let Ok(dgs) = slot.writer.tick(now) {
                            sync_outbound = dgs;
                        }
                    }
                    // Wake anything blocked on acknowledgment progress.
                    rt.notify_ack_event();
                    for dg in sync_outbound {
                        if let Some(secured) = secure_outbound_bytes(rt, &dg.bytes) {
                            for t in dg.targets.iter() {
                                if t.kind == LocatorKind::UdpV4 {
                                    let _ = rt.user_unicast.send(t, &secured);
                                }
                            }
                        }
                    }
                }
            }
            ParsedSubmessage::NackFrag(nf) => {
                // Fragment-level NACK: recorded on the writer; retransmission
                // happens on a later tick.
                if let Some(arc) = rt.writer_slot(nf.writer_id) {
                    if let Ok(mut slot) = arc.lock() {
                        let src = Guid::new(parsed.header.guid_prefix, nf.reader_id);
                        slot.writer.handle_nackfrag(src, &nf);
                    }
                }
            }
            _ => {}
        }
    }
}
#[cfg(test)]
/// Test-only shim exposing the private SPDP datagram handler to unit tests.
pub(crate) fn handle_spdp_datagram_for_test(rt: &Arc<DcpsRuntime>, bytes: &[u8]) {
    handle_spdp_datagram(rt, bytes);
}
/// Processes one SPDP announcement datagram: records the remote participant,
/// primes SEDP/security on first sight, and forwards the sample to the
/// builtin DCPSParticipant reader (unless the application ignored it).
fn handle_spdp_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8]) {
    // Malformed announcements are dropped silently.
    let Ok(parsed) = rt.spdp_reader.parse_datagram(bytes) else {
        return;
    };
    // Ignore our own multicast loopback.
    if parsed.sender_prefix == rt.guid_prefix {
        return;
    }
    let is_new = rt
        .discovered
        .lock()
        .map(|mut cache| cache.insert(parsed.clone()))
        .unwrap_or(false);
    if is_new {
        // First sighting: kick off SEDP and (if configured) security matching.
        if let Ok(mut sedp) = rt.sedp.lock() {
            sedp.on_participant_discovered(&parsed);
        }
        if let Some(sec) = rt.security_builtin_snapshot() {
            if let Ok(mut stack) = sec.lock() {
                stack.handle_remote_endpoints(&parsed);
            }
        }
    }
    let Some(sinks) = rt.builtin_sinks_snapshot() else {
        return;
    };
    let dcps_sample = crate::builtin_topics::ParticipantBuiltinTopicData::from_wire(&parsed.data);
    if let Some(filter) = rt.ignore_filter_snapshot() {
        let handle = crate::instance_handle::InstanceHandle::from_guid(dcps_sample.key);
        if filter.is_participant_ignored(handle) {
            return;
        }
    }
    let _ = sinks.push_participant(&dcps_sample);
}
/// Forwards freshly discovered remote endpoints to the builtin readers:
/// publications/subscriptions plus a derived topic sample for each, honoring
/// the application's ignore filter when one is installed.
fn push_sedp_events_to_builtin_readers(
    rt: &Arc<DcpsRuntime>,
    events: &zerodds_discovery::sedp::SedpEvents,
) {
    // Without builtin sinks there is nowhere to publish discovery samples.
    let Some(sinks) = rt.builtin_sinks_snapshot() else {
        return;
    };
    let filter = rt.ignore_filter_snapshot();
    for wire in &events.new_publications {
        let pub_sample = crate::builtin_topics::PublicationBuiltinTopicData::from_wire(wire);
        let topic_sample = crate::builtin_topics::TopicBuiltinTopicData::from_publication(wire);
        match &filter {
            Some(f) => {
                let part_h =
                    crate::instance_handle::InstanceHandle::from_guid(wire.participant_key);
                let pub_h = crate::instance_handle::InstanceHandle::from_guid(wire.key);
                // Skip the whole event if the participant or writer is ignored.
                if f.is_participant_ignored(part_h) || f.is_publication_ignored(pub_h) {
                    continue;
                }
                let _ = sinks.push_publication(&pub_sample);
                let topic_h =
                    crate::instance_handle::InstanceHandle::from_guid(topic_sample.key);
                if !f.is_topic_ignored(topic_h) {
                    let _ = sinks.push_topic(&topic_sample);
                }
            }
            None => {
                let _ = sinks.push_publication(&pub_sample);
                let _ = sinks.push_topic(&topic_sample);
            }
        }
    }
    for wire in &events.new_subscriptions {
        let sub_sample = crate::builtin_topics::SubscriptionBuiltinTopicData::from_wire(wire);
        let topic_sample = crate::builtin_topics::TopicBuiltinTopicData::from_subscription(wire);
        match &filter {
            Some(f) => {
                let part_h =
                    crate::instance_handle::InstanceHandle::from_guid(wire.participant_key);
                let sub_h = crate::instance_handle::InstanceHandle::from_guid(wire.key);
                if f.is_participant_ignored(part_h) || f.is_subscription_ignored(sub_h) {
                    continue;
                }
                let _ = sinks.push_subscription(&sub_sample);
                let topic_h =
                    crate::instance_handle::InstanceHandle::from_guid(topic_sample.key);
                if !f.is_topic_ignored(topic_h) {
                    let _ = sinks.push_topic(&topic_sample);
                }
            }
            None => {
                let _ = sinks.push_subscription(&sub_sample);
                let _ = sinks.push_topic(&topic_sample);
            }
        }
    }
}
/// Routes a datagram received on the security builtin channel into the
/// participant-stateless and volatile-message-secure builtin endpoints.
///
/// No-op unless a security builtin stack is configured. The stack is locked
/// once for the whole datagram; each submessage is dispatched by its
/// reader/writer entity id and anything else is ignored.
fn dispatch_security_builtin_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8], now: Duration) {
    let Some(stack) = rt.security_builtin_snapshot() else {
        return;
    };
    let Ok(parsed) = decode_datagram(bytes) else {
        return;
    };
    let Ok(mut s) = stack.lock() else {
        return;
    };
    for sub in parsed.submessages {
        match sub {
            ParsedSubmessage::Data(d) => {
                // Stateless messages are matched by reader OR writer id, since
                // peers may address them by the writer entity only.
                if d.reader_id == EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER
                    || d.writer_id == EntityId::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER
                {
                    let _ = s.stateless_reader.handle_data(&d);
                } else if d.reader_id
                    == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
                {
                    let _ = s.volatile_reader.handle_data(&d);
                }
            }
            ParsedSubmessage::DataFrag(df) => {
                if df.reader_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER {
                    let _ = s.volatile_reader.handle_data_frag(&df, now);
                }
            }
            ParsedSubmessage::Heartbeat(h) => {
                // Accept heartbeats addressed to the volatile reader, or to
                // ENTITYID_UNKNOWN when the writer is the volatile-secure writer.
                let to_volatile_reader = h.reader_id
                    == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
                    || (h.reader_id == EntityId::UNKNOWN
                        && h.writer_id
                            == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER);
                if to_volatile_reader {
                    s.volatile_reader.handle_heartbeat(&h, now);
                }
            }
            ParsedSubmessage::Gap(g) => {
                if g.reader_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER {
                    let _ = s.volatile_reader.handle_gap(&g);
                }
            }
            ParsedSubmessage::AckNack(ack) => {
                if ack.writer_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER {
                    let base = ack.reader_sn_state.bitmap_base;
                    let requested: Vec<_> = ack.reader_sn_state.iter_set().collect();
                    let src = Guid::new(parsed.header.guid_prefix, ack.reader_id);
                    s.volatile_writer.handle_acknack(src, base, requested);
                }
            }
            ParsedSubmessage::NackFrag(nf) => {
                if nf.writer_id == EntityId::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER {
                    let src = Guid::new(parsed.header.guid_prefix, nf.reader_id);
                    s.volatile_writer.handle_nackfrag(src, &nf);
                }
            }
            _ => {}
        }
    }
}
/// Handles an incoming TypeLookup service datagram (requests and replies).
///
/// Returns `true` when at least one DATA submessage targeted a TypeLookup
/// entity id, so the caller knows this datagram was consumed here. `source`
/// is used as the unicast destination for any reply that gets generated.
fn dispatch_type_lookup_datagram(rt: &Arc<DcpsRuntime>, bytes: &[u8], source: &Locator) -> bool {
    use zerodds_cdr::{BufferReader, Endianness};
    use zerodds_types::type_lookup::{GetTypeDependenciesRequest, GetTypesReply, GetTypesRequest};
    let Ok(parsed) = decode_datagram(bytes) else {
        return false;
    };
    let mut accepted = false;
    for sub in &parsed.submessages {
        let ParsedSubmessage::Data(d) = sub else {
            continue;
        };
        let payload: &[u8] = &d.serialized_payload;
        if payload.is_empty() {
            continue;
        }
        // Skip a CDR_LE encapsulation header (0x00 0x01 + 2 option bytes) if present.
        let body: &[u8] = if payload.len() >= 4 && (payload[0] == 0x00 && payload[1] == 0x01) {
            &payload[4..]
        } else {
            payload
        };
        if d.reader_id == EntityId::TL_SVC_REQ_READER {
            accepted = true;
            // Try getTypes first, then getTypeDependencies; both little-endian CDR.
            let mut r = BufferReader::new(body, Endianness::Little);
            if let Ok(req) = GetTypesRequest::decode_from(&mut r) {
                let reply = match rt.type_lookup_server.lock() {
                    Ok(g) => g.handle_get_types(&req),
                    Err(_) => continue,
                };
                let _ = send_type_lookup_reply(
                    rt,
                    source,
                    TypeLookupReplyPayload::Types(reply),
                    d.writer_sn,
                );
                continue;
            }
            let mut r = BufferReader::new(body, Endianness::Little);
            if let Ok(req) = GetTypeDependenciesRequest::decode_from(&mut r) {
                let reply = match rt.type_lookup_server.lock() {
                    Ok(g) => g.handle_get_type_dependencies(&req),
                    Err(_) => continue,
                };
                let _ = send_type_lookup_reply(
                    rt,
                    source,
                    TypeLookupReplyPayload::Dependencies(reply),
                    d.writer_sn,
                );
                continue;
            }
        }
        if d.reader_id == EntityId::TL_SVC_REPLY_READER {
            accepted = true;
            // The request id rides in the writer sequence number; the replier
            // echoes the request's SN (see `send_type_lookup_reply`).
            let (sn_high, sn_low) = d.writer_sn.split();
            let sn_u64 = ((u64::from(sn_high as u32)) << 32) | u64::from(sn_low);
            let request_id = zerodds_discovery::type_lookup::RequestId::from_u64(sn_u64);
            let mut r = BufferReader::new(body, Endianness::Little);
            // NOTE(review): only GetTypesReply is decoded here; a
            // GetTypeDependenciesReply arriving on this reader appears to be
            // dropped — confirm whether that is intentional.
            if let Ok(reply) = GetTypesReply::decode_from(&mut r) {
                if let Ok(mut client) = rt.type_lookup_client.lock() {
                    client.handle_reply(request_id, TypeLookupReply::Types(reply));
                }
                continue;
            }
        }
    }
    accepted
}
/// Payload variants carried by a TypeLookup service reply.
enum TypeLookupReplyPayload {
    /// Reply to a `GetTypesRequest`.
    Types(zerodds_types::type_lookup::GetTypesReply),
    /// Reply to a `GetTypeDependenciesRequest`.
    Dependencies(zerodds_types::type_lookup::GetTypeDependenciesReply),
}
/// Encodes a TypeLookup reply and sends it unicast to `target`.
///
/// The body is little-endian CDR prefixed with a CDR_LE encapsulation header,
/// wrapped in a single DATA submessage addressed to the TypeLookup reply
/// endpoints. `request_sn` is echoed as the writer sequence number so the
/// requester can correlate the reply with its pending request.
///
/// # Errors
/// Returns `DdsError::PreconditionNotMet` if reply or datagram encoding fails.
fn send_type_lookup_reply(
    rt: &Arc<DcpsRuntime>,
    target: &Locator,
    reply: TypeLookupReplyPayload,
    request_sn: zerodds_rtps::wire_types::SequenceNumber,
) -> Result<()> {
    use alloc::sync::Arc as AllocArc;
    use zerodds_cdr::{BufferWriter, Endianness};
    use zerodds_rtps::datagram::encode_data_datagram;
    use zerodds_rtps::header::RtpsHeader;
    use zerodds_rtps::submessages::DataSubmessage;
    use zerodds_rtps::wire_types::{ProtocolVersion, VendorId};
    let mut w = BufferWriter::new(Endianness::Little);
    match reply {
        TypeLookupReplyPayload::Types(r) => {
            r.encode_into(&mut w)
                .map_err(|_| DdsError::PreconditionNotMet {
                    reason: "type_lookup reply encode failed",
                })?;
        }
        TypeLookupReplyPayload::Dependencies(r) => {
            r.encode_into(&mut w)
                .map_err(|_| DdsError::PreconditionNotMet {
                    reason: "type_lookup deps reply encode failed",
                })?;
        }
    }
    let body = w.into_bytes();
    // Prefix the CDR_LE encapsulation header (scheme 0x00 0x01 + options 0x00 0x00).
    let mut payload: alloc::vec::Vec<u8> = alloc::vec::Vec::with_capacity(4 + body.len());
    payload.extend_from_slice(&[0x00, 0x01, 0x00, 0x00]);
    payload.extend_from_slice(&body);
    let header = RtpsHeader {
        protocol_version: ProtocolVersion::CURRENT,
        vendor_id: VendorId::ZERODDS,
        guid_prefix: rt.guid_prefix,
    };
    let data = DataSubmessage {
        extra_flags: 0,
        reader_id: EntityId::TL_SVC_REPLY_READER,
        writer_id: EntityId::TL_SVC_REPLY_WRITER,
        // Echo the request SN so the client can match reply to request.
        writer_sn: request_sn,
        inline_qos: None,
        key_flag: false,
        non_standard_flag: false,
        serialized_payload: AllocArc::from(payload.into_boxed_slice()),
    };
    let datagram =
        encode_data_datagram(header, &[data]).map_err(|_| DdsError::PreconditionNotMet {
            reason: "type_lookup reply datagram encode failed",
        })?;
    // Only UDPv4 targets are supported by the user unicast transport here.
    if target.kind == LocatorKind::UdpV4 {
        let _ = rt.user_unicast.send(target, &datagram);
    }
    Ok(())
}
/// Applies the outbound security transform (if configured) and sends the
/// result to every UDPv4 target via the SPDP multicast transmitter.
/// A failed transform drops the datagram entirely.
fn send_discovery_datagram(rt: &Arc<DcpsRuntime>, targets: &[Locator], bytes: &[u8]) {
    if let Some(secured) = secure_outbound_bytes(rt, bytes) {
        for target in targets.iter().filter(|t| t.kind == LocatorKind::UdpV4) {
            let _ = rt.spdp_mc_tx.send(target, &secured);
        }
    }
}
#[must_use]
/// Well-known user-traffic multicast endpoint for `domain_id`.
///
/// Port follows the RTPS formula `PB + DG * domainId + d2` with PB = 7400,
/// DG = 250, d2 = 1, saturating at u16::MAX instead of wrapping.
/// NOTE(review): a negative `domain_id` wraps via the `as u16` cast before
/// saturating — callers are assumed to pass non-negative domains.
pub fn user_multicast_endpoint(domain_id: i32) -> SocketAddr {
    let offset = 250u16.saturating_mul(domain_id as u16).saturating_add(1);
    let port = 7400u16.saturating_add(offset);
    SocketAddr::from((Ipv4Addr::new(239, 255, 0, 1), port))
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn strip_user_encap_xcdr2_le() {
let payload = [0x00, 0x07, 0x00, 0x00, 1, 2, 3];
assert_eq!(strip_user_encap(&payload), Some(alloc::vec![1, 2, 3]));
}
#[test]
fn strip_user_encap_xcdr1_le() {
let payload = [0x00, 0x01, 0x00, 0x00, 0xAA];
assert_eq!(strip_user_encap(&payload), Some(alloc::vec![0xAA]));
}
#[test]
fn strip_user_encap_rejects_unknown_scheme() {
let payload = [0xFF, 0xFF, 0x00, 0x00, 1];
assert_eq!(strip_user_encap(&payload), None);
}
#[test]
fn strip_user_encap_rejects_short() {
assert_eq!(strip_user_encap(&[0x00, 0x07]), None);
}
#[test]
fn user_payload_encap_is_xcdr2_le() {
assert_eq!(USER_PAYLOAD_ENCAP, [0x00, 0x07, 0x00, 0x00]);
}
#[test]
fn observability_sink_records_writer_and_reader_creation() {
use std::sync::Arc as StdArc;
use zerodds_foundation::observability::{Component, Level, VecSink};
let sink = StdArc::new(VecSink::new());
let cfg = RuntimeConfig {
observability: sink.clone(),
..RuntimeConfig::default()
};
let rt =
DcpsRuntime::start(7, GuidPrefix::from_bytes([0xAA; 12]), cfg).expect("start runtime");
let _ = rt.register_user_writer(UserWriterConfig {
topic_name: "ObsTopic".into(),
type_name: "ObsType".into(),
reliable: true,
durability: zerodds_qos::DurabilityKind::Volatile,
deadline: zerodds_qos::DeadlineQosPolicy::default(),
lifespan: zerodds_qos::LifespanQosPolicy::default(),
liveliness: zerodds_qos::LivelinessQosPolicy::default(),
ownership: zerodds_qos::OwnershipKind::Shared,
ownership_strength: 0,
partition: alloc::vec![],
user_data: alloc::vec![],
topic_data: alloc::vec![],
group_data: alloc::vec![],
type_identifier: zerodds_types::TypeIdentifier::None,
data_representation_offer: None,
});
let _ = rt.register_user_reader(UserReaderConfig {
topic_name: "ObsTopic".into(),
type_name: "ObsType".into(),
reliable: true,
durability: zerodds_qos::DurabilityKind::Volatile,
deadline: zerodds_qos::DeadlineQosPolicy::default(),
liveliness: zerodds_qos::LivelinessQosPolicy::default(),
ownership: zerodds_qos::OwnershipKind::Shared,
partition: alloc::vec![],
user_data: alloc::vec![],
topic_data: alloc::vec![],
group_data: alloc::vec![],
type_identifier: zerodds_types::TypeIdentifier::None,
type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
data_representation_offer: None,
});
rt.shutdown();
let events = sink.snapshot();
assert!(
events.iter().any(|e| e.name == "user_writer.created"
&& e.component == Component::Dcps
&& e.level == Level::Info),
"writer-event missing: got {:?}",
events.iter().map(|e| e.name).collect::<Vec<_>>()
);
assert!(
events
.iter()
.any(|e| e.name == "user_reader.created" && e.component == Component::Dcps),
"reader-event missing"
);
let writer_event = events
.iter()
.find(|e| e.name == "user_writer.created")
.expect("writer event");
assert!(
writer_event
.attrs
.iter()
.any(|a| a.key == "topic" && a.value == "ObsTopic"),
"topic attr missing"
);
}
#[test]
fn runtime_starts_and_shuts_down_cleanly() {
let rt = DcpsRuntime::start(
42,
GuidPrefix::from_bytes([7; 12]),
RuntimeConfig::default(),
)
.expect("start runtime");
assert_eq!(rt.domain_id, 42);
rt.shutdown();
rt.shutdown();
}
#[test]
fn spdp_announces_standard_bits_by_default() {
let rt = DcpsRuntime::start(
5,
GuidPrefix::from_bytes([0xC; 12]),
RuntimeConfig::default(),
)
.expect("start");
let mask = rt.announced_builtin_endpoint_set();
assert_ne!(mask & endpoint_flag::PARTICIPANT_ANNOUNCER, 0);
assert_ne!(mask & endpoint_flag::PARTICIPANT_DETECTOR, 0);
assert_ne!(mask & endpoint_flag::PUBLICATIONS_ANNOUNCER, 0);
assert_ne!(mask & endpoint_flag::SUBSCRIPTIONS_DETECTOR, 0);
assert_ne!(mask & endpoint_flag::PARTICIPANT_MESSAGE_DATA_WRITER, 0);
assert_ne!(mask & endpoint_flag::PARTICIPANT_MESSAGE_DATA_READER, 0);
assert_ne!(mask & endpoint_flag::TYPE_LOOKUP_REQUEST, 0);
assert_ne!(mask & endpoint_flag::TYPE_LOOKUP_REPLY, 0);
assert_eq!(mask & endpoint_flag::TOPICS_ANNOUNCER, 0);
assert_eq!(mask & endpoint_flag::TOPICS_DETECTOR, 0);
assert_eq!(mask & endpoint_flag::ALL_SECURE, 0);
}
#[test]
fn spdp_announces_secure_bits_when_configured() {
let config = RuntimeConfig {
announce_secure_endpoints: true,
..Default::default()
};
let rt = DcpsRuntime::start(6, GuidPrefix::from_bytes([0xD; 12]), config).expect("start");
let mask = rt.announced_builtin_endpoint_set();
for bit in 16u32..=27 {
assert!(
mask & (1u32 << bit) != 0,
"Secure-Bit {bit} fehlt im SPDP-Announce"
);
}
assert_eq!(
mask & endpoint_flag::ALL_STANDARD,
endpoint_flag::ALL_STANDARD
);
}
#[test]
fn spdp_lease_duration_is_configurable() {
let config = RuntimeConfig {
participant_lease_duration: Duration::from_secs(17),
..Default::default()
};
let rt = DcpsRuntime::start(7, GuidPrefix::from_bytes([0xE; 12]), config).expect("start");
let secs = rt
.spdp_beacon
.lock()
.map(|b| b.data.lease_duration.seconds)
.unwrap_or(0);
assert_eq!(secs, 17);
}
#[test]
fn user_locator_is_udp_v4_127_0_0_x() {
let rt = DcpsRuntime::start(
0,
GuidPrefix::from_bytes([0xA; 12]),
RuntimeConfig::default(),
)
.expect("start");
let loc = rt.user_locator();
assert_eq!(loc.kind, zerodds_rtps::wire_types::LocatorKind::UdpV4);
assert!(loc.port > 0);
}
#[test]
fn two_runtimes_on_same_domain_can_coexist() {
let a = DcpsRuntime::start(
3,
GuidPrefix::from_bytes([0xA; 12]),
RuntimeConfig::default(),
)
.expect("a");
let b = DcpsRuntime::start(
3,
GuidPrefix::from_bytes([0xB; 12]),
RuntimeConfig::default(),
)
.expect("b");
assert_eq!(a.domain_id, b.domain_id);
}
#[test]
fn peer_capabilities_unknown_peer_returns_none() {
let rt = DcpsRuntime::start(
10,
GuidPrefix::from_bytes([0x60; 12]),
RuntimeConfig::default(),
)
.expect("start");
let caps = rt.peer_capabilities(&GuidPrefix::from_bytes([0xEE; 12]));
assert!(caps.is_none());
}
#[test]
fn assert_liveliness_enqueues_wlp_pulse_without_panic() {
let rt = DcpsRuntime::start(
8,
GuidPrefix::from_bytes([0xF; 12]),
RuntimeConfig::default(),
)
.expect("start");
rt.assert_liveliness();
rt.assert_writer_liveliness(alloc::vec![0xDE, 0xAD]);
let count = rt.wlp.lock().map(|w| w.peer_count()).unwrap_or(usize::MAX);
assert_eq!(count, 0, "kein Peer hat sich gemeldet → 0");
}
#[test]
fn wlp_period_default_is_lease_over_three() {
let rt = DcpsRuntime::start(
9,
GuidPrefix::from_bytes([0x10; 12]),
RuntimeConfig::default(),
)
.expect("start");
let mut wlp = rt.wlp.lock().unwrap();
wlp.assert_participant();
let now0 = Duration::from_secs(0);
let dg = wlp.tick(now0).unwrap();
assert!(dg.is_some(), "Pulse wird sofort emittiert");
}
#[cfg(target_os = "linux")]
#[test]
fn two_runtimes_exchange_wlp_heartbeat_via_multicast() {
let cfg = RuntimeConfig {
tick_period: Duration::from_millis(20),
spdp_period: Duration::from_millis(100),
wlp_period: Duration::from_millis(80),
participant_lease_duration: Duration::from_millis(240),
..RuntimeConfig::default()
};
let _a = DcpsRuntime::start(2, GuidPrefix::from_bytes([0x40; 12]), cfg.clone()).expect("a");
let _b = DcpsRuntime::start(2, GuidPrefix::from_bytes([0x41; 12]), cfg).expect("b");
let a_prefix = GuidPrefix::from_bytes([0x40; 12]);
for _ in 0..60 {
thread::sleep(Duration::from_millis(50));
if _b.peer_liveliness_last_seen(&a_prefix).is_some() {
return;
}
}
panic!("B did not see A's WLP heartbeat within 3 s");
}
#[cfg(target_os = "linux")]
#[test]
fn two_runtimes_assert_liveliness_reaches_peer() {
let cfg = RuntimeConfig {
tick_period: Duration::from_millis(20),
spdp_period: Duration::from_millis(100),
wlp_period: Duration::from_secs(3600),
..RuntimeConfig::default()
};
let a = DcpsRuntime::start(4, GuidPrefix::from_bytes([0x50; 12]), cfg.clone()).expect("a");
let b = DcpsRuntime::start(4, GuidPrefix::from_bytes([0x51; 12]), cfg).expect("b");
a.assert_liveliness();
let a_prefix = GuidPrefix::from_bytes([0x50; 12]);
for _ in 0..60 {
thread::sleep(Duration::from_millis(50));
if b.peer_liveliness_last_seen(&a_prefix).is_some() {
return;
}
}
panic!("B did not see A's manual liveliness assert within 3 s");
}
#[cfg(target_os = "linux")]
#[test]
fn two_runtimes_exchange_sedp_publication_announce() {
use zerodds_qos::{DurabilityKind, ReliabilityKind};
use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
let cfg = RuntimeConfig {
tick_period: Duration::from_millis(20),
spdp_period: Duration::from_millis(100),
..RuntimeConfig::default()
};
let a = DcpsRuntime::start(1, GuidPrefix::from_bytes([0xCC; 12]), cfg.clone()).expect("a");
let b = DcpsRuntime::start(1, GuidPrefix::from_bytes([0xDD; 12]), cfg).expect("b");
for _ in 0..40 {
thread::sleep(Duration::from_millis(50));
if !a.discovered_participants().is_empty() && !b.discovered_participants().is_empty() {
break;
}
}
assert!(
!a.discovered_participants().is_empty(),
"no SPDP discovery a"
);
let pub_data = PublicationBuiltinTopicData {
key: Guid::new(
a.guid_prefix,
EntityId::user_writer_with_key([0x01, 0x02, 0x03]),
),
participant_key: Guid::new(a.guid_prefix, EntityId::PARTICIPANT),
topic_name: "Chatter".into(),
type_name: "zerodds::RawBytes".into(),
durability: DurabilityKind::Volatile,
reliability: zerodds_qos::ReliabilityQosPolicy {
kind: ReliabilityKind::Reliable,
max_blocking_time: QosDuration::from_millis(100_i32),
},
ownership: zerodds_qos::OwnershipKind::Shared,
ownership_strength: 0,
liveliness: zerodds_qos::LivelinessQosPolicy::default(),
deadline: zerodds_qos::DeadlineQosPolicy::default(),
lifespan: zerodds_qos::LifespanQosPolicy::default(),
partition: Vec::new(),
user_data: Vec::new(),
topic_data: Vec::new(),
group_data: Vec::new(),
type_information: None,
data_representation: Vec::new(),
security_info: None,
service_instance_name: None,
related_entity_guid: None,
topic_aliases: None,
type_identifier: zerodds_types::TypeIdentifier::None,
};
a.announce_publication(&pub_data).expect("announce");
for _ in 0..60 {
thread::sleep(Duration::from_millis(50));
if b.discovered_publications_count() > 0 {
return;
}
}
panic!(
"B did not receive SEDP publication within 3 s (pub_count={})",
b.discovered_publications_count()
);
}
#[cfg(target_os = "linux")]
#[test]
fn two_runtimes_e2e_user_data_match_and_transfer() {
let cfg = RuntimeConfig {
tick_period: Duration::from_millis(20),
spdp_period: Duration::from_millis(100),
..RuntimeConfig::default()
};
let a = DcpsRuntime::start(2, GuidPrefix::from_bytes([0xEE; 12]), cfg.clone()).expect("a");
let b = DcpsRuntime::start(2, GuidPrefix::from_bytes([0xFF; 12]), cfg).expect("b");
let mut spdp_ok = false;
for _ in 0..60 {
thread::sleep(Duration::from_millis(50));
if !a.discovered_participants().is_empty() && !b.discovered_participants().is_empty() {
spdp_ok = true;
break;
}
}
assert!(spdp_ok, "SPDP mutual discovery did not complete in 3 s");
let wid = a
.register_user_writer(UserWriterConfig {
topic_name: "Chatter".into(),
type_name: "zerodds::RawBytes".into(),
reliable: true,
durability: zerodds_qos::DurabilityKind::Volatile,
deadline: zerodds_qos::DeadlineQosPolicy::default(),
lifespan: zerodds_qos::LifespanQosPolicy::default(),
liveliness: zerodds_qos::LivelinessQosPolicy::default(),
ownership: zerodds_qos::OwnershipKind::Shared,
ownership_strength: 0,
partition: Vec::new(),
user_data: Vec::new(),
topic_data: Vec::new(),
group_data: Vec::new(),
type_identifier: zerodds_types::TypeIdentifier::None,
data_representation_offer: None,
})
.expect("wid");
let (_rid, rx) = b
.register_user_reader(UserReaderConfig {
topic_name: "Chatter".into(),
type_name: "zerodds::RawBytes".into(),
reliable: true,
durability: zerodds_qos::DurabilityKind::Volatile,
deadline: zerodds_qos::DeadlineQosPolicy::default(),
liveliness: zerodds_qos::LivelinessQosPolicy::default(),
ownership: zerodds_qos::OwnershipKind::Shared,
partition: Vec::new(),
user_data: Vec::new(),
topic_data: Vec::new(),
group_data: Vec::new(),
type_identifier: zerodds_types::TypeIdentifier::None,
type_consistency: zerodds_types::qos::TypeConsistencyEnforcement::default(),
data_representation_offer: None,
})
.expect("rid");
let mut attempts = 0;
loop {
thread::sleep(Duration::from_millis(50));
let _ = a.write_user_sample(wid, alloc::vec![0xAA, 0xBB, 0xCC]);
if let Ok(sample) = rx.recv_timeout(Duration::from_millis(50)) {
match sample {
UserSample::Alive { payload, .. } => {
assert_eq!(payload, alloc::vec![0xAA, 0xBB, 0xCC]);
return;
}
other => panic!("expected Alive sample, got {other:?}"),
}
}
attempts += 1;
if attempts > 80 {
panic!("no sample delivered within 4 s");
}
}
}
#[cfg(target_os = "linux")]
#[test]
fn two_runtimes_discover_each_other_via_spdp() {
let cfg = RuntimeConfig {
tick_period: Duration::from_millis(20),
spdp_period: Duration::from_millis(100),
..RuntimeConfig::default()
};
let a = DcpsRuntime::start(3, GuidPrefix::from_bytes([0xAA; 12]), cfg.clone()).expect("a");
let b = DcpsRuntime::start(3, GuidPrefix::from_bytes([0xBB; 12]), cfg).expect("b");
for _ in 0..60 {
thread::sleep(Duration::from_millis(50));
let a_sees_b = a
.discovered_participants()
.iter()
.any(|p| p.sender_prefix == GuidPrefix::from_bytes([0xBB; 12]));
let b_sees_a = b
.discovered_participants()
.iter()
.any(|p| p.sender_prefix == GuidPrefix::from_bytes([0xAA; 12]));
if a_sees_b && b_sees_a {
return;
}
}
panic!(
"mutual SPDP discovery failed within 3 s (a={} b={})",
a.discovered_participants().len(),
b.discovered_participants().len()
);
}
#[cfg(feature = "security")]
#[test]
fn per_target_serializer_produces_different_wire_per_reader() {
use zerodds_security_crypto::AesGcmCryptoPlugin;
use zerodds_security_permissions::parse_governance_xml;
use zerodds_security_runtime::{
PeerCapabilities, ProtectionLevel as SecProtectionLevel, SharedSecurityGate,
};
const GOV: &str = r#"
<domain_access_rules>
<domain_rule>
<domains><id>0</id></domains>
<rtps_protection_kind>ENCRYPT</rtps_protection_kind>
<topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
</domain_rule>
</domain_access_rules>
"#;
let gate = SharedSecurityGate::new(
0,
parse_governance_xml(GOV).unwrap(),
Box::new(AesGcmCryptoPlugin::new()),
);
let cfg = RuntimeConfig {
security: Some(std::sync::Arc::new(gate)),
..RuntimeConfig::default()
};
let rt =
DcpsRuntime::start(0, GuidPrefix::from_bytes([0xE4; 12]), cfg).expect("start runtime");
let wid = rt
.register_user_writer(UserWriterConfig {
topic_name: "HeteroTopic".into(),
type_name: "zerodds::RawBytes".into(),
reliable: true,
durability: zerodds_qos::DurabilityKind::Volatile,
deadline: zerodds_qos::DeadlineQosPolicy::default(),
lifespan: zerodds_qos::LifespanQosPolicy::default(),
liveliness: zerodds_qos::LivelinessQosPolicy::default(),
ownership: zerodds_qos::OwnershipKind::Shared,
ownership_strength: 0,
partition: Vec::new(),
user_data: Vec::new(),
topic_data: Vec::new(),
group_data: Vec::new(),
type_identifier: zerodds_types::TypeIdentifier::None,
data_representation_offer: None,
})
.expect("register writer");
let legacy_loc = Locator::udp_v4([127, 0, 0, 11], 40001);
let fast_loc = Locator::udp_v4([127, 0, 0, 12], 40002);
let secure_loc = Locator::udp_v4([127, 0, 0, 13], 40003);
let legacy_peer: [u8; 12] = [0x11; 12];
let fast_peer: [u8; 12] = [0x22; 12];
let secure_peer: [u8; 12] = [0x33; 12];
{
let arc = rt.writer_slot(wid).unwrap();
let mut slot = arc.lock().unwrap();
slot.reader_protection
.insert(legacy_peer, SecProtectionLevel::None);
slot.reader_protection
.insert(fast_peer, SecProtectionLevel::Sign);
slot.reader_protection
.insert(secure_peer, SecProtectionLevel::Encrypt);
slot.locator_to_peer.insert(legacy_loc, legacy_peer);
slot.locator_to_peer.insert(fast_loc, fast_peer);
slot.locator_to_peer.insert(secure_loc, secure_peer);
}
let mut msg = Vec::new();
msg.extend_from_slice(b"RTPS\x02\x05\x01\x02");
msg.extend_from_slice(&[0xE4; 12]); msg.extend_from_slice(b"HELLO-HETERO");
let wire_legacy =
secure_outbound_for_target(&rt, wid, &msg, &legacy_loc).expect("legacy path");
let wire_fast = secure_outbound_for_target(&rt, wid, &msg, &fast_loc).expect("fast path");
let wire_secure =
secure_outbound_for_target(&rt, wid, &msg, &secure_loc).expect("secure path");
assert_eq!(
wire_legacy, msg,
"Legacy muss byte-identisch zu plaintext sein"
);
assert_ne!(wire_fast, msg, "Fast-Reader muss geschuetzt sein");
assert_ne!(wire_secure, msg, "Secure-Reader muss geschuetzt sein");
assert_ne!(wire_legacy, wire_fast);
assert_ne!(wire_legacy, wire_secure);
assert_ne!(wire_fast, wire_secure);
let unknown_loc = Locator::udp_v4([127, 0, 0, 99], 40099);
let wire_unknown =
secure_outbound_for_target(&rt, wid, &msg, &unknown_loc).expect("fallback path");
assert_ne!(
wire_unknown, msg,
"unbekannter Target soll ueber Domain-Rule geschuetzt werden"
);
let _unused: PeerCapabilities = PeerCapabilities::default();
rt.shutdown();
}
#[cfg(feature = "security")]
#[derive(Default, Clone)]
/// Test logger that buffers every security log event so tests can assert
/// on the captured (level, category, message) triples after the fact.
struct CapturingLogger {
    // Arc<Mutex<..>> so clones handed to the runtime share one buffer.
    inner: std::sync::Arc<
        std::sync::Mutex<Vec<(zerodds_security_runtime::LogLevel, String, String)>>,
    >,
}
#[cfg(feature = "security")]
impl CapturingLogger {
    /// Returns a snapshot of all events captured so far.
    ///
    /// A poisoned mutex yields an empty list instead of panicking.
    fn events(&self) -> Vec<(zerodds_security_runtime::LogLevel, String, String)> {
        match self.inner.lock() {
            Ok(guard) => guard.clone(),
            Err(_) => Vec::new(),
        }
    }
}
#[cfg(feature = "security")]
impl zerodds_security_runtime::LoggingPlugin for CapturingLogger {
    /// Appends the event to the shared buffer; a poisoned lock is
    /// silently ignored (best-effort capture is fine for tests).
    fn log(
        &self,
        level: zerodds_security_runtime::LogLevel,
        _participant: [u8; 16],
        category: &str,
        message: &str,
    ) {
        match self.inner.lock() {
            Ok(mut events) => events.push((level, category.to_string(), message.to_string())),
            Err(_) => {}
        }
    }
    fn plugin_class_id(&self) -> &str {
        "zerodds.test.capturing_logger"
    }
}
#[cfg(feature = "security")]
/// Spins up a `DcpsRuntime` on domain 0 with an AES-GCM security gate
/// built from `gov_xml` and the given capturing logger attached.
fn build_runtime_with(
    gov_xml: &str,
    logger: std::sync::Arc<CapturingLogger>,
) -> std::sync::Arc<DcpsRuntime> {
    use zerodds_security_crypto::AesGcmCryptoPlugin;
    use zerodds_security_permissions::parse_governance_xml;
    use zerodds_security_runtime::{LoggingPlugin, SharedSecurityGate};
    let governance = parse_governance_xml(gov_xml).unwrap();
    let gate = SharedSecurityGate::new(0, governance, Box::new(AesGcmCryptoPlugin::new()));
    let cfg = RuntimeConfig {
        security: Some(std::sync::Arc::new(gate)),
        security_logger: Some(logger as std::sync::Arc<dyn LoggingPlugin>),
        ..RuntimeConfig::default()
    };
    DcpsRuntime::start(0, GuidPrefix::from_bytes([0xE7; 12]), cfg).expect("start rt")
}
#[cfg(feature = "security")]
#[test]
fn inbound_plain_on_encrypt_domain_drops_with_error_event() {
    // Governance: domain 0 mandates ENCRYPT, no unauthenticated peers.
    const GOV_ENCRYPT: &str = r#"
<domain_access_rules>
<domain_rule>
<domains><id>0</id></domains>
<rtps_protection_kind>ENCRYPT</rtps_protection_kind>
<topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
</domain_rule>
</domain_access_rules>
"#;
    let logger = std::sync::Arc::new(CapturingLogger::default());
    let rt = build_runtime_with(GOV_ENCRYPT, std::sync::Arc::clone(&logger));
    // Syntactically valid but unprotected RTPS datagram from a foreign prefix.
    let parts: [&[u8]; 3] = [
        b"RTPS\x02\x05\x01\x02",
        &[0x77; 12],
        b"plaintext-on-encrypted-domain",
    ];
    let plain = parts.concat();
    let out = secure_inbound_bytes(&rt, &plain, &NetInterface::Wan);
    assert!(out.is_none(), "tampering-Paket muss gedroppt werden");
    let events = logger.events();
    assert_eq!(events.len(), 1, "genau ein Log-Event erwartet");
    let (level, category, _msg) = &events[0];
    assert_eq!(
        *level,
        zerodds_security_runtime::LogLevel::Error,
        "plain-on-protected-domain ohne allow_unauth = Error (LegacyBlocked)"
    );
    assert_eq!(category, "inbound.legacy_blocked");
    rt.shutdown();
}
#[cfg(feature = "security")]
#[test]
fn inbound_legacy_peer_accepted_when_governance_allows_unauth() {
    // Same ENCRYPT rule, but unauthenticated participants are allowed.
    const GOV: &str = r#"
<domain_access_rules>
<domain_rule>
<domains><id>0</id></domains>
<allow_unauthenticated_participants>TRUE</allow_unauthenticated_participants>
<rtps_protection_kind>ENCRYPT</rtps_protection_kind>
<topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
</domain_rule>
</domain_access_rules>
"#;
    let logger = std::sync::Arc::new(CapturingLogger::default());
    let rt = build_runtime_with(GOV, std::sync::Arc::clone(&logger));
    let parts: [&[u8]; 3] = [b"RTPS\x02\x05\x01\x02", &[0x88; 12], b"legacy-but-allowed"];
    let plain = parts.concat();
    let out = secure_inbound_bytes(&rt, &plain, &NetInterface::Wan)
        .expect("legacy-peer muss akzeptiert werden");
    assert_eq!(out, plain, "Output ist byte-identisch (kein crypto-unwrap)");
    assert!(logger.events().is_empty(), "kein Log-Event bei Accept-Pfad");
    rt.shutdown();
}
#[cfg(feature = "security")]
#[test]
fn inbound_malformed_drops_and_logs_error() {
    // Even with NONE protection, garbage bytes must be dropped and logged.
    const GOV: &str = r#"
<domain_access_rules>
<domain_rule>
<domains><id>0</id></domains>
<rtps_protection_kind>NONE</rtps_protection_kind>
<topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
</domain_rule>
</domain_access_rules>
"#;
    let logger = std::sync::Arc::new(CapturingLogger::default());
    let rt = build_runtime_with(GOV, std::sync::Arc::clone(&logger));
    // Four junk bytes: not even an RTPS header.
    assert!(secure_inbound_bytes(&rt, &[1, 2, 3, 4], &NetInterface::Wan).is_none());
    let events = logger.events();
    assert_eq!(events.len(), 1);
    let (level, category, _msg) = &events[0];
    assert_eq!(*level, zerodds_security_runtime::LogLevel::Error);
    assert_eq!(category, "inbound.malformed");
    rt.shutdown();
}
#[cfg(feature = "security")]
#[test]
fn inbound_without_security_gate_bypasses_classify_and_logger() {
    // A logger is configured but no gate: inbound must be pure passthrough.
    let logger = std::sync::Arc::new(CapturingLogger::default());
    let cfg = RuntimeConfig {
        security_logger: Some(std::sync::Arc::clone(&logger)
            as std::sync::Arc<dyn zerodds_security_runtime::LoggingPlugin>),
        ..RuntimeConfig::default()
    };
    let rt = DcpsRuntime::start(0, GuidPrefix::from_bytes([0xE8; 12]), cfg).unwrap();
    // 40 arbitrary bytes; without a gate they pass through untouched.
    let msg = vec![0xAAu8; 40];
    let out = secure_inbound_bytes(&rt, &msg, &NetInterface::Wan).unwrap();
    assert_eq!(out, msg);
    assert!(
        logger.events().is_empty(),
        "Logger darf ohne Gate NICHT aufgerufen werden"
    );
    rt.shutdown();
}
#[cfg(feature = "security")]
/// /32 range covering exactly the single address 127.0.0.<third>.
fn lo_range(third: u8) -> zerodds_security_runtime::IpRange {
    let host = core::net::Ipv4Addr::new(127, 0, 0, third);
    zerodds_security_runtime::IpRange {
        base: core::net::IpAddr::V4(host),
        prefix_len: 32,
    }
}
#[cfg(feature = "security")]
#[test]
fn outbound_pool_routes_target_to_matching_binding() {
    // Two loopback-bound specs: "lo-a" claims 127.0.0.11/32,
    // "lo-b" (the default) claims 127.0.0.22/32.
    let make = |name: &str, kind, subnet, default| InterfaceBindingSpec {
        name: name.into(),
        bind_addr: Ipv4Addr::new(127, 0, 0, 1),
        bind_port: 0,
        kind,
        subnet,
        default,
    };
    let specs = vec![
        make(
            "lo-a",
            zerodds_security_runtime::NetInterface::Loopback,
            lo_range(11),
            false,
        ),
        make(
            "lo-b",
            zerodds_security_runtime::NetInterface::Wan,
            lo_range(22),
            true,
        ),
    ];
    let pool = OutboundSocketPool::bind_all(&specs).expect("pool");
    let (sock_a, iface_a) = pool
        .route(&Locator::udp_v4([127, 0, 0, 11], 40000))
        .expect("route 1");
    assert_eq!(iface_a, zerodds_security_runtime::NetInterface::Loopback);
    let (sock_b, iface_b) = pool
        .route(&Locator::udp_v4([127, 0, 0, 22], 40000))
        .expect("route 2");
    assert_eq!(iface_b, zerodds_security_runtime::NetInterface::Wan);
    // Each binding owns its own socket, hence distinct local ports.
    assert_ne!(sock_a.local_locator().port, sock_b.local_locator().port);
}
#[cfg(feature = "security")]
#[test]
fn outbound_pool_falls_back_to_default_when_no_subnet_matches() {
    // Catch-all 0.0.0.0/0 range for the default WAN binding.
    let any_v4 = zerodds_security_runtime::IpRange {
        base: core::net::IpAddr::V4(core::net::Ipv4Addr::UNSPECIFIED),
        prefix_len: 0,
    };
    let specs = vec![
        InterfaceBindingSpec {
            name: "lo-specific".into(),
            bind_addr: Ipv4Addr::new(127, 0, 0, 1),
            bind_port: 0,
            kind: zerodds_security_runtime::NetInterface::Loopback,
            subnet: lo_range(33),
            default: false,
        },
        InterfaceBindingSpec {
            name: "wan-default".into(),
            bind_addr: Ipv4Addr::new(127, 0, 0, 1),
            bind_port: 0,
            kind: zerodds_security_runtime::NetInterface::Wan,
            subnet: any_v4,
            default: true,
        },
    ];
    let pool = OutboundSocketPool::bind_all(&specs).unwrap();
    // 192.168.7.7 matches neither specific subnet -> default binding wins.
    let (_sock, iface) = pool
        .route(&Locator::udp_v4([192, 168, 7, 7], 12345))
        .expect("default fallback");
    assert_eq!(iface, zerodds_security_runtime::NetInterface::Wan);
}
#[cfg(feature = "security")]
#[test]
fn outbound_pool_returns_none_when_no_match_and_no_default() {
    // One non-default /32 binding; 8.8.8.8 matches nothing and there is
    // no default binding, so routing must yield None.
    let specs = vec![InterfaceBindingSpec {
        name: "only-lo".into(),
        bind_addr: Ipv4Addr::new(127, 0, 0, 1),
        bind_port: 0,
        kind: zerodds_security_runtime::NetInterface::Loopback,
        subnet: lo_range(44),
        default: false,
    }];
    let pool = OutboundSocketPool::bind_all(&specs).unwrap();
    let dns = Locator::udp_v4([8, 8, 8, 8], 53);
    assert!(pool.route(&dns).is_none());
}
#[cfg(feature = "security")]
#[test]
fn outbound_pool_skips_non_v4_locators() {
    let specs = vec![InterfaceBindingSpec {
        name: "lo".into(),
        bind_addr: Ipv4Addr::new(127, 0, 0, 1),
        bind_port: 0,
        kind: zerodds_security_runtime::NetInterface::Loopback,
        subnet: lo_range(55),
        default: true,
    }];
    let pool = OutboundSocketPool::bind_all(&specs).unwrap();
    // A shared-memory locator carries no IPv4 address, so routing must
    // fail even though a default binding exists.
    let shm_locator = Locator {
        kind: zerodds_rtps::wire_types::LocatorKind::Shm,
        port: 0,
        address: [0u8; 16],
    };
    assert!(pool.route(&shm_locator).is_none());
}
#[cfg(feature = "security")]
#[test]
fn dod_plaintext_lo_vs_srtps_wan_via_sniffer() {
    // "Defence on demand" routing test: the same writer payload must leave
    // the host as plaintext towards a legacy (ProtectionLevel::None) peer
    // on loopback and SRTPS-wrapped towards an Encrypt peer. Two local UDP
    // sockets act as sniffers so the actual on-wire bytes can be asserted.
    use std::net::{SocketAddrV4, UdpSocket};
    use zerodds_security_crypto::AesGcmCryptoPlugin;
    use zerodds_security_permissions::parse_governance_xml;
    use zerodds_security_runtime::{NetInterface as SecIf, SharedSecurityGate};
    // Governance: domain 0 demands ENCRYPT for all topics.
    const GOV: &str = r#"
<domain_access_rules>
<domain_rule>
<domains><id>0</id></domains>
<rtps_protection_kind>ENCRYPT</rtps_protection_kind>
<topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
</domain_rule>
</domain_access_rules>
"#;
    // Sniffer sockets on ephemeral ports; short read timeout so a lost
    // datagram fails the test quickly instead of hanging.
    let lo_sniffer =
        UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)).expect("lo sniffer");
    lo_sniffer
        .set_read_timeout(Some(Duration::from_millis(250)))
        .unwrap();
    let wan_sniffer = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0))
        .expect("wan sniffer");
    wan_sniffer
        .set_read_timeout(Some(Duration::from_millis(250)))
        .unwrap();
    let lo_port = lo_sniffer.local_addr().unwrap().port();
    let wan_port = wan_sniffer.local_addr().unwrap().port();
    let lo_target = Locator::udp_v4([127, 0, 0, 1], u32::from(lo_port));
    let wan_target = Locator::udp_v4([127, 0, 0, 1], u32::from(wan_port));
    // Single outbound binding covering 127.0.0.1/32, marked default.
    let bindings = vec![InterfaceBindingSpec {
        name: "lo-for-legacy".into(),
        bind_addr: Ipv4Addr::new(127, 0, 0, 1),
        bind_port: 0,
        kind: SecIf::Loopback,
        subnet: zerodds_security_runtime::IpRange {
            base: core::net::IpAddr::V4(core::net::Ipv4Addr::new(127, 0, 0, 1)),
            prefix_len: 32,
        },
        default: true,
    }];
    let gate = SharedSecurityGate::new(
        0,
        parse_governance_xml(GOV).unwrap(),
        Box::new(AesGcmCryptoPlugin::new()),
    );
    let cfg = RuntimeConfig {
        security: Some(std::sync::Arc::new(gate)),
        interface_bindings: bindings,
        ..RuntimeConfig::default()
    };
    let rt = DcpsRuntime::start(0, GuidPrefix::from_bytes([0xF0; 12]), cfg).expect("rt");
    // Reliable RawBytes writer with default QoS.
    let wid = rt
        .register_user_writer(UserWriterConfig {
            topic_name: "HeteroRouting".into(),
            type_name: "zerodds::RawBytes".into(),
            reliable: true,
            durability: zerodds_qos::DurabilityKind::Volatile,
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            lifespan: zerodds_qos::LifespanQosPolicy::default(),
            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_identifier: zerodds_types::TypeIdentifier::None,
            data_representation_offer: None,
        })
        .unwrap();
    // Inject per-reader protection levels and the locator->peer mapping
    // directly into the writer slot (no real discovery in this test).
    let legacy_peer: [u8; 12] = [0x01; 12];
    let secure_peer: [u8; 12] = [0x02; 12];
    {
        let arc = rt.writer_slot(wid).unwrap();
        let mut slot = arc.lock().unwrap();
        slot.reader_protection
            .insert(legacy_peer, ProtectionLevel::None);
        slot.reader_protection
            .insert(secure_peer, ProtectionLevel::Encrypt);
        slot.locator_to_peer.insert(lo_target, legacy_peer);
        slot.locator_to_peer.insert(wan_target, secure_peer);
    }
    // Minimal RTPS-looking payload: magic+version/vendor, prefix, body.
    let mut msg = Vec::new();
    msg.extend_from_slice(b"RTPS\x02\x05\x01\x02");
    msg.extend_from_slice(&[0xF0; 12]);
    msg.extend_from_slice(b"DOD-ROUTING-PAYLOAD");
    let plain_wire = secure_outbound_for_target(&rt, wid, &msg, &lo_target).unwrap();
    let secure_wire = secure_outbound_for_target(&rt, wid, &msg, &wan_target).unwrap();
    assert_eq!(plain_wire, msg, "lo-target: plaintext");
    assert_ne!(secure_wire, msg, "wan-target: SRTPS-gewrappt");
    send_on_best_interface(&rt, &lo_target, &plain_wire);
    send_on_best_interface(&rt, &wan_target, &secure_wire);
    let mut buf = [0u8; 4096];
    let (n1, _) = lo_sniffer.recv_from(&mut buf).expect("lo snif got");
    assert_eq!(
        &buf[..n1],
        &msg[..],
        "Loopback-Sniffer muss plaintext sehen"
    );
    let (n2, _) = wan_sniffer.recv_from(&mut buf).expect("wan snif got");
    assert_ne!(
        &buf[..n2],
        &msg[..],
        "WAN-Sniffer muss SRTPS-gewrappt sehen"
    );
    // Byte 20 is the first submessage id after the 20-byte RTPS header;
    // 0x33 is the SRTPS_PREFIX submessage kind.
    assert_eq!(
        buf[20], 0x33,
        "WAN-Output muss mit SRTPS_PREFIX-Submessage beginnen"
    );
    rt.shutdown();
}
#[cfg(feature = "security")]
#[test]
fn inbound_loopback_accepts_plain_on_protected_domain() {
    use zerodds_security_runtime::NetInterface as SecIf;
    const GOV: &str = r#"
<domain_access_rules>
<domain_rule>
<domains><id>0</id></domains>
<rtps_protection_kind>ENCRYPT</rtps_protection_kind>
<topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
</domain_rule>
</domain_access_rules>
"#;
    let logger = std::sync::Arc::new(CapturingLogger::default());
    let rt = build_runtime_with(GOV, std::sync::Arc::clone(&logger));
    let parts: [&[u8]; 3] = [b"RTPS\x02\x05\x01\x02", &[0x99; 12], b"loopback-plain-is-ok"];
    let plain = parts.concat();
    // Plaintext over Loopback is tolerated even on an ENCRYPT domain.
    let out = secure_inbound_bytes(&rt, &plain, &SecIf::Loopback)
        .expect("Loopback plain muss akzeptiert werden");
    assert_eq!(out, plain);
    assert!(logger.events().is_empty());
    // The identical datagram classified as WAN input must be rejected
    // and produce exactly one Error event carrying the interface marker.
    assert!(secure_inbound_bytes(&rt, &plain, &SecIf::Wan).is_none());
    let evs = logger.events();
    assert_eq!(evs.len(), 1);
    assert_eq!(evs[0].0, zerodds_security_runtime::LogLevel::Error);
    assert!(
        evs[0].2.contains("iface=Wan"),
        "Log-Message muss iface tragen"
    );
    rt.shutdown();
}
#[cfg(feature = "security")]
#[test]
fn dod_inbound_per_interface_receive_via_pool_socket() {
    // A plaintext datagram arriving on the loopback pool socket must pass
    // the per-interface inbound gate without any Error event, while the
    // same plaintext classified as "Wan" input must be blocked and logged.
    use std::net::{SocketAddrV4, UdpSocket};
    use zerodds_security_crypto::AesGcmCryptoPlugin;
    use zerodds_security_permissions::parse_governance_xml;
    use zerodds_security_runtime::{NetInterface as SecIf, SharedSecurityGate};
    const GOV: &str = r#"
<domain_access_rules>
<domain_rule>
<domains><id>0</id></domains>
<rtps_protection_kind>ENCRYPT</rtps_protection_kind>
<topic_access_rules><topic_rule><topic_expression>*</topic_expression></topic_rule></topic_access_rules>
</domain_rule>
</domain_access_rules>
"#;
    let logger = std::sync::Arc::new(CapturingLogger::default());
    let gate = SharedSecurityGate::new(
        0,
        parse_governance_xml(GOV).unwrap(),
        Box::new(AesGcmCryptoPlugin::new()),
    );
    let logger_dyn: std::sync::Arc<dyn zerodds_security_runtime::LoggingPlugin> =
        std::sync::Arc::clone(&logger) as _;
    // One loopback binding covering 127.0.0.0/8, marked default.
    let bindings = vec![InterfaceBindingSpec {
        name: "lo".into(),
        bind_addr: Ipv4Addr::new(127, 0, 0, 1),
        bind_port: 0,
        kind: SecIf::Loopback,
        subnet: zerodds_security_runtime::IpRange {
            base: core::net::IpAddr::V4(core::net::Ipv4Addr::new(127, 0, 0, 0)),
            prefix_len: 8,
        },
        default: true,
    }];
    let cfg = RuntimeConfig {
        security: Some(std::sync::Arc::new(gate)),
        security_logger: Some(logger_dyn),
        interface_bindings: bindings,
        ..RuntimeConfig::default()
    };
    let rt = DcpsRuntime::start(0, GuidPrefix::from_bytes([0xF1; 12]), cfg).expect("rt");
    // Port the runtime's pool socket actually bound to (bind_port was 0).
    let pool_port = rt.outbound_pool.as_ref().unwrap().bindings[0]
        .socket
        .local_locator()
        .port as u16;
    assert!(pool_port > 0);
    let sender = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0)).unwrap();
    let mut plain = Vec::new();
    plain.extend_from_slice(b"RTPS\x02\x05\x01\x02");
    plain.extend_from_slice(&[0xAB; 12]);
    plain.extend_from_slice(b"loopback-dispatch");
    sender
        .send_to(
            &plain,
            SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pool_port),
        )
        .unwrap();
    // Give the runtime's receive path time to pick the datagram up.
    std::thread::sleep(Duration::from_millis(300));
    let pool_events = logger.events();
    // Same plaintext presented as WAN input: expected to be rejected.
    let _ = secure_inbound_bytes(&rt, &plain, &SecIf::Wan);
    let after = logger.events();
    assert!(
        after.len() > pool_events.len(),
        "Wan-Pfad muss ein neues Log-Event erzeugen"
    );
    let new_ev = &after[after.len() - 1];
    assert_eq!(new_ev.0, zerodds_security_runtime::LogLevel::Error);
    assert!(
        new_ev.2.contains("iface=Wan"),
        "Log-Message traegt iface-Marker: got={:?}",
        new_ev.2
    );
    // Whatever the loopback receive path logged, none of it may be Error.
    for (lvl, cat, msg) in &pool_events {
        assert_ne!(
            *lvl,
            zerodds_security_runtime::LogLevel::Error,
            "Loopback-Pfad darf kein Error-Event erzeugen: cat={cat} msg={msg}"
        );
    }
    rt.shutdown();
}
#[cfg(feature = "security")]
#[test]
fn per_target_without_security_gate_is_passthrough() {
    // No security gate configured: per-target protection must be a no-op.
    let rt = DcpsRuntime::start(0, GuidPrefix::from_bytes([0xE5; 12]), RuntimeConfig::default())
        .expect("rt");
    let writer_id = rt
        .register_user_writer(UserWriterConfig {
            topic_name: "T".into(),
            type_name: "zerodds::RawBytes".into(),
            reliable: true,
            durability: zerodds_qos::DurabilityKind::Volatile,
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            lifespan: zerodds_qos::LifespanQosPolicy::default(),
            liveliness: zerodds_qos::LivelinessQosPolicy::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_identifier: zerodds_types::TypeIdentifier::None,
            data_representation_offer: None,
        })
        .unwrap();
    let target = Locator::udp_v4([127, 0, 0, 1], 40000);
    let payload = b"raw-plaintext".to_vec();
    let out = secure_outbound_for_target(&rt, writer_id, &payload, &target).unwrap();
    assert_eq!(out, payload, "ohne Gate muss passthrough sein");
    rt.shutdown();
}
fn make_remote_spdp_beacon(remote_prefix: GuidPrefix) -> Vec<u8> {
use zerodds_discovery::spdp::SpdpBeacon;
use zerodds_rtps::participant_data::ParticipantBuiltinTopicData;
use zerodds_rtps::wire_types::{ProtocolVersion, VendorId};
let data = ParticipantBuiltinTopicData {
guid: Guid::new(remote_prefix, EntityId::PARTICIPANT),
protocol_version: ProtocolVersion::V2_5,
vendor_id: VendorId::ZERODDS,
default_unicast_locator: None,
default_multicast_locator: None,
metatraffic_unicast_locator: None,
metatraffic_multicast_locator: None,
domain_id: Some(0),
builtin_endpoint_set: 0,
lease_duration: QosDuration::from_secs(100),
user_data: alloc::vec::Vec::new(),
properties: Default::default(),
identity_token: None,
permissions_token: None,
identity_status_token: None,
sig_algo_info: None,
kx_algo_info: None,
sym_cipher_algo_info: None,
};
let mut beacon = SpdpBeacon::new(data);
beacon.serialize().expect("serialize")
}
#[test]
fn handle_spdp_datagram_pushes_into_builtin_participant_reader() {
    // One foreign SPDP beacon -> exactly one DCPSParticipant sample.
    let rt = DcpsRuntime::start(41, GuidPrefix::from_bytes([0x21; 12]), RuntimeConfig::default())
        .expect("start");
    let subscriber = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(subscriber.sinks());
    let remote_prefix = GuidPrefix::from_bytes([0x99; 12]);
    handle_spdp_datagram(&rt, &make_remote_spdp_beacon(remote_prefix));
    let reader = subscriber
        .lookup_datareader::<crate::builtin_topics::ParticipantBuiltinTopicData>(
            "DCPSParticipant",
        )
        .unwrap();
    let samples = reader.take().unwrap();
    assert_eq!(samples.len(), 1, "Genau 1 Sample fuer 1 SPDP-Beacon");
    assert_eq!(samples[0].key.prefix, remote_prefix);
    rt.shutdown();
}
#[test]
fn handle_spdp_datagram_skips_self_beacon() {
    // A beacon echoing the runtime's own prefix must be ignored.
    let own_prefix = GuidPrefix::from_bytes([0x22; 12]);
    let rt = DcpsRuntime::start(42, own_prefix, RuntimeConfig::default()).expect("start");
    let subscriber = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(subscriber.sinks());
    handle_spdp_datagram(&rt, &make_remote_spdp_beacon(own_prefix));
    let reader = subscriber
        .lookup_datareader::<crate::builtin_topics::ParticipantBuiltinTopicData>(
            "DCPSParticipant",
        )
        .unwrap();
    assert!(
        reader.take().unwrap().is_empty(),
        "Eigenes Beacon darf nicht geloggt werden"
    );
    rt.shutdown();
}
#[test]
fn sedp_event_push_populates_publication_and_topic_readers() {
    use crate::builtin_topics as bt;
    use zerodds_discovery::sedp::SedpEvents;
    use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
    // One discovered publication must surface both as a DCPSPublication
    // sample and as a synthesized DCPSTopic sample.
    let rt = DcpsRuntime::start(
        43,
        GuidPrefix::from_bytes([0x23; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(bs.sinks());
    let mut events = SedpEvents::default();
    // Minimal publication announcement (default QoS) for topic "WireT".
    events.new_publications.push(
        zerodds_rtps::publication_data::PublicationBuiltinTopicData {
            key: Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT),
            participant_key: Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT),
            topic_name: "WireT".into(),
            type_name: "WireType".into(),
            durability: zerodds_qos::DurabilityKind::Volatile,
            reliability: ReliabilityQosPolicy::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            liveliness: LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            lifespan: zerodds_qos::LifespanQosPolicy::default(),
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_information: None,
            data_representation: Vec::new(),
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        },
    );
    push_sedp_events_to_builtin_readers(&rt, &events);
    // Publication reader sees the announcement itself ...
    let pub_reader = bs
        .lookup_datareader::<bt::PublicationBuiltinTopicData>("DCPSPublication")
        .unwrap();
    let pub_samples = pub_reader.take().unwrap();
    assert_eq!(pub_samples.len(), 1);
    assert_eq!(pub_samples[0].topic_name, "WireT");
    // ... and the topic reader sees the synthesized topic sample.
    let topic_reader = bs
        .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
        .unwrap();
    let topic_samples = topic_reader.take().unwrap();
    assert_eq!(topic_samples.len(), 1);
    assert_eq!(topic_samples[0].name, "WireT");
    rt.shutdown();
}
#[test]
fn sedp_event_push_populates_subscription_reader() {
    use crate::builtin_topics as bt;
    use zerodds_discovery::sedp::SedpEvents;
    use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
    // One discovered subscription must surface as a DCPSSubscription
    // sample plus a synthesized DCPSTopic sample.
    let rt = DcpsRuntime::start(
        44,
        GuidPrefix::from_bytes([0x24; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(bs.sinks());
    let mut events = SedpEvents::default();
    // Minimal subscription announcement (default QoS) for topic "SubT".
    events.new_subscriptions.push(
        zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
            key: Guid::new(GuidPrefix::from_bytes([2; 12]), EntityId::PARTICIPANT),
            participant_key: Guid::new(GuidPrefix::from_bytes([2; 12]), EntityId::PARTICIPANT),
            topic_name: "SubT".into(),
            type_name: "SubType".into(),
            durability: zerodds_qos::DurabilityKind::Volatile,
            reliability: ReliabilityQosPolicy::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            liveliness: LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_information: None,
            data_representation: Vec::new(),
            content_filter: None,
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        },
    );
    push_sedp_events_to_builtin_readers(&rt, &events);
    let sub_reader = bs
        .lookup_datareader::<bt::SubscriptionBuiltinTopicData>("DCPSSubscription")
        .unwrap();
    let sub_samples = sub_reader.take().unwrap();
    assert_eq!(sub_samples.len(), 1);
    assert_eq!(sub_samples[0].topic_name, "SubT");
    let topic_reader = bs
        .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
        .unwrap();
    let topic_samples = topic_reader.take().unwrap();
    assert_eq!(topic_samples.len(), 1);
    assert_eq!(topic_samples[0].name, "SubT");
    rt.shutdown();
}
#[test]
fn push_sedp_events_to_builtin_readers_is_noop_without_sinks() {
    use zerodds_discovery::sedp::SedpEvents;
    // No builtin sinks attached: pushing events must not panic.
    let rt = DcpsRuntime::start(45, GuidPrefix::from_bytes([0x25; 12]), RuntimeConfig::default())
        .expect("start");
    push_sedp_events_to_builtin_readers(&rt, &SedpEvents::default());
    rt.shutdown();
}
#[test]
fn handle_spdp_datagram_drops_ignored_participant_beacon() {
    let rt = DcpsRuntime::start(46, GuidPrefix::from_bytes([0x26; 12]), RuntimeConfig::default())
        .expect("start");
    let subscriber = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(subscriber.sinks());
    let filter = crate::participant::IgnoreFilter::default();
    rt.attach_ignore_filter(filter.clone());
    // Mark the remote participant as ignored before its beacon arrives.
    let remote = GuidPrefix::from_bytes([0xAA; 12]);
    let handle =
        crate::instance_handle::InstanceHandle::from_guid(Guid::new(remote, EntityId::PARTICIPANT));
    if let Ok(mut ignored) = filter.inner.participants.lock() {
        ignored.insert(handle);
    }
    handle_spdp_datagram(&rt, &make_remote_spdp_beacon(remote));
    let reader = subscriber
        .lookup_datareader::<crate::builtin_topics::ParticipantBuiltinTopicData>(
            "DCPSParticipant",
        )
        .unwrap();
    assert!(
        reader.take().unwrap().is_empty(),
        "ignorierter Participant darf nicht in DCPSParticipant landen"
    );
    rt.shutdown();
}
#[test]
fn sedp_event_push_filters_ignored_publication() {
    use crate::builtin_topics as bt;
    use zerodds_discovery::sedp::SedpEvents;
    use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
    // A publication whose instance handle is on the ignore list must be
    // filtered out of DCPSPublication AND suppress the synthesized
    // DCPSTopic sample.
    let rt = DcpsRuntime::start(
        47,
        GuidPrefix::from_bytes([0x27; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(bs.sinks());
    let filter = crate::participant::IgnoreFilter::default();
    rt.attach_ignore_filter(filter.clone());
    // Register the publication's handle as ignored before the event.
    let pub_key = Guid::new(GuidPrefix::from_bytes([0x33; 12]), EntityId::PARTICIPANT);
    let h_pub = crate::instance_handle::InstanceHandle::from_guid(pub_key);
    if let Ok(mut s) = filter.inner.publications.lock() {
        s.insert(h_pub);
    }
    let mut events = SedpEvents::default();
    events.new_publications.push(
        zerodds_rtps::publication_data::PublicationBuiltinTopicData {
            key: pub_key,
            participant_key: Guid::new(
                GuidPrefix::from_bytes([0x33; 12]),
                EntityId::PARTICIPANT,
            ),
            topic_name: "Filtered".into(),
            type_name: "T".into(),
            durability: zerodds_qos::DurabilityKind::Volatile,
            reliability: ReliabilityQosPolicy::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            liveliness: LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            lifespan: zerodds_qos::LifespanQosPolicy::default(),
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_information: None,
            data_representation: Vec::new(),
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        },
    );
    push_sedp_events_to_builtin_readers(&rt, &events);
    let pub_reader = bs
        .lookup_datareader::<bt::PublicationBuiltinTopicData>("DCPSPublication")
        .unwrap();
    assert!(
        pub_reader.take().unwrap().is_empty(),
        "ignorierte Publication darf nicht in DCPSPublication landen"
    );
    let topic_reader = bs
        .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
        .unwrap();
    assert!(topic_reader.take().unwrap().is_empty());
    rt.shutdown();
}
#[test]
fn sedp_event_push_filters_ignored_subscription() {
    use crate::builtin_topics as bt;
    use zerodds_discovery::sedp::SedpEvents;
    use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
    // A subscription whose instance handle is on the ignore list must be
    // filtered out of DCPSSubscription.
    let rt = DcpsRuntime::start(
        48,
        GuidPrefix::from_bytes([0x28; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(bs.sinks());
    let filter = crate::participant::IgnoreFilter::default();
    rt.attach_ignore_filter(filter.clone());
    // Register the subscription's handle as ignored before the event.
    let sub_key = Guid::new(GuidPrefix::from_bytes([0x44; 12]), EntityId::PARTICIPANT);
    let h_sub = crate::instance_handle::InstanceHandle::from_guid(sub_key);
    if let Ok(mut s) = filter.inner.subscriptions.lock() {
        s.insert(h_sub);
    }
    let mut events = SedpEvents::default();
    events.new_subscriptions.push(
        zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData {
            key: sub_key,
            participant_key: Guid::new(
                GuidPrefix::from_bytes([0x44; 12]),
                EntityId::PARTICIPANT,
            ),
            topic_name: "FilteredSub".into(),
            type_name: "T".into(),
            durability: zerodds_qos::DurabilityKind::Volatile,
            reliability: ReliabilityQosPolicy::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            liveliness: LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_information: None,
            data_representation: Vec::new(),
            content_filter: None,
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        },
    );
    push_sedp_events_to_builtin_readers(&rt, &events);
    let sub_reader = bs
        .lookup_datareader::<bt::SubscriptionBuiltinTopicData>("DCPSSubscription")
        .unwrap();
    assert!(sub_reader.take().unwrap().is_empty());
    rt.shutdown();
}
#[test]
fn sedp_event_push_filters_ignored_topic_only() {
    use crate::builtin_topics as bt;
    use zerodds_discovery::sedp::SedpEvents;
    use zerodds_qos::{LivelinessQosPolicy, ReliabilityQosPolicy};
    // Ignoring only the TOPIC handle: the publication sample still goes
    // through, but the synthesized DCPSTopic sample must be blocked.
    let rt = DcpsRuntime::start(
        49,
        GuidPrefix::from_bytes([0x29; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    let bs = crate::builtin_subscriber::BuiltinSubscriber::new();
    rt.attach_builtin_sinks(bs.sinks());
    let filter = crate::participant::IgnoreFilter::default();
    rt.attach_ignore_filter(filter.clone());
    // Topic keys are synthesized from (name, type); ignore that handle.
    let topic_key =
        crate::builtin_topics::TopicBuiltinTopicData::synthesize_key("OnlyTopic", "T");
    let h_topic = crate::instance_handle::InstanceHandle::from_guid(topic_key);
    if let Ok(mut s) = filter.inner.topics.lock() {
        s.insert(h_topic);
    }
    let mut events = SedpEvents::default();
    events.new_publications.push(
        zerodds_rtps::publication_data::PublicationBuiltinTopicData {
            key: Guid::new(GuidPrefix::from_bytes([0x55; 12]), EntityId::PARTICIPANT),
            participant_key: Guid::new(
                GuidPrefix::from_bytes([0x55; 12]),
                EntityId::PARTICIPANT,
            ),
            topic_name: "OnlyTopic".into(),
            type_name: "T".into(),
            durability: zerodds_qos::DurabilityKind::Volatile,
            reliability: ReliabilityQosPolicy::default(),
            ownership: zerodds_qos::OwnershipKind::Shared,
            ownership_strength: 0,
            liveliness: LivelinessQosPolicy::default(),
            deadline: zerodds_qos::DeadlineQosPolicy::default(),
            lifespan: zerodds_qos::LifespanQosPolicy::default(),
            partition: Vec::new(),
            user_data: Vec::new(),
            topic_data: Vec::new(),
            group_data: Vec::new(),
            type_information: None,
            data_representation: Vec::new(),
            security_info: None,
            service_instance_name: None,
            related_entity_guid: None,
            topic_aliases: None,
            type_identifier: zerodds_types::TypeIdentifier::None,
        },
    );
    push_sedp_events_to_builtin_readers(&rt, &events);
    // Publication itself is NOT ignored -> one sample.
    let pub_reader = bs
        .lookup_datareader::<bt::PublicationBuiltinTopicData>("DCPSPublication")
        .unwrap();
    assert_eq!(pub_reader.take().unwrap().len(), 1);
    // Topic handle IS ignored -> no synthesized sample.
    let topic_reader = bs
        .lookup_datareader::<bt::TopicBuiltinTopicData>("DCPSTopic")
        .unwrap();
    assert!(
        topic_reader.take().unwrap().is_empty(),
        "ignoriertes Topic darf das synth. DCPSTopic-Sample blockieren"
    );
    rt.shutdown();
}
/// Builds a serialized SPDP announcement for a synthetic remote participant
/// identified by `remote_prefix`, advertising exactly the builtin endpoints
/// given in `endpoint_set`.
///
/// The fake peer claims loopback locators (127.0.0.99), so the resulting
/// bytes are only ever fed back into the local SPDP parsing path by tests —
/// no real network traffic is implied.
fn make_remote_spdp_beacon_with_flags(remote_prefix: GuidPrefix, endpoint_set: u32) -> Vec<u8> {
    use zerodds_discovery::spdp::SpdpBeacon;
    use zerodds_rtps::participant_data::ParticipantBuiltinTopicData;
    use zerodds_rtps::wire_types::{ProtocolVersion, VendorId};
    // Minimal participant record: loopback user/metatraffic locators, no
    // security tokens, and a generous lease so the peer cannot expire while
    // a test is still running.
    let participant = ParticipantBuiltinTopicData {
        guid: Guid::new(remote_prefix, EntityId::PARTICIPANT),
        protocol_version: ProtocolVersion::V2_5,
        vendor_id: VendorId::ZERODDS,
        default_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 99], 7500)),
        default_multicast_locator: None,
        metatraffic_unicast_locator: Some(Locator::udp_v4([127, 0, 0, 99], 7501)),
        metatraffic_multicast_locator: None,
        domain_id: Some(0),
        builtin_endpoint_set: endpoint_set,
        lease_duration: QosDuration::from_secs(100),
        user_data: alloc::vec::Vec::new(),
        properties: Default::default(),
        identity_token: None,
        permissions_token: None,
        identity_status_token: None,
        sig_algo_info: None,
        kx_algo_info: None,
        sym_cipher_algo_info: None,
    };
    // Temporaries are mutable places, so the beacon can be built and
    // serialized in a single expression.
    SpdpBeacon::new(participant).serialize().expect("serialize")
}
/// End-to-end wiring test for the security builtin endpoints:
/// enabling the builtins is idempotent, SPDP discovery of a security-capable
/// peer creates proxies on all four builtin endpoints, non-security peers do
/// not disturb existing proxies, and datagrams produced by a remote security
/// stack (as well as garbage input) are dispatched without panicking.
#[test]
fn c34c_security_builtin_wiring_end_to_end() {
    use zerodds_discovery::security::SecurityBuiltinStack;
    use zerodds_security::generic_message::{
        MessageIdentity, ParticipantGenericMessage, class_id,
    };
    use zerodds_security::token::DataHolder;
    let local_prefix = GuidPrefix::from_bytes([0x75; 12]);
    let rt = DcpsRuntime::start(75, local_prefix, RuntimeConfig::default()).expect("start");
    // Before activation there must be no security stack snapshot.
    assert!(rt.security_builtin_snapshot().is_none());
    // Enabling twice must return the same shared stack (idempotent).
    let h1 = rt.enable_security_builtins(VendorId::ZERODDS);
    let h2 = rt.enable_security_builtins(VendorId::ZERODDS);
    assert!(Arc::ptr_eq(&h1, &h2));
    assert!(rt.security_builtin_snapshot().is_some());
    // Discover a peer advertising all four security builtin endpoints.
    let remote_a = GuidPrefix::from_bytes([0x99; 12]);
    let flags_all = endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_WRITER
        | endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_READER
        | endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER
        | endpoint_flag::PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER;
    handle_spdp_datagram(
        &rt,
        &make_remote_spdp_beacon_with_flags(remote_a, flags_all),
    );
    // Each advertised endpoint must have produced exactly one proxy.
    {
        let s = h1.lock().unwrap();
        assert_eq!(s.stateless_writer.reader_proxy_count(), 1);
        assert_eq!(s.stateless_reader.writer_proxy_count(), 1);
        assert_eq!(s.volatile_writer.reader_proxy_count(), 1);
        assert_eq!(s.volatile_reader.writer_proxy_count(), 1);
    }
    // A second peer with only the standard (non-security) endpoint bits
    // must leave the existing security proxies untouched.
    let remote_b = GuidPrefix::from_bytes([0x88; 12]);
    handle_spdp_datagram(
        &rt,
        &make_remote_spdp_beacon_with_flags(remote_b, endpoint_flag::ALL_STANDARD),
    );
    {
        let s = h1.lock().unwrap();
        assert_eq!(
            s.stateless_writer.reader_proxy_count(),
            1,
            "Peer ohne Security-Bits darf bestehende Proxies nicht beruehren"
        );
    }
    // Simulate the remote side: give it the local participant's endpoints
    // so its stateless writer addresses us, then have it emit an
    // authentication-request generic message.
    let mut remote_stack = SecurityBuiltinStack::new(remote_a, VendorId::ZERODDS);
    let local_peer = make_remote_spdp_beacon_with_flags(local_prefix, flags_all);
    let parsed_local = zerodds_discovery::spdp::SpdpReader::new()
        .parse_datagram(&local_peer)
        .unwrap();
    remote_stack.handle_remote_endpoints(&parsed_local);
    let msg = ParticipantGenericMessage {
        message_identity: MessageIdentity {
            source_guid: [0xCD; 16],
            sequence_number: 1,
        },
        related_message_identity: MessageIdentity::default(),
        destination_participant_key: [0xEF; 16],
        destination_endpoint_key: [0; 16],
        source_endpoint_key: [0xFE; 16],
        message_class_id: class_id::AUTH_REQUEST.into(),
        message_data: alloc::vec![DataHolder::new("DDS:Auth:PKI-DH:1.2+AuthReq")],
    };
    let dgs = remote_stack.stateless_writer.write(&msg).unwrap();
    assert_eq!(dgs.len(), 1);
    // Both a well-formed security datagram and pure garbage must be
    // dispatched without panicking.
    dispatch_security_builtin_datagram(&rt, &dgs[0].bytes, Duration::from_secs(1));
    dispatch_security_builtin_datagram(&rt, &[0u8; 32], Duration::from_secs(1));
    rt.shutdown();
}
/// Verifies that activating the security builtins *after* a peer has already
/// been discovered via SPDP still creates proxies for that peer — late plugin
/// activation must replay the set of known participants.
#[test]
fn c34c_enable_security_builtins_replays_known_peers() {
    let runtime = DcpsRuntime::start(
        76,
        GuidPrefix::from_bytes([0x76; 12]),
        RuntimeConfig::default(),
    )
    .expect("start");
    // With no security stack enabled yet, security datagrams are dropped
    // harmlessly rather than panicking.
    dispatch_security_builtin_datagram(&runtime, &[0u8; 16], Duration::from_secs(1));
    // Discover a remote participant that advertises the stateless message
    // endpoints *before* the plugin is switched on.
    let remote_prefix = GuidPrefix::from_bytes([0x77; 12]);
    let endpoint_bits = endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_WRITER
        | endpoint_flag::PARTICIPANT_STATELESS_MESSAGE_READER;
    handle_spdp_datagram(
        &runtime,
        &make_remote_spdp_beacon_with_flags(remote_prefix, endpoint_bits),
    );
    // Late activation: the new stack must pick up the already-known peer.
    let security = runtime.enable_security_builtins(VendorId::ZERODDS);
    let proxy_count = security
        .lock()
        .unwrap()
        .stateless_writer
        .reader_proxy_count();
    assert_eq!(
        proxy_count,
        1,
        "spaete Plugin-Activation muss bekannte Peers nachholen"
    );
    runtime.shutdown();
}
}