net-mesh 0.21.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
//! Wire-speed authorization guard for Net packets.
//!
//! The `AuthGuard` uses a bloom filter to authorize packets in under 10ns.
//! Authorized `(origin_hash, channel_hash)` pairs are inserted at subscription
//! time (slow path). The per-packet fast path probes the bloom filter with
//! no crypto, no heap allocation, and no pointer chasing.
//!
//! # Two-tier authorization
//!
//! The guard keeps two parallel ACLs:
//!
//! - **Fast path** (`check_fast`, `authorize`, `is_authorized`): keyed
//!   on the canonical [`ChannelHash`] (`u64`). Used by the packet data
//!   plane. The 64-bit canonical hash makes accidental collisions negligible
//!   at realistic deployment scale (birthday bound ~2^32 ≈ 4 billion channels);
//!   AEAD still enforces payload integrity end-to-end, so a residual
//!   bloom false positive costs at most a full check further up the
//!   stack. The wire `NetHeader.channel_hash` is a `u16` fast-path hint
//!   that callers widen to the canonical [`ChannelHash`] via the
//!   `ChannelConfigRegistry` / `ChannelRegistry` before consulting this
//!   guard.
//! - **Exact path** (`allow_channel`, `revoke_channel`,
//!   `is_authorized_full`): keyed on the **canonical `ChannelName`**
//!   itself, not any hash. Used by control-plane and storage
//!   decisions (e.g. `Redex::open_file`) where a hash collision
//!   would let one channel's ACL authorize access to a different
//!   channel's file. `xxh3_64` is non-cryptographic (~2^32 ops to
//!   birthday-collide, feasible offline), so a hash-keyed ACL would
//!   be crackable by an attacker who can influence the name passed
//!   to `open_file`. Keying on the canonical string eliminates the
//!   hash layer entirely — two distinct names can never alias.
//!
//! `allow_channel` populates both tiers so a caller granted storage
//! access can also continue sending packets on that channel.

use std::sync::atomic::{AtomicU8, Ordering};

use dashmap::DashMap;
use xxhash_rust::xxh3::xxh3_64;

use super::{ChannelHash, ChannelName};

/// Bloom-filter half of the authorization guard.
///
/// Extracted from the monolithic `AuthGuard` under
/// `docs/FAILURE_PATH_HARDENING_PLAN.md` §Stage 2 Option B —
/// the atomics-only sub-piece that loom can model without
/// requiring a DashMap shim. The two DashMaps on
/// [`AuthGuard`] (`verified`, `exact`) stay outside this
/// struct because loom substitutes `std::sync::*` but not
/// `dashmap::*`; keeping them separate means the loom model
/// in `tests/loom_models.rs` can exercise the real
/// production atomics while substituting a simple
/// `AtomicU64` for the DashMap state.
///
/// # Memory-ordering contract
///
/// `Relaxed` on both `mark` (fetch_or) and `probe` (load).
/// Sufficient because:
///
/// 1. **Cross-structure synchronization comes from DashMap.**
///    `AuthGuard::authorize` marks the bloom then inserts into
///    `verified`; `check_fast` probes the bloom then reads
///    `verified` via `contains_key`. DashMap's per-shard
///    `parking_lot::Mutex` provides Release-on-unlock /
///    Acquire-on-lock independently, which synchronizes the
///    producer's `verified` insert with the consumer's
///    `contains_key` — the bloom Relaxed ordering is not
///    load-bearing for that visibility.
///    NOTE(review): dashmap shards are believed to be guarded by
///    a reader-writer lock rather than a plain mutex — confirm
///    against the pinned dashmap version. The Release/Acquire
///    argument is the same either way.
///    Because `check_fast` probes the bloom *before* it touches
///    `verified`, this lock cannot by itself make a racing probe
///    see freshly marked bits; that case is covered by item 3.
/// 2. **Per-byte atomicity suffices within the bloom.** Two
///    threads concurrently marking different bits in the same
///    byte both use `fetch_or`, which is atomically
///    read-modify-write; Relaxed + per-byte atomicity
///    guarantees the final byte carries the union of all marks.
/// 3. **Cross-thread synchronization BETWEEN authorize and
///    check_fast (without DashMap) is supplied externally** —
///    by the subprotocol-handler round trip on subscribe, or by
///    the tokio runtime's wake barrier. Under those
///    synchronizations (loom models `thread::join` as
///    equivalent), Relaxed suffices.
///
/// The loom model `auth_bloom_post_authorize_check_never_denies`
/// in `tests/loom_models.rs` pins property 3: after a joined
/// authorize, check_fast never falsely denies.
#[derive(Debug)]
pub struct BloomCache {
    /// Bloom filter bits packed eight to a byte. Each authorized
    /// `(origin_hash, channel_hash)` pair sets two bits (see
    /// `mark`); bit `i` lives in byte `i >> 3` at offset `i & 7`.
    bloom: Vec<AtomicU8>,
    /// `2^BLOOM_BITS - 1`, used to mask hash outputs down to a
    /// valid flat bit index.
    bloom_mask: u64,
}

