extern crate alloc;
use alloc::string::String;
use alloc::vec::Vec;
use crate::endpoint_security_info::EndpointSecurityInfo;
use crate::error::WireError;
use crate::parameter_list::{Parameter, ParameterList, pid};
use crate::participant_data::{Duration, ENCAPSULATION_PL_CDR_LE};
use crate::wire_types::Guid;
pub use zerodds_qos::DurabilityKind;
pub use zerodds_qos::ReliabilityKind;
pub use zerodds_qos::ReliabilityQosPolicy as ReliabilityQos;
pub mod data_representation {
pub const XCDR: i16 = 0;
pub const XML: i16 = 1;
pub const XCDR2: i16 = 2;
pub const DEFAULT_OFFER: [i16; 2] = [XCDR, XCDR2];
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DataRepMatchMode {
Strict,
#[default]
Tolerant,
}
#[must_use]
pub fn negotiate(
writer_offered: &[i16],
reader_accepted: &[i16],
mode: DataRepMatchMode,
) -> Option<i16> {
let w_default = [XCDR];
let r_default = [XCDR];
let w: &[i16] = if writer_offered.is_empty() {
&w_default
} else {
writer_offered
};
let r: &[i16] = if reader_accepted.is_empty() {
&r_default
} else {
reader_accepted
};
match mode {
DataRepMatchMode::Strict => {
let first = w.first().copied()?;
if r.contains(&first) {
Some(first)
} else {
None
}
}
DataRepMatchMode::Tolerant => {
w.iter().copied().find(|id| r.contains(id))
}
}
}
#[must_use]
pub fn encap_for_final_le(id: i16) -> [u8; 4] {
match id {
XCDR2 => [0x00, 0x07, 0x00, 0x00], _ => [0x00, 0x01, 0x00, 0x00], }
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublicationBuiltinTopicData {
pub key: Guid,
pub participant_key: Guid,
pub topic_name: String,
pub type_name: String,
pub durability: DurabilityKind,
pub reliability: ReliabilityQos,
pub ownership: zerodds_qos::OwnershipKind,
pub ownership_strength: i32,
pub liveliness: zerodds_qos::LivelinessQosPolicy,
pub deadline: zerodds_qos::DeadlineQosPolicy,
pub lifespan: zerodds_qos::LifespanQosPolicy,
pub partition: Vec<String>,
pub user_data: Vec<u8>,
pub topic_data: Vec<u8>,
pub group_data: Vec<u8>,
pub type_information: Option<Vec<u8>>,
pub data_representation: Vec<i16>,
pub security_info: Option<EndpointSecurityInfo>,
pub service_instance_name: Option<String>,
pub related_entity_guid: Option<Guid>,
pub topic_aliases: Option<Vec<String>>,
pub type_identifier: zerodds_types::TypeIdentifier,
}
impl PublicationBuiltinTopicData {
pub fn to_pl_cdr_le(&self) -> Result<Vec<u8>, WireError> {
let mut params = ParameterList::new();
params.push(Parameter::new(
pid::PARTICIPANT_GUID,
self.participant_key.to_bytes().to_vec(),
));
params.push(Parameter::new(
pid::ENDPOINT_GUID,
self.key.to_bytes().to_vec(),
));
params.push(Parameter::new(
pid::TOPIC_NAME,
encode_cdr_string_le(&self.topic_name)?,
));
params.push(Parameter::new(
pid::TYPE_NAME,
encode_cdr_string_le(&self.type_name)?,
));
params.push(Parameter::new(
pid::DURABILITY,
(self.durability as u32).to_le_bytes().to_vec(),
));
let mut rel = Vec::with_capacity(12);
rel.extend_from_slice(&(self.reliability.kind as u32).to_le_bytes());
rel.extend_from_slice(&self.reliability.max_blocking_time.to_bytes_le());
params.push(Parameter::new(pid::RELIABILITY, rel));
params.push(Parameter::new(
pid::OWNERSHIP,
encode_u32_le(self.ownership as u32).to_vec(),
));
params.push(Parameter::new(
pid::OWNERSHIP_STRENGTH,
encode_u32_le(self.ownership_strength as u32).to_vec(),
));
params.push(Parameter::new(
pid::LIVELINESS,
encode_liveliness_le(self.liveliness),
));
params.push(Parameter::new(
pid::DEADLINE,
encode_duration_le(self.deadline.period).to_vec(),
));
params.push(Parameter::new(
pid::LIFESPAN,
encode_duration_le(self.lifespan.duration).to_vec(),
));
if !self.partition.is_empty() {
params.push(Parameter::new(
pid::PARTITION,
encode_partition_le(&self.partition)?,
));
}
if !self.user_data.is_empty() {
params.push(Parameter::new(
pid::USER_DATA,
encode_octet_seq_le(&self.user_data)?,
));
}
if !self.topic_data.is_empty() {
params.push(Parameter::new(
pid::TOPIC_DATA,
encode_octet_seq_le(&self.topic_data)?,
));
}
if !self.group_data.is_empty() {
params.push(Parameter::new(
pid::GROUP_DATA,
encode_octet_seq_le(&self.group_data)?,
));
}
if let Some(ti) = &self.type_information {
params.push(Parameter::new(pid::TYPE_INFORMATION, ti.clone()));
}
if let Some(info) = self.security_info {
params.push(Parameter::new(
pid::ENDPOINT_SECURITY_INFO,
info.to_bytes(true).to_vec(),
));
}
if let Some(name) = &self.service_instance_name {
params.push(Parameter::new(
pid::SERVICE_INSTANCE_NAME,
encode_cdr_string_le(name)?,
));
}
if let Some(guid) = self.related_entity_guid {
params.push(Parameter::new(
pid::RELATED_ENTITY_GUID,
guid.to_bytes().to_vec(),
));
}
if let Some(aliases) = &self.topic_aliases {
params.push(Parameter::new(
pid::TOPIC_ALIASES,
encode_partition_le(aliases)?,
));
}
if self.type_identifier != zerodds_types::TypeIdentifier::None {
let mut w = zerodds_cdr::BufferWriter::new(zerodds_cdr::Endianness::Little);
self.type_identifier
.encode_into(&mut w)
.map_err(|_| WireError::ValueOutOfRange {
message: "type_identifier encoding failed",
})?;
params.push(Parameter::new(pid::ZERODDS_TYPE_ID, w.into_bytes()));
}
if !self.data_representation.is_empty() {
let mut dr = Vec::with_capacity(4 + 2 * self.data_representation.len());
let len = u32::try_from(self.data_representation.len()).map_err(|_| {
WireError::ValueOutOfRange {
message: "data_representation length exceeds u32::MAX",
}
})?;
dr.extend_from_slice(&len.to_le_bytes());
for rep in &self.data_representation {
dr.extend_from_slice(&rep.to_le_bytes());
}
params.push(Parameter::new(pid::DATA_REPRESENTATION, dr));
}
let mut out = Vec::with_capacity(params.parameters.len() * 24 + 16);
out.extend_from_slice(&ENCAPSULATION_PL_CDR_LE);
out.extend_from_slice(&[0, 0]); out.extend_from_slice(¶ms.to_bytes(true));
Ok(out)
}
pub fn from_pl_cdr_le(bytes: &[u8]) -> Result<Self, WireError> {
if bytes.len() < 4 {
return Err(WireError::UnexpectedEof {
needed: 4,
offset: 0,
});
}
let little_endian = match &bytes[..2] {
b if b == ENCAPSULATION_PL_CDR_LE => true,
[0x00, 0x02] => false,
other => {
return Err(WireError::UnsupportedEncapsulation {
kind: [other[0], other[1]],
});
}
};
let pl = ParameterList::from_bytes(&bytes[4..], little_endian)?;
let key = pl
.find(pid::ENDPOINT_GUID)
.and_then(guid_from_param)
.ok_or(WireError::ValueOutOfRange {
message: "ENDPOINT_GUID missing or wrong length",
})?;
let participant_key = pl
.find(pid::PARTICIPANT_GUID)
.and_then(guid_from_param)
.unwrap_or_else(|| {
Guid::new(key.prefix, crate::wire_types::EntityId::PARTICIPANT)
});
let topic_name = pl
.find(pid::TOPIC_NAME)
.map(|p| decode_cdr_string(&p.value, little_endian))
.transpose()?
.ok_or(WireError::ValueOutOfRange {
message: "TOPIC_NAME missing",
})?;
let type_name = pl
.find(pid::TYPE_NAME)
.map(|p| decode_cdr_string(&p.value, little_endian))
.transpose()?
.ok_or(WireError::ValueOutOfRange {
message: "TYPE_NAME missing",
})?;
let durability = pl
.find(pid::DURABILITY)
.and_then(|p| {
if p.value.len() >= 4 {
let mut b = [0u8; 4];
b.copy_from_slice(&p.value[..4]);
Some(DurabilityKind::from_u32(if little_endian {
u32::from_le_bytes(b)
} else {
u32::from_be_bytes(b)
}))
} else {
None
}
})
.unwrap_or_default();
let reliability = pl
.find(pid::RELIABILITY)
.and_then(|p| {
if p.value.len() >= 12 {
let mut k = [0u8; 4];
k.copy_from_slice(&p.value[..4]);
let kind = ReliabilityKind::from_u32(if little_endian {
u32::from_le_bytes(k)
} else {
u32::from_be_bytes(k)
});
let mut d = [0u8; 8];
d.copy_from_slice(&p.value[4..12]);
let max_blocking_time = if little_endian {
Duration::from_bytes_le(d)
} else {
let mut s = [0u8; 4];
s.copy_from_slice(&d[..4]);
let mut f = [0u8; 4];
f.copy_from_slice(&d[4..]);
Duration {
seconds: i32::from_be_bytes(s),
fraction: u32::from_be_bytes(f),
}
};
Some(ReliabilityQos {
kind,
max_blocking_time,
})
} else {
None
}
})
.unwrap_or_default();
let ownership = pl
.find(pid::OWNERSHIP)
.and_then(|p| decode_u32(&p.value, little_endian))
.map(zerodds_qos::OwnershipKind::from_u32)
.unwrap_or_default();
let ownership_strength = pl
.find(pid::OWNERSHIP_STRENGTH)
.and_then(|p| decode_i32(&p.value, little_endian))
.unwrap_or(0);
let liveliness = pl
.find(pid::LIVELINESS)
.and_then(|p| decode_liveliness(&p.value, little_endian))
.unwrap_or_default();
let deadline = pl
.find(pid::DEADLINE)
.and_then(|p| decode_duration(&p.value, little_endian))
.map(|period| zerodds_qos::DeadlineQosPolicy { period })
.unwrap_or_default();
let lifespan = pl
.find(pid::LIFESPAN)
.and_then(|p| decode_duration(&p.value, little_endian))
.map(|duration| zerodds_qos::LifespanQosPolicy { duration })
.unwrap_or_default();
let partition = pl
.find(pid::PARTITION)
.and_then(|p| decode_partition(&p.value, little_endian))
.unwrap_or_default();
let user_data = pl
.find(pid::USER_DATA)
.and_then(|p| decode_octet_seq(&p.value, little_endian))
.unwrap_or_default();
let topic_data = pl
.find(pid::TOPIC_DATA)
.and_then(|p| decode_octet_seq(&p.value, little_endian))
.unwrap_or_default();
let group_data = pl
.find(pid::GROUP_DATA)
.and_then(|p| decode_octet_seq(&p.value, little_endian))
.unwrap_or_default();
let type_information = pl.find(pid::TYPE_INFORMATION).map(|p| p.value.clone());
let security_info = pl
.find(pid::ENDPOINT_SECURITY_INFO)
.and_then(|p| EndpointSecurityInfo::from_bytes(&p.value, little_endian).ok());
let service_instance_name = pl
.find(pid::SERVICE_INSTANCE_NAME)
.map(|p| decode_cdr_string(&p.value, little_endian))
.transpose()
.ok()
.flatten();
let related_entity_guid = pl.find(pid::RELATED_ENTITY_GUID).and_then(guid_from_param);
let topic_aliases = pl
.find(pid::TOPIC_ALIASES)
.and_then(|p| decode_partition(&p.value, little_endian));
let type_identifier = pl
.find(pid::ZERODDS_TYPE_ID)
.and_then(|p| {
let mut r =
zerodds_cdr::BufferReader::new(&p.value, zerodds_cdr::Endianness::Little);
zerodds_types::TypeIdentifier::decode_from(&mut r).ok()
})
.unwrap_or_default();
let data_representation = pl
.find(pid::DATA_REPRESENTATION)
.map(|p| {
let v = &p.value;
if v.len() < 4 {
return Vec::new();
}
let mut n_bytes = [0u8; 4];
n_bytes.copy_from_slice(&v[..4]);
let n = if little_endian {
u32::from_le_bytes(n_bytes)
} else {
u32::from_be_bytes(n_bytes)
} as usize;
let cap = n.min(v.len().saturating_sub(4) / 2);
let mut reps = Vec::with_capacity(cap);
for i in 0..n {
let off = 4 + i * 2;
if off + 2 > v.len() {
break;
}
let mut b = [0u8; 2];
b.copy_from_slice(&v[off..off + 2]);
reps.push(if little_endian {
i16::from_le_bytes(b)
} else {
i16::from_be_bytes(b)
});
}
reps
})
.unwrap_or_default();
Ok(Self {
key,
participant_key,
topic_name,
type_name,
durability,
reliability,
ownership,
ownership_strength,
liveliness,
deadline,
lifespan,
partition,
user_data,
topic_data,
group_data,
type_information,
data_representation,
security_info,
service_instance_name,
related_entity_guid,
topic_aliases,
type_identifier,
})
}
}
pub fn inject_pid_shm_locator(bytes: &mut Vec<u8>, locator_bytes: &[u8]) -> Result<(), WireError> {
use crate::parameter_list::pid;
if bytes.len() < 4 {
return Err(WireError::ValueOutOfRange {
message: "inject_pid_shm_locator: bytes too short",
});
}
let sentinel_pos = bytes.len() - 4;
if bytes[sentinel_pos..] != [0x01, 0x00, 0x00, 0x00] {
return Err(WireError::ValueOutOfRange {
message: "inject_pid_shm_locator: missing PID_SENTINEL trailer",
});
}
let padded_len = (locator_bytes.len() + 3) & !3;
if padded_len > u16::MAX as usize {
return Err(WireError::ValueOutOfRange {
message: "inject_pid_shm_locator: locator > u16::MAX",
});
}
let mut inject = Vec::with_capacity(4 + padded_len + 4);
inject.extend_from_slice(&pid::SHM_LOCATOR.to_le_bytes());
inject.extend_from_slice(&(padded_len as u16).to_le_bytes());
inject.extend_from_slice(locator_bytes);
inject.resize(inject.len() + (padded_len - locator_bytes.len()), 0);
inject.extend_from_slice(&bytes[sentinel_pos..]);
bytes.truncate(sentinel_pos);
bytes.extend_from_slice(&inject);
Ok(())
}
pub(crate) fn guid_from_param(p: &Parameter) -> Option<Guid> {
if p.value.len() == 16 {
let mut g = [0u8; 16];
g.copy_from_slice(&p.value);
Some(Guid::from_bytes(g))
} else {
None
}
}
pub(crate) fn encode_duration_le(d: Duration) -> [u8; 8] {
let mut out = [0u8; 8];
out[..4].copy_from_slice(&d.seconds.to_le_bytes());
out[4..].copy_from_slice(&d.fraction.to_le_bytes());
out
}
pub(crate) fn decode_duration(value: &[u8], little_endian: bool) -> Option<Duration> {
if value.len() < 8 {
return None;
}
let mut s = [0u8; 4];
s.copy_from_slice(&value[..4]);
let mut f = [0u8; 4];
f.copy_from_slice(&value[4..8]);
if little_endian {
Some(Duration {
seconds: i32::from_le_bytes(s),
fraction: u32::from_le_bytes(f),
})
} else {
Some(Duration {
seconds: i32::from_be_bytes(s),
fraction: u32::from_be_bytes(f),
})
}
}
pub(crate) fn encode_u32_le(v: u32) -> [u8; 4] {
v.to_le_bytes()
}
pub(crate) fn decode_u32(value: &[u8], little_endian: bool) -> Option<u32> {
if value.len() < 4 {
return None;
}
let mut b = [0u8; 4];
b.copy_from_slice(&value[..4]);
if little_endian {
Some(u32::from_le_bytes(b))
} else {
Some(u32::from_be_bytes(b))
}
}
pub(crate) fn decode_i32(value: &[u8], little_endian: bool) -> Option<i32> {
decode_u32(value, little_endian).map(|u| u as i32)
}
pub(crate) fn encode_liveliness_le(l: zerodds_qos::LivelinessQosPolicy) -> Vec<u8> {
let mut out = Vec::with_capacity(12);
out.extend_from_slice(&(l.kind as u32).to_le_bytes());
out.extend_from_slice(&encode_duration_le(l.lease_duration));
out
}
pub(crate) fn decode_liveliness(
value: &[u8],
little_endian: bool,
) -> Option<zerodds_qos::LivelinessQosPolicy> {
if value.len() < 12 {
return None;
}
let kind_u = decode_u32(&value[..4], little_endian)?;
let lease = decode_duration(&value[4..12], little_endian)?;
Some(zerodds_qos::LivelinessQosPolicy {
kind: zerodds_qos::LivelinessKind::from_u32(kind_u),
lease_duration: lease,
})
}
pub fn encode_octet_seq_le(data: &[u8]) -> Result<Vec<u8>, WireError> {
let len = u32::try_from(data.len()).map_err(|_| WireError::ValueOutOfRange {
message: "octet sequence length exceeds u32::MAX",
})?;
let mut out = Vec::with_capacity(4 + data.len() + 3);
out.extend_from_slice(&len.to_le_bytes());
out.extend_from_slice(data);
while out.len() % 4 != 0 {
out.push(0);
}
Ok(out)
}
pub fn decode_octet_seq(value: &[u8], little_endian: bool) -> Option<Vec<u8>> {
let n = decode_u32(value, little_endian)? as usize;
if 4 + n > value.len() {
return None;
}
Some(value[4..4 + n].to_vec())
}
pub(crate) fn encode_partition_le(partitions: &[String]) -> Result<Vec<u8>, WireError> {
let mut out = Vec::new();
let len = u32::try_from(partitions.len()).map_err(|_| WireError::ValueOutOfRange {
message: "partition count exceeds u32::MAX",
})?;
out.extend_from_slice(&len.to_le_bytes());
for p in partitions {
out.extend_from_slice(&encode_cdr_string_le(p)?);
}
Ok(out)
}
pub(crate) fn decode_partition(value: &[u8], little_endian: bool) -> Option<Vec<String>> {
let n = decode_u32(value, little_endian)? as usize;
let cap = n.min(value.len().saturating_sub(4) / 5);
let mut out = Vec::with_capacity(cap);
let mut pos = 4;
for _ in 0..n {
if pos + 4 > value.len() {
return None;
}
let mut lb = [0u8; 4];
lb.copy_from_slice(&value[pos..pos + 4]);
let slen = if little_endian {
u32::from_le_bytes(lb)
} else {
u32::from_be_bytes(lb)
} as usize;
let next_raw_end = pos + 4 + slen;
if next_raw_end > value.len() {
return None;
}
let s =
decode_cdr_string(&value[pos..next_raw_end.min(value.len())], little_endian).ok()?;
out.push(s);
let padded_end = (next_raw_end + 3) & !3;
pos = padded_end;
}
Some(out)
}
pub(crate) fn encode_cdr_string_le(s: &str) -> Result<Vec<u8>, WireError> {
let bytes = s.as_bytes();
let len =
u32::try_from(bytes.len().saturating_add(1)).map_err(|_| WireError::ValueOutOfRange {
message: "CDR string length exceeds u32::MAX",
})?;
let mut out = Vec::with_capacity(4 + bytes.len() + 4);
out.extend_from_slice(&len.to_le_bytes());
out.extend_from_slice(bytes);
out.push(0); while out.len() % 4 != 0 {
out.push(0);
}
Ok(out)
}
pub(crate) fn decode_cdr_string(value: &[u8], little_endian: bool) -> Result<String, WireError> {
if value.len() < 4 {
return Err(WireError::UnexpectedEof {
needed: 4,
offset: 0,
});
}
let mut lb = [0u8; 4];
lb.copy_from_slice(&value[..4]);
let len = if little_endian {
u32::from_le_bytes(lb)
} else {
u32::from_be_bytes(lb)
} as usize;
if len == 0 {
return Err(WireError::ValueOutOfRange {
message: "CDR string length 0 (missing null terminator)",
});
}
if value.len() < 4 + len {
return Err(WireError::UnexpectedEof {
needed: 4 + len,
offset: 0,
});
}
let raw = &value[4..4 + len];
if raw[len - 1] != 0 {
return Err(WireError::ValueOutOfRange {
message: "CDR string missing null terminator",
});
}
String::from_utf8(raw[..len - 1].to_vec()).map_err(|_| WireError::ValueOutOfRange {
message: "CDR string is not valid UTF-8",
})
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
#[test]
fn durability_try_from_u32_rejects_unknown() {
assert_eq!(
DurabilityKind::try_from_u32(0),
Some(DurabilityKind::Volatile)
);
assert_eq!(
DurabilityKind::try_from_u32(1),
Some(DurabilityKind::TransientLocal)
);
assert_eq!(
DurabilityKind::try_from_u32(3),
Some(DurabilityKind::Persistent)
);
assert_eq!(DurabilityKind::try_from_u32(99), None);
}
#[test]
fn reliability_try_from_u32_rejects_unknown() {
assert_eq!(
ReliabilityKind::try_from_u32(1),
Some(ReliabilityKind::BestEffort)
);
assert_eq!(
ReliabilityKind::try_from_u32(2),
Some(ReliabilityKind::Reliable)
);
assert_eq!(ReliabilityKind::try_from_u32(0), None);
assert_eq!(ReliabilityKind::try_from_u32(42), None);
}
#[test]
fn legacy_from_u32_still_defaults_for_sedp_forward_compat() {
assert_eq!(DurabilityKind::from_u32(99), DurabilityKind::Volatile);
assert_eq!(ReliabilityKind::from_u32(99), ReliabilityKind::BestEffort);
}
use crate::wire_types::{EntityId, GuidPrefix};
use alloc::vec;
fn sample_data() -> PublicationBuiltinTopicData {
PublicationBuiltinTopicData {
key: Guid::new(
GuidPrefix::from_bytes([1; 12]),
EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
),
participant_key: Guid::new(GuidPrefix::from_bytes([1; 12]), EntityId::PARTICIPANT),
topic_name: "ChatterTopic".into(),
type_name: "std_msgs::String".into(),
durability: DurabilityKind::Volatile,
reliability: ReliabilityQos {
kind: ReliabilityKind::Reliable,
max_blocking_time: Duration::from_secs(10),
},
ownership: zerodds_qos::OwnershipKind::Shared,
ownership_strength: 0,
liveliness: zerodds_qos::LivelinessQosPolicy::default(),
deadline: zerodds_qos::DeadlineQosPolicy::default(),
lifespan: zerodds_qos::LifespanQosPolicy::default(),
partition: alloc::vec::Vec::new(),
user_data: alloc::vec::Vec::new(),
topic_data: alloc::vec::Vec::new(),
group_data: alloc::vec::Vec::new(),
type_information: None,
data_representation: alloc::vec::Vec::new(),
security_info: None,
service_instance_name: None,
related_entity_guid: None,
topic_aliases: None,
type_identifier: zerodds_types::TypeIdentifier::None,
}
}
#[test]
fn roundtrip_le() {
let d = sample_data();
let bytes = d.to_pl_cdr_le().unwrap();
assert_eq!(&bytes[..2], &[0x00, 0x03]); let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
assert_eq!(decoded, d);
}
#[test]
fn security_info_roundtrip() {
use crate::endpoint_security_info::{EndpointSecurityInfo, attrs, plugin_attrs};
let mut d = sample_data();
d.security_info = Some(EndpointSecurityInfo {
endpoint_security_attributes: attrs::IS_VALID | attrs::IS_SUBMESSAGE_PROTECTED,
plugin_endpoint_security_attributes: plugin_attrs::IS_VALID
| plugin_attrs::IS_SUBMESSAGE_ENCRYPTED,
});
let bytes = d.to_pl_cdr_le().unwrap();
let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
assert_eq!(decoded.security_info, d.security_info);
}
#[test]
fn legacy_peer_without_security_info_parses_ok() {
let d = sample_data();
assert!(d.security_info.is_none());
let bytes = d.to_pl_cdr_le().unwrap();
let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
assert!(decoded.security_info.is_none());
}
#[test]
fn roundtrip_utf8_topic_name() {
let mut d = sample_data();
d.topic_name = "Zählung".into();
let bytes = d.to_pl_cdr_le().unwrap();
let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
assert_eq!(decoded.topic_name, "Zählung");
}
#[test]
fn decode_rejects_unknown_encapsulation() {
let mut bytes = vec![0xFF, 0xFF, 0x00, 0x00];
bytes.extend_from_slice(&[0u8; 16]);
let res = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes);
assert!(matches!(
res,
Err(WireError::UnsupportedEncapsulation { .. })
));
}
#[test]
fn decode_rejects_missing_topic_name() {
let mut pl = ParameterList::new();
pl.push(Parameter::new(pid::ENDPOINT_GUID, vec![0u8; 16]));
pl.push(Parameter::new(
pid::TYPE_NAME,
encode_cdr_string_le("T").unwrap(),
));
let mut bytes = Vec::new();
bytes.extend_from_slice(&ENCAPSULATION_PL_CDR_LE);
bytes.extend_from_slice(&[0, 0]);
bytes.extend_from_slice(&pl.to_bytes(true));
let res = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes);
assert!(
matches!(res, Err(WireError::ValueOutOfRange { message }) if message.contains("TOPIC_NAME"))
);
}
#[test]
fn decode_rejects_missing_type_name() {
let mut pl = ParameterList::new();
pl.push(Parameter::new(pid::ENDPOINT_GUID, vec![0u8; 16]));
pl.push(Parameter::new(
pid::TOPIC_NAME,
encode_cdr_string_le("T").unwrap(),
));
let mut bytes = Vec::new();
bytes.extend_from_slice(&ENCAPSULATION_PL_CDR_LE);
bytes.extend_from_slice(&[0, 0]);
bytes.extend_from_slice(&pl.to_bytes(true));
let res = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes);
assert!(
matches!(res, Err(WireError::ValueOutOfRange { message }) if message.contains("TYPE_NAME"))
);
}
#[test]
fn decode_rejects_missing_endpoint_guid() {
let mut pl = ParameterList::new();
pl.push(Parameter::new(
pid::TOPIC_NAME,
encode_cdr_string_le("T").unwrap(),
));
pl.push(Parameter::new(
pid::TYPE_NAME,
encode_cdr_string_le("U").unwrap(),
));
let mut bytes = Vec::new();
bytes.extend_from_slice(&ENCAPSULATION_PL_CDR_LE);
bytes.extend_from_slice(&[0, 0]);
bytes.extend_from_slice(&pl.to_bytes(true));
let res = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes);
assert!(
matches!(res, Err(WireError::ValueOutOfRange { message }) if message.contains("ENDPOINT_GUID"))
);
}
#[test]
fn unknown_pids_are_skipped() {
let mut bytes = sample_data().to_pl_cdr_le().unwrap();
let sentinel_pos = bytes.len() - 4;
let mut inject = vec![0xFFu8, 0x7F, 4, 0, 0xDE, 0xAD, 0xBE, 0xEF];
inject.extend_from_slice(&bytes[sentinel_pos..]);
bytes.truncate(sentinel_pos);
bytes.extend_from_slice(&inject);
let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
assert_eq!(decoded, sample_data());
}
#[test]
fn inject_pid_shm_locator_appends_before_sentinel() {
let mut locator = Vec::new();
locator.extend_from_slice(&0xDEAD_BEEFu32.to_le_bytes()); locator.extend_from_slice(&1000u32.to_le_bytes()); locator.extend_from_slice(&64u32.to_le_bytes()); locator.extend_from_slice(&4096u32.to_le_bytes()); let path = "/dev/shm/zd-1";
locator.extend_from_slice(&((path.len() as u32) + 1).to_le_bytes());
locator.extend_from_slice(path.as_bytes());
locator.push(0);
let pad = (4 - locator.len() % 4) % 4;
locator.resize(locator.len() + pad, 0);
let mut bytes = sample_data().to_pl_cdr_le().unwrap();
let len_before = bytes.len();
super::inject_pid_shm_locator(&mut bytes, &locator).unwrap();
assert!(bytes.len() > len_before);
let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
assert_eq!(decoded, sample_data());
let pid_found = bytes.windows(2).any(|w| w == 0x8001u16.to_le_bytes());
assert!(pid_found, "PID_SHM_LOCATOR should appear in bytes");
}
#[test]
fn inject_pid_shm_locator_rejects_missing_sentinel() {
let mut bytes = vec![0u8; 8];
let res = super::inject_pid_shm_locator(&mut bytes, &[0u8; 16]);
assert!(res.is_err());
}
#[test]
fn inject_pid_shm_locator_rejects_too_short() {
let mut bytes = vec![0u8, 1u8];
let res = super::inject_pid_shm_locator(&mut bytes, &[0u8; 16]);
assert!(res.is_err());
}
#[test]
fn participant_key_fallback_when_pid_missing() {
let d = sample_data();
let mut pl = ParameterList::new();
pl.push(Parameter::new(
pid::ENDPOINT_GUID,
d.key.to_bytes().to_vec(),
));
pl.push(Parameter::new(
pid::TOPIC_NAME,
encode_cdr_string_le(&d.topic_name).unwrap(),
));
pl.push(Parameter::new(
pid::TYPE_NAME,
encode_cdr_string_le(&d.type_name).unwrap(),
));
let mut bytes = Vec::new();
bytes.extend_from_slice(&ENCAPSULATION_PL_CDR_LE);
bytes.extend_from_slice(&[0, 0]);
bytes.extend_from_slice(&pl.to_bytes(true));
let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
assert_eq!(decoded.participant_key.prefix, d.key.prefix);
assert_eq!(decoded.participant_key.entity_id, EntityId::PARTICIPANT);
}
#[test]
fn durability_kind_from_u32_unknown_defaults_volatile() {
assert_eq!(DurabilityKind::from_u32(0), DurabilityKind::Volatile);
assert_eq!(DurabilityKind::from_u32(1), DurabilityKind::TransientLocal);
assert_eq!(DurabilityKind::from_u32(999), DurabilityKind::Volatile);
}
#[test]
fn rpc_discovery_pids_roundtrip() {
let mut d = sample_data();
d.service_instance_name = Some("CalcInstance-1".into());
d.related_entity_guid = Some(Guid::new(
crate::wire_types::GuidPrefix::from_bytes([7; 12]),
crate::wire_types::EntityId::user_reader_with_key([0xAA, 0xBB, 0xCC]),
));
d.topic_aliases = Some(alloc::vec!["LegacyCalc_Request".into(), "v2_Req".into()]);
let bytes = d.to_pl_cdr_le().unwrap();
let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
assert_eq!(decoded.service_instance_name, d.service_instance_name);
assert_eq!(decoded.related_entity_guid, d.related_entity_guid);
assert_eq!(decoded.topic_aliases, d.topic_aliases);
}
#[test]
fn rpc_pids_optional_legacy_peer_parses_ok() {
let d = sample_data();
assert!(d.service_instance_name.is_none());
assert!(d.related_entity_guid.is_none());
assert!(d.topic_aliases.is_none());
let bytes = d.to_pl_cdr_le().unwrap();
let decoded = PublicationBuiltinTopicData::from_pl_cdr_le(&bytes).unwrap();
assert!(decoded.service_instance_name.is_none());
assert!(decoded.related_entity_guid.is_none());
assert!(decoded.topic_aliases.is_none());
}
#[test]
fn rpc_pid_constants_in_emitted_bytes() {
let mut d = sample_data();
d.service_instance_name = Some("X".into());
d.related_entity_guid = Some(Guid::new(
crate::wire_types::GuidPrefix::from_bytes([1; 12]),
crate::wire_types::EntityId::PARTICIPANT,
));
d.topic_aliases = Some(alloc::vec!["A".into()]);
let bytes = d.to_pl_cdr_le().unwrap();
let mut found_080 = false;
let mut found_081 = false;
let mut found_082 = false;
for w in bytes.windows(2) {
if w == [0x80, 0x00] {
found_080 = true;
}
if w == [0x81, 0x00] {
found_081 = true;
}
if w == [0x82, 0x00] {
found_082 = true;
}
}
assert!(found_080 && found_081 && found_082);
}
#[test]
fn reliability_kind_from_u32_unknown_defaults_best_effort() {
assert_eq!(ReliabilityKind::from_u32(1), ReliabilityKind::BestEffort);
assert_eq!(ReliabilityKind::from_u32(2), ReliabilityKind::Reliable);
assert_eq!(ReliabilityKind::from_u32(999), ReliabilityKind::BestEffort);
}
}