Skip to main content

ant_core/data/client/
adaptive.rs

1//! Adaptive concurrency controller for client data operations.
2//!
3//! Replaces hard-coded `quote_concurrency` / `store_concurrency` /
4//! download fan-out with a per-channel AIMD limiter that ramps up when
5//! the network is healthy and ramps down on stress signals (timeouts,
6//! errors, latency inflation). The goal is to give every machine and
7//! every connection profile a single client codebase that finds its
8//! own steady state without the user tweaking flags.
9//!
10//! ## Channels
11//!
12//! Three independent limiters share the same algorithm but track state
13//! separately, because their workloads have different cost profiles:
14//!
15//! - `quote`  — small DHT request/response messages, cheap per op
16//! - `store`  — multi-MB chunk PUTs to a close group, expensive per op
17//! - `fetch`  — multi-MB chunk GETs from peers, asymmetric to `store`
18//!
19//! ## Algorithms
20//!
21//! Quote and store use TCP-style AIMD with slow-start:
22//!
23//! - **Slow-start**: starting concurrency doubles after each healthy
24//!   window until first stress signal or until the configured ceiling.
25//! - **Steady state**: additive +1 per healthy window (>= success_target
26//!   success rate AND p95 latency within `latency_inflation_factor` of
27//!   the rolling baseline).
28//! - **Stress**: multiplicative decrease (current / 2, floor 1) on any
29//!   of: success rate < success_target, timeout rate > timeout_ceiling,
30//!   or p95 latency above `latency_inflation_factor * baseline`.
31//!
32//! Decisions evaluate over a sliding window of the last `window_ops`
33//! observed outcomes per channel. Below `min_window_ops` outcomes the
34//! controller holds steady — too few samples to act on.
35//!
36//! Fetch uses a throughput-seeking hill climber instead. It measures
37//! bytes/sec over epochs, probes nearby concurrency values, accepts
38//! higher caps only when goodput improves materially, and accepts lower
39//! caps when goodput is effectively unchanged. Stress signals still cut
40//! concurrency immediately.
41//!
42//! ## What this is not
43//!
44//! - Not a payment-batching controller. Wave / batch sizes are
45//!   orthogonal (gas-economics tradeoff, not throughput).
46//! - Not a persistent peer-quality scorer. Bootstrap cache scoring was
47//!   removed from saorsa-core; this controller only tunes client
48//!   concurrency.
49
50use futures::stream::{self, FuturesUnordered, StreamExt};
51use serde::{Deserialize, Serialize};
52use std::collections::VecDeque;
53use std::path::{Path, PathBuf};
54use std::sync::atomic::{AtomicU64, Ordering};
55use std::sync::{Arc, Mutex, PoisonError};
56use std::time::{Duration, Instant};
57use tracing::{debug, warn};
58
/// Process-wide counter used to build unique snapshot temp filenames.
///
/// Per the original design note, the counter value is combined with the
/// PID and a nanosecond timestamp so that concurrent `save_snapshot`
/// calls do not collide on the same temp path. It is an in-memory
/// atomic that starts at zero in every process.
static SAVE_COUNTER: AtomicU64 = AtomicU64::new(0);
63
/// Cold-start concurrency for the fetch channel.
///
/// Fetch starts at the residential-saturation floor validated in
/// production. The hill climber will find higher caps on
/// machines/networks that can actually use them.
const FETCH_COLD_START_CONCURRENCY: usize = 4;

/// Divisor that sizes a hill-climb probe step: probes grow/shrink by
/// roughly 25% (1/4) of the current best cap.
const HILL_PROBE_STEP_DIVISOR: usize = 4;

/// Minimum probe movement, so low caps (where the 25% step would round
/// to zero) can still explore.
const HILL_MIN_PROBE_STEP: usize = 1;

/// Acceptance ratio for upward probes: measured goodput must improve by
/// at least 5% over the best for the higher cap to be kept.
const HILL_UP_PROBE_ACCEPT_RATIO: f64 = 1.05;

/// Acceptance ratio for downward probes: the lower cap is kept if
/// goodput stays within 2% of the best.
const HILL_DOWN_PROBE_ACCEPT_RATIO: f64 = 0.98;

/// Number of epochs to wait after rejecting a probe before trying
/// again.
const HILL_REJECT_COOLDOWN_EPOCHS: usize = 2;

/// At a stable best cap, the neighbor is probed again after this many
/// epochs so the controller can adapt when machine/network conditions
/// change.
const HILL_STABLE_PROBE_EPOCHS: usize = 3;

/// Divisor applied to fetch concurrency on stress: 2 cuts it in half.
const HILL_STRESS_DECREASE_DIVISOR: usize = 2;

