net-mesh 0.21.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
//! Named typed channels for Net.
//!
//! Channels are hierarchical named endpoints (e.g. `"sensors/lidar/front"`).
//! Two hashes are derived from the name via xxh3:
//!
//! - **Canonical [`ChannelHash`] (`u64`)** — used as the substrate-wide key
//!   for auth (`AuthGuard`, `PermissionToken`), config (`ChannelConfigMap`),
//!   storage (`RedexFile`), and metrics. Full xxh3_64 keyspace; targeted
//!   second-preimage attacks need ~2^64 work even with a non-cryptographic
//!   hash. Pre-fix the canonical key was a `u32` truncation of xxh3_64;
//!   an attacker willing to do ~2^32 work could grind a channel-name
//!   collision and use a token issued for the grinded name to bypass the
//!   `TokenCache::check` fast path for an unrelated victim channel that
//!   happened to hash to the same `u32` bucket.
//! - **Wire `u16`** — the fast-path hint stamped on every outgoing packet
//!   header for wire-speed filtering by forwarding nodes. 65 K buckets;
//!   routine collisions at scale. Mirrors the
//!   `origin_hash: u64 canonical → u32 wire` precedent in the protocol
//!   layer: per-packet width is fixed by the 64-byte header budget, and
//!   wire-side collisions are benign (only affect filter precision, not
//!   ACL or storage decisions, since those key on the canonical hash).

use std::sync::Arc;

use dashmap::DashMap;
use xxhash_rust::xxh3::xxh3_64;

/// Maximum channel name length in bytes.
///
/// Enforced by [`ChannelName::new`]: a name whose UTF-8 byte length exceeds
/// this value is rejected with [`ChannelError::TooLong`].
pub const MAX_NAME_LEN: usize = 255;

/// Substrate-wide canonical hash for a [`ChannelName`].
///
/// 64-bit. Used as the canonical key for ACL, storage, and config decisions.
/// Produced by [`channel_hash`], i.e. the full `xxh3_64` of the name bytes.
///
/// Distinct from the wire `u16` hash on `NetHeader::channel_hash`, which is
/// a per-packet fast-path filter hint and may collide; the canonical
/// `ChannelHash` is what auth and storage decisions must key on.
pub type ChannelHash = u64;

/// A validated channel name.
///
/// Names are hierarchical with `/` separators. Valid characters are
/// ASCII lowercase letters (`a`–`z`), ASCII digits (`0`–`9`), `-`, `_`,
/// `.`, and `/`; ASCII uppercase is rejected so that names differing only
/// in case cannot coexist. Names must not be empty, exceed
/// [`MAX_NAME_LEN`] bytes, start or end with `/`, contain `//`, or
/// contain a segment that is exactly `.` or `..`.
///
/// Backed by `Arc<str>` so `Clone` is a refcount bump rather than a
/// heap allocation. `ChannelName` is cloned on every nRPC call (the
/// per-service route cache hands the cached name to the caller
/// guard), on every publish (the publisher's `ChannelId` is cloned
/// into per-peer dispatch records), and on registry lookups; making
/// `Clone` allocation-free removes a class of per-hot-call allocs
/// without changing any caller code. The validated invariant is
/// preserved by `::new`: there is no public field, no `From<String>`
/// or `From<&str>` impl, and no mutator — once constructed, the
/// `Arc<str>` is immutable and aliased copies are safe to share.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ChannelName(Arc<str>);

impl ChannelName {
    /// Build a `ChannelName` from `name` after checking it against the
    /// naming rules.
    ///
    /// # Errors
    ///
    /// Returns the [`ChannelError`] for the first rule that `name` breaks:
    /// empty, too long, misplaced `/`, a disallowed character, or a
    /// reserved `.` / `..` segment.
    pub fn new(name: &str) -> Result<Self, ChannelError> {
        Self::validate(name).map(|()| Self(Arc::from(name)))
    }

    /// Borrow the underlying name text.
    #[inline]
    pub fn as_str(&self) -> &str {
        &*self.0
    }

    /// Canonical 64-bit [`ChannelHash`] of this name.
    ///
    /// This is the substrate-wide key for ACL, storage, and config
    /// decisions. xxh3 is non-cryptographic, but a targeted
    /// second-preimage still costs ~2^64 work over the full keyspace.
    #[inline]
    pub fn hash(&self) -> ChannelHash {
        channel_hash(self.as_str())
    }

