use alloc::string::{String, ToString};
use alloc::vec::Vec;
use zerodds_amqp_bridge::extended_types::AmqpExtValue;
use crate::keyhash;
/// DDS writer operation attached to a bridged sample.
///
/// Each variant has a fixed lowercase textual form (see [`DdsOperation::as_str`])
/// that [`DdsOperation::parse`] accepts back. `Write` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DdsOperation {
    /// Textual form `"write"`; the default operation.
    #[default]
    Write,
    /// Textual form `"register"`.
    Register,
    /// Textual form `"unregister"`.
    Unregister,
    /// Textual form `"dispose"`.
    Dispose,
}

impl DdsOperation {
    /// Returns the fixed lowercase name of this operation.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            DdsOperation::Dispose => "dispose",
            DdsOperation::Unregister => "unregister",
            DdsOperation::Register => "register",
            DdsOperation::Write => "write",
        }
    }

    /// Parses the exact (case-sensitive) name produced by [`DdsOperation::as_str`].
    ///
    /// # Errors
    ///
    /// Returns the unrecognised input unchanged when it names no operation.
    pub fn parse(s: &str) -> Result<Self, &str> {
        // Every variant must be listed here; the names themselves live only in `as_str`.
        const KNOWN: [DdsOperation; 4] = [
            DdsOperation::Write,
            DdsOperation::Register,
            DdsOperation::Unregister,
            DdsOperation::Dispose,
        ];
        KNOWN
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .ok_or(s)
    }
}
/// Per-sample DDS metadata consumed by [`produce_properties`] and
/// [`produce_application_properties`].
#[derive(Debug, Clone)]
pub struct SampleHeader {
/// 16-byte writer GUID; becomes the first 16 bytes of the message-id.
pub writer_guid: [u8; 16],
/// Sequence number; appended big-endian to the writer GUID in the message-id.
pub seqnum: u64,
/// Source timestamp in milliseconds (per the field name); copied to the
/// creation time and used as the base of the absolute expiry time.
pub source_timestamp_ms: i64,
/// Nanosecond remainder of the source timestamp (per the field name);
/// emitted as `dds:nsec` only when non-zero.
pub source_nsec_remainder: u32,
/// Optional key hash; when present it is passed to `keyhash::group_id`
/// to derive the group-id.
pub keyhash: Option<Vec<u8>>,
/// 16-byte instance handle; always emitted as binary `dds:instance-handle`.
pub instance_handle: [u8; 16],
/// Optional remaining lifespan in milliseconds (per the field name);
/// emitted as `dds:lifespan-ms` and added (saturating) to the source
/// timestamp to form the absolute expiry time.
pub lifespan_remaining_ms: Option<i64>,
/// Operation for this sample; emitted as `dds:operation` unless it is `Write`.
pub operation: DdsOperation,
/// Optional type identifier as a hex string; emitted verbatim as `dds:type-id`.
pub type_id_hex: Option<String>,
/// Domain id; always emitted as `dds:domain-id`.
pub domain_id: u32,
/// Partition names; none emits nothing, one emits a string, several emit a list
/// under `dds:partition`.
pub partitions: Vec<String>,
}
/// Builds the 24-byte message-id for a sample.
///
/// Layout: the 16 bytes of `writer_guid` followed by `seqnum` encoded as
/// 8 big-endian bytes.
#[must_use]
pub fn message_id(writer_guid: [u8; 16], seqnum: u64) -> Vec<u8> {
    let mut id = [0u8; 24];
    let (guid_part, seq_part) = id.split_at_mut(16);
    guid_part.copy_from_slice(&writer_guid);
    seq_part.copy_from_slice(&seqnum.to_be_bytes());
    id.to_vec()
}
/// Renders `bytes` as lowercase hexadecimal, two digits per byte, with no
/// separators or prefix. An empty slice yields an empty string.
#[must_use]
pub fn hex_lower(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        hex.push(char::from(DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
/// Message-level property values derived from a [`SampleHeader`] by
/// [`produce_properties`].
#[derive(Debug, Clone)]
pub struct ProducedProperties {
/// 24-byte id: writer GUID followed by the big-endian sequence number.
pub message_id: Vec<u8>,
/// Copy of the header's source timestamp (milliseconds, per the field name).
pub creation_time_ms: i64,
/// Source timestamp plus remaining lifespan (saturating); `None` when the
/// header carries no lifespan.
pub absolute_expiry_time_ms: Option<i64>,
/// Result of `keyhash::group_id` on the header's key hash; `None` when the
/// header has no key hash.
pub group_id: Option<String>,
}
/// Derives the message-level properties for one sample.
///
/// * `message_id` — writer GUID followed by the big-endian sequence number.
/// * `creation_time_ms` — the source timestamp, unchanged.
/// * `absolute_expiry_time_ms` — source timestamp plus remaining lifespan,
///   saturating at the `i64` bounds; `None` without a lifespan.
/// * `group_id` — `keyhash::group_id` of the key hash; `None` without one.
#[must_use]
pub fn produce_properties(hdr: &SampleHeader) -> ProducedProperties {
    let creation_time_ms = hdr.source_timestamp_ms;

    let absolute_expiry_time_ms = hdr
        .lifespan_remaining_ms
        .map(|remaining_ms| creation_time_ms.saturating_add(remaining_ms));

    let group_id = hdr.keyhash.as_deref().map(keyhash::group_id);

    let message_id = message_id(hdr.writer_guid, hdr.seqnum);

    ProducedProperties {
        message_id,
        creation_time_ms,
        absolute_expiry_time_ms,
        group_id,
    }
}
/// String keys of the `dds:`-prefixed application properties.
///
/// Only some of these are written or read by the code visible in this file;
/// the remaining ones are presumably used elsewhere — verify against callers.
pub mod app_keys {
/// Nanosecond remainder of the source timestamp; written only when non-zero.
pub const NSEC: &str = "dds:nsec";
/// Partition name (single string) or list of partition names.
pub const PARTITION: &str = "dds:partition";
/// Domain id; always written by the producer in this file.
pub const DOMAIN_ID: &str = "dds:domain-id";
/// Type identifier; written from `type_id_hex` and checked by `inspect_dds_type_id`.
pub const TYPE_ID: &str = "dds:type-id";
/// Not referenced by the code visible in this file.
pub const SOURCE_GUID: &str = "dds:source-guid";
/// Remaining lifespan in milliseconds; written only when the header has one.
pub const LIFESPAN_MS: &str = "dds:lifespan-ms";
/// Not referenced by the code visible in this file.
pub const SAMPLE_STATE: &str = "dds:sample-state";
/// Not referenced by the code visible in this file.
pub const VIEW_STATE: &str = "dds:view-state";
/// Not referenced by the code visible in this file.
pub const INSTANCE_STATE: &str = "dds:instance-state";
/// Operation name; written only when the operation is not `Write`.
pub const OPERATION: &str = "dds:operation";
/// Not referenced by the code visible in this file.
pub const BRIDGE_ID: &str = "dds:bridge-id";
/// Not referenced by the code visible in this file.
pub const BRIDGE_HOP: &str = "dds:bridge-hop";
/// 16-byte instance handle; always written by the producer in this file.
pub const INSTANCE_HANDLE: &str = "dds:instance-handle";
}
/// Result of comparing a message's `dds:type-id` application property with
/// an expected type id (see [`inspect_dds_type_id`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TypeIdCheck {
/// The property was present and equal to the expected id, ignoring ASCII case.
Match,
/// No usable type id was found: the properties were not a map, the key was
/// missing, or its value was not a string, symbol or binary.
Absent,
/// The property was present but differed from the expected id.
Mismatch {
/// Type id found in the message; binary values are rendered as lowercase hex.
received: String,
/// The expected id exactly as supplied by the caller.
expected: String,
},
}
/// Checks the `dds:type-id` application property against `expected_hex`.
///
/// Only the first entry whose key equals the string `dds:type-id` is
/// considered. String and symbol values are compared as-is, binary values
/// are first rendered as lowercase hex; the comparison ignores ASCII case.
///
/// Returns [`TypeIdCheck::Absent`] when `app_props` is not a map, the key is
/// missing, or the first matching entry holds any other value type.
#[must_use]
pub fn inspect_dds_type_id(app_props: &AmqpExtValue, expected_hex: &str) -> TypeIdCheck {
    let pairs = if let AmqpExtValue::Map(pairs) = app_props {
        pairs
    } else {
        return TypeIdCheck::Absent;
    };

    let type_id_key = AmqpExtValue::Str(app_keys::TYPE_ID.to_string());
    let first_hit = pairs.iter().find(|(key, _)| *key == type_id_key);

    let received = match first_hit {
        Some((_, AmqpExtValue::Str(text))) | Some((_, AmqpExtValue::Symbol(text))) => text.clone(),
        Some((_, AmqpExtValue::Binary(raw))) => hex_lower(raw),
        // Key missing, or present with a value type that cannot carry a type id.
        _ => return TypeIdCheck::Absent,
    };

    if received.eq_ignore_ascii_case(expected_hex) {
        return TypeIdCheck::Match;
    }
    TypeIdCheck::Mismatch {
        received,
        expected: expected_hex.to_string(),
    }
}
#[must_use]
pub fn produce_application_properties(hdr: &SampleHeader) -> AmqpExtValue {
let mut map: Vec<(AmqpExtValue, AmqpExtValue)> = Vec::new();
if hdr.source_nsec_remainder != 0 {
map.push((
AmqpExtValue::Str(app_keys::NSEC.to_string()),
AmqpExtValue::Uint(hdr.source_nsec_remainder),
));
}
match hdr.partitions.len() {
0 => {} 1 => {
map.push((
AmqpExtValue::Str(app_keys::PARTITION.to_string()),
AmqpExtValue::Str(hdr.partitions[0].clone()),
));
}
_ => {
let list: Vec<AmqpExtValue> = hdr
.partitions
.iter()
.map(|p| AmqpExtValue::Str(p.clone()))
.collect();
map.push((
AmqpExtValue::Str(app_keys::PARTITION.to_string()),
AmqpExtValue::List(list),
));
}
}
map.push((
AmqpExtValue::Str(app_keys::DOMAIN_ID.to_string()),
AmqpExtValue::Uint(hdr.domain_id),
));
if let Some(hex) = &hdr.type_id_hex {
map.push((
AmqpExtValue::Str(app_keys::TYPE_ID.to_string()),
AmqpExtValue::Str(hex.clone()),
));
}
if let Some(rem) = hdr.lifespan_remaining_ms {
map.push((
AmqpExtValue::Str(app_keys::LIFESPAN_MS.to_string()),
AmqpExtValue::Long(rem),
));
}
if hdr.operation != DdsOperation::Write {
map.push((
AmqpExtValue::Str(app_keys::OPERATION.to_string()),
AmqpExtValue::Str(hdr.operation.as_str().to_string()),
));
}
map.push((
AmqpExtValue::Str(app_keys::INSTANCE_HANDLE.to_string()),
AmqpExtValue::Binary(hdr.instance_handle.to_vec()),
));
AmqpExtValue::Map(map)
}
#[cfg(test)]
// Tests unwrap and panic on purpose: a violated expectation must fail the test.
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Flat key/value list as carried inside `AmqpExtValue::Map`.
    type Entries = Vec<(AmqpExtValue, AmqpExtValue)>;

    /// Baseline header: unkeyed `Write` sample with every optional field unset.
    fn header() -> SampleHeader {
        SampleHeader {
            writer_guid: [
                0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e,
                0x0f, 0x10,
            ],
            seqnum: 0x0000_0000_0000_002a,
            source_timestamp_ms: 1_700_000_000_000,
            source_nsec_remainder: 0,
            keyhash: None,
            instance_handle: [0u8; 16],
            lifespan_remaining_ms: None,
            operation: DdsOperation::Write,
            type_id_hex: None,
            domain_id: 0,
            partitions: Vec::new(),
        }
    }

    /// Application properties produced for `hdr`, unwrapped to their entry list.
    fn app_entries(hdr: &SampleHeader) -> Entries {
        match produce_application_properties(hdr) {
            AmqpExtValue::Map(entries) => entries,
            other => panic!("expected Map, got {other:?}"),
        }
    }

    /// Value stored under the string key `key`, if any.
    fn value_of<'a>(
        entries: &'a [(AmqpExtValue, AmqpExtValue)],
        key: &str,
    ) -> Option<&'a AmqpExtValue> {
        entries
            .iter()
            .find(|(k, _)| matches!(k, AmqpExtValue::Str(s) if s == key))
            .map(|(_, v)| v)
    }

    /// Builds an application-properties map from string keys.
    fn props_with(entries: Vec<(&str, AmqpExtValue)>) -> AmqpExtValue {
        AmqpExtValue::Map(
            entries
                .into_iter()
                .map(|(k, v)| (AmqpExtValue::Str(k.to_string()), v))
                .collect(),
        )
    }

    #[test]
    fn message_id_is_24_bytes_guid_then_seqnum() {
        let mid = message_id([0u8; 16], 1);
        assert_eq!(mid.len(), 24);
        assert_eq!(&mid[16..24], &[0u8, 0, 0, 0, 0, 0, 0, 1]);
    }

    #[test]
    fn message_id_distinguishes_consecutive_samples_same_instance() {
        let g = [0xAAu8; 16];
        let m1 = message_id(g, 1);
        let m2 = message_id(g, 2);
        assert_ne!(m1, m2);
    }

    #[test]
    fn dds_operation_round_trips_via_str() {
        for op in [
            DdsOperation::Write,
            DdsOperation::Register,
            DdsOperation::Unregister,
            DdsOperation::Dispose,
        ] {
            let s = op.as_str();
            let back = DdsOperation::parse(s).unwrap();
            assert_eq!(op, back);
        }
    }

    #[test]
    fn dds_operation_unknown_yields_err() {
        assert!(DdsOperation::parse("bogus").is_err());
    }

    #[test]
    fn produce_properties_keyhash_yields_group_id() {
        let mut hdr = header();
        hdr.keyhash = Some(b"\x00\x00\x00\x07".to_vec());
        let p = produce_properties(&hdr);
        assert_eq!(p.message_id.len(), 24);
        assert!(p.group_id.is_some());
        let g = p.group_id.unwrap();
        assert_eq!(g.len(), 64);
    }

    #[test]
    fn produce_properties_unkeyed_omits_group_id() {
        let p = produce_properties(&header());
        assert!(p.group_id.is_none());
    }

    #[test]
    fn produce_properties_lifespan_sets_expiry() {
        let mut hdr = header();
        hdr.source_timestamp_ms = 1000;
        hdr.lifespan_remaining_ms = Some(5000);
        let p = produce_properties(&hdr);
        assert_eq!(p.absolute_expiry_time_ms, Some(6000));
    }

    #[test]
    fn application_properties_default_minimal_set() {
        let entries = app_entries(&header());
        // Always present.
        assert!(value_of(&entries, "dds:domain-id").is_some());
        assert!(value_of(&entries, "dds:instance-handle").is_some());
        // Every optional property stays out of the baseline header's map.
        assert!(value_of(&entries, "dds:operation").is_none());
        assert!(value_of(&entries, "dds:nsec").is_none());
        assert!(value_of(&entries, "dds:partition").is_none());
        assert!(value_of(&entries, "dds:type-id").is_none());
        assert!(value_of(&entries, "dds:lifespan-ms").is_none());
    }

    #[test]
    fn application_properties_register_sets_operation() {
        let mut hdr = header();
        hdr.operation = DdsOperation::Register;
        let entries = app_entries(&hdr);
        assert_eq!(
            value_of(&entries, "dds:operation"),
            Some(&AmqpExtValue::Str("register".to_string()))
        );
    }

    #[test]
    fn application_properties_truncated_type_id_present() {
        let mut hdr = header();
        hdr.type_id_hex = Some("deadbeefcafebabe1234567890ab".to_string());
        let entries = app_entries(&hdr);
        // The hex string is forwarded verbatim, not merely present.
        assert_eq!(
            value_of(&entries, "dds:type-id"),
            Some(&AmqpExtValue::Str(
                "deadbeefcafebabe1234567890ab".to_string()
            ))
        );
    }

    #[test]
    fn application_properties_multi_partition_uses_list() {
        let mut hdr = header();
        hdr.partitions = alloc::vec!["alpha".into(), "beta".into()];
        let entries = app_entries(&hdr);
        match value_of(&entries, "dds:partition") {
            Some(AmqpExtValue::List(items)) => assert_eq!(items.len(), 2),
            other => panic!("expected list, got {other:?}"),
        }
    }

    #[test]
    fn application_properties_single_partition_uses_string() {
        let mut hdr = header();
        hdr.partitions = alloc::vec!["solo".into()];
        let entries = app_entries(&hdr);
        assert_eq!(
            value_of(&entries, "dds:partition"),
            Some(&AmqpExtValue::Str("solo".to_string()))
        );
    }

    #[test]
    fn application_properties_nsec_set_when_nonzero() {
        let mut hdr = header();
        hdr.source_nsec_remainder = 123_456;
        let entries = app_entries(&hdr);
        assert_eq!(
            value_of(&entries, "dds:nsec"),
            Some(&AmqpExtValue::Uint(123_456))
        );
    }

    #[test]
    fn hex_lower_24_byte() {
        let bytes = [0xAB, 0xCD];
        assert_eq!(hex_lower(&bytes), "abcd");

        // The case the test name promises: a full 24-byte message-id
        // renders as 48 lowercase hex digits, GUID first, seqnum last.
        let mid = message_id([0xAB; 16], 0x2a);
        let hex = hex_lower(&mid);
        assert_eq!(hex.len(), 48);
        assert!(hex.starts_with("abababab"));
        assert!(hex.ends_with("000000000000002a"));
    }

    #[test]
    fn type_id_inspector_match_returns_match() {
        let p = props_with(alloc::vec![(
            app_keys::TYPE_ID,
            AmqpExtValue::Str("deadbeefcafebabe1234567890ab".to_string()),
        )]);
        let r = inspect_dds_type_id(&p, "deadbeefcafebabe1234567890ab");
        assert_eq!(r, TypeIdCheck::Match);
    }

    #[test]
    fn type_id_inspector_case_insensitive_match() {
        let p = props_with(alloc::vec![(
            app_keys::TYPE_ID,
            AmqpExtValue::Str("DEADBEEFCAFEBABE".to_string()),
        )]);
        let r = inspect_dds_type_id(&p, "deadbeefcafebabe");
        assert_eq!(r, TypeIdCheck::Match);
    }

    #[test]
    fn type_id_inspector_absent_returns_absent() {
        let p = props_with(alloc::vec![(app_keys::DOMAIN_ID, AmqpExtValue::Uint(42))]);
        let r = inspect_dds_type_id(&p, "deadbeefcafebabe");
        assert_eq!(r, TypeIdCheck::Absent);
    }

    #[test]
    fn type_id_inspector_mismatch_detects_collision() {
        let p = props_with(alloc::vec![(
            app_keys::TYPE_ID,
            AmqpExtValue::Str("deadbeefcafebabe1111111111ff".to_string()),
        )]);
        let r = inspect_dds_type_id(&p, "deadbeefcafebabe2222222222ee");
        match r {
            TypeIdCheck::Mismatch { received, expected } => {
                assert!(received.contains("1111"));
                assert!(expected.contains("2222"));
            }
            other => panic!("expected mismatch, got {other:?}"),
        }
    }

    #[test]
    fn type_id_inspector_accepts_symbol_form() {
        let p = props_with(alloc::vec![(
            app_keys::TYPE_ID,
            AmqpExtValue::Symbol("dds:type:abc".to_string()),
        )]);
        let r = inspect_dds_type_id(&p, "dds:type:abc");
        assert_eq!(r, TypeIdCheck::Match);
    }

    #[test]
    fn type_id_inspector_non_map_yields_absent() {
        let r = inspect_dds_type_id(&AmqpExtValue::Null, "x");
        assert_eq!(r, TypeIdCheck::Absent);
    }
}