impl BloomCache {
    /// Construct a fresh cache with all bits clear.
    pub fn new() -> Self {
        let num_bytes = 1usize << (BLOOM_BITS - 3);
        let bloom = (0..num_bytes).map(|_| AtomicU8::new(0)).collect();
        Self {
            bloom,
            bloom_mask: (1u64 << BLOOM_BITS) - 1,
        }
    }

    /// Compute the two bit indices this `(origin, channel)`
    /// hashes to. Pulled out so the loom model can replay the
    /// same derivation without depending on `bloom_key`.
    #[inline]
    fn indices(&self, origin_hash: u64, channel_hash: ChannelHash) -> (usize, usize) {
        let key = bloom_key(origin_hash, channel_hash);
        let h1 = (key & self.bloom_mask) as usize;
        let h2 = ((key >> BLOOM_BITS) & self.bloom_mask) as usize;
        (h1, h2)
    }

    /// Mark a pair authorized by setting both bloom bits.
    /// `Relaxed` fetch_or — per-byte atomicity suffices
    /// (concurrent marks of different bits in the same byte
    /// union correctly under fetch_or's RMW semantics).
    /// Cross-structure synchronization with the verified
    /// cache is provided by DashMap's per-shard mutex on the
    /// subsequent `verified.insert`, not by bloom ordering.
    #[inline]
    pub fn mark(&self, origin_hash: u64, channel_hash: ChannelHash) {
        let (h1, h2) = self.indices(origin_hash, channel_hash);
        self.set_bit(h1);
        self.set_bit(h2);
    }

    /// Probe the bloom. Returns `true` if BOTH bits are set
    /// (bloom hit), `false` otherwise. A hit is a *maybe* — the
    /// caller must consult the verified cache; a miss is a
    /// *definite no* — the pair was never authorized (or the
    /// bloom was rebuilt after revocation).
    ///
    /// `Relaxed` loads. Per-location coherence guarantees a
    /// thread that ever observes the bit set never observes
    /// it clear afterwards (bloom bits are monotonic until
    /// `rebuild_bloom` runs, which requires `&mut self` on the
    /// outer guard). Cross-thread visibility between a
    /// just-completed `authorize` and a subsequent
    /// `check_fast` relies on external synchronization
    /// (subprotocol handler's await, wire round-trip, or
    /// DashMap's Mutex via the verified-cache path).
    #[inline]
    pub fn probe(&self, origin_hash: u64, channel_hash: ChannelHash) -> bool {
        let (h1, h2) = self.indices(origin_hash, channel_hash);
        let bit1 = (self.bloom[h1 >> 3].load(Ordering::Relaxed) >> (h1 & 7)) & 1;
        let bit2 = (self.bloom[h2 >> 3].load(Ordering::Relaxed) >> (h2 & 7)) & 1;
        bit1 != 0 && bit2 != 0
    }

    /// Clear every bit. Called by [`AuthGuard::rebuild_bloom`]
    /// which already requires `&mut self` on the outer guard,
    /// so concurrent [`Self::probe`] is impossible during the
    /// clear — a concurrent probe would see a spurious
    /// Denied. `Relaxed` is fine here because there's no
    /// observer.
    pub fn clear(&mut self) {
        for byte in &self.bloom {
            byte.store(0, Ordering::Relaxed);
        }
    }

    /// Set a single bit by flat index. `Relaxed` fetch_or —
    /// see `mark`'s docstring for the ordering rationale.
    #[inline]
    fn set_bit(&self, bit_index: usize) {
        let byte_index = bit_index >> 3;
        let bit_offset = bit_index & 7;
        self.bloom[byte_index].fetch_or(1 << bit_offset, Ordering::Relaxed);
    }

    /// Size of the backing byte array in bytes. Used by the
    /// outer `AuthGuard`'s Debug impl for diagnostics.
    pub fn len(&self) -> usize {
        self.bloom.len()
    }
}

impl Default for BloomCache {
    fn default() -> Self {
        Self::new()
    }
}

