use bytes::{Buf, BufMut};
use crate::adapter::net::channel::ChannelName;
/// One entry of the subscription ledger: a publisher, the channel it is bound
/// to, and an optional token blob.
///
/// `DaemonBindings::to_bytes` writes each entry as the publisher id, a
/// length-prefixed channel name, and a presence flag followed (when set) by a
/// length-prefixed token.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionBinding {
/// Numeric publisher identifier; serialized as a little-endian `u64`.
pub publisher: u64,
/// Channel name; serialized as its UTF-8 bytes behind a `u16` length prefix.
pub channel: ChannelName,
/// Optional token bytes carried with the binding. `None` is serialized as a
/// single `0` flag byte; `Some` as a `1` flag byte, a `u16` length, then the bytes.
// NOTE(review): the token contents are treated as opaque here — confirm its
// meaning against the caller.
pub token_bytes: Option<Vec<u8>>,
}
/// The full set of subscription bindings, convertible to and from a compact
/// little-endian byte encoding via `to_bytes` / `from_bytes`.
///
/// The `Default` value holds no subscriptions.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DaemonBindings {
/// Bindings in the order they are serialized and deserialized.
pub subscriptions: Vec<SubscriptionBinding>,
}
impl DaemonBindings {
    /// Returns `true` when the ledger holds no subscription bindings.
    pub fn is_empty(&self) -> bool {
        self.subscriptions.is_empty()
    }

