snapdir_stores/adaptive.rs
1//! Adaptive concurrency + throughput control (pure control logic).
2//!
//! This module is the **brain and the gate** of snapdir's adaptive transfer
//! tuning. The control logic itself ([`AdaptiveController`]) deliberately
//! performs **no** network or real I/O and reads **no** real clock or system
//! samplers — every external signal (CPU, RSS, elapsed time, per-op outcomes)
//! is *injected*. That keeps the controller fully deterministic and
//! unit-testable; the impure wiring into the live transfer loops (feeding it
//! [`snapdir_core::resources`] samples + a real monotonic clock) is done by
//! [`ControllerDriver`].
10//!
11//! Three pieces:
12//!
13//! - [`AdaptiveGate`] — a **resizable** concurrency permit pool shared by both
14//! transfer backends. It exposes an async [`acquire`](AdaptiveGate::acquire)
15//! (tokio semaphore) for the futures path and a zero-dependency blocking
16//! [`acquire_blocking`](AdaptiveGate::acquire_blocking) (mutex + condvar) for
17//! the rayon path; [`set_limit`](AdaptiveGate::set_limit) retunes both live.
18//! - [`AdaptiveController`] — the control law (slow-start → AIMD, latency
19//! gradient, congestion backoff with cooldown, CPU/memory guardrails,
20//! periodic re-probing) that turns injected op samples + metrics into a
21//! [`Decision`] (next concurrency limit + target byte-rate).
22//! - Supporting value types: [`AdaptivePolicy`], [`OpSample`], [`OpResult`],
23//! [`Decision`].
24
25// The controller works in `f64` throughput/latency space and converts to/from
26// integer concurrency limits and byte-rates. Those casts are inherent to an
27// *advisory* control signal (never a correctness path), so the pedantic cast
28// lints are allowed module-wide, matching the resources sampler's convention.
29#![allow(
30 clippy::cast_precision_loss,
31 clippy::cast_possible_truncation,
32 clippy::cast_sign_loss
33)]
34
35use std::sync::atomic::{AtomicUsize, Ordering};
36use std::sync::{Arc, Condvar, Mutex, PoisonError};
37use std::time::{Duration, Instant};
38
39use snapdir_core::resources::{resident_set_bytes, CpuSampler};
40use snapdir_core::Meter;
41use tokio::sync::{OwnedSemaphorePermit, Semaphore};
42
43// ---------------------------------------------------------------------------
44// AdaptiveGate — resizable concurrency permit pool (async + blocking).
45// ---------------------------------------------------------------------------
46
/// Bookkeeping for the blocking (rayon-path) counting semaphore, which is
/// hand-built from a mutex + condvar with no extra dependencies.
#[derive(Debug)]
struct BlockingState {
    /// How many permits can be handed out right now.
    available: usize,
    /// The effective concurrency limit: the number of permits that *should*
    /// exist in total (available plus in-flight). Updated by `set_limit`.
    limit: usize,
    /// How many permits are currently held by live guards.
    in_flight: usize,
}
59
60/// Shared inner state of an [`AdaptiveGate`].
61#[derive(Debug)]
62struct GateInner {
63 /// Absolute upper bound on the limit (construction param). `limit` is always
64 /// clamped to `[1, ceiling]`.
65 ceiling: usize,
66 /// The current logical limit, mirrored across both backends. Stored as an
67 /// atomic for cheap reads (e.g. [`AdaptiveGate::limit`]).
68 limit: AtomicUsize,
69 /// The async permit pool (futures/tokio path). Resized via `add_permits` /
70 /// acquire-and-`forget`.
71 sem: Arc<Semaphore>,
72 /// Outstanding "shrink debt" for the async pool: permits a `set_limit`
73 /// shrink wanted to remove but couldn't (they were in flight). The next
74 /// permit Drops pay this down by forgetting instead of returning, so the
75 /// semaphore's effective capacity converges to the new limit without ever
76 /// revoking a held permit. `tokio::sync::Semaphore` has no "max" knob, so
77 /// this debt is how a shrink-below-in-flight is made exact.
78 async_debt: AtomicUsize,
79 /// The blocking permit pool (rayon path): a counting semaphore built from a
80 /// mutex + condvar.
81 blocking: Mutex<BlockingState>,
82 /// Wakes blocking waiters when permits become available (limit raised or a
83 /// guard dropped).
84 blocking_cv: Condvar,
85}
86
87/// A resizable concurrency permit pool shared by both transfer backends.
88///
89/// The pool starts at `start` permits and can be retuned live with
90/// [`set_limit`](AdaptiveGate::set_limit) anywhere in `[1, ceiling]`. It serves
91/// two access paths over **one shared logical limit**:
92///
93/// - **async** ([`acquire`](AdaptiveGate::acquire)) for the futures/tokio
94/// transfer loop, backed by [`tokio::sync::Semaphore`];
95/// - **blocking** ([`acquire_blocking`](AdaptiveGate::acquire_blocking)) for the
96/// rayon store-to-store sync path, backed by a zero-dependency
97/// mutex+condvar counting semaphore (mirrors the token-bucket style in
98/// [`crate::transfer`]).
99///
100/// Both return an RAII guard that releases its permit on drop. `set_limit`
101/// grows the async pool with `add_permits` and shrinks it by acquiring and
102/// [`forget`](tokio::sync::SemaphorePermit::forget)ting permits (the standard
103/// resizable-semaphore technique); for the blocking pool it adjusts the
104/// available count and notifies waiters. Shrinking **never** revokes a permit
105/// already held — it only reduces how many *new* permits can be handed out, so
106/// in-flight work drains naturally and shrinking can never deadlock held
107/// permits.
108///
109/// The gate is [`Clone`] (cloning shares the same underlying pools via [`Arc`]).
110#[derive(Clone, Debug)]
111pub struct AdaptiveGate {
112 inner: Arc<GateInner>,
113}
114
115/// RAII guard for an async permit acquired from an [`AdaptiveGate`]. Releases
116/// the permit on drop — unless the gate carries outstanding shrink debt, in
117/// which case this permit is *forgotten* to pay that debt down (so a shrink
118/// that happened while this permit was in flight takes effect exactly).
119#[derive(Debug)]
120pub struct GatePermit {
121 inner: Arc<GateInner>,
122 // `Option` so Drop can move the permit out to either return it (drop) or
123 // forget it (pay shrink debt).
124 permit: Option<OwnedSemaphorePermit>,
125}
126
127impl Drop for GatePermit {
128 fn drop(&mut self) {
129 let Some(permit) = self.permit.take() else {
130 return;
131 };
132 // If a shrink is still owed permits, consume this one toward that debt
133 // (forget => the semaphore's capacity drops by one) instead of
134 // returning it to the pool.
135 let mut debt = self.inner.async_debt.load(Ordering::SeqCst);
136 loop {
137 if debt == 0 {
138 return; // no debt: let the permit drop normally (returns it)
139 }
140 match self.inner.async_debt.compare_exchange_weak(
141 debt,
142 debt - 1,
143 Ordering::SeqCst,
144 Ordering::SeqCst,
145 ) {
146 Ok(_) => {
147 permit.forget();
148 return;
149 }
150 Err(actual) => debt = actual,
151 }
152 }
153 }
154}
155
156/// RAII guard for a blocking permit acquired from an [`AdaptiveGate`]. Releases
157/// the permit (and notifies one waiter) on drop.
158#[derive(Debug)]
159pub struct BlockingGatePermit {
160 inner: Arc<GateInner>,
161}
162
163impl Drop for BlockingGatePermit {
164 fn drop(&mut self) {
165 let mut state = self
166 .inner
167 .blocking
168 .lock()
169 .unwrap_or_else(PoisonError::into_inner);
170 state.in_flight = state.in_flight.saturating_sub(1);
171 // Only return the permit to the available pool if we are still within
172 // the (possibly shrunk) limit; otherwise the permit is absorbed by the
173 // shrink (in_flight already dropped, which is what matters).
174 if state.available + state.in_flight < state.limit {
175 state.available += 1;
176 drop(state);
177 self.inner.blocking_cv.notify_one();
178 }
179 }
180}
181
182impl AdaptiveGate {
183 /// Builds a gate whose limit starts at `start` and can be retuned anywhere in
184 /// `[1, ceiling]`. Both `start` and `ceiling` are clamped to at least 1, and
185 /// `start` is clamped to `ceiling`.
186 #[must_use]
187 pub fn new(start: usize, ceiling: usize) -> Self {
188 let ceiling = ceiling.max(1);
189 let start = start.clamp(1, ceiling);
190 Self {
191 inner: Arc::new(GateInner {
192 ceiling,
193 limit: AtomicUsize::new(start),
194 sem: Arc::new(Semaphore::new(start)),
195 async_debt: AtomicUsize::new(0),
196 blocking: Mutex::new(BlockingState {
197 available: start,
198 limit: start,
199 in_flight: 0,
200 }),
201 blocking_cv: Condvar::new(),
202 }),
203 }
204 }
205
206 /// The construction-time ceiling (the maximum the limit can ever reach).
207 #[must_use]
208 pub fn ceiling(&self) -> usize {
209 self.inner.ceiling
210 }
211
212 /// The current logical concurrency limit (shared by both backends).
213 #[must_use]
214 pub fn limit(&self) -> usize {
215 self.inner.limit.load(Ordering::SeqCst)
216 }
217
218 /// Retunes the effective concurrency limit live, clamped to `[1, ceiling]`.
219 ///
220 /// Grows the async pool with `add_permits`; shrinks it by reserving (and
221 /// forgetting) the surplus permits so the semaphore's effective capacity
222 /// drops without revoking permits already in flight. Adjusts the blocking
223 /// pool's available count symmetrically and wakes waiters when growing.
224 /// Returns the new (clamped) limit.
225 pub fn set_limit(&self, n: usize) -> usize {
226 let new = n.clamp(1, self.inner.ceiling);
227 let old = self.inner.limit.swap(new, Ordering::SeqCst);
228 if new == old {
229 return new;
230 }
231
232 // --- async pool -----------------------------------------------------
233 if new > old {
234 // First, pay down any outstanding shrink debt with the growth (we
235 // owed removals that hadn't landed yet); only add the remainder.
236 let grow = new - old;
237 let paid = self.take_debt(grow);
238 let remainder = grow - paid;
239 if remainder > 0 {
240 self.inner.sem.add_permits(remainder);
241 }
242 } else {
243 // Shrink: remove (old - new) permits. Forget what's available now;
244 // record the rest as debt to be paid by in-flight permits' Drop.
245 let mut to_remove = old - new;
246 while to_remove > 0 {
247 if let Ok(permit) = self.inner.sem.clone().try_acquire_owned() {
248 permit.forget();
249 to_remove -= 1;
250 } else {
251 break; // remainder is in flight; record it as debt
252 }
253 }
254 if to_remove > 0 {
255 self.inner.async_debt.fetch_add(to_remove, Ordering::SeqCst);
256 }
257 }
258
259 // --- blocking pool --------------------------------------------------
260 {
261 let mut state = self
262 .inner
263 .blocking
264 .lock()
265 .unwrap_or_else(PoisonError::into_inner);
266 state.limit = new;
267 // Recompute available as (limit - in_flight), floored at 0. Growing
268 // raises available; shrinking lowers it (never touching in-flight
269 // guards, which release against the new limit on drop).
270 state.available = new.saturating_sub(state.in_flight);
271 drop(state);
272 if new > old {
273 self.inner.blocking_cv.notify_all();
274 }
275 }
276
277 new
278 }
279
280 /// Reduces the outstanding async shrink debt by up to `n`, returning how
281 /// much was actually paid (so the caller can apply only the remainder when
282 /// growing the pool).
283 fn take_debt(&self, n: usize) -> usize {
284 let mut debt = self.inner.async_debt.load(Ordering::SeqCst);
285 loop {
286 let pay = debt.min(n);
287 if pay == 0 {
288 return 0;
289 }
290 match self.inner.async_debt.compare_exchange_weak(
291 debt,
292 debt - pay,
293 Ordering::SeqCst,
294 Ordering::SeqCst,
295 ) {
296 Ok(_) => return pay,
297 Err(actual) => debt = actual,
298 }
299 }
300 }
301
302 /// Acquires one async permit, awaiting if the pool is at its current limit.
303 /// The returned [`GatePermit`] releases the permit on drop.
304 ///
305 /// # Panics
306 ///
307 /// Never under normal use; the semaphore is only closed when the gate is
308 /// dropped, which cannot happen while a caller holds an `&self`.
309 pub async fn acquire(&self) -> GatePermit {
310 let permit = Arc::clone(&self.inner.sem)
311 .acquire_owned()
312 .await
313 .expect("AdaptiveGate semaphore is never closed while the gate is alive");
314 GatePermit {
315 inner: Arc::clone(&self.inner),
316 permit: Some(permit),
317 }
318 }
319
320 /// Blocking sibling of [`acquire`](AdaptiveGate::acquire): parks the calling
321 /// OS thread until a permit is free, then returns a guard that releases it
322 /// on drop. Used by the rayon store-to-store sync path.
323 #[must_use]
324 pub fn acquire_blocking(&self) -> BlockingGatePermit {
325 let mut state = self
326 .inner
327 .blocking
328 .lock()
329 .unwrap_or_else(PoisonError::into_inner);
330 while state.available == 0 {
331 state = self
332 .inner
333 .blocking_cv
334 .wait(state)
335 .unwrap_or_else(PoisonError::into_inner);
336 }
337 state.available -= 1;
338 state.in_flight += 1;
339 BlockingGatePermit {
340 inner: Arc::clone(&self.inner),
341 }
342 }
343
344 /// Best-effort count of async permits currently available (for tests/metrics).
345 #[must_use]
346 pub fn available_permits(&self) -> usize {
347 self.inner.sem.available_permits()
348 }
349}
350
351// ---------------------------------------------------------------------------
352// Controller value types.
353// ---------------------------------------------------------------------------
354
/// How a single transfer operation ended. Passed (inside an [`OpSample`]) to
/// [`AdaptiveController::record_op`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OpResult {
    /// The operation succeeded.
    Ok,
    /// The backend told us explicitly to slow down (e.g. HTTP 429 /
    /// `SlowDown`).
    Throttle,
    /// A hard failure of the timeout class (e.g. request timeout, connection
    /// reset); the controller counts it as a congestion signal.
    HardErr,
}
367
368/// One sampled transfer operation: bytes moved, observed latency, and outcome.
369#[derive(Clone, Copy, Debug)]
370pub struct OpSample {
371 /// Bytes transferred by this operation.
372 pub bytes: u64,
373 /// End-to-end latency of the operation.
374 pub latency: Duration,
375 /// Outcome class.
376 pub result: OpResult,
377}
378
/// Tuning knobs for the controller (its own *view* of configuration; hooking
/// it up to `TransferConfig` is a later gate).
#[derive(Clone, Copy, Debug)]
pub struct AdaptivePolicy {
    /// Fraction of the discovered knee to operate at (default `0.8`). The
    /// controller targets `fraction × knee` for both concurrency and rate so
    /// that some headroom remains.
    pub fraction: f64,
    /// Hard upper bound on concurrency; the limit never goes above it.
    pub ceiling: usize,
    /// Total machine RAM in bytes — the denominator of the memory-budget
    /// guardrail (`limit × p95_obj_size ≤ fraction × total_ram`). A value of
    /// `0` turns the memory cap off.
    pub total_ram: u64,
    /// Optional hard cap on the target byte-rate (e.g. a user `--max-rate`).
    /// With `None`, only the measured goodput knee bounds the rate.
    pub max_rate: Option<u64>,
}

