use std::sync::Arc;
use dashmap::DashMap;
use xxhash_rust::xxh3::xxh3_64;
/// Maximum accepted channel-name length, measured in bytes (`str::len`).
pub const MAX_NAME_LEN: usize = 255;
/// Canonical 64-bit channel hash, as returned by [`channel_hash`].
pub type ChannelHash = u64;
/// A validated channel name such as `sensors/lidar/front`.
///
/// The text is stored in an `Arc<str>`, so cloning shares the allocation
/// instead of copying the string.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ChannelName(Arc<str>);

impl ChannelName {
    /// Builds a `ChannelName` from `name` after it has passed validation.
    ///
    /// # Errors
    ///
    /// Returns the [`ChannelError`] produced by the first validation rule
    /// that `name` violates (empty, too long, bad `/` placement, disallowed
    /// character, or a reserved `.` / `..` segment).
    pub fn new(name: &str) -> Result<Self, ChannelError> {
        Self::validate(name).map(|()| Self(Arc::from(name)))
    }

    /// Borrows the name as a string slice.
    #[inline]
    pub fn as_str(&self) -> &str {
        &*self.0
    }

    /// Canonical hash of this name, computed on every call via `channel_hash`.
    #[inline]
    pub fn hash(&self) -> ChannelHash {
        channel_hash(self.as_str())
    }

    /// 16-bit wire hash of this name, computed on every call via
    /// `wire_channel_hash`.
    #[inline]
    pub fn wire_hash(&self) -> u16 {
        wire_channel_hash(self.as_str())
    }

    /// Number of `/`-separated segments (`"a"` is 1, `"a/b/c"` is 3).
    pub fn depth(&self) -> usize {
        // One more segment than there are separators.
        self.0.bytes().filter(|&b| b == b'/').count() + 1
    }

    /// Returns `true` when `other` equals this name or lives underneath it,
    /// i.e. `other` is this name followed by `/` and further text.
    ///
    /// A plain textual prefix is not enough: `sensor` is not a prefix of
    /// `sensors`.
    pub fn is_prefix_of(&self, other: &ChannelName) -> bool {
        other
            .as_str()
            .strip_prefix(self.as_str())
            .map_or(false, |rest| rest.is_empty() || rest.starts_with('/'))
    }

    /// Checks `name` against the naming rules, reporting the first violation.
    ///
    /// Rules are evaluated in this order:
    /// 1. must not be empty;
    /// 2. byte length must not exceed `MAX_NAME_LEN`;
    /// 3. must not start or end with `/`;
    /// 4. must not contain `//`;
    /// 5. every character must be one of `a-z`, `0-9`, `-`, `_`, `.`, `/`
    ///    (an ASCII uppercase letter is reported as `InvalidFormat`, anything
    ///    else as `InvalidChar`);
    /// 6. no segment may be exactly `.` or `..`.
    fn validate(name: &str) -> Result<(), ChannelError> {
        let malformed =
            |reason: String| -> Result<(), ChannelError> { Err(ChannelError::InvalidFormat(reason)) };

        if name.is_empty() {
            return Err(ChannelError::Empty);
        }
        let byte_len = name.len();
        if byte_len > MAX_NAME_LEN {
            return Err(ChannelError::TooLong(byte_len));
        }
        if name.starts_with('/') || name.ends_with('/') {
            return malformed("must not start or end with '/'".into());
        }
        if name.contains("//") {
            return malformed("must not contain '//'".into());
        }

        // Uppercase letters are outside the allowed set too, so the first
        // disallowed character decides which error is reported.
        let first_bad = name
            .chars()
            .find(|&c| !matches!(c, 'a'..='z' | '0'..='9' | '-' | '_' | '.' | '/'));
        if let Some(bad) = first_bad {
            return if bad.is_ascii_uppercase() {
                malformed(format!(
                    "uppercase character {:?} not allowed — channel names are lowercase only",
                    bad
                ))
            } else {
                Err(ChannelError::InvalidChar(bad))
            };
        }

        match name.split('/').find(|seg| matches!(*seg, "." | "..")) {
            Some(seg) => malformed(format!("segment {:?} is reserved", seg)),
            None => Ok(()),
        }
    }
}
/// Debug output has the form `ChannelName("sensors/lidar")`.
impl std::fmt::Debug for ChannelName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = &self.0;
        write!(f, "ChannelName({text:?})")
    }
}
/// Displays the bare channel name with no decoration.
impl std::fmt::Display for ChannelName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Written verbatim; width/fill flags on the outer formatter are not applied.
        f.write_str(&self.0)
    }
}
/// Canonical 64-bit hash of a channel name: `xxh3_64` over the name's bytes.
#[inline]
pub fn channel_hash(name: &str) -> ChannelHash {
    let bytes: &[u8] = name.as_bytes();
    xxh3_64(bytes)
}
/// 16-bit wire hash of a channel name.
///
/// This is the `xxh3_64` digest of the name's bytes truncated to its low
/// 16 bits, so distinct names can share a wire hash.
#[inline]
pub fn wire_channel_hash(name: &str) -> u16 {
    let digest = xxh3_64(name.as_bytes());
    // Intentional truncation: `as u16` keeps only the low 16 bits.
    digest as u16
}
/// A channel name bundled with its canonical hash, computed once at
/// construction time.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ChannelId {
    name: ChannelName,
    hash: ChannelHash,
}