    /// Serializes the ledger into its little-endian wire form.
    ///
    /// Layout:
    ///
    /// ```text
    /// u32  entry count
    /// per entry:
    ///   u64  publisher
    ///   u16  channel-name length, followed by the name's UTF-8 bytes
    ///   u8   token flag (0 = absent, 1 = present)
    ///   if present: u16 token length, followed by the token bytes
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if the number of subscriptions does not fit in a `u32`, or if a
    /// channel name or token is longer than `u16::MAX` bytes.
    #[expect(
        clippy::expect_used,
        reason = "subscription count is bounded well below u32::MAX in practice; channel names are validated to MAX_NAME_LEN = 255 < u16::MAX; token byte lengths are caller-controlled but the caller-bug branch is documented"
    )]
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut out = Vec::new();

        let entry_count = u32::try_from(self.subscriptions.len())
            .expect("subscription ledger exceeds 4 billion entries");
        out.put_u32_le(entry_count);

        for binding in &self.subscriptions {
            out.put_u64_le(binding.publisher);

            // Length-prefixed channel name.
            let raw_name = binding.channel.as_str().as_bytes();
            let raw_name_len = u16::try_from(raw_name.len())
                .expect("channel name validation already caps at u16");
            out.put_u16_le(raw_name_len);
            out.extend_from_slice(raw_name);

            // Presence flag, then the length-prefixed token when there is one.
            if let Some(token) = &binding.token_bytes {
                out.put_u8(1);
                let token_len = u16::try_from(token.len())
                    .expect("token bytes exceed u16::MAX — caller bug");
                out.put_u16_le(token_len);
                out.extend_from_slice(token);
            } else {
                out.put_u8(0);
            }
        }

        out
    }

    /// Parses a ledger from the wire form produced by [`DaemonBindings::to_bytes`].
    ///
    /// An empty input decodes to an empty ledger. Returns `None` when the input
    /// is truncated, declares more entries than the remaining bytes could hold,
    /// contains a name that is not valid UTF-8 or is rejected by
    /// `ChannelName::new`, uses a token flag other than `0` or `1`, or has
    /// bytes left over after the last entry.
    pub fn from_bytes(data: &[u8]) -> Option<Self> {
        // Smallest possible entry: publisher (8) + name length (2) + token flag (1).
        const MIN_BINDING_SIZE: usize = 11;

        if data.is_empty() {
            return Some(Self::default());
        }

        let mut rest = data;
        if rest.len() < 4 {
            return None;
        }
        let declared = rest.get_u32_le() as usize;

        // Reject counts the remaining input cannot possibly satisfy before
        // using the count to size an allocation.
        if declared > rest.len() / MIN_BINDING_SIZE {
            return None;
        }

        let mut subscriptions = Vec::with_capacity(declared);
        for _ in 0..declared {
            // Fixed-width prefix: publisher id plus the name length.
            if rest.len() < 8 + 2 {
                return None;
            }
            let publisher = rest.get_u64_le();
            let name_len = usize::from(rest.get_u16_le());

            // The name must be followed by at least the one-byte token flag.
            if rest.len() < name_len + 1 {
                return None;
            }
            let (raw_name, after_name) = rest.split_at(name_len);
            let channel = std::str::from_utf8(raw_name)
                .ok()
                .and_then(|name| ChannelName::new(name).ok())?;
            rest = after_name;

            let token_bytes = match rest.get_u8() {
                0 => None,
                1 => {
                    if rest.len() < 2 {
                        return None;
                    }
                    let token_len = usize::from(rest.get_u16_le());
                    if rest.len() < token_len {
                        return None;
                    }
                    let (token, after_token) = rest.split_at(token_len);
                    rest = after_token;
                    Some(token.to_vec())
                }
                _ => return None,
            };

            subscriptions.push(SubscriptionBinding {
                publisher,
                channel,
                token_bytes,
            });
        }

        // Anything left after the declared entries means the frame is malformed.
        rest.is_empty().then_some(Self { subscriptions })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a channel name from a literal, panicking if it is rejected.
    fn ch(s: &str) -> ChannelName {
        ChannelName::new(s).unwrap()
    }

    /// Hand-encodes a frame that declares exactly one entry with the given
    /// publisher, raw name bytes, and raw token-flag byte (no token payload).
    fn single_entry_frame(publisher: u64, name: &[u8], token_flag: u8) -> Vec<u8> {
        let mut frame = Vec::new();
        frame.put_u32_le(1);
        frame.put_u64_le(publisher);
        frame.put_u16_le(name.len() as u16);
        frame.extend_from_slice(name);
        frame.put_u8(token_flag);
        frame
    }

    /// The default ledger is empty and encodes to a zero entry count.
    #[test]
    fn default_is_empty() {
        let ledger = DaemonBindings::default();
        assert!(ledger.is_empty());
        assert_eq!(ledger.to_bytes(), vec![0u8; 4]);
    }

    /// Entries with and without a token survive an encode/decode cycle.
    #[test]
    fn roundtrip_multi_binding() {
        let without_token = SubscriptionBinding {
            publisher: 0xAAAA_BBBB_CCCC_DDDD,
            channel: ch("sensors/lidar"),
            token_bytes: None,
        };
        let with_token = SubscriptionBinding {
            publisher: 0x1111_2222_3333_4444,
            channel: ch("control/estop"),
            token_bytes: Some(vec![0xDE, 0xAD, 0xBE, 0xEF]),
        };
        let ledger = DaemonBindings {
            subscriptions: vec![without_token, with_token],
        };

        let encoded = ledger.to_bytes();
        let decoded = DaemonBindings::from_bytes(&encoded).expect("roundtrip");
        assert_eq!(decoded, ledger);
    }

    /// Zero bytes of input decode to an empty ledger rather than an error.
    #[test]
    fn empty_bytes_decode_as_empty_ledger() {
        let ledger = DaemonBindings::from_bytes(&[]).expect("empty = no-op decode");
        assert!(ledger.is_empty());
    }

    /// A valid frame followed by an extra byte is rejected.
    #[test]
    fn rejects_trailing_garbage() {
        let ledger = DaemonBindings {
            subscriptions: vec![SubscriptionBinding {
                publisher: 1,
                channel: ch("t"),
                token_bytes: None,
            }],
        };
        let mut encoded = ledger.to_bytes();
        encoded.push(0xFF);
        assert!(DaemonBindings::from_bytes(&encoded).is_none());
    }

    /// A name that channel-name validation refuses makes the whole decode fail.
    #[test]
    fn rejects_invalid_channel_name() {
        let frame = single_entry_frame(0, b"../etc", 0);
        assert!(DaemonBindings::from_bytes(&frame).is_none());
    }

    /// Fewer than four bytes cannot hold the entry count.
    #[test]
    fn rejects_truncated_header() {
        assert!(DaemonBindings::from_bytes(&[0, 0]).is_none());
    }

    /// Token flags other than 0 or 1 are rejected.
    #[test]
    fn rejects_unknown_token_flag() {
        let frame = single_entry_frame(0, b"x", 0x7F);
        assert!(DaemonBindings::from_bytes(&frame).is_none());
    }

    /// Declared counts that the remaining bytes cannot satisfy are rejected.
    #[test]
    fn rejects_count_exceeding_remaining_bytes() {
        // Maximum count with no payload at all.
        let count_only = u32::MAX.to_le_bytes();
        assert!(DaemonBindings::from_bytes(&count_only).is_none());

        // Large count backed by far too few bytes.
        let mut short_payload = Vec::new();
        short_payload.put_u32_le(1_000_000);
        short_payload.extend_from_slice(&[0u8; 256]);
        assert!(DaemonBindings::from_bytes(&short_payload).is_none());

        // Exactly one minimum-size entry passes the count check; the outcome is
        // deliberately ignored, so this only checks that decoding does not panic.
        let minimal = single_entry_frame(0, b"", 0);
        let _ = DaemonBindings::from_bytes(&minimal);
    }
}