irontide-engine 1.2.1

IronTide engine runtime: the per-torrent actor, peer I/O loops, and the non-leaf engine infrastructure (disk I/O, tracker management, alerts, streaming, extensions, SSL) — the renamed irontide-torrent-actor
//! M257c: pure per-torrent request-budget allocator.
//!
//! Computes per-peer pipeline quotas from the aggregate budget. Pure
//! function — the `pipeline_tick` feeds it actor-side state and stores
//! the results into each peer's `target_depth`. Enforcement happens at
//! the reader's permit-return/unchoke-refill sites (`peer_shared.rs`).
//!
//! Scope notes: eviction of starved CONNECTIONS stays with the M148
//! eviction system; the budget only right-sizes request grants. The
//! per-peer ceiling is the BDP-adaptive cap from [`bdp_cap`] (M257f).
//! The legacy fixed depth of 128 (`LEGACY_DEPTH`) survives on the
//! budget-0 arm, the no-RTT-yet fallback, and the churn-suspension
//! backstop (`CHURN_SUSPEND_FLIPS_PER_MIN`). Surplus that a capped
//! peer cannot take is re-offered to peers still below THEIR cap (one
//! bounded spill pass, in descending rate order).

use std::net::SocketAddr;

/// Per-peer allocator input:
/// `(addr, ewma_rate_bytes_per_sec, unchoked_us, bdp_cap)`.
/// `unchoked_us` is a flag (the remote peer has unchoked us), not a
/// duration in microseconds. The cap comes from [`bdp_cap`] (computed
/// actor-side each tick from the rate and RTT EWMAs); 128 reproduces
/// the legacy fixed ceiling.
pub(crate) type PeerRate = (SocketAddr, u64, bool, u32);

/// Depth formula constants (M257f). Mechanism, not policy: the budget
/// knobs are the enable/disable surface (`request_budget_per_torrent`
/// = 0 disables the allocator), and these constants become
/// registry-backed settings only if evidence demands tunability.
const DEPTH_FLOOR: u32 = 4;
const DEPTH_CEILING: u32 = 512;
const DEPTH_SLACK: u32 = 4;
/// Consecutive shrink signals required before a cap reduction lands.
/// Grow probes immediately; shrink is confirmed — measured rate cannot
/// exceed what CURRENT depth allows, so an unconfirmed shrink
/// mid-EWMA-ramp would self-limit the pipe (cold-ramp feedback
/// deadlock). A signal only counts below the ⅔ line (see `bdp_cap`
/// body): the convergence staircase computes fractions of the
/// DEPTH-LIMITED rate during ramp (1−0.7^k of ~13.1 MB/s on the
/// high-BDP reference link, NOT of the 16 MB/s true link — depth 128
/// caps the rate the EWMA can ever see), and those readings must reset
/// the streak, not feed it. The measured tick-3 reading computes 89 on
/// that link: a ¾ line (96) would land the forbidden mid-ramp shrink
/// (evidence run 2: `high_bdp` frozen at in-flight ≈ 95); the ⅔ line
/// (86) resets with deterministic margin.
const SHRINK_CONFIRM_TICKS: u8 = 3;

/// Legacy fixed depth — the budget-0 arm, the no-RTT-yet fallback, and
/// the actor-side `bdp_cap` field init. `pub(crate)` so `peer_state`
/// and tests share the one definition (no scattered `128` literals or
/// repeated casts).
#[allow(clippy::cast_possible_truncation)] // INITIAL_QUEUE_DEPTH = 128, fits u32
pub(crate) const LEGACY_DEPTH: u32 = crate::peer_shared::INITIAL_QUEUE_DEPTH as u32;

/// Remote choke-onset rate per minute (torrent-wide EWMA) above which
/// BDP pricing suspends and caps pin at [`LEGACY_DEPTH`]. This is a
/// **backstop for pathological rotation churn**, NOT the primary fix
/// for contended swarms — the window-rate cap input (see
/// `PeerPipelineState::last_window_rate`) is.
///
/// Calibration trail (M257f evidence). Iteration 3 added this gate at
/// 12.0 against a run-3 hypothesis that pricing a rotation-shared swarm
/// (seeders cycling unchoke slots, sub-block honest per-pipe BDP —
/// `bdp_blocks_estimate 0.92`) collapses throughput in a churn spiral.
/// Runs 4 and 5 **refuted that as the operative mechanism**:
///
/// - The actor's onset detector is tick-sampled (`prev_choking` edges at
///   the 1 s pipeline tick), so it UNDERCOUNTS the raw `on_choke` rate
///   the report shows — sub-second choke/unchoke cycles complete inside
///   one tick and the edge is missed. So the report's 12.25/min on
///   `high_bdp` maps to a far lower actor-side estimate.
/// - `high_bdp` was byte-identical across runs 4 (thr 12.0) and 5 (thr
///   30.0): `in_flight` ~147, bytes 0.853 either way. Suspension never
///   fired there at either threshold — it tracks BDP cleanly on its own.
/// - `contended` IMPROVED when the threshold rose 12 → 30 (bytes
///   0.696 → 0.877, the 30 s collapse run gone, cv 0.109 → 0.030). At
///   12.0 its estimate tripped the gate and ran the legacy-pin regime;
///   at 30.0 it stays in BDP pricing — which, now that the window-rate
///   input removed the staircase knife-edge, is the BETTER regime under
///   churn. Suspension was HARMING contended, not protecting it.
///
/// 30.0 therefore leaves the whole measured envelope (dedicated and
/// contended alike) in window-rate-safe BDP pricing. The gate stays only
/// to pin LEGACY under churn beyond that envelope (estimate > 30/min
/// sustained), where pricing safety is unverified — a conservative
/// backstop, exercised by `m257f_churn_suspends_bdp_pricing`, dormant in
/// every perf profile. Residual contended instability (cv 0.030) is the
/// choke-hysteresis (M257d) + producer-pump (M257e) axis, not depth.
pub(crate) const CHURN_SUSPEND_FLIPS_PER_MIN: f64 = 30.0;

/// BDP-adaptive per-peer depth cap: `clamp(⌈rate × RTT / 16 KiB⌉ +
/// slack, 4, 512)`, grow-fast / shrink-slow. Returns `(cap,
/// shrink_streak)` for the caller to store. Pure.
///
/// Against a remote that silently drops requests beyond its serve
/// queue, delivered rate plateaus and this cap converges to ≈ that
/// queue + slack — the rate feedback self-corrects (recon fact 4).
#[allow(
    clippy::cast_precision_loss,
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss
)] // rate→f64 at engine scales; float→u32 saturates (Rust 1.45+) then clamps to ≤512
pub(crate) fn bdp_cap(
    ewma_rate: u64,
    rtt_secs: Option<f64>,
    prev_cap: u32,
    shrink_streak: u8,
) -> (u32, u8) {
    let Some(rtt) = rtt_secs else {
        // No block ever delivered: no RTT, no meaningful rate — the
        // formula has no data. Stay permissive at the legacy depth so
        // slow-start is never formula-capped.
        return (LEGACY_DEPTH, 0);
    };
    let blocks = (ewma_rate as f64 * rtt / 16384.0).ceil();
    let computed = (blocks as u32)
        .saturating_add(DEPTH_SLACK)
        .clamp(DEPTH_FLOOR, DEPTH_CEILING);
    if computed >= prev_cap {
        // Grow is a PROBE, not just grow-to-computed: with propagation
        // RTT, `computed` can never exceed what current depth already
        // delivers (rate ≤ depth/RTT ⇒ computed ≤ depth + slack), so
        // pure grow-to-computed crawls +slack per EWMA settle and never
        // unbinds a depth-limited pipe within a download. Stepping
        // prev + prev/8 overshoots measured BDP each round until
        // computed < cap (the link is rate-limited, not depth-limited),
        // parking one probe step past BDP + slack.
        let probe = prev_cap.saturating_add((prev_cap / 8).max(1));
        return (computed.max(probe).min(DEPTH_CEILING), 0);
    }
    // Readings in [shrink_line, prev) are convergence noise or near-BDP
    // overshoot: hold the cap AND reset the streak. Only a reading
    // strictly below the ⅔ line is a shrink signal (see the
    // SHRINK_CONFIRM_TICKS doc for why ¾ is too tight against the
    // depth-limited ramp). The line is floored at DEPTH_FLOOR + 1
    // because computed clamps to ≥ DEPTH_FLOOR: for prev = 5 a line
    // ≤ 4 could never fire, stranding the cap one above the floor.
    let shrink_line = (prev_cap - prev_cap / 3).max(DEPTH_FLOOR + 1);
    if computed >= shrink_line {
        return (prev_cap, 0);
    }
    let streak = shrink_streak.saturating_add(1);
    if streak >= SHRINK_CONFIRM_TICKS {
        (computed, 0)
    } else {
        (prev_cap, streak)
    }
}

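/// Splits `budget` request slots across `peers` and returns one
/// `(addr, quota)` pair per input peer. Pure.
///
/// - `budget == 0` disables allocation: every peer gets
///   [`LEGACY_DEPTH`] and the per-peer caps are ignored.
/// - Choked peers get `floor` (clamped to `1..=DEPTH_CEILING`).
/// - Unchoked peers get `floor` plus a rate-proportional share of the
///   surplus (an equal split when no rate is measured yet), bounded by
///   `max(cap, floor)`; surplus stranded by caps is re-offered once in
///   descending rate order.
///
/// Floors are granted even when they oversubscribe `budget`, so the
/// returned quotas can sum to more than `budget`. Choked peers come
/// first in the output, and unchoked peers are reordered by rate only
/// when the spill pass runs.
pub(crate) fn compute_quotas(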
    budget: u32,
    floor: u32,
    peers: &[PeerRate],
) -> Vec<(SocketAddr, u32)> {
    if budget == 0 {
        // Disabled: everyone runs the legacy fixed depth — BDP caps
        // deliberately ignored (this arm is the pre-M257c A/B control).
        return peers
            .iter()
            .map(|&(a, _, _, _)| (a, LEGACY_DEPTH))
            .collect();
    }
    let floor = floor.clamp(1, DEPTH_CEILING);
    let unchoked: Vec<&PeerRate> = peers.iter().filter(|&&(_, _, u, _)| u).collect();
    let n = u32::try_from(unchoked.len()).unwrap_or(u32::MAX);
    let mut out: Vec<(SocketAddr, u32)> = Vec::with_capacity(peers.len());

    // Choked peers: probe floor only (they issue nothing while choked;
    // a surprise unchoke runs at the floor until the next tick corrects).
    for &(a, _, u, _) in peers {
        if !u {
            out.push((a, floor));
        }
    }
    if n == 0 {
        return out;
    }

    let floors_total = floor.saturating_mul(n);
    let surplus = budget.saturating_sub(floors_total);
    // u128 sum: u64 EWMA rates can overflow a u64 accumulator under extreme
    // test/sim values; physically unreachable, cheap to close.
    let total_rate: u128 = unchoked.iter().map(|&&(_, r, _, _)| u128::from(r)).sum();

    // Pass 1: floor + proportional surplus share, capped per-peer.
    // Floors are the hard probe minimum — they win over a smaller cap.
    let mut quotas: Vec<(SocketAddr, u32, u64, u32)> = unchoked
        .iter()
        .map(|&&(a, r, _, c)| {
            let cap = c.max(floor);
            // checked_div: None only when total_rate == 0 (no peer has a
            // measured rate yet) — fall back to an equal split.
            let share = match (u128::from(surplus) * u128::from(r)).checked_div(total_rate) {
                Some(s) => u32::try_from(s).unwrap_or(u32::MAX),
                None => surplus / n,
            };
            (a, floor.saturating_add(share).min(cap), r, cap)
        })
        .collect();

    // Pass 2 (cap spill): surplus stranded by per-peer caps re-offers
    // to peers still below THEIR cap, by rate order. One bounded pass.
    let assigned: u32 = quotas.iter().map(|&(_, q, _, _)| q).sum();
    let mut spill = budget.saturating_sub(assigned);
    if spill > 0 {
        quotas.sort_by_key(|q| std::cmp::Reverse(q.2));
        for q in &mut quotas {
            if spill == 0 {
                break;
            }
            let room = q.3 - q.1;
            let take = room.min(spill);
            q.1 += take;
            spill -= take;
        }
    }
    out.extend(quotas.into_iter().map(|(a, q, _, _)| (a, q)));
    out
}

#[cfg(test)]
mod tests {
    use super::*;

    fn addr(n: u8) -> SocketAddr {
        format!("10.0.0.{n}:6881").parse().unwrap()
    }

    #[test]
    fn budget_zero_means_legacy_cap_for_everyone() {
        let q = compute_quotas(
            0,
            8,
            &[(addr(1), 1000, true, 128), (addr(2), 0, false, 128)],
        );
        assert!(q.iter().all(|&(_, d)| d == LEGACY_DEPTH));
    }

    #[test]
    fn choked_peers_get_the_floor() {
        let q = compute_quotas(512, 8, &[(addr(1), 9999, false, 128)]);
        assert_eq!(q, vec![(addr(1), 8)]);
    }

    #[test]
    fn proportional_split_caps_at_128_and_respects_budget() {
        // One fast, one slow: fast peer hits the cap, slow gets floor+share.
        let q = compute_quotas(
            136,
            8,
            &[(addr(1), 10_000, true, 128), (addr(2), 0, true, 128)],
        );
        let total: u32 = q.iter().map(|&(_, d)| d).sum();
        assert!(total <= 136, "sum {total} must stay within budget");
        let fast = q.iter().find(|&&(a, _)| a == addr(1)).unwrap().1;
        let slow = q.iter().find(|&&(a, _)| a == addr(2)).unwrap().1;
        assert_eq!(slow, 8, "zero-rate peer holds the floor");
        assert_eq!(fast, 128, "floor 8 + full surplus 120 = 128, exactly at the cap");
    }

    #[test]
    fn all_zero_rates_split_equally() {
        let q = compute_quotas(64, 8, &[(addr(1), 0, true, 128), (addr(2), 0, true, 128)]);
        assert_eq!(q.iter().map(|&(_, d)| d).sum::<u32>(), 64);
        assert!(q.iter().all(|&(_, d)| d == 32));
    }

    #[test]
    fn floors_always_granted_even_when_budget_oversubscribed() {
        // 20 unchoked peers x floor 8 = 160 > budget 64: floors win
        // (budget is advisory aggregate; floor is the hard probe minimum).
        let peers: Vec<PeerRate> = (1..=20).map(|n| (addr(n), 0, true, 128)).collect();
        let q = compute_quotas(64, 8, &peers);
        assert!(q.iter().all(|&(_, d)| d == 8));
    }

    #[test]
    fn extreme_rates_do_not_overflow_the_sum() {
        // Two peers at u64::MAX rate: the u128 accumulator must not wrap
        // (a u64 sum would panic in debug / wrap in release — OV F2).
        let q = compute_quotas(
            256,
            8,
            &[
                (addr(1), u64::MAX, true, 128),
                (addr(2), u64::MAX, true, 128),
            ],
        );
        assert_eq!(q.iter().map(|&(_, d)| d).sum::<u32>(), 256);
        assert!(q.iter().all(|&(_, d)| d == 128));
    }

    #[test]
    fn surplus_distributes_by_rate_share() {
        // budget 256, 2 unchoked: floors 16, surplus 240 split 3:1.
        // Pass 1: a = 8 + 180 -> capped 128 (60 stranded); b = 8 + 60 = 68.
        // Spill pass: b takes the stranded 60 -> 128. Total 256.
        let q = compute_quotas(
            256,
            8,
            &[(addr(1), 3000, true, 128), (addr(2), 1000, true, 128)],
        );
        let a = q.iter().find(|&&(p, _)| p == addr(1)).unwrap().1;
        let b = q.iter().find(|&&(p, _)| p == addr(2)).unwrap().1;
        assert_eq!(a + b, 256);
        assert_eq!(a, 128, "8 + 180 capped at 128");
        assert_eq!(b, 128, "68 from pass 1 + 60 cap spill");
    }

    #[test]
    fn bdp_cap_no_rtt_stays_at_legacy_init() {
        // A peer that has never delivered a block has no RTT sample and
        // no meaningful rate — the formula has no data; stay permissive.
        let (cap, streak) = bdp_cap(0, None, 128, 2);
        assert_eq!((cap, streak), (128, 0));
    }

    #[test]
    fn bdp_cap_formula_exact_high_bdp() {
        // 16 MB/s × 160 ms = 2_560_000 bytes = 156.25 blocks → ceil 157
        // + slack 4 = 161, within [4, 512].
        let (cap, _) = bdp_cap(16_000_000, Some(0.160), 128, 0);
        assert_eq!(cap, 161);
    }

    #[test]
    fn bdp_cap_grow_is_immediate_and_resets_streak() {
        // computed 161 exceeds both prev 128 and the probe step
        // 128 + 128/8 = 144 — grow lands the full computed value.
        let (cap, streak) = bdp_cap(16_000_000, Some(0.160), 128, 2);
        assert_eq!((cap, streak), (161, 0));
    }

    #[test]
    fn bdp_cap_grow_probes_past_measured_bdp() {
        // The tautology-breaker: with propagation RTT, measured rate is
        // bounded by current depth (rate ≤ depth/RTT), so computed can
        // never exceed depth + slack — grow-to-computed alone crawls
        // +slack per EWMA settle and never unbinds a depth-limited
        // pipe. 12_697_600 B/s × 160 ms = exactly 124 blocks + 4 =
        // computed 128 == prev: the probe steps past it to
        // 128 + 128/8 = 144 anyway. Next round's measured rate can then
        // fill the new headroom (or computed < cap parks the probe).
        let (cap, streak) = bdp_cap(12_697_600, Some(0.160), 128, 0);
        assert_eq!((cap, streak), (144, 0));
    }

    #[test]
    fn bdp_cap_shrink_needs_three_consecutive_confirms() {
        // 2 MB/s × 30 ms = 3.66 blocks → ceil 4 + 4 = 8, far below the
        // ⅔ line of prev 128 (86) — a true shrink signal.
        let (c1, s1) = bdp_cap(2_000_000, Some(0.030), 128, 0);
        assert_eq!((c1, s1), (128, 1), "first shrink signal holds");
        let (c2, s2) = bdp_cap(2_000_000, Some(0.030), c1, s1);
        assert_eq!((c2, s2), (128, 2), "second shrink signal holds");
        let (c3, s3) = bdp_cap(2_000_000, Some(0.030), c2, s2);
        assert_eq!((c3, s3), (8, 0), "third confirm lands the shrink");
    }

    #[test]
    fn bdp_cap_near_miss_holds_without_streak() {
        // A reading at or above the ⅔ line is NOT a shrink signal:
        // 10 MB/s × 160 ms = 97.66 blocks → ceil 98 + 4 = 102; 102 < 128
        // but 102 ≥ 86 (= 128 − 128/3) → hold, streak resets to 0.
        // Near-BDP overshoot sticks by design (recon fact 5 corollary).
        let (cap, streak) = bdp_cap(10_000_000, Some(0.160), 128, 2);
        assert_eq!((cap, streak), (128, 0));
    }

    #[test]
    fn bdp_cap_depth_limited_ramp_tick3_resets_streak() {
        // THE evidence-run-2 boundary. During cold ramp the EWMA can
        // only see the DEPTH-LIMITED rate (128 × 16 KiB / 160 ms ≈
        // 13.107 MB/s, not the 16 MB/s link). Tick 3's EWMA fraction
        // 1 − 0.7³ = 0.657 × 13.107 MB/s = 8.611 MB/s computes
        // ceil(84.09) + 4 = 89. Against the old ¾ line (96) this was
        // the third consecutive signal — the shrink LANDED mid-ramp and
        // froze high_bdp at in-flight ≈ 95 (run 2, n=3, CV ≈ 0). The ⅔
        // line (86) classifies 89 as convergence noise: hold + reset.
        let (cap, streak) = bdp_cap(8_611_000, Some(0.160), 128, 2);
        assert_eq!((cap, streak), (128, 0));
    }

    #[test]
    fn bdp_cap_floor_edge_shrinks_from_five_to_four() {
        // OV round-1 F3: with prev = 5 the raw ⅔ line is 5 − 5/3 = 4,
        // and computed clamps to ≥ 4, so a strict `< 4` could never
        // fire — cap 5 would be stranded forever. The line's
        // `.max(DEPTH_FLOOR + 1)` makes a floor-level reading count;
        // three confirms land 5 → 4. Rate 0 with an RTT sample computes
        // ceil(0) + slack 4 = 4.
        let (c1, s1) = bdp_cap(0, Some(0.030), 5, 0);
        assert_eq!((c1, s1), (5, 1), "first floor-level signal holds");
        let (c2, s2) = bdp_cap(0, Some(0.030), c1, s1);
        assert_eq!((c2, s2), (5, 2), "second holds");
        let (c3, s3) = bdp_cap(0, Some(0.030), c2, s2);
        assert_eq!((c3, s3), (4, 0), "third lands the floor");
    }

    #[test]
    fn bdp_cap_cold_ramp_never_shrinks_below_init() {
        // EWMA convergence staircase at the DEPTH-LIMITED rate
        // (13.107 MB/s: what a 128-deep pipe on a 160 ms link can
        // actually deliver; the plan's original model used fractions of
        // the 16 MB/s true link and missed the run-2 mid-ramp shrink).
        // The first seven fractions follow 1 − 0.7^k; 0.96 and 0.99
        // sample the tail. Computed caps run 43, 70, 89, 102, 111, 117,
        // 122, 127, 131. Ticks 1-2 signal (43, 70 < 86) but tick 3's 89
        // clears the ⅔ line and RESETS the streak, so no shrink lands.
        // At frac 0.99 computed 131 ≥ 128 fires grow, and the probe
        // step (max(131, 144)) unbinds past the legacy depth.
        let mut cap = 128u32;
        let mut streak = 0u8;
        let mut max_seen = 0u32;
        for frac in [0.30f64, 0.51, 0.657, 0.76, 0.83, 0.88, 0.92, 0.96, 0.99] {
            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
            let rate = (13_107_200.0 * frac) as u64;
            let (c, s) = bdp_cap(rate, Some(0.160), cap, streak);
            assert!(c >= 128, "cap dropped below init during ramp: {c}");
            cap = c;
            streak = s;
            max_seen = max_seen.max(c);
        }
        assert!(max_seen > 128, "cap never unbound during ramp");
    }

    #[test]
    fn bdp_cap_clamps_to_floor_and_ceiling() {
        // Tiny BDP: 100 KB/s × 10 ms = 0.06 blocks → ceil 1 + 4 = 5 ≥ floor 4.
        let (lo, _) = bdp_cap(100_000, Some(0.010), 4, 2);
        assert_eq!(lo, 5);
        // Huge BDP saturates at the ceiling, no overflow: u64::MAX rate.
        let (hi, _) = bdp_cap(u64::MAX, Some(10.0), 512, 0);
        assert_eq!(hi, 512);
    }

    #[test]
    fn per_peer_caps_bound_quota_and_spill() {
        // budget 512, floor 8, two unchoked equal-rate peers with caps
        // 161 and 8: pass 1 gives 8+248 each → capped to 161 and 8;
        // spill finds no room (both at cap) → total 169 ≤ budget.
        let q = compute_quotas(
            512,
            8,
            &[(addr(1), 1000, true, 161), (addr(2), 1000, true, 8)],
        );
        let a = q.iter().find(|&&(p, _)| p == addr(1)).unwrap().1;
        let b = q.iter().find(|&&(p, _)| p == addr(2)).unwrap().1;
        assert_eq!(a, 161, "high-cap peer fills to its BDP cap");
        assert_eq!(b, 8, "low-cap peer pinned at its BDP cap");
    }

    #[test]
    fn cap_below_floor_loses_to_floor() {
        // Floors are the hard probe minimum (M257c invariant) — a BDP
        // cap of 4 under floor 8 must still grant 8.
        let q = compute_quotas(64, 8, &[(addr(1), 0, true, 4)]);
        assert_eq!(q, vec![(addr(1), 8)]);
    }

    #[test]
    fn budget_zero_ignores_caps_entirely() {
        // Legacy arm: fixed INITIAL_QUEUE_DEPTH for everyone, BDP caps
        // must NOT apply (this is the A/B control = pre-M257c engine).
        let q = compute_quotas(0, 8, &[(addr(1), 1000, true, 8), (addr(2), 0, false, 4)]);
        assert!(q.iter().all(|&(_, d)| d == LEGACY_DEPTH));
    }
}
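
The ⅔-versus-¾ argument in the `SHRINK_CONFIRM_TICKS` comment can be checked with a few lines of arithmetic. The sketch below is not part of the crate. It assumes the reference link quoted in the comments (160 ms RTT, depth 128, 16 KiB blocks, slack 4) and an EWMA that has absorbed `1 - 0.7^k` of the steady rate after `k` ticks. Every function name in it is made up for illustration, and it deliberately omits the floor/ceiling clamp.

```rust
// Illustrative sketch only; none of these names exist in irontide-engine.
// Assumptions taken from the comments in the listing: 16 KiB blocks,
// slack 4, a 160 ms RTT, depth 128, and an EWMA that has absorbed
// 1 - 0.7^k of the steady rate after k ticks.

const BLOCK_BYTES: f64 = 16_384.0;
const SLACK: u32 = 4;

/// ceil(rate * rtt / 16 KiB) + slack, without the floor/ceiling clamp.
fn computed_depth(rate_bytes_per_sec: f64, rtt_secs: f64) -> u32 {
    (rate_bytes_per_sec * rtt_secs / BLOCK_BYTES).ceil() as u32 + SLACK
}

/// Highest rate a pipe of `depth` outstanding blocks can deliver.
fn depth_limited_rate(depth: u32, rtt_secs: f64) -> f64 {
    f64::from(depth) * BLOCK_BYTES / rtt_secs
}

/// Depth readings for ticks 1..=ticks of a cold ramp at the depth-limited rate.
fn staircase(depth: u32, rtt_secs: f64, ticks: i32) -> Vec<u32> {
    let steady = depth_limited_rate(depth, rtt_secs);
    (1..=ticks)
        .map(|k| computed_depth(steady * (1.0 - 0.7f64.powi(k)), rtt_secs))
        .collect()
}

/// Number of readings, counted from the first tick, that fall below `line`.
fn leading_signals(readings: &[u32], line: u32) -> usize {
    readings.iter().take_while(|&&r| r < line).count()
}

fn main() {
    let prev = 128u32;
    let readings = staircase(prev, 0.160, 5);
    let two_thirds = prev - prev / 3;
    let three_quarters = prev - prev / 4;
    println!("readings: {readings:?}");
    println!(
        "2/3 line {two_thirds}: {} leading signals",
        leading_signals(&readings, two_thirds)
    );
    println!(
        "3/4 line {three_quarters}: {} leading signals",
        leading_signals(&readings, three_quarters)
    );
}
```

Under these assumptions the first three readings all fall below the ¾ line, which is enough to reach the three-tick confirmation, while the third reading clears the ⅔ line and so resets the streak at two.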