    /// Wire `u16` channel hash used on the Net header fast-path.
    ///
    /// This is the hint stamped on every outgoing packet: cheap to
    /// compare, but with only 65,536 buckets it collides routinely at
    /// mesh scale. Control-plane and storage authorization key on
    /// [`ChannelName::hash`] (canonical `u64`), never on this hint.
    #[inline]
    pub fn wire_hash(&self) -> u16 {
        wire_channel_hash(self.as_str())
    }

    /// Number of `/`-separated path segments in the name.
    pub fn depth(&self) -> usize {
        // One segment, plus one more per separator.
        1 + self.0.bytes().filter(|&b| b == b'/').count()
    }

    /// Whether `self` equals `other` or is an ancestor of it on a segment
    /// boundary (used for wildcard subscriptions).
    ///
    /// `"a/b"` is a prefix of `"a/b"` and `"a/b/c"`, but not of `"a/bc"`.
    pub fn is_prefix_of(&self, other: &ChannelName) -> bool {
        match other.0.strip_prefix(&*self.0) {
            // Identical names, or the remainder starts a new segment.
            Some(rest) => rest.is_empty() || rest.starts_with('/'),
            None => false,
        }
    }

    /// Check `name` against every naming rule, reporting the first
    /// violation found.
    fn validate(name: &str) -> Result<(), ChannelError> {
        if name.is_empty() {
            return Err(ChannelError::Empty);
        }

        let byte_len = name.len();
        if byte_len > MAX_NAME_LEN {
            return Err(ChannelError::TooLong(byte_len));
        }

        if name.starts_with('/') || name.ends_with('/') {
            return Err(ChannelError::InvalidFormat(
                "must not start or end with '/'".into(),
            ));
        }
        if name.contains("//") {
            return Err(ChannelError::InvalidFormat("must not contain '//'".into()));
        }

        // Locate the first character outside the allowed set. ASCII
        // uppercase gets its own, more descriptive error: if case were
        // significant, `foo.bar` and `FOO.BAR` would be separate channels
        // (different hashes, registry entries, and ACL entries), so an
        // operator locking down `prod.deploy` would silently leave
        // `Prod.deploy` open. Like DNS and most message buses, names are
        // lowercase-only.
        let offender = name
            .chars()
            .find(|&c| !matches!(c, 'a'..='z' | '0'..='9' | '-' | '_' | '.' | '/'));
        if let Some(ch) = offender {
            return Err(if ch.is_ascii_uppercase() {
                ChannelError::InvalidFormat(format!(
                    "uppercase character {:?} not allowed — channel names are lowercase only",
                    ch
                ))
            } else {
                ChannelError::InvalidChar(ch)
            });
        }

        // Channel names double as on-disk directory segments in the
        // `redex-disk` feature. A `..` segment would climb out of the
        // persistent base directory and a `.` segment would alias its
        // parent, so both are refused here to keep every later path use
        // safe by construction.
        match name.split('/').find(|seg| matches!(*seg, "." | "..")) {
            Some(seg) => Err(ChannelError::InvalidFormat(format!(
                "segment {:?} is reserved",
                seg
            ))),
            None => Ok(()),
        }
    }
}

impl std::fmt::Debug for ChannelName {
    /// Render as `ChannelName("<name>")`, with the name in its quoted,
    /// escaped `Debug` form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = &self.0;
        write!(f, "ChannelName({:?})", text)
    }
}