/// Number of complete concurrency waves a fetch goodput epoch should
/// cover. A fixed sample window can unfairly compare a full lower-cap
/// wave with a partial higher-cap wave.
const HILL_EPOCH_FULL_WAVES: usize = 2;
95
/// Acquire `m`, recovering the guard when the mutex is poisoned.
///
/// Follows the project pattern (see `cache::ChunkCache`): a panic on
/// another thread while it held the lock must not cascade into a panic
/// here, so the inner state is handed back regardless.
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
101
/// Outcome of a single observed operation on one channel.
///
/// `Success`, `Timeout` and `NetworkError` form the capacity-relevant
/// total that success/timeout rates are computed over;
/// `ApplicationError` samples are stored in the window but left out of
/// that total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// Completed successfully. Only successful samples contribute their
    /// latency to p95 calculations.
    Success,
    /// Did not complete within the per-op timeout. Counts against both
    /// the success rate and the timeout rate.
    Timeout,
    /// Failed with a network/transport error (refused, reset, unreachable).
    /// Counts against the success rate.
    NetworkError,
    /// Failed with an application-level error not attributable to the
    /// network (e.g. bad payment proof). Recorded but does not push the
    /// controller down — it is not a capacity signal.
    ApplicationError,
}
116
/// Lower bound on the `fetch` channel's adaptive cap.
///
/// The fetch limiter is not meant to shrink concurrency below this even
/// under sustained timeout pressure. Specific to fetch because
/// residential downloads exhibit a noise floor of peer-side timeouts
/// (NAT path issues, peers in the close group not storing the chunk)
/// that look like client saturation to the controller, causing it to
/// fully serialize and collapse throughput. Quote and store channels
/// keep the global `min_concurrency` floor of 1.
///
/// NOTE(review): the code that applies this floor is outside this part
/// of the file — confirm where it is enforced (presumably the fetch
/// stress-decrease path).
const FETCH_MIN_FLOOR: usize = 4;
127
/// Per-channel concurrency ceilings. Each channel has its own cap so
/// that constraining one (e.g. user pinned a low store concurrency for
/// a slow uplink) never bleeds into another (download).
///
/// `AdaptiveConfig::sanitize` raises each ceiling to at least
/// `min_concurrency`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelMax {
    /// Ceiling for the `quote` channel (default 128).
    pub quote: usize,
    /// Ceiling for the `store` channel (default 64).
    pub store: usize,
    /// Ceiling for the `fetch` channel (default 256).
    pub fetch: usize,
}
137
138impl Default for ChannelMax {
139    fn default() -> Self {
140        // Generous ceilings that give the controller real headroom to
141        // grow on healthy connections. The cold-start values
142        // (`ChannelStart::default()`) are well below these so AIMD
143        // can actually do its job. Each ceiling is independent.
144        Self {
145            quote: 128,
146            store: 64,
147            fetch: 256,
148        }
149    }
150}
151
/// Tunable knobs for the adaptive controller. Defaults are picked so
/// that the controller behaves at least as well as the prior static
/// defaults on a healthy network: starts at the previous static value
/// and only deviates when signals demand it.
///
/// Call [`AdaptiveConfig::sanitize`] to coerce hostile or inconsistent
/// values (non-finite floats, zero floors, inverted bounds) into range.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
    /// Master switch. When `false`, limiters ignore every observation,
    /// so each channel keeps reporting the concurrency it started with.
    /// Useful for benchmarks / debugging.
    pub enabled: bool,
    /// Floor concurrency per channel. Never go below this. Sanitized to
    /// at least 1.
    pub min_concurrency: usize,
    /// Per-channel ceiling concurrency. See `ChannelMax`.
    pub max: ChannelMax,
    /// Sliding window size in number of recent ops considered for
    /// adaptation decisions. Sanitized to at least 1.
    pub window_ops: usize,
    /// Below this count of outcomes in the window, hold steady.
    /// Sanitized into `[1, window_ops]`.
    pub min_window_ops: usize,
    /// Required success rate to consider the window healthy. Healthy
    /// windows trigger increase; unhealthy windows trigger decrease.
    /// Sanitized into `[0.0, 1.0]` (non-finite values become 0.95).
    pub success_target: f64,
    /// Timeout rate above which the window counts as stressed even if
    /// the success rate would otherwise pass. Sanitized into
    /// `[0.0, 1.0]` (non-finite values become 0.10).
    pub timeout_ceiling: f64,
    /// p95 latency above `latency_inflation_factor * baseline` is a
    /// stress signal. Baseline is an EWMA of healthy-window p95s.
    /// Non-finite or non-positive values are sanitized to 4.0.
    pub latency_inflation_factor: f64,
    /// EWMA smoothing factor for the latency baseline. 0 = never
    /// updates, 1 = baseline = last sample. 0.2 trades responsiveness
    /// for stability. Validated to `[0.0, 1.0]`; `NaN`/non-finite
    /// values are sanitized to the default at controller construction.
    pub latency_ewma_alpha: f64,
}
185
186impl AdaptiveConfig {
187    /// Sanitize the config: clamp `latency_ewma_alpha` to `[0,1]`
188    /// (rejecting NaN/Inf which would otherwise panic in
189    /// `Duration::from_secs_f64`), enforce `min_concurrency >= 1`,
190    /// enforce per-channel max >= min_concurrency, enforce
191    /// `min_window_ops <= window_ops`. Idempotent.
192    pub fn sanitize(&mut self) {
193        if !self.latency_ewma_alpha.is_finite() {
194            self.latency_ewma_alpha = 0.2;
195        }
196        self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
197        if !self.success_target.is_finite() {
198            self.success_target = 0.95;
199        }
200        self.success_target = self.success_target.clamp(0.0, 1.0);
201        if !self.timeout_ceiling.is_finite() {
202            self.timeout_ceiling = 0.10;
203        }
204        self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
205        if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
206            self.latency_inflation_factor = 4.0;
207        }
208        self.min_concurrency = self.min_concurrency.max(1);
209        self.window_ops = self.window_ops.max(1);
210        self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
211        self.max.quote = self.max.quote.max(self.min_concurrency);
212        self.max.store = self.max.store.max(self.min_concurrency);
213        self.max.fetch = self.max.fetch.max(self.min_concurrency);
214    }
215}
216
217impl Default for AdaptiveConfig {
218    fn default() -> Self {
219        Self {
220            enabled: true,
221            min_concurrency: 1,
222            max: ChannelMax::default(),
223            window_ops: 32,
224            min_window_ops: 8,
225            success_target: 0.95,
226            timeout_ceiling: 0.10,
227            // p95 doubling is the normal signal on a per-chunk fetch with
228            // close-group fallback (one slow peer in a chunk's close group
229            // adds ~10s on top of a sub-second median); 2.0 mis-classified
230            // that as stress and halved the fetch cap mid-download. 4.0
231            // means p95 has to quadruple before we treat the network as
232            // degraded.
233            latency_inflation_factor: 4.0,
234            latency_ewma_alpha: 0.2,
235        }
236    }
237}
238
/// Suggested starting concurrency per channel for a brand-new client
/// with no persisted state:
///
/// - quote was statically 32 — start at 32.
/// - store was statically 8 — start at 8.
/// - fetch starts at 4, the residential-saturation floor validated
///   after the old 64-wide cold burst saturated home links before
///   any adaptive observations could land. The throughput hill
///   climber then lets measured goodput justify growth on faster
///   links.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelStart {
    /// Starting concurrency for the `quote` channel (default 32).
    pub quote: usize,
    /// Starting concurrency for the `store` channel (default 8).
    pub store: usize,
    /// Starting concurrency for the `fetch` channel (default
    /// `FETCH_COLD_START_CONCURRENCY`).
    pub fetch: usize,
}
255
256impl Default for ChannelStart {
257    fn default() -> Self {
258        Self {
259            quote: 32,
260            store: 8,
261            fetch: FETCH_COLD_START_CONCURRENCY,
262        }
263    }
264}
265
/// One observed sample retained in the sliding window.
#[derive(Debug, Clone, Copy)]
struct Sample {
    /// How the operation ended; decides which rate counters it feeds.
    outcome: Outcome,
    /// Latency reported by the caller for this operation. Only read
    /// for `Outcome::Success` samples when computing p95.
    latency: Duration,
}
272
/// Limiter adaptation strategy. Kept out of `LimiterConfig` so external
/// config literals and persisted JSON do not grow a migration surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LimiterAlgorithm {
    /// Window-based increase/decrease with slow-start. This is what
    /// `Limiter::new` selects.
    Aimd,
    /// Throughput-seeking hill climber: observations are routed to
    /// `observe_hill_climb`, and `Limiter::snapshot` reports the best
    /// concurrency found rather than the current (possibly probing) cap.
    ThroughputHillClimb,
}
280
/// Direction of an active hill-climb probe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProbeDirection {
    /// Probe a concurrency above the current best. A fresh
    /// `HillClimbState` schedules this direction first.
    Up,
    /// Probe a concurrency below the current best.
    Down,
}
287
/// Epoch-local stats for the throughput hill climber.
///
/// The `epoch_*` fields accumulate over one measurement epoch and are
/// cleared by `reset_epoch`; the remaining fields carry the climber's
/// longer-lived search state across epochs.
#[derive(Debug)]
struct HillClimbState {
    /// Earliest operation start time observed in the current epoch;
    /// `None` until the first observation of the epoch arrives.
    epoch_started: Option<Instant>,
    /// Number of observations in the epoch, of any outcome (including
    /// `ApplicationError`).
    epoch_samples: usize,
    /// Number of `Success` outcomes in the epoch.
    epoch_successes: usize,
    /// Number of `Timeout` outcomes in the epoch.
    epoch_timeouts: usize,
    /// Number of `NetworkError` outcomes in the epoch.
    epoch_net_errors: usize,
    /// Sum of payload bytes reported by successful operations.
    epoch_bytes: u64,
    /// Latencies of successful operations in the epoch.
    epoch_latencies: Vec<Duration>,
    /// Goodput recorded for the best cap so far; `None` on a fresh state.
    /// NOTE(review): updated outside this part of the file — presumably
    /// bytes/sec at `best_concurrency`.
    best_goodput_per_sec: Option<f64>,
    /// p95 latency associated with the best cap; `None` on a fresh state.
    best_latency_p95: Option<Duration>,
    /// Best concurrency found so far. Initialized to the starting cap and
    /// reported by `Limiter::snapshot` for hill-climb limiters.
    best_concurrency: usize,
    /// Epoch counter, starts at 0. NOTE(review): presumably counts epochs
    /// spent at a stable best cap (see `HILL_STABLE_PROBE_EPOCHS`).
    stable_epochs: usize,
    /// Epoch counter, starts at 0. NOTE(review): presumably the remaining
    /// cooldown after a rejected probe (see `HILL_REJECT_COOLDOWN_EPOCHS`).
    cooldown_epochs: usize,
    /// Direction the next probe should take; starts as `Up`.
    next_probe: ProbeDirection,
    /// Direction of the probe currently being measured, if any.
    active_probe: Option<ProbeDirection>,
}
306
307impl HillClimbState {
308    fn new(start: usize, epoch_capacity: usize) -> Self {
309        Self {
310            epoch_started: None,
311            epoch_samples: 0,
312            epoch_successes: 0,
313            epoch_timeouts: 0,
314            epoch_net_errors: 0,
315            epoch_bytes: 0,
316            epoch_latencies: Vec::with_capacity(epoch_capacity),
317            best_goodput_per_sec: None,
318            best_latency_p95: None,
319            best_concurrency: start,
320            stable_epochs: 0,
321            cooldown_epochs: 0,
322            next_probe: ProbeDirection::Up,
323            active_probe: None,
324        }
325    }
326
327    fn reset_epoch(&mut self) {
328        self.epoch_started = None;
329        self.epoch_samples = 0;
330        self.epoch_successes = 0;
331        self.epoch_timeouts = 0;
332        self.epoch_net_errors = 0;
333        self.epoch_bytes = 0;
334        self.epoch_latencies.clear();
335    }
336
337    fn capacity_total(&self) -> usize {
338        self.epoch_successes + self.epoch_timeouts + self.epoch_net_errors
339    }
340}
341
/// Per-limiter configuration. Carries the shared adaptive parameters
/// plus the channel-specific `max_concurrency`. Held behind an `Arc`
/// so cloning a `Limiter` is a refcount bump rather than a struct copy
/// (avoids allocating `AdaptiveConfig`-worth of bytes per chunk in
/// hot loops).
#[derive(Debug, Clone)]
pub struct LimiterConfig {
    /// When `false`, the limiter ignores observations and keeps its
    /// starting cap. Mirrors `AdaptiveConfig::enabled`.
    pub enabled: bool,
    /// Floor for the cap; sanitized to at least 1.
    pub min_concurrency: usize,
    /// Ceiling for this channel's cap; sanitized to at least
    /// `min_concurrency`.
    pub max_concurrency: usize,
    /// Sliding-window length in observations; sanitized to at least 1.
    pub window_ops: usize,
    /// Minimum observations before a decision is made; sanitized into
    /// `[1, window_ops]`.
    pub min_window_ops: usize,
    /// Success rate a window must reach to count as healthy.
    pub success_target: f64,
    /// Timeout rate above which a window counts as stressed.
    pub timeout_ceiling: f64,
    /// Multiple of the latency baseline that p95 must exceed to count
    /// as a stress signal.
    pub latency_inflation_factor: f64,
    /// EWMA smoothing factor for the latency baseline, in `[0, 1]`.
    pub latency_ewma_alpha: f64,
    /// While `current < slow_start_ramp_threshold`, a Decrease halves
    /// the cap but does NOT permanently exit slow-start — the next
    /// healthy window can double the cap back up. Above the threshold,
    /// a Decrease exits slow-start and the controller transitions to
    /// classic AIMD (+1 per healthy window).
    ///
    /// 0 (the default) reproduces the original behaviour: any Decrease
    /// at any cap permanently exits slow-start. The fetch channel sets
    /// this to its `max_concurrency` so download concurrency keeps
    /// doubling toward the ceiling instead of crawling +1 per window —
    /// additive growth simply cannot reach a useful cap on a
    /// fast-but-lossy link before the file finishes. See
    /// `AdaptiveController::new`.
    pub slow_start_ramp_threshold: usize,
    /// When `false`, the p95-latency-vs-baseline comparison never
    /// triggers a Decrease (it still updates the baseline). The fetch
    /// channel disables it because `chunk_get`'s observed latency
    /// includes the internal retry sleep and slow retry-sweep for the
    /// chunks that needed one, so a window with a couple of retry-path
    /// chunks has a wildly inflated p95 that is retry variance, not
    /// congestion. Genuine fetch congestion still surfaces as a rising
    /// `Ok(None)` (Timeout) rate, which the timeout_ceiling check
    /// catches.
    pub latency_decrease_enabled: bool,
}
383
384impl LimiterConfig {
385    fn from_adaptive(cfg: &AdaptiveConfig, max_for_channel: usize) -> Self {
386        Self {
387            enabled: cfg.enabled,
388            min_concurrency: cfg.min_concurrency,
389            max_concurrency: max_for_channel.max(cfg.min_concurrency),
390            window_ops: cfg.window_ops,
391            min_window_ops: cfg.min_window_ops,
392            success_target: cfg.success_target,
393            timeout_ceiling: cfg.timeout_ceiling,
394            latency_inflation_factor: cfg.latency_inflation_factor,
395            latency_ewma_alpha: cfg.latency_ewma_alpha,
396            // Defaults preserve the original AIMD behaviour; the fetch
397            // channel overrides both in `AdaptiveController::new`.
398            slow_start_ramp_threshold: 0,
399            latency_decrease_enabled: true,
400        }
401    }
402
403    /// Sanitize a directly-constructed `LimiterConfig`. External
404    /// callers (or tests) that build a `LimiterConfig` literal with
405    /// hostile values (`NaN`, sub-floor mins, inverted bounds) are
406    /// protected — `Limiter::new` calls this on every construction
407    /// so the controller never holds NaN or out-of-range floats.
408    fn sanitize(&mut self) {
409        if !self.latency_ewma_alpha.is_finite() {
410            self.latency_ewma_alpha = 0.2;
411        }
412        self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
413        if !self.success_target.is_finite() {
414            self.success_target = 0.95;
415        }
416        self.success_target = self.success_target.clamp(0.0, 1.0);
417        if !self.timeout_ceiling.is_finite() {
418            self.timeout_ceiling = 0.10;
419        }
420        self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
421        if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
422            self.latency_inflation_factor = 4.0;
423        }
424        self.min_concurrency = self.min_concurrency.max(1);
425        self.window_ops = self.window_ops.max(1);
426        self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
427        self.max_concurrency = self.max_concurrency.max(self.min_concurrency);
428    }
429}
430
/// Per-channel adaptive limiter.
///
/// Cheap to clone — the mutable state and the config are each behind
/// an `Arc`, and `algorithm` is a small `Copy` enum. Pass clones into
/// hot loops; do not hold the lock across `.await` points (call sites
/// observe with short critical sections only).
#[derive(Debug, Clone)]
pub struct Limiter {
    /// Shared mutable state (current cap, sliding window, hill-climb
    /// state). All clones of a limiter observe into the same state.
    inner: Arc<Mutex<LimiterInner>>,
    /// Immutable, sanitized configuration shared by all clones.
    config: Arc<LimiterConfig>,
    /// Adaptation strategy chosen at construction.
    algorithm: LimiterAlgorithm,
}
442
/// Mutable state of one limiter, guarded by the `Mutex` in `Limiter`.
#[derive(Debug)]
struct LimiterInner {
    /// Current concurrency cap returned by `current()`.
    current: usize,
    /// Sliding window of recent outcomes, bounded to `window_ops`
    /// entries (the oldest sample is dropped when full).
    window: VecDeque<Sample>,
    /// Samples observed since the last cap change. Increases require a
    /// fresh window's worth of evidence to avoid ramping on every
    /// individual healthy sample. Reset on both increases and
    /// decreases.
    samples_since_increase: usize,
    /// Samples observed since the last cap change. Decreases require
    /// `min_window_ops` of fresh evidence to avoid pile-driving the
    /// cap to floor on a single bad burst when many in-flight ops all
    /// observe stress nearly simultaneously. Reset on both increases
    /// and decreases.
    samples_since_decrease: usize,
    /// EWMA of p95 latency from past healthy windows. `None` until
    /// the first increase is applied.
    latency_baseline: Option<Duration>,
    /// `true` once the limiter has left slow-start, after which healthy
    /// windows add +1 instead of doubling. Set by a Decrease taken at or
    /// above `slow_start_ramp_threshold` (with the default threshold of
    /// 0, that is any Decrease), and recomputed by `warm_start`.
    left_slow_start: bool,
    /// Fetch-only throughput optimizer state. Present for every limiter
    /// to keep `Limiter` cheap to clone and avoid an enum around the
    /// whole inner struct.
    hill: HillClimbState,
}
469
470impl Limiter {
471    /// Create a new limiter starting at `start`, clamped into
472    /// `[min_concurrency, max_concurrency]`. Sanitizes the config to
473    /// guard against directly-constructed `LimiterConfig` literals
474    /// with hostile float values (`NaN`, etc).
475    #[must_use]
476    pub fn new(start: usize, config: LimiterConfig) -> Self {
477        Self::new_with_algorithm(start, config, LimiterAlgorithm::Aimd)
478    }
479
480    fn new_with_algorithm(
481        start: usize,
482        config: LimiterConfig,
483        algorithm: LimiterAlgorithm,
484    ) -> Self {
485        let mut config = config;
486        config.sanitize();
487        let clamped = start.clamp(config.min_concurrency, config.max_concurrency.max(1));
488        let window_cap = config.window_ops;
489        Self {
490            inner: Arc::new(Mutex::new(LimiterInner {
491                current: clamped,
492                window: VecDeque::with_capacity(window_cap),
493                samples_since_increase: 0,
494                samples_since_decrease: 0,
495                latency_baseline: None,
496                left_slow_start: false,
497                hill: HillClimbState::new(clamped, window_cap),
498            })),
499            config: Arc::new(config),
500            algorithm,
501        }
502    }
503
504    /// Snapshot current concurrency cap. Hot-path call: the value may
505    /// change between this call and the next, but consumers
506    /// (`buffer_unordered(n)`) capture it once per pipeline build.
507    #[must_use]
508    pub fn current(&self) -> usize {
509        lock(&self.inner).current
510    }
511
512    /// Record one observed operation. Updates the sliding window and
513    /// re-evaluates the cap if the window is full enough.
514    pub fn observe(&self, outcome: Outcome, latency: Duration) {
515        self.observe_with_bytes(outcome, latency, 0);
516    }
517
518    /// Record one observed operation with a payload byte count. Bytes
519    /// are used by the fetch hill climber; AIMD channels ignore them.
520    pub fn observe_with_bytes(&self, outcome: Outcome, latency: Duration, bytes: u64) {
521        let observed_at = Instant::now();
522        let operation_started = observed_at.checked_sub(latency).unwrap_or(observed_at);
523        self.observe_with_timing(outcome, latency, bytes, operation_started);
524    }
525
526    fn observe_with_timing(
527        &self,
528        outcome: Outcome,
529        latency: Duration,
530        bytes: u64,
531        operation_started: Instant,
532    ) {
533        if !self.config.enabled {
534            return;
535        }
536        let mut g = lock(&self.inner);
537        if g.window.len() == self.config.window_ops {
538            g.window.pop_front();
539        }
540        g.window.push_back(Sample { outcome, latency });
541        if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
542            observe_hill_climb(
543                &mut g,
544                outcome,
545                latency,
546                bytes,
547                operation_started,
548                &self.config,
549            );
550            return;
551        }
552        g.samples_since_increase = g.samples_since_increase.saturating_add(1);
553        g.samples_since_decrease = g.samples_since_decrease.saturating_add(1);
554        if g.window.len() < self.config.min_window_ops {
555            return;
556        }
557        let decision = evaluate(&g.window, &self.config, g.latency_baseline);
558        apply_decision(&mut g, decision, &self.config);
559    }
560
561    /// Replace the current cap with `start`, clamped. Used for warm
562    /// loads from persisted state. Does not clear the sliding window —
563    /// fresh observations remain authoritative for adaptation
564    /// decisions.
565    ///
566    /// Slow-start state after a warm load depends on the channel's
567    /// `slow_start_ramp_threshold`:
568    ///
569    /// - Default (threshold 0, i.e. quote/store): mark slow-start as
570    ///   already-left so a single healthy window doesn't *double* a
571    ///   learned warm value — an over-aggressive jump. Subsequent
572    ///   increases are +1 per healthy window.
573    /// - Protected (threshold > clamped, i.e. fetch below the ceiling):
574    ///   keep slow-start armed. This is critical for the CLI usage
575    ///   pattern where every `ant file download` is a fresh process
576    ///   that warm-starts from the snapshot: if warm_start always
577    ///   exited slow-start, the fetch cap could only ever grow
578    ///   additively from the persisted value, which cannot climb back
579    ///   to the ceiling against an intermittent Decrease trickle (the
580    ///   exact pin-at-~20 behaviour observed on a fast-but-lossy VPS).
581    ///   Keeping slow-start armed lets the cap double back toward the
582    ///   capacity the connection can actually sustain.
583    pub fn warm_start(&self, start: usize) {
584        let clamped = start.clamp(
585            self.config.min_concurrency,
586            self.config.max_concurrency.max(1),
587        );
588        let mut g = lock(&self.inner);
589        g.current = clamped;
590        g.left_slow_start = clamped >= self.config.slow_start_ramp_threshold;
591        g.hill = HillClimbState::new(clamped, self.config.window_ops);
592    }
593
594    /// Snapshot of the current cap for persistence. Cheap, lock-only.
595    #[must_use]
596    pub fn snapshot(&self) -> usize {
597        let g = lock(&self.inner);
598        if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
599            g.hill.best_concurrency
600        } else {
601            g.current
602        }
603    }
604}
605
/// Summary of one completed hill-climb epoch, handed to
/// `apply_hill_epoch`.
#[derive(Debug, Clone, Copy)]
struct HillEpochStats {
    /// Measured goodput for the epoch.
    /// NOTE(review): computed outside this part of the file — presumably
    /// successful bytes divided by epoch wall time; confirm.
    goodput_per_sec: f64,
    /// p95 latency for the epoch, when one could be computed.
    latency_p95: Option<Duration>,
}
611
/// Outcome of evaluating one window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    /// Healthy window — increase concurrency.
    Increase,
    /// Stressed window — decrease concurrency.
    Decrease,
    /// Inconclusive — hold steady (e.g. too few capacity-relevant
    /// samples, or no successful sample to take a p95 from).
    Hold,
}
622
623fn evaluate(
624    window: &VecDeque<Sample>,
625    cfg: &LimiterConfig,
626    baseline: Option<Duration>,
627) -> Decision {
628    // Capacity-relevant denominator: ApplicationError outcomes are
629    // explicitly NOT capacity signals (per `Outcome` docs) and are
630    // excluded from rate calculations. A wave of `AlreadyStored`
631    // errors must not punish concurrency.
632    let mut successes = 0usize;
633    let mut timeouts = 0usize;
634    let mut net_errors = 0usize;
635    let mut latencies: Vec<Duration> = Vec::with_capacity(window.len());
636    for s in window {
637        match s.outcome {
638            Outcome::Success => {
639                successes += 1;
640                latencies.push(s.latency);
641            }
642            Outcome::Timeout => timeouts += 1,
643            Outcome::NetworkError => net_errors += 1,
644            Outcome::ApplicationError => {}
645        }
646    }
647    let capacity_total = successes + timeouts + net_errors;
648    if capacity_total < cfg.min_window_ops {
649        // Not enough capacity-relevant evidence to act. Hold.
650        return Decision::Hold;
651    }
652    let total_f = capacity_total as f64;
653    let success_rate = successes as f64 / total_f;
654    let timeout_rate = timeouts as f64 / total_f;
655
656    if success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling {
657        return Decision::Decrease;
658    }
659
660    if let Some(p95) = p95_of(&mut latencies) {
661        if cfg.latency_decrease_enabled {
662            if let Some(base) = baseline {
663                let limit = base.mul_f64(cfg.latency_inflation_factor);
664                if p95 > limit {
665                    return Decision::Decrease;
666                }
667            }
668        }
669        Decision::Increase
670    } else {
671        Decision::Hold
672    }
673}
674
675fn apply_decision(inner: &mut LimiterInner, decision: Decision, cfg: &LimiterConfig) {
676    match decision {
677        Decision::Increase => {
678            // Gate increases on accumulating a fresh window's worth of
679            // evidence since the last bump.
680            if inner.samples_since_increase < cfg.window_ops {
681                return;
682            }
683            let p95 = window_p95(&inner.window);
684            inner.latency_baseline = Some(match inner.latency_baseline {
685                None => p95,
686                Some(prev) => ewma(prev, p95, cfg.latency_ewma_alpha),
687            });
688            let next = if inner.left_slow_start {
689                inner.current.saturating_add(1)
690            } else {
691                inner.current.saturating_mul(2)
692            };
693            let next = next.min(cfg.max_concurrency).max(cfg.min_concurrency);
694            if next != inner.current {
695                debug!(
696                    from = inner.current,
697                    to = next,
698                    slow_start = !inner.left_slow_start,
699                    "adaptive: increase",
700                );
701            }
702            inner.current = next;
703            inner.samples_since_increase = 0;
704            inner.samples_since_decrease = 0;
705        }
706        Decision::Decrease => {
707            // Gate decreases on `min_window_ops` of fresh evidence
708            // since the last decrease so a burst of concurrent
709            // observations from in-flight ops can't pile-drive the
710            // cap from N to 1 in a few back-to-back ticks.
711            if inner.samples_since_decrease < cfg.min_window_ops {
712                return;
713            }
714            // Below the ramp threshold we still halve (responsiveness is
715            // preserved) but keep slow-start armed, so the next healthy
716            // window can double the cap back up rather than crawling +1.
717            // Above the threshold a Decrease is the signal to settle into
718            // classic AIMD. With threshold=0 (quote/store) this is the
719            // original behaviour: any Decrease exits slow-start.
720            if inner.current >= cfg.slow_start_ramp_threshold {
721                inner.left_slow_start = true;
722            }
723            let next = (inner.current / 2).max(cfg.min_concurrency);
724            if next != inner.current {
725                debug!(from = inner.current, to = next, "adaptive: decrease");
726            }
727            inner.current = next;
728            inner.samples_since_increase = 0;
729            inner.samples_since_decrease = 0;
730        }
731        Decision::Hold => {}
732    }
733}
734
/// 95th-percentile latency of `latencies`, or `None` when the slice is
/// empty.
///
/// Sorts the slice in place. The chosen rank is `ceil(len * 0.95) - 1`,
/// floored at 0 and capped at the last index.
fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
    let last = latencies.len().checked_sub(1)?;
    latencies.sort_unstable();
    let rank = (latencies.len() as f64 * 0.95).ceil() as usize;
    let idx = rank.saturating_sub(1).min(last);
    latencies.get(idx).copied()
}
747
748fn window_p95(window: &VecDeque<Sample>) -> Duration {
749    let mut latencies: Vec<Duration> = window
750        .iter()
751        .filter(|s| matches!(s.outcome, Outcome::Success))
752        .map(|s| s.latency)
753        .collect();
754    p95_of(&mut latencies).unwrap_or(Duration::ZERO)
755}
756
/// Exponentially weighted moving average of two durations:
/// `(1 - alpha) * prev + alpha * sample`.
///
/// `alpha` is clamped to `[0, 1]`. The function never panics: it
/// returns `prev` unchanged when `alpha` is not finite, when the
/// blended value is not finite or is negative, or when the blended
/// value is too large to be represented as a `Duration`.
fn ewma(prev: Duration, sample: Duration, alpha: f64) -> Duration {
    if !alpha.is_finite() {
        return prev;
    }
    let alpha = alpha.clamp(0.0, 1.0);
    let prev_ms = prev.as_secs_f64() * 1000.0;
    let sample_ms = sample.as_secs_f64() * 1000.0;
    let new_ms = (1.0 - alpha) * prev_ms + alpha * sample_ms;
    if !new_ms.is_finite() || new_ms < 0.0 {
        return prev;
    }
    // `Duration::from_secs_f64` panics on overflow, which float rounding
    // can trigger for inputs near `Duration::MAX`; keep the previous
    // baseline instead of crashing.
    Duration::try_from_secs_f64(new_ms / 1000.0).unwrap_or(prev)
}
771
772fn observe_hill_climb(
773    inner: &mut LimiterInner,
774    outcome: Outcome,
775    latency: Duration,
776    bytes: u64,
777    operation_started: Instant,
778    cfg: &LimiterConfig,
779) {
780    match inner.hill.epoch_started {
781        Some(epoch_started) if epoch_started <= operation_started => {}
782        _ => inner.hill.epoch_started = Some(operation_started),
783    }
784    inner.hill.epoch_samples = inner.hill.epoch_samples.saturating_add(1);
785    match outcome {
786        Outcome::Success => {
787            inner.hill.epoch_successes = inner.hill.epoch_successes.saturating_add(1);
788            inner.hill.epoch_bytes = inner.hill.epoch_bytes.saturating_add(bytes);
789            inner.hill.epoch_latencies.push(latency);
790        }
791        Outcome::Timeout => {
792            inner.hill.epoch_timeouts = inner.hill.epoch_timeouts.saturating_add(1);
793        }
794        Outcome::NetworkError => {
795            inner.hill.epoch_net_errors = inner.hill.epoch_net_errors.saturating_add(1);
796        }
797        Outcome::ApplicationError => {}
798    }
799
800    if hill_epoch_stressed(&inner.hill, cfg) {
801        apply_hill_stress(inner, cfg);
802        return;
803    }
804
805    if inner.hill.epoch_samples < hill_epoch_target_samples(inner.current, cfg) {
806        return;
807    }
808
809    if let Some(stats) = hill_epoch_stats(&inner.hill, cfg) {
810        apply_hill_epoch(inner, stats, cfg);
811    }
812    inner.hill.reset_epoch();
813}
814
815fn hill_epoch_target_samples(current: usize, cfg: &LimiterConfig) -> usize {
816    cfg.window_ops
817        .max(current.saturating_mul(HILL_EPOCH_FULL_WAVES))
818        .max(cfg.min_window_ops)
819}
820
821fn hill_epoch_stressed(hill: &HillClimbState, cfg: &LimiterConfig) -> bool {
822    let capacity_total = hill.capacity_total();
823    if capacity_total < cfg.min_window_ops {
824        return false;
825    }
826    let total_f = capacity_total as f64;
827    let success_rate = hill.epoch_successes as f64 / total_f;
828    let timeout_rate = hill.epoch_timeouts as f64 / total_f;
829    success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling
830}
831
832fn hill_epoch_stats(hill: &HillClimbState, cfg: &LimiterConfig) -> Option<HillEpochStats> {
833    let capacity_total = hill.capacity_total();
834    if capacity_total < cfg.min_window_ops || hill.epoch_successes == 0 {
835        return None;
836    }
837    let mut latencies = hill.epoch_latencies.clone();
838    let latency_p95 = p95_of(&mut latencies);
839    let max_latency = latencies.iter().copied().max().unwrap_or(Duration::ZERO);
840    let wall_elapsed = hill.epoch_started.map_or(Duration::ZERO, |s| s.elapsed());
841    let elapsed = wall_elapsed.max(max_latency);
842    let elapsed_secs = elapsed.as_secs_f64();
843    if !elapsed_secs.is_finite() || elapsed_secs <= 0.0 {
844        return None;
845    }
846
847    // Unit fallback keeps direct unit tests that call `observe(Success, ..)`
848    // meaningful; real download paths report bytes.
849    let units = if hill.epoch_bytes > 0 {
850        hill.epoch_bytes as f64
851    } else {
852        hill.epoch_successes as f64
853    };
854    Some(HillEpochStats {
855        goodput_per_sec: units / elapsed_secs,
856        latency_p95,
857    })
858}
859
860fn apply_hill_stress(inner: &mut LimiterInner, cfg: &LimiterConfig) {
861    let next = (inner.current / HILL_STRESS_DECREASE_DIVISOR)
862        .max(cfg.min_concurrency)
863        .min(cfg.max_concurrency);
864    if next != inner.current {
865        debug!(
866            from = inner.current,
867            to = next,
868            "adaptive: fetch hill stress decrease"
869        );
870    }
871    inner.current = next;
872    inner.hill.best_concurrency = next;
873    inner.hill.best_goodput_per_sec = None;
874    inner.hill.best_latency_p95 = None;
875    inner.hill.stable_epochs = 0;
876    inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
877    inner.hill.active_probe = None;
878    inner.hill.next_probe = ProbeDirection::Up;
879    inner.hill.reset_epoch();
880}
881
882fn apply_hill_epoch(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
883    let Some(best_goodput) = inner.hill.best_goodput_per_sec else {
884        inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
885        inner.hill.best_latency_p95 = stats.latency_p95;
886        inner.hill.best_concurrency = inner.current;
887        probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
888        return;
889    };
890
891    match inner.hill.active_probe {
892        Some(ProbeDirection::Up) => {
893            let improved = stats.goodput_per_sec >= best_goodput * HILL_UP_PROBE_ACCEPT_RATIO;
894            if improved
895                && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
896            {
897                accept_hill_probe(inner, stats, cfg);
898                probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
899            } else {
900                reject_hill_probe(inner);
901            }
902        }
903        Some(ProbeDirection::Down) => {
904            let retained = stats.goodput_per_sec >= best_goodput * HILL_DOWN_PROBE_ACCEPT_RATIO;
905            if retained
906                && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
907            {
908                accept_hill_probe(inner, stats, cfg);
909                inner.hill.next_probe = ProbeDirection::Up;
910            } else {
911                reject_hill_probe(inner);
912            }
913        }
914        None => {
915            refresh_hill_best(inner, stats, cfg);
916            if inner.hill.cooldown_epochs > 0 {
917                inner.hill.cooldown_epochs -= 1;
918                return;
919            }
920            inner.hill.stable_epochs = inner.hill.stable_epochs.saturating_add(1);
921            if inner.hill.stable_epochs >= HILL_STABLE_PROBE_EPOCHS {
922                let direction = inner.hill.next_probe;
923                inner.hill.next_probe = match direction {
924                    ProbeDirection::Up => ProbeDirection::Down,
925                    ProbeDirection::Down => ProbeDirection::Up,
926                };
927                probe_hill_neighbor(inner, direction, cfg);
928            }
929        }
930    }
931}
932
933fn refresh_hill_best(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
934    inner.hill.best_goodput_per_sec = Some(match inner.hill.best_goodput_per_sec {
935        Some(prev) => ewma_f64(prev, stats.goodput_per_sec, cfg.latency_ewma_alpha),
936        None => stats.goodput_per_sec,
937    });
938    if let Some(latency_p95) = stats.latency_p95 {
939        inner.hill.best_latency_p95 = Some(match inner.hill.best_latency_p95 {
940            Some(prev) => ewma(prev, latency_p95, cfg.latency_ewma_alpha),
941            None => latency_p95,
942        });
943    }
944}
945
946fn hill_latency_acceptable(
947    candidate: Option<Duration>,
948    best: Option<Duration>,
949    cfg: &LimiterConfig,
950) -> bool {
951    match (candidate, best) {
952        (Some(candidate), Some(best)) => candidate <= best.mul_f64(cfg.latency_inflation_factor),
953        _ => true,
954    }
955}
956
/// Exponentially weighted moving average on plain `f64` values.
///
/// Computes `(1 - alpha) * prev + alpha * sample` with `alpha` clamped to
/// `[0.0, 1.0]`. Returns `prev` unchanged when `alpha` is not finite or
/// when the blended value is not finite or is negative.
fn ewma_f64(prev: f64, sample: f64, alpha: f64) -> f64 {
    if !alpha.is_finite() {
        return prev;
    }
    let weight = alpha.clamp(0.0, 1.0);
    let blended = (1.0 - weight) * prev + weight * sample;
    let usable = blended.is_finite() && blended >= 0.0;
    if usable {
        blended
    } else {
        prev
    }
}
970
971fn accept_hill_probe(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
972    debug!(
973        concurrency = inner.current,
974        goodput_per_sec = stats.goodput_per_sec,
975        "adaptive: fetch hill accepted probe"
976    );
977    inner.hill.best_concurrency = inner.current;
978    inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
979    inner.hill.best_latency_p95 = stats.latency_p95;
980    inner.hill.active_probe = None;
981    inner.hill.cooldown_epochs = 0;
982    inner.hill.stable_epochs = 0;
983    inner.current = inner
984        .hill
985        .best_concurrency
986        .clamp(cfg.min_concurrency, cfg.max_concurrency);
987}
988
989fn reject_hill_probe(inner: &mut LimiterInner) {
990    let from = inner.current;
991    let to = inner.hill.best_concurrency;
992    let rejected_direction = inner.hill.active_probe;
993    if from != to {
994        debug!(from, to, "adaptive: fetch hill rejected probe");
995    }
996    inner.current = to;
997    inner.hill.active_probe = None;
998    if let Some(direction) = rejected_direction {
999        inner.hill.next_probe = match direction {
1000            ProbeDirection::Up => ProbeDirection::Down,
1001            ProbeDirection::Down => ProbeDirection::Up,
1002        };
1003    }
1004    inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
1005    inner.hill.stable_epochs = 0;
1006}
1007
1008fn probe_hill_neighbor(inner: &mut LimiterInner, direction: ProbeDirection, cfg: &LimiterConfig) {
1009    let best = inner.hill.best_concurrency;
1010    let step = (best / HILL_PROBE_STEP_DIVISOR).max(HILL_MIN_PROBE_STEP);
1011    let candidate = match direction {
1012        ProbeDirection::Up => best.saturating_add(step).min(cfg.max_concurrency),
1013        ProbeDirection::Down => best.saturating_sub(step).max(cfg.min_concurrency),
1014    };
1015    if candidate == best {
1016        inner.current = best;
1017        inner.hill.active_probe = None;
1018        inner.hill.stable_epochs = 0;
1019        return;
1020    }
1021    debug!(
1022        from = best,
1023        to = candidate,
1024        ?direction,
1025        "adaptive: fetch hill probing"
1026    );
1027    inner.current = candidate;
1028    inner.hill.active_probe = Some(direction);
1029    inner.hill.stable_epochs = 0;
1030}
1031
/// Bundle of per-channel limiters owned by the `Client`.
///
/// Holds one [`Limiter`] per channel (`quote`, `store`, `fetch`) plus the
/// sanitized config and the cold-start values the controller was built
/// with. Cloning is derived; whether clones share limiter state depends on
/// `Limiter`'s own `Clone` impl, which is not defined in this section.
#[derive(Debug, Clone)]
pub struct AdaptiveController {
    /// Limiter for the `quote` channel.
    pub quote: Limiter,
    /// Limiter for the `store` channel.
    pub store: Limiter,
    /// Limiter for the `fetch` channel.
    pub fetch: Limiter,
    /// `pub(crate)` so external callers cannot mutate this
    /// post-construction. Each `Limiter` snapshots its own
    /// `Arc<LimiterConfig>` at construction time, so external
    /// mutation here would silently desync `warm_start`'s
    /// `enabled` check from the limiters' frozen copies. Read via
    /// `config()`.
    pub(crate) config: AdaptiveConfig,
    /// Per-instance cold-start values. `warm_start` floors snapshot
    /// values against THIS, not the global `ChannelStart::default()`,
    /// so a controller built with custom (e.g. low) starts stays
    /// faithful to its construction parameters. Constructed-once,
    /// never mutated.
    cold_start: ChannelStart,
}
1052
1053impl AdaptiveController {
1054    /// Create a controller with cold-start values per channel.
1055    /// Sanitizes the config (NaN guards, floor/ceiling enforcement)
1056    /// before constructing limiters. The supplied `start` is captured
1057    /// as the per-instance cold-start floor for `warm_start`.
1058    #[must_use]
1059    pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
1060        let mut config = config;
1061        config.sanitize();
1062        let quote_cfg = LimiterConfig::from_adaptive(&config, config.max.quote);
1063        let mut store_cfg = LimiterConfig::from_adaptive(&config, config.max.store);
1064        // Store-channel growth/decision tuning (V2-468). The store limiter
1065        // starts at 8 (correct — deliberately low for low-bandwidth uplinks)
1066        // but on the merkle upload path its health signals are polluted by two
1067        // things that are NOT local-capacity signals, so it never ramps and
1068        // gets crushed to a +1-per-window crawl. Both are the structural twin
1069        // of the fetch-channel overrides below (verification variance instead
1070        // of retry variance); the cold-start floor is deliberately untouched.
1071        //
1072        // - Disable the p95-latency Decrease. Node-side PUT latency is
1073        //   dominated by the ~28s synchronous merkle closeness lookup, giving a
1074        //   client-observed p95/median of ~3-6x that straddles
1075        //   `latency_inflation_factor` (4.0) and trips Decrease even though
1076        //   nothing about it is local congestion. Genuine store congestion
1077        //   still surfaces via the timeout-rate ceiling.
1078        // - Never exit slow-start. With the default threshold 0, any single
1079        //   Decrease at any cap permanently drops the store cap to additive
1080        //   +1-per-healthy-window growth, which cannot reach a useful cap
1081        //   before a file finishes (843 chunks stuck at effective ~5-9 in the
1082        //   PROD-UL-01 incident). `usize::MAX` keeps slow-start armed at every
1083        //   cap, so a transient Decrease still halves but the next healthy
1084        //   window doubles it back instead of condemning the rest of the file
1085        //   to a crawl. See the fetch override and `LimiterConfig` field docs.
1086        store_cfg.latency_decrease_enabled = false;
1087        store_cfg.slow_start_ramp_threshold = usize::MAX;
1088        let mut fetch_cfg = LimiterConfig::from_adaptive(&config, config.max.fetch);
1089        // Lift the fetch channel's floor above the global
1090        // `min_concurrency`. Reasoning is specific to download: on
1091        // residential links, residual peer-side timeouts (NAT path
1092        // issues, peers in the close group that don't store the chunk,
1093        // peers under temporary load) continuously push the
1094        // controller's timeout_rate above ceiling. A global floor of 1
1095        // means the controller fully serializes chunk fetches on that
1096        // noise floor and gets stuck — observed on PROD-LOCAL-DL-03
1097        // where the download stayed stable but throughput collapsed to
1098        // ~330 KB/s on a multi-MB/s link.
1099        //
1100        // 4 is the smallest floor that keeps the download from fully
1101        // serializing; it also matches the validated cold-start floor.
1102        // Floor `quote` and `store` separately if a corresponding
1103        // pathology is identified for them; today's evidence is
1104        // download-only.
1105        fetch_cfg.min_concurrency = fetch_cfg.min_concurrency.max(FETCH_MIN_FLOOR);
1106        // Re-establish max >= min after the bump in case the channel
1107        // ceiling was somehow lower than the new floor.
1108        fetch_cfg.max_concurrency = fetch_cfg.max_concurrency.max(fetch_cfg.min_concurrency);
1109        // Download-specific growth/decision tuning (see the field docs
1110        // on `LimiterConfig`):
1111        //
1112        // - Never exit slow-start. Classic AIMD additive growth (+1 per
1113        //   healthy window) cannot reach a useful cap from a low base
1114        //   before a multi-GB file finishes — a fast-but-lossy
1115        //   connection (e.g. a VPS with a steady ~4% close-group-
1116        //   exhaustion trickle) was observed stuck at cap ~13-24 across
1117        //   36 files because every transient Decrease permanently
1118        //   dropped it to additive growth. `usize::MAX` keeps slow-start
1119        //   armed at every cap including the ceiling, so a Decrease
1120        //   (e.g. 256 -> 128) still halves but the next healthy window
1121        //   doubles it back. The cap therefore tracks the connection's
1122        //   real capacity instead of crawling, and a single transient
1123        //   Decrease near the ceiling can't re-pin the link to additive
1124        //   recovery. (A threshold == max_concurrency would NOT achieve
1125        //   this: `current >= threshold` is true at the ceiling, so a
1126        //   Decrease there would exit slow-start.)
1127        // - Disable the p95-latency Decrease. chunk_get's observed
1128        //   latency includes the internal retry sleep + slow retry
1129        //   sweep for chunks that needed one, so a window with a couple
1130        //   of retry-path chunks shows a hugely inflated p95 that is
1131        //   retry variance, not congestion. Genuine fetch congestion
1132        //   still drives Decrease via the Ok(None) -> Timeout rate.
1133        fetch_cfg.slow_start_ramp_threshold = usize::MAX;
1134        fetch_cfg.latency_decrease_enabled = false;
1135        Self {
1136            quote: Limiter::new(start.quote, quote_cfg),
1137            store: Limiter::new(start.store, store_cfg),
1138            fetch: Limiter::new_with_algorithm(
1139                start.fetch,
1140                fetch_cfg,
1141                LimiterAlgorithm::ThroughputHillClimb,
1142            ),
1143            config,
1144            cold_start: start,
1145        }
1146    }
1147
1148    /// Snapshot current per-channel caps for persistence.
1149    #[must_use]
1150    pub fn snapshot(&self) -> ChannelStart {
1151        ChannelStart {
1152            quote: self.quote.snapshot(),
1153            store: self.store.snapshot(),
1154            fetch: self.fetch.snapshot(),
1155        }
1156    }
1157
1158    /// Read-only access to the controller's adaptive config. Made
1159    /// read-only deliberately: each `Limiter` snapshots its own
1160    /// `Arc<LimiterConfig>` at construction, so post-hoc mutation
1161    /// would silently desync `warm_start`'s `enabled` check from
1162    /// the limiters' frozen copies.
1163    #[must_use]
1164    pub fn config(&self) -> &AdaptiveConfig {
1165        &self.config
1166    }
1167
1168    /// Apply a previously-saved snapshot as the warm-start cap.
1169    ///
1170    /// The effective warm value per channel is
1171    /// `max(snapshot, self.cold_start)` — flooring at the
1172    /// per-instance cold-start (NOT the global default) so:
1173    /// 1. A prior bad run that pinned cap=1 doesn't pessimize this
1174    ///    run forever.
1175    /// 2. A controller built with custom (e.g. low) cold starts for
1176    ///    benchmarking is not silently jumped above its construction
1177    ///    parameters.
1178    ///
1179    /// Does not clear sliding windows. When `enabled = false`, this
1180    /// is a no-op — fixed-concurrency mode means fixed-concurrency.
1181    pub fn warm_start(&self, snapshot: ChannelStart) {
1182        if !self.config.enabled {
1183            return;
1184        }
1185        self.quote
1186            .warm_start(snapshot.quote.max(self.cold_start.quote));
1187        self.store
1188            .warm_start(snapshot.store.max(self.cold_start.store));
1189        self.fetch
1190            .warm_start(snapshot.fetch.max(self.cold_start.fetch));
1191    }
1192}
1193
1194impl Default for AdaptiveController {
1195    fn default() -> Self {
1196        Self::new(ChannelStart::default(), AdaptiveConfig::default())
1197    }
1198}
1199
1200/// Cancel-on-drop guard: if the wrapping future is dropped before
1201/// completion, record no outcome. We don't synthesize a Cancelled
1202/// signal because (a) dropped work was never observed by the network
1203/// and (b) injecting fake outcomes would skew the sliding window
1204/// after a fail-fast burst. The intentional behavior is "silent on
1205/// cancel, observe on completion" — callers that need to keep
1206/// fail-fast batches drained for full signal use `rebucketed`.
1207struct ObserveGuard<'a> {
1208    limiter: &'a Limiter,
1209    started: Instant,
1210    outcome: Option<(Outcome, Duration, u64)>,
1211}
1212
1213impl<'a> ObserveGuard<'a> {
1214    fn new(limiter: &'a Limiter) -> Self {
1215        Self {
1216            limiter,
1217            started: Instant::now(),
1218            outcome: None,
1219        }
1220    }
1221    fn finish(&mut self, outcome: Outcome) {
1222        self.finish_with_bytes(outcome, 0);
1223    }
1224
1225    fn finish_with_bytes(&mut self, outcome: Outcome, bytes: u64) {
1226        self.outcome = Some((outcome, self.started.elapsed(), bytes));
1227    }
1228}
1229
1230impl Drop for ObserveGuard<'_> {
1231    fn drop(&mut self) {
1232        if let Some((outcome, latency, bytes)) = self.outcome.take() {
1233            self.limiter
1234                .observe_with_timing(outcome, latency, bytes, self.started);
1235        }
1236    }
1237}
1238
1239/// Helper for instrumented call sites: time an async op, classify the
1240/// result, and report to a `Limiter`. Returns the original result.
1241///
1242/// ## Cancellation safety
1243///
1244/// Uses an internal `ObserveGuard` so the recorded outcome is
1245/// committed via `Drop` after the inner future returns. If the
1246/// wrapper future is itself dropped before `op().await` resolves
1247/// (caller cancellation, `buffer_unordered` fail-fast), no outcome
1248/// is recorded — this is intentional, see the guard's docs.
1249///
1250/// ```ignore
1251/// let res = observe_op(&controller.store, || async { do_put().await }, classify_put_err).await;
1252/// ```
1253pub async fn observe_op<T, E, F, Fut, C>(limiter: &Limiter, op: F, classify: C) -> Result<T, E>
1254where
1255    F: FnOnce() -> Fut,
1256    Fut: std::future::Future<Output = Result<T, E>>,
1257    C: FnOnce(&E) -> Outcome,
1258{
1259    let mut guard = ObserveGuard::new(limiter);
1260    let result = op().await;
1261    let outcome = match &result {
1262        Ok(_) => Outcome::Success,
1263        Err(e) => classify(e),
1264    };
1265    guard.finish(outcome);
1266    drop(guard); // commit observation explicitly so it lands before return
1267    result
1268}
1269
1270/// Byte-aware variant of [`observe_op`] for fetch paths. The success
1271/// byte extractor is called only for `Ok` results; errors still carry
1272/// zero bytes and are classified by the provided function.
1273pub async fn observe_op_with_success_bytes<T, E, F, Fut, C, B>(
1274    limiter: &Limiter,
1275    op: F,
1276    classify: C,
1277    success_bytes: B,
1278) -> Result<T, E>
1279where
1280    F: FnOnce() -> Fut,
1281    Fut: std::future::Future<Output = Result<T, E>>,
1282    C: FnOnce(&E) -> Outcome,
1283    B: FnOnce(&T) -> u64,
1284{
1285    let mut guard = ObserveGuard::new(limiter);
1286    let result = op().await;
1287    match &result {
1288        Ok(value) => guard.finish_with_bytes(Outcome::Success, success_bytes(value)),
1289        Err(e) => guard.finish_with_bytes(classify(e), 0),
1290    }
1291    drop(guard);
1292    result
1293}
1294
1295/// Process an iterator of items with a rolling scheduler whose cap
1296/// is re-read from the limiter as each slot frees. Replaces the
1297/// "snapshot the cap once at pipeline build" behavior of plain
1298/// `buffer_unordered(N)` so a long pipeline (e.g. 10 GB download =
1299/// ~2500 chunks) sees adaptive growth/decay mid-flight.
1300///
1301/// Output is unordered (first-completion). For an ordered result
1302/// (e.g. `data_download` feeds chunks in DataMap order to
1303/// self_encryption decrypt), wrap items with their index and sort
1304/// after collection — see `rebucketed_ordered`.
1305///
1306/// On error: in-flight work drains to completion (so observed
1307/// outcomes still feed the controller) but no new launches happen.
1308/// The first error is preserved; later errors are discarded.
1309pub async fn rebucketed_unordered<I, T, E, F, Fut>(
1310    limiter: &Limiter,
1311    items: I,
1312    mut op: F,
1313) -> Result<Vec<T>, E>
1314where
1315    I: IntoIterator,
1316    F: FnMut(I::Item) -> Fut,
1317    Fut: std::future::Future<Output = Result<T, E>>,
1318{
1319    let mut iter = items.into_iter().peekable();
1320    let mut in_flight: FuturesUnordered<Fut> = FuturesUnordered::new();
1321    let mut results = Vec::new();
1322    let mut pending_err: Option<E> = None;
1323    loop {
1324        // Refill: re-read the cap and launch up to `cap - in_flight.len()`
1325        // new items, but only if we are not already in error-stop.
1326        if pending_err.is_none() {
1327            let cap = limiter.current().max(1);
1328            while in_flight.len() < cap {
1329                match iter.next() {
1330                    Some(item) => in_flight.push(op(item)),
1331                    None => break,
1332                }
1333            }
1334        }
1335        if in_flight.is_empty() {
1336            break;
1337        }
1338        match in_flight.next().await {
1339            Some(Ok(v)) => results.push(v),
1340            Some(Err(e)) => {
1341                if pending_err.is_none() {
1342                    pending_err = Some(e);
1343                }
1344            }
1345            None => break,
1346        }
1347    }
1348    match pending_err {
1349        Some(e) => Err(e),
1350        None => Ok(results),
1351    }
1352}
1353
1354/// Ordered variant: items are tagged with a usize index by the
1355/// caller (typically by `iter.enumerate()`); after rolling
1356/// completion, results are sorted by index so output preserves
1357/// input order. Use this for callers that pass to APIs which
1358/// consume positionally (e.g. self_encryption's
1359/// `get_root_data_map_parallel` zips `Vec<(idx, Bytes)>` with input
1360/// hashes positionally and discards the idx — without a final sort
1361/// the bytes pair with the wrong hashes).
1362///
1363/// `op` is `FnMut(Item) -> Fut` where `Item` carries whatever
1364/// payload the caller needs; the closure must return
1365/// `Result<(usize, U), E>` so the wrapper can sort by the index.
1366pub async fn rebucketed_ordered<I, U, E, F, Fut>(
1367    limiter: &Limiter,
1368    items: I,
1369    op: F,
1370) -> Result<Vec<U>, E>
1371where
1372    I: IntoIterator,
1373    F: FnMut(I::Item) -> Fut,
1374    Fut: std::future::Future<Output = Result<(usize, U), E>>,
1375{
1376    let mut indexed = rebucketed_unordered(limiter, items, op).await?;
1377    indexed.sort_by_key(|(idx, _)| *idx);
1378    Ok(indexed.into_iter().map(|(_, v)| v).collect())
1379}
1380
1381/// Backward-compatible wrapper. `ordered = false` -> rolling
1382/// unordered. `ordered = true` -> the OLD batch-fence ordered path
1383/// (kept for tests that explicitly assert batch-fence semantics).
1384/// New call sites should use `rebucketed_unordered` or
1385/// `rebucketed_ordered` directly.
1386pub async fn rebucketed<I, T, E, F, Fut>(
1387    limiter: &Limiter,
1388    items: I,
1389    ordered: bool,
1390    mut op: F,
1391) -> Result<Vec<T>, E>
1392where
1393    I: IntoIterator,
1394    F: FnMut(I::Item) -> Fut,
1395    Fut: std::future::Future<Output = Result<T, E>>,
1396{
1397    if !ordered {
1398        return rebucketed_unordered(limiter, items, op).await;
1399    }
1400    let mut iter = items.into_iter();
1401    let mut results = Vec::new();
1402    let mut pending_err: Option<E> = None;
1403    loop {
1404        if pending_err.is_some() {
1405            break;
1406        }
1407        let cap = limiter.current().max(1);
1408        let mut batch = Vec::with_capacity(cap);
1409        for item in iter.by_ref().take(cap) {
1410            batch.push(op(item));
1411        }
1412        if batch.is_empty() {
1413            break;
1414        }
1415        let mut s = stream::iter(batch).buffered(cap);
1416        while let Some(r) = s.next().await {
1417            match r {
1418                Ok(v) => results.push(v),
1419                Err(e) => {
1420                    if pending_err.is_none() {
1421                        pending_err = Some(e);
1422                    }
1423                }
1424            }
1425        }
1426    }
1427    match pending_err {
1428        Some(e) => Err(e),
1429        None => Ok(results),
1430    }
1431}
1432
/// On-disk shape for the persisted adaptive state. Versioned so we
/// can evolve the controller without crashing on stale files — an
/// unknown future schema version simply causes a silent fallback to
/// cold defaults.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedState {
    /// Schema version of this file; compared against `PERSIST_SCHEMA`
    /// (and the older `PERSIST_SCHEMA_AIMD_FETCH`) when loading.
    schema: u32,
    /// Per-channel caps captured at save time.
    channels: ChannelStart,
}

