freenet 0.2.68

Freenet core software
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
//! Per-(peer, contract) UPDATE rate limiter — front-line defense against
//! the May 21 incident pattern where one peer floods one contract with
//! many UPDATE/s.
//!
//! ## What this does
//!
//! Maintains a `(sender_addr, contract_instance_id) → last_accepted_at`
//! map. When a new UPDATE arrives, check the time since the last
//! accepted UPDATE for the same pair. If under [`MIN_UPDATE_INTERVAL`],
//! reject; otherwise stamp the new time and accept.
//!
//! ## What this is NOT
//!
//! - **Not the MAD outlier detector.** That's `crate::governance` and
//!   `crate::contract::governance` — those react in *minutes* via the
//!   reaper loop. This module reacts in *milliseconds* at the receive
//!   boundary.
//! - **Not a token-bucket smoothing layer.** Sustained bursts are
//!   rejected outright, not queued. A flood pattern hits a flat ceiling.
//! - **Not aware of intent.** A "renewing subscription that happened to
//!   trigger an UPDATE" looks identical to "DOS flood at the same rate";
//!   the rate is a flat statistical threshold, and we choose it
//!   generously enough that legitimate traffic doesn't hit it.
//!
//! ## Default rate
//!
//! [`MIN_UPDATE_INTERVAL`] defaults to 100ms (≈10 UPDATEs/sec from a
//! single peer for a single contract). Realistic human-driven contract
//! updates (chat, settings change, post) are orders of magnitude slower
//! than this. The 4PjqN5… incident was producing many UPDATEs per
//! second sustained — well above this ceiling.
//!
//! ## Bounded growth
//!
//! Two layers:
//!
//! 1. **Hard cap on entry count** ([`MAX_TRACKED_PAIRS`]). When the
//!    cap is hit, new entries are rejected — an attacker churning
//!    distinct `(sender, contract)` pairs cannot grow the map past
//!    this size. Critical because the address space is attacker-
//!    chosen (32-byte contract id × any source address).
//! 2. **Periodic TTL sweep** ([`UpdateRateLimiter::cleanup`]) drops
//!    entries idle for longer than [`CLEANUP_AGE`]. Hooked into the
//!    Ring's existing reaper tick.
//!
//! The cap alone is sufficient to bound memory; the TTL sweep
//! reclaims space from genuinely-idle pairs so the cap isn't
//! prematurely reached under normal traffic.
//!
//! ## Semantic note: `sender_addr` is the immediate upstream hop
//!
//! The key uses the SocketAddr of the peer that *sent us* the
//! message — not the originator of the UPDATE transaction. This is
//! deliberate: the limiter is a **receiver-side resource guard**.
//! "How much of my CPU/memory am I willing to spend processing
//! UPDATEs from a single immediate peer for a single contract?" is
//! the question this answers. If A floods through B to reach us, B
//! is who hits our limiter — and that's correct, because B is who
//! is consuming our resources.
//!
//! Originator-level protection is a different layer (Phase 7 ban
//! enforcement, where a banned originator's traffic is rejected
//! even if relayed). The two layers compose; neither subsumes the
//! other.
//!
//! ## Design doc reference
//!
//! `docs/design/contract-hardening.md` Phase 2: *"`TrackedBackoff<(PeerId,
//! ContractInstanceId)>`. Apply at `SyncStateToPeer` emit + originator
//! UPDATE entry. Reject with typed marker."*
//!
//! Divergences from the design doc, with rationale:
//!
//! - **Flat ceiling instead of `TrackedBackoff` exponential.** A flat
//!   100ms ceiling catches the May 21 pattern; exponential repeat-
//!   offender cooldown can land as a follow-up if observation shows
//!   it's needed.
//! - **Inbound relay receive-boundary, not `SyncStateToPeer` emit.**
//!   The doc's wording targets outbound emit. We instead gate at
//!   inbound receive (4 wire-variant sites: `RequestUpdate`,
//!   `BroadcastTo`, `RequestUpdateStreaming`, `BroadcastToStreaming`).
//!   Receive-side guards a peer's own resources directly; emit-side
//!   guards against being an amplifier. Receive is the higher-value
//!   first cut; emit is a follow-up.
//! - **No typed wire-level error.** Rejected UPDATEs are dropped
//!   silently. The sender's own retry / governance scoring will
//!   detect the flood pattern from its end.

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;

use dashmap::DashMap;
use freenet_stdlib::prelude::ContractInstanceId;
use tokio::time::Instant;

use crate::util::time_source::TimeSource;