impl std::fmt::Display for ChannelName {
    /// Write the bare channel name with no decoration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

/// Compute the canonical [`ChannelHash`] (64-bit) from a name string.
///
/// Full xxh3_64 over the name's bytes, with no truncation. This is the
/// substrate-wide canonical key for ACL, storage, and config. Targeted
/// second-preimage attacks need ~2^64 work even though xxh3 is
/// non-cryptographic.
///
/// The input is hashed as given; this function does not validate it as a
/// channel name.
#[inline]
pub fn channel_hash(name: &str) -> ChannelHash {
    xxh3_64(name.as_bytes())
}

/// Compute the wire `u16` channel hash from a name string.
///
/// Uses xxh3_64 truncated to 16 bits, consistent with the existing
/// `stream_id_from_key` pattern in the routing module. Used only for the
/// `NetHeader::channel_hash` fast-path hint; ACL/storage decisions must
/// use [`channel_hash`] (canonical `u64`).
///
/// The result equals the low 16 bits of [`channel_hash`] for the same
/// input.
#[inline]
pub fn wire_channel_hash(name: &str) -> u16 {
    // Deliberate narrowing: `as u16` keeps only the low 16 bits.
    xxh3_64(name.as_bytes()) as u16
}

/// A channel identifier: name + cached canonical hash.
///
/// The hash is computed once in [`ChannelId::new`] and stored alongside the
/// name, so [`ChannelId::hash`] and [`ChannelId::wire_hash`] never re-hash
/// the name. Both fields are private; the only constructors are
/// [`ChannelId::new`] and [`ChannelId::parse`].
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct ChannelId {
    /// The validated channel name.
    name: ChannelName,
    /// Canonical 64-bit hash of `name`, cached at construction.
    hash: ChannelHash,
}

impl ChannelId {
    /// Wrap an already-validated name, computing and caching its
    /// canonical hash.
    pub fn new(name: ChannelName) -> Self {
        Self {
            hash: name.hash(),
            name,
        }
    }

    /// Validate a raw name string and build the identifier from it.
    ///
    /// # Errors
    ///
    /// Propagates the [`ChannelError`] from [`ChannelName::new`] when the
    /// name is not valid.
    pub fn parse(name: &str) -> Result<Self, ChannelError> {
        ChannelName::new(name).map(Self::new)
    }

    /// Borrow the channel name.
    #[inline]
    pub fn name(&self) -> &ChannelName {
        &self.name
    }

    /// Return the cached canonical [`ChannelHash`] (64-bit).
    ///
    /// This is the substrate-wide key for auth, storage, and config.
    #[inline]
    pub fn hash(&self) -> ChannelHash {
        self.hash
    }

    /// Return the wire `u16` hash for stamping the `NetHeader` fast-path.
    ///
    /// Obtained by truncating the cached canonical hash to its low 16
    /// bits. The wire `u16` is only a filter hint and may collide;
    /// [`ChannelId::hash`] (canonical `u64`) is the ACL/storage key.
    #[inline]
    pub fn wire_hash(&self) -> u16 {
        // Intentional truncation to the low 16 bits.
        self.hash as u16
    }
}

impl std::fmt::Debug for ChannelId {
    /// Render as `ChannelId(<name>, <hash>)`, with the canonical hash shown
    /// as 16 zero-padded lowercase hex digits.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `ChannelHash` is a `u64`, so pad to its full 16 hex digits. A
        // narrower minimum width would make the output length vary with
        // the magnitude of the hash.
        write!(f, "ChannelId({}, {:016x})", self.name, self.hash)
    }
}

impl std::fmt::Display for ChannelId {
    /// Display as the channel name only; the hash is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.name, f)
    }
}

/// Registry of channels, tracking name-to-hash mappings.
///
/// Detects canonical-hash collisions at creation time. Forwarding nodes only
/// see the wire `u16` hash; the registry resolves wire-side ambiguity via
/// [`Self::get_all_by_wire_hash`].
///
/// The same [`ChannelId`] is indexed three ways (by canonical hash, by wire
/// hash, and by name); `register` and `remove` keep the three maps in step.
pub struct ChannelRegistry {
    /// Canonical hash -> list of channels with that hash (more than one
    /// entry only on a `u64` collision, which is rare).
    by_hash: DashMap<ChannelHash, Vec<ChannelId>>,
    /// Wire `u16` hash -> list of channels with that wire bucket
    /// (routine collisions at scale; used for receive-side disambig).
    by_wire_hash: DashMap<u16, Vec<ChannelId>>,
    /// Name -> channel ID (for fast lookup by name)
    by_name: DashMap<String, ChannelId>,
}

impl ChannelRegistry {
    /// Create an empty registry.
    pub fn new() -> Self {
        Self {
            by_hash: DashMap::new(),
            by_wire_hash: DashMap::new(),
            by_name: DashMap::new(),
        }
    }