/// Result of a fast-path authorization check.
///
/// Returned by `AuthGuard::check_fast`, which consults the bloom filter
/// first and the verified cache second.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthVerdict {
    /// Packet is authorized (bloom hit + verified cache hit).
    Allowed,
    /// Packet is definitely not authorized (bloom miss).
    Denied,
    /// Bloom filter hit but not in verified cache — needs full check.
    ///
    /// This is the outcome for a bloom false positive, and also for a pair
    /// that was revoked while its bloom bits are still set.
    NeedsFullCheck,
}

/// Wire-speed authorization guard.
///
/// Contains a bloom filter for O(1) per-packet checks and a verified-positive
/// cache to avoid repeated full token verification on bloom hits.
///
/// # Performance
///
/// `check_fast()` does:
/// - 1 hash computation (xxh3 over a 16-byte key, ~1ns); both bloom
///   indices are sliced out of that single 64-bit digest
/// - 2 array lookups (bloom filter bits)
/// - 1 DashMap probe (verified cache, ~5ns), only on a bloom hit
///
/// Total: <10ns for the Allowed/Denied paths.
pub struct AuthGuard {
    /// Bloom filter for O(1) per-packet Denied verdicts. See
    /// [`BloomCache`] for the memory-ordering contract this
    /// wrapper relies on.
    bloom: BloomCache,
    /// Verified-positive cache: (origin_hash, channel_hash) -> authorized.
    ///
    /// Only key presence is consulted (`contains_key`); the stored value
    /// is always `true`.
    ///
    /// `origin_hash` is a 64-bit subscriber projection — typically the
    /// full `node_id` — rather than a 32-bit truncation. A 32-bit key
    /// birthday-collides at ~65 k peers (√2^32), inside the practical
    /// reach of a medium-sized mesh; 64 bits pushes the collision
    /// floor to ~4 billion peers, which is no longer a plausible
    /// operating point.
    verified: DashMap<(u64, ChannelHash), bool>,
    /// Exact-identity ACL: `(origin_hash, canonical ChannelName) ->
    /// authorized`. Keys on the name string (not a hash) so that no
    /// two distinct channels can alias through a hash collision —
    /// this is the control-plane / storage authorization path.
    /// `ChannelName` already implements `Hash + Eq` against its
    /// underlying validated `String`, so DashMap keys on the exact
    /// name comparison.
    exact: DashMap<(u64, ChannelName), ()>,
    /// Count of `revoke()` calls since the last `rebuild_bloom()`.
    /// The counter is bumped on every call, whether or not the pair
    /// was actually present in the verified cache.
    /// Bloom filters don't support deletion, so each revocation
    /// leaves stale bits in the bloom and the false-positive rate
    /// climbs over the operating window. Operators / monitoring
    /// hooks read [`Self::revocations_since_rebuild`] and call
    /// `rebuild_bloom` when the value crosses a deployment-
    /// specific threshold (typical: ~1k revocations or ~1% of
    /// bloom capacity, whichever fires first). Pre-fix the dirty
    /// rate was unobservable — the false-positive climb produced
    /// silent CPU waste on `is_authorized_full` fallbacks.
    revocations_since_rebuild: std::sync::atomic::AtomicU64,
}

/// Bloom filter size as a power of two: 2^15 bits = 4KB of backing
/// bytes. Fits in L1 cache.
///
/// `BloomCache::new` allocates `1 << (BLOOM_BITS - 3)` bytes and
/// `BloomCache::indices` slices two `BLOOM_BITS`-wide fields out of one
/// 64-bit key, so the value must be at least 3 and at most 32.
const BLOOM_BITS: u32 = 15;

impl AuthGuard {
    /// Create a new authorization guard.
    pub fn new() -> Self {
        Self {
            bloom: BloomCache::new(),
            verified: DashMap::new(),
            exact: DashMap::new(),
            revocations_since_rebuild: std::sync::atomic::AtomicU64::new(0),
        }
    }

    /// Fast-path authorization check.
    ///
    /// Called on every packet by forwarding nodes. Must complete in <10ns.
    ///
    /// # Ordering
    ///
    /// The inner `bloom.probe` uses `Relaxed` loads. Cross-
    /// structure synchronization between a just-completed
    /// [`Self::authorize`] and this call is provided by
    /// DashMap's per-shard `parking_lot::Mutex` on the
    /// `verified.contains_key` path (Acquire on shard-lock),
    /// not by bloom ordering. See the module-internal
    /// `BloomCache` struct docstring for the full rationale
    /// and the loom models at `tests/loom_models.rs` for the
    /// pinned invariants
    /// (`auth_bloom_post_authorize_check_never_denies` in
    /// particular pins the "subscribe-completes-before-first-
    /// packet-arrives" no-false-deny property).
    #[inline]
    pub fn check_fast(&self, origin_hash: u64, channel_hash: ChannelHash) -> AuthVerdict {
        if !self.bloom.probe(origin_hash, channel_hash) {
            return AuthVerdict::Denied;
        }
        // Bloom hit — check verified cache
        if self.verified.contains_key(&(origin_hash, channel_hash)) {
            AuthVerdict::Allowed
        } else {
            AuthVerdict::NeedsFullCheck
        }
    }

