net-mesh 0.21.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
//! Bandwidth budget — `docs/plans/REDEX_DISTRIBUTED_PLAN.md` §5 +
//! Locked decision 10.
//!
//! Token-bucket rate limiter the catch-up loop consults before
//! shipping a [`SyncResponse`] chunk. Configured per channel via
//! `ReplicationConfig::replication_budget_fraction` (default 0.5)
//! and the operator's measured NIC peak; refills at the configured
//! per-second rate and caps the burst at one second's worth of
//! tokens (so a long idle period doesn't accumulate an unbounded
//! credit that lets the next round saturate the link).
//!
//! Pure logic — caller passes `Instant::now()` so DST and unit
//! tests can advance time deterministically. The eventual tokio
//! interval-driven catch-up loop calls `try_consume(bytes,
//! Instant::now())` before assembling each chunk; on `false` it
//! defers the chunk to the next tick.
//!
//! Backpressure-aware: the reliable-stream layer already throttles
//! based on the receiver's flow-control window; this budget enforces
//! the *sender's* outbound cap so a single channel's catch-up can't
//! starve foreground publish traffic. Both mechanisms compose:
//! whichever is more restrictive at the moment wins.

use std::time::{Duration, Instant};

use super::bandwidth::BandwidthClass;

/// v0.3 Phase D4 anti-starvation hatch threshold. When a
/// `Background` request fails the `background_fraction` gate and
/// strictly more than this much time has passed since the last
/// Background admission (or since the timer was last seeded by
/// [`BandwidthBudget::new`] / [`BandwidthBudget::set_nic_peak`]),
/// that denial is converted to a one-shot admission, subject to
/// the [`BACKGROUND_HATCH_MAX_GRANT_FRACTION`] size cap. The timer
/// resets on every successful Background admission.
///
/// 60 seconds matches the v0.3 plan §7. Tunable via
/// [`BandwidthBudget::with_background_starve_window`] for tests.
pub const BACKGROUND_STARVE_WINDOW_DEFAULT: Duration = Duration::from_secs(60);

/// v0.3 Phase D4 anti-starvation hatch grant cap, as a fraction of
/// `capacity_bytes`. The hatch guarantees Background a periodic
/// admission, but without an upper bound on the per-shot grant, an
/// adversarial `chunk_max = u32::MAX` Background request that has
/// been starved past the window could floor the bucket to zero in
/// a single shot — the exact "Foreground starved by Background"
/// failure D4 was supposed to prevent. Hatch requests larger than
/// this cap are denied rather than partially granted.
///
/// 10% of capacity caps the worst-case Foreground impact per hatch
/// firing at one-tenth of one second's refill. With the default
/// 60-second starve window the hatch fires at most once per
/// window, so hatch-driven Background traffic is bounded by
/// `0.10 × capacity` per 60 s — roughly 0.17% of the budget's
/// sustained refill rate (10% / 60). That is well below the
/// plan's "≥ 10% guaranteed minimum" floor (which is targeting
/// steady-state share, not per-hatch grant size).
pub const BACKGROUND_HATCH_MAX_GRANT_FRACTION: f64 = 0.10;

/// v0.3 Phase D2 `Realtime` admission cap, as a fraction of
/// `capacity_bytes`. Realtime requests are admitted even when the
/// bucket lacks credit, but the `class` byte on each `SyncRequest`
/// is untrusted from the wire (the request-binding token does not
/// cover the trailing class byte). A hostile peer in the
/// replica-set can stamp `Realtime` on every outbound request and
/// would otherwise drain the bucket to zero on every call.
///
/// Cap Realtime grants at 50% of capacity per call: large enough
/// to admit any honest interactive operation (Realtime is intended
/// for control-plane sweeps with chunk_max bounded by
/// `SYNC_REQUEST_CHUNK_MAX_DEFAULT`), small enough that an
/// adversarial peer can't drain a full bucket in one shot.
/// A Realtime request whose size exceeds the cap is denied
/// outright. A misbehaving peer can still send many requests at
/// or below the cap, each of which drains the bucket by its own
/// size, but the damage any single request can do is bounded.
pub const REALTIME_MAX_GRANT_FRACTION: f64 = 0.50;

