extern crate alloc;
use alloc::vec::Vec;
use core::time::Duration;
use zerodds_rtps::error::WireError;
use zerodds_rtps::history_cache::HistoryKind;
use zerodds_rtps::message_builder::{DEFAULT_MTU, OutboundDatagram};
use zerodds_rtps::publication_data::PublicationBuiltinTopicData;
use zerodds_rtps::reader_proxy::ReaderProxy;
use zerodds_rtps::reliable_writer::{DEFAULT_FRAGMENT_SIZE, ReliableWriter, ReliableWriterConfig};
use zerodds_rtps::submessages::NackFragSubmessage;
use zerodds_rtps::subscription_data::SubscriptionBuiltinTopicData;
use zerodds_rtps::wire_types::{EntityId, Guid, GuidPrefix, SequenceNumber, VendorId};
pub const SEDP_DEFAULT_DEPTH: usize = 256;
pub const SEDP_HEARTBEAT_PERIOD: Duration = Duration::from_millis(100);
#[derive(Debug)]
pub struct SedpPublicationsWriter {
inner: ReliableWriter,
}
#[derive(Debug)]
pub struct SedpSubscriptionsWriter {
inner: ReliableWriter,
}
impl SedpPublicationsWriter {
#[must_use]
pub fn new(participant_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
Self {
inner: make_sedp_writer(
Guid::new(
participant_prefix,
EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER,
),
vendor_id,
),
}
}
#[must_use]
pub fn guid(&self) -> Guid {
self.inner.guid()
}
pub fn add_reader_proxy(&mut self, proxy: ReaderProxy) {
self.inner.add_reader_proxy(proxy);
}
pub fn remove_reader_proxy(&mut self, guid: Guid) -> Option<ReaderProxy> {
self.inner.remove_reader_proxy(guid)
}
pub fn announce(
&mut self,
p: &PublicationBuiltinTopicData,
) -> Result<Vec<OutboundDatagram>, WireError> {
let payload = p.to_pl_cdr_le()?;
self.inner.write(&payload)
}
pub fn announce_with_shm_locator(
&mut self,
p: &PublicationBuiltinTopicData,
locator_bytes: &[u8],
) -> Result<Vec<OutboundDatagram>, WireError> {
let mut payload = p.to_pl_cdr_le()?;
zerodds_rtps::publication_data::inject_pid_shm_locator(&mut payload, locator_bytes)?;
self.inner.write(&payload)
}
pub fn tick(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
self.inner.tick(now)
}
pub fn handle_acknack(
&mut self,
src_guid: Guid,
base: SequenceNumber,
requested: impl IntoIterator<Item = SequenceNumber>,
) {
self.inner.handle_acknack(src_guid, base, requested);
}
pub fn handle_nackfrag(&mut self, src_guid: Guid, nf: &NackFragSubmessage) {
self.inner.handle_nackfrag(src_guid, nf);
}
#[must_use]
pub fn inner(&self) -> &ReliableWriter {
&self.inner
}
}
impl SedpSubscriptionsWriter {
#[must_use]
pub fn new(participant_prefix: GuidPrefix, vendor_id: VendorId) -> Self {
Self {
inner: make_sedp_writer(
Guid::new(
participant_prefix,
EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER,
),
vendor_id,
),
}
}
#[must_use]
pub fn guid(&self) -> Guid {
self.inner.guid()
}
pub fn add_reader_proxy(&mut self, proxy: ReaderProxy) {
self.inner.add_reader_proxy(proxy);
}
pub fn remove_reader_proxy(&mut self, guid: Guid) -> Option<ReaderProxy> {
self.inner.remove_reader_proxy(guid)
}
pub fn announce(
&mut self,
s: &SubscriptionBuiltinTopicData,
) -> Result<Vec<OutboundDatagram>, WireError> {
let payload = s.to_pl_cdr_le()?;
self.inner.write(&payload)
}
pub fn tick(&mut self, now: Duration) -> Result<Vec<OutboundDatagram>, WireError> {
self.inner.tick(now)
}
pub fn handle_acknack(
&mut self,
src_guid: Guid,
base: SequenceNumber,
requested: impl IntoIterator<Item = SequenceNumber>,
) {
self.inner.handle_acknack(src_guid, base, requested);
}
pub fn handle_nackfrag(&mut self, src_guid: Guid, nf: &NackFragSubmessage) {
self.inner.handle_nackfrag(src_guid, nf);
}
#[must_use]
pub fn inner(&self) -> &ReliableWriter {
&self.inner
}
}
fn make_sedp_writer(guid: Guid, vendor_id: VendorId) -> ReliableWriter {
ReliableWriter::new(ReliableWriterConfig {
guid,
vendor_id,
reader_proxies: Vec::new(),
max_samples: SEDP_DEFAULT_DEPTH,
history_kind: HistoryKind::KeepLast {
depth: SEDP_DEFAULT_DEPTH,
},
heartbeat_period: SEDP_HEARTBEAT_PERIOD,
fragment_size: DEFAULT_FRAGMENT_SIZE,
mtu: DEFAULT_MTU,
})
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
use super::*;
use zerodds_rtps::datagram::{ParsedSubmessage, decode_datagram};
use zerodds_rtps::participant_data::Duration as DdsDuration;
use zerodds_rtps::publication_data::{DurabilityKind, ReliabilityKind, ReliabilityQos};
use zerodds_rtps::wire_types::Locator;
fn sample_pub() -> PublicationBuiltinTopicData {
PublicationBuiltinTopicData {
key: Guid::new(
GuidPrefix::from_bytes([1; 12]),
EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
),
participant_key: Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT),
topic_name: "ChatterTopic".into(),
type_name: "std_msgs::String".into(),
durability: DurabilityKind::Volatile,
reliability: ReliabilityQos {
kind: ReliabilityKind::Reliable,
max_blocking_time: DdsDuration::from_secs(10),
},
ownership: zerodds_qos::OwnershipKind::Shared,
ownership_strength: 0,
liveliness: zerodds_qos::LivelinessQosPolicy::default(),
deadline: zerodds_qos::DeadlineQosPolicy::default(),
lifespan: zerodds_qos::LifespanQosPolicy::default(),
partition: alloc::vec::Vec::new(),
user_data: alloc::vec::Vec::new(),
topic_data: alloc::vec::Vec::new(),
group_data: alloc::vec::Vec::new(),
type_information: None,
data_representation: alloc::vec::Vec::new(),
security_info: None,
service_instance_name: None,
related_entity_guid: None,
topic_aliases: None,
type_identifier: zerodds_types::TypeIdentifier::None,
}
}
#[test]
fn writer_has_expected_guid() {
let w = SedpPublicationsWriter::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
assert_eq!(
w.guid().entity_id,
EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER
);
}
#[test]
fn announce_without_proxies_returns_no_datagrams() {
let mut w = SedpPublicationsWriter::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
let dgs = w.announce(&sample_pub()).unwrap();
assert!(dgs.is_empty(), "no proxies → no fan-out");
}
#[test]
fn announce_with_one_proxy_produces_one_datagram_with_cdr_body() {
let mut w = SedpPublicationsWriter::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
let remote = Guid::new(
GuidPrefix::from_bytes([2; 12]),
EntityId::SEDP_BUILTIN_PUBLICATIONS_READER,
);
w.add_reader_proxy(ReaderProxy::new(
remote,
alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7411)],
alloc::vec![],
true,
));
let dgs = w.announce(&sample_pub()).unwrap();
assert_eq!(dgs.len(), 1);
let parsed = decode_datagram(&dgs[0].bytes).unwrap();
let data = parsed
.submessages
.iter()
.find_map(|s| {
if let ParsedSubmessage::Data(d) = s {
Some(d)
} else {
None
}
})
.expect("DATA submessage");
let decoded =
PublicationBuiltinTopicData::from_pl_cdr_le(&data.serialized_payload).unwrap();
assert_eq!(decoded.topic_name, "ChatterTopic");
assert_eq!(decoded.type_name, "std_msgs::String");
assert_eq!(data.writer_id, EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER);
}
#[test]
fn subscriptions_writer_has_expected_guid() {
let w = SedpSubscriptionsWriter::new(GuidPrefix::from_bytes([1; 12]), VendorId::ZERODDS);
assert_eq!(
w.guid().entity_id,
EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER
);
}
}