/// Schema version written by `save_snapshot` and accepted as-is by
/// `load_snapshot`.
const PERSIST_SCHEMA: u32 = 2;
/// Older schema version that `load_snapshot` still accepts by migration:
/// quote/store are kept and fetch is reset to its cold start.
/// NOTE(review): the name suggests schema 1 dates from when fetch used
/// AIMD — confirm against history.
const PERSIST_SCHEMA_AIMD_FETCH: u32 = 1;
/// File name of the persisted snapshot inside the data dir.
const PERSIST_FILENAME: &str = "client_adaptive.json";
1446
1447/// Default persistence path: `<data_dir>/client_adaptive.json`. Falls
1448/// back to `None` if the platform data dir is not resolvable; in that
1449/// case the controller still works, it just won't persist.
1450#[must_use]
1451pub fn default_persist_path() -> Option<PathBuf> {
1452    crate::config::data_dir()
1453        .ok()
1454        .map(|d| d.join(PERSIST_FILENAME))
1455}
1456
1457/// Load a persisted snapshot from disk, returning `None` if the file
1458/// does not exist, is unreadable, contains malformed JSON, or has a
1459/// schema version this build does not understand. Persistence is best
1460/// effort — never propagate errors that would block the user's
1461/// operation.
1462#[must_use]
1463pub fn load_snapshot(path: &Path) -> Option<ChannelStart> {
1464    let bytes = std::fs::read(path).ok()?;
1465    let state: PersistedState = match serde_json::from_slice(&bytes) {
1466        Ok(s) => s,
1467        Err(e) => {
1468            warn!(path = %path.display(), error = %e, "adaptive: corrupt snapshot, ignoring");
1469            return None;
1470        }
1471    };
1472    match state.schema {
1473        PERSIST_SCHEMA => Some(state.channels),
1474        PERSIST_SCHEMA_AIMD_FETCH => {
1475            debug!(
1476                path = %path.display(),
1477                "adaptive: migrating schema-1 snapshot, preserving quote/store and resetting fetch",
1478            );
1479            Some(ChannelStart {
1480                fetch: FETCH_COLD_START_CONCURRENCY,
1481                ..state.channels
1482            })
1483        }
1484        schema => {
1485            debug!(
1486                path = %path.display(),
1487                schema,
1488                expected = PERSIST_SCHEMA,
1489                "adaptive: snapshot schema mismatch, ignoring",
1490            );
1491            None
1492        }
1493    }
1494}
1495
1496/// Save a snapshot to disk atomically (write to `<path>.tmp`, then
1497/// rename). Best effort — failures are logged at warn and discarded.
1498pub fn save_snapshot(path: &Path, channels: ChannelStart) {
1499    let state = PersistedState {
1500        schema: PERSIST_SCHEMA,
1501        channels,
1502    };
1503    let bytes = match serde_json::to_vec_pretty(&state) {
1504        Ok(b) => b,
1505        Err(e) => {
1506            warn!(error = %e, "adaptive: snapshot serialize failed");
1507            return;
1508        }
1509    };
1510    if let Some(parent) = path.parent() {
1511        if let Err(e) = std::fs::create_dir_all(parent) {
1512            warn!(path = %parent.display(), error = %e, "adaptive: snapshot mkdir failed");
1513            return;
1514        }
1515    }
1516    // Unique-per-save temp filename: PID + monotonic counter +
1517    // nanosecond timestamp guarantees no collision between concurrent
1518    // CLI invocations OR concurrent save_snapshot calls within one
1519    // process (e.g. multiple Client instances sharing the same data
1520    // dir). POSIX rename is atomic on the destination, so the rename
1521    // target overlap is fine — last writer wins.
1522    let nanos = std::time::SystemTime::now()
1523        .duration_since(std::time::UNIX_EPOCH)
1524        .map(|d| d.subsec_nanos())
1525        .unwrap_or(0);
1526    let counter = SAVE_COUNTER.fetch_add(1, Ordering::Relaxed);
1527    let tmp = path.with_extension(format!(
1528        "json.tmp.{}.{}.{}",
1529        std::process::id(),
1530        counter,
1531        nanos
1532    ));
1533    if let Err(e) = std::fs::write(&tmp, &bytes) {
1534        warn!(path = %tmp.display(), error = %e, "adaptive: snapshot write failed");
1535        return;
1536    }
1537    if let Err(e) = std::fs::rename(&tmp, path) {
1538        warn!(
1539            from = %tmp.display(),
1540            to = %path.display(),
1541            error = %e,
1542            "adaptive: snapshot rename failed",
1543        );
1544        // Try to clean up the temp on rename failure so we don't
1545        // leave junk in the data dir. Best effort.
1546        let _ = std::fs::remove_file(&tmp);
1547    }
1548}
1549
1550/// Save with a wall-clock deadline. Spawns the synchronous
1551/// `save_snapshot` on a detached thread and waits up to `timeout`
1552/// for it to finish. If the thread is still running past the
1553/// deadline (e.g. because the data dir is on a hung NFS mount),
1554/// returns without joining — the OS will clean up the thread when
1555/// the process exits.
1556///
1557/// Used by `Client::drop` so a stalled filesystem cannot block
1558/// process shutdown indefinitely.
1559pub fn save_snapshot_with_timeout(path: PathBuf, channels: ChannelStart, timeout: Duration) {
1560    let handle = std::thread::spawn(move || {
1561        save_snapshot(&path, channels);
1562    });
1563    // Park briefly waiting for the thread, polling its status. We
1564    // use a short polling interval rather than `join()` because
1565    // join() blocks indefinitely.
1566    let started = Instant::now();
1567    let poll = Duration::from_millis(5);
1568    while started.elapsed() < timeout {
1569        if handle.is_finished() {
1570            let _ = handle.join();
1571            return;
1572        }
1573        std::thread::sleep(poll);
1574    }
1575    // Deadline elapsed. Detach the thread; it will continue to run
1576    // in the background until process exit (its work is best-effort
1577    // anyway). Log so operators can see the slow filesystem.
1578    warn!(
1579        timeout_ms = timeout.as_millis() as u64,
1580        "adaptive: snapshot save timed out (data dir slow?); detaching writer thread"
1581    );
1582    drop(handle);
1583}
1584
1585#[cfg(test)]
1586#[allow(clippy::unwrap_used)]
1587mod tests {
1588    use super::*;
1589
    // Shared numeric fixtures for the throughput hill-climb tests.
    // `HILL_TEST_BASE_LATENCY_MS` is the per-sample latency fed by
    // `observe_hill_success_epoch`.
    // NOTE(review): the remaining constants are consumed by tests that
    // are not adjacent to this list — confirm their exact meaning
    // against those tests before changing any value.
    const HILL_TEST_START_CAP: usize = 16;
    const HILL_TEST_UP_PROBE_CAP: usize = 20;
    const HILL_TEST_NEXT_UP_PROBE_CAP: usize = 25;
    const HILL_TEST_DOWN_PROBE_CAP: usize = 12;
    const HILL_TEST_CHUNK_BYTES: u64 = 1_000;
    const HILL_TEST_BASE_LATENCY_MS: u64 = 100;
    const HILL_TEST_REJECT_LATENCY_MS: u64 = 130;
    const HILL_TEST_RETAINED_DOWN_LATENCY_MS: u64 = 75;
    const HILL_TEST_ASYNC_LATENCY_MS: u64 = 10;
1599
1600    fn cfg_for_tests() -> LimiterConfig {
1601        LimiterConfig {
1602            enabled: true,
1603            min_concurrency: 1,
1604            max_concurrency: 64,
1605            window_ops: 10,
1606            min_window_ops: 5,
1607            success_target: 0.9,
1608            timeout_ceiling: 0.2,
1609            latency_inflation_factor: 2.0,
1610            latency_ewma_alpha: 0.5,
1611            slow_start_ramp_threshold: 0,
1612            latency_decrease_enabled: true,
1613        }
1614    }
1615
1616    fn hill_cfg_for_tests() -> LimiterConfig {
1617        LimiterConfig {
1618            window_ops: 4,
1619            min_window_ops: 2,
1620            max_concurrency: 64,
1621            success_target: 0.9,
1622            timeout_ceiling: 0.2,
1623            ..cfg_for_tests()
1624        }
1625    }
1626
1627    fn fetch_hill_for_tests(start: usize, cfg: LimiterConfig) -> Limiter {
1628        Limiter::new_with_algorithm(start, cfg, LimiterAlgorithm::ThroughputHillClimb)
1629    }
1630
1631    fn observe_hill_success_epoch_with_latency(
1632        limiter: &Limiter,
1633        cfg: &LimiterConfig,
1634        bytes: u64,
1635        latency: Duration,
1636    ) {
1637        let samples = hill_epoch_target_samples(limiter.current(), cfg);
1638        for _ in 0..samples {
1639            limiter.observe_with_bytes(Outcome::Success, latency, bytes);
1640        }
1641    }
1642
1643    fn observe_hill_success_epoch(limiter: &Limiter, cfg: &LimiterConfig, bytes: u64) {
1644        observe_hill_success_epoch_with_latency(
1645            limiter,
1646            cfg,
1647            bytes,
1648            Duration::from_millis(HILL_TEST_BASE_LATENCY_MS),
1649        );
1650    }
1651
1652    /// Build an `AdaptiveConfig` for tests that need to construct a
1653    /// full `AdaptiveController`. Mirrors `cfg_for_tests()` defaults
1654    /// where they overlap, plus per-channel max derived from the same
1655    /// `max_concurrency` value.
1656    fn adaptive_cfg_for_tests() -> AdaptiveConfig {
1657        let l = cfg_for_tests();
1658        AdaptiveConfig {
1659            enabled: l.enabled,
1660            min_concurrency: l.min_concurrency,
1661            max: ChannelMax {
1662                quote: l.max_concurrency,
1663                store: l.max_concurrency,
1664                fetch: l.max_concurrency,
1665            },
1666            window_ops: l.window_ops,
1667            min_window_ops: l.min_window_ops,
1668            success_target: l.success_target,
1669            timeout_ceiling: l.timeout_ceiling,
1670            latency_inflation_factor: l.latency_inflation_factor,
1671            latency_ewma_alpha: l.latency_ewma_alpha,
1672        }
1673    }
1674
1675    #[test]
1676    fn warm_start_keeps_slow_start_armed_below_protected_threshold() {
1677        // Regression guard for the CLI multi-file pattern: each
1678        // `ant file download` is a fresh process that warm-starts from
1679        // the persisted snapshot. If warm_start exited slow-start, the
1680        // fetch cap could only grow additively from the warm value and
1681        // could never climb back to the ceiling against an intermittent
1682        // Decrease trickle. A protected limiter (threshold == max) that
1683        // warm-starts BELOW the ceiling must keep slow-start armed so it
1684        // doubles back up.
1685        let cfg = LimiterConfig {
1686            max_concurrency: 256,
1687            slow_start_ramp_threshold: 256,
1688            latency_decrease_enabled: false,
1689            ..cfg_for_tests()
1690        };
1691        let l = Limiter::new(64, cfg.clone());
1692        l.warm_start(20);
1693        assert_eq!(l.current(), 20);
1694        // A single healthy window should DOUBLE (slow-start armed),
1695        // proving warm_start did not exit slow-start.
1696        for _ in 0..cfg.window_ops {
1697            l.observe(Outcome::Success, Duration::from_millis(10));
1698        }
1699        assert_eq!(
1700            l.current(),
1701            40,
1702            "protected channel must double after warm_start, not crawl +1",
1703        );
1704
1705        // Default channel (threshold 0): warm_start exits slow-start,
1706        // so the same window only adds 1.
1707        let default_cfg = LimiterConfig {
1708            max_concurrency: 256,
1709            ..cfg_for_tests()
1710        };
1711        let d = Limiter::new(64, default_cfg.clone());
1712        d.warm_start(20);
1713        for _ in 0..default_cfg.window_ops {
1714            d.observe(Outcome::Success, Duration::from_millis(10));
1715        }
1716        assert_eq!(
1717            d.current(),
1718            21,
1719            "default channel must stay additive after warm_start",
1720        );
1721    }
1722
1723    #[test]
1724    fn slow_start_stays_armed_at_ceiling_with_max_threshold() {
1725        // Regression for the "lost protection at the ceiling" bug.
1726        // threshold == usize::MAX (the fetch setting) keeps slow-start
1727        // armed even when a Decrease fires at the ceiling, so the cap
1728        // doubles back. threshold == max_concurrency (the buggy
1729        // setting) would exit slow-start there — `current >= threshold`
1730        // is true at the ceiling — and recover only additively. After
1731        // identical stress-at-ceiling + recovery, the MAX-threshold
1732        // limiter must end strictly higher.
1733        let base = LimiterConfig {
1734            max_concurrency: 256,
1735            latency_decrease_enabled: false,
1736            ..cfg_for_tests()
1737        };
1738        let fixed = Limiter::new(
1739            256,
1740            LimiterConfig {
1741                slow_start_ramp_threshold: usize::MAX,
1742                ..base.clone()
1743            },
1744        );
1745        let buggy = Limiter::new(
1746            256,
1747            LimiterConfig {
1748                slow_start_ramp_threshold: 256,
1749                ..base.clone()
1750            },
1751        );
1752        for l in [&fixed, &buggy] {
1753            for _ in 0..base.window_ops {
1754                l.observe(Outcome::Timeout, Duration::from_millis(10));
1755            }
1756            for _ in 0..(base.window_ops * 10) {
1757                l.observe(Outcome::Success, Duration::from_millis(10));
1758            }
1759        }
1760        assert!(
1761            fixed.current() > buggy.current(),
1762            "MAX-threshold limiter ({}) must out-recover the ceiling-threshold one ({})",
1763            fixed.current(),
1764            buggy.current(),
1765        );
1766    }
1767
1768    #[test]
1769    fn protected_slow_start_recovers_faster_than_additive() {
1770        // After identical stress + recovery, a limiter with slow-start
1771        // protected to the ceiling (fetch behaviour) must end at a
1772        // higher cap than one that exits slow-start on first Decrease
1773        // (quote/store behaviour): doubling outpaces +1-per-window.
1774        let base = LimiterConfig {
1775            max_concurrency: 256,
1776            latency_decrease_enabled: false,
1777            ..cfg_for_tests()
1778        };
1779        let protected = Limiter::new(
1780            64,
1781            LimiterConfig {
1782                slow_start_ramp_threshold: 256,
1783                ..base.clone()
1784            },
1785        );
1786        let unprotected = Limiter::new(
1787            64,
1788            LimiterConfig {
1789                slow_start_ramp_threshold: 0,
1790                ..base.clone()
1791            },
1792        );
1793
1794        // Identical stress: a window of timeouts forces decreases on both.
1795        for l in [&protected, &unprotected] {
1796            for _ in 0..base.window_ops {
1797                l.observe(Outcome::Timeout, Duration::from_millis(10));
1798            }
1799        }
1800        // Identical recovery: a long stretch of healthy windows. The
1801        // protected limiter doubles each window; the unprotected one
1802        // only adds 1.
1803        for l in [&protected, &unprotected] {
1804            for _ in 0..(base.window_ops * 10) {
1805                l.observe(Outcome::Success, Duration::from_millis(10));
1806            }
1807        }
1808        assert!(
1809            protected.current() > unprotected.current(),
1810            "protected slow-start ({}) should recover faster than additive ({})",
1811            protected.current(),
1812            unprotected.current(),
1813        );
1814    }
1815
1816    #[test]
1817    fn latency_decrease_disabled_ignores_p95_inflation() {
1818        // With latency_decrease_enabled=false, a window of successes
1819        // whose p95 latency is far above the baseline must NOT trigger
1820        // a Decrease — only success/timeout rate can. (Fetch disables
1821        // this because chunk_get's observed latency is polluted by
1822        // retry-path variance.)
1823        let cfg = LimiterConfig {
1824            max_concurrency: 256,
1825            slow_start_ramp_threshold: 256,
1826            latency_decrease_enabled: false,
1827            ..cfg_for_tests()
1828        };
1829        let l = Limiter::new(16, cfg.clone());
1830        // Establish a fast baseline.
1831        for _ in 0..cfg.window_ops {
1832            l.observe(Outcome::Success, Duration::from_millis(5));
1833        }
1834        let after_baseline = l.current();
1835        // Now a window of successes with 100x the latency. With the
1836        // latency check disabled this is still a healthy window, so the
1837        // cap must not drop.
1838        for _ in 0..cfg.window_ops {
1839            l.observe(Outcome::Success, Duration::from_millis(500));
1840        }
1841        assert!(
1842            l.current() >= after_baseline,
1843            "latency inflation must not shrink the cap when the check is disabled: {} < {}",
1844            l.current(),
1845            after_baseline,
1846        );
1847    }
1848
1849    #[test]
1850    fn controller_sets_fetch_channel_download_tuning() {
1851        // AdaptiveController::new must apply the slow-start /
1852        // latency-decrease tuning to fetch AND store (V2-468), leaving
1853        // quote on classic AIMD.
1854        let c = AdaptiveController::new(ChannelStart::default(), AdaptiveConfig::default());
1855        assert!(
1856            !c.fetch.config.latency_decrease_enabled,
1857            "fetch latency-decrease must be disabled",
1858        );
1859        assert_eq!(
1860            c.fetch.config.slow_start_ramp_threshold,
1861            usize::MAX,
1862            "fetch slow-start must never exit (armed at every cap incl. ceiling)",
1863        );
1864        assert!(
1865            c.quote.config.latency_decrease_enabled,
1866            "quote must keep the latency-decrease check",
1867        );
1868        assert_eq!(
1869            c.quote.config.slow_start_ramp_threshold, 0,
1870            "quote must keep classic AIMD slow-start exit",
1871        );
1872        // Store now mirrors fetch on these two knobs: node-side merkle
1873        // verification latency is not local congestion, and a transient
1874        // Decrease must not condemn the cap to a +1-per-window crawl.
1875        assert!(
1876            !c.store.config.latency_decrease_enabled,
1877            "store latency-decrease must be disabled (verification variance is not congestion)",
1878        );
1879        assert_eq!(
1880            c.store.config.slow_start_ramp_threshold,
1881            usize::MAX,
1882            "store slow-start must never exit so a transient Decrease re-doubles",
1883        );
1884        // The store floor must stay at the cold-start value — V2-468 does
1885        // NOT change the floor, only the polluted ramp/decrease signals.
1886        assert_eq!(
1887            c.store.current(),
1888            ChannelStart::default().store,
1889            "store cold-start floor must remain unchanged at 8",
1890        );
1891    }
1892
1893    #[test]
1894    fn store_channel_ramps_and_recovers_under_v2_468_tuning() {
1895        // End-to-end on the real `controller.store` limiter: with the
1896        // V2-468 tuning, (a) verification-latency p95 inflation alone must
1897        // not shrink the cap, (b) a genuine timeout burst still cuts it,
1898        // and (c) the cap re-doubles on the next healthy window instead of
1899        // crawling +1 (slow-start stays armed).
1900        let mut adaptive = adaptive_cfg_for_tests();
1901        // Give the store channel real headroom to ramp.
1902        adaptive.max.store = 256;
1903        let c = AdaptiveController::new(
1904            ChannelStart {
1905                quote: 8,
1906                store: 8,
1907                fetch: 8,
1908            },
1909            adaptive,
1910        );
1911        let store = &c.store;
1912        let win = c.config().window_ops;
1913
1914        // (a) Establish a fast baseline, then a window of slow successes
1915        // (the ~28s verification tail). The cap must not drop.
1916        for _ in 0..win {
1917            store.observe(Outcome::Success, Duration::from_millis(5));
1918        }
1919        let after_baseline = store.current();
1920        assert!(after_baseline >= 8, "store should ramp on healthy windows");
1921        for _ in 0..win {
1922            store.observe(Outcome::Success, Duration::from_secs(30));
1923        }
1924        assert!(
1925            store.current() >= after_baseline,
1926            "verification-latency p95 must not shrink store cap: {} < {}",
1927            store.current(),
1928            after_baseline,
1929        );
1930
1931        // (b) A genuine local-congestion timeout burst must still cut it.
1932        let before_stress = store.current();
1933        for _ in 0..win {
1934            store.observe(Outcome::Timeout, Duration::from_millis(50));
1935        }
1936        let after_stress = store.current();
1937        assert!(
1938            after_stress < before_stress,
1939            "timeout-rate breach must still cut the store cap: {after_stress} !< {before_stress}",
1940        );
1941
1942        // (c) Slow-start stays armed, so healthy windows re-DOUBLE the cap
1943        // back to where it was instead of crawling +1 per window. Over this
1944        // many windows additive +1 recovery could not climb back to
1945        // `before_stress` from the stressed floor — only multiplicative
1946        // doubling can — so reaching it proves the crawl pathology is gone.
1947        for _ in 0..(win * 8) {
1948            store.observe(Outcome::Success, Duration::from_millis(5));
1949        }
1950        assert!(
1951            store.current() >= before_stress,
1952            "store must re-double back to {before_stress} after a transient Decrease, got {}",
1953            store.current(),
1954        );
1955    }
1956
1957    #[test]
1958    fn store_application_rejections_do_not_move_cap() {
1959        // The merkle incident's 397 remote app-rejections (now classified
1960        // ApplicationError via `Error::RemotePut`) must not push the store
1961        // cap down — they are not capacity signals.
1962        let mut adaptive = adaptive_cfg_for_tests();
1963        adaptive.max.store = 256;
1964        let c = AdaptiveController::new(
1965            ChannelStart {
1966                quote: 8,
1967                store: 8,
1968                fetch: 8,
1969            },
1970            adaptive,
1971        );
1972        let store = &c.store;
1973        let start = store.current();
1974        for _ in 0..(c.config().window_ops * 5) {
1975            store.observe(Outcome::ApplicationError, Duration::from_secs(30));
1976        }
1977        assert_eq!(
1978            store.current(),
1979            start,
1980            "remote app-rejections must not move the store cap",
1981        );
1982    }
1983
1984    #[test]
1985    fn cold_start_clamps_into_bounds() {
1986        let cfg = cfg_for_tests();
1987        let l = Limiter::new(1000, cfg.clone());
1988        assert_eq!(l.current(), cfg.max_concurrency);
1989        let l = Limiter::new(0, cfg.clone());
1990        assert_eq!(l.current(), cfg.min_concurrency);
1991    }
1992
1993    #[test]
1994    fn slow_start_doubles_then_caps() {
1995        let cfg = cfg_for_tests();
1996        let l = Limiter::new(2, cfg.clone());
1997        // Feed a full healthy window — concurrency doubles.
1998        for _ in 0..cfg.window_ops {
1999            l.observe(Outcome::Success, Duration::from_millis(50));
2000        }
2001        assert_eq!(l.current(), 4);
2002        for _ in 0..cfg.window_ops {
2003            l.observe(Outcome::Success, Duration::from_millis(50));
2004        }
2005        assert_eq!(l.current(), 8);
2006    }
2007
2008    #[test]
2009    fn first_failure_exits_slow_start() {
2010        let cfg = cfg_for_tests();
2011        let l = Limiter::new(4, cfg.clone());
2012        // 6 successes + 4 timeouts in a window of 10. Decisions fire
2013        // per-sample once the window has min_window_ops entries, so
2014        // the four timeouts each drive Decrease. That floors the cap.
2015        for _ in 0..6 {
2016            l.observe(Outcome::Success, Duration::from_millis(50));
2017        }
2018        for _ in 0..4 {
2019            l.observe(Outcome::Timeout, Duration::from_millis(50));
2020        }
2021        let after_stress = l.current();
2022        assert!(
2023            after_stress < 4,
2024            "stress should reduce concurrency from 4, got {after_stress}",
2025        );
2026        // After exiting slow-start, recovery is +1 per fresh window,
2027        // not doubling. The first `window_ops` successes flush prior
2028        // timeouts out of the sliding window. Decreases now also need
2029        // `min_window_ops` of fresh evidence before re-firing, and
2030        // increases need `window_ops` of fresh evidence. Feed enough
2031        // successes to clear the window AND accumulate evidence for
2032        // multiple increases.
2033        for _ in 0..(cfg.window_ops * 5) {
2034            l.observe(Outcome::Success, Duration::from_millis(50));
2035        }
2036        assert!(
2037            l.current() > after_stress,
2038            "expected recovery above {after_stress}, got {}",
2039            l.current(),
2040        );
2041    }
2042
2043    #[test]
2044    fn floor_holds_at_one() {
2045        let cfg = cfg_for_tests();
2046        let l = Limiter::new(2, cfg);
2047        for _ in 0..30 {
2048            l.observe(Outcome::Timeout, Duration::from_millis(50));
2049        }
2050        assert_eq!(l.current(), 1);
2051    }
2052
2053    #[test]
2054    fn application_errors_do_not_punish() {
2055        let cfg = cfg_for_tests();
2056        let l = Limiter::new(4, cfg.clone());
2057        // ApplicationError is NOT a capacity signal (per `Outcome`
2058        // docs and the reviewer's M1 finding). A wave of e.g.
2059        // `AlreadyStored` errors must not lower concurrency, because
2060        // they say nothing about the network's ability to take more
2061        // load. Specifically: the controller should HOLD at 4 because
2062        // there are zero capacity-relevant samples to act on.
2063        for _ in 0..cfg.window_ops * 5 {
2064            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
2065        }
2066        assert_eq!(
2067            l.current(),
2068            4,
2069            "ApplicationError must not move the cap; got {}",
2070            l.current()
2071        );
2072    }
2073
2074    #[test]
2075    fn latency_inflation_triggers_decrease() {
2076        let cfg = LimiterConfig {
2077            window_ops: 20,
2078            min_window_ops: 5,
2079            ..cfg_for_tests()
2080        };
2081        let l = Limiter::new(4, cfg.clone());
2082        // Establish a baseline with many fast successes.
2083        for _ in 0..cfg.window_ops {
2084            l.observe(Outcome::Success, Duration::from_millis(50));
2085        }
2086        let after_baseline = l.current();
2087        // Now flood with slow successes — same outcome, 5x latency.
2088        for _ in 0..cfg.window_ops {
2089            l.observe(Outcome::Success, Duration::from_millis(500));
2090        }
2091        // Latency inflation > 2x baseline must drop concurrency.
2092        assert!(
2093            l.current() < after_baseline,
2094            "expected decrease from {after_baseline}, got {}",
2095            l.current(),
2096        );
2097    }
2098
2099    #[test]
2100    fn warm_start_overrides_current() {
2101        let cfg = cfg_for_tests();
2102        let l = Limiter::new(2, cfg);
2103        l.warm_start(20);
2104        assert_eq!(l.current(), 20);
2105    }
2106
2107    #[test]
2108    fn warm_start_clamps() {
2109        let cfg = cfg_for_tests();
2110        let l = Limiter::new(2, cfg.clone());
2111        l.warm_start(1_000_000);
2112        assert_eq!(l.current(), cfg.max_concurrency);
2113    }
2114
2115    #[test]
2116    fn disabled_controller_holds_steady() {
2117        let cfg = LimiterConfig {
2118            enabled: false,
2119            ..cfg_for_tests()
2120        };
2121        let l = Limiter::new(8, cfg);
2122        for _ in 0..50 {
2123            l.observe(Outcome::Timeout, Duration::from_millis(50));
2124        }
2125        assert_eq!(l.current(), 8);
2126    }
2127
2128    #[test]
2129    fn controller_snapshot_round_trips() {
2130        // The test cfg has max=64 for every channel (cfg_for_tests's
2131        // max_concurrency=64 -> ChannelMax::{quote: 64, store: 64, fetch: 64}).
2132        // Pick start values <= 64 so they survive cap clamping at
2133        // construction. Pick values >= cold-defaults (32/8/4) so they
2134        // also survive the warm-start floor.
2135        let c = AdaptiveController::new(
2136            ChannelStart {
2137                quote: 64,
2138                store: 16,
2139                fetch: 64,
2140            },
2141            adaptive_cfg_for_tests(),
2142        );
2143        let snap = c.snapshot();
2144        assert_eq!(snap.quote, 64);
2145        assert_eq!(snap.store, 16);
2146        assert_eq!(snap.fetch, 64);
2147
2148        let c2 = AdaptiveController::default();
2149        c2.warm_start(snap);
2150        assert_eq!(c2.quote.current(), 64);
2151        assert_eq!(c2.store.current(), 16);
2152        assert_eq!(c2.fetch.current(), 64);
2153    }
2154
2155    #[tokio::test]
2156    async fn observe_op_records_success() {
2157        let cfg = cfg_for_tests();
2158        let l = Limiter::new(4, cfg.clone());
2159        for _ in 0..cfg.window_ops {
2160            let _: Result<(), &str> =
2161                observe_op(&l, || async { Ok(()) }, |_e: &&str| Outcome::NetworkError).await;
2162        }
2163        // Healthy window from cold start doubles 4 -> 8.
2164        assert_eq!(l.current(), 8);
2165    }
2166
2167    #[test]
2168    fn snapshot_round_trips_through_disk() {
2169        let dir = tempfile::tempdir().unwrap();
2170        let path = dir.path().join("client_adaptive.json");
2171        let snap = ChannelStart {
2172            quote: 24,
2173            store: 6,
2174            fetch: 12,
2175        };
2176        save_snapshot(&path, snap);
2177        let loaded = load_snapshot(&path).unwrap();
2178        assert_eq!(loaded.quote, 24);
2179        assert_eq!(loaded.store, 6);
2180        assert_eq!(loaded.fetch, 12);
2181    }
2182
2183    #[test]
2184    fn load_missing_returns_none() {
2185        let dir = tempfile::tempdir().unwrap();
2186        let path = dir.path().join("does_not_exist.json");
2187        assert!(load_snapshot(&path).is_none());
2188    }
2189
2190    #[test]
2191    fn load_corrupt_returns_none() {
2192        let dir = tempfile::tempdir().unwrap();
2193        let path = dir.path().join("bad.json");
2194        std::fs::write(&path, b"not valid json{{{").unwrap();
2195        assert!(load_snapshot(&path).is_none());
2196    }
2197
2198    #[test]
2199    fn load_wrong_schema_returns_none() {
2200        let dir = tempfile::tempdir().unwrap();
2201        let path = dir.path().join("future.json");
2202        // Schema 999 is from a future build — current build must not
2203        // crash and must not act on it.
2204        let payload = r#"{"schema":999,"channels":{"quote":1,"store":1,"fetch":1}}"#;
2205        std::fs::write(&path, payload).unwrap();
2206        assert!(load_snapshot(&path).is_none());
2207    }
2208
2209    #[test]
2210    fn load_schema_one_preserves_quote_store_and_resets_fetch() {
2211        const LEGACY_QUOTE_CAP: usize = 48;
2212        const LEGACY_STORE_CAP: usize = 24;
2213        const LEGACY_FETCH_CAP: usize = 96;
2214
2215        let dir = tempfile::tempdir().unwrap();
2216        let path = dir.path().join("legacy.json");
2217        let payload = format!(
2218            r#"{{"schema":{},"channels":{{"quote":{},"store":{},"fetch":{}}}}}"#,
2219            PERSIST_SCHEMA_AIMD_FETCH, LEGACY_QUOTE_CAP, LEGACY_STORE_CAP, LEGACY_FETCH_CAP,
2220        );
2221        std::fs::write(&path, payload).unwrap();
2222
2223        let loaded = load_snapshot(&path).unwrap();
2224
2225        assert_eq!(loaded.quote, LEGACY_QUOTE_CAP);
2226        assert_eq!(loaded.store, LEGACY_STORE_CAP);
2227        assert_eq!(loaded.fetch, FETCH_COLD_START_CONCURRENCY);
2228    }
2229
2230    #[tokio::test]
2231    async fn observe_op_records_classified_error() {
2232        let cfg = cfg_for_tests();
2233        let l = Limiter::new(4, cfg.clone());
2234        for _ in 0..cfg.window_ops {
2235            let _: Result<(), &str> =
2236                observe_op(&l, || async { Err("boom") }, |_e: &&str| Outcome::Timeout).await;
2237        }
2238        assert!(l.current() < 4);
2239    }
2240
2241    // ----- Adversarial / regression-guard tests below ---------------------
2242    //
2243    // These exist primarily to prove the controller never silently regresses
2244    // upload/download throughput and never panics under hostile workloads.
2245
2246    /// Cold-start defaults for quote/store must preserve the prior static
2247    /// knobs. Fetch intentionally starts at the validated residential floor
2248    /// because the throughput hill climber now has to prove that higher
2249    /// fan-out improves goodput.
2250    #[test]
2251    fn no_regression_cold_start_at_least_static_defaults() {
2252        let s = ChannelStart::default();
2253        assert!(
2254            s.quote >= 32,
2255            "quote cold-start regressed: got {}, prior static was 32",
2256            s.quote,
2257        );
2258        assert!(
2259            s.store >= 8,
2260            "store cold-start regressed: got {}, prior static was 8",
2261            s.store,
2262        );
2263        assert_eq!(
2264            s.fetch, FETCH_COLD_START_CONCURRENCY,
2265            "fetch cold-start changed unexpectedly: got {}, expected {}",
2266            s.fetch, FETCH_COLD_START_CONCURRENCY,
2267        );
2268    }
2269
2270    /// The production `AdaptiveController::default()` (NOT the test cfg)
2271    /// must come up reporting the cold-start values immediately, with no
2272    /// observations recorded.
2273    #[test]
2274    fn controller_default_config_is_sane() {
2275        let c = AdaptiveController::default();
2276        let starts = ChannelStart::default();
2277        assert_eq!(c.quote.current(), starts.quote);
2278        assert_eq!(c.store.current(), starts.store);
2279        assert_eq!(c.fetch.current(), starts.fetch);
2280        // No observations made yet — internal windows must be empty.
2281        assert_eq!(lock(&c.quote.inner).window.len(), 0);
2282        assert_eq!(lock(&c.store.inner).window.len(), 0);
2283        assert_eq!(lock(&c.fetch.inner).window.len(), 0);
2284    }
2285
2286    /// Mixed signals (every other op fails) must not pin the controller
2287    /// at the floor for the whole run. The cap should oscillate or settle
2288    /// somewhere above the floor — collapse to 1 forever would be a bug.
2289    #[test]
2290    fn alternating_success_failure_collapses_to_floor() {
2291        // 50% timeout rate is far above `timeout_ceiling` (0.2 in test
2292        // config), so the window is always stressed. The controller
2293        // MUST collapse to the floor, and once there must NEVER go
2294        // below it. Assert both invariants explicitly: floor reached
2295        // and floor held.
2296        let cfg = cfg_for_tests();
2297        let l = Limiter::new(8, cfg.clone());
2298        let mut min_observed = usize::MAX;
2299        let mut max_observed = 0usize;
2300        let mut floor_visits = 0usize;
2301        for i in 0..1000 {
2302            let outcome = if i % 2 == 0 {
2303                Outcome::Success
2304            } else {
2305                Outcome::Timeout
2306            };
2307            l.observe(outcome, Duration::from_millis(50));
2308            let cur = l.current();
2309            assert!(
2310                cur >= cfg.min_concurrency,
2311                "cap underflowed floor at iter {i}: got {cur}",
2312            );
2313            min_observed = min_observed.min(cur);
2314            max_observed = max_observed.max(cur);
2315            if cur == cfg.min_concurrency {
2316                floor_visits += 1;
2317            }
2318        }
2319        assert_eq!(
2320            min_observed, cfg.min_concurrency,
2321            "cap never reached the floor under 50% timeout rate"
2322        );
2323        assert!(
2324            max_observed >= 8,
2325            "cap never visited the start value: max_observed={max_observed}"
2326        );
2327        // Should spend MOST of the run at the floor — a single
2328        // healthy window is not enough to climb back from a 50% loss
2329        // environment.
2330        assert!(
2331            floor_visits > 500,
2332            "cap spent only {floor_visits}/1000 ticks at floor; expected mostly at floor"
2333        );
2334        assert_eq!(
2335            l.current(),
2336            cfg.min_concurrency,
2337            "controller did not settle at floor after 1000 alternations"
2338        );
2339    }
2340
2341    /// From the floor, a long stream of healthy successes must walk the
2342    /// cap all the way back up to `max_concurrency`. Otherwise transient
2343    /// stress on a slow link would permanently penalize throughput.
2344    #[test]
2345    fn pure_success_stream_recovers_to_max() {
2346        let cfg = cfg_for_tests();
2347        let l = Limiter::new(cfg.min_concurrency, cfg.clone());
2348        for _ in 0..10_000 {
2349            l.observe(Outcome::Success, Duration::from_millis(5));
2350        }
2351        assert_eq!(
2352            l.current(),
2353            cfg.max_concurrency,
2354            "expected recovery to max ({}), got {}",
2355            cfg.max_concurrency,
2356            l.current(),
2357        );
2358    }
2359
2360    /// Heavy stress drives the cap to the floor; subsequent recovery
2361    /// must climb meaningfully higher than the floor with enough healthy
2362    /// evidence. No "permanent floor" failure mode allowed.
2363    #[test]
2364    fn stress_then_heal_drives_floor_then_recovery() {
2365        let cfg = cfg_for_tests();
2366        let l = Limiter::new(8, cfg.clone());
2367        for _ in 0..100 {
2368            l.observe(Outcome::Timeout, Duration::from_millis(50));
2369        }
2370        let after_stress = l.current();
2371        assert_eq!(
2372            after_stress, cfg.min_concurrency,
2373            "stress should drive cap to floor, got {after_stress}",
2374        );
2375        for _ in 0..1_000 {
2376            l.observe(Outcome::Success, Duration::from_millis(10));
2377        }
2378        let after_heal = l.current();
2379        assert!(
2380            after_heal >= cfg.min_concurrency.saturating_add(4),
2381            "expected substantial recovery from floor, got {after_heal}",
2382        );
2383    }
2384
2385    /// The latency baseline must track actual workload latency. If it
2386    /// stayed pinned at `Duration::ZERO`, every healthy sample would
2387    /// look like infinite inflation and inflate the decrease rate.
2388    #[test]
2389    fn baseline_does_not_grow_unbounded_under_slow_links() {
2390        let cfg = cfg_for_tests();
2391        let l = Limiter::new(2, cfg.clone());
2392        for _ in 0..(cfg.window_ops * 10) {
2393            l.observe(Outcome::Success, Duration::from_millis(500));
2394        }
2395        let baseline = lock(&l.inner).latency_baseline;
2396        let base = baseline.expect("baseline should be set after many healthy windows");
2397        assert!(
2398            base > Duration::ZERO,
2399            "baseline must not stay at ZERO, got {base:?}",
2400        );
2401        // Within 2x of the actual latency: 250ms..=1000ms.
2402        let lo = Duration::from_millis(250);
2403        let hi = Duration::from_millis(1000);
2404        assert!(
2405            base >= lo && base <= hi,
2406            "baseline drifted out of [{lo:?}, {hi:?}]: {base:?}",
2407        );
2408    }
2409
2410    /// Until the first healthy window completes, the latency baseline
2411    /// stays `None` (so no false-inflation alarms). Decreases during the
2412    /// stress phase are driven purely by success/timeout rate, not by
2413    /// inflated p95 vs a phantom zero baseline.
2414    #[test]
2415    fn baseline_initialized_only_after_first_healthy_window() {
2416        let cfg = cfg_for_tests();
2417        let l = Limiter::new(8, cfg.clone());
2418        for _ in 0..50 {
2419            l.observe(Outcome::Timeout, Duration::from_millis(50));
2420        }
2421        // Without any healthy window, baseline must still be None.
2422        assert!(
2423            lock(&l.inner).latency_baseline.is_none(),
2424            "baseline must be None before any healthy window",
2425        );
2426        // Now drain healthy windows.
2427        for _ in 0..(cfg.window_ops * 5) {
2428            l.observe(Outcome::Success, Duration::from_millis(20));
2429        }
2430        let baseline = lock(&l.inner).latency_baseline;
2431        assert!(
2432            baseline.is_some(),
2433            "baseline must be Some after healthy windows",
2434        );
2435        let base = baseline.unwrap_or_default();
2436        assert!(
2437            base > Duration::ZERO,
2438            "baseline must reflect real latency, got {base:?}",
2439        );
2440    }
2441
2442    /// A torrent of timeouts must not underflow the cap. Sample at
2443    /// several depths to catch any wraparound.
2444    #[test]
2445    fn min_concurrency_floor_holds_under_torrent_of_errors() {
2446        let cfg = cfg_for_tests();
2447        let l = Limiter::new(8, cfg.clone());
2448        for i in 0..50_000 {
2449            l.observe(Outcome::Timeout, Duration::from_millis(50));
2450            if i == 100 || i == 1_000 || i == 49_999 {
2451                let cur = l.current();
2452                assert_eq!(
2453                    cur, cfg.min_concurrency,
2454                    "floor breached at iter {i}: got {cur}",
2455                );
2456            }
2457        }
2458    }
2459
2460    /// Mirror: a torrent of successes must not exceed `max_concurrency`.
2461    #[test]
2462    fn max_concurrency_ceiling_holds_under_torrent_of_successes() {
2463        let cfg = cfg_for_tests();
2464        let start = cfg
2465            .max_concurrency
2466            .saturating_sub(1)
2467            .max(cfg.min_concurrency);
2468        let l = Limiter::new(start, cfg.clone());
2469        for i in 0..50_000 {
2470            l.observe(Outcome::Success, Duration::from_millis(5));
2471            if i == 100 || i == 1_000 || i == 49_999 {
2472                let cur = l.current();
2473                assert!(
2474                    cur <= cfg.max_concurrency,
2475                    "ceiling breached at iter {i}: got {cur} > {}",
2476                    cfg.max_concurrency,
2477                );
2478            }
2479        }
2480        assert_eq!(l.current(), cfg.max_concurrency);
2481    }
2482
2483    /// Slow-start doubles the cap; with `max_concurrency = usize::MAX/2`
2484    /// a naive `*2` would overflow. The controller must use saturating
2485    /// arithmetic and never panic. Also asserts the cap actually
2486    /// REACHED max — proving that "no panic" wasn't achieved by
2487    /// the cap getting stuck somewhere instead of growing.
2488    #[test]
2489    fn saturating_arithmetic_handles_extreme_config() {
2490        let cfg = LimiterConfig {
2491            max_concurrency: usize::MAX / 2,
2492            ..cfg_for_tests()
2493        };
2494        let start = usize::MAX / 4;
2495        let l = Limiter::new(start, cfg.clone());
2496        for _ in 0..(cfg.window_ops * 10) {
2497            l.observe(Outcome::Success, Duration::from_millis(1));
2498        }
2499        // First-iteration doubles start (which is max/4) to max/2 = ceiling.
2500        // The cap MUST have grown to the ceiling; if saturating math
2501        // were broken (panic) we'd never get here, but we'd also fail
2502        // if the cap got stuck at the start value.
2503        assert_eq!(
2504            l.current(),
2505            cfg.max_concurrency,
2506            "saturating math survived but cap did not grow to ceiling"
2507        );
2508    }
2509
2510    /// FIFO eviction: prove that a window of pure-timeout collapses
2511    /// the cap, and once enough successes flush ALL timeouts out of
2512    /// the window, the cap can rise. The earlier version of this test
2513    /// used an OR clause that made the assertion satisfiable trivially;
2514    /// this version asserts the strict invariant: after eviction, cap
2515    /// must be STRICTLY GREATER than the post-stress cap.
2516    #[test]
2517    fn window_eviction_is_fifo() {
2518        let cfg = LimiterConfig {
2519            window_ops: 10,
2520            min_window_ops: 5,
2521            success_target: 0.9,
2522            timeout_ceiling: 0.1,
2523            ..cfg_for_tests()
2524        };
2525        let l = Limiter::new(8, cfg.clone());
2526        // Fill the window with timeouts. With decrease-gating
2527        // (samples_since_decrease >= min_window_ops between halvings),
2528        // window_ops=10 + min_window_ops=5 timeouts allow at most
2529        // ~2 halvings: 8 -> 4 -> 2. Cap must DROP from 8.
2530        for _ in 0..cfg.window_ops {
2531            l.observe(Outcome::Timeout, Duration::from_millis(50));
2532        }
2533        let after_stress = l.current();
2534        assert!(
2535            after_stress < 8,
2536            "expected cap to drop from 8 after pure-timeout window, got {after_stress}"
2537        );
2538        // Push enough successes to fully evict the timeouts AND
2539        // accumulate at least one full window of fresh evidence for
2540        // an Increase. window_ops to evict + window_ops to gate first
2541        // +1 = 2 * window_ops minimum; use 3x for safety margin.
2542        for _ in 0..(cfg.window_ops * 3) {
2543            l.observe(Outcome::Success, Duration::from_millis(20));
2544        }
2545        let after_recovery = l.current();
2546        // Strict greater-than: FIFO MUST flush the timeouts so a
2547        // fresh-window Increase can fire.
2548        assert!(
2549            after_recovery > after_stress,
2550            "FIFO eviction broken: cap stayed at {after_stress} after recovery successes (expected > {after_stress}, got {after_recovery})"
2551        );
2552    }
2553
2554    /// With `enabled = false`, the controller is a no-op. Hot paths
2555    /// must see exactly `initial` at every check, no exceptions.
2556    #[test]
2557    fn disabled_controller_returns_initial_value_invariantly() {
2558        let cfg = LimiterConfig {
2559            enabled: false,
2560            ..cfg_for_tests()
2561        };
2562        let initial = 8;
2563        let l = Limiter::new(initial, cfg);
2564        for i in 0..1_000 {
2565            let outcome = match i % 4 {
2566                0 => Outcome::Success,
2567                1 => Outcome::Timeout,
2568                2 => Outcome::NetworkError,
2569                _ => Outcome::ApplicationError,
2570            };
2571            l.observe(outcome, Duration::from_millis(50));
2572            assert_eq!(
2573                l.current(),
2574                initial,
2575                "disabled controller moved at iter {i}",
2576            );
2577        }
2578    }
2579
2580    /// 100 tasks concurrently observing 100 successes each. The cap
2581    /// must remain a valid in-bounds value, no panic, no deadlock.
2582    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2583    async fn concurrent_observations_do_not_corrupt_window() {
2584        let cfg = cfg_for_tests();
2585        let l = Limiter::new(4, cfg.clone());
2586        let mut handles = Vec::with_capacity(100);
2587        for _ in 0..100 {
2588            let l_clone = l.clone();
2589            handles.push(tokio::spawn(async move {
2590                for _ in 0..100 {
2591                    l_clone.observe(Outcome::Success, Duration::from_millis(5));
2592                }
2593            }));
2594        }
2595        for h in handles {
2596            h.await.unwrap();
2597        }
2598        let cur = l.current();
2599        assert!(
2600            cur >= cfg.min_concurrency && cur <= cfg.max_concurrency,
2601            "cap out of bounds after concurrent observations: {cur}",
2602        );
2603    }
2604
2605    /// Persisted higher values from a prior run must beat low cold-start
2606    /// defaults. Otherwise warm-start would silently pessimize throughput.
2607    /// (Values BELOW cold-start are floored — see
2608    /// `warm_start_floors_at_cold_defaults`.)
2609    #[test]
2610    fn persisted_snapshot_warm_starts_above_cold_floor() {
2611        let dir = tempfile::tempdir().unwrap();
2612        let path = dir.path().join("client_adaptive.json");
2613        // All snapshot values ABOVE the production cold-start defaults
2614        // so the warm_start floor doesn't kick in.
2615        let saved = ChannelStart {
2616            quote: 64,
2617            store: 32,
2618            fetch: 128,
2619        };
2620        save_snapshot(&path, saved);
2621        let loaded = load_snapshot(&path).unwrap();
2622
2623        // Build a controller with intentionally low cold-start values
2624        // — these get overridden by warm_start.
2625        let low = ChannelStart {
2626            quote: 2,
2627            store: 2,
2628            fetch: 2,
2629        };
2630        let c = AdaptiveController::new(low, AdaptiveConfig::default());
2631        c.warm_start(loaded);
2632        assert_eq!(c.quote.current(), 64);
2633        assert_eq!(c.store.current(), 32);
2634        assert_eq!(c.fetch.current(), 128);
2635    }
2636
2637    /// Two threads racing on `save_snapshot` must never produce a
2638    /// half-written file. Atomic-rename guarantees we either see the
2639    /// old content or the new content, never a torn write.
2640    #[test]
2641    fn save_load_round_trip_with_concurrent_writes() {
2642        use std::thread;
2643        let dir = tempfile::tempdir().unwrap();
2644        let path = dir.path().join("client_adaptive.json");
2645        let path_a = path.clone();
2646        let path_b = path.clone();
2647        let snap_a = ChannelStart {
2648            quote: 10,
2649            store: 10,
2650            fetch: 10,
2651        };
2652        let snap_b = ChannelStart {
2653            quote: 99,
2654            store: 99,
2655            fetch: 99,
2656        };
2657        let h_a = thread::spawn(move || {
2658            for _ in 0..50 {
2659                save_snapshot(&path_a, snap_a);
2660            }
2661        });
2662        let h_b = thread::spawn(move || {
2663            for _ in 0..50 {
2664                save_snapshot(&path_b, snap_b);
2665            }
2666        });
2667        h_a.join().unwrap();
2668        h_b.join().unwrap();
2669        let loaded = load_snapshot(&path).expect("file must be a valid snapshot, not torn");
2670        let valid = (loaded.quote == snap_a.quote
2671            && loaded.store == snap_a.store
2672            && loaded.fetch == snap_a.fetch)
2673            || (loaded.quote == snap_b.quote
2674                && loaded.store == snap_b.store
2675                && loaded.fetch == snap_b.fetch);
2676        assert!(valid, "loaded snapshot is neither A nor B: {loaded:?}",);
2677    }
2678
2679    /// `save_snapshot` to an unwritable / impossible path must be a
2680    /// quiet no-op: best-effort, no panic, no error propagation.
2681    #[test]
2682    fn save_snapshot_to_unwritable_dir_does_not_panic() {
2683        // A path under a non-existent absolute root that the process
2684        // also cannot create. On macOS/Linux a write under "/" requires
2685        // root; create_dir_all will fail on this path.
2686        let path = PathBuf::from("/nonexistent_root_dir_xyz_for_test/sub/dir/client_adaptive.json");
2687        let snap = ChannelStart {
2688            quote: 1,
2689            store: 1,
2690            fetch: 1,
2691        };
2692        // No panic = pass. Function returns unit, errors are logged.
2693        save_snapshot(&path, snap);
2694        // File should not exist.
2695        assert!(!path.exists());
2696    }
2697
2698    /// A truncated/partial JSON file must not crash the loader; it must
2699    /// return None so the controller falls back to cold defaults.
2700    #[test]
2701    fn load_snapshot_from_truncated_file_returns_none() {
2702        let dir = tempfile::tempdir().unwrap();
2703        let path = dir.path().join("truncated.json");
2704        std::fs::write(&path, br#"{"schema":1,"channels":{"quote":"#).unwrap();
2705        assert!(load_snapshot(&path).is_none());
2706    }
2707
2708    /// Microbench: 100k observe+current pairs must complete in well
2709    /// under 100ms. Catches any accidental quadratic behaviour or
2710    /// massive lock contention introduced by future changes.
2711    #[test]
2712    fn controller_perf_overhead_is_bounded() {
2713        let cfg = cfg_for_tests();
2714        let l = Limiter::new(8, cfg);
2715        let started = Instant::now();
2716        for _ in 0..100_000 {
2717            let _ = l.current();
2718            l.observe(Outcome::Success, Duration::from_micros(1));
2719        }
2720        let elapsed = started.elapsed();
2721        // 1µs per pair on a modern machine is generous; allow 500ms to
2722        // tolerate slow CI runners while still catching real regressions.
2723        assert!(
2724            elapsed < Duration::from_millis(500),
2725            "100k observe+current pairs took {elapsed:?}, expected <500ms",
2726        );
2727    }
2728
2729    // ---- Regression tests for adversarial-review findings ----
2730
2731    /// M10 fix: hand-edited or future-schema configs may plant `NaN`
2732    /// or out-of-range values into the float fields. Constructing a
2733    /// controller and feeding observations must not panic.
2734    /// `Duration::from_secs_f64(NaN)` panics per std docs, so without
2735    /// `sanitize()` and the EWMA NaN guard this would crash.
2736    #[test]
2737    fn nan_and_out_of_range_config_does_not_panic() {
2738        let cfg = AdaptiveConfig {
2739            enabled: true,
2740            min_concurrency: 0, // sub-floor; sanitize raises to 1
2741            max: ChannelMax {
2742                quote: 0, // sub-min; sanitize raises to min
2743                store: 0,
2744                fetch: 0,
2745            },
2746            window_ops: 10,
2747            min_window_ops: 50, // > window_ops; sanitize clamps
2748            success_target: f64::NAN,
2749            timeout_ceiling: f64::INFINITY,
2750            latency_inflation_factor: f64::NEG_INFINITY,
2751            latency_ewma_alpha: f64::NAN,
2752        };
2753        let c = AdaptiveController::new(ChannelStart::default(), cfg);
2754        // Verify sanitize() ACTUALLY corrected the values (not just
2755        // that no panic occurred). Reading c.config back proves the
2756        // sanitization landed.
2757        let post = &c.config;
2758        assert_eq!(
2759            post.min_concurrency, 1,
2760            "sanitize did not raise min_concurrency from 0"
2761        );
2762        assert!(
2763            post.success_target.is_finite() && (0.0..=1.0).contains(&post.success_target),
2764            "sanitize did not clamp success_target from NaN: {}",
2765            post.success_target
2766        );
2767        assert!(
2768            post.timeout_ceiling.is_finite() && (0.0..=1.0).contains(&post.timeout_ceiling),
2769            "sanitize did not clamp timeout_ceiling from Inf: {}",
2770            post.timeout_ceiling
2771        );
2772        assert!(
2773            post.latency_inflation_factor.is_finite() && post.latency_inflation_factor > 0.0,
2774            "sanitize did not fix latency_inflation_factor from -Inf: {}",
2775            post.latency_inflation_factor
2776        );
2777        assert!(
2778            post.latency_ewma_alpha.is_finite() && (0.0..=1.0).contains(&post.latency_ewma_alpha),
2779            "sanitize did not fix latency_ewma_alpha from NaN: {}",
2780            post.latency_ewma_alpha
2781        );
2782        assert!(
2783            post.min_window_ops <= post.window_ops,
2784            "sanitize did not clamp min_window_ops <= window_ops: min={} window={}",
2785            post.min_window_ops,
2786            post.window_ops
2787        );
2788        assert!(
2789            post.max.quote >= post.min_concurrency,
2790            "max.quote below min_concurrency"
2791        );
2792        // Now exercise the runtime under hostile latencies — must
2793        // not panic.
2794        for _ in 0..200 {
2795            c.store
2796                .observe(Outcome::Success, Duration::from_secs(99_999));
2797            c.store.observe(Outcome::Timeout, Duration::ZERO);
2798        }
2799        let cur = c.store.current();
2800        assert!(cur >= 1, "cap below floor: {cur}");
2801    }
2802
2803    /// M3+M6 fix: a burst of N concurrent in-flight chunks all
2804    /// observing stress at almost the same time used to pile-drive
2805    /// the cap from N to 1 in N back-to-back ticks. After the fix,
2806    /// decreases require `min_window_ops` of FRESH evidence between
2807    /// successive Decreases, so a single transient burst can drop
2808    /// the cap by at most one halving.
2809    #[test]
2810    fn transient_burst_does_not_pile_drive_to_floor() {
2811        let cfg = LimiterConfig {
2812            window_ops: 32,
2813            min_window_ops: 8,
2814            success_target: 0.95,
2815            timeout_ceiling: 0.10,
2816            ..cfg_for_tests()
2817        };
2818        let l = Limiter::new(32, cfg);
2819        // Simulate 8 concurrent ops all completing as Timeout in a
2820        // back-to-back burst (the kind of event that previously
2821        // floor-slammed the cap).
2822        for _ in 0..8 {
2823            l.observe(Outcome::Timeout, Duration::from_millis(10));
2824        }
2825        // After one burst, cap should have decreased AT MOST once
2826        // (32 -> 16). Pile-driving would land at 1 or 2.
2827        let after_burst = l.current();
2828        assert!(
2829            after_burst >= 16,
2830            "transient burst pile-drove cap from 32 to {after_burst}; expected >= 16",
2831        );
2832    }
2833
    /// M2 fix: classifier must map transport-related errors to
    /// `NetworkError`, not `ApplicationError`. Test EACH transport
    /// variant separately so a regression in any one variant is
    /// caught by name.
    ///
    /// For every listed `Error` variant a fresh limiter (start cap 8) is
    /// fed 16 failing operations through `observe_op` with
    /// `classify_error` as the classifier; the cap must end below 8.
    #[tokio::test]
    async fn transport_errors_classify_as_capacity_signal() {
        use crate::data::client::classify_error;
        use crate::data::error::Error;
        // Factory rather than a shared value: each case below builds its
        // own limiter from a fresh config.
        let make_cfg = || LimiterConfig {
            window_ops: 16,
            min_window_ops: 5,
            success_target: 0.5,
            timeout_ceiling: 0.5,
            ..cfg_for_tests()
        };
        // Cases: (variant_name, error_factory)
        // Errors are produced by boxed closures so a new instance can be
        // built for each of the 16 failing operations.
        type ErrFactory = Box<dyn Fn() -> Error>;
        let cases: Vec<(&str, ErrFactory)> = vec![
            ("Network", Box::new(|| Error::Network("net".to_string()))),
            (
                "InsufficientPeers",
                Box::new(|| Error::InsufficientPeers("ip".to_string())),
            ),
            ("Io", Box::new(|| Error::Io(std::io::Error::other("io")))),
            ("Protocol", Box::new(|| Error::Protocol("p".to_string()))),
            ("Storage", Box::new(|| Error::Storage("s".to_string()))),
            (
                "PartialUpload",
                Box::new(|| Error::PartialUpload {
                    stored: vec![],
                    stored_count: 0,
                    failed: vec![],
                    failed_count: 0,
                    total_chunks: 0,
                    spend: Box::new(crate::data::error::PartialUploadSpend {
                        storage_cost_atto: "0".to_string(),
                        gas_cost_wei: 0,
                    }),
                    reason: "r".to_string(),
                }),
            ),
        ];
        for (name, mk) in &cases {
            // Fresh limiter per variant so one case cannot mask another.
            let l = Limiter::new(8, make_cfg());
            // 16 failures fill the configured window (window_ops = 16).
            for _ in 0..16 {
                let _: std::result::Result<(), Error> =
                    observe_op(&l, || async { Err(mk()) }, classify_error).await;
            }
            // Each variant alone must drive the cap STRICTLY below
            // the start (8 -> 4 via one halving). If a variant maps
            // to ApplicationError, cap stays at 8.
            // NOTE(review): the assertion only checks `cur < 8`; whether
            // exactly one halving happens is not verified here.
            let cur = l.current();
            assert!(
                cur < 8,
                "{name} not classified as capacity signal: cap stayed at {cur}",
            );
        }
    }
2892
2893    /// C4 fix: per-channel max ceilings. Confirm that a `LimiterConfig`
2894    /// with a constrained `max_concurrency` does not bleed into other
2895    /// channels. The ceilings are independent.
2896    #[test]
2897    fn per_channel_ceilings_are_independent() {
2898        let cfg = AdaptiveConfig {
2899            max: ChannelMax {
2900                quote: 4,    // tightly capped
2901                store: 8,    // moderate
2902                fetch: 1024, // very high
2903            },
2904            ..AdaptiveConfig::default()
2905        };
2906        let c = AdaptiveController::new(
2907            ChannelStart {
2908                quote: 4,
2909                store: 8,
2910                fetch: 64,
2911            },
2912            cfg,
2913        );
2914        // Feed 1000 successes to each channel; each must respect its
2915        // own ceiling and never one another's.
2916        for _ in 0..1000 {
2917            c.quote.observe(Outcome::Success, Duration::from_micros(10));
2918            c.store.observe(Outcome::Success, Duration::from_micros(10));
2919            c.fetch.observe(Outcome::Success, Duration::from_micros(10));
2920        }
2921        assert_eq!(c.quote.current(), 4, "quote should cap at 4");
2922        assert_eq!(c.store.current(), 8, "store should cap at 8");
2923        // Fetch uses the hill climber now, so it should not blindly jump to
2924        // its max on success-only samples. It still must prove the fetch
2925        // ceiling is independent by climbing above the quote/store caps.
2926        assert!(
2927            c.fetch.current() > 8 && c.fetch.current() <= 1024,
2928            "fetch did not use its independent ceiling; got {}",
2929            c.fetch.current()
2930        );
2931    }
2932
2933    #[test]
2934    fn fetch_hill_rejects_upward_probe_without_goodput_gain() {
2935        let cfg = hill_cfg_for_tests();
2936        let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2937
2938        observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2939        assert_eq!(
2940            l.current(),
2941            HILL_TEST_UP_PROBE_CAP,
2942            "first healthy epoch should probe upward"
2943        );
2944
2945        observe_hill_success_epoch_with_latency(
2946            &l,
2947            &cfg,
2948            HILL_TEST_CHUNK_BYTES,
2949            Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS),
2950        );
2951        assert_eq!(
2952            l.current(),
2953            HILL_TEST_START_CAP,
2954            "slower higher-cap wave should reject the upward probe"
2955        );
2956        assert_eq!(l.snapshot(), HILL_TEST_START_CAP);
2957    }
2958
2959    #[test]
2960    fn fetch_hill_accepts_upward_probe_with_goodput_gain() {
2961        let cfg = hill_cfg_for_tests();
2962        let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2963
2964        observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2965        assert_eq!(l.current(), HILL_TEST_UP_PROBE_CAP);
2966
2967        observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2968        assert_eq!(
2969            l.snapshot(),
2970            HILL_TEST_UP_PROBE_CAP,
2971            "same-size chunks at same latency should promote the higher cap"
2972        );
2973        assert_eq!(
2974            l.current(),
2975            HILL_TEST_NEXT_UP_PROBE_CAP,
2976            "after accepting an upward probe, hill climber should probe higher"
2977        );
2978    }
2979
2980    #[test]
2981    fn fetch_hill_accepts_lower_probe_when_goodput_is_retained() {
2982        let cfg = hill_cfg_for_tests();
2983        let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
2984
2985        observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2986        observe_hill_success_epoch_with_latency(
2987            &l,
2988            &cfg,
2989            HILL_TEST_CHUNK_BYTES,
2990            Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS),
2991        );
2992        assert_eq!(l.current(), HILL_TEST_START_CAP);
2993
2994        for _ in 0..(HILL_REJECT_COOLDOWN_EPOCHS + HILL_STABLE_PROBE_EPOCHS) {
2995            observe_hill_success_epoch(&l, &cfg, HILL_TEST_CHUNK_BYTES);
2996        }
2997        assert_eq!(
2998            l.current(),
2999            HILL_TEST_DOWN_PROBE_CAP,
3000            "stable best should eventually probe a lower cap"
3001        );
3002
3003        observe_hill_success_epoch_with_latency(
3004            &l,
3005            &cfg,
3006            HILL_TEST_CHUNK_BYTES,
3007            Duration::from_millis(HILL_TEST_RETAINED_DOWN_LATENCY_MS),
3008        );
3009        assert_eq!(
3010            l.snapshot(),
3011            HILL_TEST_DOWN_PROBE_CAP,
3012            "retained goodput at lower concurrency should become the new best"
3013        );
3014    }
3015
    /// Drives always-successful, constant-size, timed operations through
    /// `rebucketed_unordered` + `observe_op_with_success_bytes` and
    /// checks that the fetch hill climber ends in one of the unstressed
    /// states. Wall-clock latency is real here, so the test accepts
    /// either "probe accepted" or "probe not accepted" as the outcome.
    #[tokio::test]
    async fn fetch_hill_records_constant_size_timed_ops_without_stress() {
        let cfg = hill_cfg_for_tests();
        let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
        // Enough ops for the sample target at the start cap plus the
        // sample target at the upward-probe cap.
        // NOTE(review): presumably one full epoch at each cap — confirm
        // against `hill_epoch_target_samples`.
        let total_ops = hill_epoch_target_samples(HILL_TEST_START_CAP, &cfg)
            + hill_epoch_target_samples(HILL_TEST_UP_PROBE_CAP, &cfg);
        // Separate clone moved into the per-item closure; `l` itself is
        // still needed below for the driver call and the assertions.
        let limiter_for_ops = l.clone();

        let result: std::result::Result<Vec<()>, ()> =
            rebucketed_unordered(&l, 0..total_ops, move |_| {
                let limiter = limiter_for_ops.clone();
                async move {
                    observe_op_with_success_bytes(
                        &limiter,
                        // The operation: sleep for a fixed latency, then succeed.
                        || async {
                            tokio::time::sleep(Duration::from_millis(HILL_TEST_ASYNC_LATENCY_MS))
                                .await;
                            Ok::<(), ()>(())
                        },
                        // Error classifier; the operation above never fails.
                        |_| Outcome::NetworkError,
                        // Every success reports the same byte count.
                        |_| HILL_TEST_CHUNK_BYTES,
                    )
                    .await
                }
            })
            .await;
        result.unwrap();

        // The timed wrapper records real wall-clock latency. Loaded runners can make the
        // wider probe miss the deterministic gain covered by
        // `fetch_hill_accepts_upward_probe_with_goodput_gain`, so this test constrains
        // the async observation path to a non-stress outcome.
        let snapshot = l.snapshot();
        assert!(
            matches!(snapshot, HILL_TEST_START_CAP | HILL_TEST_UP_PROBE_CAP),
            "timed successes should finish at the existing or accepted best cap, got {snapshot}"
        );
        let current = l.current();
        assert!(
            matches!(current, HILL_TEST_START_CAP | HILL_TEST_NEXT_UP_PROBE_CAP),
            "timed successes should leave the controller unstressed, got {current}"
        );
    }