/// Token-bucket rate limiter scaled to a fraction of measured NIC
/// peak. Caller mutates via [`Self::try_consume`] (or the
/// class-aware [`Self::try_consume_with_class`]); refill is
/// time-driven (passing the current `Instant` each call).
///
/// Burst capacity = one second of tokens. A long idle period
/// doesn't accumulate unbounded credit — the bucket caps at the
/// per-second rate so the next active second is bounded. The plan
/// §5 prefers steady-state throttling over burst absorption; this
/// matches.
#[derive(Debug, Clone)]
pub struct BandwidthBudget {
    /// Tokens currently available, in bytes. Kept as `f64` so
    /// sub-byte refill amounts accumulate between calls.
    available_bytes: f64,
    /// Refill rate in bytes per second. Computed from
    /// `nic_peak_bps × fraction` at construction and again on
    /// every [`Self::set_nic_peak`] call.
    refill_bps: f64,
    /// Bucket capacity in bytes — equal to `refill_bps` so the
    /// burst is bounded at one second's worth of tokens.
    capacity_bytes: f64,
    /// Last time we refilled the bucket. Caller-supplied `Instant`
    /// drives this; no system-clock reads inside the limiter.
    last_refill: Instant,
    /// v0.3 Phase D4: the instant the starve detector measures
    /// from. Set to `now` whenever a `Background` request is
    /// admitted (either through the gate or via the anti-
    /// starvation hatch) and re-seeded by [`Self::set_nic_peak`].
    /// When a Background request fails the gate AND `now -
    /// last_background_admission > background_starve_window`,
    /// that denial is converted to a one-shot admission and the
    /// timer resets.
    ///
    /// Seeded to `Some(now)` at construction so the very first
    /// request doesn't trip the hatch — operators expect the gate
    /// to fire normally before starvation logic kicks in. None of
    /// the code in this file stores `None`; if the field were
    /// `None`, the admission path treats Background as already
    /// starved.
    last_background_admission: Option<Instant>,
    /// v0.3 Phase D4 starve threshold. Default
    /// [`BACKGROUND_STARVE_WINDOW_DEFAULT`] (60 s); tests inject
    /// shorter windows via
    /// [`Self::with_background_starve_window`].
    background_starve_window: Duration,
    /// v0.3 Phase D2: outstanding bytes attributed to past
    /// `Realtime` admissions. Each Realtime admission adds the
    /// full request size here, so the next refund (which is
    /// class-blind on the wire layer — the dispatcher only sees
    /// "the send failed; refund N bytes") repays the Realtime
    /// debt FIRST before crediting the general `available_bytes`
    /// pool. Without this, a Realtime drain followed by a
    /// Foreground send-failure refund would credit Foreground for
    /// the cost Realtime imposed, giving Realtime a permanent
    /// free pass.
    ///
    /// The accountant only increases on Realtime admit and only
    /// decreases on refund. A refund never repays more than the
    /// outstanding debt, so the value stays at or above 0 and any
    /// excess refund is credited to the general pool.
    realtime_debt: f64,
}

impl BandwidthBudget {
    /// Build a limiter whose refill rate is `fraction × nic_peak_bps`.
    ///
    /// - `fraction`: share of the NIC peak granted to this budget.
    ///   Values above `1.0` are clamped to `1.0` (exceeding the
    ///   measured peak is meaningless). Zero, negative and
    ///   non-finite values (NaN, ±infinity) fall back to
    ///   `f32::EPSILON`, so the bucket still refills, just at a
    ///   glacial pace.
    /// - `nic_peak_bps`: the operator's measured per-link peak in
    ///   bytes per second.
    /// - `now`: seeds both the refill clock and the Background
    ///   starve timer.
    ///
    /// The bucket starts full and its capacity equals one second
    /// of refill, so the first `try_consume` succeeds for any
    /// request up to the capacity.
    pub fn new(fraction: f32, nic_peak_bps: u64, now: Instant) -> Self {
        // The [`ReplicationConfig`] validator is expected to reject
        // bad fractions before production construction; sanitising
        // again here keeps unit tests and DST scenarios from
        // building a pathological limiter.
        let share = match fraction {
            f if !f.is_finite() || f <= 0.0 => f32::EPSILON,
            f if f > 1.0 => 1.0,
            f => f,
        };
        // One value serves as refill rate, capacity and initial
        // balance: burst is bounded at one second of tokens
        // (plan §5) and the bucket starts full.
        let bytes_per_second = nic_peak_bps as f64 * f64::from(share);
        Self {
            available_bytes: bytes_per_second,
            refill_bps: bytes_per_second,
            capacity_bytes: bytes_per_second,
            last_refill: now,
            // Starting the starve timer at `now` means the anti-
            // starvation hatch cannot fire until a full starve
            // window has passed since construction.
            last_background_admission: Some(now),
            background_starve_window: BACKGROUND_STARVE_WINDOW_DEFAULT,
            realtime_debt: 0.0,
        }
    }

    /// Override the v0.3 Phase D4 anti-starvation window
    /// (default [`BACKGROUND_STARVE_WINDOW_DEFAULT`] = 60 s).
    /// Tests inject a short window (e.g. 100 ms) to exercise the
    /// hatch deterministically without sleeping for 60 seconds.
    pub fn with_background_starve_window(mut self, window: Duration) -> Self {
        self.background_starve_window = window;
        self
    }

    /// Credit the bucket with the tokens earned between the last
    /// refill and `now`, capped at `capacity_bytes`. Called
    /// internally by the consume path; exposed so the eventual
    /// heartbeat loop can pre-refill before consulting the budget
    /// for a multi-chunk decision.
    ///
    /// A `now` at or before the previous refill instant is a no-op:
    /// the elapsed time saturates to zero and neither the balance
    /// nor the refill clock moves.
    pub fn refill(&mut self, now: Instant) {
        let since_last = now.saturating_duration_since(self.last_refill);
        if !since_last.is_zero() {
            let earned = self.refill_bps * since_last.as_secs_f64();
            let topped_up = self.available_bytes + earned;
            self.available_bytes = topped_up.min(self.capacity_bytes);
            self.last_refill = now;
        }
    }

    /// Try to take `bytes` tokens from the bucket with
    /// `Foreground` semantics. Returns `true` when the tokens were
    /// deducted and `false` on insufficient credit. On `false` no
    /// tokens are deducted, although the time-based refill for
    /// `now` has already been applied; the caller defers and
    /// retries on a later tick.
    ///
    /// `bytes == 0` always succeeds without any state mutation.
    ///
    /// **Oversize request behavior.** A request larger than
    /// [`Self::capacity_bytes`] (= one second's worth of refill)
    /// can never be covered by accumulated credit, because the
    /// bucket caps at capacity no matter how long it refills. The
    /// catch-up path is responsible for splitting outbound chunks
    /// against this ceiling. If a single event is itself larger
    /// than the capacity (rare — e.g. a tiny channel with a large
    /// payload), it is admitted as a one-off only while the bucket
    /// is full, and the bucket is drained to zero so subsequent
    /// requests defer until refill catches up. Without that escape
    /// the channel would deadlock on an event it can never afford.
    pub fn try_consume(&mut self, bytes: u64, now: Instant) -> bool {
        // The Foreground branch of the class-aware gate never reads
        // the background fraction, so any value would do; naming
        // the placeholder keeps the call site explicit. Existing
        // callers keep working unchanged.
        const UNUSED_BACKGROUND_FRACTION: f32 = 0.0;
        self.try_consume_with_class(
            bytes,
            BandwidthClass::Foreground,
            now,
            UNUSED_BACKGROUND_FRACTION,
        )
    }