/// Floor on the spacing between two accepted UPDATEs for one
/// `(sender, contract)` pair: 100ms, i.e. roughly 10 per second. An
/// UPDATE arriving sooner than this after the previously accepted one
/// is dropped.
///
/// Why 100ms: legitimate contract writes happen at human cadence
/// (seconds to minutes apart), whereas the 4PjqN5… May 21 incident
/// sustained many UPDATEs per second. The threshold is loose enough
/// that a real user never trips it, yet tight enough to cut a flood
/// off immediately.
pub(crate) const MIN_UPDATE_INTERVAL: Duration = Duration::new(0, 100_000_000);

/// Idle time (five minutes) after which a `(sender, contract)` entry is
/// eligible for removal by the periodic cleanup sweep. Long enough to
/// keep the rate-limit signal across short-term retries, short enough
/// to keep memory bounded.
pub(crate) const CLEANUP_AGE: Duration = Duration::from_secs(300);

/// Ceiling on how many `(sender, contract)` pairs the limiter will
/// track at once. Once it is reached, UPDATEs for pairs not already in
/// the map are rejected outright. This applies the "no unbounded
/// per-key collection for attacker-influenced keys" rule from
/// `.claude/rules/code-style.md`: at ~64 bytes of key per entry,
/// 16 384 pairs is about 1 MB before timestamp and hash-table overhead.
/// That is tiny, yet it caps the worst case where an attacker keeps
/// choosing fresh contract ids.
///
/// Behaviour at the cap: pairs that are already tracked keep being
/// rate-limited normally (a legitimate peer that was already known
/// keeps working); only brand-new pairs are turned away. As the TTL
/// sweep drops idle pairs, room frees up again and recovery is smooth.
pub(crate) const MAX_TRACKED_PAIRS: usize = 1 << 14;

/// Verdict returned by an UPDATE rate-limit check. Anything other than
/// `Allowed` means "drop this message at the receive boundary and do
/// not spawn a relay driver".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RateLimitDecision {
    /// Proceed with normal UPDATE handling; the accepted timestamp has
    /// already been recorded atomically.
    Allowed,
    /// Dropped because this `(sender, contract)` pair had an UPDATE
    /// accepted too recently. `elapsed` (time since that accept) and
    /// `min_interval` (the configured floor) are kept for diagnostic
    /// logging.
    Rejected {
        elapsed: Duration,
        min_interval: Duration,
    },
    /// Dropped because the pair is new and the tracking map already
    /// holds [`MAX_TRACKED_PAIRS`] pairs. Kept separate from `Rejected`
    /// so the dashboard can distinguish "throttling doing its job" from
    /// "limiter overflowing". The latter hints at an attack that churns
    /// identities.
    CapacityExceeded,
}

impl RateLimitDecision {
    /// `true` only for [`RateLimitDecision::Allowed`]; lets call sites
    /// write `if !decision.is_allowed() { ... }`.
    pub fn is_allowed(self) -> bool {
        match self {
            Self::Allowed => true,
            Self::Rejected { .. } | Self::CapacityExceeded => false,
        }
    }
}

/// Per-(sender_addr, contract_instance_id) UPDATE rate limiter.
///
/// All state lives in a single [`DashMap`] keyed by the pair. Each
/// entry stores only the `Instant` of the most recent accepted UPDATE,
/// so per-entry memory is tiny.
pub(crate) struct UpdateRateLimiter {
    last_accepted: DashMap<(SocketAddr, ContractInstanceId), Instant>,
    /// Authoritative size counter for capacity enforcement.
    ///
    /// At rest it equals `last_accepted.len()`. A slot is reserved here
    /// (bounded compare-and-swap in `check_and_record`) *before* the new
    /// pair is inserted, and `cleanup` releases slots only *after* its
    /// sweep has removed the entries. So while operations are in flight
    /// the counter may briefly run ahead of the map, but never behind
    /// it, and it never exceeds `max_tracked_pairs`.
    ///
    /// Why not just use `last_accepted.len()`? Because `len()` walks
    /// every shard, so it can't be called while holding any shard
    /// guard (the previous probe-then-insert iteration deadlocked on
    /// this). With this counter, the cap check is a single atomic
    /// read-modify-write, which strictly serializes, so concurrent
    /// inserts cannot overshoot. Codex re-review of #4285 caught that the
    /// `len()` precheck overshoots by `num_concurrent_callers`,
    /// which can be hundreds.
    size: AtomicUsize,
    min_interval: Duration,
    max_tracked_pairs: usize,
    time_source: Arc<dyn TimeSource + Send + Sync>,
    /// Total accepted UPDATEs since limiter creation. Surfaced on the
    /// dashboard so operators can see the limiter's signal-to-noise.
    accepted_total: AtomicU64,
    /// Total rejected UPDATEs since limiter creation (rate-limit hits).
    rejected_total: AtomicU64,
    /// Total UPDATEs rejected because the tracking map was at capacity
    /// when a new `(sender, contract)` pair tried to register. Surfaced
    /// separately because a non-zero value suggests an attacker is
    /// churning identities to exhaust the limiter's address space.
    capacity_rejected_total: AtomicU64,
}