    /// Authorize an (origin_hash, channel_hash) pair.
    ///
    /// Called at subscription time (slow path). Inserts into both the
    /// bloom filter and the verified cache.
    ///
    /// # Ordering
    ///
    /// `bloom.mark` uses `Relaxed` fetch_or; `verified.insert`
    /// carries a Release via DashMap's per-shard Mutex unlock.
    /// A subsequent `check_fast` that observes `verified`
    /// populated is guaranteed — via DashMap's Acquire on
    /// shard-lock — to also observe the bloom bits, regardless
    /// of the bloom's own ordering. See the module-internal
    /// `BloomCache` docstring for the full analysis of why
    /// `Relaxed` on the bloom is sufficient, and
    /// `tests/loom_models.rs` for the pinned invariants.
    pub fn authorize(&self, origin_hash: u64, channel_hash: ChannelHash) {
        self.bloom.mark(origin_hash, channel_hash);
        self.verified.insert((origin_hash, channel_hash), true);
    }

    /// Revoke authorization for an (origin_hash, channel_hash) pair.
    ///
    /// Removes from verified cache. The bloom filter is not cleared
    /// (bloom filters don't support deletion), but the verified cache
    /// miss will cause `NeedsFullCheck` which will then fail.
    ///
    /// Bumps [`Self::revocations_since_rebuild`] so operators can
    /// schedule a `rebuild_bloom` when the dirty count crosses a
    /// deployment threshold and the false-positive rate makes the
    /// `NeedsFullCheck` fallback dominate the hot path.
    pub fn revoke(&self, origin_hash: u64, channel_hash: ChannelHash) {
        self.verified.remove(&(origin_hash, channel_hash));
        self.revocations_since_rebuild
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    }

    /// Number of `revoke` calls since the last successful
    /// `rebuild_bloom`. Operators / monitoring hooks read this to
    /// decide when to schedule a rebuild — bloom filters can't
    /// delete bits, so each revoke leaves a dirty bit that
    /// inflates the false-positive rate. A rule of thumb: rebuild
    /// when the count crosses ~1k or ~1% of the bloom capacity,
    /// whichever fires first.
    #[inline]
    pub fn revocations_since_rebuild(&self) -> u64 {
        self.revocations_since_rebuild
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Check if a pair is authorized (verified cache only, no bloom).
    ///
    /// This is the fast-path check used by the packet data plane.
    /// For control-plane / storage decisions, use
    /// [`Self::is_authorized_full`] — even the canonical 32-bit
    /// `channel_hash` could theoretically alias under adversarial
    /// name selection, so non-data-plane decisions must key on the
    /// canonical name string.
    pub fn is_authorized(&self, origin_hash: u64, channel_hash: ChannelHash) -> bool {
        self.verified.contains_key(&(origin_hash, channel_hash))
    }

    /// Grant `origin_hash` full (control-plane) access to `name`.
    ///
    /// Populates both ACL tiers:
    /// - the exact canonical-name ACL that control-plane / storage
    ///   callers must consult via [`Self::is_authorized_full`];
    /// - the fast-path bloom + verified cache, so the same origin
    ///   can continue sending packets on that channel via
    ///   [`Self::check_fast`] / [`Self::is_authorized`].
    pub fn allow_channel(&self, origin_hash: u64, name: &ChannelName) {
        self.exact.insert((origin_hash, name.clone()), ());
        self.authorize(origin_hash, name.hash());
    }

    /// Revoke `origin_hash`'s full access to `name`.
    ///
    /// Removes from both the exact ACL and the fast-path verified
    /// cache. Bloom bits are not cleared (bloom filters don't support
    /// deletion), so the fast path may transition to
    /// [`AuthVerdict::NeedsFullCheck`] for this pair — the exact-map
    /// miss then fails the full check.
    pub fn revoke_channel(&self, origin_hash: u64, name: &ChannelName) {
        self.exact.remove(&(origin_hash, name.clone()));
        self.revoke(origin_hash, name.hash());
    }

    /// Exact authorization check keyed on the canonical `ChannelName`
    /// string. Used by control-plane / storage decisions
    /// (e.g. `Redex::open_file`). Unlike [`Self::is_authorized`],
    /// this cannot be bypassed by a hash collision between two
    /// different channel names — two distinct canonical names can
    /// never alias.
    pub fn is_authorized_full(&self, origin_hash: u64, name: &ChannelName) -> bool {
        self.exact.contains_key(&(origin_hash, name.clone()))
    }

    /// Number of authorized pairs in the verified cache.
    pub fn authorized_count(&self) -> usize {
        self.verified.len()
    }

    /// Number of (origin, channel) pairs with exact (control-plane)
    /// authorization.
    pub fn exact_authorized_count(&self) -> usize {
        self.exact.len()
    }

    /// Rebuild the bloom filter from the verified cache.
    ///
    /// Call this after many revocations to clear stale bloom bits.
    /// Requires `&mut self` to prevent concurrent reads during the
    /// clear-then-reinsert window, which would incorrectly deny
    /// authorized traffic.
    pub fn rebuild_bloom(&mut self) {
        self.bloom.clear();
        for entry in self.verified.iter() {
            let (origin_hash, channel_hash) = *entry.key();
            self.bloom.mark(origin_hash, channel_hash);
        }
        // Reset the dirty counter so subsequent
        // `revocations_since_rebuild` queries reflect the post-
        // rebuild state.
        self.revocations_since_rebuild
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }
}

impl Default for AuthGuard {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for AuthGuard {
    /// Summarize the guard by size only: bloom backing bytes, verified-cache
    /// entries, and exact-ACL entries. Individual keys are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let bloom_bytes = self.bloom.len();
        let fast_pairs = self.verified.len();
        let exact_pairs = self.exact.len();