impl ChannelId {
    /// Wraps an already-validated `name`, caching `name.hash()`.
    pub fn new(name: ChannelName) -> Self {
        // `hash` is evaluated first, while `name` is still only borrowed.
        Self {
            hash: name.hash(),
            name,
        }
    }

    /// Validates `name` and builds the corresponding `ChannelId`.
    ///
    /// # Errors
    ///
    /// Propagates whatever `ChannelName::new` rejects.
    pub fn parse(name: &str) -> Result<Self, ChannelError> {
        ChannelName::new(name).map(Self::new)
    }

    /// The validated channel name.
    #[inline]
    pub fn name(&self) -> &ChannelName {
        &self.name
    }

    /// The cached canonical hash.
    #[inline]
    pub fn hash(&self) -> ChannelHash {
        self.hash
    }

    /// The 16-bit wire hash: the cached canonical hash truncated to `u16`.
    #[inline]
    pub fn wire_hash(&self) -> u16 {
        let canonical = self.hash;
        canonical as u16
    }
}
/// Debug output has the form `ChannelId(<name>, <hash>)`, with the hash
/// printed as 16 zero-padded lowercase hex digits.
impl std::fmt::Debug for ChannelId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The hash is 64 bits wide, i.e. 16 hex digits. Padding to 16 keeps
        // the column width fixed; padding to 8 would only kick in for values
        // below 2^32 and leave all other hashes at varying widths.
        write!(f, "ChannelId({}, {:016x})", self.name, self.hash)
    }
}
/// Displays only the channel name; the hash is omitted.
impl std::fmt::Display for ChannelId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = &self.name;
        f.write_fmt(format_args!("{}", name))
    }
}
/// Registry of channels, indexed by name, by canonical hash and by wire hash.
///
/// All three indexes are `DashMap`s, so every method takes `&self`.
pub struct ChannelRegistry {
    /// Canonical hash -> every registered channel with that hash.
    by_hash: DashMap<ChannelHash, Vec<ChannelId>>,
    /// 16-bit wire hash -> every registered channel with that wire hash.
    by_wire_hash: DashMap<u16, Vec<ChannelId>>,
    /// Channel name -> its id. This map decides whether a name is registered.
    by_name: DashMap<String, ChannelId>,
}