    /// Class-aware admission gate. v0.3 Phase D2.
    ///
    /// Refills the bucket for `now`, then applies the rule for
    /// `class`. Returns `true` when the request is admitted and
    /// its cost deducted, `false` when it is denied. `bytes == 0`
    /// is admitted immediately, before the refill, without
    /// touching any state.
    ///
    /// Per-class semantics:
    ///
    /// - `Foreground` (default): identical to the v0.2
    ///   `try_consume` — admit when the bucket holds at least
    ///   `bytes`. A request larger than the whole capacity is
    ///   admitted once while the bucket is full, draining it to
    ///   zero (the oversize-event escape hatch).
    /// - `Background`: admit only when deducting `bytes` would
    ///   still leave at least `(1 - background_fraction) *
    ///   capacity` in the bucket. The reservation preserves
    ///   Foreground headroom against sustained Background load.
    ///   **v0.3 Phase D4** anti-starvation hatch: when the gate
    ///   denies the request AND more than
    ///   `background_starve_window` has passed since the last
    ///   Background admission, the denial is converted to a
    ///   one-shot admission (balance floored at zero) and the
    ///   starve timer resets. The hatch only grants requests of
    ///   at most [`BACKGROUND_HATCH_MAX_GRANT_FRACTION`] `*
    ///   capacity`; larger ones are denied without resetting the
    ///   timer.
    /// - `Realtime`: admitted even when the bucket lacks credit,
    ///   unless the request exceeds
    ///   [`REALTIME_MAX_GRANT_FRACTION`] `* capacity`, in which
    ///   case it is denied. Disk-pressure circuit-breakers still
    ///   apply above this layer; this gate doesn't consider them.
    ///   Admission drains the bucket (floored at zero) so the next
    ///   non-Realtime request feels the cost, and adds the full
    ///   request size to `realtime_debt`.
    ///
    /// `background_fraction` is the share of capacity Background
    /// is allowed to consume. `1.0` collapses the reserve to zero,
    /// so Background may draw the bucket all the way down; `0.0`
    /// reserves the whole capacity, so Background is practically
    /// only admitted through the anti-starvation hatch. Finite
    /// values above `1.0` behave like `1.0`, and negative values
    /// reserve more than the capacity. Non-finite values (NaN,
    /// ±infinity) are treated as `0.0` — the most restrictive
    /// setting — instead of silently disabling the reservation.
    /// The
    /// [`ReplicationConfig::validate`](super::replication_config::ReplicationConfig::validate)
    /// pass is expected to reject out-of-range values at config
    /// time; the handling here is defense-in-depth.
    pub fn try_consume_with_class(
        &mut self,
        bytes: u64,
        class: BandwidthClass,
        now: Instant,
        background_fraction: f32,
    ) -> bool {
        if bytes == 0 {
            return true;
        }
        self.refill(now);
        let cost = bytes as f64;

        match class {
            BandwidthClass::Realtime => {
                // No available-credit check — but cap the per-shot
                // cost at `REALTIME_MAX_GRANT_FRACTION × capacity`
                // so an adversarial peer that stamps `Realtime` on
                // a u32::MAX-byte SyncRequest can't drain the
                // bucket to zero in one call. Hostile-peer caveat:
                // the `class` byte on the wire isn't covered by
                // the R-23 request-binding token, so any
                // replica-set peer can forge `Realtime`. Higher
                // layers (capability-tag advertisement of who may
                // use Realtime) are the proper place for
                // class-authz; this cap is the rate-limiter's
                // defense-in-depth.
                let realtime_cap = REALTIME_MAX_GRANT_FRACTION * self.capacity_bytes;
                if cost > realtime_cap {
                    return false;
                }
                self.available_bytes = (self.available_bytes - cost).max(0.0);
                // Record the full cost as Realtime debt so the
                // next refund pays it back before crediting the
                // general pool; otherwise a Foreground send-
                // failure refund would be credited for a cost
                // that Realtime imposed.
                self.realtime_debt += cost;
                true
            }
            BandwidthClass::Foreground => {
                // v0.2 behavior preserved: plain token-bucket check.
                if self.available_bytes >= cost {
                    self.available_bytes -= cost;
                    return true;
                }
                // Oversize escape hatch: the request exceeds the
                // whole capacity, so it is admitted only from a
                // full bucket, which it drains completely. The
                // epsilon is an absolute tolerance; for capacities
                // far above one byte the subtraction is absorbed
                // by rounding, making this effectively
                // `available >= capacity`.
                // NOTE(review): with `capacity_bytes == 0` (e.g.
                // `nic_peak_bps == 0`) the bucket always counts as
                // full, so every non-zero Foreground request takes
                // this path — confirm that is intended.
                if bytes > self.capacity_bytes as u64
                    && self.available_bytes >= self.capacity_bytes - f64::EPSILON
                {
                    self.available_bytes = 0.0;
                    return true;
                }
                false
            }
            BandwidthClass::Background => {
                // Sanitise the share fail-closed. `f64::max`
                // ignores a NaN operand, so an unsanitised NaN (or
                // +infinity) fraction would collapse the reserve
                // to zero and silently hand Background the whole
                // bucket. Finite inputs are passed through as-is.
                let share = if background_fraction.is_finite() {
                    f64::from(background_fraction)
                } else {
                    0.0
                };
                // Gate: after deducting `cost` the bucket must
                // still hold at least `(1 - share) * capacity`,
                // the reservation kept back for Foreground.
                let reserve = (1.0 - share).max(0.0) * self.capacity_bytes;
                let gated_ok =
                    self.available_bytes >= cost && self.available_bytes - cost >= reserve;
                if gated_ok {
                    self.available_bytes -= cost;
                    self.last_background_admission = Some(now);
                    return true;
                }
                // Anti-starvation hatch: if Background has been
                // denied for too long, force a one-shot admission
                // regardless of the reserve gate. The hatch grant
                // is capped at `BACKGROUND_HATCH_MAX_GRANT_FRACTION
                // * capacity_bytes` so an adversarial chunk_max =
                // u32::MAX Background request can't floor the
                // bucket in a single shot. Requests larger than
                // the hatch cap are denied — the producer must
                // resubmit a smaller chunk (the request-side
                // chunking already caps SyncRequest::chunk_max to
                // SYNC_REQUEST_CHUNK_MAX_DEFAULT, so in practice
                // honest requests fit comfortably below the cap).
                let starved = self
                    .last_background_admission
                    .map(|t| now.saturating_duration_since(t) > self.background_starve_window)
                    .unwrap_or(true);
                if starved {
                    let hatch_cap = BACKGROUND_HATCH_MAX_GRANT_FRACTION * self.capacity_bytes;
                    if cost > hatch_cap {
                        // Oversized hatch attempt — deny without
                        // resetting the starve timer so a
                        // subsequent right-sized Background request
                        // can still take the next hatch firing.
                        return false;
                    }
                    self.available_bytes = (self.available_bytes - cost).max(0.0);
                    self.last_background_admission = Some(now);
                    return true;
                }
                false
            }
        }
    }