        let mut summary = f.debug_struct("AuthGuard");
        summary.field("bloom_size_bytes", &bloom_bytes);
        summary.field("authorized_pairs", &fast_pairs);
        summary.field("exact_authorized_pairs", &exact_pairs);
        summary.finish()
    }
}

/// Derive the 64-bit bloom key for an `(origin_hash, channel_hash)` pair.
///
/// The two values are laid out little-endian in a 16-byte stack buffer —
/// origin in bytes 0..8, channel in bytes 8..16 — and hashed with `xxh3_64`.
#[inline]
fn bloom_key(origin_hash: u64, channel_hash: ChannelHash) -> u64 {
    let mut key_bytes = [0u8; 16];
    {
        let (origin_half, channel_half) = key_bytes.split_at_mut(8);
        origin_half.copy_from_slice(&origin_hash.to_le_bytes());
        channel_half.copy_from_slice(&channel_hash.to_le_bytes());
    }
    xxh3_64(&key_bytes)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A guard with no grants must deny on the fast path.
    #[test]
    fn test_empty_guard_denies() {
        let verdict = AuthGuard::new().check_fast(0x1234, 0xABCD);
        assert_eq!(verdict, AuthVerdict::Denied);
    }

    /// A pair that was just authorized is allowed on the fast path.
    #[test]
    fn test_authorize_then_allow() {
        let origin: u64 = 0x1234;
        let channel: ChannelHash = 0xABCD;

        let guard = AuthGuard::new();
        guard.authorize(origin, channel);

        let verdict = guard.check_fast(origin, channel);
        assert_eq!(verdict, AuthVerdict::Allowed);
    }

    /// A grant for one pair must not admit a pair that differs in either
    /// component.
    #[test]
    fn test_different_pair_denied() {
        let guard = AuthGuard::new();
        guard.authorize(0x1234, 0xABCD);

        // First probe changes only the origin, second only the channel.
        let probes: [(u64, ChannelHash); 2] = [(0x5678, 0xABCD), (0x1234, 0x1111)];
        for (origin, channel) in probes {
            assert_ne!(guard.check_fast(origin, channel), AuthVerdict::Allowed);
        }
    }

    /// Revoking drops the verified entry but leaves the bloom bits set, so
    /// the fast path degrades from `Allowed` to `NeedsFullCheck`.
    #[test]
    fn test_revoke() {
        let origin: u64 = 0x1234;
        let channel: ChannelHash = 0xABCD;
        let guard = AuthGuard::new();

        guard.authorize(origin, channel);
        let before = guard.check_fast(origin, channel);
        assert_eq!(before, AuthVerdict::Allowed);

        guard.revoke(origin, channel);
        // Bloom hit + verified-cache miss.
        let after = guard.check_fast(origin, channel);
        assert_eq!(after, AuthVerdict::NeedsFullCheck);
    }

    /// After a rebuild the revoked pair's bloom bits are gone, while pairs
    /// that are still authorized keep working.
    #[test]
    fn test_rebuild_bloom_after_revoke() {
        let revoked: (u64, ChannelHash) = (0x1234, 0xABCD);
        let kept: (u64, ChannelHash) = (0x5678, 0xBEEF);

        let mut guard = AuthGuard::new();
        guard.authorize(revoked.0, revoked.1);
        guard.authorize(kept.0, kept.1);

        guard.revoke(revoked.0, revoked.1);
        guard.rebuild_bloom();

        // Stale bits were cleared, so the revoked pair is a hard deny.
        let revoked_verdict = guard.check_fast(revoked.0, revoked.1);
        assert_eq!(revoked_verdict, AuthVerdict::Denied);
        // The surviving pair was re-marked during the rebuild.
        let kept_verdict = guard.check_fast(kept.0, kept.1);
        assert_eq!(kept_verdict, AuthVerdict::Allowed);
    }

    /// One hundred distinct grants are all counted and all allowed.
    #[test]
    fn test_multiple_authorizations() {
        let guard = AuthGuard::new();
        let pairs: Vec<(u64, ChannelHash)> = (0..100u64)
            .map(|i| (i, (i * 7) as ChannelHash))
            .collect();

        for &(origin, channel) in &pairs {
            guard.authorize(origin, channel);
        }

        assert_eq!(guard.authorized_count(), 100);

        for &(origin, channel) in &pairs {
            assert_eq!(
                guard.check_fast(origin, channel),
                AuthVerdict::Allowed,
                "pair ({}, {}) should be allowed",
                origin,
                origin * 7
            );
        }
    }

    /// `is_authorized` tracks the authorize / revoke lifecycle of a pair.
    #[test]
    fn test_is_authorized() {
        let (origin, channel): (u64, ChannelHash) = (0x1234, 0xABCD);
        let guard = AuthGuard::new();

        // Nothing granted yet.
        assert!(!guard.is_authorized(origin, channel));

        guard.authorize(origin, channel);
        assert!(guard.is_authorized(origin, channel));

        guard.revoke(origin, channel);
        assert!(!guard.is_authorized(origin, channel));
    }

    /// Authorizes 1000 pairs, then probes 10000 pairs from a disjoint range
    /// and requires that fewer than 1% of them get past the bloom filter.
    #[test]
    fn test_bloom_false_positive_rate() {
        let guard = AuthGuard::new();
        (0..1000u64).for_each(|i| {
            guard.authorize(i, i as ChannelHash);
        });

        // None of these pairs was inserted, so any verdict other than
        // `Denied` is a bloom false positive.
        let false_positives = (10000..20000u64)
            .filter(|&i| guard.check_fast(i, i as ChannelHash) != AuthVerdict::Denied)
            .count();

        let fp_rate = false_positives as f64 / 10000.0;
        assert!(fp_rate < 0.01, "false positive rate {} exceeds 1%", fp_rate);
    }

    // ---- Regression tests for Cubic AI findings ----

    /// Regression: the guard used to key on a `u32` origin, so two `node_id`s
    /// sharing their low 32 bits were indistinguishable and the second rode
    /// the first one's grant (a birthday collision is plausible at ~65 k
    /// peers). With `u64` origins the two must be told apart.
    #[test]
    fn test_regression_u64_origin_hash_defeats_32bit_collision() {
        let legit: u64 = 0x0000_ABCD_1234_5678;
        // Same low 32 bits as `legit`, different high 32 bits.
        let forged: u64 = 0xFFFF_FFFF_1234_5678;
        assert_eq!(legit as u32, forged as u32);
        assert_ne!(legit, forged);

        let guard = AuthGuard::new();
        let name = ChannelName::new("regression-u64-origin").unwrap();
        guard.allow_channel(legit, &name);

        // The origin that actually holds the grant is admitted on both paths.
        let legit_fast = guard.check_fast(legit, name.hash());
        assert_eq!(
            legit_fast,
            AuthVerdict::Allowed,
            "legit subscriber must be admitted"
        );
        assert!(guard.is_authorized_full(legit, &name));

        // The look-alike origin is rejected on both paths.
        let forged_fast = guard.check_fast(forged, name.hash());
        assert_ne!(
            forged_fast,
            AuthVerdict::Allowed,
            "forged subscriber must not ride the legit grant"
        );
        assert!(!guard.is_authorized_full(forged, &name));
    }

    /// Regression: two distinct channel names that share a hash bucket must
    /// never alias on the exact-name ACL.
    ///
    /// The test finds two different names whose `wire_hash()` values are
    /// equal, authorizes only the first, and checks that
    /// `is_authorized_full` admits the first and rejects the second. The
    /// fast-path verdict for the second name is also checked, against
    /// whichever outcome its canonical `hash()` implies.
    #[test]
    fn test_regression_channel_hash_collision_distinguishable_by_exact_name() {
        use std::collections::hash_map::Entry;
        use std::collections::HashMap;

        let guard = AuthGuard::new();

        // Look for ANY two candidates that share a wire hash by remembering
        // the first name seen for each wire value. If `wire_hash()` is 16
        // bits wide, as the failure message below assumes, the pigeonhole
        // principle guarantees a hit within 65 537 candidates and the
        // birthday bound makes one likely after a few hundred; the 200 K cap
        // is only a safety net. Comparing every candidate against a single
        // fixed name would need ~65 K tries on average and could exhaust the
        // cap without finding anything.
        let base = "regression/coll-";
        let mut first_by_wire: HashMap<_, ChannelName> = HashMap::new();
        let mut colliding: Option<(ChannelName, ChannelName)> = None;
        for i in 0..200_000u32 {
            let cand = ChannelName::new(&format!("{base}{i}")).unwrap();
            let wire = cand.wire_hash();
            match first_by_wire.entry(wire) {
                Entry::Occupied(earlier) => {
                    // Candidates are generated from distinct indices, so the
                    // stored name and `cand` are different names.
                    colliding = Some((earlier.remove(), cand));
                    break;
                }
                Entry::Vacant(slot) => {
                    slot.insert(cand);
                }
            }
        }
        let (name_a, name_b) = colliding.expect(
            "two distinct ChannelNames with the same 16-bit wire hash — widen the search range",
        );
        assert_eq!(name_a.wire_hash(), name_b.wire_hash());
        assert_ne!(name_a.as_str(), name_b.as_str());

        let origin: u64 = 0xDEAD_BEEF_CAFE_F00D;
        guard.allow_channel(origin, &name_a);

        // The fast path is keyed on the canonical `hash()`, not on the wire
        // hash, so a wire collision alone should not admit B.
        let fast_b = guard.check_fast(origin, name_b.hash());
        if name_a.hash() == name_b.hash() {
            // Canonical collision as well: the fast path cannot tell the two
            // names apart and reports Allowed; the exact check below is what
            // callers consult.
            assert_eq!(fast_b, AuthVerdict::Allowed);
        } else {
            // Typical case: canonical hashes differ, so the fast path denies
            // B even though the wire bucket matches.
            assert_eq!(fast_b, AuthVerdict::Denied);
        }

        // The exact-name check distinguishes the two names regardless of any
        // hash collision.
        assert!(guard.is_authorized_full(origin, &name_a));
        assert!(!guard.is_authorized_full(origin, &name_b));
    }

    /// Regression: the bloom filter used to be mutated through a raw pointer
    /// behind `&self`, which was UB under concurrent access. Four writer
    /// threads and four reader threads share one guard here; afterwards every
    /// written pair must be present.
    #[test]
    fn test_regression_concurrent_authorize_and_check() {
        use std::sync::Arc;
        use std::thread;

        let guard = Arc::new(AuthGuard::new());

        // Writers: thread `t` authorizes keys `t * 1000 .. t * 1000 + 250`.
        let writers = (0..4u64).map(|t| {
            let g = Arc::clone(&guard);
            thread::spawn(move || {
                for key in (0..250u64).map(|i| t * 1000 + i) {
                    g.authorize(key, key as ChannelHash);
                }
            })
        });

        // Readers: hammer the fast path while the writers are running.
        let readers = (0..4).map(|_| {
            let g = Arc::clone(&guard);
            thread::spawn(move || {
                for i in 0..1000u64 {
                    let _ = g.check_fast(i, i as ChannelHash);
                }
            })
        });

        // Collecting spawns the writers first, then the readers.
        let handles: Vec<_> = writers.chain(readers).collect();
        for handle in handles {
            handle.join().unwrap();
        }

        // Every pair written by any writer must be findable.
        assert_eq!(guard.authorized_count(), 1000);
        for t in 0..4u64 {
            for key in (0..250u64).map(|i| t * 1000 + i) {
                assert!(
                    guard.is_authorized(key, key as ChannelHash),
                    "pair ({}, {}) should be authorized after concurrent insertion",
                    key,
                    key
                );
            }
        }
    }

    // ========================================================================
    // TEST_COVERAGE_PLAN §P2-8 — concurrent authorize + revoke on the
    // same (origin, channel) key.
    //
    // The existing regression above stresses writer + reader races on
    // disjoint keys. Subscribe / unsubscribe on the SAME pair is the
    // harder case: authorize sets the bloom bits + inserts the
    // verified entry, revoke removes the verified entry. Bloom bits
    // never clear, so the only read-observable state is the verified
    // map. A torn interleaving could leave the map in either state
    // depending on last-writer-wins, but it must not panic, must not
    // leak a half-inserted entry, and `is_authorized` / `check_fast`
    // must never observe a half-committed state.
    // ========================================================================

    /// Races `authorize`, `revoke` and read-only queries on one shared
    /// `(origin, channel)` key from three threads released together.
    /// Which writer lands last is unspecified, so the test only requires
    /// that no thread panics and that the guard ends in a coherent state:
    /// stable reads and at most one entry.
    #[test]
    fn concurrent_authorize_and_revoke_on_same_key_is_panic_free() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let origin = 0x1234_5678_9ABC_DEF0u64;
        let channel: ChannelHash = 0x4242_4242;
        let iters = 1_000u32;

        let guard = Arc::new(AuthGuard::new());
        // All three threads block on this barrier so they start together.
        let start = Arc::new(Barrier::new(3));

        let authorizer = {
            let (g, gate) = (Arc::clone(&guard), Arc::clone(&start));
            thread::spawn(move || {
                gate.wait();
                for _ in 0..iters {
                    g.authorize(origin, channel);
                }
            })
        };
        let revoker = {
            let (g, gate) = (Arc::clone(&guard), Arc::clone(&start));
            thread::spawn(move || {
                gate.wait();
                for _ in 0..iters {
                    g.revoke(origin, channel);
                }
            })
        };
        // Observer: queries both read paths throughout the race. The values
        // are discarded; the point is that the calls complete without
        // panicking.
        let observer = {
            let (g, gate) = (Arc::clone(&guard), Arc::clone(&start));
            thread::spawn(move || {
                gate.wait();
                for _ in 0..iters {
                    let _ = g.is_authorized(origin, channel);
                    let _ = g.check_fast(origin, channel);
                }
            })
        };

        authorizer.join().expect("authorizer panicked");
        revoker.join().expect("revoker panicked");
        observer.join().expect("observer panicked");

        // Either terminal value is legitimate; with all writers joined, two
        // back-to-back reads must simply agree with each other.
        let final_state = guard.is_authorized(origin, channel);
        let reread = guard.is_authorized(origin, channel);
        assert_eq!(
            final_state, reread,
            "two sequential is_authorized calls must agree — \
             torn read would indicate DashMap corruption",
        );

        // Only one key was ever touched, so the map holds zero or one entry.
        let count = guard.authorized_count();
        assert!(
            count <= 1,
            "authorized_count should be 0 or 1 after the race; got {count}",
        );
    }