    /// Register a channel. Returns the ChannelId and whether a canonical-hash
    /// collision was detected with an existing channel.
    ///
    /// # Errors
    ///
    /// Returns the validation error from [`ChannelId::parse`] when `name` is
    /// not a valid channel name, or [`ChannelError::AlreadyExists`] when the
    /// name is already registered.
    pub fn register(&self, name: &str) -> Result<(ChannelId, bool), ChannelError> {
        let id = ChannelId::parse(name)?;
        let name_key = name.to_string();

        // Hold the by_hash entry guard while inserting into by_name.
        // This ensures both maps are updated atomically from the perspective
        // of concurrent register/remove calls. Guards are always taken in
        // the order by_hash -> by_name -> by_wire_hash; `remove` follows the
        // same order so the two cannot deadlock against each other.
        let mut hash_entry = self.by_hash.entry(id.hash()).or_default();
        let collision = !hash_entry.is_empty();

        match self.by_name.entry(name_key) {
            dashmap::mapref::entry::Entry::Occupied(_) => {
                // Nothing was pushed yet, so the buckets are unchanged; the
                // guards are released as they go out of scope on return.
                return Err(ChannelError::AlreadyExists(name.to_string()));
            }
            dashmap::mapref::entry::Entry::Vacant(vacant) => {
                hash_entry.push(id.clone());
                self.by_wire_hash
                    .entry(id.wire_hash())
                    .or_default()
                    .push(id.clone());
                vacant.insert(id.clone());
            }
        }

        Ok((id, collision))
    }

    /// Look up a channel by name.
    pub fn get(&self, name: &str) -> Option<ChannelId> {
        self.by_name.get(name).map(|r| r.clone())
    }

    /// Look up all channels with a given canonical hash (may be multiple if
    /// the rare u64 collision occurs). Returns an empty `Vec` when no
    /// channel has that hash.
    pub fn get_by_hash(&self, hash: ChannelHash) -> Vec<ChannelId> {
        self.by_hash
            .get(&hash)
            .map(|r| r.clone())
            .unwrap_or_default()
    }

    /// Look up all channels with a given wire `u16` hash (routinely multiple
    /// due to wire-bucket collisions at scale). Used by receive-side dispatch
    /// to disambiguate the wire fast-path hint into canonical channels.
    ///
    /// Returns the full collision set rather than collapsing to `None` on
    /// collision — the receive-side caller wants to enumerate every
    /// canonical that could have stamped this wire hash. This is the
    /// opposite of [`super::ChannelConfigRegistry::get_by_wire_hash`],
    /// which returns `None` on collision because the config caller wants
    /// a single safe policy decision.
    pub fn get_all_by_wire_hash(&self, wire_hash: u16) -> Vec<ChannelId> {
        self.by_wire_hash
            .get(&wire_hash)
            .map(|r| r.clone())
            .unwrap_or_default()
    }

    /// Remove a channel by name, returning its id if it was registered.
    ///
    /// Holds the `by_hash` entry guard for the whole removal so that an
    /// interleaved `register`/`remove` of the same name (which needs the
    /// same guard) cannot leave stale bucket entries behind. The buckets
    /// are only pruned when this call is the one that actually removed the
    /// name from `by_name`.
    pub fn remove(&self, name: &str) -> Option<ChannelId> {
        // Look up the id first to learn which by_hash entry to lock. The
        // by_name read guard is a temporary and is released at the end of
        // this statement, before any other guard is taken.
        let id = self.by_name.get(name)?.clone();

        // Take the by_hash guard and keep it alive until the end of the
        // function (same order as `register`: by_hash, by_name,
        // by_wire_hash).
        let mut hash_entry = self.by_hash.get_mut(&id.hash());

        // Re-check under the guard: a concurrent `remove` may have taken the
        // name between the lookup above and acquiring the guard. In that
        // case it already cleaned the buckets, so leave them untouched.
        let (_, removed) = self.by_name.remove(name)?;

        if let Some(bucket) = hash_entry.as_mut() {
            bucket.retain(|c| c.name().as_str() != name);
        }
        if let Some(mut wire_entry) = self.by_wire_hash.get_mut(&removed.wire_hash()) {
            wire_entry.retain(|c| c.name().as_str() != name);
        }

        Some(removed)
    }