    /// Give previously-consumed `bytes` back to the bucket. Called
    /// when a wire send fails after the consume path already
    /// deducted the cost — otherwise repeated send failures over a
    /// flaky link would drift the budget toward permanent
    /// backpressure without shipping any traffic. The balance
    /// saturates at `capacity_bytes`; refunds never push it higher.
    ///
    /// `bytes == 0` is a no-op.
    ///
    /// **Realtime debt repayment.** A refund pays down
    /// `realtime_debt` FIRST and only the remainder is credited to
    /// the general `available_bytes` pool. Without this, a
    /// sequence like (Realtime admit → drain → Foreground admit →
    /// send fails → refund) would credit the refund to
    /// Foreground's pool — giving Realtime a free pass on the
    /// bytes it imposed. If the debt absorbs the whole refund, the
    /// balance is left untouched.
    ///
    /// **Whole-byte floor.** Before the remainder is added,
    /// `available_bytes` is floored to whole-byte precision so
    /// fractional credit accumulated by [`Self::refill`] cannot
    /// compound across many refunds and eventually admit an extra
    /// byte through the consume path's `>=` comparison. The
    /// sub-byte credit dropped here is recovered on later refill
    /// ticks.
    pub fn refund(&mut self, bytes: u64) {
        if bytes == 0 {
            return;
        }
        let refunded = bytes as f64;
        // Portion of the refund swallowed by outstanding Realtime
        // debt; never more than the debt itself.
        let toward_debt = if self.realtime_debt > 0.0 {
            refunded.min(self.realtime_debt)
        } else {
            0.0
        };
        self.realtime_debt -= toward_debt;
        let leftover = refunded - toward_debt;
        if leftover <= 0.0 {
            return;
        }
        let whole_tokens = self.available_bytes.max(0.0).floor();
        self.available_bytes = (whole_tokens + leftover).min(self.capacity_bytes);
    }

    /// Tokens currently in the bucket, in whole bytes (rounded
    /// down). Useful for observability — operators can graph "how
    /// much catch-up budget is unused?" to spot under-utilized
    /// links. Does not refill; it reports the balance as of the
    /// last mutation.
    pub fn available_bytes(&self) -> u64 {
        // Clamp at zero first: the internal balance could dip a
        // hair below zero through floating-point rounding, and the
        // user-facing figure should never reflect that.
        let non_negative = self.available_bytes.max(0.0);
        non_negative.floor() as u64
    }

    /// Configured refill rate in bytes/sec. The internal `f64`
    /// rate is converted with an `as` cast, so any fractional part
    /// is dropped (truncation toward zero).
    pub fn refill_bps(&self) -> u64 {
        self.refill_bps as u64
    }

    /// Configured capacity (= refill_bps; one second's burst).
    /// The internal `f64` capacity is converted with an `as` cast,
    /// so any fractional part is dropped (truncation toward zero).
    pub fn capacity_bytes(&self) -> u64 {
        self.capacity_bytes as u64
    }