    /// Exact-ACL counterpart of the same-key race: `allow_channel`,
    /// `revoke_channel` and `is_authorized_full` contend on one
    /// `(origin, ChannelName)` entry. The test requires that no thread
    /// panics and that the final reads are stable.
    #[test]
    fn concurrent_allow_and_revoke_channel_on_same_key_is_panic_free() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let origin = 0xDEAD_BEEF_FEED_CAFEu64;
        let name = ChannelName::new("auth/contended").expect("channel name");
        let iters = 1_000u32;

        let guard = Arc::new(AuthGuard::new());
        // All three threads block on this barrier so they start together.
        let start = Arc::new(Barrier::new(3));

        let allower = {
            let (g, gate, chan) = (Arc::clone(&guard), Arc::clone(&start), name.clone());
            thread::spawn(move || {
                gate.wait();
                for _ in 0..iters {
                    g.allow_channel(origin, &chan);
                }
            })
        };
        let revoker = {
            let (g, gate, chan) = (Arc::clone(&guard), Arc::clone(&start), name.clone());
            thread::spawn(move || {
                gate.wait();
                for _ in 0..iters {
                    g.revoke_channel(origin, &chan);
                }
            })
        };
        let observer = {
            let (g, gate, chan) = (Arc::clone(&guard), Arc::clone(&start), name.clone());
            thread::spawn(move || {
                gate.wait();
                for _ in 0..iters {
                    let _ = g.is_authorized_full(origin, &chan);
                }
            })
        };

        allower.join().expect("allower panicked");
        revoker.join().expect("revoker panicked");
        observer.join().expect("observer panicked");

        // Either terminal value is fine; two back-to-back reads must agree.
        let final_state = guard.is_authorized_full(origin, &name);
        let reread = guard.is_authorized_full(origin, &name);
        assert_eq!(
            final_state, reread,
            "sequential is_authorized_full reads must agree",
        );
    }
}