impl ChannelRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self {
            by_hash: DashMap::new(),
            by_wire_hash: DashMap::new(),
            by_name: DashMap::new(),
        }
    }

    /// Validates `name` and adds it to all three indexes.
    ///
    /// On success returns the new id together with a flag that is `true`
    /// when another channel with the same canonical hash was already
    /// registered.
    ///
    /// # Errors
    ///
    /// - any validation error from `ChannelId::parse`;
    /// - `ChannelError::AlreadyExists` when `name` is already registered.
    pub fn register(&self, name: &str) -> Result<(ChannelId, bool), ChannelError> {
        use dashmap::mapref::entry::Entry;

        let id = ChannelId::parse(name)?;

        // Lock order: `by_hash` bucket, then `by_name`, then `by_wire_hash`.
        // `remove` follows the same order. The bucket guard is held until the
        // function returns.
        let mut hash_bucket = self.by_hash.entry(id.hash()).or_default();
        let collision = !hash_bucket.is_empty();

        match self.by_name.entry(name.to_string()) {
            Entry::Occupied(_) => {
                return Err(ChannelError::AlreadyExists(name.to_string()));
            }
            Entry::Vacant(slot) => {
                hash_bucket.push(id.clone());
                self.by_wire_hash
                    .entry(id.wire_hash())
                    .or_default()
                    .push(id.clone());
                slot.insert(id.clone());
            }
        }
        Ok((id, collision))
    }

    /// Looks a channel up by exact name.
    pub fn get(&self, name: &str) -> Option<ChannelId> {
        self.by_name.get(name).map(|entry| entry.value().clone())
    }

    /// Returns every registered channel whose canonical hash is `hash`
    /// (empty when there is none).
    pub fn get_by_hash(&self, hash: ChannelHash) -> Vec<ChannelId> {
        self.by_hash
            .get(&hash)
            .map(|bucket| bucket.value().clone())
            .unwrap_or_default()
    }

    /// Returns every registered channel whose 16-bit wire hash is
    /// `wire_hash` (empty when there is none).
    pub fn get_all_by_wire_hash(&self, wire_hash: u16) -> Vec<ChannelId> {
        self.by_wire_hash
            .get(&wire_hash)
            .map(|bucket| bucket.value().clone())
            .unwrap_or_default()
    }

    /// Unregisters `name`, returning its id, or `None` when it was not
    /// registered.
    ///
    /// Buckets that become empty are dropped from `by_hash` and
    /// `by_wire_hash`, so those maps do not keep growing as channels come
    /// and go.
    pub fn remove(&self, name: &str) -> Option<ChannelId> {
        // Peek at the stored id only to learn which hash bucket to lock; the
        // guard from `get` is released at the end of this statement.
        let hash = self.by_name.get(name).map(|entry| entry.value().hash())?;

        let removed = {
            // Same lock order as `register`: hash bucket first, then the
            // other two maps, so the name is removed from all three indexes
            // while the bucket guard is held.
            let mut hash_bucket = self.by_hash.get_mut(&hash)?;
            // Re-check under the bucket guard: the name may have been removed
            // since the peek above.
            let (_, id) = self.by_name.remove(name)?;
            hash_bucket.retain(|c| c.name().as_str() != name);
            if let Some(mut wire_bucket) = self.by_wire_hash.get_mut(&id.wire_hash()) {
                wire_bucket.retain(|c| c.name().as_str() != name);
            }
            id
        };

        // Every guard is released by now. `remove_if` evaluates the predicate
        // on the bucket as stored at that moment, so a bucket that has been
        // refilled in the meantime is kept.
        self.by_hash.remove_if(&hash, |_, bucket| bucket.is_empty());
        self.by_wire_hash
            .remove_if(&removed.wire_hash(), |_, bucket| bucket.is_empty());

        Some(removed)
    }

    /// Number of registered channels.
    pub fn len(&self) -> usize {
        self.by_name.len()
    }

    /// `true` when no channel is registered.
    pub fn is_empty(&self) -> bool {
        self.by_name.is_empty()
    }
}
impl Default for ChannelRegistry {
fn default() -> Self {
Self::new()
}
}
/// Debug output is a summary (entry counts only), not a dump of every channel.
impl std::fmt::Debug for ChannelRegistry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let channel_count = self.by_name.len();
        let bucket_count = self.by_hash.len();

        let mut summary = f.debug_struct("ChannelRegistry");
        summary.field("channels", &channel_count);
        summary.field("hash_buckets", &bucket_count);
        summary.finish()
    }
}
/// Errors produced while validating or registering a channel name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelError {
    /// The name was the empty string.
    Empty,
    /// The name exceeded `MAX_NAME_LEN`; carries the offending length.
    TooLong(usize),
    /// The name contained a character outside the allowed set.
    InvalidChar(char),
    /// The name broke a structural rule; carries a human-readable reason.
    InvalidFormat(String),
    /// A channel with this name is already registered.
    AlreadyExists(String),
}

impl std::fmt::Display for ChannelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Empty => f.write_str("channel name is empty"),
            Self::TooLong(len) => write!(
                f,
                "channel name too long ({len} > {max})",
                max = MAX_NAME_LEN
            ),
            Self::InvalidChar(ch) => write!(f, "invalid character '{ch}' in channel name"),
            Self::InvalidFormat(msg) => write!(f, "invalid channel name format: {msg}"),
            Self::AlreadyExists(name) => write!(f, "channel '{name}' already exists"),
        }
    }
}

