Skip to main content

ant_core/data/client/
adaptive.rs

1//! Adaptive concurrency controller for client data operations.
2//!
3//! Replaces hard-coded `quote_concurrency` / `store_concurrency` /
4//! download fan-out with a per-channel AIMD limiter that ramps up when
5//! the network is healthy and ramps down on stress signals (timeouts,
6//! errors, latency inflation). The goal is to give every machine and
7//! every connection profile a single client codebase that finds its
8//! own steady state without the user tweaking flags.
9//!
10//! ## Channels
11//!
12//! Three independent limiters share the same algorithm but track state
13//! separately, because their workloads have different cost profiles:
14//!
15//! - `quote`  — small DHT request/response messages, cheap per op
16//! - `store`  — multi-MB chunk PUTs to a close group, expensive per op
17//! - `fetch`  — multi-MB chunk GETs from peers, asymmetric to `store`
18//!
19//! ## Algorithms
20//!
21//! Quote and store use TCP-style AIMD with slow-start:
22//!
23//! - **Slow-start**: starting concurrency doubles after each healthy
24//!   window until first stress signal or until the configured ceiling.
25//! - **Steady state**: additive +1 per healthy window (>= success_target
26//!   success rate AND p95 latency within `latency_inflation_factor` of
27//!   the rolling baseline).
28//! - **Stress**: multiplicative decrease (current / 2, floor 1) on any
29//!   of: success rate < success_target, timeout rate > timeout_ceiling,
30//!   or p95 latency above `latency_inflation_factor * baseline`.
31//!
32//! Decisions evaluate over a sliding window of the last `window_ops`
33//! observed outcomes per channel. Below `min_window_ops` outcomes the
34//! controller holds steady — too few samples to act on.
35//!
36//! Fetch uses a throughput-seeking hill climber instead. It measures
37//! bytes/sec over epochs, probes nearby concurrency values, accepts
38//! higher caps only when goodput improves materially, and accepts lower
39//! caps when goodput is effectively unchanged. Stress signals still cut
40//! concurrency immediately.
41//!
42//! ## What this is not
43//!
44//! - Not a payment-batching controller. Wave / batch sizes are
45//!   orthogonal (gas-economics tradeoff, not throughput).
46//! - Not a peer-quality scorer. That lives in `peer_cache` and feeds
47//!   `BootstrapManager`. Outcomes flow into both, separately.
48
49use futures::stream::{self, FuturesUnordered, StreamExt};
50use serde::{Deserialize, Serialize};
51use std::collections::VecDeque;
52use std::path::{Path, PathBuf};
53use std::sync::atomic::{AtomicU64, Ordering};
54use std::sync::{Arc, Mutex, PoisonError};
55use std::time::{Duration, Instant};
56use tracing::{debug, warn};
57
58/// Process-monotonic counter for unique snapshot temp filenames.
59/// Combined with PID + nanosecond timestamp, makes collision
60/// effectively impossible across concurrent save_snapshot calls.
61static SAVE_COUNTER: AtomicU64 = AtomicU64::new(0);
62
63/// Fetch starts at the residential-saturation floor validated in
64/// production. The hill climber will find higher caps on
65/// machines/networks that can actually use them.
66const FETCH_COLD_START_CONCURRENCY: usize = 4;
67
68/// Hill-climb probes grow/shrink by roughly 25% of the current best cap.
69const HILL_PROBE_STEP_DIVISOR: usize = 4;
70
71/// Minimum probe movement so low caps can still explore.
72const HILL_MIN_PROBE_STEP: usize = 1;
73
74/// Upward probes must improve measured goodput by at least 5%.
75const HILL_UP_PROBE_ACCEPT_RATIO: f64 = 1.05;
76
77/// Downward probes are accepted if goodput stays within 2% of the best.
78const HILL_DOWN_PROBE_ACCEPT_RATIO: f64 = 0.98;
79
80/// After rejecting a probe, wait a couple of epochs before trying again.
81const HILL_REJECT_COOLDOWN_EPOCHS: usize = 2;
82
83/// At a stable best cap, periodically probe the neighbor again so the
84/// controller can adapt when machine/network conditions change.
85const HILL_STABLE_PROBE_EPOCHS: usize = 3;
86
87/// Stress cuts fetch concurrency in half.
88const HILL_STRESS_DECREASE_DIVISOR: usize = 2;
89
90/// Fetch goodput epochs should cover complete concurrency waves. A
91/// fixed sample window can unfairly compare a full lower-cap wave with
92/// a partial higher-cap wave.
93const HILL_EPOCH_FULL_WAVES: usize = 2;
94
95/// Lock helper matching the project pattern (see `cache::ChunkCache`):
96/// poisoned mutexes still yield the inner state rather than panicking.
97fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
98    m.lock().unwrap_or_else(PoisonError::into_inner)
99}
100
101/// Outcome of a single observed operation on one channel.
102#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103pub enum Outcome {
104    /// Completed successfully.
105    Success,
106    /// Did not complete within the per-op timeout.
107    Timeout,
108    /// Failed with a network/transport error (refused, reset, unreachable).
109    NetworkError,
110    /// Failed with an application-level error not attributable to the
111    /// network (e.g. bad payment proof). Recorded but does not push the
112    /// controller down — it is not a capacity signal.
113    ApplicationError,
114}
115
116/// Lower bound on the `fetch` channel's adaptive cap.
117///
118/// AIMD will not shrink fetch concurrency below this even under
119/// sustained timeout pressure. Specific to fetch because residential
120/// downloads exhibit a noise floor of peer-side timeouts (NAT path
121/// issues, peers in the close group not storing the chunk) that look
122/// like client saturation to the controller, causing it to fully
123/// serialize and collapse throughput. Quote and store channels keep
124/// the global `min_concurrency` floor of 1.
125const FETCH_MIN_FLOOR: usize = 4;
126
127/// Per-channel concurrency ceilings. Each channel has its own cap so
128/// that constraining one (e.g. user pinned a low store concurrency for
129/// a slow uplink) never bleeds into another (download).
130#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
131pub struct ChannelMax {
132    pub quote: usize,
133    pub store: usize,
134    pub fetch: usize,
135}
136
137impl Default for ChannelMax {
138    fn default() -> Self {
139        // Generous ceilings that give the controller real headroom to
140        // grow on healthy connections. The cold-start values
141        // (`ChannelStart::default()`) are well below these so AIMD
142        // can actually do its job. Each ceiling is independent.
143        Self {
144            quote: 128,
145            store: 64,
146            fetch: 256,
147        }
148    }
149}
150
151/// Tunable knobs for the adaptive controller. Defaults are picked so
152/// that the controller behaves at least as well as the prior static
153/// defaults on a healthy network: starts at the previous static value
154/// and only deviates when signals demand it.
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct AdaptiveConfig {
157    /// Master switch. When `false`, channels report `initial` forever
158    /// and ignore observations. Useful for benchmarks / debugging.
159    pub enabled: bool,
160    /// Floor concurrency per channel. Never go below this.
161    pub min_concurrency: usize,
162    /// Per-channel ceiling concurrency. See `ChannelMax`.
163    pub max: ChannelMax,
164    /// Sliding window size in number of recent ops considered for
165    /// adaptation decisions.
166    pub window_ops: usize,
167    /// Below this count of outcomes in the window, hold steady.
168    pub min_window_ops: usize,
169    /// Required success rate to consider the window healthy. Healthy
170    /// windows trigger increase; unhealthy windows trigger decrease.
171    pub success_target: f64,
172    /// Timeout rate above which the window counts as stressed even if
173    /// the success rate would otherwise pass.
174    pub timeout_ceiling: f64,
175    /// p95 latency above `latency_inflation_factor * baseline` is a
176    /// stress signal. Baseline is an EWMA of healthy-window p95s.
177    pub latency_inflation_factor: f64,
178    /// EWMA smoothing factor for the latency baseline. 0 = never
179    /// updates, 1 = baseline = last sample. 0.2 trades responsiveness
180    /// for stability. Validated to `[0.0, 1.0]`; `NaN`/non-finite
181    /// values are sanitized to the default at controller construction.
182    pub latency_ewma_alpha: f64,
183}
184
185impl AdaptiveConfig {
186    /// Sanitize the config: clamp `latency_ewma_alpha` to `[0,1]`
187    /// (rejecting NaN/Inf which would otherwise panic in
188    /// `Duration::from_secs_f64`), enforce `min_concurrency >= 1`,
189    /// enforce per-channel max >= min_concurrency, enforce
190    /// `min_window_ops <= window_ops`. Idempotent.
191    pub fn sanitize(&mut self) {
192        if !self.latency_ewma_alpha.is_finite() {
193            self.latency_ewma_alpha = 0.2;
194        }
195        self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
196        if !self.success_target.is_finite() {
197            self.success_target = 0.95;
198        }
199        self.success_target = self.success_target.clamp(0.0, 1.0);
200        if !self.timeout_ceiling.is_finite() {
201            self.timeout_ceiling = 0.10;
202        }
203        self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
204        if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
205            self.latency_inflation_factor = 4.0;
206        }
207        self.min_concurrency = self.min_concurrency.max(1);
208        self.window_ops = self.window_ops.max(1);
209        self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
210        self.max.quote = self.max.quote.max(self.min_concurrency);
211        self.max.store = self.max.store.max(self.min_concurrency);
212        self.max.fetch = self.max.fetch.max(self.min_concurrency);
213    }
214}
215
216impl Default for AdaptiveConfig {
217    fn default() -> Self {
218        Self {
219            enabled: true,
220            min_concurrency: 1,
221            max: ChannelMax::default(),
222            window_ops: 32,
223            min_window_ops: 8,
224            success_target: 0.95,
225            timeout_ceiling: 0.10,
226            // p95 doubling is the normal signal on a per-chunk fetch with
227            // close-group fallback (one slow peer in a chunk's close group
228            // adds ~10s on top of a sub-second median); 2.0 mis-classified
229            // that as stress and halved the fetch cap mid-download. 4.0
230            // means p95 has to quadruple before we treat the network as
231            // degraded.
232            latency_inflation_factor: 4.0,
233            latency_ewma_alpha: 0.2,
234        }
235    }
236}
237
238/// Suggested starting concurrency per channel for a brand-new client
239/// with no persisted state:
240///
241/// - quote was statically 32 — start at 32.
242/// - store was statically 8 — start at 8.
243/// - fetch starts at 4, the residential-saturation floor validated
244///   after the old 64-wide cold burst saturated home links before
245///   any adaptive observations could land. The throughput hill
246///   climber then lets measured goodput justify growth on faster
247///   links.
248#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
249pub struct ChannelStart {
250    pub quote: usize,
251    pub store: usize,
252    pub fetch: usize,
253}
254
255impl Default for ChannelStart {
256    fn default() -> Self {
257        Self {
258            quote: 32,
259            store: 8,
260            fetch: FETCH_COLD_START_CONCURRENCY,
261        }
262    }
263}
264
265/// One observed sample retained in the sliding window.
266#[derive(Debug, Clone, Copy)]
267struct Sample {
268    outcome: Outcome,
269    latency: Duration,
270}
271
272/// Limiter adaptation strategy. Kept out of `LimiterConfig` so external
273/// config literals and persisted JSON do not grow a migration surface.
274#[derive(Debug, Clone, Copy, PartialEq, Eq)]
275enum LimiterAlgorithm {
276    Aimd,
277    ThroughputHillClimb,
278}
279
280/// Direction of an active hill-climb probe.
281#[derive(Debug, Clone, Copy, PartialEq, Eq)]
282enum ProbeDirection {
283    Up,
284    Down,
285}
286
287/// Epoch-local stats for the throughput hill climber.
288#[derive(Debug)]
289struct HillClimbState {
290    epoch_started: Option<Instant>,
291    epoch_samples: usize,
292    epoch_successes: usize,
293    epoch_timeouts: usize,
294    epoch_net_errors: usize,
295    epoch_bytes: u64,
296    epoch_latencies: Vec<Duration>,
297    best_goodput_per_sec: Option<f64>,
298    best_latency_p95: Option<Duration>,
299    best_concurrency: usize,
300    stable_epochs: usize,
301    cooldown_epochs: usize,
302    next_probe: ProbeDirection,
303    active_probe: Option<ProbeDirection>,
304}
305
306impl HillClimbState {
307    fn new(start: usize, epoch_capacity: usize) -> Self {
308        Self {
309            epoch_started: None,
310            epoch_samples: 0,
311            epoch_successes: 0,
312            epoch_timeouts: 0,
313            epoch_net_errors: 0,
314            epoch_bytes: 0,
315            epoch_latencies: Vec::with_capacity(epoch_capacity),
316            best_goodput_per_sec: None,
317            best_latency_p95: None,
318            best_concurrency: start,
319            stable_epochs: 0,
320            cooldown_epochs: 0,
321            next_probe: ProbeDirection::Up,
322            active_probe: None,
323        }
324    }
325
326    fn reset_epoch(&mut self) {
327        self.epoch_started = None;
328        self.epoch_samples = 0;
329        self.epoch_successes = 0;
330        self.epoch_timeouts = 0;
331        self.epoch_net_errors = 0;
332        self.epoch_bytes = 0;
333        self.epoch_latencies.clear();
334    }
335
336    fn capacity_total(&self) -> usize {
337        self.epoch_successes + self.epoch_timeouts + self.epoch_net_errors
338    }
339}
340
341/// Per-limiter configuration. Carries the shared adaptive parameters
342/// plus the channel-specific `max_concurrency`. Held behind an `Arc`
343/// so cloning a `Limiter` is a refcount bump rather than a struct copy
344/// (avoids allocating `AdaptiveConfig`-worth of bytes per chunk in
345/// hot loops).
346#[derive(Debug, Clone)]
347pub struct LimiterConfig {
348    pub enabled: bool,
349    pub min_concurrency: usize,
350    pub max_concurrency: usize,
351    pub window_ops: usize,
352    pub min_window_ops: usize,
353    pub success_target: f64,
354    pub timeout_ceiling: f64,
355    pub latency_inflation_factor: f64,
356    pub latency_ewma_alpha: f64,
357    /// While `current < slow_start_ramp_threshold`, a Decrease halves
358    /// the cap but does NOT permanently exit slow-start — the next
359    /// healthy window can double the cap back up. Above the threshold,
360    /// a Decrease exits slow-start and the controller transitions to
361    /// classic AIMD (+1 per healthy window).
362    ///
363    /// 0 (the default) reproduces the original behaviour: any Decrease
364    /// at any cap permanently exits slow-start. The fetch channel sets
365    /// this to its `max_concurrency` so download concurrency keeps
366    /// doubling toward the ceiling instead of crawling +1 per window —
367    /// additive growth simply cannot reach a useful cap on a
368    /// fast-but-lossy link before the file finishes. See
369    /// `AdaptiveController::new`.
370    pub slow_start_ramp_threshold: usize,
371    /// When `false`, the p95-latency-vs-baseline comparison never
372    /// triggers a Decrease (it still updates the baseline). The fetch
373    /// channel disables it because `chunk_get`'s observed latency
374    /// includes the internal retry sleep and slow retry-sweep for the
375    /// chunks that needed one, so a window with a couple of retry-path
376    /// chunks has a wildly inflated p95 that is retry variance, not
377    /// congestion. Genuine fetch congestion still surfaces as a rising
378    /// `Ok(None)` (Timeout) rate, which the timeout_ceiling check
379    /// catches.
380    pub latency_decrease_enabled: bool,
381}
382
383impl LimiterConfig {
384    fn from_adaptive(cfg: &AdaptiveConfig, max_for_channel: usize) -> Self {
385        Self {
386            enabled: cfg.enabled,
387            min_concurrency: cfg.min_concurrency,
388            max_concurrency: max_for_channel.max(cfg.min_concurrency),
389            window_ops: cfg.window_ops,
390            min_window_ops: cfg.min_window_ops,
391            success_target: cfg.success_target,
392            timeout_ceiling: cfg.timeout_ceiling,
393            latency_inflation_factor: cfg.latency_inflation_factor,
394            latency_ewma_alpha: cfg.latency_ewma_alpha,
395            // Defaults preserve the original AIMD behaviour; the fetch
396            // channel overrides both in `AdaptiveController::new`.
397            slow_start_ramp_threshold: 0,
398            latency_decrease_enabled: true,
399        }
400    }
401
402    /// Sanitize a directly-constructed `LimiterConfig`. External
403    /// callers (or tests) that build a `LimiterConfig` literal with
404    /// hostile values (`NaN`, sub-floor mins, inverted bounds) are
405    /// protected — `Limiter::new` calls this on every construction
406    /// so the controller never holds NaN or out-of-range floats.
407    fn sanitize(&mut self) {
408        if !self.latency_ewma_alpha.is_finite() {
409            self.latency_ewma_alpha = 0.2;
410        }
411        self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
412        if !self.success_target.is_finite() {
413            self.success_target = 0.95;
414        }
415        self.success_target = self.success_target.clamp(0.0, 1.0);
416        if !self.timeout_ceiling.is_finite() {
417            self.timeout_ceiling = 0.10;
418        }
419        self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
420        if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
421            self.latency_inflation_factor = 4.0;
422        }
423        self.min_concurrency = self.min_concurrency.max(1);
424        self.window_ops = self.window_ops.max(1);
425        self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
426        self.max_concurrency = self.max_concurrency.max(self.min_concurrency);
427    }
428}
429
430/// Per-channel adaptive limiter.
431///
432/// Cheap to clone — both fields are `Arc`. Pass clones into hot loops;
433/// do not hold the lock across `.await` points (call sites observe
434/// with short critical sections only).
435#[derive(Debug, Clone)]
436pub struct Limiter {
437    inner: Arc<Mutex<LimiterInner>>,
438    config: Arc<LimiterConfig>,
439    algorithm: LimiterAlgorithm,
440}
441
442#[derive(Debug)]
443struct LimiterInner {
444    /// Current concurrency cap returned by `current()`.
445    current: usize,
446    /// Sliding window of recent outcomes.
447    window: VecDeque<Sample>,
448    /// Samples observed since the last increase. Increases require a
449    /// fresh window's worth of evidence to avoid ramping on every
450    /// individual healthy sample.
451    samples_since_increase: usize,
452    /// Samples observed since the last decrease. Decreases require
453    /// `min_window_ops` of fresh evidence to avoid pile-driving the
454    /// cap to floor on a single bad burst when many in-flight ops all
455    /// observe stress nearly simultaneously.
456    samples_since_decrease: usize,
457    /// EWMA of p95 latency from past healthy windows. `None` until
458    /// the first healthy window completes.
459    latency_baseline: Option<Duration>,
460    /// `true` once we have observed a stress signal at least once.
461    /// Slow-start mode ends permanently after first stress.
462    left_slow_start: bool,
463    /// Fetch-only throughput optimizer state. Present for every limiter
464    /// to keep `Limiter` cheap to clone and avoid an enum around the
465    /// whole inner struct.
466    hill: HillClimbState,
467}
468
469impl Limiter {
470    /// Create a new limiter starting at `start`, clamped into
471    /// `[min_concurrency, max_concurrency]`. Sanitizes the config to
472    /// guard against directly-constructed `LimiterConfig` literals
473    /// with hostile float values (`NaN`, etc).
474    #[must_use]
475    pub fn new(start: usize, config: LimiterConfig) -> Self {
476        Self::new_with_algorithm(start, config, LimiterAlgorithm::Aimd)
477    }
478
479    fn new_with_algorithm(
480        start: usize,
481        config: LimiterConfig,
482        algorithm: LimiterAlgorithm,
483    ) -> Self {
484        let mut config = config;
485        config.sanitize();
486        let clamped = start.clamp(config.min_concurrency, config.max_concurrency.max(1));
487        let window_cap = config.window_ops;
488        Self {
489            inner: Arc::new(Mutex::new(LimiterInner {
490                current: clamped,
491                window: VecDeque::with_capacity(window_cap),
492                samples_since_increase: 0,
493                samples_since_decrease: 0,
494                latency_baseline: None,
495                left_slow_start: false,
496                hill: HillClimbState::new(clamped, window_cap),
497            })),
498            config: Arc::new(config),
499            algorithm,
500        }
501    }
502
503    /// Snapshot current concurrency cap. Hot-path call: the value may
504    /// change between this call and the next, but consumers
505    /// (`buffer_unordered(n)`) capture it once per pipeline build.
506    #[must_use]
507    pub fn current(&self) -> usize {
508        lock(&self.inner).current
509    }
510
511    /// Record one observed operation. Updates the sliding window and
512    /// re-evaluates the cap if the window is full enough.
513    pub fn observe(&self, outcome: Outcome, latency: Duration) {
514        self.observe_with_bytes(outcome, latency, 0);
515    }
516
517    /// Record one observed operation with a payload byte count. Bytes
518    /// are used by the fetch hill climber; AIMD channels ignore them.
519    pub fn observe_with_bytes(&self, outcome: Outcome, latency: Duration, bytes: u64) {
520        let observed_at = Instant::now();
521        let operation_started = observed_at.checked_sub(latency).unwrap_or(observed_at);
522        self.observe_with_timing(outcome, latency, bytes, operation_started);
523    }
524
525    fn observe_with_timing(
526        &self,
527        outcome: Outcome,
528        latency: Duration,
529        bytes: u64,
530        operation_started: Instant,
531    ) {
532        if !self.config.enabled {
533            return;
534        }
535        let mut g = lock(&self.inner);
536        if g.window.len() == self.config.window_ops {
537            g.window.pop_front();
538        }
539        g.window.push_back(Sample { outcome, latency });
540        if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
541            observe_hill_climb(
542                &mut g,
543                outcome,
544                latency,
545                bytes,
546                operation_started,
547                &self.config,
548            );
549            return;
550        }
551        g.samples_since_increase = g.samples_since_increase.saturating_add(1);
552        g.samples_since_decrease = g.samples_since_decrease.saturating_add(1);
553        if g.window.len() < self.config.min_window_ops {
554            return;
555        }
556        let decision = evaluate(&g.window, &self.config, g.latency_baseline);
557        apply_decision(&mut g, decision, &self.config);
558    }
559
560    /// Replace the current cap with `start`, clamped. Used for warm
561    /// loads from persisted state. Does not clear the sliding window —
562    /// fresh observations remain authoritative for adaptation
563    /// decisions.
564    ///
565    /// Slow-start state after a warm load depends on the channel's
566    /// `slow_start_ramp_threshold`:
567    ///
568    /// - Default (threshold 0, i.e. quote/store): mark slow-start as
569    ///   already-left so a single healthy window doesn't *double* a
570    ///   learned warm value — an over-aggressive jump. Subsequent
571    ///   increases are +1 per healthy window.
572    /// - Protected (threshold > clamped, i.e. fetch below the ceiling):
573    ///   keep slow-start armed. This is critical for the CLI usage
574    ///   pattern where every `ant file download` is a fresh process
575    ///   that warm-starts from the snapshot: if warm_start always
576    ///   exited slow-start, the fetch cap could only ever grow
577    ///   additively from the persisted value, which cannot climb back
578    ///   to the ceiling against an intermittent Decrease trickle (the
579    ///   exact pin-at-~20 behaviour observed on a fast-but-lossy VPS).
580    ///   Keeping slow-start armed lets the cap double back toward the
581    ///   capacity the connection can actually sustain.
582    pub fn warm_start(&self, start: usize) {
583        let clamped = start.clamp(
584            self.config.min_concurrency,
585            self.config.max_concurrency.max(1),
586        );
587        let mut g = lock(&self.inner);
588        g.current = clamped;
589        g.left_slow_start = clamped >= self.config.slow_start_ramp_threshold;
590        g.hill = HillClimbState::new(clamped, self.config.window_ops);
591    }
592
593    /// Snapshot of the current cap for persistence. Cheap, lock-only.
594    #[must_use]
595    pub fn snapshot(&self) -> usize {
596        let g = lock(&self.inner);
597        if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
598            g.hill.best_concurrency
599        } else {
600            g.current
601        }
602    }
603}
604
605#[derive(Debug, Clone, Copy)]
606struct HillEpochStats {
607    goodput_per_sec: f64,
608    latency_p95: Option<Duration>,
609}
610
611/// Outcome of evaluating one window.
612#[derive(Debug, Clone, Copy, PartialEq, Eq)]
613enum Decision {
614    /// Healthy window — increase concurrency.
615    Increase,
616    /// Stressed window — decrease concurrency.
617    Decrease,
618    /// Inconclusive — hold steady (e.g. mixed signals, baseline not yet set).
619    Hold,
620}
621
622fn evaluate(
623    window: &VecDeque<Sample>,
624    cfg: &LimiterConfig,
625    baseline: Option<Duration>,
626) -> Decision {
627    // Capacity-relevant denominator: ApplicationError outcomes are
628    // explicitly NOT capacity signals (per `Outcome` docs) and are
629    // excluded from rate calculations. A wave of `AlreadyStored`
630    // errors must not punish concurrency.
631    let mut successes = 0usize;
632    let mut timeouts = 0usize;
633    let mut net_errors = 0usize;
634    let mut latencies: Vec<Duration> = Vec::with_capacity(window.len());
635    for s in window {
636        match s.outcome {
637            Outcome::Success => {
638                successes += 1;
639                latencies.push(s.latency);
640            }
641            Outcome::Timeout => timeouts += 1,
642            Outcome::NetworkError => net_errors += 1,
643            Outcome::ApplicationError => {}
644        }
645    }
646    let capacity_total = successes + timeouts + net_errors;
647    if capacity_total < cfg.min_window_ops {
648        // Not enough capacity-relevant evidence to act. Hold.
649        return Decision::Hold;
650    }
651    let total_f = capacity_total as f64;
652    let success_rate = successes as f64 / total_f;
653    let timeout_rate = timeouts as f64 / total_f;
654
655    if success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling {
656        return Decision::Decrease;
657    }
658
659    if let Some(p95) = p95_of(&mut latencies) {
660        if cfg.latency_decrease_enabled {
661            if let Some(base) = baseline {
662                let limit = base.mul_f64(cfg.latency_inflation_factor);
663                if p95 > limit {
664                    return Decision::Decrease;
665                }
666            }
667        }
668        Decision::Increase
669    } else {
670        Decision::Hold
671    }
672}
673
674fn apply_decision(inner: &mut LimiterInner, decision: Decision, cfg: &LimiterConfig) {
675    match decision {
676        Decision::Increase => {
677            // Gate increases on accumulating a fresh window's worth of
678            // evidence since the last bump.
679            if inner.samples_since_increase < cfg.window_ops {
680                return;
681            }
682            let p95 = window_p95(&inner.window);
683            inner.latency_baseline = Some(match inner.latency_baseline {
684                None => p95,
685                Some(prev) => ewma(prev, p95, cfg.latency_ewma_alpha),
686            });
687            let next = if inner.left_slow_start {
688                inner.current.saturating_add(1)
689            } else {
690                inner.current.saturating_mul(2)
691            };
692            let next = next.min(cfg.max_concurrency).max(cfg.min_concurrency);
693            if next != inner.current {
694                debug!(
695                    from = inner.current,
696                    to = next,
697                    slow_start = !inner.left_slow_start,
698                    "adaptive: increase",
699                );
700            }
701            inner.current = next;
702            inner.samples_since_increase = 0;
703            inner.samples_since_decrease = 0;
704        }
705        Decision::Decrease => {
706            // Gate decreases on `min_window_ops` of fresh evidence
707            // since the last decrease so a burst of concurrent
708            // observations from in-flight ops can't pile-drive the
709            // cap from N to 1 in a few back-to-back ticks.
710            if inner.samples_since_decrease < cfg.min_window_ops {
711                return;
712            }
713            // Below the ramp threshold we still halve (responsiveness is
714            // preserved) but keep slow-start armed, so the next healthy
715            // window can double the cap back up rather than crawling +1.
716            // Above the threshold a Decrease is the signal to settle into
717            // classic AIMD. With threshold=0 (quote/store) this is the
718            // original behaviour: any Decrease exits slow-start.
719            if inner.current >= cfg.slow_start_ramp_threshold {
720                inner.left_slow_start = true;
721            }
722            let next = (inner.current / 2).max(cfg.min_concurrency);
723            if next != inner.current {
724                debug!(from = inner.current, to = next, "adaptive: decrease");
725            }
726            inner.current = next;
727            inner.samples_since_increase = 0;
728            inner.samples_since_decrease = 0;
729        }
730        Decision::Hold => {}
731    }
732}
733
734/// p95 of a mutable slice of Durations. Sorts in place. Returns
735/// `None` for an empty slice. Index choice: `ceil(len * 0.95) - 1`,
736/// floored at 0, capped at `len - 1`.
737fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
738    if latencies.is_empty() {
739        return None;
740    }
741    latencies.sort_unstable();
742    let idx = ((latencies.len() as f64) * 0.95).ceil() as usize;
743    let idx = idx.saturating_sub(1).min(latencies.len() - 1);
744    latencies.get(idx).copied()
745}
746
747fn window_p95(window: &VecDeque<Sample>) -> Duration {
748    let mut latencies: Vec<Duration> = window
749        .iter()
750        .filter(|s| matches!(s.outcome, Outcome::Success))
751        .map(|s| s.latency)
752        .collect();
753    p95_of(&mut latencies).unwrap_or(Duration::ZERO)
754}
755
756fn ewma(prev: Duration, sample: Duration, alpha: f64) -> Duration {
757    let alpha = if alpha.is_finite() {
758        alpha.clamp(0.0, 1.0)
759    } else {
760        return prev;
761    };
762    let prev_ms = prev.as_secs_f64() * 1000.0;
763    let sample_ms = sample.as_secs_f64() * 1000.0;
764    let new_ms = (1.0 - alpha) * prev_ms + alpha * sample_ms;
765    if !new_ms.is_finite() || new_ms < 0.0 {
766        return prev;
767    }
768    Duration::from_secs_f64(new_ms / 1000.0)
769}
770
771fn observe_hill_climb(
772    inner: &mut LimiterInner,
773    outcome: Outcome,
774    latency: Duration,
775    bytes: u64,
776    operation_started: Instant,
777    cfg: &LimiterConfig,
778) {
779    match inner.hill.epoch_started {
780        Some(epoch_started) if epoch_started <= operation_started => {}
781        _ => inner.hill.epoch_started = Some(operation_started),
782    }
783    inner.hill.epoch_samples = inner.hill.epoch_samples.saturating_add(1);
784    match outcome {
785        Outcome::Success => {
786            inner.hill.epoch_successes = inner.hill.epoch_successes.saturating_add(1);
787            inner.hill.epoch_bytes = inner.hill.epoch_bytes.saturating_add(bytes);
788            inner.hill.epoch_latencies.push(latency);
789        }
790        Outcome::Timeout => {
791            inner.hill.epoch_timeouts = inner.hill.epoch_timeouts.saturating_add(1);
792        }
793        Outcome::NetworkError => {
794            inner.hill.epoch_net_errors = inner.hill.epoch_net_errors.saturating_add(1);
795        }
796        Outcome::ApplicationError => {}
797    }
798
799    if hill_epoch_stressed(&inner.hill, cfg) {
800        apply_hill_stress(inner, cfg);
801        return;
802    }
803
804    if inner.hill.epoch_samples < hill_epoch_target_samples(inner.current, cfg) {
805        return;
806    }
807
808    if let Some(stats) = hill_epoch_stats(&inner.hill, cfg) {
809        apply_hill_epoch(inner, stats, cfg);
810    }
811    inner.hill.reset_epoch();
812}
813
814fn hill_epoch_target_samples(current: usize, cfg: &LimiterConfig) -> usize {
815    cfg.window_ops
816        .max(current.saturating_mul(HILL_EPOCH_FULL_WAVES))
817        .max(cfg.min_window_ops)
818}
819
820fn hill_epoch_stressed(hill: &HillClimbState, cfg: &LimiterConfig) -> bool {
821    let capacity_total = hill.capacity_total();
822    if capacity_total < cfg.min_window_ops {
823        return false;
824    }
825    let total_f = capacity_total as f64;
826    let success_rate = hill.epoch_successes as f64 / total_f;
827    let timeout_rate = hill.epoch_timeouts as f64 / total_f;
828    success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling
829}
830
831fn hill_epoch_stats(hill: &HillClimbState, cfg: &LimiterConfig) -> Option<HillEpochStats> {
832    let capacity_total = hill.capacity_total();
833    if capacity_total < cfg.min_window_ops || hill.epoch_successes == 0 {
834        return None;
835    }
836    let mut latencies = hill.epoch_latencies.clone();
837    let latency_p95 = p95_of(&mut latencies);
838    let max_latency = latencies.iter().copied().max().unwrap_or(Duration::ZERO);
839    let wall_elapsed = hill.epoch_started.map_or(Duration::ZERO, |s| s.elapsed());
840    let elapsed = wall_elapsed.max(max_latency);
841    let elapsed_secs = elapsed.as_secs_f64();
842    if !elapsed_secs.is_finite() || elapsed_secs <= 0.0 {
843        return None;
844    }
845
846    // Unit fallback keeps direct unit tests that call `observe(Success, ..)`
847    // meaningful; real download paths report bytes.
848    let units = if hill.epoch_bytes > 0 {
849        hill.epoch_bytes as f64
850    } else {
851        hill.epoch_successes as f64
852    };
853    Some(HillEpochStats {
854        goodput_per_sec: units / elapsed_secs,
855        latency_p95,
856    })
857}
858
859fn apply_hill_stress(inner: &mut LimiterInner, cfg: &LimiterConfig) {
860    let next = (inner.current / HILL_STRESS_DECREASE_DIVISOR)
861        .max(cfg.min_concurrency)
862        .min(cfg.max_concurrency);
863    if next != inner.current {
864        debug!(
865            from = inner.current,
866            to = next,
867            "adaptive: fetch hill stress decrease"
868        );
869    }
870    inner.current = next;
871    inner.hill.best_concurrency = next;
872    inner.hill.best_goodput_per_sec = None;
873    inner.hill.best_latency_p95 = None;
874    inner.hill.stable_epochs = 0;
875    inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
876    inner.hill.active_probe = None;
877    inner.hill.next_probe = ProbeDirection::Up;
878    inner.hill.reset_epoch();
879}
880
881fn apply_hill_epoch(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
882    let Some(best_goodput) = inner.hill.best_goodput_per_sec else {
883        inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
884        inner.hill.best_latency_p95 = stats.latency_p95;
885        inner.hill.best_concurrency = inner.current;
886        probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
887        return;
888    };
889
890    match inner.hill.active_probe {
891        Some(ProbeDirection::Up) => {
892            let improved = stats.goodput_per_sec >= best_goodput * HILL_UP_PROBE_ACCEPT_RATIO;
893            if improved
894                && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
895            {
896                accept_hill_probe(inner, stats, cfg);
897                probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
898            } else {
899                reject_hill_probe(inner);
900            }
901        }
902        Some(ProbeDirection::Down) => {
903            let retained = stats.goodput_per_sec >= best_goodput * HILL_DOWN_PROBE_ACCEPT_RATIO;
904            if retained
905                && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
906            {
907                accept_hill_probe(inner, stats, cfg);
908                inner.hill.next_probe = ProbeDirection::Up;
909            } else {
910                reject_hill_probe(inner);
911            }
912        }
913        None => {
914            refresh_hill_best(inner, stats, cfg);
915            if inner.hill.cooldown_epochs > 0 {
916                inner.hill.cooldown_epochs -= 1;
917                return;
918            }
919            inner.hill.stable_epochs = inner.hill.stable_epochs.saturating_add(1);
920            if inner.hill.stable_epochs >= HILL_STABLE_PROBE_EPOCHS {
921                let direction = inner.hill.next_probe;
922                inner.hill.next_probe = match direction {
923                    ProbeDirection::Up => ProbeDirection::Down,
924                    ProbeDirection::Down => ProbeDirection::Up,
925                };
926                probe_hill_neighbor(inner, direction, cfg);
927            }
928        }
929    }
930}
931
932fn refresh_hill_best(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
933    inner.hill.best_goodput_per_sec = Some(match inner.hill.best_goodput_per_sec {
934        Some(prev) => ewma_f64(prev, stats.goodput_per_sec, cfg.latency_ewma_alpha),
935        None => stats.goodput_per_sec,
936    });
937    if let Some(latency_p95) = stats.latency_p95 {
938        inner.hill.best_latency_p95 = Some(match inner.hill.best_latency_p95 {
939            Some(prev) => ewma(prev, latency_p95, cfg.latency_ewma_alpha),
940            None => latency_p95,
941        });
942    }
943}
944
945fn hill_latency_acceptable(
946    candidate: Option<Duration>,
947    best: Option<Duration>,
948    cfg: &LimiterConfig,
949) -> bool {
950    match (candidate, best) {
951        (Some(candidate), Some(best)) => candidate <= best.mul_f64(cfg.latency_inflation_factor),
952        _ => true,
953    }
954}
955
956fn ewma_f64(prev: f64, sample: f64, alpha: f64) -> f64 {
957    let alpha = if alpha.is_finite() {
958        alpha.clamp(0.0, 1.0)
959    } else {
960        return prev;
961    };
962    let next = (1.0 - alpha) * prev + alpha * sample;
963    if next.is_finite() && next >= 0.0 {
964        next
965    } else {
966        prev
967    }
968}
969
970fn accept_hill_probe(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
971    debug!(
972        concurrency = inner.current,
973        goodput_per_sec = stats.goodput_per_sec,
974        "adaptive: fetch hill accepted probe"
975    );
976    inner.hill.best_concurrency = inner.current;
977    inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
978    inner.hill.best_latency_p95 = stats.latency_p95;
979    inner.hill.active_probe = None;
980    inner.hill.cooldown_epochs = 0;
981    inner.hill.stable_epochs = 0;
982    inner.current = inner
983        .hill
984        .best_concurrency
985        .clamp(cfg.min_concurrency, cfg.max_concurrency);
986}
987
988fn reject_hill_probe(inner: &mut LimiterInner) {
989    let from = inner.current;
990    let to = inner.hill.best_concurrency;
991    let rejected_direction = inner.hill.active_probe;
992    if from != to {
993        debug!(from, to, "adaptive: fetch hill rejected probe");
994    }
995    inner.current = to;
996    inner.hill.active_probe = None;
997    if let Some(direction) = rejected_direction {
998        inner.hill.next_probe = match direction {
999            ProbeDirection::Up => ProbeDirection::Down,
1000            ProbeDirection::Down => ProbeDirection::Up,
1001        };
1002    }
1003    inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
1004    inner.hill.stable_epochs = 0;
1005}
1006
1007fn probe_hill_neighbor(inner: &mut LimiterInner, direction: ProbeDirection, cfg: &LimiterConfig) {
1008    let best = inner.hill.best_concurrency;
1009    let step = (best / HILL_PROBE_STEP_DIVISOR).max(HILL_MIN_PROBE_STEP);
1010    let candidate = match direction {
1011        ProbeDirection::Up => best.saturating_add(step).min(cfg.max_concurrency),
1012        ProbeDirection::Down => best.saturating_sub(step).max(cfg.min_concurrency),
1013    };
1014    if candidate == best {
1015        inner.current = best;
1016        inner.hill.active_probe = None;
1017        inner.hill.stable_epochs = 0;
1018        return;
1019    }
1020    debug!(
1021        from = best,
1022        to = candidate,
1023        ?direction,
1024        "adaptive: fetch hill probing"
1025    );
1026    inner.current = candidate;
1027    inner.hill.active_probe = Some(direction);
1028    inner.hill.stable_epochs = 0;
1029}
1030
1031/// Bundle of per-channel limiters owned by the `Client`.
1032#[derive(Debug, Clone)]
1033pub struct AdaptiveController {
1034    pub quote: Limiter,
1035    pub store: Limiter,
1036    pub fetch: Limiter,
1037    /// `pub(crate)` so external callers cannot mutate this
1038    /// post-construction. Each `Limiter` snapshots its own
1039    /// `Arc<LimiterConfig>` at construction time, so external
1040    /// mutation here would silently desync `warm_start`'s
1041    /// `enabled` check from the limiters' frozen copies. Read via
1042    /// `config()`.
1043    pub(crate) config: AdaptiveConfig,
1044    /// Per-instance cold-start values. `warm_start` floors snapshot
1045    /// values against THIS, not the global `ChannelStart::default()`,
1046    /// so a controller built with custom (e.g. low) starts stays
1047    /// faithful to its construction parameters. Constructed-once,
1048    /// never mutated.
1049    cold_start: ChannelStart,
1050}
1051
1052impl AdaptiveController {
1053    /// Create a controller with cold-start values per channel.
1054    /// Sanitizes the config (NaN guards, floor/ceiling enforcement)
1055    /// before constructing limiters. The supplied `start` is captured
1056    /// as the per-instance cold-start floor for `warm_start`.
1057    #[must_use]
1058    pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
1059        let mut config = config;
1060        config.sanitize();
1061        let quote_cfg = LimiterConfig::from_adaptive(&config, config.max.quote);
1062        let store_cfg = LimiterConfig::from_adaptive(&config, config.max.store);
1063        let mut fetch_cfg = LimiterConfig::from_adaptive(&config, config.max.fetch);
1064        // Lift the fetch channel's floor above the global
1065        // `min_concurrency`. Reasoning is specific to download: on
1066        // residential links, residual peer-side timeouts (NAT path
1067        // issues, peers in the close group that don't store the chunk,
1068        // peers under temporary load) continuously push the
1069        // controller's timeout_rate above ceiling. A global floor of 1
1070        // means the controller fully serializes chunk fetches on that
1071        // noise floor and gets stuck — observed on PROD-LOCAL-DL-03
1072        // where the download stayed stable but throughput collapsed to
1073        // ~330 KB/s on a multi-MB/s link.
1074        //
1075        // 4 is the smallest floor that keeps the download from fully
1076        // serializing; it also matches the validated cold-start floor.
1077        // Floor `quote` and `store` separately if a corresponding
1078        // pathology is identified for them; today's evidence is
1079        // download-only.
1080        fetch_cfg.min_concurrency = fetch_cfg.min_concurrency.max(FETCH_MIN_FLOOR);
1081        // Re-establish max >= min after the bump in case the channel
1082        // ceiling was somehow lower than the new floor.
1083        fetch_cfg.max_concurrency = fetch_cfg.max_concurrency.max(fetch_cfg.min_concurrency);
1084        // Download-specific growth/decision tuning (see the field docs
1085        // on `LimiterConfig`):
1086        //
1087        // - Never exit slow-start. Classic AIMD additive growth (+1 per
1088        //   healthy window) cannot reach a useful cap from a low base
1089        //   before a multi-GB file finishes — a fast-but-lossy
1090        //   connection (e.g. a VPS with a steady ~4% close-group-
1091        //   exhaustion trickle) was observed stuck at cap ~13-24 across
1092        //   36 files because every transient Decrease permanently
1093        //   dropped it to additive growth. `usize::MAX` keeps slow-start
1094        //   armed at every cap including the ceiling, so a Decrease
1095        //   (e.g. 256 -> 128) still halves but the next healthy window
1096        //   doubles it back. The cap therefore tracks the connection's
1097        //   real capacity instead of crawling, and a single transient
1098        //   Decrease near the ceiling can't re-pin the link to additive
1099        //   recovery. (A threshold == max_concurrency would NOT achieve
1100        //   this: `current >= threshold` is true at the ceiling, so a
1101        //   Decrease there would exit slow-start.)
1102        // - Disable the p95-latency Decrease. chunk_get's observed
1103        //   latency includes the internal retry sleep + slow retry
1104        //   sweep for chunks that needed one, so a window with a couple
1105        //   of retry-path chunks shows a hugely inflated p95 that is
1106        //   retry variance, not congestion. Genuine fetch congestion
1107        //   still drives Decrease via the Ok(None) -> Timeout rate.
1108        fetch_cfg.slow_start_ramp_threshold = usize::MAX;
1109        fetch_cfg.latency_decrease_enabled = false;
1110        Self {
1111            quote: Limiter::new(start.quote, quote_cfg),
1112            store: Limiter::new(start.store, store_cfg),
1113            fetch: Limiter::new_with_algorithm(
1114                start.fetch,
1115                fetch_cfg,
1116                LimiterAlgorithm::ThroughputHillClimb,
1117            ),
1118            config,
1119            cold_start: start,
1120        }
1121    }
1122
1123    /// Snapshot current per-channel caps for persistence.
1124    #[must_use]
1125    pub fn snapshot(&self) -> ChannelStart {
1126        ChannelStart {
1127            quote: self.quote.snapshot(),
1128            store: self.store.snapshot(),
1129            fetch: self.fetch.snapshot(),
1130        }
1131    }
1132
1133    /// Read-only access to the controller's adaptive config. Made
1134    /// read-only deliberately: each `Limiter` snapshots its own
1135    /// `Arc<LimiterConfig>` at construction, so post-hoc mutation
1136    /// would silently desync `warm_start`'s `enabled` check from
1137    /// the limiters' frozen copies.
1138    #[must_use]
1139    pub fn config(&self) -> &AdaptiveConfig {
1140        &self.config
1141    }
1142
1143    /// Apply a previously-saved snapshot as the warm-start cap.
1144    ///
1145    /// The effective warm value per channel is
1146    /// `max(snapshot, self.cold_start)` — flooring at the
1147    /// per-instance cold-start (NOT the global default) so:
1148    /// 1. A prior bad run that pinned cap=1 doesn't pessimize this
1149    ///    run forever.
1150    /// 2. A controller built with custom (e.g. low) cold starts for
1151    ///    benchmarking is not silently jumped above its construction
1152    ///    parameters.
1153    ///
1154    /// Does not clear sliding windows. When `enabled = false`, this
1155    /// is a no-op — fixed-concurrency mode means fixed-concurrency.
1156    pub fn warm_start(&self, snapshot: ChannelStart) {
1157        if !self.config.enabled {
1158            return;
1159        }
1160        self.quote
1161            .warm_start(snapshot.quote.max(self.cold_start.quote));
1162        self.store
1163            .warm_start(snapshot.store.max(self.cold_start.store));
1164        self.fetch
1165            .warm_start(snapshot.fetch.max(self.cold_start.fetch));
1166    }
1167}
1168
1169impl Default for AdaptiveController {
1170    fn default() -> Self {
1171        Self::new(ChannelStart::default(), AdaptiveConfig::default())
1172    }
1173}
1174
1175/// Cancel-on-drop guard: if the wrapping future is dropped before
1176/// completion, record no outcome. We don't synthesize a Cancelled
1177/// signal because (a) dropped work was never observed by the network
1178/// and (b) injecting fake outcomes would skew the sliding window
1179/// after a fail-fast burst. The intentional behavior is "silent on
1180/// cancel, observe on completion" — callers that need to keep
1181/// fail-fast batches drained for full signal use `rebucketed`.
1182struct ObserveGuard<'a> {
1183    limiter: &'a Limiter,
1184    started: Instant,
1185    outcome: Option<(Outcome, Duration, u64)>,
1186}
1187
1188impl<'a> ObserveGuard<'a> {
1189    fn new(limiter: &'a Limiter) -> Self {
1190        Self {
1191            limiter,
1192            started: Instant::now(),
1193            outcome: None,
1194        }
1195    }
1196    fn finish(&mut self, outcome: Outcome) {
1197        self.finish_with_bytes(outcome, 0);
1198    }
1199
1200    fn finish_with_bytes(&mut self, outcome: Outcome, bytes: u64) {
1201        self.outcome = Some((outcome, self.started.elapsed(), bytes));
1202    }
1203}
1204
1205impl Drop for ObserveGuard<'_> {
1206    fn drop(&mut self) {
1207        if let Some((outcome, latency, bytes)) = self.outcome.take() {
1208            self.limiter
1209                .observe_with_timing(outcome, latency, bytes, self.started);
1210        }
1211    }
1212}
1213
1214/// Helper for instrumented call sites: time an async op, classify the
1215/// result, and report to a `Limiter`. Returns the original result.
1216///
1217/// ## Cancellation safety
1218///
1219/// Uses an internal `ObserveGuard` so the recorded outcome is
1220/// committed via `Drop` after the inner future returns. If the
1221/// wrapper future is itself dropped before `op().await` resolves
1222/// (caller cancellation, `buffer_unordered` fail-fast), no outcome
1223/// is recorded — this is intentional, see the guard's docs.
1224///
1225/// ```ignore
1226/// let res = observe_op(&controller.store, || async { do_put().await }, classify_put_err).await;
1227/// ```
1228pub async fn observe_op<T, E, F, Fut, C>(limiter: &Limiter, op: F, classify: C) -> Result<T, E>
1229where
1230    F: FnOnce() -> Fut,
1231    Fut: std::future::Future<Output = Result<T, E>>,
1232    C: FnOnce(&E) -> Outcome,
1233{
1234    let mut guard = ObserveGuard::new(limiter);
1235    let result = op().await;
1236    let outcome = match &result {
1237        Ok(_) => Outcome::Success,
1238        Err(e) => classify(e),
1239    };
1240    guard.finish(outcome);
1241    drop(guard); // commit observation explicitly so it lands before return
1242    result
1243}
1244
1245/// Byte-aware variant of [`observe_op`] for fetch paths. The success
1246/// byte extractor is called only for `Ok` results; errors still carry
1247/// zero bytes and are classified by the provided function.
1248pub async fn observe_op_with_success_bytes<T, E, F, Fut, C, B>(
1249    limiter: &Limiter,
1250    op: F,
1251    classify: C,
1252    success_bytes: B,
1253) -> Result<T, E>
1254where
1255    F: FnOnce() -> Fut,
1256    Fut: std::future::Future<Output = Result<T, E>>,
1257    C: FnOnce(&E) -> Outcome,
1258    B: FnOnce(&T) -> u64,
1259{
1260    let mut guard = ObserveGuard::new(limiter);
1261    let result = op().await;
1262    match &result {
1263        Ok(value) => guard.finish_with_bytes(Outcome::Success, success_bytes(value)),
1264        Err(e) => guard.finish_with_bytes(classify(e), 0),
1265    }
1266    drop(guard);
1267    result
1268}
1269
1270/// Process an iterator of items with a rolling scheduler whose cap
1271/// is re-read from the limiter as each slot frees. Replaces the
1272/// "snapshot the cap once at pipeline build" behavior of plain
1273/// `buffer_unordered(N)` so a long pipeline (e.g. 10 GB download =
1274/// ~2500 chunks) sees adaptive growth/decay mid-flight.
1275///
1276/// Output is unordered (first-completion). For an ordered result
1277/// (e.g. `data_download` feeds chunks in DataMap order to
1278/// self_encryption decrypt), wrap items with their index and sort
1279/// after collection — see `rebucketed_ordered`.
1280///
1281/// On error: in-flight work drains to completion (so observed
1282/// outcomes still feed the controller) but no new launches happen.
1283/// The first error is preserved; later errors are discarded.
1284pub async fn rebucketed_unordered<I, T, E, F, Fut>(
1285    limiter: &Limiter,
1286    items: I,
1287    mut op: F,
1288) -> Result<Vec<T>, E>
1289where
1290    I: IntoIterator,
1291    F: FnMut(I::Item) -> Fut,
1292    Fut: std::future::Future<Output = Result<T, E>>,
1293{
1294    let mut iter = items.into_iter().peekable();
1295    let mut in_flight: FuturesUnordered<Fut> = FuturesUnordered::new();
1296    let mut results = Vec::new();
1297    let mut pending_err: Option<E> = None;
1298    loop {
1299        // Refill: re-read the cap and launch up to `cap - in_flight.len()`
1300        // new items, but only if we are not already in error-stop.
1301        if pending_err.is_none() {
1302            let cap = limiter.current().max(1);
1303            while in_flight.len() < cap {
1304                match iter.next() {
1305                    Some(item) => in_flight.push(op(item)),
1306                    None => break,
1307                }
1308            }
1309        }
1310        if in_flight.is_empty() {
1311            break;
1312        }
1313        match in_flight.next().await {
1314            Some(Ok(v)) => results.push(v),
1315            Some(Err(e)) => {
1316                if pending_err.is_none() {
1317                    pending_err = Some(e);
1318                }
1319            }
1320            None => break,
1321        }
1322    }
1323    match pending_err {
1324        Some(e) => Err(e),
1325        None => Ok(results),
1326    }
1327}
1328
1329/// Ordered variant: items are tagged with a usize index by the
1330/// caller (typically by `iter.enumerate()`); after rolling
1331/// completion, results are sorted by index so output preserves
1332/// input order. Use this for callers that pass to APIs which
1333/// consume positionally (e.g. self_encryption's
1334/// `get_root_data_map_parallel` zips `Vec<(idx, Bytes)>` with input
1335/// hashes positionally and discards the idx — without a final sort
1336/// the bytes pair with the wrong hashes).
1337///
1338/// `op` is `FnMut(Item) -> Fut` where `Item` carries whatever
1339/// payload the caller needs; the closure must return
1340/// `Result<(usize, U), E>` so the wrapper can sort by the index.
1341pub async fn rebucketed_ordered<I, U, E, F, Fut>(
1342    limiter: &Limiter,
1343    items: I,
1344    op: F,
1345) -> Result<Vec<U>, E>
1346where
1347    I: IntoIterator,
1348    F: FnMut(I::Item) -> Fut,
1349    Fut: std::future::Future<Output = Result<(usize, U), E>>,
1350{
1351    let mut indexed = rebucketed_unordered(limiter, items, op).await?;
1352    indexed.sort_by_key(|(idx, _)| *idx);
1353    Ok(indexed.into_iter().map(|(_, v)| v).collect())
1354}
1355
1356/// Backward-compatible wrapper. `ordered = false` -> rolling
1357/// unordered. `ordered = true` -> the OLD batch-fence ordered path
1358/// (kept for tests that explicitly assert batch-fence semantics).
1359/// New call sites should use `rebucketed_unordered` or
1360/// `rebucketed_ordered` directly.
1361pub async fn rebucketed<I, T, E, F, Fut>(
1362    limiter: &Limiter,
1363    items: I,
1364    ordered: bool,
1365    mut op: F,
1366) -> Result<Vec<T>, E>
1367where
1368    I: IntoIterator,
1369    F: FnMut(I::Item) -> Fut,
1370    Fut: std::future::Future<Output = Result<T, E>>,
1371{
1372    if !ordered {
1373        return rebucketed_unordered(limiter, items, op).await;
1374    }
1375    let mut iter = items.into_iter();
1376    let mut results = Vec::new();
1377    let mut pending_err: Option<E> = None;
1378    loop {
1379        if pending_err.is_some() {
1380            break;
1381        }
1382        let cap = limiter.current().max(1);
1383        let mut batch = Vec::with_capacity(cap);
1384        for item in iter.by_ref().take(cap) {
1385            batch.push(op(item));
1386        }
1387        if batch.is_empty() {
1388            break;
1389        }
1390        let mut s = stream::iter(batch).buffered(cap);
1391        while let Some(r) = s.next().await {
1392            match r {
1393                Ok(v) => results.push(v),
1394                Err(e) => {
1395                    if pending_err.is_none() {
1396                        pending_err = Some(e);
1397                    }
1398                }
1399            }
1400        }
1401    }
1402    match pending_err {
1403        Some(e) => Err(e),
1404        None => Ok(results),
1405    }
1406}
1407
1408/// On-disk shape for the persisted adaptive state. Versioned so we
1409/// can evolve the controller without crashing on stale files — an
1410/// unknown future schema version simply causes a silent fallback to
1411/// cold defaults.
1412#[derive(Debug, Clone, Serialize, Deserialize)]
1413struct PersistedState {
1414    schema: u32,
1415    channels: ChannelStart,
1416}
1417
1418const PERSIST_SCHEMA: u32 = 2;
1419const PERSIST_SCHEMA_AIMD_FETCH: u32 = 1;
1420const PERSIST_FILENAME: &str = "client_adaptive.json";
1421
1422/// Default persistence path: `<data_dir>/client_adaptive.json`. Falls
1423/// back to `None` if the platform data dir is not resolvable; in that
1424/// case the controller still works, it just won't persist.
1425#[must_use]
1426pub fn default_persist_path() -> Option<PathBuf> {
1427    crate::config::data_dir()
1428        .ok()
1429        .map(|d| d.join(PERSIST_FILENAME))
1430}
1431
1432/// Load a persisted snapshot from disk, returning `None` if the file
1433/// does not exist, is unreadable, contains malformed JSON, or has a
1434/// schema version this build does not understand. Persistence is best
1435/// effort — never propagate errors that would block the user's
1436/// operation.
1437#[must_use]
1438pub fn load_snapshot(path: &Path) -> Option<ChannelStart> {
1439    let bytes = std::fs::read(path).ok()?;
1440    let state: PersistedState = match serde_json::from_slice(&bytes) {
1441        Ok(s) => s,
1442        Err(e) => {
1443            warn!(path = %path.display(), error = %e, "adaptive: corrupt snapshot, ignoring");
1444            return None;
1445        }
1446    };
1447    match state.schema {
1448        PERSIST_SCHEMA => Some(state.channels),
1449        PERSIST_SCHEMA_AIMD_FETCH => {
1450            debug!(
1451                path = %path.display(),
1452                "adaptive: migrating schema-1 snapshot, preserving quote/store and resetting fetch",
1453            );
1454            Some(ChannelStart {
1455                fetch: FETCH_COLD_START_CONCURRENCY,
1456                ..state.channels
1457            })
1458        }
1459        schema => {
1460            debug!(
1461                path = %path.display(),
1462                schema,
1463                expected = PERSIST_SCHEMA,
1464                "adaptive: snapshot schema mismatch, ignoring",
1465            );
1466            None
1467        }
1468    }
1469}
1470
1471/// Save a snapshot to disk atomically (write to `<path>.tmp`, then
1472/// rename). Best effort — failures are logged at warn and discarded.
1473pub fn save_snapshot(path: &Path, channels: ChannelStart) {
1474    let state = PersistedState {
1475        schema: PERSIST_SCHEMA,
1476        channels,
1477    };
1478    let bytes = match serde_json::to_vec_pretty(&state) {
1479        Ok(b) => b,
1480        Err(e) => {
1481            warn!(error = %e, "adaptive: snapshot serialize failed");
1482            return;
1483        }
1484    };
1485    if let Some(parent) = path.parent() {
1486        if let Err(e) = std::fs::create_dir_all(parent) {
1487            warn!(path = %parent.display(), error = %e, "adaptive: snapshot mkdir failed");
1488            return;
1489        }
1490    }
1491    // Unique-per-save temp filename: PID + monotonic counter +
1492    // nanosecond timestamp guarantees no collision between concurrent
1493    // CLI invocations OR concurrent save_snapshot calls within one
1494    // process (e.g. multiple Client instances sharing the same data
1495    // dir). POSIX rename is atomic on the destination, so the rename
1496    // target overlap is fine — last writer wins.
1497    let nanos = std::time::SystemTime::now()
1498        .duration_since(std::time::UNIX_EPOCH)
1499        .map(|d| d.subsec_nanos())
1500        .unwrap_or(0);
1501    let counter = SAVE_COUNTER.fetch_add(1, Ordering::Relaxed);
1502    let tmp = path.with_extension(format!(
1503        "json.tmp.{}.{}.{}",
1504        std::process::id(),
1505        counter,
1506        nanos
1507    ));
1508    if let Err(e) = std::fs::write(&tmp, &bytes) {
1509        warn!(path = %tmp.display(), error = %e, "adaptive: snapshot write failed");
1510        return;
1511    }
1512    if let Err(e) = std::fs::rename(&tmp, path) {
1513        warn!(
1514            from = %tmp.display(),
1515            to = %path.display(),
1516            error = %e,
1517            "adaptive: snapshot rename failed",
1518        );
1519        // Try to clean up the temp on rename failure so we don't
1520        // leave junk in the data dir. Best effort.
1521        let _ = std::fs::remove_file(&tmp);
1522    }
1523}
1524
1525/// Save with a wall-clock deadline. Spawns the synchronous
1526/// `save_snapshot` on a detached thread and waits up to `timeout`
1527/// for it to finish. If the thread is still running past the
1528/// deadline (e.g. because the data dir is on a hung NFS mount),
1529/// returns without joining — the OS will clean up the thread when
1530/// the process exits.
1531///
1532/// Used by `Client::drop` so a stalled filesystem cannot block
1533/// process shutdown indefinitely.
1534pub fn save_snapshot_with_timeout(path: PathBuf, channels: ChannelStart, timeout: Duration) {
1535    let handle = std::thread::spawn(move || {
1536        save_snapshot(&path, channels);
1537    });
1538    // Park briefly waiting for the thread, polling its status. We
1539    // use a short polling interval rather than `join()` because
1540    // join() blocks indefinitely.
1541    let started = Instant::now();
1542    let poll = Duration::from_millis(5);
1543    while started.elapsed() < timeout {
1544        if handle.is_finished() {
1545            let _ = handle.join();
1546            return;
1547        }
1548        std::thread::sleep(poll);
1549    }
1550    // Deadline elapsed. Detach the thread; it will continue to run
1551    // in the background until process exit (its work is best-effort
1552    // anyway). Log so operators can see the slow filesystem.
1553    warn!(
1554        timeout_ms = timeout.as_millis() as u64,
1555        "adaptive: snapshot save timed out (data dir slow?); detaching writer thread"
1556    );
1557    drop(handle);
1558}
1559
1560#[cfg(test)]
1561#[allow(clippy::unwrap_used)]
1562mod tests {
1563    use super::*;
1564
1565    const HILL_TEST_START_CAP: usize = 16;
1566    const HILL_TEST_UP_PROBE_CAP: usize = 20;
1567    const HILL_TEST_NEXT_UP_PROBE_CAP: usize = 25;
1568    const HILL_TEST_DOWN_PROBE_CAP: usize = 12;
1569    const HILL_TEST_CHUNK_BYTES: u64 = 1_000;
1570    const HILL_TEST_BASE_LATENCY_MS: u64 = 100;
1571    const HILL_TEST_REJECT_LATENCY_MS: u64 = 130;
1572    const HILL_TEST_RETAINED_DOWN_LATENCY_MS: u64 = 75;
1573    const HILL_TEST_ASYNC_LATENCY_MS: u64 = 10;
1574
1575    fn cfg_for_tests() -> LimiterConfig {
1576        LimiterConfig {
1577            enabled: true,
1578            min_concurrency: 1,
1579            max_concurrency: 64,
1580            window_ops: 10,
1581            min_window_ops: 5,
1582            success_target: 0.9,
1583            timeout_ceiling: 0.2,
1584            latency_inflation_factor: 2.0,
1585            latency_ewma_alpha: 0.5,
1586            slow_start_ramp_threshold: 0,
1587            latency_decrease_enabled: true,
1588        }
1589    }
1590
1591    fn hill_cfg_for_tests() -> LimiterConfig {
1592        LimiterConfig {
1593            window_ops: 4,
1594            min_window_ops: 2,
1595            max_concurrency: 64,
1596            success_target: 0.9,
1597            timeout_ceiling: 0.2,
1598            ..cfg_for_tests()
1599        }
1600    }
1601
1602    fn fetch_hill_for_tests(start: usize, cfg: LimiterConfig) -> Limiter {
1603        Limiter::new_with_algorithm(start, cfg, LimiterAlgorithm::ThroughputHillClimb)
1604    }
1605
1606    fn observe_hill_success_epoch_with_latency(
1607        limiter: &Limiter,
1608        cfg: &LimiterConfig,
1609        bytes: u64,
1610        latency: Duration,
1611    ) {
1612        let samples = hill_epoch_target_samples(limiter.current(), cfg);
1613        for _ in 0..samples {
1614            limiter.observe_with_bytes(Outcome::Success, latency, bytes);
1615        }
1616    }
1617
1618    fn observe_hill_success_epoch(limiter: &Limiter, cfg: &LimiterConfig, bytes: u64) {
1619        observe_hill_success_epoch_with_latency(
1620            limiter,
1621            cfg,
1622            bytes,
1623            Duration::from_millis(HILL_TEST_BASE_LATENCY_MS),
1624        );
1625    }
1626
1627    /// Build an `AdaptiveConfig` for tests that need to construct a
1628    /// full `AdaptiveController`. Mirrors `cfg_for_tests()` defaults
1629    /// where they overlap, plus per-channel max derived from the same
1630    /// `max_concurrency` value.
1631    fn adaptive_cfg_for_tests() -> AdaptiveConfig {
1632        let l = cfg_for_tests();
1633        AdaptiveConfig {
1634            enabled: l.enabled,
1635            min_concurrency: l.min_concurrency,
1636            max: ChannelMax {
1637                quote: l.max_concurrency,
1638                store: l.max_concurrency,
1639                fetch: l.max_concurrency,
1640            },
1641            window_ops: l.window_ops,
1642            min_window_ops: l.min_window_ops,
1643            success_target: l.success_target,
1644            timeout_ceiling: l.timeout_ceiling,
1645            latency_inflation_factor: l.latency_inflation_factor,
1646            latency_ewma_alpha: l.latency_ewma_alpha,
1647        }
1648    }
1649
1650    #[test]
1651    fn warm_start_keeps_slow_start_armed_below_protected_threshold() {
1652        // Regression guard for the CLI multi-file pattern: each
1653        // `ant file download` is a fresh process that warm-starts from
1654        // the persisted snapshot. If warm_start exited slow-start, the
1655        // fetch cap could only grow additively from the warm value and
1656        // could never climb back to the ceiling against an intermittent
1657        // Decrease trickle. A protected limiter (threshold == max) that
1658        // warm-starts BELOW the ceiling must keep slow-start armed so it
1659        // doubles back up.
1660        let cfg = LimiterConfig {
1661            max_concurrency: 256,
1662            slow_start_ramp_threshold: 256,
1663            latency_decrease_enabled: false,
1664            ..cfg_for_tests()
1665        };
1666        let l = Limiter::new(64, cfg.clone());
1667        l.warm_start(20);
1668        assert_eq!(l.current(), 20);
1669        // A single healthy window should DOUBLE (slow-start armed),
1670        // proving warm_start did not exit slow-start.
1671        for _ in 0..cfg.window_ops {
1672            l.observe(Outcome::Success, Duration::from_millis(10));
1673        }
1674        assert_eq!(
1675            l.current(),
1676            40,
1677            "protected channel must double after warm_start, not crawl +1",
1678        );
1679
1680        // Default channel (threshold 0): warm_start exits slow-start,
1681        // so the same window only adds 1.
1682        let default_cfg = LimiterConfig {
1683            max_concurrency: 256,
1684            ..cfg_for_tests()
1685        };
1686        let d = Limiter::new(64, default_cfg.clone());
1687        d.warm_start(20);
1688        for _ in 0..default_cfg.window_ops {
1689            d.observe(Outcome::Success, Duration::from_millis(10));
1690        }
1691        assert_eq!(
1692            d.current(),
1693            21,
1694            "default channel must stay additive after warm_start",
1695        );
1696    }
1697
1698    #[test]
1699    fn slow_start_stays_armed_at_ceiling_with_max_threshold() {
1700        // Regression for the "lost protection at the ceiling" bug.
1701        // threshold == usize::MAX (the fetch setting) keeps slow-start
1702        // armed even when a Decrease fires at the ceiling, so the cap
1703        // doubles back. threshold == max_concurrency (the buggy
1704        // setting) would exit slow-start there — `current >= threshold`
1705        // is true at the ceiling — and recover only additively. After
1706        // identical stress-at-ceiling + recovery, the MAX-threshold
1707        // limiter must end strictly higher.
1708        let base = LimiterConfig {
1709            max_concurrency: 256,
1710            latency_decrease_enabled: false,
1711            ..cfg_for_tests()
1712        };
1713        let fixed = Limiter::new(
1714            256,
1715            LimiterConfig {
1716                slow_start_ramp_threshold: usize::MAX,
1717                ..base.clone()
1718            },
1719        );
1720        let buggy = Limiter::new(
1721            256,
1722            LimiterConfig {
1723                slow_start_ramp_threshold: 256,
1724                ..base.clone()
1725            },
1726        );
1727        for l in [&fixed, &buggy] {
1728            for _ in 0..base.window_ops {
1729                l.observe(Outcome::Timeout, Duration::from_millis(10));
1730            }
1731            for _ in 0..(base.window_ops * 10) {
1732                l.observe(Outcome::Success, Duration::from_millis(10));
1733            }
1734        }
1735        assert!(
1736            fixed.current() > buggy.current(),
1737            "MAX-threshold limiter ({}) must out-recover the ceiling-threshold one ({})",
1738            fixed.current(),
1739            buggy.current(),
1740        );
1741    }
1742
1743    #[test]
1744    fn protected_slow_start_recovers_faster_than_additive() {
1745        // After identical stress + recovery, a limiter with slow-start
1746        // protected to the ceiling (fetch behaviour) must end at a
1747        // higher cap than one that exits slow-start on first Decrease
1748        // (quote/store behaviour): doubling outpaces +1-per-window.
1749        let base = LimiterConfig {
1750            max_concurrency: 256,
1751            latency_decrease_enabled: false,
1752            ..cfg_for_tests()
1753        };
1754        let protected = Limiter::new(
1755            64,
1756            LimiterConfig {
1757                slow_start_ramp_threshold: 256,
1758                ..base.clone()
1759            },
1760        );
1761        let unprotected = Limiter::new(
1762            64,
1763            LimiterConfig {
1764                slow_start_ramp_threshold: 0,
1765                ..base.clone()
1766            },
1767        );
1768
1769        // Identical stress: a window of timeouts forces decreases on both.
1770        for l in [&protected, &unprotected] {
1771            for _ in 0..base.window_ops {
1772                l.observe(Outcome::Timeout, Duration::from_millis(10));
1773            }
1774        }
1775        // Identical recovery: a long stretch of healthy windows. The
1776        // protected limiter doubles each window; the unprotected one
1777        // only adds 1.
1778        for l in [&protected, &unprotected] {
1779            for _ in 0..(base.window_ops * 10) {
1780                l.observe(Outcome::Success, Duration::from_millis(10));
1781            }
1782        }
1783        assert!(
1784            protected.current() > unprotected.current(),
1785            "protected slow-start ({}) should recover faster than additive ({})",
1786            protected.current(),
1787            unprotected.current(),
1788        );
1789    }
1790
1791    #[test]
1792    fn latency_decrease_disabled_ignores_p95_inflation() {
1793        // With latency_decrease_enabled=false, a window of successes
1794        // whose p95 latency is far above the baseline must NOT trigger
1795        // a Decrease — only success/timeout rate can. (Fetch disables
1796        // this because chunk_get's observed latency is polluted by
1797        // retry-path variance.)
1798        let cfg = LimiterConfig {
1799            max_concurrency: 256,
1800            slow_start_ramp_threshold: 256,
1801            latency_decrease_enabled: false,
1802            ..cfg_for_tests()
1803        };
1804        let l = Limiter::new(16, cfg.clone());
1805        // Establish a fast baseline.
1806        for _ in 0..cfg.window_ops {
1807            l.observe(Outcome::Success, Duration::from_millis(5));
1808        }
1809        let after_baseline = l.current();
1810        // Now a window of successes with 100x the latency. With the
1811        // latency check disabled this is still a healthy window, so the
1812        // cap must not drop.
1813        for _ in 0..cfg.window_ops {
1814            l.observe(Outcome::Success, Duration::from_millis(500));
1815        }
1816        assert!(
1817            l.current() >= after_baseline,
1818            "latency inflation must not shrink the cap when the check is disabled: {} < {}",
1819            l.current(),
1820            after_baseline,
1821        );
1822    }
1823
1824    #[test]
1825    fn controller_sets_fetch_channel_download_tuning() {
1826        // AdaptiveController::new must apply the download-specific
1827        // tuning to fetch only, leaving quote/store on classic AIMD.
1828        let c = AdaptiveController::new(ChannelStart::default(), AdaptiveConfig::default());
1829        assert!(
1830            !c.fetch.config.latency_decrease_enabled,
1831            "fetch latency-decrease must be disabled",
1832        );
1833        assert_eq!(
1834            c.fetch.config.slow_start_ramp_threshold,
1835            usize::MAX,
1836            "fetch slow-start must never exit (armed at every cap incl. ceiling)",
1837        );
1838        assert!(
1839            c.quote.config.latency_decrease_enabled,
1840            "quote must keep the latency-decrease check",
1841        );
1842        assert_eq!(
1843            c.quote.config.slow_start_ramp_threshold, 0,
1844            "quote must keep classic AIMD slow-start exit",
1845        );
1846        assert!(c.store.config.latency_decrease_enabled);
1847        assert_eq!(c.store.config.slow_start_ramp_threshold, 0);
1848    }
1849
1850    #[test]
1851    fn cold_start_clamps_into_bounds() {
1852        let cfg = cfg_for_tests();
1853        let l = Limiter::new(1000, cfg.clone());
1854        assert_eq!(l.current(), cfg.max_concurrency);
1855        let l = Limiter::new(0, cfg.clone());
1856        assert_eq!(l.current(), cfg.min_concurrency);
1857    }
1858
1859    #[test]
1860    fn slow_start_doubles_then_caps() {
1861        let cfg = cfg_for_tests();
1862        let l = Limiter::new(2, cfg.clone());
1863        // Feed a full healthy window — concurrency doubles.
1864        for _ in 0..cfg.window_ops {
1865            l.observe(Outcome::Success, Duration::from_millis(50));
1866        }
1867        assert_eq!(l.current(), 4);
1868        for _ in 0..cfg.window_ops {
1869            l.observe(Outcome::Success, Duration::from_millis(50));
1870        }
1871        assert_eq!(l.current(), 8);
1872    }
1873
1874    #[test]
1875    fn first_failure_exits_slow_start() {
1876        let cfg = cfg_for_tests();
1877        let l = Limiter::new(4, cfg.clone());
1878        // 6 successes + 4 timeouts in a window of 10. Decisions fire
1879        // per-sample once the window has min_window_ops entries, so
1880        // the four timeouts each drive Decrease. That floors the cap.
1881        for _ in 0..6 {
1882            l.observe(Outcome::Success, Duration::from_millis(50));
1883        }
1884        for _ in 0..4 {
1885            l.observe(Outcome::Timeout, Duration::from_millis(50));
1886        }
1887        let after_stress = l.current();
1888        assert!(
1889            after_stress < 4,
1890            "stress should reduce concurrency from 4, got {after_stress}",
1891        );
1892        // After exiting slow-start, recovery is +1 per fresh window,
1893        // not doubling. The first `window_ops` successes flush prior
1894        // timeouts out of the sliding window. Decreases now also need
1895        // `min_window_ops` of fresh evidence before re-firing, and
1896        // increases need `window_ops` of fresh evidence. Feed enough
1897        // successes to clear the window AND accumulate evidence for
1898        // multiple increases.
1899        for _ in 0..(cfg.window_ops * 5) {
1900            l.observe(Outcome::Success, Duration::from_millis(50));
1901        }
1902        assert!(
1903            l.current() > after_stress,
1904            "expected recovery above {after_stress}, got {}",
1905            l.current(),
1906        );
1907    }
1908
1909    #[test]
1910    fn floor_holds_at_one() {
1911        let cfg = cfg_for_tests();
1912        let l = Limiter::new(2, cfg);
1913        for _ in 0..30 {
1914            l.observe(Outcome::Timeout, Duration::from_millis(50));
1915        }
1916        assert_eq!(l.current(), 1);
1917    }
1918
1919    #[test]
1920    fn application_errors_do_not_punish() {
1921        let cfg = cfg_for_tests();
1922        let l = Limiter::new(4, cfg.clone());
1923        // ApplicationError is NOT a capacity signal (per `Outcome`
1924        // docs and the reviewer's M1 finding). A wave of e.g.
1925        // `AlreadyStored` errors must not lower concurrency, because
1926        // they say nothing about the network's ability to take more
1927        // load. Specifically: the controller should HOLD at 4 because
1928        // there are zero capacity-relevant samples to act on.
1929        for _ in 0..cfg.window_ops * 5 {
1930            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
1931        }
1932        assert_eq!(
1933            l.current(),
1934            4,
1935            "ApplicationError must not move the cap; got {}",
1936            l.current()
1937        );
1938    }
1939
1940    #[test]
1941    fn latency_inflation_triggers_decrease() {
1942        let cfg = LimiterConfig {
1943            window_ops: 20,
1944            min_window_ops: 5,
1945            ..cfg_for_tests()
1946        };
1947        let l = Limiter::new(4, cfg.clone());
1948        // Establish a baseline with many fast successes.
1949        for _ in 0..cfg.window_ops {
1950            l.observe(Outcome::Success, Duration::from_millis(50));
1951        }
1952        let after_baseline = l.current();
1953        // Now flood with slow successes — same outcome, 5x latency.
1954        for _ in 0..cfg.window_ops {
1955            l.observe(Outcome::Success, Duration::from_millis(500));
1956        }
1957        // Latency inflation > 2x baseline must drop concurrency.
1958        assert!(
1959            l.current() < after_baseline,
1960            "expected decrease from {after_baseline}, got {}",
1961            l.current(),
1962        );
1963    }
1964
1965    #[test]
1966    fn warm_start_overrides_current() {
1967        let cfg = cfg_for_tests();
1968        let l = Limiter::new(2, cfg);
1969        l.warm_start(20);
1970        assert_eq!(l.current(), 20);
1971    }
1972
1973    #[test]
1974    fn warm_start_clamps() {
1975        let cfg = cfg_for_tests();
1976        let l = Limiter::new(2, cfg.clone());
1977        l.warm_start(1_000_000);
1978        assert_eq!(l.current(), cfg.max_concurrency);
1979    }
1980
1981    #[test]
1982    fn disabled_controller_holds_steady() {
1983        let cfg = LimiterConfig {
1984            enabled: false,
1985            ..cfg_for_tests()
1986        };
1987        let l = Limiter::new(8, cfg);
1988        for _ in 0..50 {
1989            l.observe(Outcome::Timeout, Duration::from_millis(50));
1990        }
1991        assert_eq!(l.current(), 8);
1992    }
1993
1994    #[test]
1995    fn controller_snapshot_round_trips() {
1996        // The test cfg has max=64 for every channel (cfg_for_tests's
1997        // max_concurrency=64 -> ChannelMax::{quote: 64, store: 64, fetch: 64}).
1998        // Pick start values <= 64 so they survive cap clamping at
1999        // construction. Pick values >= cold-defaults (32/8/4) so they
2000        // also survive the warm-start floor.
2001        let c = AdaptiveController::new(
2002            ChannelStart {
2003                quote: 64,
2004                store: 16,
2005                fetch: 64,
2006            },
2007            adaptive_cfg_for_tests(),
2008        );
2009        let snap = c.snapshot();
2010        assert_eq!(snap.quote, 64);
2011        assert_eq!(snap.store, 16);
2012        assert_eq!(snap.fetch, 64);
2013
2014        let c2 = AdaptiveController::default();
2015        c2.warm_start(snap);
2016        assert_eq!(c2.quote.current(), 64);
2017        assert_eq!(c2.store.current(), 16);
2018        assert_eq!(c2.fetch.current(), 64);
2019    }
2020
2021    #[tokio::test]
2022    async fn observe_op_records_success() {
2023        let cfg = cfg_for_tests();
2024        let l = Limiter::new(4, cfg.clone());
2025        for _ in 0..cfg.window_ops {
2026            let _: Result<(), &str> =
2027                observe_op(&l, || async { Ok(()) }, |_e: &&str| Outcome::NetworkError).await;
2028        }
2029        // Healthy window from cold start doubles 4 -> 8.
2030        assert_eq!(l.current(), 8);
2031    }
2032
2033    #[test]
2034    fn snapshot_round_trips_through_disk() {
2035        let dir = tempfile::tempdir().unwrap();
2036        let path = dir.path().join("client_adaptive.json");
2037        let snap = ChannelStart {
2038            quote: 24,
2039            store: 6,
2040            fetch: 12,
2041        };
2042        save_snapshot(&path, snap);
2043        let loaded = load_snapshot(&path).unwrap();
2044        assert_eq!(loaded.quote, 24);
2045        assert_eq!(loaded.store, 6);
2046        assert_eq!(loaded.fetch, 12);
2047    }
2048
2049    #[test]
2050    fn load_missing_returns_none() {
2051        let dir = tempfile::tempdir().unwrap();
2052        let path = dir.path().join("does_not_exist.json");
2053        assert!(load_snapshot(&path).is_none());
2054    }
2055
2056    #[test]
2057    fn load_corrupt_returns_none() {
2058        let dir = tempfile::tempdir().unwrap();
2059        let path = dir.path().join("bad.json");
2060        std::fs::write(&path, b"not valid json{{{").unwrap();
2061        assert!(load_snapshot(&path).is_none());
2062    }
2063
2064    #[test]
2065    fn load_wrong_schema_returns_none() {
2066        let dir = tempfile::tempdir().unwrap();
2067        let path = dir.path().join("future.json");
2068        // Schema 999 is from a future build — current build must not
2069        // crash and must not act on it.
2070        let payload = r#"{"schema":999,"channels":{"quote":1,"store":1,"fetch":1}}"#;
2071        std::fs::write(&path, payload).unwrap();
2072        assert!(load_snapshot(&path).is_none());
2073    }
2074
2075    #[test]
2076    fn load_schema_one_preserves_quote_store_and_resets_fetch() {
2077        const LEGACY_QUOTE_CAP: usize = 48;
2078        const LEGACY_STORE_CAP: usize = 24;
2079        const LEGACY_FETCH_CAP: usize = 96;
2080
2081        let dir = tempfile::tempdir().unwrap();
2082        let path = dir.path().join("legacy.json");
2083        let payload = format!(
2084            r#"{{"schema":{},"channels":{{"quote":{},"store":{},"fetch":{}}}}}"#,
2085            PERSIST_SCHEMA_AIMD_FETCH, LEGACY_QUOTE_CAP, LEGACY_STORE_CAP, LEGACY_FETCH_CAP,
2086        );
2087        std::fs::write(&path, payload).unwrap();
2088
2089        let loaded = load_snapshot(&path).unwrap();
2090
2091        assert_eq!(loaded.quote, LEGACY_QUOTE_CAP);
2092        assert_eq!(loaded.store, LEGACY_STORE_CAP);
2093        assert_eq!(loaded.fetch, FETCH_COLD_START_CONCURRENCY);
2094    }
2095
2096    #[tokio::test]
2097    async fn observe_op_records_classified_error() {
2098        let cfg = cfg_for_tests();
2099        let l = Limiter::new(4, cfg.clone());
2100        for _ in 0..cfg.window_ops {
2101            let _: Result<(), &str> =
2102                observe_op(&l, || async { Err("boom") }, |_e: &&str| Outcome::Timeout).await;
2103        }
2104        assert!(l.current() < 4);
2105    }
2106
2107    // ----- Adversarial / regression-guard tests below ---------------------
2108    //
2109    // These exist primarily to prove the controller never silently regresses
2110    // upload/download throughput and never panics under hostile workloads.
2111
2112    /// Cold-start defaults for quote/store must preserve the prior static
2113    /// knobs. Fetch intentionally starts at the validated residential floor
2114    /// because the throughput hill climber now has to prove that higher
2115    /// fan-out improves goodput.
2116    #[test]
2117    fn no_regression_cold_start_at_least_static_defaults() {
2118        let s = ChannelStart::default();
2119        assert!(
2120            s.quote >= 32,
2121            "quote cold-start regressed: got {}, prior static was 32",
2122            s.quote,
2123        );
2124        assert!(
2125            s.store >= 8,
2126            "store cold-start regressed: got {}, prior static was 8",
2127            s.store,
2128        );
2129        assert_eq!(
2130            s.fetch, FETCH_COLD_START_CONCURRENCY,
2131            "fetch cold-start changed unexpectedly: got {}, expected {}",
2132            s.fetch, FETCH_COLD_START_CONCURRENCY,
2133        );
2134    }
2135
2136    /// The production `AdaptiveController::default()` (NOT the test cfg)
2137    /// must come up reporting the cold-start values immediately, with no
2138    /// observations recorded.
2139    #[test]
2140    fn controller_default_config_is_sane() {
2141        let c = AdaptiveController::default();
2142        let starts = ChannelStart::default();
2143        assert_eq!(c.quote.current(), starts.quote);
2144        assert_eq!(c.store.current(), starts.store);
2145        assert_eq!(c.fetch.current(), starts.fetch);
2146        // No observations made yet — internal windows must be empty.
2147        assert_eq!(lock(&c.quote.inner).window.len(), 0);
2148        assert_eq!(lock(&c.store.inner).window.len(), 0);
2149        assert_eq!(lock(&c.fetch.inner).window.len(), 0);
2150    }
2151
2152    /// Mixed signals (every other op fails) must not pin the controller
2153    /// at the floor for the whole run. The cap should oscillate or settle
2154    /// somewhere above the floor — collapse to 1 forever would be a bug.
2155    #[test]
2156    fn alternating_success_failure_collapses_to_floor() {
2157        // 50% timeout rate is far above `timeout_ceiling` (0.2 in test
2158        // config), so the window is always stressed. The controller
2159        // MUST collapse to the floor, and once there must NEVER go
2160        // below it. Assert both invariants explicitly: floor reached
2161        // and floor held.
2162        let cfg = cfg_for_tests();
2163        let l = Limiter::new(8, cfg.clone());
2164        let mut min_observed = usize::MAX;
2165        let mut max_observed = 0usize;
2166        let mut floor_visits = 0usize;
2167        for i in 0..1000 {
2168            let outcome = if i % 2 == 0 {
2169                Outcome::Success
2170            } else {
2171                Outcome::Timeout
2172            };
2173            l.observe(outcome, Duration::from_millis(50));
2174            let cur = l.current();
2175            assert!(
2176                cur >= cfg.min_concurrency,
2177                "cap underflowed floor at iter {i}: got {cur}",
2178            );
2179            min_observed = min_observed.min(cur);
2180            max_observed = max_observed.max(cur);
2181            if cur == cfg.min_concurrency {
2182                floor_visits += 1;
2183            }
2184        }
2185        assert_eq!(
2186            min_observed, cfg.min_concurrency,
2187            "cap never reached the floor under 50% timeout rate"
2188        );
2189        assert!(
2190            max_observed >= 8,
2191            "cap never visited the start value: max_observed={max_observed}"
2192        );
2193        // Should spend MOST of the run at the floor — a single
2194        // healthy window is not enough to climb back from a 50% loss
2195        // environment.
2196        assert!(
2197            floor_visits > 500,
2198            "cap spent only {floor_visits}/1000 ticks at floor; expected mostly at floor"
2199        );
2200        assert_eq!(
2201            l.current(),
2202            cfg.min_concurrency,
2203            "controller did not settle at floor after 1000 alternations"
2204        );
2205    }
2206
2207    /// From the floor, a long stream of healthy successes must walk the
2208    /// cap all the way back up to `max_concurrency`. Otherwise transient
2209    /// stress on a slow link would permanently penalize throughput.
2210    #[test]
2211    fn pure_success_stream_recovers_to_max() {
2212        let cfg = cfg_for_tests();
2213        let l = Limiter::new(cfg.min_concurrency, cfg.clone());
2214        for _ in 0..10_000 {
2215            l.observe(Outcome::Success, Duration::from_millis(5));
2216        }
2217        assert_eq!(
2218            l.current(),
2219            cfg.max_concurrency,
2220            "expected recovery to max ({}), got {}",
2221            cfg.max_concurrency,
2222            l.current(),
2223        );
2224    }
2225
2226    /// Heavy stress drives the cap to the floor; subsequent recovery
2227    /// must climb meaningfully higher than the floor with enough healthy
2228    /// evidence. No "permanent floor" failure mode allowed.
2229    #[test]
2230    fn stress_then_heal_drives_floor_then_recovery() {
2231        let cfg = cfg_for_tests();
2232        let l = Limiter::new(8, cfg.clone());
2233        for _ in 0..100 {
2234            l.observe(Outcome::Timeout, Duration::from_millis(50));
2235        }
2236        let after_stress = l.current();
2237        assert_eq!(
2238            after_stress, cfg.min_concurrency,
2239            "stress should drive cap to floor, got {after_stress}",
2240        );
2241        for _ in 0..1_000 {
2242            l.observe(Outcome::Success, Duration::from_millis(10));
2243        }
2244        let after_heal = l.current();
2245        assert!(
2246            after_heal >= cfg.min_concurrency.saturating_add(4),
2247            "expected substantial recovery from floor, got {after_heal}",
2248        );
2249    }
2250
2251    /// The latency baseline must track actual workload latency. If it
2252    /// stayed pinned at `Duration::ZERO`, every healthy sample would
2253    /// look like infinite inflation and inflate the decrease rate.
2254    #[test]
2255    fn baseline_does_not_grow_unbounded_under_slow_links() {
2256        let cfg = cfg_for_tests();
2257        let l = Limiter::new(2, cfg.clone());
2258        for _ in 0..(cfg.window_ops * 10) {
2259            l.observe(Outcome::Success, Duration::from_millis(500));
2260        }
2261        let baseline = lock(&l.inner).latency_baseline;
2262        let base = baseline.expect("baseline should be set after many healthy windows");
2263        assert!(
2264            base > Duration::ZERO,
2265            "baseline must not stay at ZERO, got {base:?}",
2266        );
2267        // Within 2x of the actual latency: 250ms..=1000ms.
2268        let lo = Duration::from_millis(250);
2269        let hi = Duration::from_millis(1000);
2270        assert!(
2271            base >= lo && base <= hi,
2272            "baseline drifted out of [{lo:?}, {hi:?}]: {base:?}",
2273        );
2274    }
2275
2276    /// Until the first healthy window completes, the latency baseline
2277    /// stays `None` (so no false-inflation alarms). Decreases during the
2278    /// stress phase are driven purely by success/timeout rate, not by
2279    /// inflated p95 vs a phantom zero baseline.
2280    #[test]
2281    fn baseline_initialized_only_after_first_healthy_window() {
2282        let cfg = cfg_for_tests();
2283        let l = Limiter::new(8, cfg.clone());
2284        for _ in 0..50 {
2285            l.observe(Outcome::Timeout, Duration::from_millis(50));
2286        }
2287        // Without any healthy window, baseline must still be None.
2288        assert!(
2289            lock(&l.inner).latency_baseline.is_none(),
2290            "baseline must be None before any healthy window",
2291        );
2292        // Now drain healthy windows.
2293        for _ in 0..(cfg.window_ops * 5) {
2294            l.observe(Outcome::Success, Duration::from_millis(20));
2295        }
2296        let baseline = lock(&l.inner).latency_baseline;
2297        assert!(
2298            baseline.is_some(),
2299            "baseline must be Some after healthy windows",
2300        );
2301        let base = baseline.unwrap_or_default();
2302        assert!(
2303            base > Duration::ZERO,
2304            "baseline must reflect real latency, got {base:?}",
2305        );
2306    }
2307
2308    /// A torrent of timeouts must not underflow the cap. Sample at
2309    /// several depths to catch any wraparound.
2310    #[test]
2311    fn min_concurrency_floor_holds_under_torrent_of_errors() {
2312        let cfg = cfg_for_tests();
2313        let l = Limiter::new(8, cfg.clone());
2314        for i in 0..50_000 {
2315            l.observe(Outcome::Timeout, Duration::from_millis(50));
2316            if i == 100 || i == 1_000 || i == 49_999 {
2317                let cur = l.current();
2318                assert_eq!(
2319                    cur, cfg.min_concurrency,
2320                    "floor breached at iter {i}: got {cur}",
2321                );
2322            }
2323        }
2324    }
2325
2326    /// Mirror: a torrent of successes must not exceed `max_concurrency`.
2327    #[test]
2328    fn max_concurrency_ceiling_holds_under_torrent_of_successes() {
2329        let cfg = cfg_for_tests();
2330        let start = cfg
2331            .max_concurrency
2332            .saturating_sub(1)
2333            .max(cfg.min_concurrency);
2334        let l = Limiter::new(start, cfg.clone());
2335        for i in 0..50_000 {
2336            l.observe(Outcome::Success, Duration::from_millis(5));
2337            if i == 100 || i == 1_000 || i == 49_999 {
2338                let cur = l.current();
2339                assert!(
2340                    cur <= cfg.max_concurrency,
2341                    "ceiling breached at iter {i}: got {cur} > {}",
2342                    cfg.max_concurrency,
2343                );
2344            }
2345        }
2346        assert_eq!(l.current(), cfg.max_concurrency);
2347    }
2348
2349    /// Slow-start doubles the cap; with `max_concurrency = usize::MAX/2`
2350    /// a naive `*2` would overflow. The controller must use saturating
2351    /// arithmetic and never panic. Also asserts the cap actually
2352    /// REACHED max — proving that "no panic" wasn't achieved by
2353    /// the cap getting stuck somewhere instead of growing.
2354    #[test]
2355    fn saturating_arithmetic_handles_extreme_config() {
2356        let cfg = LimiterConfig {
2357            max_concurrency: usize::MAX / 2,
2358            ..cfg_for_tests()
2359        };
2360        let start = usize::MAX / 4;
2361        let l = Limiter::new(start, cfg.clone());
2362        for _ in 0..(cfg.window_ops * 10) {
2363            l.observe(Outcome::Success, Duration::from_millis(1));
2364        }
2365        // First-iteration doubles start (which is max/4) to max/2 = ceiling.
2366        // The cap MUST have grown to the ceiling; if saturating math
2367        // were broken (panic) we'd never get here, but we'd also fail
2368        // if the cap got stuck at the start value.
2369        assert_eq!(
2370            l.current(),
2371            cfg.max_concurrency,
2372            "saturating math survived but cap did not grow to ceiling"
2373        );
2374    }
2375
2376    /// FIFO eviction: prove that a window of pure-timeout collapses
2377    /// the cap, and once enough successes flush ALL timeouts out of
2378    /// the window, the cap can rise. The earlier version of this test
2379    /// used an OR clause that made the assertion satisfiable trivially;
2380    /// this version asserts the strict invariant: after eviction, cap
2381    /// must be STRICTLY GREATER than the post-stress cap.
2382    #[test]
2383    fn window_eviction_is_fifo() {
2384        let cfg = LimiterConfig {
2385            window_ops: 10,
2386            min_window_ops: 5,
2387            success_target: 0.9,
2388            timeout_ceiling: 0.1,
2389            ..cfg_for_tests()
2390        };
2391        let l = Limiter::new(8, cfg.clone());
2392        // Fill the window with timeouts. With decrease-gating
2393        // (samples_since_decrease >= min_window_ops between halvings),
2394        // window_ops=10 + min_window_ops=5 timeouts allow at most
2395        // ~2 halvings: 8 -> 4 -> 2. Cap must DROP from 8.
2396        for _ in 0..cfg.window_ops {
2397            l.observe(Outcome::Timeout, Duration::from_millis(50));
2398        }
2399        let after_stress = l.current();
2400        assert!(
2401            after_stress < 8,
2402            "expected cap to drop from 8 after pure-timeout window, got {after_stress}"
2403        );
2404        // Push enough successes to fully evict the timeouts AND
2405        // accumulate at least one full window of fresh evidence for
2406        // an Increase. window_ops to evict + window_ops to gate first
2407        // +1 = 2 * window_ops minimum; use 3x for safety margin.
2408        for _ in 0..(cfg.window_ops * 3) {
2409            l.observe(Outcome::Success, Duration::from_millis(20));
2410        }
2411        let after_recovery = l.current();
2412        // Strict greater-than: FIFO MUST flush the timeouts so a
2413        // fresh-window Increase can fire.
2414        assert!(
2415            after_recovery > after_stress,
2416            "FIFO eviction broken: cap stayed at {after_stress} after recovery successes (expected > {after_stress}, got {after_recovery})"
2417        );
2418    }
2419
2420    /// With `enabled = false`, the controller is a no-op. Hot paths
2421    /// must see exactly `initial` at every check, no exceptions.
2422    #[test]
2423    fn disabled_controller_returns_initial_value_invariantly() {
2424        let cfg = LimiterConfig {
2425            enabled: false,
2426            ..cfg_for_tests()
2427        };
2428        let initial = 8;
2429        let l = Limiter::new(initial, cfg);
2430        for i in 0..1_000 {
2431            let outcome = match i % 4 {
2432                0 => Outcome::Success,
2433                1 => Outcome::Timeout,
2434                2 => Outcome::NetworkError,
2435                _ => Outcome::ApplicationError,
2436            };
2437            l.observe(outcome, Duration::from_millis(50));
2438            assert_eq!(
2439                l.current(),
2440                initial,
2441                "disabled controller moved at iter {i}",
2442            );
2443        }
2444    }
2445
2446    /// 100 tasks concurrently observing 100 successes each. The cap
2447    /// must remain a valid in-bounds value, no panic, no deadlock.
2448    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2449    async fn concurrent_observations_do_not_corrupt_window() {
2450        let cfg = cfg_for_tests();
2451        let l = Limiter::new(4, cfg.clone());
2452        let mut handles = Vec::with_capacity(100);
2453        for _ in 0..100 {
2454            let l_clone = l.clone();
2455            handles.push(tokio::spawn(async move {
2456                for _ in 0..100 {
2457                    l_clone.observe(Outcome::Success, Duration::from_millis(5));
2458                }
2459            }));
2460        }
2461        for h in handles {
2462            h.await.unwrap();
2463        }
2464        let cur = l.current();
2465        assert!(
2466            cur >= cfg.min_concurrency && cur <= cfg.max_concurrency,
2467            "cap out of bounds after concurrent observations: {cur}",
2468        );
2469    }
2470
2471    /// Persisted higher values from a prior run must beat low cold-start
2472    /// defaults. Otherwise warm-start would silently pessimize throughput.
2473    /// (Values BELOW cold-start are floored — see
2474    /// `warm_start_floors_at_cold_defaults`.)
2475    #[test]
2476    fn persisted_snapshot_warm_starts_above_cold_floor() {
2477        let dir = tempfile::tempdir().unwrap();
2478        let path = dir.path().join("client_adaptive.json");
2479        // All snapshot values ABOVE the production cold-start defaults
2480        // so the warm_start floor doesn't kick in.
2481        let saved = ChannelStart {
2482            quote: 64,
2483            store: 32,
2484            fetch: 128,
2485        };
2486        save_snapshot(&path, saved);
2487        let loaded = load_snapshot(&path).unwrap();
2488
2489        // Build a controller with intentionally low cold-start values
2490        // — these get overridden by warm_start.
2491        let low = ChannelStart {
2492            quote: 2,
2493            store: 2,
2494            fetch: 2,
2495        };
2496        let c = AdaptiveController::new(low, AdaptiveConfig::default());
2497        c.warm_start(loaded);
2498        assert_eq!(c.quote.current(), 64);
2499        assert_eq!(c.store.current(), 32);
2500        assert_eq!(c.fetch.current(), 128);
2501    }
2502
2503    /// Two threads racing on `save_snapshot` must never produce a
2504    /// half-written file. Atomic-rename guarantees we either see the
2505    /// old content or the new content, never a torn write.
2506    #[test]
2507    fn save_load_round_trip_with_concurrent_writes() {
2508        use std::thread;
2509        let dir = tempfile::tempdir().unwrap();
2510        let path = dir.path().join("client_adaptive.json");
2511        let path_a = path.clone();
2512        let path_b = path.clone();
2513        let snap_a = ChannelStart {
2514            quote: 10,
2515            store: 10,
2516            fetch: 10,
2517        };
2518        let snap_b = ChannelStart {
2519            quote: 99,
2520            store: 99,
2521            fetch: 99,
2522        };
2523        let h_a = thread::spawn(move || {
2524            for _ in 0..50 {
2525                save_snapshot(&path_a, snap_a);
2526            }
2527        });
2528        let h_b = thread::spawn(move || {
2529            for _ in 0..50 {
2530                save_snapshot(&path_b, snap_b);
2531            }
2532        });
2533        h_a.join().unwrap();
2534        h_b.join().unwrap();
2535        let loaded = load_snapshot(&path).expect("file must be a valid snapshot, not torn");
2536        let valid = (loaded.quote == snap_a.quote
2537            && loaded.store == snap_a.store
2538            && loaded.fetch == snap_a.fetch)
2539            || (loaded.quote == snap_b.quote
2540                && loaded.store == snap_b.store
2541                && loaded.fetch == snap_b.fetch);
2542        assert!(valid, "loaded snapshot is neither A nor B: {loaded:?}",);
2543    }
2544
2545    /// `save_snapshot` to an unwritable / impossible path must be a
2546    /// quiet no-op: best-effort, no panic, no error propagation.
2547    #[test]
2548    fn save_snapshot_to_unwritable_dir_does_not_panic() {
2549        // A path under a non-existent absolute root that the process
2550        // also cannot create. On macOS/Linux a write under "/" requires
2551        // root; create_dir_all will fail on this path.
2552        let path = PathBuf::from("/nonexistent_root_dir_xyz_for_test/sub/dir/client_adaptive.json");
2553        let snap = ChannelStart {
2554            quote: 1,
2555            store: 1,
2556            fetch: 1,
2557        };
2558        // No panic = pass. Function returns unit, errors are logged.
2559        save_snapshot(&path, snap);
2560        // File should not exist.
2561        assert!(!path.exists());
2562    }
2563
2564    /// A truncated/partial JSON file must not crash the loader; it must
2565    /// return None so the controller falls back to cold defaults.
2566    #[test]
2567    fn load_snapshot_from_truncated_file_returns_none() {
2568        let dir = tempfile::tempdir().unwrap();
2569        let path = dir.path().join("truncated.json");
2570        std::fs::write(&path, br#"{"schema":1,"channels":{"quote":"#).unwrap();
2571        assert!(load_snapshot(&path).is_none());
2572    }
2573
2574    /// Microbench: 100k observe+current pairs must complete in well
2575    /// under 100ms. Catches any accidental quadratic behaviour or
2576    /// massive lock contention introduced by future changes.
2577    #[test]
2578    fn controller_perf_overhead_is_bounded() {
2579        let cfg = cfg_for_tests();
2580        let l = Limiter::new(8, cfg);
2581        let started = Instant::now();
2582        for _ in 0..100_000 {
2583            let _ = l.current();
2584            l.observe(Outcome::Success, Duration::from_micros(1));
2585        }
2586        let elapsed = started.elapsed();
2587        // 1µs per pair on a modern machine is generous; allow 500ms to
2588        // tolerate slow CI runners while still catching real regressions.
2589        assert!(
2590            elapsed < Duration::from_millis(500),
2591            "100k observe+current pairs took {elapsed:?}, expected <500ms",
2592        );
2593    }
2594
2595    // ---- Regression tests for adversarial-review findings ----
2596
2597    /// M10 fix: hand-edited or future-schema configs may plant `NaN`
2598    /// or out-of-range values into the float fields. Constructing a
2599    /// controller and feeding observations must not panic.
2600    /// `Duration::from_secs_f64(NaN)` panics per std docs, so without
2601    /// `sanitize()` and the EWMA NaN guard this would crash.
2602    #[test]
2603    fn nan_and_out_of_range_config_does_not_panic() {
2604        let cfg = AdaptiveConfig {
2605            enabled: true,
2606            min_concurrency: 0, // sub-floor; sanitize raises to 1
2607            max: ChannelMax {
2608                quote: 0, // sub-min; sanitize raises to min
2609                store: 0,
2610                fetch: 0,
2611            },
2612            window_ops: 10,
2613            min_window_ops: 50, // > window_ops; sanitize clamps
2614            success_target: f64::NAN,
2615            timeout_ceiling: f64::INFINITY,
2616            latency_inflation_factor: f64::NEG_INFINITY,
2617            latency_ewma_alpha: f64::NAN,
2618        };
2619        let c = AdaptiveController::new(ChannelStart::default(), cfg);
2620        // Verify sanitize() ACTUALLY corrected the values (not just
2621        // that no panic occurred). Reading c.config back proves the
2622        // sanitization landed.
2623        let post = &c.config;
2624        assert_eq!(
2625            post.min_concurrency, 1,
2626            "sanitize did not raise min_concurrency from 0"
2627        );
2628        assert!(
2629            post.success_target.is_finite() && (0.0..=1.0).contains(&post.success_target),
2630            "sanitize did not clamp success_target from NaN: {}",
2631            post.success_target
2632        );
2633        assert!(
2634            post.timeout_ceiling.is_finite() && (0.0..=1.0).contains(&post.timeout_ceiling),
2635            "sanitize did not clamp timeout_ceiling from Inf: {}",
2636            post.timeout_ceiling
2637        );
2638        assert!(
2639            post.latency_inflation_factor.is_finite() && post.latency_inflation_factor > 0.0,
2640            "sanitize did not fix latency_inflation_factor from -Inf: {}",
2641            post.latency_inflation_factor
2642        );
2643        assert!(
2644            post.latency_ewma_alpha.is_finite() && (0.0..=1.0).contains(&post.latency_ewma_alpha),
2645            "sanitize did not fix latency_ewma_alpha from NaN: {}",
2646            post.latency_ewma_alpha
2647        );
2648        assert!(
2649            post.min_window_ops <= post.window_ops,
2650            "sanitize did not clamp min_window_ops <= window_ops: min={} window={}",
2651            post.min_window_ops,
2652            post.window_ops
2653        );
2654        assert!(
2655            post.max.quote >= post.min_concurrency,
2656            "max.quote below min_concurrency"
2657        );
2658        // Now exercise the runtime under hostile latencies — must
2659        // not panic.
2660        for _ in 0..200 {
2661            c.store
2662                .observe(Outcome::Success, Duration::from_secs(99_999));
2663            c.store.observe(Outcome::Timeout, Duration::ZERO);
2664        }
2665        let cur = c.store.current();
2666        assert!(cur >= 1, "cap below floor: {cur}");
2667    }
2668
2669    /// M3+M6 fix: a burst of N concurrent in-flight chunks all
2670    /// observing stress at almost the same time used to pile-drive
2671    /// the cap from N to 1 in N back-to-back ticks. After the fix,
2672    /// decreases require `min_window_ops` of FRESH evidence between
2673    /// successive Decreases, so a single transient burst can drop
2674    /// the cap by at most one halving.
2675    #[test]
2676    fn transient_burst_does_not_pile_drive_to_floor() {
2677        let cfg = LimiterConfig {
2678            window_ops: 32,
2679            min_window_ops: 8,
2680            success_target: 0.95,
2681            timeout_ceiling: 0.10,
2682            ..cfg_for_tests()
2683        };
2684        let l = Limiter::new(32, cfg);
2685        // Simulate 8 concurrent ops all completing as Timeout in a
2686        // back-to-back burst (the kind of event that previously
2687        // floor-slammed the cap).
2688        for _ in 0..8 {
2689            l.observe(Outcome::Timeout, Duration::from_millis(10));
2690        }
2691        // After one burst, cap should have decreased AT MOST once
2692        // (32 -> 16). Pile-driving would land at 1 or 2.
2693        let after_burst = l.current();
2694        assert!(
2695            after_burst >= 16,
2696            "transient burst pile-drove cap from 32 to {after_burst}; expected >= 16",
2697        );
2698    }
2699
2700    /// M2 fix: classifier must map transport-related errors to
2701    /// `NetworkError`, not `ApplicationError`. Test EACH transport
2702    /// variant separately so a regression in any one variant is
2703    /// caught by name.
2704    #[tokio::test]
2705    async fn transport_errors_classify_as_capacity_signal() {
2706        use crate::data::client::classify_error;
2707        use crate::data::error::Error;
2708        let make_cfg = || LimiterConfig {
2709            window_ops: 16,
2710            min_window_ops: 5,
2711            success_target: 0.5,
2712            timeout_ceiling: 0.5,
2713            ..cfg_for_tests()
2714        };
2715        // Cases: (variant_name, error_factory)
2716        type ErrFactory = Box<dyn Fn() -> Error>;
2717        let cases: Vec<(&str, ErrFactory)> = vec![
2718            ("Network", Box::new(|| Error::Network("net".to_string()))),
2719            (
2720                "InsufficientPeers",
2721                Box::new(|| Error::InsufficientPeers("ip".to_string())),
2722            ),
2723            ("Io", Box::new(|| Error::Io(std::io::Error::other("io")))),
2724            ("Protocol", Box::new(|| Error::Protocol("p".to_string()))),
2725            ("Storage", Box::new(|| Error::Storage("s".to_string()))),
2726            (
2727                "PartialUpload",
2728                Box::new(|| Error::PartialUpload {
2729                    stored: vec![],
2730                    stored_count: 0,
2731                    failed: vec![],
2732                    failed_count: 0,
2733                    total_chunks: 0,
2734                    reason: "r".to_string(),
2735                }),
2736            ),
2737        ];
2738        for (name, mk) in &cases {
2739            let l = Limiter::new(8, make_cfg());
2740            for _ in 0..16 {
2741                let _: std::result::Result<(), Error> =
2742                    observe_op(&l, || async { Err(mk()) }, classify_error).await;
2743            }
2744            // Each variant alone must drive the cap STRICTLY below
2745            // the start (8 -> 4 via one halving). If a variant maps
2746            // to ApplicationError, cap stays at 8.
2747            let cur = l.current();
2748            assert!(
2749                cur < 8,
2750                "{name} not classified as capacity signal: cap stayed at {cur}",
2751            );
2752        }
2753    }
2754
2755    /// C4 fix: per-channel max ceilings. Confirm that a `LimiterConfig`
2756    /// with a constrained `max_concurrency` does not bleed into other
2757    /// channels. The ceilings are independent.
2758    #[test]
2759    fn per_channel_ceilings_are_independent() {
2760        let cfg = AdaptiveConfig {
2761            max: ChannelMax {
2762                quote: 4,    // tightly capped
2763                store: 8,    // moderate
2764                fetch: 1024, // very high
2765            },
2766            ..AdaptiveConfig::default()
2767        };
2768        let c = AdaptiveController::new(
2769            ChannelStart {
2770                quote: 4,
2771                store: 8,
2772                fetch: 64,
2773            },
2774            cfg,
2775        );
2776        // Feed 1000 successes to each channel; each must respect its
2777        // own ceiling and never one another's.
2778        for _ in 0..1000 {
2779            c.quote.observe(Outcome::Success, Duration::from_micros(10));
2780            c.store.observe(Outcome::Success, Duration::from_micros(10));
2781            c.fetch.observe(Outcome::Success, Duration::from_micros(10));
2782        }
2783        assert_eq!(c.quote.current(), 4, "quote should cap at 4");
2784        assert_eq!(c.store.current(), 8, "store should cap at 8");
2785        // Fetch uses the hill climber now, so it should not blindly jump to
2786        // its max on success-only samples. It still must prove the fetch
2787        // ceiling is independent by climbing above the quote/store caps.
2788        assert!(
2789            c.fetch.current() > 8 && c.fetch.current() <= 1024,
2790            "fetch did not use its independent ceiling; got {}",
2791            c.fetch.current()
2792        );
2793    }
2794
2795    #[test]
2796    fn fetch_hill_rejects_upward_probe_without_goodput_gain() {
2797        let cfg = hill_cfg_for_tests();
2798        let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2799
2800        observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2801        assert_eq!(
2802            l.current(),
2803            HILL_TEST_UP_PROBE_CAP,
2804            "first healthy epoch should probe upward"
2805        );
2806
2807        observe_hill_success_epoch_with_latency(
2808            &l,
2809            &cfg,
2810            HILL_TEST_CHUNK_BYTES,
2811            Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS),
2812        );
2813        assert_eq!(
2814            l.current(),
2815            HILL_TEST_START_CAP,
2816            "slower higher-cap wave should reject the upward probe"
2817        );
2818        assert_eq!(l.snapshot(), HILL_TEST_START_CAP);
2819    }
2820
2821    #[test]
2822    fn fetch_hill_accepts_upward_probe_with_goodput_gain() {
2823        let cfg = hill_cfg_for_tests();
2824        let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2825
2826        observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2827        assert_eq!(l.current(), HILL_TEST_UP_PROBE_CAP);
2828
2829        observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2830        assert_eq!(
2831            l.snapshot(),
2832            HILL_TEST_UP_PROBE_CAP,
2833            "same-size chunks at same latency should promote the higher cap"
2834        );
2835        assert_eq!(
2836            l.current(),
2837            HILL_TEST_NEXT_UP_PROBE_CAP,
2838            "after accepting an upward probe, hill climber should probe higher"
2839        );
2840    }
2841
2842    #[test]
2843    fn fetch_hill_accepts_lower_probe_when_goodput_is_retained() {
2844        let cfg = hill_cfg_for_tests();
2845        let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2846
2847        observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2848        observe_hill_success_epoch_with_latency(
2849            &l,
2850            &cfg,
2851            HILL_TEST_CHUNK_BYTES,
2852            Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS),
2853        );
2854        assert_eq!(l.current(), HILL_TEST_START_CAP);
2855
2856        for _ in 0..(HILL_REJECT_COOLDOWN_EPOCHS + HILL_STABLE_PROBE_EPOCHS) {
2857            observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2858        }
2859        assert_eq!(
2860            l.current(),
2861            HILL_TEST_DOWN_PROBE_CAP,
2862            "stable best should eventually probe a lower cap"
2863        );
2864
2865        observe_hill_success_epoch_with_latency(
2866            &l,
2867            &cfg,
2868            HILL_TEST_CHUNK_BYTES,
2869            Duration::from_millis(HILL_TEST_RETAINED_DOWN_LATENCY_MS),
2870        );
2871        assert_eq!(
2872            l.snapshot(),
2873            HILL_TEST_DOWN_PROBE_CAP,
2874            "retained goodput at lower concurrency should become the new best"
2875        );
2876    }
2877
2878    #[tokio::test]
2879    async fn fetch_hill_accepts_constant_size_upward_probe_from_timed_ops() {
2880        let cfg = hill_cfg_for_tests();
2881        let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2882        let total_ops = hill_epoch_target_samples(HILL_TEST_START_CAP, &cfg)
2883            + hill_epoch_target_samples(HILL_TEST_UP_PROBE_CAP, &cfg);
2884        let limiter_for_ops = l.clone();
2885
2886        let result: std::result::Result<Vec<()>, ()> =
2887            rebucketed_unordered(&l, 0..total_ops, move |_| {
2888                let limiter = limiter_for_ops.clone();
2889                async move {
2890                    observe_op_with_success_bytes(
2891                        &limiter,
2892                        || async {
2893                            tokio::time::sleep(Duration::from_millis(HILL_TEST_ASYNC_LATENCY_MS))
2894                                .await;
2895                            Ok::<(), ()>(())
2896                        },
2897                        |_| Outcome::NetworkError,
2898                        |_| HILL_TEST_CHUNK_BYTES,
2899                    )
2900                    .await
2901                }
2902            })
2903            .await;
2904        result.unwrap();
2905
2906        assert_eq!(
2907            l.snapshot(),
2908            HILL_TEST_UP_PROBE_CAP,
2909            "timed constant-size chunks should prove the higher cap improves goodput"
2910        );
2911        assert_eq!(
2912            l.current(),
2913            HILL_TEST_NEXT_UP_PROBE_CAP,
2914            "accepted upward probe should immediately test the next cap"
2915        );
2916    }
2917
2918    #[test]
2919    fn fetch_hill_stress_cuts_before_full_epoch() {
2920        let cfg = LimiterConfig {
2921            window_ops: 8,
2922            min_window_ops: 4,
2923            ..hill_cfg_for_tests()
2924        };
2925        let l = fetch_hill_for_tests(16, cfg.clone());
2926
2927        for _ in 0..cfg.min_window_ops {
2928            l.observe(Outcome::Timeout, Duration::from_millis(10));
2929        }
2930
2931        assert_eq!(
2932            l.current(),
2933            8,
2934            "fetch hill climber should halve on early stress"
2935        );
2936    }
2937
2938    /// Quote/store cold-starts preserve prior static defaults. Fetch starts
2939    /// at the new conservative hill-climb default to avoid download
2940    /// overshoot on fresh installs.
2941    #[test]
2942    fn cold_start_at_least_prior_static_defaults() {
2943        let cs = ChannelStart::default();
2944        assert!(cs.quote >= 32, "quote cold-start regressed: {}", cs.quote);
2945        assert!(cs.store >= 8, "store cold-start regressed: {}", cs.store);
2946        assert_eq!(
2947            cs.fetch, FETCH_COLD_START_CONCURRENCY,
2948            "fetch cold-start changed unexpectedly"
2949        );
2950    }
2951
2952    /// Reviewer N-M5 guard: with the new gated-decrease semantics
2953    /// (decreases require `min_window_ops` of fresh evidence), the
2954    /// controller must STILL reach the floor under sustained stress
2955    /// within a bounded number of observations. Otherwise we've made
2956    /// the controller too sluggish to react to a real network
2957    /// outage.
2958    ///
2959    /// From start = 64 with `min_window_ops = 8`, reaching floor 1
2960    /// takes log2(64) = 6 halvings, each gated on 8 fresh samples,
2961    /// so the upper bound is roughly `6 * 8 + min_window_ops = ~56`
2962    /// observations. We allow 200 to absorb the warm-up window and
2963    /// any per-sample evaluation slack.
2964    #[test]
2965    fn sustained_stress_reaches_floor_within_bounded_ops() {
2966        let cfg = LimiterConfig {
2967            window_ops: 32,
2968            min_window_ops: 8,
2969            success_target: 0.95,
2970            timeout_ceiling: 0.10,
2971            max_concurrency: 64,
2972            ..cfg_for_tests()
2973        };
2974        let l = Limiter::new(64, cfg);
2975        let mut ops = 0usize;
2976        while l.current() > 1 && ops < 200 {
2977            l.observe(Outcome::Timeout, Duration::from_millis(10));
2978            ops += 1;
2979        }
2980        assert_eq!(
2981            l.current(),
2982            1,
2983            "controller did not reach floor within 200 observations under \
2984             sustained timeout stress; took {ops} ops, ended at cap {}",
2985            l.current()
2986        );
2987    }
2988
2989    /// The default `AdaptiveController` (production defaults) starts
2990    /// each channel at the documented cold-start value, with each
2991    /// per-channel max strictly above the start (so the controller
2992    /// has room to grow).
2993    #[test]
2994    fn default_controller_has_growth_headroom() {
2995        let c = AdaptiveController::default();
2996        let cs = ChannelStart::default();
2997        let max = ChannelMax::default();
2998        assert_eq!(c.quote.current(), cs.quote);
2999        assert_eq!(c.store.current(), cs.store);
3000        assert_eq!(c.fetch.current(), cs.fetch);
3001        assert!(
3002            max.quote > cs.quote,
3003            "no growth headroom for quote: max={} start={}",
3004            max.quote,
3005            cs.quote
3006        );
3007        assert!(
3008            max.store > cs.store,
3009            "no growth headroom for store: max={} start={}",
3010            max.store,
3011            cs.store
3012        );
3013        assert!(
3014            max.fetch > cs.fetch,
3015            "no growth headroom for fetch: max={} start={}",
3016            max.fetch,
3017            cs.fetch
3018        );
3019    }
3020
3021    // ---- Codex review (round 3) regression tests ----
3022
3023    /// Codex CRITICAL: warm_start was blindly restoring caps below the
3024    /// cold-start floor. A prior bad run that drove store=1 would
3025    /// pessimize every subsequent run forever. The fix floors warm
3026    /// values at `ChannelStart::default()` per channel.
3027    #[test]
3028    fn warm_start_floors_at_cold_defaults() {
3029        let c = AdaptiveController::default();
3030        let cold = ChannelStart::default();
3031        // Snapshot from a "bad prior run" — every channel pinned to 1.
3032        let bad_snap = ChannelStart {
3033            quote: 1,
3034            store: 1,
3035            fetch: 1,
3036        };
3037        c.warm_start(bad_snap);
3038        // After warm_start, each channel should be AT LEAST the
3039        // cold-start value, not the persisted 1.
3040        assert_eq!(
3041            c.quote.current(),
3042            cold.quote,
3043            "quote warm_start did not floor at cold default"
3044        );
3045        assert_eq!(
3046            c.store.current(),
3047            cold.store,
3048            "store warm_start did not floor at cold default"
3049        );
3050        assert_eq!(
3051            c.fetch.current(),
3052            cold.fetch,
3053            "fetch warm_start did not floor at cold default"
3054        );
3055    }
3056
3057    /// Warm values ABOVE the cold-start floor must still be honored —
3058    /// the floor is a one-sided lower bound, not a clamp.
3059    #[test]
3060    fn warm_start_honors_values_above_cold_floor() {
3061        let c = AdaptiveController::default();
3062        let cold = ChannelStart::default();
3063        let snap = ChannelStart {
3064            quote: cold.quote * 2,
3065            store: cold.store * 4,
3066            fetch: cold.fetch * 2,
3067        };
3068        c.warm_start(snap);
3069        assert_eq!(c.quote.current(), snap.quote);
3070        assert_eq!(c.store.current(), snap.store);
3071        assert_eq!(c.fetch.current(), snap.fetch);
3072    }
3073
3074    /// Codex MAJOR: long pipelines used to capture the cap once via
3075    /// `buffer_unordered(N)`. `rebucketed` re-reads the cap at each
3076    /// batch boundary so adaptive growth/decay actually takes effect
3077    /// mid-stream. Test: fire 200 items at start cap=4, then halfway
3078    /// through bump the cap manually via warm_start to 16, and assert
3079    /// the LATER batches see the new cap.
3080    #[tokio::test]
3081    async fn rebucketed_picks_up_cap_changes_mid_stream() {
3082        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3083        use std::sync::Arc as StdArc;
3084        let cfg = LimiterConfig {
3085            min_concurrency: 1,
3086            max_concurrency: 32,
3087            ..cfg_for_tests()
3088        };
3089        let l = Limiter::new(4, cfg);
3090        let max_seen = StdArc::new(AtomicUsize::new(0));
3091        let in_flight = StdArc::new(AtomicUsize::new(0));
3092        let processed = StdArc::new(AtomicUsize::new(0));
3093        let l_for_bump = l.clone();
3094        let processed_for_bump = processed.clone();
3095        // Spawn a watcher that bumps the cap once enough items have
3096        // started to "warm up".
3097        let bump_handle = tokio::spawn(async move {
3098            loop {
3099                tokio::time::sleep(Duration::from_millis(2)).await;
3100                if processed_for_bump.load(AtomicOrdering::Relaxed) >= 16 {
3101                    l_for_bump.warm_start(16);
3102                    return;
3103                }
3104            }
3105        });
3106        let _: Vec<()> = rebucketed(&l, 0..200usize, false, |_i| {
3107            let max_seen = max_seen.clone();
3108            let in_flight = in_flight.clone();
3109            let processed = processed.clone();
3110            async move {
3111                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3112                max_seen.fetch_max(cur, AtomicOrdering::Relaxed);
3113                tokio::time::sleep(Duration::from_millis(1)).await;
3114                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3115                processed.fetch_add(1, AtomicOrdering::Relaxed);
3116                Ok::<(), &'static str>(())
3117            }
3118        })
3119        .await
3120        .unwrap();
3121        bump_handle.await.unwrap();
3122        // The cap was bumped to 16 mid-stream. If rebucketing actually
3123        // picks up cap changes, max_seen should reach above the
3124        // initial 4.
3125        let peak = max_seen.load(AtomicOrdering::Relaxed);
3126        assert!(
3127            peak > 4,
3128            "rebucketed did not pick up the mid-stream cap bump (peak in-flight = {peak})"
3129        );
3130    }
3131
3132    /// Codex MAJOR: `observe_op` cancellation safety. If the wrapper
3133    /// future is dropped before the inner op completes, no outcome is
3134    /// recorded (intentional — dropped work was never observed by
3135    /// the network). This test asserts the contract: completed ops
3136    /// land observations, dropped ops do not corrupt the window.
3137    /// Two-sided: confirms cancellation is a NO-OP, AND confirms
3138    /// post-cancellation observations DO land normally (proving the
3139    /// limiter's internal state was not corrupted).
3140    #[tokio::test]
3141    async fn observe_op_cancellation_drops_silently() {
3142        let cfg = LimiterConfig {
3143            window_ops: 16,
3144            min_window_ops: 4,
3145            ..cfg_for_tests()
3146        };
3147        let l = Limiter::new(4, cfg);
3148        // Build a future that never completes, then drop it before
3149        // awaiting. observe_op must not panic on drop and must not
3150        // record an outcome.
3151        let l_clone = l.clone();
3152        let fut = observe_op(
3153            &l_clone,
3154            || async {
3155                std::future::pending::<()>().await;
3156                Ok::<(), &'static str>(())
3157            },
3158            |_| Outcome::Timeout,
3159        );
3160        drop(fut);
3161        // Cap unchanged: no observation was recorded.
3162        assert_eq!(l.current(), 4, "cancelled op moved the cap");
3163        // Now feed observations that ACTUALLY count as Success (the
3164        // Ok branch of observe_op is always Outcome::Success — the
3165        // classifier only runs on Err). Cold-start at 4 + a full
3166        // window of healthy successes = double to 8.
3167        for _ in 0..16 {
3168            let _: Result<(), &'static str> = observe_op(
3169                &l,
3170                || async { Ok(()) },
3171                // classifier only fires on Err; Ok is always Success
3172                |_| Outcome::NetworkError,
3173            )
3174            .await;
3175        }
3176        // STRICT: cap must have GROWN, not just held. If cancellation
3177        // had corrupted internal counters, slow-start might be stuck.
3178        assert!(
3179            l.current() > 4,
3180            "cap did not grow after 16 successes; controller corrupted by cancellation? cap={}",
3181            l.current(),
3182        );
3183    }
3184
3185    /// Codex MAJOR: Drop persistence must be reliable. The CLI relies
3186    /// on Client::drop firing a synchronous save. If save_snapshot
3187    /// were dispatched via fire-and-forget spawn_blocking, runtime
3188    /// teardown would silently lose the snapshot. This test asserts
3189    /// that calling save_snapshot synchronously from a normal context
3190    /// (not Drop, but the same code path) actually writes.
3191    #[test]
3192    fn save_snapshot_is_synchronous_and_durable() {
3193        let dir = tempfile::tempdir().unwrap();
3194        let path = dir.path().join("client_adaptive.json");
3195        let snap = ChannelStart {
3196            quote: 100,
3197            store: 50,
3198            fetch: 200,
3199        };
3200        save_snapshot(&path, snap);
3201        // The file must exist immediately after save_snapshot returns.
3202        // No async waiting, no spawn_blocking, no eventual consistency.
3203        assert!(
3204            path.exists(),
3205            "save_snapshot did not write file synchronously"
3206        );
3207        let loaded = load_snapshot(&path).unwrap();
3208        assert_eq!(loaded.quote, 100);
3209        assert_eq!(loaded.store, 50);
3210        assert_eq!(loaded.fetch, 200);
3211    }
3212
3213    // ---- Codex round 4 regression tests ----
3214
3215    /// Codex CR-2 fix: warm_start marks the limiter as having
3216    /// already-left-slow-start, so a single healthy window does NOT
3217    /// double the cap (which would be over-aggressive resume from a
3218    /// learned value).
3219    #[tokio::test]
3220    async fn warm_start_disables_slow_start_doubling() {
3221        let cfg = LimiterConfig {
3222            window_ops: 8,
3223            min_window_ops: 4,
3224            success_target: 0.9,
3225            ..cfg_for_tests()
3226        };
3227        let l = Limiter::new(2, cfg.clone());
3228        // Warm-start to a learned value of 16. This must not be
3229        // treated as a fresh slow-start.
3230        l.warm_start(16);
3231        assert_eq!(l.current(), 16);
3232        // One full healthy window: in slow-start would double to 32;
3233        // post-warm-start it should add +1 to 17.
3234        for _ in 0..cfg.window_ops {
3235            l.observe(Outcome::Success, Duration::from_millis(10));
3236        }
3237        assert_eq!(
3238            l.current(),
3239            17,
3240            "warm-start triggered slow-start doubling instead of additive +1"
3241        );
3242    }
3243
3244    /// Codex CR-3 fix: warm_start floors against the per-instance
3245    /// cold-start, NOT the global ChannelStart::default. A controller
3246    /// built with custom low starts must stay faithful to its
3247    /// construction parameters even after warm_start.
3248    #[test]
3249    fn controller_warm_start_floors_at_per_instance_cold_start() {
3250        let custom_cold = ChannelStart {
3251            quote: 2,
3252            store: 1,
3253            fetch: 4,
3254        };
3255        let c = AdaptiveController::new(custom_cold, AdaptiveConfig::default());
3256        // Snapshot below the per-instance cold-start floors at custom values.
3257        c.warm_start(ChannelStart {
3258            quote: 1,
3259            store: 1,
3260            fetch: 1,
3261        });
3262        assert_eq!(c.quote.current(), 2);
3263        assert_eq!(c.store.current(), 1);
3264        assert_eq!(c.fetch.current(), 4);
3265        // Snapshot above the per-instance cold-start uses the snapshot.
3266        c.warm_start(ChannelStart {
3267            quote: 10,
3268            store: 10,
3269            fetch: 10,
3270        });
3271        assert_eq!(c.quote.current(), 10);
3272        assert_eq!(c.store.current(), 10);
3273        assert_eq!(c.fetch.current(), 10);
3274    }
3275
3276    /// Codex CR-3 fix: when adaptive.enabled = false, warm_start is
3277    /// a no-op — fixed-concurrency mode means the user wants exactly
3278    /// the cold start, not a learned value from a prior run.
3279    #[test]
3280    fn warm_start_is_noop_when_adaptive_disabled() {
3281        let cfg = AdaptiveConfig {
3282            enabled: false,
3283            ..AdaptiveConfig::default()
3284        };
3285        let custom_cold = ChannelStart {
3286            quote: 5,
3287            store: 5,
3288            fetch: 5,
3289        };
3290        let c = AdaptiveController::new(custom_cold, cfg);
3291        c.warm_start(ChannelStart {
3292            quote: 100,
3293            store: 100,
3294            fetch: 100,
3295        });
3296        assert_eq!(c.quote.current(), 5, "warm_start moved cap when disabled");
3297        assert_eq!(c.store.current(), 5, "warm_start moved cap when disabled");
3298        assert_eq!(c.fetch.current(), 5, "warm_start moved cap when disabled");
3299    }
3300
3301    /// Codex CR-4 fix: rebucketed_unordered is rolling, not batch-fenced.
3302    /// One slow item must NOT block other items in the same logical
3303    /// "wave" — the in-flight set should refill as fast items complete.
3304    #[tokio::test]
3305    async fn rebucketed_unordered_is_rolling_not_fenced() {
3306        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3307        use std::sync::Arc as StdArc;
3308        let cfg = LimiterConfig {
3309            min_concurrency: 1,
3310            max_concurrency: 8,
3311            window_ops: 100,
3312            min_window_ops: 50,
3313            ..cfg_for_tests()
3314        };
3315        let l = Limiter::new(4, cfg);
3316        let in_flight = StdArc::new(AtomicUsize::new(0));
3317        let max_in_flight = StdArc::new(AtomicUsize::new(0));
3318        let started = StdArc::new(AtomicUsize::new(0));
3319        let _: Vec<()> = rebucketed_unordered(&l, 0..20usize, |i| {
3320            let in_flight = in_flight.clone();
3321            let max_in_flight = max_in_flight.clone();
3322            let started = started.clone();
3323            async move {
3324                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3325                max_in_flight.fetch_max(cur, AtomicOrdering::Relaxed);
3326                started.fetch_add(1, AtomicOrdering::Relaxed);
3327                // Item 0 is intentionally slow; items 1..20 are fast.
3328                // In a batch-fenced scheduler, item 0 would gate the
3329                // start of items in the next batch. In a rolling
3330                // scheduler, items 1..N can start as soon as their
3331                // slot frees from a fast completion.
3332                if i == 0 {
3333                    tokio::time::sleep(Duration::from_millis(50)).await;
3334                } else {
3335                    tokio::time::sleep(Duration::from_millis(1)).await;
3336                }
3337                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3338                Ok::<(), &'static str>(())
3339            }
3340        })
3341        .await
3342        .unwrap();
3343        // All 20 items must have started; in a rolling scheduler the
3344        // peak in-flight should reach at least 4 (the cap).
3345        assert_eq!(started.load(AtomicOrdering::Relaxed), 20);
3346        let peak = max_in_flight.load(AtomicOrdering::Relaxed);
3347        assert!(
3348            peak >= 4,
3349            "rolling scheduler did not fill cap; peak in-flight = {peak}"
3350        );
3351    }
3352
3353    /// Codex CR-4 fix: rebucketed_ordered preserves input order.
3354    #[tokio::test]
3355    async fn rebucketed_ordered_preserves_input_order() {
3356        let cfg = LimiterConfig {
3357            min_concurrency: 1,
3358            max_concurrency: 4,
3359            ..cfg_for_tests()
3360        };
3361        let l = Limiter::new(4, cfg);
3362        let items: Vec<usize> = (0..50).collect();
3363        let result: Vec<usize> = rebucketed_ordered(
3364            &l,
3365            items.iter().copied().enumerate(),
3366            |(idx, v)| async move {
3367                // Reverse-bias delay so out-of-order completion is likely.
3368                let delay = (50 - v) as u64;
3369                tokio::time::sleep(Duration::from_micros(delay)).await;
3370                Ok::<_, &'static str>((idx, v * 10))
3371            },
3372        )
3373        .await
3374        .unwrap();
3375        assert_eq!(result.len(), 50);
3376        for (i, v) in result.iter().enumerate() {
3377            assert_eq!(*v, i * 10, "out of order at index {i}: got {v}");
3378        }
3379    }
3380
3381    /// Codex CR-1 regression guard (logical, not the actual data path):
3382    /// rebucketed_ordered with a payload of (idx, hash) must always
3383    /// pair the right hash with the right chunk content even under
3384    /// adversarial out-of-order completion.
3385    #[tokio::test]
3386    async fn rebucketed_ordered_pairs_idx_with_payload_correctly() {
3387        let cfg = LimiterConfig {
3388            min_concurrency: 1,
3389            max_concurrency: 8,
3390            ..cfg_for_tests()
3391        };
3392        let l = Limiter::new(8, cfg);
3393        // Each item is (idx, fake_hash). The "fetch" returns
3394        // (idx, content_for_hash). We adversarially out-of-order them
3395        // and assert that the post-sort puts content with the right
3396        // index.
3397        let items: Vec<(usize, u64)> = (0..40).map(|i| (i, 1000u64 + i as u64)).collect();
3398        let result: Vec<u64> = rebucketed_ordered(&l, items, |(idx, hash)| async move {
3399            let delay = (40 - idx) as u64; // reverse delay
3400            tokio::time::sleep(Duration::from_micros(delay)).await;
3401            // "content_for_hash" derived from the hash.
3402            Ok::<_, &'static str>((idx, hash * 7))
3403        })
3404        .await
3405        .unwrap();
3406        for (i, v) in result.iter().enumerate() {
3407            let expected = (1000 + i as u64) * 7;
3408            assert_eq!(
3409                *v, expected,
3410                "idx {i} paired with wrong content: {v}, expected {expected}"
3411            );
3412        }
3413    }
3414
3415    /// Codex CR-5 fix: snapshot temp file is unique per save call,
3416    /// not just per-PID. Two save_snapshot calls in the SAME process
3417    /// must not collide on the temp file.
3418    #[test]
3419    fn save_snapshot_temp_file_is_unique_per_call() {
3420        let dir = tempfile::tempdir().unwrap();
3421        let path = dir.path().join("client_adaptive.json");
3422        // Fire many saves back-to-back in the same process. Without
3423        // a per-call unique suffix, the temp file would be the same
3424        // for every call (PID is constant), and any partial write +
3425        // crash window would expose the race. We can't simulate the
3426        // exact race in a unit test, but we can confirm no panic and
3427        // the final file is correct after many calls.
3428        for i in 0..100 {
3429            save_snapshot(
3430                &path,
3431                ChannelStart {
3432                    quote: i + 1,
3433                    store: i + 1,
3434                    fetch: i + 1,
3435                },
3436            );
3437        }
3438        let loaded = load_snapshot(&path).unwrap();
3439        assert_eq!(loaded.quote, 100);
3440        assert_eq!(loaded.store, 100);
3441        assert_eq!(loaded.fetch, 100);
3442        // Confirm no leftover .tmp files.
3443        let leftover: Vec<_> = std::fs::read_dir(dir.path())
3444            .unwrap()
3445            .filter_map(|e| e.ok())
3446            .filter(|e| e.file_name().to_string_lossy().contains(".tmp."))
3447            .collect();
3448        assert!(
3449            leftover.is_empty(),
3450            "temp files leaked: {:?}",
3451            leftover.iter().map(|e| e.file_name()).collect::<Vec<_>>()
3452        );
3453    }
3454
3455    // ---- Edge case tests ----
3456
3457    /// Edge case: rebucketed_unordered with EMPTY input returns empty
3458    /// Vec immediately, no panic, no work scheduled.
3459    #[tokio::test]
3460    async fn rebucketed_empty_input_returns_empty() {
3461        let cfg = cfg_for_tests();
3462        let l = Limiter::new(4, cfg);
3463        let v: Vec<usize> = rebucketed_unordered(&l, std::iter::empty::<usize>(), |_| async {
3464            Ok::<_, &'static str>(42usize)
3465        })
3466        .await
3467        .unwrap();
3468        assert!(v.is_empty());
3469        let v: Vec<usize> = rebucketed_ordered(
3470            &l,
3471            std::iter::empty::<(usize, ())>(),
3472            |(idx, _)| async move { Ok::<_, &'static str>((idx, 42usize)) },
3473        )
3474        .await
3475        .unwrap();
3476        assert!(v.is_empty());
3477    }
3478
3479    /// Edge case: rebucketed_unordered with EXACTLY cap items.
3480    #[tokio::test]
3481    async fn rebucketed_exactly_cap_items() {
3482        let cfg = LimiterConfig {
3483            min_concurrency: 1,
3484            max_concurrency: 4,
3485            ..cfg_for_tests()
3486        };
3487        let l = Limiter::new(4, cfg);
3488        let v: Vec<usize> =
3489            rebucketed_unordered(
3490                &l,
3491                0..4usize,
3492                |i| async move { Ok::<_, &'static str>(i * 2) },
3493            )
3494            .await
3495            .unwrap();
3496        assert_eq!(v.len(), 4);
3497    }
3498
3499    /// Edge case: rebucketed_unordered preserves the FIRST error and
3500    /// discards subsequent ones, draining in-flight work first.
3501    #[tokio::test]
3502    async fn rebucketed_preserves_first_error() {
3503        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3504        use std::sync::Arc as StdArc;
3505        let cfg = LimiterConfig {
3506            min_concurrency: 1,
3507            max_concurrency: 4,
3508            ..cfg_for_tests()
3509        };
3510        let l = Limiter::new(4, cfg);
3511        let started = StdArc::new(AtomicUsize::new(0));
3512        let started_clone = started.clone();
3513        let result: Result<Vec<()>, &'static str> = rebucketed_unordered(&l, 0..20usize, |i| {
3514            let started = started_clone.clone();
3515            async move {
3516                started.fetch_add(1, AtomicOrdering::Relaxed);
3517                if i == 5 {
3518                    // Slight delay so item 6, 7 also start before
3519                    // this error propagates.
3520                    tokio::time::sleep(Duration::from_micros(100)).await;
3521                    return Err("first error");
3522                }
3523                if i == 10 {
3524                    return Err("second error - should be ignored");
3525                }
3526                tokio::time::sleep(Duration::from_micros(50)).await;
3527                Ok(())
3528            }
3529        })
3530        .await;
3531        match result {
3532            Err(e) => assert_eq!(e, "first error", "wrong error preserved"),
3533            Ok(_) => panic!("expected error, got ok"),
3534        }
3535        // The first error stops new launches, but in-flight items
3536        // drain. We don't assert exact count (nondeterministic) — only
3537        // that we did not launch ALL 20 items (proving error-stop
3538        // works) and we did launch more than just item 5 (proving
3539        // in-flight drain happens).
3540        let total = started.load(AtomicOrdering::Relaxed);
3541        assert!(
3542            (5..20).contains(&total),
3543            "started count out of range: {total}"
3544        );
3545    }
3546
3547    /// Edge case: limiter with min == max (degenerate single-value).
3548    /// Cap stays at the single value regardless of observations.
3549    #[test]
3550    fn limiter_with_min_equal_max_is_pinned() {
3551        let cfg = LimiterConfig {
3552            min_concurrency: 5,
3553            max_concurrency: 5,
3554            ..cfg_for_tests()
3555        };
3556        let l = Limiter::new(5, cfg);
3557        for _ in 0..1000 {
3558            l.observe(Outcome::Success, Duration::from_millis(1));
3559        }
3560        assert_eq!(l.current(), 5, "cap moved despite min==max");
3561        for _ in 0..1000 {
3562            l.observe(Outcome::Timeout, Duration::from_millis(50));
3563        }
3564        assert_eq!(l.current(), 5, "cap moved despite min==max");
3565    }
3566
3567    /// Direct test of `ewma()` math: alpha = 0 means new value =
3568    /// prev (the baseline never updates from new samples).
3569    #[test]
3570    fn ewma_alpha_zero_returns_prev() {
3571        let prev = Duration::from_millis(100);
3572        let sample = Duration::from_millis(500);
3573        let result = ewma(prev, sample, 0.0);
3574        assert_eq!(result, prev, "alpha=0 must return prev unchanged");
3575    }
3576
3577    /// Direct test of `ewma()` math: alpha = 1 means new value =
3578    /// sample (full overwrite).
3579    #[test]
3580    fn ewma_alpha_one_returns_sample() {
3581        let prev = Duration::from_millis(100);
3582        let sample = Duration::from_millis(500);
3583        let result = ewma(prev, sample, 1.0);
3584        // Allow 1ms of float-conversion slop.
3585        let diff = result.abs_diff(sample);
3586        assert!(
3587            diff <= Duration::from_millis(1),
3588            "alpha=1 should return sample; got {result:?}, expected ~{sample:?}"
3589        );
3590    }
3591
3592    /// Direct test of `ewma()`: alpha = 0.5 should give the midpoint.
3593    #[test]
3594    fn ewma_alpha_half_returns_midpoint() {
3595        let prev = Duration::from_millis(200);
3596        let sample = Duration::from_millis(400);
3597        let result = ewma(prev, sample, 0.5);
3598        let expected = Duration::from_millis(300);
3599        let diff = result.abs_diff(expected);
3600        assert!(
3601            diff <= Duration::from_millis(1),
3602            "alpha=0.5 midpoint: got {result:?}, expected ~{expected:?}"
3603        );
3604    }
3605
3606    /// Direct test of `ewma()`: NaN alpha must NOT panic and must
3607    /// preserve the previous value (defense against
3608    /// `Duration::from_secs_f64(NaN)` panic).
3609    #[test]
3610    fn ewma_nan_alpha_returns_prev() {
3611        let prev = Duration::from_millis(100);
3612        let sample = Duration::from_millis(500);
3613        let result = ewma(prev, sample, f64::NAN);
3614        assert_eq!(result, prev);
3615        let result = ewma(prev, sample, f64::INFINITY);
3616        assert_eq!(result, prev);
3617        let result = ewma(prev, sample, f64::NEG_INFINITY);
3618        assert_eq!(result, prev);
3619    }
3620
3621    /// Out-of-range alpha (e.g. 2.5) must clamp to [0,1] and NOT
3622    /// produce a negative result.
3623    #[test]
3624    fn ewma_clamps_alpha_above_one() {
3625        let prev = Duration::from_millis(100);
3626        let sample = Duration::from_millis(500);
3627        let result = ewma(prev, sample, 2.5);
3628        // Clamped to 1.0 -> should equal sample (~500ms).
3629        assert!(result >= Duration::from_millis(499));
3630        assert!(result <= Duration::from_millis(501));
3631    }
3632
3633    /// Edge case: window contains ONLY ApplicationErrors. Controller
3634    /// must HOLD (not move at all), because there are zero
3635    /// capacity-relevant samples.
3636    #[test]
3637    fn window_full_of_application_errors_does_not_move_cap() {
3638        let cfg = cfg_for_tests();
3639        let l = Limiter::new(8, cfg.clone());
3640        for _ in 0..(cfg.window_ops * 5) {
3641            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
3642        }
3643        assert_eq!(
3644            l.current(),
3645            8,
3646            "cap moved on pure-app-error window; should hold"
3647        );
3648    }
3649
3650    /// Edge case: AdaptiveController with `enabled = false` plus
3651    /// observations does not move and does not interact with the
3652    /// observation window.
3653    #[test]
3654    fn disabled_adaptive_controller_truly_inert() {
3655        let cfg = AdaptiveConfig {
3656            enabled: false,
3657            ..AdaptiveConfig::default()
3658        };
3659        let c = AdaptiveController::new(ChannelStart::default(), cfg);
3660        let baseline_quote = c.quote.current();
3661        let baseline_store = c.store.current();
3662        let baseline_fetch = c.fetch.current();
3663        for _ in 0..10000 {
3664            c.quote.observe(Outcome::Timeout, Duration::from_millis(1));
3665            c.store.observe(Outcome::Timeout, Duration::from_millis(1));
3666            c.fetch.observe(Outcome::Timeout, Duration::from_millis(1));
3667        }
3668        assert_eq!(c.quote.current(), baseline_quote);
3669        assert_eq!(c.store.current(), baseline_store);
3670        assert_eq!(c.fetch.current(), baseline_fetch);
3671    }
3672
3673    /// Edge case: per-channel limiters share NO state. Hammering one
3674    /// channel must not move another. Two-sided: assert store DROPS
3675    /// to the floor (proving observations landed) AND quote/fetch
3676    /// are EXACTLY unchanged (proving zero cross-channel leakage).
3677    #[test]
3678    fn channel_state_is_independent() {
3679        let c = AdaptiveController::default();
3680        let q0 = c.quote.current();
3681        let f0 = c.fetch.current();
3682        let s0 = c.store.current();
3683        for _ in 0..1000 {
3684            c.store.observe(Outcome::Timeout, Duration::from_millis(1));
3685        }
3686        // Strict: store reached the floor (observations landed).
3687        assert_eq!(
3688            c.store.current(),
3689            c.config.min_concurrency,
3690            "store did not reach floor after 1000 timeouts; cap={}",
3691            c.store.current()
3692        );
3693        assert!(c.store.current() < s0, "store cap did not move at all");
3694        // Strict: quote and fetch unchanged.
3695        assert_eq!(c.quote.current(), q0, "quote leaked from store stress");
3696        assert_eq!(c.fetch.current(), f0, "fetch leaked from store stress");
3697    }
3698
3699    // ---- Round-5 test reviewer suggestions ----
3700
3701    /// Direct unit test for `AdaptiveConfig::sanitize`. Verifies that
3702    /// every clamped field is correctly fixed up, not merely that
3703    /// the controller doesn't crash.
3704    #[test]
3705    fn sanitize_corrects_pathological_floats() {
3706        let mut cfg = AdaptiveConfig {
3707            success_target: f64::NAN,
3708            timeout_ceiling: 5.0,
3709            latency_inflation_factor: f64::NEG_INFINITY,
3710            latency_ewma_alpha: 2.5,
3711            window_ops: 4,
3712            min_window_ops: 10,
3713            ..AdaptiveConfig::default()
3714        };
3715        cfg.sanitize();
3716        assert!(cfg.success_target.is_finite());
3717        assert!((0.0..=1.0).contains(&cfg.success_target));
3718        assert!((0.0..=1.0).contains(&cfg.timeout_ceiling));
3719        assert!(cfg.latency_inflation_factor.is_finite());
3720        assert!(cfg.latency_inflation_factor > 0.0);
3721        assert!((0.0..=1.0).contains(&cfg.latency_ewma_alpha));
3722        assert!(
3723            cfg.min_window_ops <= cfg.window_ops,
3724            "min_window_ops {} > window_ops {}",
3725            cfg.min_window_ops,
3726            cfg.window_ops
3727        );
3728    }
3729
3730    /// Snapshot persistence relies on serde for ChannelStart and
3731    /// ChannelMax. A field rename in either type would silently
3732    /// break warm-start across binary upgrades — this test catches
3733    /// that.
3734    #[test]
3735    fn channel_max_serde_round_trips() {
3736        let m = ChannelMax {
3737            quote: 7,
3738            store: 13,
3739            fetch: 200,
3740        };
3741        let json = serde_json::to_string(&m).unwrap();
3742        let back: ChannelMax = serde_json::from_str(&json).unwrap();
3743        assert_eq!(back.quote, 7);
3744        assert_eq!(back.store, 13);
3745        assert_eq!(back.fetch, 200);
3746    }
3747
3748    #[test]
3749    fn channel_start_serde_round_trips() {
3750        let s = ChannelStart {
3751            quote: 11,
3752            store: 22,
3753            fetch: 33,
3754        };
3755        let json = serde_json::to_string(&s).unwrap();
3756        let back: ChannelStart = serde_json::from_str(&json).unwrap();
3757        assert_eq!(back.quote, 11);
3758        assert_eq!(back.store, 22);
3759        assert_eq!(back.fetch, 33);
3760    }
3761
3762    /// Mid-flight cap SHRINKAGE: `rebucketed_picks_up_cap_changes_mid_stream`
3763    /// only proves growth. Overload protection requires the reverse —
3764    /// when the controller halves the cap mid-pipeline, in-flight
3765    /// must respect the new lower cap on the next refill.
3766    #[tokio::test]
3767    async fn rebucketed_honors_cap_shrinkage_mid_stream() {
3768        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3769        use std::sync::Arc as StdArc;
3770        let cfg = LimiterConfig {
3771            min_concurrency: 1,
3772            max_concurrency: 16,
3773            ..cfg_for_tests()
3774        };
3775        let l = Limiter::new(16, cfg);
3776        let in_flight = StdArc::new(AtomicUsize::new(0));
3777        let max_after_shrink = StdArc::new(AtomicUsize::new(0));
3778        let processed = StdArc::new(AtomicUsize::new(0));
3779        let shrunk = StdArc::new(std::sync::atomic::AtomicBool::new(false));
3780        let l_for_shrink = l.clone();
3781        let p_for_shrink = processed.clone();
3782        let shrunk_for_shrink = shrunk.clone();
3783        let shrink_handle = tokio::spawn(async move {
3784            // Bump down the cap once 50 items have completed.
3785            loop {
3786                tokio::time::sleep(Duration::from_millis(2)).await;
3787                if p_for_shrink.load(AtomicOrdering::Relaxed) >= 50 {
3788                    l_for_shrink.warm_start(2);
3789                    shrunk_for_shrink.store(true, AtomicOrdering::Relaxed);
3790                    return;
3791                }
3792            }
3793        });
3794        let _: Vec<()> = rebucketed_unordered(&l, 0..400usize, |_i| {
3795            let in_flight = in_flight.clone();
3796            let max_after_shrink = max_after_shrink.clone();
3797            let processed = processed.clone();
3798            let shrunk = shrunk.clone();
3799            async move {
3800                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3801                if shrunk.load(AtomicOrdering::Relaxed) {
3802                    max_after_shrink.fetch_max(cur, AtomicOrdering::Relaxed);
3803                }
3804                tokio::time::sleep(Duration::from_millis(1)).await;
3805                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3806                processed.fetch_add(1, AtomicOrdering::Relaxed);
3807                Ok::<(), &'static str>(())
3808            }
3809        })
3810        .await
3811        .unwrap();
3812        shrink_handle.await.unwrap();
3813        let peak = max_after_shrink.load(AtomicOrdering::Relaxed);
3814        // After the shrink to cap=2, no NEW launches should put us
3815        // above 2. Already-launched in-flight may still be draining
3816        // briefly, so allow a small overshoot for the natural
3817        // refill-after-completion lag.
3818        assert!(
3819            peak <= 4,
3820            "rebucketed exceeded shrunk cap of 2: peak post-shrink in-flight = {peak}"
3821        );
3822    }
3823
3824    /// Mixed `ApplicationError` + capacity-relevant items in one
3825    /// window. ApplicationError must NOT contribute to the success
3826    /// rate denominator — otherwise a wave with some AppErrors and
3827    /// some healthy successes would falsely look like a stressed
3828    /// window.
3829    #[test]
3830    fn mixed_window_app_errors_with_capacity_signal() {
3831        let cfg = LimiterConfig {
3832            window_ops: 10,
3833            min_window_ops: 5,
3834            timeout_ceiling: 0.2,
3835            success_target: 0.9,
3836            ..cfg_for_tests()
3837        };
3838        // Case 1: 5 AppErrors + 5 Successes. Capacity-relevant
3839        // success_rate = 5/5 = 100%. Cap must NOT decrease (it may
3840        // hold at 8 or grow via slow-start; both prove the AppErrors
3841        // didn't poison the success-rate denominator).
3842        let l = Limiter::new(8, cfg.clone());
3843        for _ in 0..5 {
3844            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
3845        }
3846        for _ in 0..5 {
3847            l.observe(Outcome::Success, Duration::from_millis(50));
3848        }
3849        assert!(
3850            l.current() >= 8,
3851            "AppErrors falsely depressed the success rate; cap dropped from 8 to {}",
3852            l.current()
3853        );
3854        // Case 2: 5 AppErrors + 5 Timeouts. Capacity-relevant
3855        // success_rate = 0/5 = 0%. Cap MUST decrease.
3856        let l2 = Limiter::new(8, cfg);
3857        for _ in 0..5 {
3858            l2.observe(Outcome::ApplicationError, Duration::from_millis(50));
3859        }
3860        for _ in 0..5 {
3861            l2.observe(Outcome::Timeout, Duration::from_millis(50));
3862        }
3863        assert!(
3864            l2.current() < 8,
3865            "all-timeouts (with AppError padding) did not decrease cap; got {}",
3866            l2.current()
3867        );
3868    }
3869
3870    /// Real concurrent torn-read test for save/load. The previous
3871    /// concurrent-write test only reads after both writers join;
3872    /// this version interleaves a reader thread with writers and
3873    /// asserts every successful load returns a coherent (non-torn)
3874    /// snapshot.
3875    #[test]
3876    fn concurrent_save_load_no_torn_reads() {
3877        use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
3878        use std::thread;
3879        let dir = tempfile::tempdir().unwrap();
3880        let path = dir.path().join("snap.json");
3881        // Seed the file so the reader doesn't get a None on first read.
3882        save_snapshot(
3883            &path,
3884            ChannelStart {
3885                quote: 1,
3886                store: 1,
3887                fetch: 1,
3888            },
3889        );
3890        let stop = std::sync::Arc::new(AtomicBool::new(false));
3891        let p_w = path.clone();
3892        let s_w = stop.clone();
3893        let writer = thread::spawn(move || {
3894            let mut i = 1usize;
3895            while !s_w.load(AtomicOrdering::Relaxed) {
3896                save_snapshot(
3897                    &p_w,
3898                    ChannelStart {
3899                        quote: i,
3900                        store: i,
3901                        fetch: i,
3902                    },
3903                );
3904                i = i.wrapping_add(1).max(1);
3905            }
3906        });
3907        let p_r = path.clone();
3908        let reader = thread::spawn(move || {
3909            let mut torn = 0usize;
3910            for _ in 0..2_000 {
3911                if let Some(snap) = load_snapshot(&p_r) {
3912                    // Coherent snapshots have all three channels equal
3913                    // (writer always saves equal values).
3914                    if snap.quote != snap.store || snap.store != snap.fetch {
3915                        torn += 1;
3916                    }
3917                }
3918            }
3919            torn
3920        });
3921        let torn = reader.join().unwrap();
3922        stop.store(true, AtomicOrdering::Relaxed);
3923        writer.join().unwrap();
3924        assert_eq!(
3925            torn, 0,
3926            "observed {torn} torn reads under concurrent writes"
3927        );
3928    }
3929
3930    /// Round-5 follow-up: `save_snapshot_with_timeout` returns
3931    /// promptly even when the underlying write would otherwise hang.
3932    /// Use a path under a non-existent root that mkdir cannot create
3933    /// to simulate a slow/failing filesystem (mkdir returns Err
3934    /// quickly so this isn't a real hang test, but it confirms the
3935    /// timeout wrapper does not block longer than the deadline on a
3936    /// fast-failing operation either).
3937    #[test]
3938    fn save_with_timeout_returns_promptly_on_fast_failure() {
3939        let path = std::path::PathBuf::from("/nonexistent_root_xyz_test/snap.json");
3940        let snap = ChannelStart {
3941            quote: 1,
3942            store: 1,
3943            fetch: 1,
3944        };
3945        let started = Instant::now();
3946        save_snapshot_with_timeout(path, snap, Duration::from_secs(5));
3947        let elapsed = started.elapsed();
3948        // Fast-failing mkdir returns immediately. The timeout
3949        // wrapper should not add measurable overhead.
3950        assert!(
3951            elapsed < Duration::from_secs(1),
3952            "save_snapshot_with_timeout took {elapsed:?} on fast-failing path"
3953        );
3954    }
3955
3956    /// Round-5 follow-up: a hung writer thread (simulated by a path
3957    /// the writer never returns from). The wrapper must time out and
3958    /// return without joining; the test must complete near the
3959    /// deadline, not hang.
3960    #[test]
3961    fn save_with_timeout_bounds_wall_time_on_hang() {
3962        // Use a real-but-slow-write simulation: hand the writer a
3963        // path that the OS will accept but with a synthetic delay
3964        // baked into a wrapping thread. Since save_snapshot itself
3965        // does no sleep, we instead test that the timeout wrapper
3966        // exits within deadline + small slack when the inner work
3967        // takes longer than the deadline. We approximate by giving
3968        // the wrapper a deadline shorter than any plausible local
3969        // disk write (1ms is too tight; 0ms is too tight). Use
3970        // 1ms deadline and assert wall time < 100ms — proving the
3971        // wrapper does NOT wait for the writer to actually finish
3972        // (the inner write to a tempdir takes a few ms typically).
3973        let dir = tempfile::tempdir().unwrap();
3974        let path = dir.path().join("snap.json");
3975        let snap = ChannelStart {
3976            quote: 1,
3977            store: 1,
3978            fetch: 1,
3979        };
3980        let started = Instant::now();
3981        // Deadline so short that on most machines the writer is
3982        // still running. The wrapper must NOT wait for it.
3983        save_snapshot_with_timeout(path, snap, Duration::from_micros(1));
3984        let elapsed = started.elapsed();
3985        assert!(
3986            elapsed < Duration::from_millis(200),
3987            "timeout wrapper did not bound wall time: {elapsed:?}"
3988        );
3989    }
3990}