    /// Re-scale the budget to a new NIC peak measurement without
    /// rebuilding the limiter (which would discard the current
    /// token balance). The proximity graph's per-link throughput
    /// counters update on a 60 s rolling window; the coordinator
    /// feeds the new figure in here.
    ///
    /// - `nic_peak_bps` / `fraction`: same meaning and the same
    ///   sanitising as in [`Self::new`]. Note the argument order
    ///   differs from `new`: the peak comes first here.
    /// - `now`: used to refill before re-scaling and to restart
    ///   the Background starve timer.
    ///
    /// The fill proportion is preserved: a half-full bucket stays
    /// half-full under the new capacity. If the previous capacity
    /// was zero the bucket is treated as full. `realtime_debt` and
    /// the starve window are left as they were.
    pub fn set_nic_peak(&mut self, nic_peak_bps: u64, fraction: f32, now: Instant) {
        // Bring the balance up to date first so the proportion
        // below reflects the state at `now`, not at the last call.
        self.refill(now);

        // Fill level relative to the OLD capacity.
        let fill_ratio = if self.capacity_bytes > 0.0 {
            self.available_bytes / self.capacity_bytes
        } else {
            1.0
        };

        let share = match fraction {
            f if !f.is_finite() || f <= 0.0 => f32::EPSILON,
            f if f > 1.0 => 1.0,
            f => f,
        };
        let rescaled = nic_peak_bps as f64 * f64::from(share);

        self.refill_bps = rescaled;
        self.capacity_bytes = rescaled;
        self.available_bytes = (rescaled * fill_ratio).min(rescaled);

        // Restart the D4 starve timer. Carrying the old timestamp
        // across a re-scale would let a stale timer fire the hatch
        // right after an operator shrinks the NIC peak (e.g. to
        // throttle under congestion), defeating the tighter
        // budget. Seeding to `now` mirrors the constructor: the
        // hatch only fires after a full `background_starve_window`
        // has elapsed under the new budget.
        self.last_background_admission = Some(now);
    }
}

// Unit tests for the Foreground (`try_consume`) path, refill,
// fraction clamping and `set_nic_peak`. Time is driven by
// explicit `Instant`s derived from a single base so no test
// sleeps.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // Base instant for a test; later instants are derived via `at`.
    fn t0() -> Instant {
        Instant::now()
    }

    // `base` advanced by `ms` milliseconds.
    fn at(base: Instant, ms: u64) -> Instant {
        base + Duration::from_millis(ms)
    }

    #[test]
    fn bucket_starts_full_at_capacity() {
        // 1 MB/s budget = 1_048_576 bytes/sec.
        let base = t0();
        let bb = BandwidthBudget::new(0.5, 2 * 1024 * 1024, base);
        // 0.5 × 2 MiB/s = 1 MiB/s.
        assert_eq!(bb.refill_bps(), 1024 * 1024);
        assert_eq!(bb.capacity_bytes(), 1024 * 1024);
        assert_eq!(bb.available_bytes(), 1024 * 1024);
    }

    #[test]
    fn try_consume_succeeds_within_capacity() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 1_000_000, base);
        assert!(bb.try_consume(500_000, base));
        // ~500_000 bytes left (slack for f64 rounding).
        assert!(bb.available_bytes() >= 499_999);
        assert!(bb.available_bytes() <= 500_001);
    }

    #[test]
    fn try_consume_fails_when_empty() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 100, base);
        // Drain the entire 100-byte capacity.
        assert!(bb.try_consume(100, base));
        // Subsequent consume within the same instant fails.
        assert!(!bb.try_consume(1, base));
    }

    #[test]
    fn refill_restores_tokens_over_time() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 1_000, base); // 1 KB/s
        bb.try_consume(1_000, base); // drain
        assert_eq!(bb.available_bytes(), 0);
        // 500 ms elapsed → 500 bytes refilled.
        bb.refill(at(base, 500));
        let avail = bb.available_bytes();
        assert!(
            (499..=500).contains(&avail),
            "expected ~500 bytes refilled, got {avail}",
        );
    }

    #[test]
    fn refill_caps_at_capacity_not_unbounded() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 1_000, base);
        // 10 seconds idle — would refill 10 KB if unbounded;
        // capped at 1 KB (one second's worth).
        bb.refill(at(base, 10_000));
        assert_eq!(bb.available_bytes(), 1_000);
    }

    #[test]
    fn zero_byte_consume_always_succeeds() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 1_000, base);
        bb.try_consume(1_000, base); // drain
        assert!(bb.try_consume(0, base));
        assert_eq!(bb.available_bytes(), 0); // no spurious refill
    }

    #[test]
    fn fraction_above_one_clamped() {
        let base = t0();
        let bb = BandwidthBudget::new(2.0, 1_000_000, base);
        // Clamped at 1.0 — refill = full NIC peak, not 2×.
        assert_eq!(bb.refill_bps(), 1_000_000);
    }

    #[test]
    fn fraction_zero_falls_back_to_epsilon() {
        let base = t0();
        let bb = BandwidthBudget::new(0.0, 1_000_000_000, base);
        // Epsilon × 1 GB/s = ~119 B/s. Lock the floor: bucket
        // does refill (slowly) rather than being permanently empty.
        assert!(bb.refill_bps() > 0);
    }

    #[test]
    fn fraction_nan_falls_back_to_epsilon() {
        // NaN is non-finite, so it takes the same epsilon fallback
        // as zero.
        let base = t0();
        let bb = BandwidthBudget::new(f32::NAN, 1_000_000_000, base);
        assert!(bb.refill_bps() > 0);
    }

    #[test]
    fn fraction_neg_inf_falls_back_to_epsilon() {
        // Negative infinity is non-finite (and negative), so it
        // takes the epsilon fallback too.
        let base = t0();
        let bb = BandwidthBudget::new(f32::NEG_INFINITY, 1_000_000_000, base);
        assert!(bb.refill_bps() > 0);
    }

    #[test]
    fn partial_consume_then_refill_then_consume() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 1_000, base);
        bb.try_consume(600, base); // 400 left
        // 400 + (500 ms × 1000 B/s) = 900, still below the
        // 1000-byte cap.
        bb.refill(at(base, 500));
        let avail = bb.available_bytes();
        assert!((899..=900).contains(&avail), "got {avail}");
        // Consuming the remainder.
        assert!(bb.try_consume(900, at(base, 500)));
    }

    #[test]
    fn try_consume_oversize_with_full_bucket_admits_as_one_off() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 1_000, base);
        // Full bucket + a single chunk larger than capacity. The
        // budget can't accumulate enough credit even after
        // infinite refill (capacity caps at one second's tokens),
        // so the only choice is admit-once-and-drain. Otherwise
        // the channel deadlocks trying to ship an event that's
        // too large for the configured budget.
        assert!(bb.try_consume(2_000, base));
        // Bucket fully drained; subsequent normal-sized requests
        // defer until refill.
        assert!(!bb.try_consume(1, base));
    }

    #[test]
    fn try_consume_oversize_with_partial_bucket_fails() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 1_000, base);
        // Drain half the bucket via a normal-sized request.
        assert!(bb.try_consume(500, base));
        // Oversize chunk now arrives. Bucket isn't at full credit
        // anymore, so the escape hatch doesn't fire; caller defers
        // until refill catches up.
        assert!(!bb.try_consume(2_000, base));
        // State preserved on failure — half the bucket is still
        // there for the caller to consume with a smaller request.
        let remaining = bb.available_bytes();
        assert!((499..=501).contains(&remaining));
    }

    #[test]
    fn set_nic_peak_preserves_fill_proportion() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 1_000, base);
        bb.try_consume(500, base); // half full
        let before = bb.available_bytes();
        assert!((499..=501).contains(&before));
        // NIC peak doubles; half-full stays half-full of the
        // new capacity.
        bb.set_nic_peak(2_000, 1.0, base);
        assert_eq!(bb.capacity_bytes(), 2_000);
        let after = bb.available_bytes();
        assert!(
            (999..=1_000).contains(&after),
            "expected ~half of 2_000 = 1_000; got {after}",
        );
    }

    #[test]
    fn set_nic_peak_to_smaller_caps_at_new_capacity() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 10_000, base);
        // Full bucket at 10 KB.
        // Shrink to 1 KB peak — bucket must cap at the new
        // capacity, not retain the old 10 KB.
        bb.set_nic_peak(1_000, 1.0, base);
        assert_eq!(bb.capacity_bytes(), 1_000);
        assert!(bb.available_bytes() <= 1_000);
    }

    #[test]
    fn refill_with_zero_elapsed_is_noop() {
        let base = t0();
        let mut bb = BandwidthBudget::new(1.0, 1_000, base);
        bb.try_consume(500, base);
        let before = bb.available_bytes();
        bb.refill(base); // same instant
        assert_eq!(bb.available_bytes(), before);
    }
}