impl std::error::Error for ChannelError {}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_valid_names() {
        assert!(ChannelName::new("sensors").is_ok());
        assert!(ChannelName::new("sensors/lidar").is_ok());
        assert!(ChannelName::new("sensors/lidar/front").is_ok());
        assert!(ChannelName::new("control.v2").is_ok());
        assert!(ChannelName::new("my-channel_1").is_ok());
    }

    #[test]
    fn test_invalid_names() {
        assert_eq!(ChannelName::new(""), Err(ChannelError::Empty));
        assert!(matches!(
            ChannelName::new("/leading"),
            Err(ChannelError::InvalidFormat(_))
        ));
        assert!(matches!(
            ChannelName::new("trailing/"),
            Err(ChannelError::InvalidFormat(_))
        ));
        assert!(matches!(
            ChannelName::new("double//slash"),
            Err(ChannelError::InvalidFormat(_))
        ));
        assert_eq!(
            ChannelName::new("has space"),
            Err(ChannelError::InvalidChar(' '))
        );
        assert_eq!(
            ChannelName::new("has@symbol"),
            Err(ChannelError::InvalidChar('@'))
        );
    }

    #[test]
    fn rejects_ascii_uppercase() {
        for n in [
            "Foo",
            "foo/Bar",
            "FOO",
            "prod.Deploy",
            "Prod.deploy",
            "a/B/c",
        ] {
            assert!(
                matches!(ChannelName::new(n), Err(ChannelError::InvalidFormat(_))),
                "uppercase variant {n:?} must be rejected",
            );
        }
        for n in ["foo", "foo/bar", "prod.deploy", "a-b_c.d", "v2/0"] {
            assert!(
                ChannelName::new(n).is_ok(),
                "lowercase {n:?} must be accepted"
            );
        }
    }

    #[test]
    fn test_regression_rejects_path_traversal_segments() {
        assert!(matches!(
            ChannelName::new("a/../etc"),
            Err(ChannelError::InvalidFormat(_))
        ));
        assert!(matches!(
            ChannelName::new(".."),
            Err(ChannelError::InvalidFormat(_))
        ));
        assert!(matches!(
            ChannelName::new("."),
            Err(ChannelError::InvalidFormat(_))
        ));
        assert!(matches!(
            ChannelName::new("sensors/./front"),
            Err(ChannelError::InvalidFormat(_))
        ));
        // Dots inside a segment stay legal; only whole-segment "." / ".." are reserved.
        assert!(ChannelName::new("control.v2").is_ok());
        assert!(ChannelName::new("a.b/c.d").is_ok());
    }

    #[test]
    fn test_name_too_long() {
        let long_name = "a".repeat(256);
        assert!(matches!(
            ChannelName::new(&long_name),
            Err(ChannelError::TooLong(256))
        ));
        let max_name = "a".repeat(255);
        assert!(ChannelName::new(&max_name).is_ok());
    }

    #[test]
    fn test_hash_deterministic() {
        let h1 = channel_hash("sensors/lidar");
        let h2 = channel_hash("sensors/lidar");
        assert_eq!(h1, h2);
    }

    #[test]
    fn test_hash_differs() {
        let h1 = channel_hash("sensors/lidar");
        let h2 = channel_hash("control/estop");
        assert_ne!(h1, h2);
    }

    #[test]
    fn test_channel_id() {
        let id = ChannelId::parse("sensors/lidar").unwrap();
        assert_eq!(id.name().as_str(), "sensors/lidar");
        assert_eq!(id.hash(), channel_hash("sensors/lidar"));
    }

    #[test]
    fn test_depth() {
        assert_eq!(ChannelName::new("a").unwrap().depth(), 1);
        assert_eq!(ChannelName::new("a/b").unwrap().depth(), 2);
        assert_eq!(ChannelName::new("a/b/c/d").unwrap().depth(), 4);
    }

    #[test]
    fn test_is_prefix_of() {
        let parent = ChannelName::new("sensors").unwrap();
        let child = ChannelName::new("sensors/lidar").unwrap();
        let grandchild = ChannelName::new("sensors/lidar/front").unwrap();
        let unrelated = ChannelName::new("control/estop").unwrap();
        assert!(parent.is_prefix_of(&child));
        assert!(parent.is_prefix_of(&grandchild));
        assert!(child.is_prefix_of(&grandchild));
        assert!(!child.is_prefix_of(&parent));
        assert!(!parent.is_prefix_of(&unrelated));
        assert!(parent.is_prefix_of(&parent));
    }

    #[test]
    fn test_registry_basic() {
        let reg = ChannelRegistry::new();
        let (id, collision) = reg.register("sensors/lidar").unwrap();
        assert!(!collision);
        assert_eq!(reg.len(), 1);
        let found = reg.get("sensors/lidar").unwrap();
        assert_eq!(found.hash(), id.hash());
    }

    #[test]
    fn test_registry_duplicate() {
        let reg = ChannelRegistry::new();
        reg.register("sensors/lidar").unwrap();
        assert_eq!(
            reg.register("sensors/lidar").unwrap_err(),
            ChannelError::AlreadyExists("sensors/lidar".to_string())
        );
    }

    #[test]
    fn test_registry_remove() {
        let reg = ChannelRegistry::new();
        let (id, _) = reg.register("sensors/lidar").unwrap();
        assert_eq!(reg.len(), 1);
        let removed = reg.remove("sensors/lidar");
        assert!(removed.is_some());
        assert_eq!(reg.len(), 0);
        assert!(reg.get("sensors/lidar").is_none());
        // The secondary indexes must forget the channel as well.
        assert!(reg.get_by_hash(id.hash()).is_empty());
        assert!(reg.get_all_by_wire_hash(id.wire_hash()).is_empty());
        // Removing a name that is no longer registered reports `None`.
        assert!(reg.remove("sensors/lidar").is_none());
    }

    #[test]
    fn test_registry_get_by_hash() {
        let reg = ChannelRegistry::new();
        let (id, _) = reg.register("sensors/lidar").unwrap();
        let results = reg.get_by_hash(id.hash());
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].name().as_str(), "sensors/lidar");
    }

    #[test]
    fn test_canonical_hash_is_u64_and_wire_is_u16() {
        let name = "sensors/lidar";
        let canonical: ChannelHash = channel_hash(name);
        let wire: u16 = wire_channel_hash(name);
        assert_eq!(canonical as u16, wire);
        assert_eq!(std::mem::size_of::<ChannelHash>(), 8);
        assert_eq!(std::mem::size_of_val(&wire), 2);
    }

    #[test]
    fn test_registry_disambiguates_wire_hash() {
        let reg = ChannelRegistry::new();
        let (id_a, _) = reg.register("sensors/lidar").unwrap();
        let (id_b, _) = reg.register("control/estop").unwrap();
        let by_wire_a = reg.get_all_by_wire_hash(id_a.wire_hash());
        assert!(by_wire_a
            .iter()
            .any(|c| c.name().as_str() == "sensors/lidar"));
        let by_wire_b = reg.get_all_by_wire_hash(id_b.wire_hash());
        assert!(by_wire_b
            .iter()
            .any(|c| c.name().as_str() == "control/estop"));
        assert_eq!(reg.get_by_hash(id_a.hash()).len(), 1);
        assert_eq!(reg.get_by_hash(id_b.hash()).len(), 1);
    }

    #[test]
    fn test_get_all_by_wire_hash_returns_full_collision_set() {
        let reg = ChannelRegistry::new();
        // Brute-force two names that share a 16-bit wire hash.
        let mut seen = std::collections::HashMap::<u16, String>::new();
        let (name_a, name_b) = (|| -> Option<(String, String)> {
            for i in 0..200_000u64 {
                let name = format!("reg/wcoll/{}", i);
                let wire = wire_channel_hash(&name);
                if let Some(prev) = seen.get(&wire) {
                    return Some((prev.clone(), name));
                }
                seen.insert(wire, name);
            }
            None
        })()
        .expect("no wire collision in 200K candidates");
        let (id_a, _) = reg.register(&name_a).unwrap();
        let (id_b, _) = reg.register(&name_b).unwrap();
        assert_eq!(id_a.wire_hash(), id_b.wire_hash());
        assert_ne!(id_a.hash(), id_b.hash());
        let bucket = reg.get_all_by_wire_hash(id_a.wire_hash());
        assert_eq!(bucket.len(), 2);
        assert!(bucket.iter().any(|c| c.name().as_str() == name_a));
        assert!(bucket.iter().any(|c| c.name().as_str() == name_b));
        // Only the two colliding names are registered, so any other wire hash
        // must resolve to an empty set.
        let empty = reg.get_all_by_wire_hash(id_a.wire_hash().wrapping_add(1));
        assert!(empty.is_empty());
    }
}