impl UpdateRateLimiter {
    /// Create a limiter with the production defaults
    /// ([`MIN_UPDATE_INTERVAL`], [`MAX_TRACKED_PAIRS`]).
    pub fn new(time_source: Arc<dyn TimeSource + Send + Sync>) -> Self {
        Self::with_config(time_source, MIN_UPDATE_INTERVAL, MAX_TRACKED_PAIRS)
    }

    /// Create a limiter with an explicit minimum interval and pair cap
    /// (tests use this to shrink the cap). The idle TTL used by
    /// [`Self::cleanup`] is always [`CLEANUP_AGE`].
    pub fn with_config(
        time_source: Arc<dyn TimeSource + Send + Sync>,
        min_interval: Duration,
        max_tracked_pairs: usize,
    ) -> Self {
        Self {
            last_accepted: DashMap::new(),
            size: AtomicUsize::new(0),
            min_interval,
            max_tracked_pairs,
            time_source,
            accepted_total: AtomicU64::new(0),
            rejected_total: AtomicU64::new(0),
            capacity_rejected_total: AtomicU64::new(0),
        }
    }

    /// Check whether an UPDATE from `sender` for `contract` is allowed
    /// right now, and atomically stamp the accepted timestamp if so.
    ///
    /// Two-property atomicity:
    ///
    /// 1. **Same-pair compare-and-stamp** is atomic via
    ///    [`DashMap::entry`] holding the per-shard guard across
    ///    timestamp comparison and update. Exactly one caller per
    ///    pair wins per `min_interval` window.
    ///
    /// 2. **Capacity enforcement** is strict via the [`Self::size`]
    ///    atomic counter. A new pair first reserves a slot with a
    ///    bounded compare-and-swap that only increments while the
    ///    counter is below the cap. If no slot is free, the counter is
    ///    left untouched and the call returns `CapacityExceeded`.
    ///    Concurrent distinct-key inserts strictly serialize through
    ///    the counter, so there is no overshoot. A rejected attempt also
    ///    never inflates the counter, so it cannot cause a spurious
    ///    `CapacityExceeded` for another caller racing it (e.g. one
    ///    arriving just as `cleanup` frees a slot).
    ///
    /// If rejected (either rate or capacity), no map mutation is made.
    /// A barrage of rejected attempts doesn't extend the rate window
    /// (the existing `last_accepted` timestamp is unchanged) and doesn't
    /// grow memory (no slot is reserved on `CapacityExceeded`).
    pub fn check_and_record(
        &self,
        sender: SocketAddr,
        contract: ContractInstanceId,
    ) -> RateLimitDecision {
        use dashmap::mapref::entry::Entry;

        let now = self.time_source.now();
        match self.last_accepted.entry((sender, contract)) {
            Entry::Occupied(mut entry) => {
                // Existing pair: compare-and-stamp under the shard guard.
                let elapsed = now.saturating_duration_since(*entry.get());
                if elapsed < self.min_interval {
                    self.rejected_total.fetch_add(1, Ordering::Relaxed);
                    return RateLimitDecision::Rejected {
                        elapsed,
                        min_interval: self.min_interval,
                    };
                }
                *entry.get_mut() = now;
                self.accepted_total.fetch_add(1, Ordering::Relaxed);
                RateLimitDecision::Allowed
            }
            Entry::Vacant(entry) => {
                // New pair: a slot must be reserved BEFORE inserting so
                // the counter never falls behind the map.
                if !self.try_reserve_slot() {
                    self.capacity_rejected_total.fetch_add(1, Ordering::Relaxed);
                    return RateLimitDecision::CapacityExceeded;
                }
                entry.insert(now);
                self.accepted_total.fetch_add(1, Ordering::Relaxed);
                RateLimitDecision::Allowed
            }
        }
    }