// ============================================================================
// v0.3 Phase D2 + D4 class-aware admission tests
// ============================================================================

#[cfg(test)]
mod class_tests {
    use super::*;

    /// Offsets `base` forward by `millis` milliseconds.
    fn at(base: Instant, millis: u64) -> Instant {
        let offset = Duration::from_millis(millis);
        base + offset
    }

    /// Creates a budget of `capacity_bps` anchored at the current
    /// instant and returns that instant alongside it, so a test
    /// can derive later timestamps with [`at`].
    fn fresh(capacity_bps: u64) -> (BandwidthBudget, Instant) {
        let anchor = Instant::now();
        let budget = BandwidthBudget::new(1.0, capacity_bps, anchor);
        (budget, anchor)
    }

    /// `Foreground` mirrors the legacy `try_consume` behaviour:
    /// requests are admitted until the capacity is spent and
    /// rejected afterwards.
    #[test]
    fn foreground_admits_up_to_capacity_then_rejects() {
        let (mut budget, now) = fresh(1000);
        // 600 + 400 spends exactly the 1000-byte capacity.
        for cost in [600, 400] {
            assert!(budget.try_consume_with_class(cost, BandwidthClass::Foreground, now, 0.3));
        }
        // Nothing left: even a single byte is refused.
        let overdraw = budget.try_consume_with_class(1, BandwidthClass::Foreground, now, 0.3);
        assert!(!overdraw);
    }

    /// `Background` is held behind a reserve of
    /// `(1 - fraction) * capacity`. With fraction 0.3 the bucket
    /// must keep at least 700 bytes, otherwise the request is
    /// denied.
    #[test]
    fn background_admits_only_when_above_reserve() {
        let (mut budget, start) = fresh(1000);
        let fraction = 0.3;

        // Full bucket (1000): taking 200 leaves 800, which is
        // still >= the 700-byte reserve, so this is admitted.
        let first =
            budget.try_consume_with_class(200, BandwidthClass::Background, start, fraction);
        assert!(first);

        // Now at 800: another 200 would leave 600 < 700. Denied,
        // and the D4 starve timer has not tripped after 1 ms.
        let second = budget.try_consume_with_class(
            200,
            BandwidthClass::Background,
            at(start, 1),
            fraction,
        );
        assert!(!second);
    }

    /// With fraction 1.0 the reserve threshold collapses to 0, so
    /// `Background` may drain the bucket all the way to capacity,
    /// just like `Foreground`. The fraction is the share of
    /// capacity Background is ALLOWED to consume.
    #[test]
    fn background_with_full_fraction_acts_like_foreground() {
        let (mut budget, now) = fresh(1000);
        for cost in [900, 100] {
            assert!(budget.try_consume_with_class(cost, BandwidthClass::Background, now, 1.0));
        }
    }