impl AdaptivePolicy {
    /// Fallback operating fraction used by [`Default`] and whenever the
    /// requested fraction is unusable.
    const FALLBACK_FRACTION: f64 = 0.8;

    /// Builds a policy from raw values.
    ///
    /// `fraction` must lie in `(0, 1]`; anything else (including NaN and the
    /// infinities) is replaced by `0.8`. `ceiling` is raised to at least 1.
    /// `total_ram` and `max_rate` are stored as given.
    #[must_use]
    pub fn new(fraction: f64, ceiling: usize, total_ram: u64, max_rate: Option<u64>) -> Self {
        // NaN fails both comparisons and +inf fails `<= 1.0`, so the range
        // test alone also rejects every non-finite value.
        let fraction = Some(fraction)
            .filter(|f| *f > 0.0 && *f <= 1.0)
            .unwrap_or(Self::FALLBACK_FRACTION);
        let ceiling = ceiling.max(1);
        Self {
            fraction,
            ceiling,
            total_ram,
            max_rate,
        }
    }
}

impl Default for AdaptivePolicy {
    /// Operate at 80% of the knee with at most 16 concurrent operations, no
    /// memory cap and no rate cap.
    fn default() -> Self {
        Self::new(Self::FALLBACK_FRACTION, 16, 0, None)
    }
}
427
/// What the controller decided in one [`tick`](AdaptiveController::tick): the
/// concurrency limit to use next and, optionally, a byte-rate to aim for.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Decision {
    /// Concurrency limit to apply next (already clamped to `[1, ceiling]` and
    /// to the memory budget).
    pub limit: usize,
    /// Aggregate target byte-rate, or `None` for unlimited. Derived from
    /// `fraction × measured-goodput-knee` and clamped to `policy.max_rate`.
    pub target_rate: Option<u64>,
}
439
440// ---------------------------------------------------------------------------
441// AdaptiveController — the pure control law.
442// ---------------------------------------------------------------------------
443
/// The control-law phase the controller is currently in.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Phase {
    /// Multiplicative ramp-up (`×1.5` per tick) that lasts until the first
    /// congestion event or knee.
    SlowStart,
    /// Steady state: additive increase, multiplicative decrease.
    Aimd,
}
452
/// Smoothing factor of the goodput / rtt EWMAs (larger reacts faster).
const EWMA_ALPHA: f64 = 0.3;
/// Factor the limit is multiplied by on each slow-start tick.
const SLOW_START_MULT: f64 = 1.5;
/// Factor the limit is multiplied by on congestion (the `×0.5` of AIMD).
const BACKOFF_MULT: f64 = 0.5;
/// Latency-gradient threshold: a `rtt_min/rtt` ratio under this value is read
/// as queueing.
const GRADIENT_THRESHOLD: f64 = 0.7;
/// CPU percentage above which the limit is never raised.
const CPU_NO_INCREASE_PCT: f64 = 85.0;
/// CPU percentage above which the limit is actively lowered.
const CPU_DECREASE_PCT: f64 = 95.0;
/// How long after a congestion event the limit is not allowed to increase.
const COOLDOWN: Duration = Duration::from_secs(15);
/// How often a stable controller re-probes.
const REPROBE_INTERVAL: Duration = Duration::from_secs(15);
/// Relative goodput gain needed for slow-start to keep growing, and for a
/// re-probe to be kept (hysteresis).
const IMPROVE_EPS: f64 = 0.02;
472
/// Injected monotonic timestamp. The controller never reads a real clock;
/// instead the caller supplies a monotonically non-decreasing value (real
/// code: time elapsed since a fixed `Instant`; tests: a fake counter). It is
/// a [`Duration`] measured from an arbitrary epoch, which keeps the type
/// trivially `Copy` and deterministic.
pub type MonoTime = Duration;
478
479/// The adaptive control brain.
480///
481/// PURE and deterministic: it never calls [`std::time::Instant::now`] or the
482/// [`snapdir_core::resources`] samplers. All time and metrics are injected via
483/// [`tick`](AdaptiveController::tick); per-op feedback via
484/// [`record_op`](AdaptiveController::record_op). Given the same inputs it always
485/// produces the same [`Decision`]s.
486#[derive(Debug)]
487pub struct AdaptiveController {
488 policy: AdaptivePolicy,
489
490 /// Current concurrency limit (the controller's authoritative value).
491 limit: f64,
492 /// Control phase (slow-start vs AIMD).
493 phase: Phase,
494
495 /// EWMA of goodput in bytes/sec (across recorded ops since the last tick).
496 goodput_ewma: f64,
497 /// Best goodput seen so far (the "knee" estimate).
498 goodput_knee: f64,
499 /// Goodput EWMA captured at the previous tick (for slow-start improvement).
500 goodput_prev_tick: f64,
501
502 /// EWMA of per-op latency, in seconds.
503 rtt_ewma: f64,
504 /// Minimum latency observed (the unloaded baseline), in seconds.
505 rtt_min: f64,
506
507 /// Accumulators for ops recorded since the last `tick`.
508 acc_bytes: u64,
509 acc_latency_secs: f64,
510 acc_count: u64,
511 /// Set when any op since the last tick was a Throttle / timeout-class error.
512 congestion_seen: bool,
513
514 /// `Some(deadline)` while in a post-congestion cooldown (no increases).
515 cooldown_until: Option<MonoTime>,
516 /// Time of the last re-probe (or controller start).
517 last_reprobe: MonoTime,
518 /// `Some((limit_before, goodput_before))` while a re-probe is outstanding,
519 /// so the next tick can keep-or-revert it.
520 probe_pending: Option<(f64, f64)>,
521
522 /// Whether `tick` has run at least once (to seed `last_reprobe`).
523 started: bool,
524}
525
526impl AdaptiveController {
527 /// Builds a controller from a policy. The limit starts at `2` (slow-start
528 /// seed), clamped to the policy ceiling.
529 #[must_use]
530 pub fn new(policy: AdaptivePolicy) -> Self {
531 let start = 2.0_f64.min(policy.ceiling as f64).max(1.0);
532 Self {
533 policy,
534 limit: start,
535 phase: Phase::SlowStart,
536 goodput_ewma: 0.0,
537 goodput_knee: 0.0,
538 goodput_prev_tick: 0.0,
539 rtt_ewma: 0.0,
540 rtt_min: f64::INFINITY,
541 acc_bytes: 0,
542 acc_latency_secs: 0.0,
543 acc_count: 0,
544 congestion_seen: false,
545 cooldown_until: None,
546 last_reprobe: Duration::ZERO,
547 probe_pending: None,
548 started: false,
549 }
550 }
551
552 /// The current concurrency limit (clamped to `[1, ceiling]`).
553 #[must_use]
554 pub fn current_limit(&self) -> usize {
555 (self.limit.round() as usize).clamp(1, self.policy.ceiling)
556 }
557
558 /// Records one completed (or failed) operation, updating the running EWMAs
559 /// and the congestion marker. Call between ticks; the accumulated samples
560 /// are folded into the goodput/rtt estimates at the next
561 /// [`tick`](AdaptiveController::tick).
562 pub fn record_op(&mut self, sample: OpSample) {
563 let secs = sample.latency.as_secs_f64().max(0.0);
564
565 // Per-op latency EWMA + min (baseline) tracking. Skip zero-latency
566 // degenerate samples for the min so a bogus 0 never pins rtt_min.
567 if secs > 0.0 {
568 if self.rtt_ewma <= 0.0 {
569 self.rtt_ewma = secs;
570 } else {
571 self.rtt_ewma = EWMA_ALPHA.mul_add(secs - self.rtt_ewma, self.rtt_ewma);
572 }
573 if secs < self.rtt_min {
574 self.rtt_min = secs;
575 }
576 }
577
578 self.acc_bytes = self.acc_bytes.saturating_add(sample.bytes);
579 self.acc_latency_secs += secs;
580 self.acc_count += 1;
581
582 match sample.result {
583 OpResult::Throttle | OpResult::HardErr => self.congestion_seen = true,
584 OpResult::Ok => {}
585 }
586 }
587
588 /// Applies the control law for one interval and returns the next
589 /// [`Decision`]. `now` is the injected monotonic time, `cpu_pct`/`rss` are
590 /// best-effort system samples (`None` ⇒ unknown ⇒ that guardrail is
591 /// skipped), and `p95_obj_size` is the recent 95th-percentile object size
592 /// used by the memory-budget guardrail.
593 ///
594 /// The controller never reads a real clock or sampler; all of these are
595 /// supplied by the (later) wiring gate.
596 pub fn tick(
597 &mut self,
598 now: MonoTime,
599 cpu_pct: Option<f64>,
600 _rss: Option<u64>,
601 p95_obj_size: u64,
602 ) -> Decision {
603 if !self.started {
604 self.started = true;
605 self.last_reprobe = now;
606 }
607
608 // ---- fold accumulated op samples into the goodput EWMA -------------
609 let interval_goodput = self.window_goodput();
610 if interval_goodput > 0.0 {
611 if self.goodput_ewma <= 0.0 {
612 self.goodput_ewma = interval_goodput;
613 } else {
614 self.goodput_ewma =
615 EWMA_ALPHA.mul_add(interval_goodput - self.goodput_ewma, self.goodput_ewma);
616 }
617 if self.goodput_ewma > self.goodput_knee {
618 self.goodput_knee = self.goodput_ewma;
619 }
620 }
621 let congestion = self.congestion_seen;
622 let gradient = self.latency_gradient();
623
624 // ---- guardrail flags ----------------------------------------------
625 let cpu_blocks_increase = cpu_pct.is_some_and(|c| c > CPU_NO_INCREASE_PCT);
626 let cpu_forces_decrease = cpu_pct.is_some_and(|c| c > CPU_DECREASE_PCT);
627 let in_cooldown = self.cooldown_until.is_some_and(|d| now < d);
628 // Latency gradient below threshold ⇒ queue building ⇒ hold/decrease.
629 let queueing = gradient.is_some_and(|g| g < GRADIENT_THRESHOLD);
630
631 // A pending re-probe is resolved first (keep if it helped, else revert).
632 if let Some((prev_limit, prev_goodput)) = self.probe_pending.take() {
633 let improved = self.goodput_ewma > prev_goodput * (1.0 + IMPROVE_EPS);
634 if !improved || congestion || queueing {
635 self.limit = prev_limit; // revert the speculative probe
636 }
637 // else: keep the higher limit.
638 }
639
640 // ---- congestion backoff (highest priority) ------------------------
641 if congestion {
642 self.limit = (self.limit * BACKOFF_MULT).max(1.0);
643 self.phase = Phase::Aimd; // first knee ends slow-start
644 self.cooldown_until = Some(now + COOLDOWN);
645 self.goodput_prev_tick = self.goodput_ewma;
646 return self.finish(now, p95_obj_size);
647 }
648
649 // ---- CPU hard decrease --------------------------------------------
650 if cpu_forces_decrease {
651 self.limit = (self.limit * BACKOFF_MULT).max(1.0);
652 self.goodput_prev_tick = self.goodput_ewma;
653 return self.finish(now, p95_obj_size);
654 }
655
656 // ---- latency-gradient hold/decrease -------------------------------
657 if queueing {
658 // Queue building without an explicit error: gently decrease and
659 // leave slow-start. Hysteresis: only step down by one.
660 if self.phase == Phase::SlowStart {
661 self.phase = Phase::Aimd;
662 }
663 self.limit = (self.limit - 1.0).max(1.0);
664 self.goodput_prev_tick = self.goodput_ewma;
665 return self.finish(now, p95_obj_size);
666 }
667
668 // ---- no-increase guards (cooldown / CPU≈busy) ---------------------
669 if in_cooldown || cpu_blocks_increase {
670 self.goodput_prev_tick = self.goodput_ewma;
671 return self.finish(now, p95_obj_size);
672 }
673
674 // ---- healthy: grow per phase --------------------------------------
675 match self.phase {
676 Phase::SlowStart => {
677 // Keep multiplying while goodput is still rising; once it
678 // plateaus, treat as the knee and switch to AIMD.
679 let rising = self.goodput_ewma > self.goodput_prev_tick * (1.0 + IMPROVE_EPS)
680 || self.goodput_prev_tick <= 0.0;
681 if rising {
682 self.limit *= SLOW_START_MULT;
683 } else {
684 self.phase = Phase::Aimd;
685 self.limit += 1.0; // gentle additive after the knee
686 }
687 }
688 Phase::Aimd => {
689 // Additive increase, but periodically do an explicit re-probe
690 // (mark it pending so the next tick keeps-or-reverts).
691 if now >= self.last_reprobe + REPROBE_INTERVAL {
692 self.last_reprobe = now;
693 self.probe_pending = Some((self.limit, self.goodput_ewma));
694 self.limit += 1.0;
695 } else {
696 self.limit += 1.0;
697 }
698 }
699 }
700
701 self.goodput_prev_tick = self.goodput_ewma;
702 self.finish(now, p95_obj_size)
703 }
704
705 /// Goodput over the just-closed window (bytes / summed-latency), or 0 when
706 /// no ops were recorded.
707 fn window_goodput(&self) -> f64 {
708 if self.acc_count == 0 || self.acc_latency_secs <= 0.0 {
709 return 0.0;
710 }
711 // Aggregate goodput ≈ bytes moved divided by the *average* per-op
712 // latency times concurrency is implicit in the op stream; here we use
713 // the simple bytes / total-latency * current-limit estimate, which
714 // rises with both faster ops and more parallelism.
715 let per_op = self.acc_bytes as f64 / self.acc_latency_secs;
716 per_op * self.limit
717 }
718
719 /// `rtt_min / rtt_ewma` (∈ (0,1]); `None` until both are known. Near 1.0
720 /// means no queueing; small means latency has inflated (congestion).
721 fn latency_gradient(&self) -> Option<f64> {
722 if self.rtt_ewma > 0.0 && self.rtt_min.is_finite() && self.rtt_min > 0.0 {
723 Some((self.rtt_min / self.rtt_ewma).clamp(0.0, 1.0))
724 } else {
725 None
726 }
727 }
728
729 /// Applies the hard caps (ceiling + memory budget), resets the per-window
730 /// accumulators, and packages the [`Decision`].
731 fn finish(&mut self, _now: MonoTime, p95_obj_size: u64) -> Decision {
732 // Memory-budget hard cap: limit × p95_obj_size ≤ fraction × total_ram.
733 let mem_cap = self.memory_cap(p95_obj_size);
734 let ceiling = self.policy.ceiling;
735 let capped = (self.limit.round() as usize).clamp(1, ceiling).min(mem_cap);
736 // Persist the capped value so the cap is sticky (the controller never
737 // "remembers" a limit it is not allowed to use).
738 self.limit = capped as f64;
739
740 // Reset per-window accumulators.
741 self.acc_bytes = 0;
742 self.acc_latency_secs = 0.0;
743 self.acc_count = 0;
744 self.congestion_seen = false;
745
746 Decision {
747 limit: capped,
748 target_rate: self.target_rate(),
749 }
750 }
751
752 /// The largest concurrency the memory budget permits:
753 /// `floor(fraction × total_ram / p95_obj_size)`, at least 1. When either
754 /// `total_ram` or `p95_obj_size` is 0 the budget is unbounded (returns the
755 /// ceiling).
756 fn memory_cap(&self, p95_obj_size: u64) -> usize {
757 if self.policy.total_ram == 0 || p95_obj_size == 0 {
758 return self.policy.ceiling;
759 }
760 let budget = self.policy.fraction * self.policy.total_ram as f64;
761 let cap = (budget / p95_obj_size as f64).floor();
762 if cap < 1.0 {
763 1
764 } else {
765 (cap as usize).min(self.policy.ceiling)
766 }
767 }
768
769 /// `fraction × measured-goodput-knee`, clamped to `policy.max_rate`. `None`
770 /// until a knee has been observed and no rate cap forces a value.
771 fn target_rate(&self) -> Option<u64> {
772 let knee_rate = if self.goodput_knee > 0.0 {
773 Some((self.policy.fraction * self.goodput_knee).max(1.0) as u64)
774 } else {
775 None
776 };
777 match (knee_rate, self.policy.max_rate) {
778 (Some(k), Some(m)) => Some(k.min(m)),
779 (Some(k), None) => Some(k),
780 (None, Some(m)) => Some(m),
781 (None, None) => None,
782 }
783 }
784}
785
786// ---------------------------------------------------------------------------
787// ControllerDriver — bridges the pure controller to the live transfer loops.
788// ---------------------------------------------------------------------------
789
/// Returns the 95th percentile (nearest-rank) of `sizes` — the object size
/// the controller's `tick` uses as the memory-budget denominator.
///
/// An empty slice yields `0`, which disables the memory guardrail. The input
/// is copied and sorted locally, so the caller's slice is left untouched.
#[must_use]
pub fn p95_object_size(sizes: &[u64]) -> u64 {
    let count = sizes.len();
    if count == 0 {
        return 0;
    }
    // Nearest-rank p95: 1-based rank = ceil(0.95 * n), kept within [1, n].
    let rank = ((0.95 * count as f64).ceil() as usize).clamp(1, count);

    let mut ordered = Vec::with_capacity(count);
    ordered.extend_from_slice(sizes);
    ordered.sort_unstable();
    ordered[rank - 1]
}
806
807/// Shared, live bridge between a pure [`AdaptiveController`] and the running
808/// transfer backends.
809///
810/// The controller is `&mut` and deterministic; this wrapper owns it behind a
811/// [`Mutex`] (tick is infrequent, so contention is negligible) and adds the
812/// *impure* parts the controller deliberately omits: a real monotonic clock, a
813/// live [`CpuSampler`] + RSS sampler, the manifest's p95 object size, and the
814/// application of each [`Decision`] to the shared [`AdaptiveGate`] (concurrency)
815/// and — for the network backends — a rate-limiter setter + display [`Meter`].
816///
817/// Both transfer paths use it identically:
818///
819/// - every completed/failed op calls [`record_op`](ControllerDriver::record_op)
820/// with its measured `OpSample`;
821/// - a lightweight driver (a tokio `interval` task for the async path, a
822/// `std::thread` for the rayon path) periodically calls
823/// [`tick`](ControllerDriver::tick), which samples CPU/RSS, advances the
824/// controller, resizes the gate, and reports the new limit/rate.
825///
826/// The driver is [`Clone`] (the controller, gate, sampler and the
827/// rate/limit appliers are all shared via [`Arc`]).
828#[derive(Clone)]
829pub struct ControllerDriver {
830 controller: Arc<Mutex<AdaptiveController>>,
831 gate: AdaptiveGate,
832 cpu: Arc<Mutex<CpuSampler>>,
833 p95_obj_size: u64,
834 /// Wall-clock origin: `tick` injects `now = epoch.elapsed()` so the
835 /// controller sees a monotonic [`MonoTime`].
836 epoch: Instant,
837 /// Applies a target byte-rate to the backend's live rate limiter (async or
838 /// blocking). Local `FileStore` has no rate limit, so this is `None`.
839 rate_applier: Option<Arc<dyn Fn(Option<u64>) + Send + Sync>>,
840 /// Optional display meter: the new limit / target rate are mirrored into it
841 /// for the progress bar (advisory only).
842 meter: Option<Arc<Meter>>,
843}
844
845// The closure / mutex-wrapped controller fields are intentionally omitted from
846// the human-facing debug view (a `dyn Fn` is not `Debug`, and the controller's
847// internals are large + uninteresting here); the load-bearing live state
848// (limits, p95, wiring presence) is shown instead.
849#[allow(clippy::missing_fields_in_debug)]
850impl std::fmt::Debug for ControllerDriver {
851 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
852 f.debug_struct("ControllerDriver")
853 .field("gate_limit", &self.gate.limit())
854 .field("ceiling", &self.gate.ceiling())
855 .field("p95_obj_size", &self.p95_obj_size)
856 .field("has_rate_applier", &self.rate_applier.is_some())
857 .field("has_meter", &self.meter.is_some())
858 .finish()
859 }
860}
861
862impl ControllerDriver {
863 /// Builds a driver around a fresh controller for `policy`, the shared
864 /// `gate`, the manifest's `p95_obj_size`, an optional `rate_applier` (the
865 /// backend's live rate-limit setter; `None` for the rate-less local store),
866 /// and an optional display `meter`.
867 #[must_use]
868 pub fn new(
869 policy: AdaptivePolicy,
870 gate: AdaptiveGate,
871 p95_obj_size: u64,
872 rate_applier: Option<Arc<dyn Fn(Option<u64>) + Send + Sync>>,
873 meter: Option<Arc<Meter>>,
874 ) -> Self {
875 Self {
876 controller: Arc::new(Mutex::new(AdaptiveController::new(policy))),
877 gate,
878 cpu: Arc::new(Mutex::new(CpuSampler::new())),
879 p95_obj_size,
880 epoch: Instant::now(),
881 rate_applier,
882 meter,
883 }
884 }
885
886 /// Records one completed/failed op into the shared controller.
887 pub fn record_op(&self, sample: OpSample) {
888 self.controller
889 .lock()
890 .unwrap_or_else(PoisonError::into_inner)
891 .record_op(sample);
892 }
893
894 /// Advances the controller one interval: samples CPU/RSS, ticks with the
895 /// injected monotonic clock + p95 object size, then applies the resulting
896 /// [`Decision`] to the gate (concurrency), the rate applier (byte-rate), and
897 /// the display meter. Returns the decision for tests/inspection.
898 pub fn tick(&self) -> Decision {
899 let cpu_pct = self
900 .cpu
901 .lock()
902 .unwrap_or_else(PoisonError::into_inner)
903 .poll();
904 let rss = resident_set_bytes();
905 let now = self.epoch.elapsed();
906
907 let decision = {
908 let mut controller = self
909 .controller
910 .lock()
911 .unwrap_or_else(PoisonError::into_inner);
912 controller.tick(now, cpu_pct, rss, self.p95_obj_size)
913 };
914
915 // Apply: resize the shared gate, retune the rate limiter, mirror both
916 // into the display meter (all advisory; never change what is transferred).
917 self.gate.set_limit(decision.limit);
918 if let Some(apply) = &self.rate_applier {
919 apply(decision.target_rate);
920 }
921 if let Some(meter) = &self.meter {
922 meter.set_current_limit(decision.limit as u64);
923 meter.set_target_rate(decision.target_rate.unwrap_or(0));
924 }
925 decision
926 }
927}
928
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    // ----- AdaptiveGate: async ----------------------------------------------

    /// Builds a current-thread tokio runtime with the timer enabled (the
    /// in-flight probes below sleep inside their tasks).
    fn runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("build tokio runtime")
    }

    #[test]
    fn adaptive_gate_async_acquire_blocks_beyond_limit() {
        let rt = runtime();
        rt.block_on(async {
            let gate = AdaptiveGate::new(2, 8);
            let p1 = gate.acquire().await;
            let p2 = gate.acquire().await;
            assert_eq!(gate.available_permits(), 0, "both permits taken");

            // A third acquire must not be immediately ready.
            let fut = gate.acquire();
            tokio::pin!(fut);
            let pending = futures::poll!(&mut fut);
            assert!(pending.is_pending(), "third acquire blocks at limit 2");

            // Raising the limit unblocks it.
            gate.set_limit(3);
            let p3 = futures::poll!(&mut fut);
            assert!(p3.is_ready(), "set_limit(3) frees a permit for the waiter");
            drop((p1, p2));
        });
    }

    /// Mirror transfer.rs's `max_in_flight_for` idiom: drive many acquires and
    /// record the peak concurrent holders under a fixed gate limit.
    fn peak_under_limit(limit: usize, items: usize) -> usize {
        let rt = runtime();
        rt.block_on(async {
            let gate = AdaptiveGate::new(limit, 32);
            let in_flight = Arc::new(AtomicUsize::new(0));
            let high = Arc::new(AtomicUsize::new(0));
            let mut handles = Vec::new();
            for _ in 0..items {
                let gate = gate.clone();
                let in_flight = Arc::clone(&in_flight);
                let high = Arc::clone(&high);
                handles.push(tokio::spawn(async move {
                    let _p = gate.acquire().await;
                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    high.fetch_max(cur, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(20)).await;
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                }));
            }
            for h in handles {
                h.await.unwrap();
            }
            high.load(Ordering::SeqCst)
        })
    }

    #[test]
    fn adaptive_gate_async_set_limit_changes_max_in_flight() {
        assert_eq!(peak_under_limit(3, 12), 3, "limit 3 caps in-flight at 3");
        assert_eq!(peak_under_limit(1, 5), 1, "limit 1 is strictly sequential");
    }

    #[test]
    fn adaptive_gate_async_shrink_does_not_deadlock_held_permits() {
        let rt = runtime();
        rt.block_on(async {
            let gate = AdaptiveGate::new(4, 8);
            let p1 = gate.acquire().await;
            let p2 = gate.acquire().await;
            // Shrink below the number of held permits.
            let new = gate.set_limit(1);
            assert_eq!(new, 1);
            // Dropping held permits must not panic/deadlock; afterwards the
            // effective limit settles to 1.
            drop(p1);
            drop(p2);
            // Acquire one (should succeed) and confirm a second blocks.
            let _q = gate.acquire().await;
            let fut = gate.acquire();
            tokio::pin!(fut);
            assert!(
                futures::poll!(&mut fut).is_pending(),
                "after shrink to 1, only one permit is available"
            );
        });
    }

    // ----- AdaptiveGate: blocking -------------------------------------------

    /// Blocking counterpart of `peak_under_limit`: spawns `workers` OS threads
    /// that each hold a blocking permit from `gate` for 30ms, and returns the
    /// highest number of simultaneous holders observed.
    fn blocking_peak(gate: &AdaptiveGate, workers: usize) -> usize {
        let in_flight = Arc::new(AtomicUsize::new(0));
        let high = Arc::new(AtomicUsize::new(0));
        // Collect eagerly so every thread is spawned before the first join.
        let handles: Vec<_> = (0..workers)
            .map(|_| {
                let gate = gate.clone();
                let in_flight = Arc::clone(&in_flight);
                let high = Arc::clone(&high);
                thread::spawn(move || {
                    let _p = gate.acquire_blocking();
                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    high.fetch_max(cur, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(30));
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        high.load(Ordering::SeqCst)
    }

    #[test]
    fn adaptive_gate_blocking_acquire_and_set_limit() {
        // With limit 1, the four worker threads must serialize (peak 1).
        let gate = AdaptiveGate::new(1, 8);
        assert_eq!(blocking_peak(&gate, 4), 1, "blocking limit 1 serializes");

        // Now raise the limit and confirm parallelism rises: never above the
        // new limit of 3, and at least 2 holders overlap.
        let gate2 = AdaptiveGate::new(1, 8);
        gate2.set_limit(3);
        let peak = blocking_peak(&gate2, 6);
        assert!(
            (2..=3).contains(&peak),
            "after set_limit(3) peak in-flight should reach ~3, got {peak}"
        );
    }

    #[test]
    fn adaptive_gate_blocking_shrink_does_not_deadlock() {
        let gate = AdaptiveGate::new(4, 8);
        let p1 = gate.acquire_blocking();
        let p2 = gate.acquire_blocking();
        gate.set_limit(1); // shrink below held count
        drop(p1);
        drop(p2);
        // A subsequent acquire must still succeed (no deadlock / no lost permit).
        let got = Arc::new(AtomicUsize::new(0));
        let g = gate.clone();
        let got2 = Arc::clone(&got);
        let h = thread::spawn(move || {
            let _p = g.acquire_blocking();
            got2.fetch_add(1, Ordering::SeqCst);
        });
        h.join().unwrap();
        assert_eq!(got.load(Ordering::SeqCst), 1, "acquire after shrink works");
    }

    // ----- AdaptiveController -----------------------------------------------

    /// Convenience: a healthy Ok op of `bytes` at `latency_ms`.
    fn ok_op(bytes: u64, latency_ms: u64) -> OpSample {
        OpSample {
            bytes,
            latency: Duration::from_millis(latency_ms),
            result: OpResult::Ok,
        }
    }

    /// Policy with fraction 0.8, ceiling 64 and no max rate.
    fn big_policy() -> AdaptivePolicy {
        // Big RAM so the memory guardrail never bites unless a test wants it.
        AdaptivePolicy::new(0.8, 64, u64::MAX, None)
    }

    #[test]
    fn adaptive_controller_healthy_stream_ramps_up_then_caps() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        let mut last = c.current_limit();
        let mut max_seen = last;
        // Rising goodput: each tick the ops get "faster" (more bytes/sec).
        for i in 0..20u64 {
            for _ in 0..4 {
                c.record_op(ok_op(1_000_000 + i * 200_000, 50));
            }
            let d = c.tick(t, Some(30.0), Some(0), 4096);
            assert!(d.limit <= 64, "never exceed ceiling");
            max_seen = max_seen.max(d.limit);
            last = d.limit;
            t += Duration::from_secs(1);
        }
        assert!(
            max_seen > 2,
            "healthy stream should ramp the limit up from the start of 2, got max {max_seen}"
        );
        assert!(last <= 64, "stays within ceiling, got {last}");
    }

    #[test]
    fn adaptive_controller_throttle_backs_off_and_cooldown_holds() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        // Ramp up first.
        for _ in 0..6 {
            for _ in 0..4 {
                c.record_op(ok_op(2_000_000, 40));
            }
            c.tick(t, Some(20.0), Some(0), 4096);
            t += Duration::from_secs(1);
        }
        let before = c.current_limit();
        assert!(before > 2, "should have grown before the throttle");

        // Inject a Throttle.
        c.record_op(OpSample {
            bytes: 1000,
            latency: Duration::from_millis(40),
            result: OpResult::Throttle,
        });
        let d = c.tick(t, Some(20.0), Some(0), 4096);
        assert!(
            d.limit <= before / 2 + 1,
            "throttle should at least halve the limit: before {before}, after {}",
            d.limit
        );
        let after_backoff = d.limit;
        t += Duration::from_secs(1);

        // Ten 1s ticks, all of them inside the 15s cooldown: even healthy
        // ticks must not increase the limit.
        for _ in 0..10 {
            for _ in 0..4 {
                c.record_op(ok_op(5_000_000, 20)); // very healthy
            }
            let d = c.tick(t, Some(10.0), Some(0), 4096);
            assert!(
                d.limit <= after_backoff,
                "no increase during cooldown: {} > {after_backoff}",
                d.limit
            );
            t += Duration::from_secs(1);
        }

        // Jump past the end of the cooldown (>15s after the throttle tick):
        // increases resume.
        t += Duration::from_secs(6);
        for _ in 0..3 {
            for _ in 0..4 {
                c.record_op(ok_op(6_000_000, 20));
            }
            c.tick(t, Some(10.0), Some(0), 4096);
            t += Duration::from_secs(1);
        }
        assert!(
            c.current_limit() > after_backoff,
            "limit should recover after the cooldown expires"
        );
    }

    #[test]
    fn adaptive_controller_rising_latency_holds_without_error() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        // Establish a low rtt_min with fast ops, ramp up.
        for _ in 0..5 {
            for _ in 0..4 {
                c.record_op(ok_op(2_000_000, 10)); // 10ms baseline
            }
            c.tick(t, Some(20.0), Some(0), 4096);
            t += Duration::from_secs(1);
        }
        let peak = c.current_limit();

        // Now latency inflates massively (queueing) but NO errors: gradient
        // rtt_min/rtt drops well below threshold ⇒ controller must not grow.
        for _ in 0..6 {
            for _ in 0..4 {
                c.record_op(ok_op(2_000_000, 200)); // 200ms now, 20x baseline
            }
            let d = c.tick(t, Some(20.0), Some(0), 4096);
            assert!(
                d.limit <= peak,
                "high latency gradient must hold/decrease (no growth): {} > {peak}",
                d.limit
            );
            t += Duration::from_secs(1);
        }
        assert!(
            c.current_limit() <= peak,
            "latency-gradient guard held the limit at/below the peak"
        );
    }

    #[test]
    fn adaptive_controller_memory_budget_caps_limit() {
        // Tiny RAM, large object size ⇒ memory budget forces a small limit.
        // budget = 0.8 * 10MiB = 8MiB; p95 = 2MiB ⇒ cap = floor(8/2) = 4.
        let total_ram = 10 * 1024 * 1024;
        let p95 = 2 * 1024 * 1024;
        let policy = AdaptivePolicy::new(0.8, 64, total_ram, None);
        let mut c = AdaptiveController::new(policy);
        let mut t = Duration::ZERO;
        for _ in 0..30 {
            for _ in 0..4 {
                c.record_op(ok_op(10_000_000, 10)); // very healthy, wants to grow
            }
            let d = c.tick(t, Some(10.0), Some(0), p95);
            // Invariant: limit * p95 <= fraction * total_ram (never violated).
            assert!(
                (d.limit as u64) * p95 <= ((0.8 * total_ram as f64) as u64),
                "memory budget violated: limit {} * p95 {} > budget",
                d.limit,
                p95
            );
            assert!(
                d.limit <= 4,
                "memory cap should pin limit at 4, got {}",
                d.limit
            );
            t += Duration::from_secs(1);
        }
    }

    #[test]
    fn adaptive_controller_high_cpu_prevents_increase() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        // Warm up a little at low CPU.
        for _ in 0..3 {
            for _ in 0..4 {
                c.record_op(ok_op(2_000_000, 20));
            }
            c.tick(t, Some(20.0), Some(0), 4096);
            t += Duration::from_secs(1);
        }
        let before = c.current_limit();
        // Now CPU is pinned > 85%: no increase, even with healthy ops.
        for _ in 0..8 {
            for _ in 0..4 {
                c.record_op(ok_op(5_000_000, 10));
            }
            let d = c.tick(t, Some(90.0), Some(0), 4096);
            assert!(
                d.limit <= before,
                "cpu>85 must block increases: {} > {before}",
                d.limit
            );
            t += Duration::from_secs(1);
        }
    }

    #[test]
    fn adaptive_controller_converges_on_steady_stream() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        // Run long enough to ramp and settle on a perfectly steady stream.
        let mut limits = Vec::new();
        for _ in 0..60 {
            for _ in 0..4 {
                c.record_op(ok_op(3_000_000, 25)); // identical every op
            }
            let d = c.tick(t, Some(40.0), Some(0), 4096);
            limits.push(d.limit);
            t += Duration::from_secs(1);
        }
        // Look at the tail: it should not oscillate wildly. Measure the spread
        // of the last 15 ticks.
        let tail = &limits[limits.len() - 15..];
        let min = *tail.iter().min().unwrap();
        let max = *tail.iter().max().unwrap();
        assert!(
            max - min <= 3,
            "steady stream should converge (small tail spread), got min {min} max {max} tail {tail:?}"
        );
    }

    // ----- p95_object_size + ControllerDriver -------------------------------

    #[test]
    fn p95_object_size_nearest_rank() {
        assert_eq!(p95_object_size(&[]), 0, "empty -> 0 (memory cap disabled)");
        assert_eq!(p95_object_size(&[42]), 42, "single element");
        // 1..=100: nearest-rank p95 = the 95th value = 95.
        let sizes: Vec<u64> = (1..=100).collect();
        assert_eq!(p95_object_size(&sizes), 95);
        // In a set this small the nearest-rank p95 is the largest element, so a
        // single huge outlier IS the p95 (small sets are not outlier-robust).
        assert_eq!(p95_object_size(&[1, 1, 1, 1_000_000]), 1_000_000);
    }

    #[test]
    fn controller_driver_throttle_drives_gate_decrease() {
        // A driver wired to a gate: record several healthy ops + tick to grow the
        // gate, then inject a Throttle + tick and assert the gate's live limit
        // dropped (the controller's backoff was applied to the real gate).
        let gate = AdaptiveGate::new(2, 32);
        let applied_rate = Arc::new(Mutex::new(None::<Option<u64>>));
        let rate_sink = Arc::clone(&applied_rate);
        let rate_applier: Arc<dyn Fn(Option<u64>) + Send + Sync> = Arc::new(move |r| {
            *rate_sink.lock().unwrap() = Some(r);
        });
        let policy = AdaptivePolicy::new(0.8, 32, u64::MAX, None);
        let driver = ControllerDriver::new(policy, gate.clone(), 4096, Some(rate_applier), None);

        // Grow: healthy ops over several ticks should raise the gate above 2.
        for _ in 0..8 {
            for _ in 0..4 {
                driver.record_op(OpSample {
                    bytes: 2_000_000,
                    latency: Duration::from_millis(40),
                    result: OpResult::Ok,
                });
            }
            driver.tick();
        }
        let grown = gate.limit();
        assert!(
            grown > 2,
            "healthy stream should grow the gate, got {grown}"
        );

        // Inject a Throttle, then tick: the gate's limit must drop (backoff).
        driver.record_op(OpSample {
            bytes: 1000,
            latency: Duration::from_millis(40),
            result: OpResult::Throttle,
        });
        let decision = driver.tick();
        assert!(
            gate.limit() < grown,
            "throttle must shrink the gate: {} >= {grown}",
            gate.limit()
        );
        assert_eq!(
            gate.limit(),
            decision.limit,
            "the gate reflects the decision's limit"
        );
        // The applier ran at least once: the sink holds whatever target rate
        // the last tick handed it (possibly `None`).
        assert!(
            applied_rate.lock().unwrap().is_some(),
            "the rate applier must have been invoked"
        );
    }

    // ----- AdaptiveController: target rate ----------------------------------

    #[test]
    fn adaptive_controller_target_rate_respects_fraction() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        let mut last_rate = None;
        for _ in 0..15 {
            for _ in 0..4 {
                c.record_op(ok_op(1_000_000, 100)); // steady goodput
            }
            let d = c.tick(t, Some(30.0), Some(0), 4096);
            last_rate = d.target_rate;
            t += Duration::from_secs(1);
        }
        let rate = last_rate.expect("a knee should have produced a target_rate");
        // The knee itself is internal to the controller, so the exact
        // fraction-of-knee value cannot be re-derived here; this only pins
        // that a known knee yields a positive target.
        assert!(
            rate > 0,
            "target_rate should be positive once a knee is known"
        );

        // With a max_rate cap below the knee fraction, the target is clamped.
        let policy = AdaptivePolicy::new(0.8, 64, u64::MAX, Some(1234));
        let mut c2 = AdaptiveController::new(policy);
        let mut t2 = Duration::ZERO;
        for _ in 0..10 {
            for _ in 0..4 {
                c2.record_op(ok_op(10_000_000, 10));
            }
            let d = c2.tick(t2, Some(30.0), Some(0), 4096);
            assert!(
                d.target_rate.unwrap_or(0) <= 1234,
                "target_rate must respect max_rate cap"
            );
            t2 += Duration::from_secs(1);
        }
    }
}