    /// Atomically claim one slot in [`Self::size`] if the counter is
    /// below `max_tracked_pairs`. Returns `false` (counter unchanged)
    /// when the cap is reached. Relaxed ordering suffices: the counter
    /// guards no other memory, and RMW operations on a single atomic
    /// are totally ordered regardless of the ordering argument.
    fn try_reserve_slot(&self) -> bool {
        self.size
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                (n < self.max_tracked_pairs).then_some(n + 1)
            })
            .is_ok()
    }

    /// Drop entries whose timestamp is older than [`CLEANUP_AGE`].
    /// Call periodically from a reaper loop to bound memory.
    ///
    /// Decrements the [`Self::size`] counter by the number of
    /// removed entries (after the sweep), so the strict capacity
    /// enforcement remains accurate as idle pairs roll off.
    pub fn cleanup(&self) {
        let now = self.time_source.now();
        let cutoff = match now.checked_sub(CLEANUP_AGE) {
            Some(t) => t,
            None => return, // clock not advanced enough to bother
        };
        let mut removed = 0usize;
        self.last_accepted.retain(|_, last| {
            let keep = *last >= cutoff;
            if !keep {
                removed += 1;
            }
            keep
        });
        if removed > 0 {
            self.size.fetch_sub(removed, Ordering::Relaxed);
        }
    }

    /// Total accepted UPDATEs since creation. Surfaced on the node
    /// status dashboard via `RingStatsSnapshot` ("UPDATEs relayed").
    pub fn accepted_total(&self) -> u64 {
        self.accepted_total.load(Ordering::Relaxed)
    }

    /// Total rejected UPDATEs since creation. Surfaced on the node
    /// status dashboard via `RingStatsSnapshot` ("Rate-limited"). A
    /// rising value is the operator's signal that the per-(sender,
    /// contract) limiter may be dropping legitimate relayed traffic.
    pub fn rejected_total(&self) -> u64 {
        self.rejected_total.load(Ordering::Relaxed)
    }

    /// Total UPDATEs rejected because the tracking map was at capacity
    /// (a new `(sender, contract)` pair tried to register when the map
    /// already held the configured maximum of pairs). A non-zero value
    /// suggests an attacker is churning identities, which is why it is
    /// surfaced separately from `rejected_total` on the dashboard
    /// ("Capacity-dropped").
    pub fn capacity_rejected_total(&self) -> u64 {
        self.capacity_rejected_total.load(Ordering::Relaxed)
    }

    /// Number of tracked `(sender, contract)` pairs.
    ///
    /// Walks every shard, so it must not be called while holding a
    /// shard guard. The `allow(dead_code)` outside test builds implies
    /// it may have no non-test callers. NOTE(review): confirm whether
    /// the dashboard actually reads it.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn len(&self) -> usize {
        self.last_accepted.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::util::time_source::SharedMockTimeSource;

    /// Deterministic test address `10.0.0.<byte>:<30000 + byte>`.
    fn mk_sender(byte: u8) -> SocketAddr {
        let port = 30_000 + u16::from(byte);
        SocketAddr::from(([10, 0, 0, byte], port))
    }

    /// Test contract id whose 32 bytes are all `byte`.
    fn mk_contract(byte: u8) -> ContractInstanceId {
        let id_bytes = [byte; 32];
        ContractInstanceId::new(id_bytes)
    }

    fn mk_limiter() -> (UpdateRateLimiter, SharedMockTimeSource) {
        let ts = SharedMockTimeSource::new();
        let limiter = UpdateRateLimiter::new(Arc::new(ts.clone()));
        (limiter, ts)
    }

    // Test-only shim so test bodies can write `ts.advance(d)` rather
    // than calling `advance_time` directly.
    trait Advance {
        fn advance(&self, delta: Duration);
    }

    impl Advance for SharedMockTimeSource {
        fn advance(&self, delta: Duration) {
            self.advance_time(delta);
        }
    }

    #[test]
    fn first_update_for_pair_is_allowed() {
        let (limiter, _clock) = mk_limiter();
        let decision = limiter.check_and_record(mk_sender(1), mk_contract(1));
        assert_eq!(decision, RateLimitDecision::Allowed);
        // Exactly one accept recorded, nothing rejected.
        assert_eq!(limiter.rejected_total(), 0);
        assert_eq!(limiter.accepted_total(), 1);
    }

    #[test]
    fn second_update_within_min_interval_is_rejected() {
        let (limiter, clock) = mk_limiter();
        let (sender, contract) = (mk_sender(1), mk_contract(1));
        assert!(limiter.check_and_record(sender, contract).is_allowed());

        // 10ms is far below the 100ms default interval.
        clock.advance(Duration::from_millis(10));
        let d = limiter.check_and_record(sender, contract);
        assert!(
            matches!(d, RateLimitDecision::Rejected { .. }),
            "second UPDATE 10ms after first must be rejected, got {d:?}"
        );
        assert_eq!(limiter.accepted_total(), 1);
        assert_eq!(limiter.rejected_total(), 1);
    }

    #[test]
    fn update_after_min_interval_is_allowed() {
        let (limiter, clock) = mk_limiter();
        let (sender, contract) = (mk_sender(1), mk_contract(1));
        assert!(limiter.check_and_record(sender, contract).is_allowed());

        // 200ms is comfortably past the 100ms default interval.
        clock.advance(Duration::from_millis(200));
        assert_eq!(
            limiter.check_and_record(sender, contract),
            RateLimitDecision::Allowed
        );
        assert_eq!(limiter.accepted_total(), 2);
        assert_eq!(limiter.rejected_total(), 0);
    }

    #[test]
    fn different_senders_same_contract_independent() {
        let (limiter, clock) = mk_limiter();
        let contract = mk_contract(1);

        // The first sender is admitted.
        assert!(limiter.check_and_record(mk_sender(1), contract).is_allowed());

        // A second sender 1ms later has its own key, so it is admitted too.
        clock.advance(Duration::from_millis(1));
        assert!(limiter.check_and_record(mk_sender(2), contract).is_allowed());

        // The first sender retrying at that same instant is still inside
        // its own window.
        let retry = limiter.check_and_record(mk_sender(1), contract);
        assert!(matches!(retry, RateLimitDecision::Rejected { .. }));
    }

    #[test]
    fn same_sender_different_contracts_independent() {
        let (limiter, _clock) = mk_limiter();
        let sender = mk_sender(1);
        // With the clock frozen, each contract still gets its own window.
        for contract_byte in [1u8, 2] {
            assert!(
                limiter
                    .check_and_record(sender, mk_contract(contract_byte))
                    .is_allowed()
            );
        }
        assert_eq!(limiter.accepted_total(), 2);
    }

    #[test]
    fn rejected_attempts_do_not_extend_window() {
        // If a flooding peer keeps trying every 10ms, we want the
        // first attempt past the 100ms window to succeed. That is, the
        // rejected attempts MUST NOT push the last_accepted timestamp
        // forward. Otherwise a sustained flood would lock the pair out
        // indefinitely (the existing peer would never recover).
        let (l, ts) = mk_limiter();
        // t = 0ms: the first accept opens the window.
        assert!(
            l.check_and_record(mk_sender(1), mk_contract(1))
                .is_allowed()
        );
        // t = 10ms … 90ms: 9 sub-100ms attempts, all rejected.
        for _ in 0..9 {
            ts.advance(Duration::from_millis(10));
            assert!(
                !l.check_and_record(mk_sender(1), mk_contract(1))
                    .is_allowed()
            );
        }
        // t = 95ms: still inside the window measured from the FIRST
        // accept, so still rejected.
        ts.advance(Duration::from_millis(5));
        assert!(
            !l.check_and_record(mk_sender(1), mk_contract(1))
                .is_allowed()
        );
        // t = 105ms: past 100ms from the FIRST accept. Had any rejected
        // attempt re-stamped the timestamp, this would still be rejected.
        ts.advance(Duration::from_millis(10));
        assert!(
            l.check_and_record(mk_sender(1), mk_contract(1))
                .is_allowed(),
            "after 105ms+ from original accept, next attempt MUST be allowed — \
             rejected attempts must not have moved the window forward"
        );
        // Exactly the 10 attempts inside the window were counted as
        // rejections; the opening accept and the final one were admitted.
        assert_eq!(l.rejected_total(), 10);
        assert_eq!(l.accepted_total(), 2);
    }

    #[test]
    fn cleanup_removes_stale_entries() {
        let (limiter, clock) = mk_limiter();
        for byte in [1u8, 2] {
            limiter.check_and_record(mk_sender(byte), mk_contract(byte));
        }
        assert_eq!(limiter.len(), 2);

        // Jump one second beyond the idle TTL, then sweep.
        let past_ttl = CLEANUP_AGE + Duration::from_secs(1);
        clock.advance(past_ttl);
        limiter.cleanup();
        assert_eq!(limiter.len(), 0, "all stale entries must be cleared");
    }

    #[test]
    fn cleanup_preserves_fresh_entries() {
        let (limiter, clock) = mk_limiter();
        limiter.check_and_record(mk_sender(1), mk_contract(1));

        // Only halfway to the idle TTL, so the sweep must keep the entry.
        let half_ttl = CLEANUP_AGE / 2;
        clock.advance(half_ttl);
        limiter.cleanup();
        assert_eq!(limiter.len(), 1, "fresh entry must be preserved");
    }

    #[test]
    fn counters_track_accepts_and_rejects() {
        let (limiter, clock) = mk_limiter();
        let (sender, contract) = (mk_sender(1), mk_contract(1));
        let step = MIN_UPDATE_INTERVAL + Duration::from_millis(1);

        // Five accepts, each landing just past the previous window.
        for i in 0..5 {
            clock.advance(step);
            assert!(
                limiter.check_and_record(sender, contract).is_allowed(),
                "iter {i}"
            );
        }
        // Three rejects: back-to-back with the clock frozen.
        (0..3).for_each(|_| {
            assert!(!limiter.check_and_record(sender, contract).is_allowed());
        });
        assert_eq!(limiter.accepted_total(), 5);
        assert_eq!(limiter.rejected_total(), 3);
    }

    /// Pin test exhibiting the May 21 incident pattern: a single sender
    /// hammering a single contract, here at 1000 attempts/s (one per
    /// millisecond, 100× over the ~10/s ceiling). With the limiter, at
    /// most one UPDATE per MIN_UPDATE_INTERVAL window is accepted and
    /// the rest are dropped. The real 4PjqN5… incident sustained many
    /// UPDATEs per second, so this flood rate makes the rejection rate
    /// overwhelming.
    #[test]
    fn may21_flood_pattern_is_throttled() {
        let (l, ts) = mk_limiter();
        let sender = mk_sender(1);
        let contract = mk_contract(1);

        // Simulate 1 second of flooding at 1ms per attempt (1000
        // attempts/s, 100× over the 10/s ceiling). Attempts land at
        // t = 0, 1, …, 999ms.
        for _ in 0..1000 {
            l.check_and_record(sender, contract);
            ts.advance(Duration::from_millis(1));
        }
        // With exact mock-clock steps the admits are t = 0, 100, …,
        // 900ms: 10 admissions and 990 rejections. The range below
        // leaves a little slack around that.
        let accepted = l.accepted_total();
        let rejected = l.rejected_total();
        assert!(
            (9..=12).contains(&accepted),
            "expected ~10 admits over 1s of flooding, got {accepted}"
        );
        assert_eq!(accepted + rejected, 1000);
        // The reject rate must be high; the flood is mostly dropped.
        assert!(
            rejected as f64 / 1000.0 > 0.95,
            "expected >95% rejection rate, got {}",
            rejected as f64 / 1000.0
        );
    }

    /// Pin: hard cap on tracked pairs prevents an attacker churning
    /// fresh contract identities from growing the map indefinitely.
    /// Once `MAX_TRACKED_PAIRS` is reached, new pairs get
    /// `CapacityExceeded`, never `Allowed`. Existing pairs keep
    /// working (graceful degradation, not total denial), and once
    /// `cleanup` drops idle pairs the freed slots can be claimed again.
    #[test]
    fn capacity_exceeded_when_cap_reached() {
        // Small cap so the test is fast.
        let ts = SharedMockTimeSource::new();
        let limiter = UpdateRateLimiter::with_config(
            Arc::new(ts.clone()),
            MIN_UPDATE_INTERVAL,
            8, // tiny cap for test speed
        );

        // Fill the map with 8 distinct pairs; all should be Allowed.
        for i in 0..8 {
            let d = limiter.check_and_record(mk_sender(i + 1), mk_contract(i + 1));
            assert_eq!(d, RateLimitDecision::Allowed, "pair {i} should be allowed");
        }
        assert_eq!(limiter.len(), 8);

        // Pair #9 is a new key, so it must be CapacityExceeded.
        let d = limiter.check_and_record(mk_sender(99), mk_contract(99));
        assert_eq!(
            d,
            RateLimitDecision::CapacityExceeded,
            "new pair past the cap must be CapacityExceeded"
        );
        assert_eq!(limiter.capacity_rejected_total(), 1);

        // Pair #1 (already tracked) should still work after
        // the min_interval elapses. Existing pairs aren't punished
        // by the cap.
        ts.advance(MIN_UPDATE_INTERVAL + Duration::from_millis(1));
        let d = limiter.check_and_record(mk_sender(1), mk_contract(1));
        assert_eq!(
            d,
            RateLimitDecision::Allowed,
            "existing pair must keep working at cap"
        );

        // Only ~101ms has passed so far, far less than CLEANUP_AGE, so a
        // sweep now would keep everything. Jump past the TTL: the sweep
        // must drop every pair AND release their capacity slots, so the
        // previously rejected pair is now admitted.
        ts.advance(CLEANUP_AGE + Duration::from_secs(1));
        limiter.cleanup();
        assert_eq!(limiter.len(), 0, "all idle pairs must be swept");
        let d = limiter.check_and_record(mk_sender(99), mk_contract(99));
        assert_eq!(
            d,
            RateLimitDecision::Allowed,
            "capacity must recover once cleanup frees slots"
        );
        assert_eq!(limiter.capacity_rejected_total(), 1);
    }

    /// Pin the atomicity of `check_and_record` under concurrent callers.
    /// Suppose `DashMap::entry()` did not hold the shard guard across the
    /// time comparison and the stamp. Two threads racing the same
    /// `(sender, contract)` pair could then both decide `Allowed`, both
    /// bump `accepted_total`, and both spawn relay work. Codex review of
    /// PR #4285 caught this. The implementation must admit exactly one
    /// caller per `min_interval` window per pair.
    #[test]
    fn concurrent_check_and_record_admits_one_per_window() {
        use std::sync::Barrier;
        use std::thread;

        const THREADS: usize = 16;

        let ts = SharedMockTimeSource::new();
        let limiter = UpdateRateLimiter::new(Arc::new(ts.clone()));
        let sender = mk_sender(1);
        let contract = mk_contract(1);
        // Release every worker at the same moment to maximise contention.
        let barrier = Barrier::new(THREADS);

        // Scoped threads borrow the limiter and barrier directly, so no
        // extra `Arc` wrapping is needed.
        let (allowed, rejected) = thread::scope(|scope| {
            let mut workers = Vec::with_capacity(THREADS);
            for _ in 0..THREADS {
                workers.push(scope.spawn(|| {
                    barrier.wait();
                    limiter.check_and_record(sender, contract)
                }));
            }
            workers
                .into_iter()
                .fold((0usize, 0usize), |(ok, no), worker| {
                    match worker.join().unwrap() {
                        RateLimitDecision::Allowed => (ok + 1, no),
                        RateLimitDecision::Rejected { .. } => (ok, no + 1),
                        RateLimitDecision::CapacityExceeded => panic!("unexpected cap"),
                    }
                })
        });

        assert_eq!(
            allowed, 1,
            "exactly ONE concurrent caller must be admitted per window; \
             got {allowed} admits, {rejected} rejects"
        );
        assert_eq!(rejected, THREADS - 1);
        assert_eq!(limiter.accepted_total(), 1);
        assert_eq!(limiter.rejected_total(), (THREADS - 1) as u64);
    }

    /// Source-grep pin: the rate-limit gate at the UPDATE dispatch
    /// site in `node.rs` must cover ALL four UPDATE wire variants
    /// (`RequestUpdate`, `BroadcastTo`, `RequestUpdateStreaming`,
    /// `BroadcastToStreaming`). Codex review of #4285 caught a
    /// previous iteration that only gated `RequestUpdate`, letting
    /// a flooder bypass by switching opcode.
    ///
    /// Strategy: scrape `node.rs` for the UPDATE dispatch block and
    /// assert that:
    ///
    /// - (a) the rate-limit call site appears;
    /// - (b) it appears BEFORE the earliest `start_relay_*` spawn of
    ///   ANY variant, not just `RequestUpdate`;
    /// - (c) the four wire-variant names and all four relay-driver
    ///   spawns are present in the same block, proving the dispatch
    ///   matches on all of them.
    #[test]
    fn update_dispatch_gates_all_four_wire_variants() {
        const NODE_SRC: &str = include_str!("../node.rs");
        // One relay-driver spawn marker per UPDATE wire variant. The
        // trailing `(` keeps e.g. `start_relay_request_update(` from also
        // matching `start_relay_request_update_streaming(`.
        const RELAY_SPAWNS: [&str; 4] = [
            "start_relay_request_update(",
            "start_relay_broadcast_to(",
            "start_relay_request_update_streaming(",
            "start_relay_broadcast_to_streaming(",
        ];

        // Find the UPDATE handler block.
        let block_start = NODE_SRC
            .find("NetMessageV1::Update(ref op) =>")
            .expect("could not locate UPDATE dispatch block in node.rs");

        // Bound the search to the END of the UPDATE arm: find the
        // next `NetMessageV1::` arm that starts at the same match
        // level. The UPDATE block must not "spill" into the next
        // handler for our assertions (Codex re-review nit on #4285
        // — the previous 8KB slice could include the next handler).
        let tail = &NODE_SRC[block_start + 1..];
        let block_len = tail
            .find("\n        NetMessageV1::")
            .or_else(|| tail.find("\n    NetMessageV1::"))
            .unwrap_or(tail.len());
        let block = &NODE_SRC[block_start..block_start + 1 + block_len];

        // (a) the gate is invoked.
        let rate_limit_pos = block
            .find("update_rate_limiter")
            .expect("update_rate_limiter not invoked in UPDATE dispatch block");

        // (b) the gate fires BEFORE the earliest relay spawn of any
        //     variant; otherwise the spawn cost is paid even on rejection.
        //     Checking only `start_relay_request_update(` would miss an
        //     ungated broadcast spawn placed ahead of the gate.
        let first_spawn_pos = RELAY_SPAWNS
            .iter()
            .filter_map(|&spawn| block.find(spawn))
            .min()
            .expect("no start_relay_* spawn found in UPDATE dispatch block");
        assert!(
            rate_limit_pos < first_spawn_pos,
            "rate limit gate (offset {rate_limit_pos}) must appear BEFORE \
             the first relay spawn (offset {first_spawn_pos}) so rejected \
             messages don't pay the spawn cost"
        );

        // (c) all four wire variants are matched in the dispatch, and
        //     each spawn site is also present.
        for variant in [
            "UpdateMsg::RequestUpdate {",
            "UpdateMsg::BroadcastTo {",
            "UpdateMsg::RequestUpdateStreaming {",
            "UpdateMsg::BroadcastToStreaming {",
        ] {
            assert!(
                block.contains(variant),
                "UPDATE dispatch block missing wire variant: `{variant}`. \
                 If a new UPDATE wire variant was added, gate it through \
                 the rate limiter and update this list. If a variant was \
                 removed, update this list."
            );
        }
        // Spawn-site cross-check: the dispatch must invoke all four
        // relay drivers. This pins the variants to actual driver
        // calls (not just match arms), making the test less brittle
        // to comment-only mentions of a variant name.
        for spawn in RELAY_SPAWNS {
            assert!(
                block.contains(spawn),
                "UPDATE dispatch block does not invoke `{spawn}` — the \
                 corresponding wire variant is not actually gated."
            );
        }
    }

    /// Strict-cap pin for the concurrent new-pair path.
    ///
    /// Many threads insert DISTINCT (sender, contract) pairs at the same
    /// instant. The number of admitted pairs must never exceed the cap,
    /// however many callers race. Codex re-review of PR #4285 found that
    /// the earlier `len()`-precheck could overshoot by up to
    /// `num_concurrent_callers`, because each racing caller observed
    /// `len < cap` at probe time. The fix reserves a slot through an
    /// `AtomicUsize::fetch_add`, which strictly bounds admissions.
    #[test]
    fn concurrent_distinct_keys_do_not_overshoot_cap() {
        use std::sync::{Arc as StdArc, Barrier};
        use std::thread;

        const CAP: usize = 8;
        // Eight times the cap, so most callers must lose the race.
        const THREADS: usize = 64;

        let clock = SharedMockTimeSource::new();
        let limiter = StdArc::new(UpdateRateLimiter::with_config(
            Arc::new(clock.clone()),
            MIN_UPDATE_INTERVAL,
            CAP,
        ));
        let start_line = StdArc::new(Barrier::new(THREADS));

        // All workers are spawned (collect is eager) before any join; the
        // barrier then releases them together so each hits the Vacant
        // (new-pair) path concurrently with a key no other thread uses.
        let workers: Vec<_> = (0..THREADS)
            .map(|idx| {
                let limiter = limiter.clone();
                let start_line = start_line.clone();
                thread::spawn(move || {
                    start_line.wait();
                    let id = (idx + 1) as u8;
                    limiter.check_and_record(mk_sender(id), mk_contract(id))
                })
            })
            .collect();

        let mut allowed = 0;
        let mut cap_rejected = 0;
        let mut rate_rejected = 0;
        for outcome in workers.into_iter().map(|worker| worker.join().unwrap()) {
            match outcome {
                RateLimitDecision::Allowed => allowed += 1,
                RateLimitDecision::CapacityExceeded => cap_rejected += 1,
                RateLimitDecision::Rejected { .. } => rate_rejected += 1,
            }
        }

        // Critical invariant: the map holds EXACTLY `CAP` entries — no
        // overshoot from racing callers.
        let final_len = limiter.len();
        assert_eq!(
            final_len,
            CAP,
            "strict cap: map size must equal CAP after a 64-thread \
             concurrent flood of distinct keys, got {}",
            final_len
        );
        assert_eq!(
            allowed, CAP,
            "exactly CAP admissions allowed under flood, got {allowed}"
        );
        assert_eq!(cap_rejected, THREADS - CAP);
        assert_eq!(rate_rejected, 0);
        assert_eq!(limiter.capacity_rejected_total(), (THREADS - CAP) as u64);
    }

    /// Pin: `cleanup` gives back one unit of capacity for every entry it
    /// drops.
    ///
    /// A cap-4 limiter is filled, a fifth distinct pair is refused with
    /// `CapacityExceeded`, every entry is aged past `CLEANUP_AGE` and
    /// swept, and then four brand-new pairs must all be admitted again.
    /// This shows the strict-cap size counter follows the map instead
    /// of only ever growing.
    #[test]
    fn cleanup_decrements_size_counter() {
        let clock = SharedMockTimeSource::new();
        let limiter = UpdateRateLimiter::with_config(
            Arc::new(clock.clone()),
            MIN_UPDATE_INTERVAL,
            4, // small cap
        );

        // Occupy every slot with pairs 1..=4.
        for id in 1..=4 {
            assert_eq!(
                limiter.check_and_record(mk_sender(id), mk_contract(id)),
                RateLimitDecision::Allowed
            );
        }

        // With the map full, a fifth distinct pair bounces off the cap.
        assert_eq!(
            limiter.check_and_record(mk_sender(5), mk_contract(5)),
            RateLimitDecision::CapacityExceeded
        );

        // Push every entry beyond the retention window, then sweep.
        clock.advance(CLEANUP_AGE + Duration::from_secs(1));
        limiter.cleanup();
        assert_eq!(limiter.len(), 0);

        // All four slots must be reusable by previously unseen pairs.
        for i in 10..14 {
            assert_eq!(
                limiter.check_and_record(mk_sender(i), mk_contract(i)),
                RateLimitDecision::Allowed,
                "after cleanup, new pair (sender={i}) should be admitted"
            );
        }
    }
}