    /// With fraction 0.0 Background has no allocation at all: the
    /// reserve equals the capacity, so even the smallest request
    /// against a full bucket is denied (until the anti-starvation
    /// hatch fires).
    #[test]
    fn background_with_zero_fraction_denied_on_any_credit() {
        let (mut budget, now) = fresh(1000);
        // reserve = capacity = 1000; the gate needs
        // `available - cost >= 1000`, which no positive cost can
        // satisfy starting from 1000.
        let admitted = budget.try_consume_with_class(1, BandwidthClass::Background, now, 0.0);
        assert!(!admitted);
    }

    /// `Realtime` skips the failure path: it is admitted even when
    /// the bucket has already been emptied, as long as the request
    /// stays within the per-shot cap.
    #[test]
    fn realtime_bypasses_failure_path() {
        let (mut budget, now) = fresh(1000);
        let fraction = 0.3;

        // Empty the bucket with a single Foreground draw.
        assert!(budget.try_consume_with_class(1000, BandwidthClass::Foreground, now, fraction));

        // Foreground is now refused ...
        let foreground_again =
            budget.try_consume_with_class(1, BandwidthClass::Foreground, now, fraction);
        assert!(!foreground_again);

        // ... but Realtime still gets through. 500 bytes is within
        // the 50% cap (500 of 1000).
        let realtime = budget.try_consume_with_class(500, BandwidthClass::Realtime, now, fraction);
        assert!(realtime);
    }

    /// `set_nic_peak` has to restart the D4 starve timer. Otherwise
    /// an operator who throttles the NIC peak under congestion
    /// would inherit a stale "Background has been starved for X
    /// seconds" state from the previous bucket. The first hatch
    /// firing under the new budget must wait out a fresh
    /// `background_starve_window`.
    #[test]
    fn set_nic_peak_resets_d4_starve_timer() {
        let start = Instant::now();
        let window = Duration::from_millis(100);
        let mut budget =
            BandwidthBudget::new(1.0, 10_000, start).with_background_starve_window(window);

        // Empty the bucket through Foreground.
        assert!(budget.try_consume_with_class(10_000, BandwidthClass::Foreground, start, 0.3));

        // 200 ms later the original starve timer would have
        // expired.
        let post_window = at(start, 200);

        // Rescale the peak at that point. Without the reset, a
        // Background request at `post_window` would see
        // `now - start > window` and fire the hatch right away.
        budget.set_nic_peak(20_000, 1.0, post_window);

        // Immediately after the rescale the timer counts from
        // `post_window`, so the window has not elapsed yet and the
        // request must be refused.
        let small_request = 100u64; // well under reserve gate
        let just_after = at(post_window, 1);
        assert!(
            !budget.try_consume_with_class(
                small_request,
                BandwidthClass::Background,
                just_after,
                0.3,
            ),
            "Background must be denied within fresh window after set_nic_peak",
        );
    }

    /// A refund pays down the Realtime debt accountant before any
    /// of it reaches the general `available_bytes` pool. Before
    /// the fix, a refund that followed a Realtime drain credited
    /// Foreground for the Realtime cost, a permanent free pass
    /// for Realtime that the gate was meant to prevent.
    #[test]
    fn refund_repays_realtime_debt_before_crediting_general_pool() {
        let start = Instant::now();
        let fraction = 0.3;
        let mut budget = BandwidthBudget::new(1.0, 1000, start);

        // Realtime takes 400: debt = 400, available = 600.
        assert!(budget.try_consume_with_class(400, BandwidthClass::Realtime, start, fraction));
        let after_rt = budget.available_bytes();
        assert!((599..=600).contains(&after_rt));

        // Foreground takes 100 (available = 500) and the send then
        // "fails", so the 100 bytes are refunded. The refund goes
        // entirely to the debt (400 -> 300); available stays ~500
        // instead of bouncing back to 600.
        assert!(budget.try_consume_with_class(100, BandwidthClass::Foreground, start, fraction));
        budget.refund(100);
        let after_refund = budget.available_bytes();
        assert!(
            (499..=501).contains(&after_refund),
            "refund must pay debt FIRST; expected ~500, got {after_refund}",
        );

        // A 500-byte refund clears the remaining 300 of debt and
        // only the leftover 200 lands in available_bytes.
        budget.refund(500);
        let after_big_refund = budget.available_bytes();
        assert!(
            (699..=701).contains(&after_big_refund),
            "second refund must credit overflow after debt; expected ~700, got {after_big_refund}",
        );
    }

    /// A single `Realtime` request is limited to
    /// REALTIME_MAX_GRANT_FRACTION × capacity, so an adversarial
    /// peer cannot empty the bucket in one shot. The class byte on
    /// the wire is unauthenticated and any replica-set peer can
    /// forge `Realtime`; this cap is the rate limiter's
    /// defense-in-depth.
    #[test]
    fn realtime_denies_oversized_request_to_prevent_drain_in_one_shot() {
        let (mut budget, now) = fresh(1000);

        // 750 bytes is above the 50% cap (= 500 bytes).
        assert!(
            !budget.try_consume_with_class(750, BandwidthClass::Realtime, now, 0.3),
            "oversized Realtime request must be denied",
        );
        // A denial must not touch the balance.
        assert_eq!(budget.available_bytes(), 1000);

        // Exactly at the cap: admitted.
        let at_cap = budget.try_consume_with_class(500, BandwidthClass::Realtime, now, 0.3);
        assert!(at_cap);
    }