3059
3060    #[test]
3061    fn fetch_hill_stress_cuts_before_full_epoch() {
3062        let cfg = LimiterConfig {
3063            window_ops: 8,
3064            min_window_ops: 4,
3065            ..hill_cfg_for_tests()
3066        };
3067        let l = fetch_hill_for_tests(16, cfg.clone());
3068
3069        for _ in 0..cfg.min_window_ops {
3070            l.observe(Outcome::Timeout, Duration::from_millis(10));
3071        }
3072
3073        assert_eq!(
3074            l.current(),
3075            8,
3076            "fetch hill climber should halve on early stress"
3077        );
3078    }
3079
3080    /// Quote/store cold-starts preserve prior static defaults. Fetch starts
3081    /// at the new conservative hill-climb default to avoid download
3082    /// overshoot on fresh installs.
3083    #[test]
3084    fn cold_start_at_least_prior_static_defaults() {
3085        let cs = ChannelStart::default();
3086        assert!(cs.quote >= 32, "quote cold-start regressed: {}", cs.quote);
3087        assert!(cs.store >= 8, "store cold-start regressed: {}", cs.store);
3088        assert_eq!(
3089            cs.fetch, FETCH_COLD_START_CONCURRENCY,
3090            "fetch cold-start changed unexpectedly"
3091        );
3092    }
3093
3094    /// Reviewer N-M5 guard: with the new gated-decrease semantics
3095    /// (decreases require `min_window_ops` of fresh evidence), the
3096    /// controller must STILL reach the floor under sustained stress
3097    /// within a bounded number of observations. Otherwise we've made
3098    /// the controller too sluggish to react to a real network
3099    /// outage.
3100    ///
3101    /// From start = 64 with `min_window_ops = 8`, reaching floor 1
3102    /// takes log2(64) = 6 halvings, each gated on 8 fresh samples,
3103    /// so the upper bound is roughly `6 * 8 + min_window_ops = ~56`
3104    /// observations. We allow 200 to absorb the warm-up window and
3105    /// any per-sample evaluation slack.
3106    #[test]
3107    fn sustained_stress_reaches_floor_within_bounded_ops() {
3108        let cfg = LimiterConfig {
3109            window_ops: 32,
3110            min_window_ops: 8,
3111            success_target: 0.95,
3112            timeout_ceiling: 0.10,
3113            max_concurrency: 64,
3114            ..cfg_for_tests()
3115        };
3116        let l = Limiter::new(64, cfg);
3117        let mut ops = 0usize;
3118        while l.current() > 1 && ops < 200 {
3119            l.observe(Outcome::Timeout, Duration::from_millis(10));
3120            ops += 1;
3121        }
3122        assert_eq!(
3123            l.current(),
3124            1,
3125            "controller did not reach floor within 200 observations under \
3126             sustained timeout stress; took {ops} ops, ended at cap {}",
3127            l.current()
3128        );
3129    }
3130
3131    /// The default `AdaptiveController` (production defaults) starts
3132    /// each channel at the documented cold-start value, with each
3133    /// per-channel max strictly above the start (so the controller
3134    /// has room to grow).
3135    #[test]
3136    fn default_controller_has_growth_headroom() {
3137        let c = AdaptiveController::default();
3138        let cs = ChannelStart::default();
3139        let max = ChannelMax::default();
3140        assert_eq!(c.quote.current(), cs.quote);
3141        assert_eq!(c.store.current(), cs.store);
3142        assert_eq!(c.fetch.current(), cs.fetch);
3143        assert!(
3144            max.quote > cs.quote,
3145            "no growth headroom for quote: max={} start={}",
3146            max.quote,
3147            cs.quote
3148        );
3149        assert!(
3150            max.store > cs.store,
3151            "no growth headroom for store: max={} start={}",
3152            max.store,
3153            cs.store
3154        );
3155        assert!(
3156            max.fetch > cs.fetch,
3157            "no growth headroom for fetch: max={} start={}",
3158            max.fetch,
3159            cs.fetch
3160        );
3161    }
3162
3163    // ---- Codex review (round 3) regression tests ----
3164
3165    /// Codex CRITICAL: warm_start was blindly restoring caps below the
3166    /// cold-start floor. A prior bad run that drove store=1 would
3167    /// pessimize every subsequent run forever. The fix floors warm
3168    /// values at `ChannelStart::default()` per channel.
3169    #[test]
3170    fn warm_start_floors_at_cold_defaults() {
3171        let c = AdaptiveController::default();
3172        let cold = ChannelStart::default();
3173        // Snapshot from a "bad prior run" — every channel pinned to 1.
3174        let bad_snap = ChannelStart {
3175            quote: 1,
3176            store: 1,
3177            fetch: 1,
3178        };
3179        c.warm_start(bad_snap);
3180        // After warm_start, each channel should be AT LEAST the
3181        // cold-start value, not the persisted 1.
3182        assert_eq!(
3183            c.quote.current(),
3184            cold.quote,
3185            "quote warm_start did not floor at cold default"
3186        );
3187        assert_eq!(
3188            c.store.current(),
3189            cold.store,
3190            "store warm_start did not floor at cold default"
3191        );
3192        assert_eq!(
3193            c.fetch.current(),
3194            cold.fetch,
3195            "fetch warm_start did not floor at cold default"
3196        );
3197    }
3198
3199    /// Warm values ABOVE the cold-start floor must still be honored —
3200    /// the floor is a one-sided lower bound, not a clamp.
3201    #[test]
3202    fn warm_start_honors_values_above_cold_floor() {
3203        let c = AdaptiveController::default();
3204        let cold = ChannelStart::default();
3205        let snap = ChannelStart {
3206            quote: cold.quote * 2,
3207            store: cold.store * 4,
3208            fetch: cold.fetch * 2,
3209        };
3210        c.warm_start(snap);
3211        assert_eq!(c.quote.current(), snap.quote);
3212        assert_eq!(c.store.current(), snap.store);
3213        assert_eq!(c.fetch.current(), snap.fetch);
3214    }
3215
3216    /// Codex MAJOR: long pipelines used to capture the cap once via
3217    /// `buffer_unordered(N)`. `rebucketed` re-reads the cap at each
3218    /// batch boundary so adaptive growth/decay actually takes effect
3219    /// mid-stream. Test: fire 200 items at start cap=4, then halfway
3220    /// through bump the cap manually via warm_start to 16, and assert
3221    /// the LATER batches see the new cap.
3222    #[tokio::test]
3223    async fn rebucketed_picks_up_cap_changes_mid_stream() {
3224        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3225        use std::sync::Arc as StdArc;
3226        let cfg = LimiterConfig {
3227            min_concurrency: 1,
3228            max_concurrency: 32,
3229            ..cfg_for_tests()
3230        };
3231        let l = Limiter::new(4, cfg);
3232        let max_seen = StdArc::new(AtomicUsize::new(0));
3233        let in_flight = StdArc::new(AtomicUsize::new(0));
3234        let processed = StdArc::new(AtomicUsize::new(0));
3235        let l_for_bump = l.clone();
3236        let processed_for_bump = processed.clone();
3237        // Spawn a watcher that bumps the cap once enough items have
3238        // started to "warm up".
3239        let bump_handle = tokio::spawn(async move {
3240            loop {
3241                tokio::time::sleep(Duration::from_millis(2)).await;
3242                if processed_for_bump.load(AtomicOrdering::Relaxed) >= 16 {
3243                    l_for_bump.warm_start(16);
3244                    return;
3245                }
3246            }
3247        });
3248        let _: Vec<()> = rebucketed(&l, 0..200usize, false, |_i| {
3249            let max_seen = max_seen.clone();
3250            let in_flight = in_flight.clone();
3251            let processed = processed.clone();
3252            async move {
3253                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3254                max_seen.fetch_max(cur, AtomicOrdering::Relaxed);
3255                tokio::time::sleep(Duration::from_millis(1)).await;
3256                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3257                processed.fetch_add(1, AtomicOrdering::Relaxed);
3258                Ok::<(), &'static str>(())
3259            }
3260        })
3261        .await
3262        .unwrap();
3263        bump_handle.await.unwrap();
3264        // The cap was bumped to 16 mid-stream. If rebucketing actually
3265        // picks up cap changes, max_seen should reach above the
3266        // initial 4.
3267        let peak = max_seen.load(AtomicOrdering::Relaxed);
3268        assert!(
3269            peak > 4,
3270            "rebucketed did not pick up the mid-stream cap bump (peak in-flight = {peak})"
3271        );
3272    }
3273
3274    /// Codex MAJOR: `observe_op` cancellation safety. If the wrapper
3275    /// future is dropped before the inner op completes, no outcome is
3276    /// recorded (intentional — dropped work was never observed by
3277    /// the network). This test asserts the contract: completed ops
3278    /// land observations, dropped ops do not corrupt the window.
3279    /// Two-sided: confirms cancellation is a NO-OP, AND confirms
3280    /// post-cancellation observations DO land normally (proving the
3281    /// limiter's internal state was not corrupted).
3282    #[tokio::test]
3283    async fn observe_op_cancellation_drops_silently() {
3284        let cfg = LimiterConfig {
3285            window_ops: 16,
3286            min_window_ops: 4,
3287            ..cfg_for_tests()
3288        };
3289        let l = Limiter::new(4, cfg);
3290        // Build a future that never completes, then drop it before
3291        // awaiting. observe_op must not panic on drop and must not
3292        // record an outcome.
3293        let l_clone = l.clone();
3294        let fut = observe_op(
3295            &l_clone,
3296            || async {
3297                std::future::pending::<()>().await;
3298                Ok::<(), &'static str>(())
3299            },
3300            |_| Outcome::Timeout,
3301        );
3302        drop(fut);
3303        // Cap unchanged: no observation was recorded.
3304        assert_eq!(l.current(), 4, "cancelled op moved the cap");
3305        // Now feed observations that ACTUALLY count as Success (the
3306        // Ok branch of observe_op is always Outcome::Success — the
3307        // classifier only runs on Err). Cold-start at 4 + a full
3308        // window of healthy successes = double to 8.
3309        for _ in 0..16 {
3310            let _: Result<(), &'static str> = observe_op(
3311                &l,
3312                || async { Ok(()) },
3313                // classifier only fires on Err; Ok is always Success
3314                |_| Outcome::NetworkError,
3315            )
3316            .await;
3317        }
3318        // STRICT: cap must have GROWN, not just held. If cancellation
3319        // had corrupted internal counters, slow-start might be stuck.
3320        assert!(
3321            l.current() > 4,
3322            "cap did not grow after 16 successes; controller corrupted by cancellation? cap={}",
3323            l.current(),
3324        );
3325    }
3326
3327    /// Codex MAJOR: Drop persistence must be reliable. The CLI relies
3328    /// on Client::drop firing a synchronous save. If save_snapshot
3329    /// were dispatched via fire-and-forget spawn_blocking, runtime
3330    /// teardown would silently lose the snapshot. This test asserts
3331    /// that calling save_snapshot synchronously from a normal context
3332    /// (not Drop, but the same code path) actually writes.
3333    #[test]
3334    fn save_snapshot_is_synchronous_and_durable() {
3335        let dir = tempfile::tempdir().unwrap();
3336        let path = dir.path().join("client_adaptive.json");
3337        let snap = ChannelStart {
3338            quote: 100,
3339            store: 50,
3340            fetch: 200,
3341        };
3342        save_snapshot(&path, snap);
3343        // The file must exist immediately after save_snapshot returns.
3344        // No async waiting, no spawn_blocking, no eventual consistency.
3345        assert!(
3346            path.exists(),
3347            "save_snapshot did not write file synchronously"
3348        );
3349        let loaded = load_snapshot(&path).unwrap();
3350        assert_eq!(loaded.quote, 100);
3351        assert_eq!(loaded.store, 50);
3352        assert_eq!(loaded.fetch, 200);
3353    }
3354
3355    // ---- Codex round 4 regression tests ----
3356
3357    /// Codex CR-2 fix: warm_start marks the limiter as having
3358    /// already-left-slow-start, so a single healthy window does NOT
3359    /// double the cap (which would be over-aggressive resume from a
3360    /// learned value).
3361    #[tokio::test]
3362    async fn warm_start_disables_slow_start_doubling() {
3363        let cfg = LimiterConfig {
3364            window_ops: 8,
3365            min_window_ops: 4,
3366            success_target: 0.9,
3367            ..cfg_for_tests()
3368        };
3369        let l = Limiter::new(2, cfg.clone());
3370        // Warm-start to a learned value of 16. This must not be
3371        // treated as a fresh slow-start.
3372        l.warm_start(16);
3373        assert_eq!(l.current(), 16);
3374        // One full healthy window: in slow-start would double to 32;
3375        // post-warm-start it should add +1 to 17.
3376        for _ in 0..cfg.window_ops {
3377            l.observe(Outcome::Success, Duration::from_millis(10));
3378        }
3379        assert_eq!(
3380            l.current(),
3381            17,
3382            "warm-start triggered slow-start doubling instead of additive +1"
3383        );
3384    }
3385
3386    /// Codex CR-3 fix: warm_start floors against the per-instance
3387    /// cold-start, NOT the global ChannelStart::default. A controller
3388    /// built with custom low starts must stay faithful to its
3389    /// construction parameters even after warm_start.
3390    #[test]
3391    fn controller_warm_start_floors_at_per_instance_cold_start() {
3392        let custom_cold = ChannelStart {
3393            quote: 2,
3394            store: 1,
3395            fetch: 4,
3396        };
3397        let c = AdaptiveController::new(custom_cold, AdaptiveConfig::default());
3398        // Snapshot below the per-instance cold-start floors at custom values.
3399        c.warm_start(ChannelStart {
3400            quote: 1,
3401            store: 1,
3402            fetch: 1,
3403        });
3404        assert_eq!(c.quote.current(), 2);
3405        assert_eq!(c.store.current(), 1);
3406        assert_eq!(c.fetch.current(), 4);
3407        // Snapshot above the per-instance cold-start uses the snapshot.
3408        c.warm_start(ChannelStart {
3409            quote: 10,
3410            store: 10,
3411            fetch: 10,
3412        });
3413        assert_eq!(c.quote.current(), 10);
3414        assert_eq!(c.store.current(), 10);
3415        assert_eq!(c.fetch.current(), 10);
3416    }
3417
3418    /// Codex CR-3 fix: when adaptive.enabled = false, warm_start is
3419    /// a no-op — fixed-concurrency mode means the user wants exactly
3420    /// the cold start, not a learned value from a prior run.
3421    #[test]
3422    fn warm_start_is_noop_when_adaptive_disabled() {
3423        let cfg = AdaptiveConfig {
3424            enabled: false,
3425            ..AdaptiveConfig::default()
3426        };
3427        let custom_cold = ChannelStart {
3428            quote: 5,
3429            store: 5,
3430            fetch: 5,
3431        };
3432        let c = AdaptiveController::new(custom_cold, cfg);
3433        c.warm_start(ChannelStart {
3434            quote: 100,
3435            store: 100,
3436            fetch: 100,
3437        });
3438        assert_eq!(c.quote.current(), 5, "warm_start moved cap when disabled");
3439        assert_eq!(c.store.current(), 5, "warm_start moved cap when disabled");
3440        assert_eq!(c.fetch.current(), 5, "warm_start moved cap when disabled");
3441    }
3442
3443    /// Codex CR-4 fix: rebucketed_unordered is rolling, not batch-fenced.
3444    /// One slow item must NOT block other items in the same logical
3445    /// "wave" — the in-flight set should refill as fast items complete.
3446    #[tokio::test]
3447    async fn rebucketed_unordered_is_rolling_not_fenced() {
3448        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3449        use std::sync::Arc as StdArc;
3450        let cfg = LimiterConfig {
3451            min_concurrency: 1,
3452            max_concurrency: 8,
3453            window_ops: 100,
3454            min_window_ops: 50,
3455            ..cfg_for_tests()
3456        };
3457        let l = Limiter::new(4, cfg);
3458        let in_flight = StdArc::new(AtomicUsize::new(0));
3459        let max_in_flight = StdArc::new(AtomicUsize::new(0));
3460        let started = StdArc::new(AtomicUsize::new(0));
3461        let _: Vec<()> = rebucketed_unordered(&l, 0..20usize, |i| {
3462            let in_flight = in_flight.clone();
3463            let max_in_flight = max_in_flight.clone();
3464            let started = started.clone();
3465            async move {
3466                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3467                max_in_flight.fetch_max(cur, AtomicOrdering::Relaxed);
3468                started.fetch_add(1, AtomicOrdering::Relaxed);
3469                // Item 0 is intentionally slow; items 1..20 are fast.
3470                // In a batch-fenced scheduler, item 0 would gate the
3471                // start of items in the next batch. In a rolling
3472                // scheduler, items 1..N can start as soon as their
3473                // slot frees from a fast completion.
3474                if i == 0 {
3475                    tokio::time::sleep(Duration::from_millis(50)).await;
3476                } else {
3477                    tokio::time::sleep(Duration::from_millis(1)).await;
3478                }
3479                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3480                Ok::<(), &'static str>(())
3481            }
3482        })
3483        .await
3484        .unwrap();
3485        // All 20 items must have started; in a rolling scheduler the
3486        // peak in-flight should reach at least 4 (the cap).
3487        assert_eq!(started.load(AtomicOrdering::Relaxed), 20);
3488        let peak = max_in_flight.load(AtomicOrdering::Relaxed);
3489        assert!(
3490            peak >= 4,
3491            "rolling scheduler did not fill cap; peak in-flight = {peak}"
3492        );
3493    }
3494
3495    /// Codex CR-4 fix: rebucketed_ordered preserves input order.
3496    #[tokio::test]
3497    async fn rebucketed_ordered_preserves_input_order() {
3498        let cfg = LimiterConfig {
3499            min_concurrency: 1,
3500            max_concurrency: 4,
3501            ..cfg_for_tests()
3502        };
3503        let l = Limiter::new(4, cfg);
3504        let items: Vec<usize> = (0..50).collect();
3505        let result: Vec<usize> = rebucketed_ordered(
3506            &l,
3507            items.iter().copied().enumerate(),
3508            |(idx, v)| async move {
3509                // Reverse-bias delay so out-of-order completion is likely.
3510                let delay = (50 - v) as u64;
3511                tokio::time::sleep(Duration::from_micros(delay)).await;
3512                Ok::<_, &'static str>((idx, v * 10))
3513            },
3514        )
3515        .await
3516        .unwrap();
3517        assert_eq!(result.len(), 50);
3518        for (i, v) in result.iter().enumerate() {
3519            assert_eq!(*v, i * 10, "out of order at index {i}: got {v}");
3520        }
3521    }
3522
3523    /// Codex CR-1 regression guard (logical, not the actual data path):
3524    /// rebucketed_ordered with a payload of (idx, hash) must always
3525    /// pair the right hash with the right chunk content even under
3526    /// adversarial out-of-order completion.
3527    #[tokio::test]
3528    async fn rebucketed_ordered_pairs_idx_with_payload_correctly() {
3529        let cfg = LimiterConfig {
3530            min_concurrency: 1,
3531            max_concurrency: 8,
3532            ..cfg_for_tests()
3533        };
3534        let l = Limiter::new(8, cfg);
3535        // Each item is (idx, fake_hash). The "fetch" returns
3536        // (idx, content_for_hash). We adversarially out-of-order them
3537        // and assert that the post-sort puts content with the right
3538        // index.
3539        let items: Vec<(usize, u64)> = (0..40).map(|i| (i, 1000u64 + i as u64)).collect();
3540        let result: Vec<u64> = rebucketed_ordered(&l, items, |(idx, hash)| async move {
3541            let delay = (40 - idx) as u64; // reverse delay
3542            tokio::time::sleep(Duration::from_micros(delay)).await;
3543            // "content_for_hash" derived from the hash.
3544            Ok::<_, &'static str>((idx, hash * 7))
3545        })
3546        .await
3547        .unwrap();
3548        for (i, v) in result.iter().enumerate() {
3549            let expected = (1000 + i as u64) * 7;
3550            assert_eq!(
3551                *v, expected,
3552                "idx {i} paired with wrong content: {v}, expected {expected}"
3553            );
3554        }
3555    }
3556
3557    /// Codex CR-5 fix: snapshot temp file is unique per save call,
3558    /// not just per-PID. Two save_snapshot calls in the SAME process
3559    /// must not collide on the temp file.
3560    #[test]
3561    fn save_snapshot_temp_file_is_unique_per_call() {
3562        let dir = tempfile::tempdir().unwrap();
3563        let path = dir.path().join("client_adaptive.json");
3564        // Fire many saves back-to-back in the same process. Without
3565        // a per-call unique suffix, the temp file would be the same
3566        // for every call (PID is constant), and any partial write +
3567        // crash window would expose the race. We can't simulate the
3568        // exact race in a unit test, but we can confirm no panic and
3569        // the final file is correct after many calls.
3570        for i in 0..100 {
3571            save_snapshot(
3572                &path,
3573                ChannelStart {
3574                    quote: i + 1,
3575                    store: i + 1,
3576                    fetch: i + 1,
3577                },
3578            );
3579        }
3580        let loaded = load_snapshot(&path).unwrap();
3581        assert_eq!(loaded.quote, 100);
3582        assert_eq!(loaded.store, 100);
3583        assert_eq!(loaded.fetch, 100);
3584        // Confirm no leftover .tmp files.
3585        let leftover: Vec<_> = std::fs::read_dir(dir.path())
3586            .unwrap()
3587            .filter_map(|e| e.ok())
3588            .filter(|e| e.file_name().to_string_lossy().contains(".tmp."))
3589            .collect();
3590        assert!(
3591            leftover.is_empty(),
3592            "temp files leaked: {:?}",
3593            leftover.iter().map(|e| e.file_name()).collect::<Vec<_>>()
3594        );
3595    }
3596
3597    // ---- Edge case tests ----
3598
3599    /// Edge case: rebucketed_unordered with EMPTY input returns empty
3600    /// Vec immediately, no panic, no work scheduled.
3601    #[tokio::test]
3602    async fn rebucketed_empty_input_returns_empty() {
3603        let cfg = cfg_for_tests();
3604        let l = Limiter::new(4, cfg);
3605        let v: Vec<usize> = rebucketed_unordered(&l, std::iter::empty::<usize>(), |_| async {
3606            Ok::<_, &'static str>(42usize)
3607        })
3608        .await
3609        .unwrap();
3610        assert!(v.is_empty());
3611        let v: Vec<usize> = rebucketed_ordered(
3612            &l,
3613            std::iter::empty::<(usize, ())>(),
3614            |(idx, _)| async move { Ok::<_, &'static str>((idx, 42usize)) },
3615        )
3616        .await
3617        .unwrap();
3618        assert!(v.is_empty());
3619    }
3620
3621    /// Edge case: rebucketed_unordered with EXACTLY cap items.
3622    #[tokio::test]
3623    async fn rebucketed_exactly_cap_items() {
3624        let cfg = LimiterConfig {
3625            min_concurrency: 1,
3626            max_concurrency: 4,
3627            ..cfg_for_tests()
3628        };
3629        let l = Limiter::new(4, cfg);
3630        let v: Vec<usize> =
3631            rebucketed_unordered(
3632                &l,
3633                0..4usize,
3634                |i| async move { Ok::<_, &'static str>(i * 2) },
3635            )
3636            .await
3637            .unwrap();
3638        assert_eq!(v.len(), 4);
3639    }
3640
3641    /// Edge case: rebucketed_unordered preserves the FIRST error and
3642    /// discards subsequent ones, draining in-flight work first.
3643    #[tokio::test]
3644    async fn rebucketed_preserves_first_error() {
3645        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3646        use std::sync::Arc as StdArc;
3647        let cfg = LimiterConfig {
3648            min_concurrency: 1,
3649            max_concurrency: 4,
3650            ..cfg_for_tests()
3651        };
3652        let l = Limiter::new(4, cfg);
3653        let started = StdArc::new(AtomicUsize::new(0));
3654        let started_clone = started.clone();
3655        let result: Result<Vec<()>, &'static str> = rebucketed_unordered(&l, 0..20usize, |i| {
3656            let started = started_clone.clone();
3657            async move {
3658                started.fetch_add(1, AtomicOrdering::Relaxed);
3659                if i == 5 {
3660                    // Slight delay so item 6, 7 also start before
3661                    // this error propagates.
3662                    tokio::time::sleep(Duration::from_micros(100)).await;
3663                    return Err("first error");
3664                }
3665                if i == 10 {
3666                    return Err("second error - should be ignored");
3667                }
3668                tokio::time::sleep(Duration::from_micros(50)).await;
3669                Ok(())
3670            }
3671        })
3672        .await;
3673        match result {
3674            Err(e) => assert_eq!(e, "first error", "wrong error preserved"),
3675            Ok(_) => panic!("expected error, got ok"),
3676        }
3677        // The first error stops new launches, but in-flight items
3678        // drain. We don't assert exact count (nondeterministic) — only
3679        // that we did not launch ALL 20 items (proving error-stop
3680        // works) and we did launch more than just item 5 (proving
3681        // in-flight drain happens).
3682        let total = started.load(AtomicOrdering::Relaxed);
3683        assert!(
3684            (5..20).contains(&total),
3685            "started count out of range: {total}"
3686        );
3687    }
3688
3689    /// Edge case: limiter with min == max (degenerate single-value).
3690    /// Cap stays at the single value regardless of observations.
3691    #[test]
3692    fn limiter_with_min_equal_max_is_pinned() {
3693        let cfg = LimiterConfig {
3694            min_concurrency: 5,
3695            max_concurrency: 5,
3696            ..cfg_for_tests()
3697        };
3698        let l = Limiter::new(5, cfg);
3699        for _ in 0..1000 {
3700            l.observe(Outcome::Success, Duration::from_millis(1));
3701        }
3702        assert_eq!(l.current(), 5, "cap moved despite min==max");
3703        for _ in 0..1000 {
3704            l.observe(Outcome::Timeout, Duration::from_millis(50));
3705        }
3706        assert_eq!(l.current(), 5, "cap moved despite min==max");
3707    }
3708
3709    /// Direct test of `ewma()` math: alpha = 0 means new value =
3710    /// prev (the baseline never updates from new samples).
3711    #[test]
3712    fn ewma_alpha_zero_returns_prev() {
3713        let prev = Duration::from_millis(100);
3714        let sample = Duration::from_millis(500);
3715        let result = ewma(prev, sample, 0.0);
3716        assert_eq!(result, prev, "alpha=0 must return prev unchanged");
3717    }
3718
3719    /// Direct test of `ewma()` math: alpha = 1 means new value =
3720    /// sample (full overwrite).
3721    #[test]
3722    fn ewma_alpha_one_returns_sample() {
3723        let prev = Duration::from_millis(100);
3724        let sample = Duration::from_millis(500);
3725        let result = ewma(prev, sample, 1.0);
3726        // Allow 1ms of float-conversion slop.
3727        let diff = result.abs_diff(sample);
3728        assert!(
3729            diff <= Duration::from_millis(1),
3730            "alpha=1 should return sample; got {result:?}, expected ~{sample:?}"
3731        );
3732    }
3733
3734    /// Direct test of `ewma()`: alpha = 0.5 should give the midpoint.
3735    #[test]
3736    fn ewma_alpha_half_returns_midpoint() {
3737        let prev = Duration::from_millis(200);
3738        let sample = Duration::from_millis(400);
3739        let result = ewma(prev, sample, 0.5);
3740        let expected = Duration::from_millis(300);
3741        let diff = result.abs_diff(expected);
3742        assert!(
3743            diff <= Duration::from_millis(1),
3744            "alpha=0.5 midpoint: got {result:?}, expected ~{expected:?}"
3745        );
3746    }
3747
3748    /// Direct test of `ewma()`: NaN alpha must NOT panic and must
3749    /// preserve the previous value (defense against
3750    /// `Duration::from_secs_f64(NaN)` panic).
3751    #[test]
3752    fn ewma_nan_alpha_returns_prev() {
3753        let prev = Duration::from_millis(100);
3754        let sample = Duration::from_millis(500);
3755        let result = ewma(prev, sample, f64::NAN);
3756        assert_eq!(result, prev);
3757        let result = ewma(prev, sample, f64::INFINITY);
3758        assert_eq!(result, prev);
3759        let result = ewma(prev, sample, f64::NEG_INFINITY);
3760        assert_eq!(result, prev);
3761    }
3762
3763    /// Out-of-range alpha (e.g. 2.5) must clamp to [0,1] and NOT
3764    /// produce a negative result.
3765    #[test]
3766    fn ewma_clamps_alpha_above_one() {
3767        let prev = Duration::from_millis(100);
3768        let sample = Duration::from_millis(500);
3769        let result = ewma(prev, sample, 2.5);
3770        // Clamped to 1.0 -> should equal sample (~500ms).
3771        assert!(result >= Duration::from_millis(499));
3772        assert!(result <= Duration::from_millis(501));
3773    }
3774
3775    /// Edge case: window contains ONLY ApplicationErrors. Controller
3776    /// must HOLD (not move at all), because there are zero
3777    /// capacity-relevant samples.
3778    #[test]
3779    fn window_full_of_application_errors_does_not_move_cap() {
3780        let cfg = cfg_for_tests();
3781        let l = Limiter::new(8, cfg.clone());
3782        for _ in 0..(cfg.window_ops * 5) {
3783            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
3784        }
3785        assert_eq!(
3786            l.current(),
3787            8,
3788            "cap moved on pure-app-error window; should hold"
3789        );
3790    }
3791
3792    /// Edge case: AdaptiveController with `enabled = false` plus
3793    /// observations does not move and does not interact with the
3794    /// observation window.
3795    #[test]
3796    fn disabled_adaptive_controller_truly_inert() {
3797        let cfg = AdaptiveConfig {
3798            enabled: false,
3799            ..AdaptiveConfig::default()
3800        };
3801        let c = AdaptiveController::new(ChannelStart::default(), cfg);
3802        let baseline_quote = c.quote.current();
3803        let baseline_store = c.store.current();
3804        let baseline_fetch = c.fetch.current();
3805        for _ in 0..10000 {
3806            c.quote.observe(Outcome::Timeout, Duration::from_millis(1));
3807            c.store.observe(Outcome::Timeout, Duration::from_millis(1));
3808            c.fetch.observe(Outcome::Timeout, Duration::from_millis(1));
3809        }
3810        assert_eq!(c.quote.current(), baseline_quote);
3811        assert_eq!(c.store.current(), baseline_store);
3812        assert_eq!(c.fetch.current(), baseline_fetch);
3813    }
3814
3815    /// Edge case: per-channel limiters share NO state. Hammering one
3816    /// channel must not move another. Two-sided: assert store DROPS
3817    /// to the floor (proving observations landed) AND quote/fetch
3818    /// are EXACTLY unchanged (proving zero cross-channel leakage).
3819    #[test]
3820    fn channel_state_is_independent() {
3821        let c = AdaptiveController::default();
3822        let q0 = c.quote.current();
3823        let f0 = c.fetch.current();
3824        let s0 = c.store.current();
3825        for _ in 0..1000 {
3826            c.store.observe(Outcome::Timeout, Duration::from_millis(1));
3827        }
3828        // Strict: store reached the floor (observations landed).
3829        assert_eq!(
3830            c.store.current(),
3831            c.config.min_concurrency,
3832            "store did not reach floor after 1000 timeouts; cap={}",
3833            c.store.current()
3834        );
3835        assert!(c.store.current() < s0, "store cap did not move at all");
3836        // Strict: quote and fetch unchanged.
3837        assert_eq!(c.quote.current(), q0, "quote leaked from store stress");
3838        assert_eq!(c.fetch.current(), f0, "fetch leaked from store stress");
3839    }
3840
3841    // ---- Round-5 test reviewer suggestions ----
3842
3843    /// Direct unit test for `AdaptiveConfig::sanitize`. Verifies that
3844    /// every clamped field is correctly fixed up, not merely that
3845    /// the controller doesn't crash.
3846    #[test]
3847    fn sanitize_corrects_pathological_floats() {
3848        let mut cfg = AdaptiveConfig {
3849            success_target: f64::NAN,
3850            timeout_ceiling: 5.0,
3851            latency_inflation_factor: f64::NEG_INFINITY,
3852            latency_ewma_alpha: 2.5,
3853            window_ops: 4,
3854            min_window_ops: 10,
3855            ..AdaptiveConfig::default()
3856        };
3857        cfg.sanitize();
3858        assert!(cfg.success_target.is_finite());
3859        assert!((0.0..=1.0).contains(&cfg.success_target));
3860        assert!((0.0..=1.0).contains(&cfg.timeout_ceiling));
3861        assert!(cfg.latency_inflation_factor.is_finite());
3862        assert!(cfg.latency_inflation_factor > 0.0);
3863        assert!((0.0..=1.0).contains(&cfg.latency_ewma_alpha));
3864        assert!(
3865            cfg.min_window_ops <= cfg.window_ops,
3866            "min_window_ops {} > window_ops {}",
3867            cfg.min_window_ops,
3868            cfg.window_ops
3869        );
3870    }
3871
3872    /// Snapshot persistence relies on serde for ChannelStart and
3873    /// ChannelMax. A field rename in either type would silently
3874    /// break warm-start across binary upgrades — this test catches
3875    /// that.
3876    #[test]
3877    fn channel_max_serde_round_trips() {
3878        let m = ChannelMax {
3879            quote: 7,
3880            store: 13,
3881            fetch: 200,
3882        };
3883        let json = serde_json::to_string(&m).unwrap();
3884        let back: ChannelMax = serde_json::from_str(&json).unwrap();
3885        assert_eq!(back.quote, 7);
3886        assert_eq!(back.store, 13);
3887        assert_eq!(back.fetch, 200);
3888    }
3889
3890    #[test]
3891    fn channel_start_serde_round_trips() {
3892        let s = ChannelStart {
3893            quote: 11,
3894            store: 22,
3895            fetch: 33,
3896        };
3897        let json = serde_json::to_string(&s).unwrap();
3898        let back: ChannelStart = serde_json::from_str(&json).unwrap();
3899        assert_eq!(back.quote, 11);
3900        assert_eq!(back.store, 22);
3901        assert_eq!(back.fetch, 33);
3902    }
3903
3904    /// Mid-flight cap SHRINKAGE: `rebucketed_picks_up_cap_changes_mid_stream`
3905    /// only proves growth. Overload protection requires the reverse —
3906    /// when the controller halves the cap mid-pipeline, in-flight
3907    /// must respect the new lower cap on the next refill.
3908    #[tokio::test]
3909    async fn rebucketed_honors_cap_shrinkage_mid_stream() {
3910        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3911        use std::sync::Arc as StdArc;
3912        let cfg = LimiterConfig {
3913            min_concurrency: 1,
3914            max_concurrency: 16,
3915            ..cfg_for_tests()
3916        };
3917        let l = Limiter::new(16, cfg);
3918        let in_flight = StdArc::new(AtomicUsize::new(0));
3919        let max_after_shrink = StdArc::new(AtomicUsize::new(0));
3920        let processed = StdArc::new(AtomicUsize::new(0));
3921        let shrunk = StdArc::new(std::sync::atomic::AtomicBool::new(false));
3922        let l_for_shrink = l.clone();
3923        let p_for_shrink = processed.clone();
3924        let shrunk_for_shrink = shrunk.clone();
3925        let shrink_handle = tokio::spawn(async move {
3926            // Bump down the cap once 50 items have completed.
3927            loop {
3928                tokio::time::sleep(Duration::from_millis(2)).await;
3929                if p_for_shrink.load(AtomicOrdering::Relaxed) >= 50 {
3930                    l_for_shrink.warm_start(2);
3931                    shrunk_for_shrink.store(true, AtomicOrdering::Relaxed);
3932                    return;
3933                }
3934            }
3935        });
3936        let _: Vec<()> = rebucketed_unordered(&l, 0..400usize, |_i| {
3937            let in_flight = in_flight.clone();
3938            let max_after_shrink = max_after_shrink.clone();
3939            let processed = processed.clone();
3940            let shrunk = shrunk.clone();
3941            async move {
3942                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3943                if shrunk.load(AtomicOrdering::Relaxed) {
3944                    max_after_shrink.fetch_max(cur, AtomicOrdering::Relaxed);
3945                }
3946                tokio::time::sleep(Duration::from_millis(1)).await;
3947                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3948                processed.fetch_add(1, AtomicOrdering::Relaxed);
3949                Ok::<(), &'static str>(())
3950            }
3951        })
3952        .await
3953        .unwrap();
3954        shrink_handle.await.unwrap();
3955        let peak = max_after_shrink.load(AtomicOrdering::Relaxed);
3956        // After the shrink to cap=2, no NEW launches should put us
3957        // above 2. Already-launched in-flight may still be draining
3958        // briefly, so allow a small overshoot for the natural
3959        // refill-after-completion lag.
3960        assert!(
3961            peak <= 4,
3962            "rebucketed exceeded shrunk cap of 2: peak post-shrink in-flight = {peak}"
3963        );
3964    }
3965
3966    /// Mixed `ApplicationError` + capacity-relevant items in one
3967    /// window. ApplicationError must NOT contribute to the success
3968    /// rate denominator — otherwise a wave with some AppErrors and
3969    /// some healthy successes would falsely look like a stressed
3970    /// window.
3971    #[test]
3972    fn mixed_window_app_errors_with_capacity_signal() {
3973        let cfg = LimiterConfig {
3974            window_ops: 10,
3975            min_window_ops: 5,
3976            timeout_ceiling: 0.2,
3977            success_target: 0.9,
3978            ..cfg_for_tests()
3979        };
3980        // Case 1: 5 AppErrors + 5 Successes. Capacity-relevant
3981        // success_rate = 5/5 = 100%. Cap must NOT decrease (it may
3982        // hold at 8 or grow via slow-start; both prove the AppErrors
3983        // didn't poison the success-rate denominator).
3984        let l = Limiter::new(8, cfg.clone());
3985        for _ in 0..5 {
3986            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
3987        }
3988        for _ in 0..5 {
3989            l.observe(Outcome::Success, Duration::from_millis(50));
3990        }
3991        assert!(
3992            l.current() >= 8,
3993            "AppErrors falsely depressed the success rate; cap dropped from 8 to {}",
3994            l.current()
3995        );
3996        // Case 2: 5 AppErrors + 5 Timeouts. Capacity-relevant
3997        // success_rate = 0/5 = 0%. Cap MUST decrease.
3998        let l2 = Limiter::new(8, cfg);
3999        for _ in 0..5 {
4000            l2.observe(Outcome::ApplicationError, Duration::from_millis(50));
4001        }
4002        for _ in 0..5 {
4003            l2.observe(Outcome::Timeout, Duration::from_millis(50));
4004        }
4005        assert!(
4006            l2.current() < 8,
4007            "all-timeouts (with AppError padding) did not decrease cap; got {}",
4008            l2.current()
4009        );
4010    }
4011
4012    /// Real concurrent torn-read test for save/load. The previous
4013    /// concurrent-write test only reads after both writers join;
4014    /// this version interleaves a reader thread with writers and
4015    /// asserts every successful load returns a coherent (non-torn)
4016    /// snapshot.
4017    #[test]
4018    fn concurrent_save_load_no_torn_reads() {
4019        use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
4020        use std::thread;
4021        let dir = tempfile::tempdir().unwrap();
4022        let path = dir.path().join("snap.json");
4023        // Seed the file so the reader doesn't get a None on first read.
4024        save_snapshot(
4025            &path,
4026            ChannelStart {
4027                quote: 1,
4028                store: 1,
4029                fetch: 1,
4030            },
4031        );
4032        let stop = std::sync::Arc::new(AtomicBool::new(false));
4033        let p_w = path.clone();
4034        let s_w = stop.clone();
4035        let writer = thread::spawn(move || {
4036            let mut i = 1usize;
4037            while !s_w.load(AtomicOrdering::Relaxed) {
4038                save_snapshot(
4039                    &p_w,
4040                    ChannelStart {
4041                        quote: i,
4042                        store: i,
4043                        fetch: i,
4044                    },
4045                );
4046                i = i.wrapping_add(1).max(1);
4047            }
4048        });
4049        let p_r = path.clone();
4050        let reader = thread::spawn(move || {
4051            let mut torn = 0usize;
4052            for _ in 0..2_000 {
4053                if let Some(snap) = load_snapshot(&p_r) {
4054                    // Coherent snapshots have all three channels equal
4055                    // (writer always saves equal values).
4056                    if snap.quote != snap.store || snap.store != snap.fetch {
4057                        torn += 1;
4058                    }
4059                }
4060            }
4061            torn
4062        });
4063        let torn = reader.join().unwrap();
4064        stop.store(true, AtomicOrdering::Relaxed);
4065        writer.join().unwrap();
4066        assert_eq!(
4067            torn, 0,
4068            "observed {torn} torn reads under concurrent writes"
4069        );
4070    }
4071
4072    /// Round-5 follow-up: `save_snapshot_with_timeout` returns
4073    /// promptly even when the underlying write would otherwise hang.
4074    /// Use a path under a non-existent root that mkdir cannot create
4075    /// to simulate a slow/failing filesystem (mkdir returns Err
4076    /// quickly so this isn't a real hang test, but it confirms the
4077    /// timeout wrapper does not block longer than the deadline on a
4078    /// fast-failing operation either).
4079    #[test]
4080    fn save_with_timeout_returns_promptly_on_fast_failure() {
4081        let path = std::path::PathBuf::from("/nonexistent_root_xyz_test/snap.json");
4082        let snap = ChannelStart {
4083            quote: 1,
4084            store: 1,
4085            fetch: 1,
4086        };
4087        let started = Instant::now();
4088        save_snapshot_with_timeout(path, snap, Duration::from_secs(5));
4089        let elapsed = started.elapsed();
4090        // Fast-failing mkdir returns immediately. The timeout
4091        // wrapper should not add measurable overhead.
4092        assert!(
4093            elapsed < Duration::from_secs(1),
4094            "save_snapshot_with_timeout took {elapsed:?} on fast-failing path"
4095        );
4096    }
4097
4098    /// Round-5 follow-up: a hung writer thread (simulated by a path
4099    /// the writer never returns from). The wrapper must time out and
4100    /// return without joining; the test must complete near the
4101    /// deadline, not hang.
4102    #[test]
4103    fn save_with_timeout_bounds_wall_time_on_hang() {
4104        // Use a real-but-slow-write simulation: hand the writer a
4105        // path that the OS will accept but with a synthetic delay
4106        // baked into a wrapping thread. Since save_snapshot itself
4107        // does no sleep, we instead test that the timeout wrapper
4108        // exits within deadline + small slack when the inner work
4109        // takes longer than the deadline. We approximate by giving
4110        // the wrapper a deadline shorter than any plausible local
4111        // disk write (1ms is too tight; 0ms is too tight). Use
4112        // 1ms deadline and assert wall time < 100ms — proving the
4113        // wrapper does NOT wait for the writer to actually finish
4114        // (the inner write to a tempdir takes a few ms typically).
4115        let dir = tempfile::tempdir().unwrap();
4116        let path = dir.path().join("snap.json");
4117        let snap = ChannelStart {
4118            quote: 1,
4119            store: 1,
4120            fetch: 1,
4121        };
4122        let started = Instant::now();
4123        // Deadline so short that on most machines the writer is
4124        // still running. The wrapper must NOT wait for it.
4125        save_snapshot_with_timeout(path, snap, Duration::from_micros(1));
4126        let elapsed = started.elapsed();
4127        assert!(
4128            elapsed < Duration::from_millis(200),
4129            "timeout wrapper did not bound wall time: {elapsed:?}"
4130        );
4131    }
4132}