    /// Number of registered channels.
    pub fn len(&self) -> usize {
        self.by_name.len()
    }

    /// Check if registry is empty.
    pub fn is_empty(&self) -> bool {
        self.by_name.is_empty()
    }
}

impl Default for ChannelRegistry {
    /// Equivalent to [`ChannelRegistry::new`]: an empty registry.
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for ChannelRegistry {
    /// Print a summary (registered channel count and canonical-hash bucket
    /// count) instead of dumping every entry.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let channel_count = self.by_name.len();
        let bucket_count = self.by_hash.len();
        f.debug_struct("ChannelRegistry")
            .field("channels", &channel_count)
            .field("hash_buckets", &bucket_count)
            .finish()
    }
}

/// Errors from channel operations.
///
/// Produced by channel-name validation ([`ChannelName::new`],
/// [`ChannelId::parse`]) and by [`ChannelRegistry::register`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelError {
    /// Channel name is empty.
    Empty,
    /// Channel name exceeds maximum length. Carries the offending name's
    /// length in bytes; the limit is [`MAX_NAME_LEN`].
    TooLong(usize),
    /// Invalid character in channel name. Carries the first character
    /// found outside the allowed set.
    InvalidChar(char),
    /// Invalid name format. Carries a human-readable description of the
    /// rule that was violated.
    InvalidFormat(String),
    /// Channel already exists. Carries the name that was already
    /// registered.
    AlreadyExists(String),
}

impl std::fmt::Display for ChannelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Empty => write!(f, "channel name is empty"),
            Self::TooLong(len) => write!(f, "channel name too long ({} > {})", len, MAX_NAME_LEN),
            Self::InvalidChar(ch) => write!(f, "invalid character '{}' in channel name", ch),
            Self::InvalidFormat(msg) => write!(f, "invalid channel name format: {}", msg),
            Self::AlreadyExists(name) => write!(f, "channel '{}' already exists", name),
        }
    }
}