    /// D4 anti-starvation hatch: once Background has been denied
    /// for longer than the starve window, one request is let
    /// through and the timer starts over.
    #[test]
    fn d4_background_starve_hatch_one_shot_admit() {
        let start = Instant::now();
        let window = Duration::from_millis(100);
        let fraction = 0.3;
        // Capacity 10_000 gives a hatch cap of 1_000, so the
        // 500-byte requests below fit under it. (The pre-cap-fix
        // version used 200-byte requests against a 1_000-byte
        // bucket, which would now exceed a 100-byte hatch cap.)
        let mut budget =
            BandwidthBudget::new(1.0, 10_000, start).with_background_starve_window(window);

        // Empty the bucket via Foreground so Background runs
        // straight into the gate.
        assert!(budget.try_consume_with_class(10_000, BandwidthClass::Foreground, start, fraction));

        // 50 ms: still inside the starve window, so denied.
        let inside_window =
            budget.try_consume_with_class(500, BandwidthClass::Background, at(start, 50), fraction);
        assert!(!inside_window);

        // 200 ms: past the window, the hatch admits one request
        // (500 < 1_000 hatch cap).
        let hatch_shot = budget.try_consume_with_class(
            500,
            BandwidthClass::Background,
            at(start, 200),
            fraction,
        );
        assert!(hatch_shot);

        // 201 ms: the hatch is one-shot, not "open the
        // floodgates". The 1 ms gap keeps refill from lifting the
        // bucket back over the reserve.
        let right_after = budget.try_consume_with_class(
            500,
            BandwidthClass::Background,
            at(start, 201),
            fraction,
        );
        assert!(!right_after);
    }

    /// Every successful Background admission, through the gate or
    /// through the hatch, restarts the D4 timer. After a gated
    /// admission, the next denial therefore does not trip the
    /// hatch immediately.
    #[test]
    fn d4_starve_timer_resets_on_successful_gated_admission() {
        let start = Instant::now();
        let window = Duration::from_millis(100);
        let fraction = 0.3;
        let mut budget =
            BandwidthBudget::new(1.0, 1000, start).with_background_starve_window(window);

        // Full bucket: Background passes the gate normally.
        assert!(budget.try_consume_with_class(200, BandwidthClass::Background, start, fraction));

        // Foreground takes the remaining 800 one millisecond later.
        let drained =
            budget.try_consume_with_class(800, BandwidthClass::Foreground, at(start, 1), fraction);
        assert!(drained);

        // At 50 ms (< 100 ms window) Background is denied and the
        // hatch stays shut because the timer was just reset.
        let starved =
            budget.try_consume_with_class(200, BandwidthClass::Background, at(start, 50), fraction);
        assert!(!starved);
    }

    /// The legacy `try_consume` entry point behaves as before; it
    /// routes through the class-aware path with Foreground
    /// semantics.
    #[test]
    fn legacy_try_consume_unchanged_behavior() {
        let (mut budget, now) = fresh(1000);
        // Two 500-byte draws spend the whole capacity ...
        for _ in 0..2 {
            assert!(budget.try_consume(500, now));
        }
        // ... after which a single byte is refused.
        assert!(!budget.try_consume(1, now));
    }

    /// The D4 hatch limits each grant to
    /// BACKGROUND_HATCH_MAX_GRANT_FRACTION × capacity. An
    /// adversarial Background request that costs more than the cap
    /// is denied even after starvation, so a single hatch firing
    /// can never floor the bucket to zero. Before the fix the hatch
    /// admitted an unbounded number of bytes per shot.
    #[test]
    fn d4_hatch_denies_oversized_background_request() {
        let start = Instant::now();
        let window = Duration::from_millis(50);
        // 10 KB capacity means a 1 KB hatch cap.
        let mut budget =
            BandwidthBudget::new(1.0, 10_000, start).with_background_starve_window(window);

        // Empty the bucket so the gate refuses Background.
        assert!(budget.try_consume_with_class(10_000, BandwidthClass::Foreground, start, 0.3));

        // 100 ms is past the 50 ms starve window.
        let later = at(start, 100);

        // 5 KB is far above the 1 KB hatch cap and must be denied.
        assert!(
            !budget.try_consume_with_class(5_000, BandwidthClass::Background, later, 0.3),
            "oversized Background hatch attempt (5 KB > 10% × 10 KB) must be denied"
        );

        // A request under the cap IS admitted by the hatch on the
        // next attempt: the failed oversized attempt did not use
        // up the starve timer reset.
        assert!(
            budget.try_consume_with_class(500, BandwidthClass::Background, later, 0.3),
            "right-sized Background request under hatch cap must be admitted"
        );
    }

    /// Companion to the oversized case: a Background request that
    /// fits under the hatch cap is admitted even when refill has
    /// restored part of the balance.
    #[test]
    fn d4_hatch_admits_within_cap() {
        let start = Instant::now();
        let window = Duration::from_millis(50);
        let mut budget =
            BandwidthBudget::new(1.0, 10_000, start).with_background_starve_window(window);

        // Empty the bucket via Foreground.
        assert!(budget.try_consume_with_class(10_000, BandwidthClass::Foreground, start, 0.3));

        // 50 bytes at 100 ms: well below the 1 KB cap and past the
        // starve window.
        let later = at(start, 100);
        let admitted = budget.try_consume_with_class(50, BandwidthClass::Background, later, 0.3);
        assert!(admitted);

        // Refill restored some bytes and the hatch took 50 of
        // them; the balance must stay within the capacity.
        assert!(budget.available_bytes() <= 10_000);
    }
}