Skip to main content

snapdir_stores/
adaptive.rs

//! Adaptive concurrency + throughput control (pure control logic).
//!
//! This module is the **brain and the gate** of snapdir's adaptive transfer
//! tuning, but it deliberately performs **no** network or real I/O and reads
//! **no** real clock or system samplers itself — every external signal (CPU,
//! RSS, elapsed time, per-op outcomes) is *injected*. That keeps the controller
//! fully deterministic and unit-testable; wiring it into the live transfer
//! loops (and feeding it [`snapdir_core::resources`] samples + a real monotonic
//! clock) is a later gate.
//!
//! Three pieces:
//!
//! - [`AdaptiveGate`] — a **resizable** concurrency permit pool shared by both
//!   transfer backends. It exposes an async [`acquire`](AdaptiveGate::acquire)
//!   (tokio semaphore) for the futures path and a zero-dependency blocking
//!   [`acquire_blocking`](AdaptiveGate::acquire_blocking) (mutex + condvar) for
//!   the rayon path; [`set_limit`](AdaptiveGate::set_limit) retunes both live.
//! - [`AdaptiveController`] — the control law (slow-start → AIMD, latency
//!   gradient, congestion backoff with cooldown, CPU/memory guardrails,
//!   periodic re-probing) that turns injected op samples + metrics into a
//!   [`Decision`] (next concurrency limit + target byte-rate).
//! - Supporting value types: [`AdaptivePolicy`], [`OpSample`], [`OpResult`],
//!   [`Decision`].
24
// The controller works in `f64` throughput/latency space and converts to/from
// integer concurrency limits and byte-rates. Those casts are inherent to an
// *advisory* control signal (never a correctness path), so the pedantic cast
// lints are allowed module-wide, matching the resources sampler's convention.
29#![allow(
30    clippy::cast_precision_loss,
31    clippy::cast_possible_truncation,
32    clippy::cast_sign_loss
33)]
34
35use std::sync::atomic::{AtomicUsize, Ordering};
36use std::sync::{Arc, Condvar, Mutex, PoisonError};
37use std::time::{Duration, Instant};
38
39use snapdir_core::resources::{resident_set_bytes, CpuSampler};
40use snapdir_core::Meter;
41use tokio::sync::{OwnedSemaphorePermit, Semaphore};
42
// ---------------------------------------------------------------------------
// AdaptiveGate — resizable concurrency permit pool (async + blocking).
// ---------------------------------------------------------------------------
46
/// Bookkeeping for the hand-rolled, zero-dependency blocking counting
/// semaphore that backs the rayon path.
///
/// In steady state `available + in_flight == limit`. Right after a shrink,
/// `in_flight` may temporarily exceed `limit`; `available` is then `0` until
/// enough guards have been dropped.
#[derive(Debug)]
struct BlockingState {
    /// Permits that can be handed out right now without waiting.
    available: usize,
    /// The effective concurrency limit (number of permits that *should* exist,
    /// counting both available and in-flight). `set_limit` adjusts this.
    limit: usize,
    /// Permits currently checked out (held by live guards).
    in_flight: usize,
}
59
60/// Shared inner state of an [`AdaptiveGate`].
61#[derive(Debug)]
62struct GateInner {
63    /// Absolute upper bound on the limit (construction param). `limit` is always
64    /// clamped to `[1, ceiling]`.
65    ceiling: usize,
66    /// The current logical limit, mirrored across both backends. Stored as an
67    /// atomic for cheap reads (e.g. [`AdaptiveGate::limit`]).
68    limit: AtomicUsize,
69    /// The async permit pool (futures/tokio path). Resized via `add_permits` /
70    /// acquire-and-`forget`.
71    sem: Arc<Semaphore>,
72    /// Outstanding "shrink debt" for the async pool: permits a `set_limit`
73    /// shrink wanted to remove but couldn't (they were in flight). The next
74    /// permit Drops pay this down by forgetting instead of returning, so the
75    /// semaphore's effective capacity converges to the new limit without ever
76    /// revoking a held permit. `tokio::sync::Semaphore` has no "max" knob, so
77    /// this debt is how a shrink-below-in-flight is made exact.
78    async_debt: AtomicUsize,
79    /// The blocking permit pool (rayon path): a counting semaphore built from a
80    /// mutex + condvar.
81    blocking: Mutex<BlockingState>,
82    /// Wakes blocking waiters when permits become available (limit raised or a
83    /// guard dropped).
84    blocking_cv: Condvar,
85}
86
87/// A resizable concurrency permit pool shared by both transfer backends.
88///
89/// The pool starts at `start` permits and can be retuned live with
90/// [`set_limit`](AdaptiveGate::set_limit) anywhere in `[1, ceiling]`. It serves
91/// two access paths over **one shared logical limit**:
92///
93/// - **async** ([`acquire`](AdaptiveGate::acquire)) for the futures/tokio
94///   transfer loop, backed by [`tokio::sync::Semaphore`];
95/// - **blocking** ([`acquire_blocking`](AdaptiveGate::acquire_blocking)) for the
96///   rayon store-to-store sync path, backed by a zero-dependency
97///   mutex+condvar counting semaphore (mirrors the token-bucket style in
98///   [`crate::transfer`]).
99///
100/// Both return an RAII guard that releases its permit on drop. `set_limit`
101/// grows the async pool with `add_permits` and shrinks it by acquiring and
102/// [`forget`](tokio::sync::SemaphorePermit::forget)ting permits (the standard
103/// resizable-semaphore technique); for the blocking pool it adjusts the
104/// available count and notifies waiters. Shrinking **never** revokes a permit
105/// already held — it only reduces how many *new* permits can be handed out, so
106/// in-flight work drains naturally and shrinking can never deadlock held
107/// permits.
108///
109/// The gate is [`Clone`] (cloning shares the same underlying pools via [`Arc`]).
110#[derive(Clone, Debug)]
111pub struct AdaptiveGate {
112    inner: Arc<GateInner>,
113}
114
115/// RAII guard for an async permit acquired from an [`AdaptiveGate`]. Releases
116/// the permit on drop — unless the gate carries outstanding shrink debt, in
117/// which case this permit is *forgotten* to pay that debt down (so a shrink
118/// that happened while this permit was in flight takes effect exactly).
119#[derive(Debug)]
120pub struct GatePermit {
121    inner: Arc<GateInner>,
122    // `Option` so Drop can move the permit out to either return it (drop) or
123    // forget it (pay shrink debt).
124    permit: Option<OwnedSemaphorePermit>,
125}
126
127impl Drop for GatePermit {
128    fn drop(&mut self) {
129        let Some(permit) = self.permit.take() else {
130            return;
131        };
132        // If a shrink is still owed permits, consume this one toward that debt
133        // (forget => the semaphore's capacity drops by one) instead of
134        // returning it to the pool.
135        let mut debt = self.inner.async_debt.load(Ordering::SeqCst);
136        loop {
137            if debt == 0 {
138                return; // no debt: let the permit drop normally (returns it)
139            }
140            match self.inner.async_debt.compare_exchange_weak(
141                debt,
142                debt - 1,
143                Ordering::SeqCst,
144                Ordering::SeqCst,
145            ) {
146                Ok(_) => {
147                    permit.forget();
148                    return;
149                }
150                Err(actual) => debt = actual,
151            }
152        }
153    }
154}
155
156/// RAII guard for a blocking permit acquired from an [`AdaptiveGate`]. Releases
157/// the permit (and notifies one waiter) on drop.
158#[derive(Debug)]
159pub struct BlockingGatePermit {
160    inner: Arc<GateInner>,
161}
162
163impl Drop for BlockingGatePermit {
164    fn drop(&mut self) {
165        let mut state = self
166            .inner
167            .blocking
168            .lock()
169            .unwrap_or_else(PoisonError::into_inner);
170        state.in_flight = state.in_flight.saturating_sub(1);
171        // Only return the permit to the available pool if we are still within
172        // the (possibly shrunk) limit; otherwise the permit is absorbed by the
173        // shrink (in_flight already dropped, which is what matters).
174        if state.available + state.in_flight < state.limit {
175            state.available += 1;
176            drop(state);
177            self.inner.blocking_cv.notify_one();
178        }
179    }
180}
181
182impl AdaptiveGate {
183    /// Builds a gate whose limit starts at `start` and can be retuned anywhere in
184    /// `[1, ceiling]`. Both `start` and `ceiling` are clamped to at least 1, and
185    /// `start` is clamped to `ceiling`.
186    #[must_use]
187    pub fn new(start: usize, ceiling: usize) -> Self {
188        let ceiling = ceiling.max(1);
189        let start = start.clamp(1, ceiling);
190        Self {
191            inner: Arc::new(GateInner {
192                ceiling,
193                limit: AtomicUsize::new(start),
194                sem: Arc::new(Semaphore::new(start)),
195                async_debt: AtomicUsize::new(0),
196                blocking: Mutex::new(BlockingState {
197                    available: start,
198                    limit: start,
199                    in_flight: 0,
200                }),
201                blocking_cv: Condvar::new(),
202            }),
203        }
204    }
205
206    /// The construction-time ceiling (the maximum the limit can ever reach).
207    #[must_use]
208    pub fn ceiling(&self) -> usize {
209        self.inner.ceiling
210    }
211
212    /// The current logical concurrency limit (shared by both backends).
213    #[must_use]
214    pub fn limit(&self) -> usize {
215        self.inner.limit.load(Ordering::SeqCst)
216    }
217
218    /// Retunes the effective concurrency limit live, clamped to `[1, ceiling]`.
219    ///
220    /// Grows the async pool with `add_permits`; shrinks it by reserving (and
221    /// forgetting) the surplus permits so the semaphore's effective capacity
222    /// drops without revoking permits already in flight. Adjusts the blocking
223    /// pool's available count symmetrically and wakes waiters when growing.
224    /// Returns the new (clamped) limit.
225    pub fn set_limit(&self, n: usize) -> usize {
226        let new = n.clamp(1, self.inner.ceiling);
227        let old = self.inner.limit.swap(new, Ordering::SeqCst);
228        if new == old {
229            return new;
230        }
231
232        // --- async pool -----------------------------------------------------
233        if new > old {
234            // First, pay down any outstanding shrink debt with the growth (we
235            // owed removals that hadn't landed yet); only add the remainder.
236            let grow = new - old;
237            let paid = self.take_debt(grow);
238            let remainder = grow - paid;
239            if remainder > 0 {
240                self.inner.sem.add_permits(remainder);
241            }
242        } else {
243            // Shrink: remove (old - new) permits. Forget what's available now;
244            // record the rest as debt to be paid by in-flight permits' Drop.
245            let mut to_remove = old - new;
246            while to_remove > 0 {
247                if let Ok(permit) = self.inner.sem.clone().try_acquire_owned() {
248                    permit.forget();
249                    to_remove -= 1;
250                } else {
251                    break; // remainder is in flight; record it as debt
252                }
253            }
254            if to_remove > 0 {
255                self.inner.async_debt.fetch_add(to_remove, Ordering::SeqCst);
256            }
257        }
258
259        // --- blocking pool --------------------------------------------------
260        {
261            let mut state = self
262                .inner
263                .blocking
264                .lock()
265                .unwrap_or_else(PoisonError::into_inner);
266            state.limit = new;
267            // Recompute available as (limit - in_flight), floored at 0. Growing
268            // raises available; shrinking lowers it (never touching in-flight
269            // guards, which release against the new limit on drop).
270            state.available = new.saturating_sub(state.in_flight);
271            drop(state);
272            if new > old {
273                self.inner.blocking_cv.notify_all();
274            }
275        }
276
277        new
278    }
279
280    /// Reduces the outstanding async shrink debt by up to `n`, returning how
281    /// much was actually paid (so the caller can apply only the remainder when
282    /// growing the pool).
283    fn take_debt(&self, n: usize) -> usize {
284        let mut debt = self.inner.async_debt.load(Ordering::SeqCst);
285        loop {
286            let pay = debt.min(n);
287            if pay == 0 {
288                return 0;
289            }
290            match self.inner.async_debt.compare_exchange_weak(
291                debt,
292                debt - pay,
293                Ordering::SeqCst,
294                Ordering::SeqCst,
295            ) {
296                Ok(_) => return pay,
297                Err(actual) => debt = actual,
298            }
299        }
300    }
301
302    /// Acquires one async permit, awaiting if the pool is at its current limit.
303    /// The returned [`GatePermit`] releases the permit on drop.
304    ///
305    /// # Panics
306    ///
307    /// Never under normal use; the semaphore is only closed when the gate is
308    /// dropped, which cannot happen while a caller holds an `&self`.
309    pub async fn acquire(&self) -> GatePermit {
310        let permit = Arc::clone(&self.inner.sem)
311            .acquire_owned()
312            .await
313            .expect("AdaptiveGate semaphore is never closed while the gate is alive");
314        GatePermit {
315            inner: Arc::clone(&self.inner),
316            permit: Some(permit),
317        }
318    }
319
320    /// Blocking sibling of [`acquire`](AdaptiveGate::acquire): parks the calling
321    /// OS thread until a permit is free, then returns a guard that releases it
322    /// on drop. Used by the rayon store-to-store sync path.
323    #[must_use]
324    pub fn acquire_blocking(&self) -> BlockingGatePermit {
325        let mut state = self
326            .inner
327            .blocking
328            .lock()
329            .unwrap_or_else(PoisonError::into_inner);
330        while state.available == 0 {
331            state = self
332                .inner
333                .blocking_cv
334                .wait(state)
335                .unwrap_or_else(PoisonError::into_inner);
336        }
337        state.available -= 1;
338        state.in_flight += 1;
339        BlockingGatePermit {
340            inner: Arc::clone(&self.inner),
341        }
342    }
343
344    /// Best-effort count of async permits currently available (for tests/metrics).
345    #[must_use]
346    pub fn available_permits(&self) -> usize {
347        self.inner.sem.available_permits()
348    }
349}
350
// ---------------------------------------------------------------------------
// Controller value types.
// ---------------------------------------------------------------------------
354
/// Outcome class of a single transfer operation, fed to
/// [`AdaptiveController::record_op`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OpResult {
    /// Completed successfully.
    Ok,
    /// The backend explicitly throttled us (e.g. HTTP 429 / `SlowDown`).
    Throttle,
    /// A hard error. The controller counts **every** `HardErr` as a congestion
    /// signal, so callers should report this only for timeout-class failures
    /// (e.g. request timeout, connection reset).
    HardErr,
}
367
368/// One sampled transfer operation: bytes moved, observed latency, and outcome.
369#[derive(Clone, Copy, Debug)]
370pub struct OpSample {
371    /// Bytes transferred by this operation.
372    pub bytes: u64,
373    /// End-to-end latency of the operation.
374    pub latency: Duration,
375    /// Outcome class.
376    pub result: OpResult,
377}
378
/// The controller's tuning policy (its *view* of config; the `TransferConfig`
/// integration is a later gate).
///
/// Fields are public, so a policy can be built as a struct literal; prefer
/// [`AdaptivePolicy::new`], which validates `fraction` and `ceiling`.
#[derive(Clone, Copy, Debug)]
pub struct AdaptivePolicy {
    /// Target operating fraction of the discovered knee (default `0.8`): the
    /// controller aims for `fraction × knee` for both concurrency and rate,
    /// leaving headroom.
    pub fraction: f64,
    /// Absolute concurrency ceiling; the limit never exceeds this.
    pub ceiling: usize,
    /// Total machine RAM in bytes, the denominator of the memory-budget
    /// guardrail (`limit × p95_obj_size ≤ fraction × total_ram`). `0` disables
    /// the memory cap.
    pub total_ram: u64,
    /// Optional hard cap on the target byte-rate (e.g. a user `--max-rate`).
    /// `None` means rate is bounded only by the measured goodput knee.
    pub max_rate: Option<u64>,
}
397
398impl AdaptivePolicy {
399    /// Builds a policy. `fraction` is clamped to `(0, 1]` (default `0.8` if
400    /// non-finite or out of range); `ceiling` is clamped to at least 1.
401    #[must_use]
402    pub fn new(fraction: f64, ceiling: usize, total_ram: u64, max_rate: Option<u64>) -> Self {
403        let fraction = if fraction.is_finite() && fraction > 0.0 && fraction <= 1.0 {
404            fraction
405        } else {
406            0.8
407        };
408        Self {
409            fraction,
410            ceiling: ceiling.max(1),
411            total_ram,
412            max_rate,
413        }
414    }
415}
416
417impl Default for AdaptivePolicy {
418    fn default() -> Self {
419        Self {
420            fraction: 0.8,
421            ceiling: 16,
422            total_ram: 0,
423            max_rate: None,
424        }
425    }
426}
427
/// The controller's output for one [`tick`](AdaptiveController::tick): the next
/// concurrency limit and an optional target byte-rate.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Decision {
    /// The new concurrency limit to apply (clamped to `[1, ceiling]` and the
    /// memory budget).
    pub limit: usize,
    /// The new aggregate target byte-rate, or `None` for unlimited. Computed as
    /// `fraction × measured-goodput-knee`, clamped to `policy.max_rate`; before
    /// any goodput has been measured it is just `policy.max_rate`.
    pub target_rate: Option<u64>,
}
439
// ---------------------------------------------------------------------------
// AdaptiveController — the pure control law.
// ---------------------------------------------------------------------------
443
/// Which phase of the control law we are in.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Phase {
    /// Multiplicatively ramping up (`×1.5`/tick) until the first sign of a knee
    /// (congestion, queueing, or a goodput plateau).
    SlowStart,
    /// Additive-increase / multiplicative-decrease steady state.
    Aimd,
}
452
/// EWMA smoothing factor for goodput / rtt (higher = more responsive).
const EWMA_ALPHA: f64 = 0.3;
/// Slow-start growth multiplier per tick.
const SLOW_START_MULT: f64 = 1.5;
/// Multiplicative decrease factor on congestion (AIMD's `×0.5`).
const BACKOFF_MULT: f64 = 0.5;
/// Latency-gradient threshold: `rtt_min/rtt` below this ⇒ queueing detected.
const GRADIENT_THRESHOLD: f64 = 0.7;
/// CPU percentage above which we never increase the limit.
const CPU_NO_INCREASE_PCT: f64 = 85.0;
/// CPU percentage above which we actively decrease the limit.
const CPU_DECREASE_PCT: f64 = 95.0;
/// Cooldown after a congestion event during which we never increase.
const COOLDOWN: Duration = Duration::from_secs(15);
/// Minimum time between explicit re-probes while in the AIMD phase.
const REPROBE_INTERVAL: Duration = Duration::from_secs(15);
/// Relative goodput improvement required for slow-start to keep growing /
/// for a re-probe to be kept (hysteresis).
const IMPROVE_EPS: f64 = 0.02;
472
/// Injected monotonic timestamp, expressed as the [`Duration`] elapsed since an
/// arbitrary fixed epoch. The controller never reads a real clock; the caller
/// passes a monotonically non-decreasing value (real code: e.g. the time
/// elapsed since a start `Instant`; tests: a fake counter). Being a plain
/// `Duration`, it is trivially `Copy` and deterministic.
pub type MonoTime = Duration;
478
479/// The adaptive control brain.
480///
481/// PURE and deterministic: it never calls [`std::time::Instant::now`] or the
482/// [`snapdir_core::resources`] samplers. All time and metrics are injected via
483/// [`tick`](AdaptiveController::tick); per-op feedback via
484/// [`record_op`](AdaptiveController::record_op). Given the same inputs it always
485/// produces the same [`Decision`]s.
486#[derive(Debug)]
487pub struct AdaptiveController {
488    policy: AdaptivePolicy,
489
490    /// Current concurrency limit (the controller's authoritative value).
491    limit: f64,
492    /// Control phase (slow-start vs AIMD).
493    phase: Phase,
494
495    /// EWMA of goodput in bytes/sec (across recorded ops since the last tick).
496    goodput_ewma: f64,
497    /// Best goodput seen so far (the "knee" estimate).
498    goodput_knee: f64,
499    /// Goodput EWMA captured at the previous tick (for slow-start improvement).
500    goodput_prev_tick: f64,
501
502    /// EWMA of per-op latency, in seconds.
503    rtt_ewma: f64,
504    /// Minimum latency observed (the unloaded baseline), in seconds.
505    rtt_min: f64,
506
507    /// Accumulators for ops recorded since the last `tick`.
508    acc_bytes: u64,
509    acc_latency_secs: f64,
510    acc_count: u64,
511    /// Set when any op since the last tick was a Throttle / timeout-class error.
512    congestion_seen: bool,
513
514    /// `Some(deadline)` while in a post-congestion cooldown (no increases).
515    cooldown_until: Option<MonoTime>,
516    /// Time of the last re-probe (or controller start).
517    last_reprobe: MonoTime,
518    /// `Some((limit_before, goodput_before))` while a re-probe is outstanding,
519    /// so the next tick can keep-or-revert it.
520    probe_pending: Option<(f64, f64)>,
521
522    /// Whether `tick` has run at least once (to seed `last_reprobe`).
523    started: bool,
524}
525
526impl AdaptiveController {
527    /// Builds a controller from a policy. The limit starts at `2` (slow-start
528    /// seed), clamped to the policy ceiling.
529    #[must_use]
530    pub fn new(policy: AdaptivePolicy) -> Self {
531        let start = 2.0_f64.min(policy.ceiling as f64).max(1.0);
532        Self {
533            policy,
534            limit: start,
535            phase: Phase::SlowStart,
536            goodput_ewma: 0.0,
537            goodput_knee: 0.0,
538            goodput_prev_tick: 0.0,
539            rtt_ewma: 0.0,
540            rtt_min: f64::INFINITY,
541            acc_bytes: 0,
542            acc_latency_secs: 0.0,
543            acc_count: 0,
544            congestion_seen: false,
545            cooldown_until: None,
546            last_reprobe: Duration::ZERO,
547            probe_pending: None,
548            started: false,
549        }
550    }
551
552    /// The current concurrency limit (clamped to `[1, ceiling]`).
553    #[must_use]
554    pub fn current_limit(&self) -> usize {
555        (self.limit.round() as usize).clamp(1, self.policy.ceiling)
556    }
557
558    /// Records one completed (or failed) operation, updating the running EWMAs
559    /// and the congestion marker. Call between ticks; the accumulated samples
560    /// are folded into the goodput/rtt estimates at the next
561    /// [`tick`](AdaptiveController::tick).
562    pub fn record_op(&mut self, sample: OpSample) {
563        let secs = sample.latency.as_secs_f64().max(0.0);
564
565        // Per-op latency EWMA + min (baseline) tracking. Skip zero-latency
566        // degenerate samples for the min so a bogus 0 never pins rtt_min.
567        if secs > 0.0 {
568            if self.rtt_ewma <= 0.0 {
569                self.rtt_ewma = secs;
570            } else {
571                self.rtt_ewma = EWMA_ALPHA.mul_add(secs - self.rtt_ewma, self.rtt_ewma);
572            }
573            if secs < self.rtt_min {
574                self.rtt_min = secs;
575            }
576        }
577
578        self.acc_bytes = self.acc_bytes.saturating_add(sample.bytes);
579        self.acc_latency_secs += secs;
580        self.acc_count += 1;
581
582        match sample.result {
583            OpResult::Throttle | OpResult::HardErr => self.congestion_seen = true,
584            OpResult::Ok => {}
585        }
586    }
587
588    /// Applies the control law for one interval and returns the next
589    /// [`Decision`]. `now` is the injected monotonic time, `cpu_pct`/`rss` are
590    /// best-effort system samples (`None` ⇒ unknown ⇒ that guardrail is
591    /// skipped), and `p95_obj_size` is the recent 95th-percentile object size
592    /// used by the memory-budget guardrail.
593    ///
594    /// The controller never reads a real clock or sampler; all of these are
595    /// supplied by the (later) wiring gate.
596    pub fn tick(
597        &mut self,
598        now: MonoTime,
599        cpu_pct: Option<f64>,
600        _rss: Option<u64>,
601        p95_obj_size: u64,
602    ) -> Decision {
603        if !self.started {
604            self.started = true;
605            self.last_reprobe = now;
606        }
607
608        // ---- fold accumulated op samples into the goodput EWMA -------------
609        let interval_goodput = self.window_goodput();
610        if interval_goodput > 0.0 {
611            if self.goodput_ewma <= 0.0 {
612                self.goodput_ewma = interval_goodput;
613            } else {
614                self.goodput_ewma =
615                    EWMA_ALPHA.mul_add(interval_goodput - self.goodput_ewma, self.goodput_ewma);
616            }
617            if self.goodput_ewma > self.goodput_knee {
618                self.goodput_knee = self.goodput_ewma;
619            }
620        }
621        let congestion = self.congestion_seen;
622        let gradient = self.latency_gradient();
623
624        // ---- guardrail flags ----------------------------------------------
625        let cpu_blocks_increase = cpu_pct.is_some_and(|c| c > CPU_NO_INCREASE_PCT);
626        let cpu_forces_decrease = cpu_pct.is_some_and(|c| c > CPU_DECREASE_PCT);
627        let in_cooldown = self.cooldown_until.is_some_and(|d| now < d);
628        // Latency gradient below threshold ⇒ queue building ⇒ hold/decrease.
629        let queueing = gradient.is_some_and(|g| g < GRADIENT_THRESHOLD);
630
631        // A pending re-probe is resolved first (keep if it helped, else revert).
632        if let Some((prev_limit, prev_goodput)) = self.probe_pending.take() {
633            let improved = self.goodput_ewma > prev_goodput * (1.0 + IMPROVE_EPS);
634            if !improved || congestion || queueing {
635                self.limit = prev_limit; // revert the speculative probe
636            }
637            // else: keep the higher limit.
638        }
639
640        // ---- congestion backoff (highest priority) ------------------------
641        if congestion {
642            self.limit = (self.limit * BACKOFF_MULT).max(1.0);
643            self.phase = Phase::Aimd; // first knee ends slow-start
644            self.cooldown_until = Some(now + COOLDOWN);
645            self.goodput_prev_tick = self.goodput_ewma;
646            return self.finish(now, p95_obj_size);
647        }
648
649        // ---- CPU hard decrease --------------------------------------------
650        if cpu_forces_decrease {
651            self.limit = (self.limit * BACKOFF_MULT).max(1.0);
652            self.goodput_prev_tick = self.goodput_ewma;
653            return self.finish(now, p95_obj_size);
654        }
655
656        // ---- latency-gradient hold/decrease -------------------------------
657        if queueing {
658            // Queue building without an explicit error: gently decrease and
659            // leave slow-start. Hysteresis: only step down by one.
660            if self.phase == Phase::SlowStart {
661                self.phase = Phase::Aimd;
662            }
663            self.limit = (self.limit - 1.0).max(1.0);
664            self.goodput_prev_tick = self.goodput_ewma;
665            return self.finish(now, p95_obj_size);
666        }
667
668        // ---- no-increase guards (cooldown / CPU≈busy) ---------------------
669        if in_cooldown || cpu_blocks_increase {
670            self.goodput_prev_tick = self.goodput_ewma;
671            return self.finish(now, p95_obj_size);
672        }
673
674        // ---- healthy: grow per phase --------------------------------------
675        match self.phase {
676            Phase::SlowStart => {
677                // Keep multiplying while goodput is still rising; once it
678                // plateaus, treat as the knee and switch to AIMD.
679                let rising = self.goodput_ewma > self.goodput_prev_tick * (1.0 + IMPROVE_EPS)
680                    || self.goodput_prev_tick <= 0.0;
681                if rising {
682                    self.limit *= SLOW_START_MULT;
683                } else {
684                    self.phase = Phase::Aimd;
685                    self.limit += 1.0; // gentle additive after the knee
686                }
687            }
688            Phase::Aimd => {
689                // Additive increase, but periodically do an explicit re-probe
690                // (mark it pending so the next tick keeps-or-reverts).
691                if now >= self.last_reprobe + REPROBE_INTERVAL {
692                    self.last_reprobe = now;
693                    self.probe_pending = Some((self.limit, self.goodput_ewma));
694                    self.limit += 1.0;
695                } else {
696                    self.limit += 1.0;
697                }
698            }
699        }
700
701        self.goodput_prev_tick = self.goodput_ewma;
702        self.finish(now, p95_obj_size)
703    }
704
705    /// Goodput over the just-closed window (bytes / summed-latency), or 0 when
706    /// no ops were recorded.
707    fn window_goodput(&self) -> f64 {
708        if self.acc_count == 0 || self.acc_latency_secs <= 0.0 {
709            return 0.0;
710        }
711        // Aggregate goodput ≈ bytes moved divided by the *average* per-op
712        // latency times concurrency is implicit in the op stream; here we use
713        // the simple bytes / total-latency * current-limit estimate, which
714        // rises with both faster ops and more parallelism.
715        let per_op = self.acc_bytes as f64 / self.acc_latency_secs;
716        per_op * self.limit
717    }
718
719    /// `rtt_min / rtt_ewma` (∈ (0,1]); `None` until both are known. Near 1.0
720    /// means no queueing; small means latency has inflated (congestion).
721    fn latency_gradient(&self) -> Option<f64> {
722        if self.rtt_ewma > 0.0 && self.rtt_min.is_finite() && self.rtt_min > 0.0 {
723            Some((self.rtt_min / self.rtt_ewma).clamp(0.0, 1.0))
724        } else {
725            None
726        }
727    }
728
729    /// Applies the hard caps (ceiling + memory budget), resets the per-window
730    /// accumulators, and packages the [`Decision`].
731    fn finish(&mut self, _now: MonoTime, p95_obj_size: u64) -> Decision {
732        // Memory-budget hard cap: limit × p95_obj_size ≤ fraction × total_ram.
733        let mem_cap = self.memory_cap(p95_obj_size);
734        let ceiling = self.policy.ceiling;
735        let capped = (self.limit.round() as usize).clamp(1, ceiling).min(mem_cap);
736        // Persist the capped value so the cap is sticky (the controller never
737        // "remembers" a limit it is not allowed to use).
738        self.limit = capped as f64;
739
740        // Reset per-window accumulators.
741        self.acc_bytes = 0;
742        self.acc_latency_secs = 0.0;
743        self.acc_count = 0;
744        self.congestion_seen = false;
745
746        Decision {
747            limit: capped,
748            target_rate: self.target_rate(),
749        }
750    }
751
752    /// The largest concurrency the memory budget permits:
753    /// `floor(fraction × total_ram / p95_obj_size)`, at least 1. When either
754    /// `total_ram` or `p95_obj_size` is 0 the budget is unbounded (returns the
755    /// ceiling).
756    fn memory_cap(&self, p95_obj_size: u64) -> usize {
757        if self.policy.total_ram == 0 || p95_obj_size == 0 {
758            return self.policy.ceiling;
759        }
760        let budget = self.policy.fraction * self.policy.total_ram as f64;
761        let cap = (budget / p95_obj_size as f64).floor();
762        if cap < 1.0 {
763            1
764        } else {
765            (cap as usize).min(self.policy.ceiling)
766        }
767    }
768
769    /// `fraction × measured-goodput-knee`, clamped to `policy.max_rate`. `None`
770    /// until a knee has been observed and no rate cap forces a value.
771    fn target_rate(&self) -> Option<u64> {
772        let knee_rate = if self.goodput_knee > 0.0 {
773            Some((self.policy.fraction * self.goodput_knee).max(1.0) as u64)
774        } else {
775            None
776        };
777        match (knee_rate, self.policy.max_rate) {
778            (Some(k), Some(m)) => Some(k.min(m)),
779            (Some(k), None) => Some(k),
780            (None, Some(m)) => Some(m),
781            (None, None) => None,
782        }
783    }
784}
785
786// ---------------------------------------------------------------------------
787// ControllerDriver — bridges the pure controller to the live transfer loops.
788// ---------------------------------------------------------------------------
789
/// Computes the nearest-rank 95th percentile of a set of object sizes (the
/// memory-budget denominator the controller's `tick` expects).
///
/// An empty set yields `0`, which disables the memory guardrail. For `n`
/// sizes the result is the value at 1-based rank `ceil(0.95 × n)` in
/// ascending order, so small sets return their maximum.
///
/// The sizes are copied into a scratch buffer and the caller's slice is left
/// untouched. Only the one needed order statistic is selected rather than
/// fully sorting the copy, so the cost is linear on average.
#[must_use]
pub fn p95_object_size(sizes: &[u64]) -> u64 {
    if sizes.is_empty() {
        return 0;
    }
    let n = sizes.len();
    // Nearest-rank p95: rank = ceil(0.95 * n), clamped into 1..=n.
    let rank = ((0.95 * n as f64).ceil() as usize).clamp(1, n);
    let mut scratch = sizes.to_vec();
    // After selection the element at `rank - 1` is exactly the one a full
    // ascending sort would place there.
    let (_, p95, _) = scratch.select_nth_unstable(rank - 1);
    *p95
}
806
/// Shared, live bridge between a pure [`AdaptiveController`] and the running
/// transfer backends.
///
/// The controller is `&mut` and deterministic; this wrapper owns it behind a
/// [`Mutex`] (tick is infrequent, so contention is negligible) and adds the
/// *impure* parts the controller deliberately omits: a real monotonic clock, a
/// live [`CpuSampler`] + RSS sampler, the manifest's p95 object size, and the
/// application of each [`Decision`] to the shared [`AdaptiveGate`] (concurrency)
/// and — for the network backends — a rate-limiter setter + display [`Meter`].
///
/// Both transfer paths use it identically:
///
/// - every completed/failed op calls [`record_op`](ControllerDriver::record_op)
///   with its measured `OpSample`;
/// - a lightweight driver (a tokio `interval` task for the async path, a
///   `std::thread` for the rayon path) periodically calls
///   [`tick`](ControllerDriver::tick), which samples CPU/RSS, advances the
///   controller, resizes the gate, and reports the new limit/rate.
///
/// The driver is [`Clone`]: the controller, CPU sampler, rate applier and
/// meter are held in [`Arc`]s, and the gate is itself a cloneable handle, so
/// every clone drives the same live state.
#[derive(Clone)]
pub struct ControllerDriver {
    /// The pure control law, locked for each `record_op` and each `tick`.
    controller: Arc<Mutex<AdaptiveController>>,
    /// The concurrency gate that `tick` resizes to each decision's limit.
    gate: AdaptiveGate,
    /// CPU sampler polled once per `tick`.
    cpu: Arc<Mutex<CpuSampler>>,
    /// p95 object size handed to the controller on every `tick` (the
    /// memory-budget denominator); fixed at construction.
    p95_obj_size: u64,
    /// Monotonic origin: `tick` injects `now = epoch.elapsed()` so the
    /// controller sees a monotonic [`MonoTime`].
    epoch: Instant,
    /// Applies a target byte-rate to the backend's live rate limiter (async or
    /// blocking). Local `FileStore` has no rate limit, so this is `None`.
    rate_applier: Option<Arc<dyn Fn(Option<u64>) + Send + Sync>>,
    /// Optional display meter: the new limit / target rate are mirrored into it
    /// for the progress bar (advisory only).
    meter: Option<Arc<Meter>>,
}
844
845// The closure / mutex-wrapped controller fields are intentionally omitted from
846// the human-facing debug view (a `dyn Fn` is not `Debug`, and the controller's
847// internals are large + uninteresting here); the load-bearing live state
848// (limits, p95, wiring presence) is shown instead.
849#[allow(clippy::missing_fields_in_debug)]
850impl std::fmt::Debug for ControllerDriver {
851    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
852        f.debug_struct("ControllerDriver")
853            .field("gate_limit", &self.gate.limit())
854            .field("ceiling", &self.gate.ceiling())
855            .field("p95_obj_size", &self.p95_obj_size)
856            .field("has_rate_applier", &self.rate_applier.is_some())
857            .field("has_meter", &self.meter.is_some())
858            .finish()
859    }
860}
861
862impl ControllerDriver {
863    /// Builds a driver around a fresh controller for `policy`, the shared
864    /// `gate`, the manifest's `p95_obj_size`, an optional `rate_applier` (the
865    /// backend's live rate-limit setter; `None` for the rate-less local store),
866    /// and an optional display `meter`.
867    #[must_use]
868    pub fn new(
869        policy: AdaptivePolicy,
870        gate: AdaptiveGate,
871        p95_obj_size: u64,
872        rate_applier: Option<Arc<dyn Fn(Option<u64>) + Send + Sync>>,
873        meter: Option<Arc<Meter>>,
874    ) -> Self {
875        Self {
876            controller: Arc::new(Mutex::new(AdaptiveController::new(policy))),
877            gate,
878            cpu: Arc::new(Mutex::new(CpuSampler::new())),
879            p95_obj_size,
880            epoch: Instant::now(),
881            rate_applier,
882            meter,
883        }
884    }
885
886    /// Records one completed/failed op into the shared controller.
887    pub fn record_op(&self, sample: OpSample) {
888        self.controller
889            .lock()
890            .unwrap_or_else(PoisonError::into_inner)
891            .record_op(sample);
892    }
893
894    /// Advances the controller one interval: samples CPU/RSS, ticks with the
895    /// injected monotonic clock + p95 object size, then applies the resulting
896    /// [`Decision`] to the gate (concurrency), the rate applier (byte-rate), and
897    /// the display meter. Returns the decision for tests/inspection.
898    pub fn tick(&self) -> Decision {
899        let cpu_pct = self
900            .cpu
901            .lock()
902            .unwrap_or_else(PoisonError::into_inner)
903            .poll();
904        let rss = resident_set_bytes();
905        let now = self.epoch.elapsed();
906
907        let decision = {
908            let mut controller = self
909                .controller
910                .lock()
911                .unwrap_or_else(PoisonError::into_inner);
912            controller.tick(now, cpu_pct, rss, self.p95_obj_size)
913        };
914
915        // Apply: resize the shared gate, retune the rate limiter, mirror both
916        // into the display meter (all advisory; never change what is transferred).
917        self.gate.set_limit(decision.limit);
918        if let Some(apply) = &self.rate_applier {
919            apply(decision.target_rate);
920        }
921        if let Some(meter) = &self.meter {
922            meter.set_current_limit(decision.limit as u64);
923            meter.set_target_rate(decision.target_rate.unwrap_or(0));
924        }
925        decision
926    }
927}
928
929#[cfg(test)]
930mod tests {
931    use super::*;
932    use std::sync::atomic::{AtomicUsize, Ordering};
933    use std::thread;
934
935    // ----- AdaptiveGate: async ----------------------------------------------
936
937    fn runtime() -> tokio::runtime::Runtime {
938        tokio::runtime::Builder::new_current_thread()
939            .enable_time()
940            .build()
941            .expect("build tokio runtime")
942    }
943
944    #[test]
945    fn adaptive_gate_async_acquire_blocks_beyond_limit() {
946        let rt = runtime();
947        rt.block_on(async {
948            let gate = AdaptiveGate::new(2, 8);
949            let p1 = gate.acquire().await;
950            let p2 = gate.acquire().await;
951            assert_eq!(gate.available_permits(), 0, "both permits taken");
952
953            // A third acquire must not be immediately ready.
954            let fut = gate.acquire();
955            tokio::pin!(fut);
956            let pending = futures::poll!(&mut fut);
957            assert!(pending.is_pending(), "third acquire blocks at limit 2");
958
959            // Raising the limit unblocks it.
960            gate.set_limit(3);
961            let p3 = futures::poll!(&mut fut);
962            assert!(p3.is_ready(), "set_limit(3) frees a permit for the waiter");
963            drop((p1, p2));
964        });
965    }
966
967    /// Mirror transfer.rs's `max_in_flight_for` idiom: drive many acquires and
968    /// record the peak concurrent holders under a fixed gate limit.
969    fn peak_under_limit(limit: usize, items: usize) -> usize {
970        let rt = runtime();
971        rt.block_on(async {
972            let gate = AdaptiveGate::new(limit, 32);
973            let in_flight = Arc::new(AtomicUsize::new(0));
974            let high = Arc::new(AtomicUsize::new(0));
975            let mut handles = Vec::new();
976            for _ in 0..items {
977                let gate = gate.clone();
978                let in_flight = Arc::clone(&in_flight);
979                let high = Arc::clone(&high);
980                handles.push(tokio::spawn(async move {
981                    let _p = gate.acquire().await;
982                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
983                    high.fetch_max(cur, Ordering::SeqCst);
984                    tokio::time::sleep(Duration::from_millis(20)).await;
985                    in_flight.fetch_sub(1, Ordering::SeqCst);
986                }));
987            }
988            for h in handles {
989                h.await.unwrap();
990            }
991            high.load(Ordering::SeqCst)
992        })
993    }
994
995    #[test]
996    fn adaptive_gate_async_set_limit_changes_max_in_flight() {
997        assert_eq!(peak_under_limit(3, 12), 3, "limit 3 caps in-flight at 3");
998        assert_eq!(peak_under_limit(1, 5), 1, "limit 1 is strictly sequential");
999    }
1000
1001    #[test]
1002    fn adaptive_gate_async_shrink_does_not_deadlock_held_permits() {
1003        let rt = runtime();
1004        rt.block_on(async {
1005            let gate = AdaptiveGate::new(4, 8);
1006            let p1 = gate.acquire().await;
1007            let p2 = gate.acquire().await;
1008            // Shrink below the number of held permits.
1009            let new = gate.set_limit(1);
1010            assert_eq!(new, 1);
1011            // Dropping held permits must not panic/deadlock; afterwards the
1012            // effective limit settles to 1.
1013            drop(p1);
1014            drop(p2);
1015            // Acquire one (should succeed) and confirm a second blocks.
1016            let _q = gate.acquire().await;
1017            let fut = gate.acquire();
1018            tokio::pin!(fut);
1019            assert!(
1020                futures::poll!(&mut fut).is_pending(),
1021                "after shrink to 1, only one permit is available"
1022            );
1023        });
1024    }
1025
1026    // ----- AdaptiveGate: blocking -------------------------------------------
1027
1028    #[test]
1029    fn adaptive_gate_blocking_acquire_and_set_limit() {
1030        let gate = AdaptiveGate::new(1, 8);
1031        let in_flight = Arc::new(AtomicUsize::new(0));
1032        let high = Arc::new(AtomicUsize::new(0));
1033
1034        // With limit 1, two threads must serialize (peak in-flight 1).
1035        let mut handles = Vec::new();
1036        for _ in 0..4 {
1037            let gate = gate.clone();
1038            let in_flight = Arc::clone(&in_flight);
1039            let high = Arc::clone(&high);
1040            handles.push(thread::spawn(move || {
1041                let _p = gate.acquire_blocking();
1042                let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
1043                high.fetch_max(cur, Ordering::SeqCst);
1044                thread::sleep(Duration::from_millis(30));
1045                in_flight.fetch_sub(1, Ordering::SeqCst);
1046            }));
1047        }
1048        for h in handles {
1049            h.join().unwrap();
1050        }
1051        assert_eq!(
1052            high.load(Ordering::SeqCst),
1053            1,
1054            "blocking limit 1 serializes"
1055        );
1056
1057        // Now raise the limit and confirm parallelism rises.
1058        let gate2 = AdaptiveGate::new(1, 8);
1059        gate2.set_limit(3);
1060        let in_flight = Arc::new(AtomicUsize::new(0));
1061        let high = Arc::new(AtomicUsize::new(0));
1062        let mut handles = Vec::new();
1063        for _ in 0..6 {
1064            let gate = gate2.clone();
1065            let in_flight = Arc::clone(&in_flight);
1066            let high = Arc::clone(&high);
1067            handles.push(thread::spawn(move || {
1068                let _p = gate.acquire_blocking();
1069                let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
1070                high.fetch_max(cur, Ordering::SeqCst);
1071                thread::sleep(Duration::from_millis(30));
1072                in_flight.fetch_sub(1, Ordering::SeqCst);
1073            }));
1074        }
1075        for h in handles {
1076            h.join().unwrap();
1077        }
1078        let peak = high.load(Ordering::SeqCst);
1079        assert!(
1080            (1..=3).contains(&peak) && peak >= 2,
1081            "after set_limit(3) peak in-flight should reach ~3, got {peak}"
1082        );
1083    }
1084
1085    #[test]
1086    fn adaptive_gate_blocking_shrink_does_not_deadlock() {
1087        let gate = AdaptiveGate::new(4, 8);
1088        let p1 = gate.acquire_blocking();
1089        let p2 = gate.acquire_blocking();
1090        gate.set_limit(1); // shrink below held count
1091        drop(p1);
1092        drop(p2);
1093        // A subsequent acquire must still succeed (no deadlock / no lost permit).
1094        let got = Arc::new(AtomicUsize::new(0));
1095        let g = gate.clone();
1096        let got2 = Arc::clone(&got);
1097        let h = thread::spawn(move || {
1098            let _p = g.acquire_blocking();
1099            got2.fetch_add(1, Ordering::SeqCst);
1100        });
1101        h.join().unwrap();
1102        assert_eq!(got.load(Ordering::SeqCst), 1, "acquire after shrink works");
1103    }
1104
1105    // ----- AdaptiveController -----------------------------------------------
1106
1107    /// Convenience: a healthy Ok op of `bytes` at `latency_ms`.
1108    fn ok_op(bytes: u64, latency_ms: u64) -> OpSample {
1109        OpSample {
1110            bytes,
1111            latency: Duration::from_millis(latency_ms),
1112            result: OpResult::Ok,
1113        }
1114    }
1115
1116    fn big_policy() -> AdaptivePolicy {
1117        // Big RAM so the memory guardrail never bites unless a test wants it.
1118        AdaptivePolicy::new(0.8, 64, u64::MAX, None)
1119    }
1120
1121    #[test]
1122    fn adaptive_controller_healthy_stream_ramps_up_then_caps() {
1123        let mut c = AdaptiveController::new(big_policy());
1124        let mut t = Duration::ZERO;
1125        let mut last = c.current_limit();
1126        let mut max_seen = last;
1127        // Rising goodput: each tick the ops get "faster" (more bytes/sec).
1128        for i in 0..20u64 {
1129            for _ in 0..4 {
1130                c.record_op(ok_op(1_000_000 + i * 200_000, 50));
1131            }
1132            let d = c.tick(t, Some(30.0), Some(0), 4096);
1133            assert!(d.limit <= 64, "never exceed ceiling");
1134            max_seen = max_seen.max(d.limit);
1135            last = d.limit;
1136            t += Duration::from_secs(1);
1137        }
1138        assert!(
1139            max_seen > 2,
1140            "healthy stream should ramp the limit up from the start of 2, got max {max_seen}"
1141        );
1142        assert!(last <= 64, "stays within ceiling, got {last}");
1143    }
1144
1145    #[test]
1146    fn adaptive_controller_throttle_backs_off_and_cooldown_holds() {
1147        let mut c = AdaptiveController::new(big_policy());
1148        let mut t = Duration::ZERO;
1149        // Ramp up first.
1150        for _ in 0..6 {
1151            for _ in 0..4 {
1152                c.record_op(ok_op(2_000_000, 40));
1153            }
1154            c.tick(t, Some(20.0), Some(0), 4096);
1155            t += Duration::from_secs(1);
1156        }
1157        let before = c.current_limit();
1158        assert!(before > 2, "should have grown before the throttle");
1159
1160        // Inject a Throttle.
1161        c.record_op(OpSample {
1162            bytes: 1000,
1163            latency: Duration::from_millis(40),
1164            result: OpResult::Throttle,
1165        });
1166        let d = c.tick(t, Some(20.0), Some(0), 4096);
1167        assert!(
1168            d.limit <= before / 2 + 1,
1169            "throttle should at least halve the limit: before {before}, after {}",
1170            d.limit
1171        );
1172        let after_backoff = d.limit;
1173        t += Duration::from_secs(1);
1174
1175        // For the 15s cooldown, even healthy ticks must not increase the limit.
1176        for _ in 0..10 {
1177            for _ in 0..4 {
1178                c.record_op(ok_op(5_000_000, 20)); // very healthy
1179            }
1180            let d = c.tick(t, Some(10.0), Some(0), 4096);
1181            assert!(
1182                d.limit <= after_backoff,
1183                "no increase during cooldown: {} > {after_backoff}",
1184                d.limit
1185            );
1186            t += Duration::from_secs(1);
1187        }
1188
1189        // After the cooldown (>15s), increases resume.
1190        t += Duration::from_secs(6);
1191        for _ in 0..3 {
1192            for _ in 0..4 {
1193                c.record_op(ok_op(6_000_000, 20));
1194            }
1195            c.tick(t, Some(10.0), Some(0), 4096);
1196            t += Duration::from_secs(1);
1197        }
1198        assert!(
1199            c.current_limit() > after_backoff,
1200            "limit should recover after the cooldown expires"
1201        );
1202    }
1203
1204    #[test]
1205    fn adaptive_controller_rising_latency_holds_without_error() {
1206        let mut c = AdaptiveController::new(big_policy());
1207        let mut t = Duration::ZERO;
1208        // Establish a low rtt_min with fast ops, ramp up.
1209        for _ in 0..5 {
1210            for _ in 0..4 {
1211                c.record_op(ok_op(2_000_000, 10)); // 10ms baseline
1212            }
1213            c.tick(t, Some(20.0), Some(0), 4096);
1214            t += Duration::from_secs(1);
1215        }
1216        let peak = c.current_limit();
1217
1218        // Now latency inflates massively (queueing) but NO errors: gradient
1219        // rtt_min/rtt drops well below threshold ⇒ controller must not grow.
1220        for _ in 0..6 {
1221            for _ in 0..4 {
1222                c.record_op(ok_op(2_000_000, 200)); // 200ms now, 20x baseline
1223            }
1224            let d = c.tick(t, Some(20.0), Some(0), 4096);
1225            assert!(
1226                d.limit <= peak,
1227                "high latency gradient must hold/decrease (no growth): {} > {peak}",
1228                d.limit
1229            );
1230            t += Duration::from_secs(1);
1231        }
1232        assert!(
1233            c.current_limit() <= peak,
1234            "latency-gradient guard held the limit at/below the peak"
1235        );
1236    }
1237
1238    #[test]
1239    fn adaptive_controller_memory_budget_caps_limit() {
1240        // Tiny RAM, large object size ⇒ memory budget forces a small limit.
1241        // budget = 0.8 * 10MiB = 8MiB; p95 = 2MiB ⇒ cap = floor(8/2) = 4.
1242        let total_ram = 10 * 1024 * 1024;
1243        let p95 = 2 * 1024 * 1024;
1244        let policy = AdaptivePolicy::new(0.8, 64, total_ram, None);
1245        let mut c = AdaptiveController::new(policy);
1246        let mut t = Duration::ZERO;
1247        for _ in 0..30 {
1248            for _ in 0..4 {
1249                c.record_op(ok_op(10_000_000, 10)); // very healthy, wants to grow
1250            }
1251            let d = c.tick(t, Some(10.0), Some(0), p95);
1252            // Invariant: limit * p95 <= fraction * total_ram (never violated).
1253            assert!(
1254                (d.limit as u64) * p95 <= ((0.8 * total_ram as f64) as u64),
1255                "memory budget violated: limit {} * p95 {} > budget",
1256                d.limit,
1257                p95
1258            );
1259            assert!(
1260                d.limit <= 4,
1261                "memory cap should pin limit at 4, got {}",
1262                d.limit
1263            );
1264            t += Duration::from_secs(1);
1265        }
1266    }
1267
1268    #[test]
1269    fn adaptive_controller_high_cpu_prevents_increase() {
1270        let mut c = AdaptiveController::new(big_policy());
1271        let mut t = Duration::ZERO;
1272        // Warm up a little at low CPU.
1273        for _ in 0..3 {
1274            for _ in 0..4 {
1275                c.record_op(ok_op(2_000_000, 20));
1276            }
1277            c.tick(t, Some(20.0), Some(0), 4096);
1278            t += Duration::from_secs(1);
1279        }
1280        let before = c.current_limit();
1281        // Now CPU is pinned > 85%: no increase, even with healthy ops.
1282        for _ in 0..8 {
1283            for _ in 0..4 {
1284                c.record_op(ok_op(5_000_000, 10));
1285            }
1286            let d = c.tick(t, Some(90.0), Some(0), 4096);
1287            assert!(
1288                d.limit <= before,
1289                "cpu>85 must block increases: {} > {before}",
1290                d.limit
1291            );
1292            t += Duration::from_secs(1);
1293        }
1294    }
1295
1296    #[test]
1297    fn adaptive_controller_converges_on_steady_stream() {
1298        let mut c = AdaptiveController::new(big_policy());
1299        let mut t = Duration::ZERO;
1300        // Run long enough to ramp and settle on a perfectly steady stream.
1301        let mut limits = Vec::new();
1302        for _ in 0..60 {
1303            for _ in 0..4 {
1304                c.record_op(ok_op(3_000_000, 25)); // identical every op
1305            }
1306            let d = c.tick(t, Some(40.0), Some(0), 4096);
1307            limits.push(d.limit);
1308            t += Duration::from_secs(1);
1309        }
1310        // Look at the tail: it should not oscillate wildly. Measure the spread
1311        // of the last 15 ticks.
1312        let tail = &limits[limits.len() - 15..];
1313        let min = *tail.iter().min().unwrap();
1314        let max = *tail.iter().max().unwrap();
1315        assert!(
1316            max - min <= 3,
1317            "steady stream should converge (small tail spread), got min {min} max {max} tail {tail:?}"
1318        );
1319    }
1320
1321    // ----- ControllerDriver -------------------------------------------------
1322
1323    #[test]
1324    fn p95_object_size_nearest_rank() {
1325        assert_eq!(p95_object_size(&[]), 0, "empty -> 0 (memory cap disabled)");
1326        assert_eq!(p95_object_size(&[42]), 42, "single element");
1327        // 1..=100: nearest-rank p95 = the 95th value = 95.
1328        let sizes: Vec<u64> = (1..=100).collect();
1329        assert_eq!(p95_object_size(&sizes), 95);
1330        // p95 is robust to one huge outlier in a small set (returns the max here).
1331        assert_eq!(p95_object_size(&[1, 1, 1, 1_000_000]), 1_000_000);
1332    }
1333
1334    #[test]
1335    fn controller_driver_throttle_drives_gate_decrease() {
1336        // A driver wired to a gate: record several healthy ops + tick to grow the
1337        // gate, then inject a Throttle + tick and assert the gate's live limit
1338        // dropped (the controller's backoff was applied to the real gate).
1339        let gate = AdaptiveGate::new(2, 32);
1340        let applied_rate = Arc::new(Mutex::new(None::<Option<u64>>));
1341        let rate_sink = Arc::clone(&applied_rate);
1342        let rate_applier: Arc<dyn Fn(Option<u64>) + Send + Sync> = Arc::new(move |r| {
1343            *rate_sink.lock().unwrap() = Some(r);
1344        });
1345        let policy = AdaptivePolicy::new(0.8, 32, u64::MAX, None);
1346        let driver = ControllerDriver::new(policy, gate.clone(), 4096, Some(rate_applier), None);
1347
1348        // Grow: healthy ops over several ticks should raise the gate above 2.
1349        for _ in 0..8 {
1350            for _ in 0..4 {
1351                driver.record_op(OpSample {
1352                    bytes: 2_000_000,
1353                    latency: Duration::from_millis(40),
1354                    result: OpResult::Ok,
1355                });
1356            }
1357            driver.tick();
1358        }
1359        let grown = gate.limit();
1360        assert!(
1361            grown > 2,
1362            "healthy stream should grow the gate, got {grown}"
1363        );
1364
1365        // Inject a Throttle, then tick: the gate's limit must drop (backoff).
1366        driver.record_op(OpSample {
1367            bytes: 1000,
1368            latency: Duration::from_millis(40),
1369            result: OpResult::Throttle,
1370        });
1371        let decision = driver.tick();
1372        assert!(
1373            gate.limit() < grown,
1374            "throttle must shrink the gate: {} >= {grown}",
1375            gate.limit()
1376        );
1377        assert_eq!(
1378            gate.limit(),
1379            decision.limit,
1380            "the gate reflects the decision's limit"
1381        );
1382        // A rate was applied to the limiter at least once (target_rate flows out).
1383        assert!(
1384            applied_rate.lock().unwrap().is_some(),
1385            "the rate applier must have been invoked"
1386        );
1387    }
1388
1389    #[test]
1390    fn adaptive_controller_target_rate_respects_fraction() {
1391        let mut c = AdaptiveController::new(big_policy());
1392        let mut t = Duration::ZERO;
1393        let mut last_rate = None;
1394        for _ in 0..15 {
1395            for _ in 0..4 {
1396                c.record_op(ok_op(1_000_000, 100)); // steady goodput
1397            }
1398            let d = c.tick(t, Some(30.0), Some(0), 4096);
1399            last_rate = d.target_rate;
1400            t += Duration::from_secs(1);
1401        }
1402        let rate = last_rate.expect("a knee should have produced a target_rate");
1403        // target_rate must be ~fraction (0.8) of the measured goodput knee.
1404        // We don't know the exact knee, but it must be positive and the
1405        // controller computed it as 0.8 * knee, so re-deriving: rate/0.8 = knee.
1406        assert!(
1407            rate > 0,
1408            "target_rate should be positive once a knee is known"
1409        );
1410
1411        // With a max_rate cap below the knee fraction, the target is clamped.
1412        let policy = AdaptivePolicy::new(0.8, 64, u64::MAX, Some(1234));
1413        let mut c2 = AdaptiveController::new(policy);
1414        let mut t2 = Duration::ZERO;
1415        for _ in 0..10 {
1416            for _ in 0..4 {
1417                c2.record_op(ok_op(10_000_000, 10));
1418            }
1419            let d = c2.tick(t2, Some(30.0), Some(0), 4096);
1420            assert!(
1421                d.target_rate.unwrap_or(0) <= 1234,
1422                "target_rate must respect max_rate cap"
1423            );
1424            t2 += Duration::from_secs(1);
1425        }
1426    }
1427}