impl std::error::Error for ChannelError {}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_valid_names() {
        // Single segment, nested segments, and each permitted punctuation
        // character.
        let accepted = [
            "sensors",
            "sensors/lidar",
            "sensors/lidar/front",
            "control.v2",
            "my-channel_1",
        ];
        for candidate in accepted {
            assert!(ChannelName::new(candidate).is_ok());
        }
    }

    #[test]
    fn test_invalid_names() {
        assert_eq!(ChannelName::new(""), Err(ChannelError::Empty));

        // Misplaced separators are format errors.
        for malformed in ["/leading", "trailing/", "double//slash"] {
            assert!(matches!(
                ChannelName::new(malformed),
                Err(ChannelError::InvalidFormat(_))
            ));
        }

        // Characters outside the allowed set are reported individually.
        for (candidate, offender) in [("has space", ' '), ("has@symbol", '@')] {
            assert_eq!(
                ChannelName::new(candidate),
                Err(ChannelError::InvalidChar(offender))
            );
        }
    }

    /// ASCII uppercase must be refused. Before this rule, `foo.bar` and
    /// `FOO.BAR` were separate channels with separate hashes, so an
    /// operator who locked down `prod.deploy` silently left
    /// `Prod.deploy` unprotected. Names are lowercase-only, as in DNS
    /// and typical message-bus naming.
    #[test]
    fn rejects_ascii_uppercase() {
        let with_uppercase = [
            "Foo",
            "foo/Bar",
            "FOO",
            "prod.Deploy",
            "Prod.deploy",
            "a/B/c",
        ];
        for n in with_uppercase {
            let outcome = ChannelName::new(n);
            assert!(
                matches!(outcome, Err(ChannelError::InvalidFormat(_))),
                "uppercase variant {n:?} must be rejected",
            );
        }

        // Lowercase letters, digits, and the permitted punctuation are
        // unaffected by the uppercase rule.
        let all_lowercase = ["foo", "foo/bar", "prod.deploy", "a-b_c.d", "v2/0"];
        for n in all_lowercase {
            assert!(
                ChannelName::new(n).is_ok(),
                "lowercase {n:?} must be accepted"
            );
        }
    }

    #[test]
    fn test_regression_rejects_path_traversal_segments() {
        // Regression: the redex-disk feature uses channel names as
        // on-disk directory segments. A name such as
        // `a/../../etc/target` used to pass validation (it contains only
        // `.` and `/` beyond plain letters) and would escape the base
        // directory. Any segment that is exactly `.` or `..` must now be
        // refused when the name is constructed.
        for traversal in ["a/../etc", "..", ".", "sensors/./front"] {
            assert!(matches!(
                ChannelName::new(traversal),
                Err(ChannelError::InvalidFormat(_))
            ));
        }

        // A `.` that is only part of a segment stays legal, e.g.
        // "control.v2".
        for dotted in ["control.v2", "a.b/c.d"] {
            assert!(ChannelName::new(dotted).is_ok());
        }
    }

    #[test]
    fn test_name_too_long() {
        // Exactly at the limit is accepted...
        let at_limit = "a".repeat(255);
        assert!(ChannelName::new(&at_limit).is_ok());

        // ...one byte over is rejected, reporting the actual length.
        let over_limit = "a".repeat(256);
        assert!(matches!(
            ChannelName::new(&over_limit),
            Err(ChannelError::TooLong(256))
        ));
    }

    #[test]
    fn test_hash_deterministic() {
        // Hashing the same name twice yields the same canonical hash.
        assert_eq!(channel_hash("sensors/lidar"), channel_hash("sensors/lidar"));
    }

    #[test]
    fn test_hash_differs() {
        // Distinct hashes are not guaranteed for arbitrary inputs, but
        // these two names are expected to differ.
        assert_ne!(channel_hash("sensors/lidar"), channel_hash("control/estop"));
    }

    #[test]
    fn test_channel_id() {
        let raw = "sensors/lidar";
        let parsed = ChannelId::parse(raw).unwrap();

        // The id keeps the name verbatim and caches its canonical hash.
        assert_eq!(parsed.name().as_str(), raw);
        assert_eq!(parsed.hash(), channel_hash(raw));
    }

    #[test]
    fn test_depth() {
        // (name, expected number of `/`-separated segments)
        for (raw, segments) in [("a", 1), ("a/b", 2), ("a/b/c/d", 4)] {
            assert_eq!(ChannelName::new(raw).unwrap().depth(), segments);
        }
    }

    #[test]
    fn test_is_prefix_of() {
        let named = |raw: &str| ChannelName::new(raw).unwrap();
        let parent = named("sensors");
        let child = named("sensors/lidar");
        let grandchild = named("sensors/lidar/front");
        let unrelated = named("control/estop");

        // Ancestors are prefixes of their descendants...
        assert!(parent.is_prefix_of(&child));
        assert!(parent.is_prefix_of(&grandchild));
        assert!(child.is_prefix_of(&grandchild));

        // ...but not the other way round, and not across subtrees.
        assert!(!child.is_prefix_of(&parent));
        assert!(!parent.is_prefix_of(&unrelated));

        // A name counts as a prefix of itself.
        assert!(parent.is_prefix_of(&parent));
    }

    #[test]
    fn test_registry_basic() {
        let registry = ChannelRegistry::new();

        // First registration: no collision, one channel tracked.
        let (registered, collided) = registry.register("sensors/lidar").unwrap();
        assert!(!collided);
        assert_eq!(registry.len(), 1);

        // Lookup by name returns an id with the same canonical hash.
        let looked_up = registry.get("sensors/lidar").unwrap();
        assert_eq!(looked_up.hash(), registered.hash());
    }

    #[test]
    fn test_registry_duplicate() {
        let registry = ChannelRegistry::new();
        registry.register("sensors/lidar").unwrap();

        // Registering the same name again reports the existing name.
        let second_attempt = registry.register("sensors/lidar").unwrap_err();
        assert_eq!(
            second_attempt,
            ChannelError::AlreadyExists("sensors/lidar".to_string())
        );
    }

    #[test]
    fn test_registry_remove() {
        let registry = ChannelRegistry::new();
        registry.register("sensors/lidar").unwrap();
        assert_eq!(registry.len(), 1);

        // Removal hands back the id and leaves nothing findable by name.
        assert!(registry.remove("sensors/lidar").is_some());
        assert_eq!(registry.len(), 0);
        assert!(registry.get("sensors/lidar").is_none());
    }

    #[test]
    fn test_registry_get_by_hash() {
        let registry = ChannelRegistry::new();
        let (registered, _) = registry.register("sensors/lidar").unwrap();

        // The canonical-hash bucket holds exactly the registered channel.
        let bucket = registry.get_by_hash(registered.hash());
        assert_eq!(bucket.len(), 1);
        assert_eq!(bucket[0].name().as_str(), "sensors/lidar");
    }

    #[test]
    fn test_canonical_hash_is_u64_and_wire_is_u16() {
        let name = "sensors/lidar";
        let canonical: ChannelHash = channel_hash(name);
        let wire: u16 = wire_channel_hash(name);

        // Widths: canonical is 8 bytes (u64), wire is 2 bytes (u16).
        assert_eq!(std::mem::size_of::<ChannelHash>(), 8);
        assert_eq!(std::mem::size_of_val(&wire), 2);

        // The wire hash is the low 16 bits of the canonical hash.
        assert_eq!(canonical as u16, wire);
    }

    #[test]
    fn test_registry_disambiguates_wire_hash() {
        // Channels that land in the same u16 wire bucket must still be
        // told apart by their canonical u64 hash. Two fixed names cannot
        // be relied on to collide at u16, so this test covers the wire
        // lookup API in the ordinary case and checks that the wire and
        // canonical lookup paths agree.
        let registry = ChannelRegistry::new();
        let (lidar, _) = registry.register("sensors/lidar").unwrap();
        let (estop, _) = registry.register("control/estop").unwrap();

        let bucket_has = |bucket: &[ChannelId], wanted: &str| {
            bucket.iter().any(|c| c.name().as_str() == wanted)
        };

        // Each channel is reachable through its own wire bucket.
        assert!(bucket_has(
            &registry.get_all_by_wire_hash(lidar.wire_hash()),
            "sensors/lidar"
        ));
        assert!(bucket_has(
            &registry.get_all_by_wire_hash(estop.wire_hash()),
            "control/estop"
        ));

        // The canonical lookup yields exactly one channel per hash: an
        // accidental collision in the 64-bit keyspace only becomes likely
        // at around 2^32 channels, so two names are distinct in practice.
        assert_eq!(registry.get_by_hash(lidar.hash()).len(), 1);
        assert_eq!(registry.get_by_hash(estop.hash()).len(), 1);
    }

    #[test]
    fn test_get_all_by_wire_hash_returns_full_collision_set() {
        // `get_all_by_wire_hash` deliberately returns the full set of
        // canonicals sharing a wire bucket rather than collapsing to
        // `None` on collision — that's the contract that lets
        // receive-side dispatch enumerate every canonical the inbound
        // packet's wire hash could have come from. Construct a
        // collision by brute force.
        let reg = ChannelRegistry::new();
        let mut seen = std::collections::HashMap::<u16, String>::new();
        let (name_a, name_b) = (|| -> Option<(String, String)> {
            for i in 0..200_000u64 {
                let name = format!("reg/wcoll/{}", i);
                let wire = wire_channel_hash(&name);
                if let Some(prev) = seen.get(&wire) {
                    return Some((prev.clone(), name));
                }
                seen.insert(wire, name);
            }
            None
        })()
        .expect("no wire collision in 200K candidates");

        let (id_a, _) = reg.register(&name_a).unwrap();
        let (id_b, _) = reg.register(&name_b).unwrap();
        assert_eq!(id_a.wire_hash(), id_b.wire_hash());
        assert_ne!(id_a.hash(), id_b.hash());

        // Both canonicals are enumerable through the shared bucket.
        let bucket = reg.get_all_by_wire_hash(id_a.wire_hash());
        assert_eq!(bucket.len(), 2);
        assert!(bucket.iter().any(|c| c.name().as_str() == name_a));
        assert!(bucket.iter().any(|c| c.name().as_str() == name_b));

        // Unknown bucket returns an empty Vec, not `None`. Only `name_a`
        // and `name_b` were registered in this registry and they share a
        // single wire bucket, so every other bucket, including the
        // neighbouring one, is guaranteed to be unpopulated.
        let neighbour = reg.get_all_by_wire_hash(id_a.wire_hash().wrapping_add(1));
        assert!(neighbour.is_empty());
    }
}