Skip to main content

ant_core/data/client/
adaptive.rs

1//! Adaptive concurrency controller for client data operations.
2//!
3//! Replaces hard-coded `quote_concurrency` / `store_concurrency` /
4//! download fan-out with a per-channel AIMD limiter that ramps up when
5//! the network is healthy and ramps down on stress signals (timeouts,
6//! errors, latency inflation). The goal is to give every machine and
7//! every connection profile a single client codebase that finds its
8//! own steady state without the user tweaking flags.
9//!
10//! ## Channels
11//!
12//! Three independent limiters share the same algorithm but track state
13//! separately, because their workloads have different cost profiles:
14//!
15//! - `quote`  — small DHT request/response messages, cheap per op
16//! - `store`  — multi-MB chunk PUTs to a close group, expensive per op
17//! - `fetch`  — multi-MB chunk GETs from peers, asymmetric to `store`
18//!
19//! ## Algorithm
20//!
21//! TCP-style AIMD with slow-start:
22//!
23//! - **Slow-start**: starting concurrency doubles after each healthy
24//!   window until first stress signal or until the configured ceiling.
25//! - **Steady state**: additive +1 per healthy window (>= success_target
26//!   success rate AND p95 latency within `latency_inflation_factor` of
27//!   the rolling baseline).
28//! - **Stress**: multiplicative decrease (current / 2, floor 1) on any
29//!   of: success rate < success_target, timeout rate > timeout_ceiling,
30//!   or p95 latency above `latency_inflation_factor * baseline`.
31//!
32//! Decisions evaluate over a sliding window of the last `window_ops`
33//! observed outcomes per channel. Below `min_window_ops` outcomes the
34//! controller holds steady — too few samples to act on.
35//!
36//! ## What this is not
37//!
38//! - Not a payment-batching controller. Wave / batch sizes are
39//!   orthogonal (gas-economics tradeoff, not throughput).
40//! - Not a peer-quality scorer. That lives in `peer_cache` and feeds
41//!   `BootstrapManager`. Outcomes flow into both, separately.
42
43use serde::{Deserialize, Serialize};
44use std::collections::VecDeque;
45use std::path::{Path, PathBuf};
46use std::sync::atomic::{AtomicU64, Ordering};
47use std::sync::{Arc, Mutex, PoisonError};
48use std::time::{Duration, Instant};
49use tracing::{debug, warn};
50
/// Process-monotonic counter for unique snapshot temp filenames.
/// Combined with PID + nanosecond timestamp, makes collision
/// effectively impossible across concurrent save_snapshot calls.
///
/// Starts at zero for every process.
// NOTE(review): the code that increments this counter is not in this
// part of the file — confirm the PID + timestamp combination there.
static SAVE_COUNTER: AtomicU64 = AtomicU64::new(0);
55
/// Acquire `m`, recovering the guard from a poisoned mutex instead of
/// panicking. Mirrors the project-wide locking pattern (see
/// `cache::ChunkCache`): a panic in another thread while it held the
/// lock must not take this caller down with it.
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        // Poisoned: the protected value is still there, hand it out.
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
61
/// Outcome of a single observed operation on one channel.
///
/// `Success`, `Timeout` and `NetworkError` are the capacity-relevant
/// outcomes: the window evaluation counts them when computing success
/// and timeout rates. `ApplicationError` samples are stored in the
/// window but left out of those rates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// Completed successfully. Only successful samples contribute a
    /// latency to the p95 computation.
    Success,
    /// Did not complete within the per-op timeout.
    Timeout,
    /// Failed with a network/transport error (refused, reset, unreachable).
    NetworkError,
    /// Failed with an application-level error not attributable to the
    /// network (e.g. bad payment proof). Recorded but does not push the
    /// controller down — it is not a capacity signal.
    ApplicationError,
}
76
/// Per-channel concurrency ceilings. Each channel has its own cap so
/// that constraining one (e.g. user pinned a low store concurrency for
/// a slow uplink) never bleeds into another (download).
///
/// Values below the configured `min_concurrency` are raised to it when
/// the owning `AdaptiveConfig` is sanitized.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelMax {
    /// Ceiling for the `quote` channel limiter.
    pub quote: usize,
    /// Ceiling for the `store` channel limiter.
    pub store: usize,
    /// Ceiling for the `fetch` channel limiter.
    pub fetch: usize,
}
86
87impl Default for ChannelMax {
88    fn default() -> Self {
89        // Generous ceilings that give the controller real headroom to
90        // grow on healthy connections. The cold-start values
91        // (`ChannelStart::default()`) are well below these so AIMD
92        // can actually do its job. Each ceiling is independent.
93        Self {
94            quote: 128,
95            store: 64,
96            fetch: 256,
97        }
98    }
99}
100
/// Tunable knobs for the adaptive controller. Defaults are picked so
/// that the controller behaves at least as well as the prior static
/// defaults on a healthy network: starts at the previous static value
/// and only deviates when signals demand it.
///
/// Values are not validated on assignment; `sanitize` repairs
/// non-finite floats and inverted bounds, and `AdaptiveController::new`
/// calls it before building any limiter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
    /// Master switch. When `false`, channels report `initial` forever
    /// and ignore observations. Useful for benchmarks / debugging.
    pub enabled: bool,
    /// Floor concurrency per channel. Never go below this.
    /// Sanitized to at least 1.
    pub min_concurrency: usize,
    /// Per-channel ceiling concurrency. See `ChannelMax`.
    pub max: ChannelMax,
    /// Sliding window size in number of recent ops considered for
    /// adaptation decisions. Sanitized to at least 1.
    pub window_ops: usize,
    /// Below this count of outcomes in the window, hold steady.
    /// Sanitized into `[1, window_ops]`.
    pub min_window_ops: usize,
    /// Required success rate to consider the window healthy. Healthy
    /// windows trigger increase; unhealthy windows trigger decrease.
    /// Sanitized into `[0.0, 1.0]` (non-finite values become 0.95).
    pub success_target: f64,
    /// Timeout rate above which the window counts as stressed even if
    /// the success rate would otherwise pass.
    /// Sanitized into `[0.0, 1.0]` (non-finite values become 0.10).
    pub timeout_ceiling: f64,
    /// p95 latency above `latency_inflation_factor * baseline` is a
    /// stress signal. Baseline is an EWMA of healthy-window p95s.
    /// Non-finite or non-positive values are sanitized to 2.0.
    pub latency_inflation_factor: f64,
    /// EWMA smoothing factor for the latency baseline. 0 = never
    /// updates, 1 = baseline = last sample. 0.2 trades responsiveness
    /// for stability. Validated to `[0.0, 1.0]`; `NaN`/non-finite
    /// values are sanitized to the default at controller construction.
    pub latency_ewma_alpha: f64,
}
134
135impl AdaptiveConfig {
136    /// Sanitize the config: clamp `latency_ewma_alpha` to `[0,1]`
137    /// (rejecting NaN/Inf which would otherwise panic in
138    /// `Duration::from_secs_f64`), enforce `min_concurrency >= 1`,
139    /// enforce per-channel max >= min_concurrency, enforce
140    /// `min_window_ops <= window_ops`. Idempotent.
141    pub fn sanitize(&mut self) {
142        if !self.latency_ewma_alpha.is_finite() {
143            self.latency_ewma_alpha = 0.2;
144        }
145        self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
146        if !self.success_target.is_finite() {
147            self.success_target = 0.95;
148        }
149        self.success_target = self.success_target.clamp(0.0, 1.0);
150        if !self.timeout_ceiling.is_finite() {
151            self.timeout_ceiling = 0.10;
152        }
153        self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
154        if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
155            self.latency_inflation_factor = 2.0;
156        }
157        self.min_concurrency = self.min_concurrency.max(1);
158        self.window_ops = self.window_ops.max(1);
159        self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
160        self.max.quote = self.max.quote.max(self.min_concurrency);
161        self.max.store = self.max.store.max(self.min_concurrency);
162        self.max.fetch = self.max.fetch.max(self.min_concurrency);
163    }
164}
165
166impl Default for AdaptiveConfig {
167    fn default() -> Self {
168        Self {
169            enabled: true,
170            min_concurrency: 1,
171            max: ChannelMax::default(),
172            window_ops: 32,
173            min_window_ops: 8,
174            success_target: 0.95,
175            timeout_ceiling: 0.10,
176            latency_inflation_factor: 2.0,
177            latency_ewma_alpha: 0.2,
178        }
179    }
180}
181
/// Suggested starting concurrency per channel for a brand-new client
/// with no persisted state. Intentionally matches or exceeds the prior
/// static defaults so the cold path is not slower:
///
/// - quote was statically 32 — start at 32.
/// - store was statically 8 — start at 8.
/// - fetch was previously unbounded (the entire self_encryption batch
///   was fired at once via `FuturesUnordered`). Typical batches are
///   small (handful of chunks); occasional larger ones are capped at
///   `max_concurrency`. Start at 64 to keep small/medium downloads
///   indistinguishable from the prior unbounded behavior.
///
/// The same shape is reused for persisted snapshots of the current
/// caps (see `AdaptiveController::snapshot` / `warm_start`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelStart {
    /// Starting (or snapshotted) cap for the `quote` channel.
    pub quote: usize,
    /// Starting (or snapshotted) cap for the `store` channel.
    pub store: usize,
    /// Starting (or snapshotted) cap for the `fetch` channel.
    pub fetch: usize,
}
199
200impl Default for ChannelStart {
201    fn default() -> Self {
202        Self {
203            quote: 32,
204            store: 8,
205            fetch: 64,
206        }
207    }
208}
209
/// One observed sample retained in the sliding window.
#[derive(Debug, Clone, Copy)]
struct Sample {
    /// How the operation ended.
    outcome: Outcome,
    /// Latency reported alongside the outcome. Only the latencies of
    /// `Outcome::Success` samples are used for p95 calculations.
    latency: Duration,
}
216
/// Per-limiter configuration. Carries the shared adaptive parameters
/// plus the channel-specific `max_concurrency`. Held behind an `Arc`
/// so cloning a `Limiter` is a refcount bump rather than a struct copy
/// (avoids allocating `AdaptiveConfig`-worth of bytes per chunk in
/// hot loops).
///
/// Field meanings match the same-named fields of `AdaptiveConfig`;
/// `Limiter::new` sanitizes the values before storing them.
#[derive(Debug, Clone)]
pub struct LimiterConfig {
    /// When `false`, `Limiter::observe` ignores every observation.
    pub enabled: bool,
    /// Floor for the concurrency cap (sanitized to at least 1).
    pub min_concurrency: usize,
    /// Ceiling for this channel's concurrency cap (sanitized to at
    /// least `min_concurrency`).
    pub max_concurrency: usize,
    /// Maximum number of samples kept in the sliding window.
    pub window_ops: usize,
    /// Minimum number of samples required before any decision is made.
    pub min_window_ops: usize,
    /// Success rate below which the window counts as stressed.
    pub success_target: f64,
    /// Timeout rate above which the window counts as stressed.
    pub timeout_ceiling: f64,
    /// Multiplier on the latency baseline above which p95 latency is
    /// treated as a stress signal.
    pub latency_inflation_factor: f64,
    /// EWMA smoothing factor used when updating the latency baseline.
    pub latency_ewma_alpha: f64,
}
234
235impl LimiterConfig {
236    fn from_adaptive(cfg: &AdaptiveConfig, max_for_channel: usize) -> Self {
237        Self {
238            enabled: cfg.enabled,
239            min_concurrency: cfg.min_concurrency,
240            max_concurrency: max_for_channel.max(cfg.min_concurrency),
241            window_ops: cfg.window_ops,
242            min_window_ops: cfg.min_window_ops,
243            success_target: cfg.success_target,
244            timeout_ceiling: cfg.timeout_ceiling,
245            latency_inflation_factor: cfg.latency_inflation_factor,
246            latency_ewma_alpha: cfg.latency_ewma_alpha,
247        }
248    }
249
250    /// Sanitize a directly-constructed `LimiterConfig`. External
251    /// callers (or tests) that build a `LimiterConfig` literal with
252    /// hostile values (`NaN`, sub-floor mins, inverted bounds) are
253    /// protected — `Limiter::new` calls this on every construction
254    /// so the controller never holds NaN or out-of-range floats.
255    fn sanitize(&mut self) {
256        if !self.latency_ewma_alpha.is_finite() {
257            self.latency_ewma_alpha = 0.2;
258        }
259        self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
260        if !self.success_target.is_finite() {
261            self.success_target = 0.95;
262        }
263        self.success_target = self.success_target.clamp(0.0, 1.0);
264        if !self.timeout_ceiling.is_finite() {
265            self.timeout_ceiling = 0.10;
266        }
267        self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
268        if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
269            self.latency_inflation_factor = 2.0;
270        }
271        self.min_concurrency = self.min_concurrency.max(1);
272        self.window_ops = self.window_ops.max(1);
273        self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
274        self.max_concurrency = self.max_concurrency.max(self.min_concurrency);
275    }
276}
277
/// Per-channel adaptive limiter.
///
/// Cheap to clone — both fields are `Arc`. Pass clones into hot loops;
/// do not hold the lock across `.await` points (call sites observe
/// with short critical sections only).
///
/// Clones share the same state: an observation recorded through one
/// clone is visible through all of them.
#[derive(Debug, Clone)]
pub struct Limiter {
    /// Mutable controller state (current cap, window, baseline).
    inner: Arc<Mutex<LimiterInner>>,
    /// Sanitized configuration, fixed at construction.
    config: Arc<LimiterConfig>,
}
288
/// Mutable state of a `Limiter`, guarded by the limiter's mutex.
#[derive(Debug)]
struct LimiterInner {
    /// Current concurrency cap returned by `current()`.
    current: usize,
    /// Sliding window of recent outcomes.
    window: VecDeque<Sample>,
    /// Samples observed since the last increase. Increases require a
    /// fresh window's worth of evidence to avoid ramping on every
    /// individual healthy sample.
    /// Reset to zero on every applied increase or decrease.
    samples_since_increase: usize,
    /// Samples observed since the last decrease. Decreases require
    /// `min_window_ops` of fresh evidence to avoid pile-driving the
    /// cap to floor on a single bad burst when many in-flight ops all
    /// observe stress nearly simultaneously.
    /// Reset to zero on every applied increase or decrease.
    samples_since_decrease: usize,
    /// EWMA of p95 latency from past healthy windows. `None` until
    /// the first healthy window completes.
    latency_baseline: Option<Duration>,
    /// `true` once we have observed a stress signal at least once.
    /// Slow-start mode ends permanently after first stress.
    /// Also set by `Limiter::warm_start`.
    left_slow_start: bool,
}
311
312impl Limiter {
313    /// Create a new limiter starting at `start`, clamped into
314    /// `[min_concurrency, max_concurrency]`. Sanitizes the config to
315    /// guard against directly-constructed `LimiterConfig` literals
316    /// with hostile float values (`NaN`, etc).
317    #[must_use]
318    pub fn new(start: usize, config: LimiterConfig) -> Self {
319        let mut config = config;
320        config.sanitize();
321        let clamped = start.clamp(config.min_concurrency, config.max_concurrency.max(1));
322        let window_cap = config.window_ops;
323        Self {
324            inner: Arc::new(Mutex::new(LimiterInner {
325                current: clamped,
326                window: VecDeque::with_capacity(window_cap),
327                samples_since_increase: 0,
328                samples_since_decrease: 0,
329                latency_baseline: None,
330                left_slow_start: false,
331            })),
332            config: Arc::new(config),
333        }
334    }
335
336    /// Snapshot current concurrency cap. Hot-path call: the value may
337    /// change between this call and the next, but consumers
338    /// (`buffer_unordered(n)`) capture it once per pipeline build.
339    #[must_use]
340    pub fn current(&self) -> usize {
341        lock(&self.inner).current
342    }
343
344    /// Record one observed operation. Updates the sliding window and
345    /// re-evaluates the cap if the window is full enough.
346    pub fn observe(&self, outcome: Outcome, latency: Duration) {
347        if !self.config.enabled {
348            return;
349        }
350        let mut g = lock(&self.inner);
351        if g.window.len() == self.config.window_ops {
352            g.window.pop_front();
353        }
354        g.window.push_back(Sample { outcome, latency });
355        g.samples_since_increase = g.samples_since_increase.saturating_add(1);
356        g.samples_since_decrease = g.samples_since_decrease.saturating_add(1);
357        if g.window.len() < self.config.min_window_ops {
358            return;
359        }
360        let decision = evaluate(&g.window, &self.config, g.latency_baseline);
361        apply_decision(&mut g, decision, &self.config);
362    }
363
364    /// Replace the current cap with `start`, clamped. Used for warm
365    /// loads from persisted state. Marks the limiter as having
366    /// already-left-slow-start so a single healthy window doesn't
367    /// double the cap (an over-aggressive cold-start from a warm
368    /// value). Subsequent increases are +1 per healthy window.
369    /// Does not clear the sliding window — fresh observations remain
370    /// authoritative for adaptation decisions.
371    pub fn warm_start(&self, start: usize) {
372        let clamped = start.clamp(
373            self.config.min_concurrency,
374            self.config.max_concurrency.max(1),
375        );
376        let mut g = lock(&self.inner);
377        g.current = clamped;
378        g.left_slow_start = true;
379    }
380
381    /// Snapshot of the current cap for persistence. Cheap, lock-only.
382    #[must_use]
383    pub fn snapshot(&self) -> usize {
384        lock(&self.inner).current
385    }
386}
387
/// Outcome of evaluating one window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    /// Healthy window — increase concurrency. Also produced when the
    /// rates are healthy and no latency baseline exists yet.
    Increase,
    /// Stressed window — decrease concurrency.
    Decrease,
    /// Inconclusive — hold steady (too few capacity-relevant samples,
    /// or no successful sample to take a p95 latency from).
    Hold,
}
398
399fn evaluate(
400    window: &VecDeque<Sample>,
401    cfg: &LimiterConfig,
402    baseline: Option<Duration>,
403) -> Decision {
404    // Capacity-relevant denominator: ApplicationError outcomes are
405    // explicitly NOT capacity signals (per `Outcome` docs) and are
406    // excluded from rate calculations. A wave of `AlreadyStored`
407    // errors must not punish concurrency.
408    let mut successes = 0usize;
409    let mut timeouts = 0usize;
410    let mut net_errors = 0usize;
411    let mut latencies: Vec<Duration> = Vec::with_capacity(window.len());
412    for s in window {
413        match s.outcome {
414            Outcome::Success => {
415                successes += 1;
416                latencies.push(s.latency);
417            }
418            Outcome::Timeout => timeouts += 1,
419            Outcome::NetworkError => net_errors += 1,
420            Outcome::ApplicationError => {}
421        }
422    }
423    let capacity_total = successes + timeouts + net_errors;
424    if capacity_total < cfg.min_window_ops {
425        // Not enough capacity-relevant evidence to act. Hold.
426        return Decision::Hold;
427    }
428    let total_f = capacity_total as f64;
429    let success_rate = successes as f64 / total_f;
430    let timeout_rate = timeouts as f64 / total_f;
431
432    if success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling {
433        return Decision::Decrease;
434    }
435
436    if let Some(p95) = p95_of(&mut latencies) {
437        if let Some(base) = baseline {
438            let limit = base.mul_f64(cfg.latency_inflation_factor);
439            if p95 > limit {
440                return Decision::Decrease;
441            }
442        }
443        Decision::Increase
444    } else {
445        Decision::Hold
446    }
447}
448
449fn apply_decision(inner: &mut LimiterInner, decision: Decision, cfg: &LimiterConfig) {
450    match decision {
451        Decision::Increase => {
452            // Gate increases on accumulating a fresh window's worth of
453            // evidence since the last bump.
454            if inner.samples_since_increase < cfg.window_ops {
455                return;
456            }
457            let p95 = window_p95(&inner.window);
458            inner.latency_baseline = Some(match inner.latency_baseline {
459                None => p95,
460                Some(prev) => ewma(prev, p95, cfg.latency_ewma_alpha),
461            });
462            let next = if inner.left_slow_start {
463                inner.current.saturating_add(1)
464            } else {
465                inner.current.saturating_mul(2)
466            };
467            let next = next.min(cfg.max_concurrency).max(cfg.min_concurrency);
468            if next != inner.current {
469                debug!(
470                    from = inner.current,
471                    to = next,
472                    slow_start = !inner.left_slow_start,
473                    "adaptive: increase",
474                );
475            }
476            inner.current = next;
477            inner.samples_since_increase = 0;
478            inner.samples_since_decrease = 0;
479        }
480        Decision::Decrease => {
481            // Gate decreases on `min_window_ops` of fresh evidence
482            // since the last decrease so a burst of concurrent
483            // observations from in-flight ops can't pile-drive the
484            // cap from N to 1 in a few back-to-back ticks.
485            if inner.samples_since_decrease < cfg.min_window_ops {
486                return;
487            }
488            inner.left_slow_start = true;
489            let next = (inner.current / 2).max(cfg.min_concurrency);
490            if next != inner.current {
491                debug!(from = inner.current, to = next, "adaptive: decrease");
492            }
493            inner.current = next;
494            inner.samples_since_increase = 0;
495            inner.samples_since_decrease = 0;
496        }
497        Decision::Hold => {}
498    }
499}
500
/// 95th-percentile latency of `latencies`, or `None` when the slice
/// is empty. The slice is sorted in place as a side effect (an empty
/// slice is left untouched).
///
/// Rank selection is nearest-rank: index `ceil(len * 0.95) - 1`,
/// never below 0 and never past the last element.
fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
    // `checked_sub` doubles as the emptiness test.
    let last = latencies.len().checked_sub(1)?;
    latencies.sort_unstable();
    let rank = ((latencies.len() as f64) * 0.95).ceil() as usize;
    let idx = rank.saturating_sub(1).min(last);
    Some(latencies[idx])
}
513
514fn window_p95(window: &VecDeque<Sample>) -> Duration {
515    let mut latencies: Vec<Duration> = window
516        .iter()
517        .filter(|s| matches!(s.outcome, Outcome::Success))
518        .map(|s| s.latency)
519        .collect();
520    p95_of(&mut latencies).unwrap_or(Duration::ZERO)
521}
522
/// Exponentially weighted moving average of two durations:
/// `(1 - alpha) * prev + alpha * sample`.
///
/// `alpha` is clamped to `[0.0, 1.0]`; a non-finite `alpha` leaves the
/// baseline untouched and returns `prev`. The blend is computed in
/// floating-point milliseconds.
///
/// Never panics. If the blended value is not finite or negative the
/// previous value is returned; if it is too large to be represented
/// (possible only when the inputs are close to `Duration::MAX`, since
/// float rounding can push the blend to 2^64 seconds) the result
/// saturates at `Duration::MAX`.
fn ewma(prev: Duration, sample: Duration, alpha: f64) -> Duration {
    if !alpha.is_finite() {
        return prev;
    }
    let alpha = alpha.clamp(0.0, 1.0);
    let prev_ms = prev.as_secs_f64() * 1000.0;
    let sample_ms = sample.as_secs_f64() * 1000.0;
    let new_ms = (1.0 - alpha) * prev_ms + alpha * sample_ms;
    if !new_ms.is_finite() || new_ms < 0.0 {
        return prev;
    }
    // `from_secs_f64` would panic on overflow; the only failure left
    // after the checks above is "too big", so saturate.
    Duration::try_from_secs_f64(new_ms / 1000.0).unwrap_or(Duration::MAX)
}
537
/// Bundle of per-channel limiters owned by the `Client`.
///
/// Cloning is cheap: each `Limiter` clone shares state with its
/// original.
#[derive(Debug, Clone)]
pub struct AdaptiveController {
    /// Limiter for the `quote` channel.
    pub quote: Limiter,
    /// Limiter for the `store` channel.
    pub store: Limiter,
    /// Limiter for the `fetch` channel.
    pub fetch: Limiter,
    /// `pub(crate)` so external callers cannot mutate this
    /// post-construction. Each `Limiter` snapshots its own
    /// `Arc<LimiterConfig>` at construction time, so external
    /// mutation here would silently desync `warm_start`'s
    /// `enabled` check from the limiters' frozen copies. Read via
    /// `config()`.
    pub(crate) config: AdaptiveConfig,
    /// Per-instance cold-start values. `warm_start` floors snapshot
    /// values against THIS, not the global `ChannelStart::default()`,
    /// so a controller built with custom (e.g. low) starts stays
    /// faithful to its construction parameters. Constructed-once,
    /// never mutated.
    cold_start: ChannelStart,
}
558
559impl AdaptiveController {
560    /// Create a controller with cold-start values per channel.
561    /// Sanitizes the config (NaN guards, floor/ceiling enforcement)
562    /// before constructing limiters. The supplied `start` is captured
563    /// as the per-instance cold-start floor for `warm_start`.
564    #[must_use]
565    pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
566        let mut config = config;
567        config.sanitize();
568        let quote_cfg = LimiterConfig::from_adaptive(&config, config.max.quote);
569        let store_cfg = LimiterConfig::from_adaptive(&config, config.max.store);
570        let fetch_cfg = LimiterConfig::from_adaptive(&config, config.max.fetch);
571        Self {
572            quote: Limiter::new(start.quote, quote_cfg),
573            store: Limiter::new(start.store, store_cfg),
574            fetch: Limiter::new(start.fetch, fetch_cfg),
575            config,
576            cold_start: start,
577        }
578    }
579
580    /// Snapshot current per-channel caps for persistence.
581    #[must_use]
582    pub fn snapshot(&self) -> ChannelStart {
583        ChannelStart {
584            quote: self.quote.snapshot(),
585            store: self.store.snapshot(),
586            fetch: self.fetch.snapshot(),
587        }
588    }
589
590    /// Read-only access to the controller's adaptive config. Made
591    /// read-only deliberately: each `Limiter` snapshots its own
592    /// `Arc<LimiterConfig>` at construction, so post-hoc mutation
593    /// would silently desync `warm_start`'s `enabled` check from
594    /// the limiters' frozen copies.
595    #[must_use]
596    pub fn config(&self) -> &AdaptiveConfig {
597        &self.config
598    }
599
600    /// Apply a previously-saved snapshot as the warm-start cap.
601    ///
602    /// The effective warm value per channel is
603    /// `max(snapshot, self.cold_start)` — flooring at the
604    /// per-instance cold-start (NOT the global default) so:
605    /// 1. A prior bad run that pinned cap=1 doesn't pessimize this
606    ///    run forever.
607    /// 2. A controller built with custom (e.g. low) cold starts for
608    ///    benchmarking is not silently jumped above its construction
609    ///    parameters.
610    ///
611    /// Does not clear sliding windows. When `enabled = false`, this
612    /// is a no-op — fixed-concurrency mode means fixed-concurrency.
613    pub fn warm_start(&self, snapshot: ChannelStart) {
614        if !self.config.enabled {
615            return;
616        }
617        self.quote
618            .warm_start(snapshot.quote.max(self.cold_start.quote));
619        self.store
620            .warm_start(snapshot.store.max(self.cold_start.store));
621        self.fetch
622            .warm_start(snapshot.fetch.max(self.cold_start.fetch));
623    }
624}
625
626impl Default for AdaptiveController {
627    fn default() -> Self {
628        Self::new(ChannelStart::default(), AdaptiveConfig::default())
629    }
630}
631
/// Cancel-on-drop guard: if the wrapping future is dropped before
/// completion, record no outcome. We don't synthesize a Cancelled
/// signal because (a) dropped work was never observed by the network
/// and (b) injecting fake outcomes would skew the sliding window
/// after a fail-fast burst. The intentional behavior is "silent on
/// cancel, observe on completion" — callers that need to keep
/// fail-fast batches drained for full signal use `rebucketed`.
struct ObserveGuard<'a> {
    /// Limiter that receives the observation when the guard drops.
    limiter: &'a Limiter,
    /// Creation time of the guard; latency is measured from here.
    started: Instant,
    /// Outcome and elapsed time recorded by `finish`. `None` means
    /// nothing is reported on drop.
    outcome: Option<(Outcome, Duration)>,
}
644
645impl<'a> ObserveGuard<'a> {
646    fn new(limiter: &'a Limiter) -> Self {
647        Self {
648            limiter,
649            started: Instant::now(),
650            outcome: None,
651        }
652    }
653    fn finish(&mut self, outcome: Outcome) {
654        self.outcome = Some((outcome, self.started.elapsed()));
655    }
656}
657
658impl Drop for ObserveGuard<'_> {
659    fn drop(&mut self) {
660        if let Some((outcome, latency)) = self.outcome.take() {
661            self.limiter.observe(outcome, latency);
662        }
663    }
664}
665
666/// Helper for instrumented call sites: time an async op, classify the
667/// result, and report to a `Limiter`. Returns the original result.
668///
669/// ## Cancellation safety
670///
671/// Uses an internal `ObserveGuard` so the recorded outcome is
672/// committed via `Drop` after the inner future returns. If the
673/// wrapper future is itself dropped before `op().await` resolves
674/// (caller cancellation, `buffer_unordered` fail-fast), no outcome
675/// is recorded — this is intentional, see the guard's docs.
676///
677/// ```ignore
678/// let res = observe_op(&controller.store, || async { do_put().await }, classify_put_err).await;
679/// ```
680pub async fn observe_op<T, E, F, Fut, C>(limiter: &Limiter, op: F, classify: C) -> Result<T, E>
681where
682    F: FnOnce() -> Fut,
683    Fut: std::future::Future<Output = Result<T, E>>,
684    C: FnOnce(&E) -> Outcome,
685{
686    let mut guard = ObserveGuard::new(limiter);
687    let result = op().await;
688    let outcome = match &result {
689        Ok(_) => Outcome::Success,
690        Err(e) => classify(e),
691    };
692    guard.finish(outcome);
693    drop(guard); // commit observation explicitly so it lands before return
694    result
695}
696
697/// Process an iterator of items with a rolling scheduler whose cap
698/// is re-read from the limiter as each slot frees. Replaces the
699/// "snapshot the cap once at pipeline build" behavior of plain
700/// `buffer_unordered(N)` so a long pipeline (e.g. 10 GB download =
701/// ~2500 chunks) sees adaptive growth/decay mid-flight.
702///
703/// Output is unordered (first-completion). For an ordered result
704/// (e.g. `data_download` feeds chunks in DataMap order to
705/// self_encryption decrypt), wrap items with their index and sort
706/// after collection — see `rebucketed_ordered`.
707///
708/// On error: in-flight work drains to completion (so observed
709/// outcomes still feed the controller) but no new launches happen.
710/// The first error is preserved; later errors are discarded.
711pub async fn rebucketed_unordered<I, T, E, F, Fut>(
712    limiter: &Limiter,
713    items: I,
714    mut op: F,
715) -> Result<Vec<T>, E>
716where
717    I: IntoIterator,
718    F: FnMut(I::Item) -> Fut,
719    Fut: std::future::Future<Output = Result<T, E>>,
720{
721    use futures::stream::{FuturesUnordered, StreamExt};
722    let mut iter = items.into_iter().peekable();
723    let mut in_flight: FuturesUnordered<Fut> = FuturesUnordered::new();
724    let mut results = Vec::new();
725    let mut pending_err: Option<E> = None;
726    loop {
727        // Refill: re-read the cap and launch up to `cap - in_flight.len()`
728        // new items, but only if we are not already in error-stop.
729        if pending_err.is_none() {
730            let cap = limiter.current().max(1);
731            while in_flight.len() < cap {
732                match iter.next() {
733                    Some(item) => in_flight.push(op(item)),
734                    None => break,
735                }
736            }
737        }
738        if in_flight.is_empty() {
739            break;
740        }
741        match in_flight.next().await {
742            Some(Ok(v)) => results.push(v),
743            Some(Err(e)) => {
744                if pending_err.is_none() {
745                    pending_err = Some(e);
746                }
747            }
748            None => break,
749        }
750    }
751    match pending_err {
752        Some(e) => Err(e),
753        None => Ok(results),
754    }
755}
756
757/// Ordered variant: items are tagged with a usize index by the
758/// caller (typically by `iter.enumerate()`); after rolling
759/// completion, results are sorted by index so output preserves
760/// input order. Use this for callers that pass to APIs which
761/// consume positionally (e.g. self_encryption's
762/// `get_root_data_map_parallel` zips `Vec<(idx, Bytes)>` with input
763/// hashes positionally and discards the idx — without a final sort
764/// the bytes pair with the wrong hashes).
765///
766/// `op` is `FnMut(Item) -> Fut` where `Item` carries whatever
767/// payload the caller needs; the closure must return
768/// `Result<(usize, U), E>` so the wrapper can sort by the index.
769pub async fn rebucketed_ordered<I, U, E, F, Fut>(
770    limiter: &Limiter,
771    items: I,
772    op: F,
773) -> Result<Vec<U>, E>
774where
775    I: IntoIterator,
776    F: FnMut(I::Item) -> Fut,
777    Fut: std::future::Future<Output = Result<(usize, U), E>>,
778{
779    let mut indexed = rebucketed_unordered(limiter, items, op).await?;
780    indexed.sort_by_key(|(idx, _)| *idx);
781    Ok(indexed.into_iter().map(|(_, v)| v).collect())
782}
783
784/// Backward-compatible wrapper. `ordered = false` -> rolling
785/// unordered. `ordered = true` -> the OLD batch-fence ordered path
786/// (kept for tests that explicitly assert batch-fence semantics).
787/// New call sites should use `rebucketed_unordered` or
788/// `rebucketed_ordered` directly.
789pub async fn rebucketed<I, T, E, F, Fut>(
790    limiter: &Limiter,
791    items: I,
792    ordered: bool,
793    mut op: F,
794) -> Result<Vec<T>, E>
795where
796    I: IntoIterator,
797    F: FnMut(I::Item) -> Fut,
798    Fut: std::future::Future<Output = Result<T, E>>,
799{
800    if !ordered {
801        return rebucketed_unordered(limiter, items, op).await;
802    }
803    use futures::stream::{self, StreamExt};
804    let mut iter = items.into_iter();
805    let mut results = Vec::new();
806    let mut pending_err: Option<E> = None;
807    loop {
808        if pending_err.is_some() {
809            break;
810        }
811        let cap = limiter.current().max(1);
812        let mut batch = Vec::with_capacity(cap);
813        for item in iter.by_ref().take(cap) {
814            batch.push(op(item));
815        }
816        if batch.is_empty() {
817            break;
818        }
819        let mut s = stream::iter(batch).buffered(cap);
820        while let Some(r) = s.next().await {
821            match r {
822                Ok(v) => results.push(v),
823                Err(e) => {
824                    if pending_err.is_none() {
825                        pending_err = Some(e);
826                    }
827                }
828            }
829        }
830    }
831    match pending_err {
832        Some(e) => Err(e),
833        None => Ok(results),
834    }
835}
836
/// On-disk shape for the persisted adaptive state. Versioned so we
/// can evolve the controller without crashing on stale files — an
/// unknown schema version simply causes a silent fallback to cold
/// defaults.
///
/// Written by `save_snapshot` as pretty-printed JSON and read back by
/// `load_snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedState {
    // Format version; `load_snapshot` rejects anything other than
    // `PERSIST_SCHEMA`.
    schema: u32,
    // Per-channel values handed back to the caller on a successful load.
    channels: ChannelStart,
}
846
/// Schema version stamped into every file written by `save_snapshot`;
/// `load_snapshot` ignores files whose `schema` differs from it.
const PERSIST_SCHEMA: u32 = 1;
/// File name that `default_persist_path` joins onto the data dir.
const PERSIST_FILENAME: &str = "client_adaptive.json";
849
850/// Default persistence path: `<data_dir>/client_adaptive.json`. Falls
851/// back to `None` if the platform data dir is not resolvable; in that
852/// case the controller still works, it just won't persist.
853#[must_use]
854pub fn default_persist_path() -> Option<PathBuf> {
855    crate::config::data_dir()
856        .ok()
857        .map(|d| d.join(PERSIST_FILENAME))
858}
859
860/// Load a persisted snapshot from disk, returning `None` if the file
861/// does not exist, is unreadable, contains malformed JSON, or has a
862/// schema version this build does not understand. Persistence is best
863/// effort — never propagate errors that would block the user's
864/// operation.
865#[must_use]
866pub fn load_snapshot(path: &Path) -> Option<ChannelStart> {
867    let bytes = std::fs::read(path).ok()?;
868    let state: PersistedState = match serde_json::from_slice(&bytes) {
869        Ok(s) => s,
870        Err(e) => {
871            warn!(path = %path.display(), error = %e, "adaptive: corrupt snapshot, ignoring");
872            return None;
873        }
874    };
875    if state.schema != PERSIST_SCHEMA {
876        debug!(
877            path = %path.display(),
878            schema = state.schema,
879            expected = PERSIST_SCHEMA,
880            "adaptive: snapshot schema mismatch, ignoring",
881        );
882        return None;
883    }
884    Some(state.channels)
885}
886
887/// Save a snapshot to disk atomically (write to `<path>.tmp`, then
888/// rename). Best effort — failures are logged at warn and discarded.
889pub fn save_snapshot(path: &Path, channels: ChannelStart) {
890    let state = PersistedState {
891        schema: PERSIST_SCHEMA,
892        channels,
893    };
894    let bytes = match serde_json::to_vec_pretty(&state) {
895        Ok(b) => b,
896        Err(e) => {
897            warn!(error = %e, "adaptive: snapshot serialize failed");
898            return;
899        }
900    };
901    if let Some(parent) = path.parent() {
902        if let Err(e) = std::fs::create_dir_all(parent) {
903            warn!(path = %parent.display(), error = %e, "adaptive: snapshot mkdir failed");
904            return;
905        }
906    }
907    // Unique-per-save temp filename: PID + monotonic counter +
908    // nanosecond timestamp guarantees no collision between concurrent
909    // CLI invocations OR concurrent save_snapshot calls within one
910    // process (e.g. multiple Client instances sharing the same data
911    // dir). POSIX rename is atomic on the destination, so the rename
912    // target overlap is fine — last writer wins.
913    let nanos = std::time::SystemTime::now()
914        .duration_since(std::time::UNIX_EPOCH)
915        .map(|d| d.subsec_nanos())
916        .unwrap_or(0);
917    let counter = SAVE_COUNTER.fetch_add(1, Ordering::Relaxed);
918    let tmp = path.with_extension(format!(
919        "json.tmp.{}.{}.{}",
920        std::process::id(),
921        counter,
922        nanos
923    ));
924    if let Err(e) = std::fs::write(&tmp, &bytes) {
925        warn!(path = %tmp.display(), error = %e, "adaptive: snapshot write failed");
926        return;
927    }
928    if let Err(e) = std::fs::rename(&tmp, path) {
929        warn!(
930            from = %tmp.display(),
931            to = %path.display(),
932            error = %e,
933            "adaptive: snapshot rename failed",
934        );
935        // Try to clean up the temp on rename failure so we don't
936        // leave junk in the data dir. Best effort.
937        let _ = std::fs::remove_file(&tmp);
938    }
939}
940
941/// Save with a wall-clock deadline. Spawns the synchronous
942/// `save_snapshot` on a detached thread and waits up to `timeout`
943/// for it to finish. If the thread is still running past the
944/// deadline (e.g. because the data dir is on a hung NFS mount),
945/// returns without joining — the OS will clean up the thread when
946/// the process exits.
947///
948/// Used by `Client::drop` so a stalled filesystem cannot block
949/// process shutdown indefinitely.
950pub fn save_snapshot_with_timeout(path: PathBuf, channels: ChannelStart, timeout: Duration) {
951    let handle = std::thread::spawn(move || {
952        save_snapshot(&path, channels);
953    });
954    // Park briefly waiting for the thread, polling its status. We
955    // use a short polling interval rather than `join()` because
956    // join() blocks indefinitely.
957    let started = Instant::now();
958    let poll = Duration::from_millis(5);
959    while started.elapsed() < timeout {
960        if handle.is_finished() {
961            let _ = handle.join();
962            return;
963        }
964        std::thread::sleep(poll);
965    }
966    // Deadline elapsed. Detach the thread; it will continue to run
967    // in the background until process exit (its work is best-effort
968    // anyway). Log so operators can see the slow filesystem.
969    warn!(
970        timeout_ms = timeout.as_millis() as u64,
971        "adaptive: snapshot save timed out (data dir slow?); detaching writer thread"
972    );
973    drop(handle);
974}
975
976#[cfg(test)]
977#[allow(clippy::unwrap_used)]
978mod tests {
979    use super::*;
980
981    fn cfg_for_tests() -> LimiterConfig {
982        LimiterConfig {
983            enabled: true,
984            min_concurrency: 1,
985            max_concurrency: 64,
986            window_ops: 10,
987            min_window_ops: 5,
988            success_target: 0.9,
989            timeout_ceiling: 0.2,
990            latency_inflation_factor: 2.0,
991            latency_ewma_alpha: 0.5,
992        }
993    }
994
995    /// Build an `AdaptiveConfig` for tests that need to construct a
996    /// full `AdaptiveController`. Mirrors `cfg_for_tests()` defaults
997    /// where they overlap, plus per-channel max derived from the same
998    /// `max_concurrency` value.
999    fn adaptive_cfg_for_tests() -> AdaptiveConfig {
1000        let l = cfg_for_tests();
1001        AdaptiveConfig {
1002            enabled: l.enabled,
1003            min_concurrency: l.min_concurrency,
1004            max: ChannelMax {
1005                quote: l.max_concurrency,
1006                store: l.max_concurrency,
1007                fetch: l.max_concurrency,
1008            },
1009            window_ops: l.window_ops,
1010            min_window_ops: l.min_window_ops,
1011            success_target: l.success_target,
1012            timeout_ceiling: l.timeout_ceiling,
1013            latency_inflation_factor: l.latency_inflation_factor,
1014            latency_ewma_alpha: l.latency_ewma_alpha,
1015        }
1016    }
1017
1018    #[test]
1019    fn cold_start_clamps_into_bounds() {
1020        let cfg = cfg_for_tests();
1021        let l = Limiter::new(1000, cfg.clone());
1022        assert_eq!(l.current(), cfg.max_concurrency);
1023        let l = Limiter::new(0, cfg.clone());
1024        assert_eq!(l.current(), cfg.min_concurrency);
1025    }
1026
1027    #[test]
1028    fn slow_start_doubles_then_caps() {
1029        let cfg = cfg_for_tests();
1030        let l = Limiter::new(2, cfg.clone());
1031        // Feed a full healthy window — concurrency doubles.
1032        for _ in 0..cfg.window_ops {
1033            l.observe(Outcome::Success, Duration::from_millis(50));
1034        }
1035        assert_eq!(l.current(), 4);
1036        for _ in 0..cfg.window_ops {
1037            l.observe(Outcome::Success, Duration::from_millis(50));
1038        }
1039        assert_eq!(l.current(), 8);
1040    }
1041
1042    #[test]
1043    fn first_failure_exits_slow_start() {
1044        let cfg = cfg_for_tests();
1045        let l = Limiter::new(4, cfg.clone());
1046        // 6 successes + 4 timeouts in a window of 10. Decisions fire
1047        // per-sample once the window has min_window_ops entries, so
1048        // the four timeouts each drive Decrease. That floors the cap.
1049        for _ in 0..6 {
1050            l.observe(Outcome::Success, Duration::from_millis(50));
1051        }
1052        for _ in 0..4 {
1053            l.observe(Outcome::Timeout, Duration::from_millis(50));
1054        }
1055        let after_stress = l.current();
1056        assert!(
1057            after_stress < 4,
1058            "stress should reduce concurrency from 4, got {after_stress}",
1059        );
1060        // After exiting slow-start, recovery is +1 per fresh window,
1061        // not doubling. The first `window_ops` successes flush prior
1062        // timeouts out of the sliding window. Decreases now also need
1063        // `min_window_ops` of fresh evidence before re-firing, and
1064        // increases need `window_ops` of fresh evidence. Feed enough
1065        // successes to clear the window AND accumulate evidence for
1066        // multiple increases.
1067        for _ in 0..(cfg.window_ops * 5) {
1068            l.observe(Outcome::Success, Duration::from_millis(50));
1069        }
1070        assert!(
1071            l.current() > after_stress,
1072            "expected recovery above {after_stress}, got {}",
1073            l.current(),
1074        );
1075    }
1076
1077    #[test]
1078    fn floor_holds_at_one() {
1079        let cfg = cfg_for_tests();
1080        let l = Limiter::new(2, cfg);
1081        for _ in 0..30 {
1082            l.observe(Outcome::Timeout, Duration::from_millis(50));
1083        }
1084        assert_eq!(l.current(), 1);
1085    }
1086
1087    #[test]
1088    fn application_errors_do_not_punish() {
1089        let cfg = cfg_for_tests();
1090        let l = Limiter::new(4, cfg.clone());
1091        // ApplicationError is NOT a capacity signal (per `Outcome`
1092        // docs and the reviewer's M1 finding). A wave of e.g.
1093        // `AlreadyStored` errors must not lower concurrency, because
1094        // they say nothing about the network's ability to take more
1095        // load. Specifically: the controller should HOLD at 4 because
1096        // there are zero capacity-relevant samples to act on.
1097        for _ in 0..cfg.window_ops * 5 {
1098            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
1099        }
1100        assert_eq!(
1101            l.current(),
1102            4,
1103            "ApplicationError must not move the cap; got {}",
1104            l.current()
1105        );
1106    }
1107
1108    #[test]
1109    fn latency_inflation_triggers_decrease() {
1110        let cfg = LimiterConfig {
1111            window_ops: 20,
1112            min_window_ops: 5,
1113            ..cfg_for_tests()
1114        };
1115        let l = Limiter::new(4, cfg.clone());
1116        // Establish a baseline with many fast successes.
1117        for _ in 0..cfg.window_ops {
1118            l.observe(Outcome::Success, Duration::from_millis(50));
1119        }
1120        let after_baseline = l.current();
1121        // Now flood with slow successes — same outcome, 5x latency.
1122        for _ in 0..cfg.window_ops {
1123            l.observe(Outcome::Success, Duration::from_millis(500));
1124        }
1125        // Latency inflation > 2x baseline must drop concurrency.
1126        assert!(
1127            l.current() < after_baseline,
1128            "expected decrease from {after_baseline}, got {}",
1129            l.current(),
1130        );
1131    }
1132
1133    #[test]
1134    fn warm_start_overrides_current() {
1135        let cfg = cfg_for_tests();
1136        let l = Limiter::new(2, cfg);
1137        l.warm_start(20);
1138        assert_eq!(l.current(), 20);
1139    }
1140
1141    #[test]
1142    fn warm_start_clamps() {
1143        let cfg = cfg_for_tests();
1144        let l = Limiter::new(2, cfg.clone());
1145        l.warm_start(1_000_000);
1146        assert_eq!(l.current(), cfg.max_concurrency);
1147    }
1148
1149    #[test]
1150    fn disabled_controller_holds_steady() {
1151        let cfg = LimiterConfig {
1152            enabled: false,
1153            ..cfg_for_tests()
1154        };
1155        let l = Limiter::new(8, cfg);
1156        for _ in 0..50 {
1157            l.observe(Outcome::Timeout, Duration::from_millis(50));
1158        }
1159        assert_eq!(l.current(), 8);
1160    }
1161
1162    #[test]
1163    fn controller_snapshot_round_trips() {
1164        // The test cfg has max=64 for every channel (cfg_for_tests's
1165        // max_concurrency=64 -> ChannelMax::{quote: 64, store: 64, fetch: 64}).
1166        // Pick start values <= 64 so they survive cap clamping at
1167        // construction. Pick values >= cold-defaults (32/8/64) so they
1168        // also survive the warm-start floor.
1169        let c = AdaptiveController::new(
1170            ChannelStart {
1171                quote: 64,
1172                store: 16,
1173                fetch: 64,
1174            },
1175            adaptive_cfg_for_tests(),
1176        );
1177        let snap = c.snapshot();
1178        assert_eq!(snap.quote, 64);
1179        assert_eq!(snap.store, 16);
1180        assert_eq!(snap.fetch, 64);
1181
1182        let c2 = AdaptiveController::default();
1183        c2.warm_start(snap);
1184        assert_eq!(c2.quote.current(), 64);
1185        assert_eq!(c2.store.current(), 16);
1186        assert_eq!(c2.fetch.current(), 64);
1187    }
1188
1189    #[tokio::test]
1190    async fn observe_op_records_success() {
1191        let cfg = cfg_for_tests();
1192        let l = Limiter::new(4, cfg.clone());
1193        for _ in 0..cfg.window_ops {
1194            let _: Result<(), &str> =
1195                observe_op(&l, || async { Ok(()) }, |_e: &&str| Outcome::NetworkError).await;
1196        }
1197        // Healthy window from cold start doubles 4 -> 8.
1198        assert_eq!(l.current(), 8);
1199    }
1200
1201    #[test]
1202    fn snapshot_round_trips_through_disk() {
1203        let dir = tempfile::tempdir().unwrap();
1204        let path = dir.path().join("client_adaptive.json");
1205        let snap = ChannelStart {
1206            quote: 24,
1207            store: 6,
1208            fetch: 12,
1209        };
1210        save_snapshot(&path, snap);
1211        let loaded = load_snapshot(&path).unwrap();
1212        assert_eq!(loaded.quote, 24);
1213        assert_eq!(loaded.store, 6);
1214        assert_eq!(loaded.fetch, 12);
1215    }
1216
1217    #[test]
1218    fn load_missing_returns_none() {
1219        let dir = tempfile::tempdir().unwrap();
1220        let path = dir.path().join("does_not_exist.json");
1221        assert!(load_snapshot(&path).is_none());
1222    }
1223
1224    #[test]
1225    fn load_corrupt_returns_none() {
1226        let dir = tempfile::tempdir().unwrap();
1227        let path = dir.path().join("bad.json");
1228        std::fs::write(&path, b"not valid json{{{").unwrap();
1229        assert!(load_snapshot(&path).is_none());
1230    }
1231
1232    #[test]
1233    fn load_wrong_schema_returns_none() {
1234        let dir = tempfile::tempdir().unwrap();
1235        let path = dir.path().join("future.json");
1236        // Schema 999 is from a future build — current build must not
1237        // crash and must not act on it.
1238        let payload = r#"{"schema":999,"channels":{"quote":1,"store":1,"fetch":1}}"#;
1239        std::fs::write(&path, payload).unwrap();
1240        assert!(load_snapshot(&path).is_none());
1241    }
1242
1243    #[tokio::test]
1244    async fn observe_op_records_classified_error() {
1245        let cfg = cfg_for_tests();
1246        let l = Limiter::new(4, cfg.clone());
1247        for _ in 0..cfg.window_ops {
1248            let _: Result<(), &str> =
1249                observe_op(&l, || async { Err("boom") }, |_e: &&str| Outcome::Timeout).await;
1250        }
1251        assert!(l.current() < 4);
1252    }
1253
1254    // ----- Adversarial / regression-guard tests below ---------------------
1255    //
1256    // These exist primarily to prove the controller never silently regresses
1257    // upload/download throughput and never panics under hostile workloads.
1258
1259    /// Cold-start defaults must equal-or-exceed the prior static knobs so
1260    /// the very first batch on a fresh install is no slower than before
1261    /// the adaptive controller existed. Hard-coded literals are intentional
1262    /// — this is a guard against future commits accidentally lowering them.
1263    #[test]
1264    fn no_regression_cold_start_at_least_static_defaults() {
1265        let s = ChannelStart::default();
1266        assert!(
1267            s.quote >= 32,
1268            "quote cold-start regressed: got {}, prior static was 32",
1269            s.quote,
1270        );
1271        assert!(
1272            s.store >= 8,
1273            "store cold-start regressed: got {}, prior static was 8",
1274            s.store,
1275        );
1276        assert!(
1277            s.fetch >= 64,
1278            "fetch cold-start regressed: got {}, prior static was 64 (unbounded before)",
1279            s.fetch,
1280        );
1281    }
1282
1283    /// The production `AdaptiveController::default()` (NOT the test cfg)
1284    /// must come up reporting the cold-start values immediately, with no
1285    /// observations recorded.
1286    #[test]
1287    fn controller_default_config_is_sane() {
1288        let c = AdaptiveController::default();
1289        let starts = ChannelStart::default();
1290        assert_eq!(c.quote.current(), starts.quote);
1291        assert_eq!(c.store.current(), starts.store);
1292        assert_eq!(c.fetch.current(), starts.fetch);
1293        // No observations made yet — internal windows must be empty.
1294        assert_eq!(lock(&c.quote.inner).window.len(), 0);
1295        assert_eq!(lock(&c.store.inner).window.len(), 0);
1296        assert_eq!(lock(&c.fetch.inner).window.len(), 0);
1297    }
1298
1299    /// Mixed signals (every other op fails) must not pin the controller
1300    /// at the floor for the whole run. The cap should oscillate or settle
1301    /// somewhere above the floor — collapse to 1 forever would be a bug.
1302    #[test]
1303    fn alternating_success_failure_collapses_to_floor() {
1304        // 50% timeout rate is far above `timeout_ceiling` (0.2 in test
1305        // config), so the window is always stressed. The controller
1306        // MUST collapse to the floor, and once there must NEVER go
1307        // below it. Assert both invariants explicitly: floor reached
1308        // and floor held.
1309        let cfg = cfg_for_tests();
1310        let l = Limiter::new(8, cfg.clone());
1311        let mut min_observed = usize::MAX;
1312        let mut max_observed = 0usize;
1313        let mut floor_visits = 0usize;
1314        for i in 0..1000 {
1315            let outcome = if i % 2 == 0 {
1316                Outcome::Success
1317            } else {
1318                Outcome::Timeout
1319            };
1320            l.observe(outcome, Duration::from_millis(50));
1321            let cur = l.current();
1322            assert!(
1323                cur >= cfg.min_concurrency,
1324                "cap underflowed floor at iter {i}: got {cur}",
1325            );
1326            min_observed = min_observed.min(cur);
1327            max_observed = max_observed.max(cur);
1328            if cur == cfg.min_concurrency {
1329                floor_visits += 1;
1330            }
1331        }
1332        assert_eq!(
1333            min_observed, cfg.min_concurrency,
1334            "cap never reached the floor under 50% timeout rate"
1335        );
1336        assert!(
1337            max_observed >= 8,
1338            "cap never visited the start value: max_observed={max_observed}"
1339        );
1340        // Should spend MOST of the run at the floor — a single
1341        // healthy window is not enough to climb back from a 50% loss
1342        // environment.
1343        assert!(
1344            floor_visits > 500,
1345            "cap spent only {floor_visits}/1000 ticks at floor; expected mostly at floor"
1346        );
1347        assert_eq!(
1348            l.current(),
1349            cfg.min_concurrency,
1350            "controller did not settle at floor after 1000 alternations"
1351        );
1352    }
1353
1354    /// From the floor, a long stream of healthy successes must walk the
1355    /// cap all the way back up to `max_concurrency`. Otherwise transient
1356    /// stress on a slow link would permanently penalize throughput.
1357    #[test]
1358    fn pure_success_stream_recovers_to_max() {
1359        let cfg = cfg_for_tests();
1360        let l = Limiter::new(cfg.min_concurrency, cfg.clone());
1361        for _ in 0..10_000 {
1362            l.observe(Outcome::Success, Duration::from_millis(5));
1363        }
1364        assert_eq!(
1365            l.current(),
1366            cfg.max_concurrency,
1367            "expected recovery to max ({}), got {}",
1368            cfg.max_concurrency,
1369            l.current(),
1370        );
1371    }
1372
1373    /// Heavy stress drives the cap to the floor; subsequent recovery
1374    /// must climb meaningfully higher than the floor with enough healthy
1375    /// evidence. No "permanent floor" failure mode allowed.
1376    #[test]
1377    fn stress_then_heal_drives_floor_then_recovery() {
1378        let cfg = cfg_for_tests();
1379        let l = Limiter::new(8, cfg.clone());
1380        for _ in 0..100 {
1381            l.observe(Outcome::Timeout, Duration::from_millis(50));
1382        }
1383        let after_stress = l.current();
1384        assert_eq!(
1385            after_stress, cfg.min_concurrency,
1386            "stress should drive cap to floor, got {after_stress}",
1387        );
1388        for _ in 0..1_000 {
1389            l.observe(Outcome::Success, Duration::from_millis(10));
1390        }
1391        let after_heal = l.current();
1392        assert!(
1393            after_heal >= cfg.min_concurrency.saturating_add(4),
1394            "expected substantial recovery from floor, got {after_heal}",
1395        );
1396    }
1397
1398    /// The latency baseline must track actual workload latency. If it
1399    /// stayed pinned at `Duration::ZERO`, every healthy sample would
1400    /// look like infinite inflation and inflate the decrease rate.
1401    #[test]
1402    fn baseline_does_not_grow_unbounded_under_slow_links() {
1403        let cfg = cfg_for_tests();
1404        let l = Limiter::new(2, cfg.clone());
1405        for _ in 0..(cfg.window_ops * 10) {
1406            l.observe(Outcome::Success, Duration::from_millis(500));
1407        }
1408        let baseline = lock(&l.inner).latency_baseline;
1409        let base = baseline.expect("baseline should be set after many healthy windows");
1410        assert!(
1411            base > Duration::ZERO,
1412            "baseline must not stay at ZERO, got {base:?}",
1413        );
1414        // Within 2x of the actual latency: 250ms..=1000ms.
1415        let lo = Duration::from_millis(250);
1416        let hi = Duration::from_millis(1000);
1417        assert!(
1418            base >= lo && base <= hi,
1419            "baseline drifted out of [{lo:?}, {hi:?}]: {base:?}",
1420        );
1421    }
1422
1423    /// Until the first healthy window completes, the latency baseline
1424    /// stays `None` (so no false-inflation alarms). Decreases during the
1425    /// stress phase are driven purely by success/timeout rate, not by
1426    /// inflated p95 vs a phantom zero baseline.
1427    #[test]
1428    fn baseline_initialized_only_after_first_healthy_window() {
1429        let cfg = cfg_for_tests();
1430        let l = Limiter::new(8, cfg.clone());
1431        for _ in 0..50 {
1432            l.observe(Outcome::Timeout, Duration::from_millis(50));
1433        }
1434        // Without any healthy window, baseline must still be None.
1435        assert!(
1436            lock(&l.inner).latency_baseline.is_none(),
1437            "baseline must be None before any healthy window",
1438        );
1439        // Now drain healthy windows.
1440        for _ in 0..(cfg.window_ops * 5) {
1441            l.observe(Outcome::Success, Duration::from_millis(20));
1442        }
1443        let baseline = lock(&l.inner).latency_baseline;
1444        assert!(
1445            baseline.is_some(),
1446            "baseline must be Some after healthy windows",
1447        );
1448        let base = baseline.unwrap_or_default();
1449        assert!(
1450            base > Duration::ZERO,
1451            "baseline must reflect real latency, got {base:?}",
1452        );
1453    }
1454
1455    /// A torrent of timeouts must not underflow the cap. Sample at
1456    /// several depths to catch any wraparound.
1457    #[test]
1458    fn min_concurrency_floor_holds_under_torrent_of_errors() {
1459        let cfg = cfg_for_tests();
1460        let l = Limiter::new(8, cfg.clone());
1461        for i in 0..50_000 {
1462            l.observe(Outcome::Timeout, Duration::from_millis(50));
1463            if i == 100 || i == 1_000 || i == 49_999 {
1464                let cur = l.current();
1465                assert_eq!(
1466                    cur, cfg.min_concurrency,
1467                    "floor breached at iter {i}: got {cur}",
1468                );
1469            }
1470        }
1471    }
1472
1473    /// Mirror: a torrent of successes must not exceed `max_concurrency`.
1474    #[test]
1475    fn max_concurrency_ceiling_holds_under_torrent_of_successes() {
1476        let cfg = cfg_for_tests();
1477        let start = cfg
1478            .max_concurrency
1479            .saturating_sub(1)
1480            .max(cfg.min_concurrency);
1481        let l = Limiter::new(start, cfg.clone());
1482        for i in 0..50_000 {
1483            l.observe(Outcome::Success, Duration::from_millis(5));
1484            if i == 100 || i == 1_000 || i == 49_999 {
1485                let cur = l.current();
1486                assert!(
1487                    cur <= cfg.max_concurrency,
1488                    "ceiling breached at iter {i}: got {cur} > {}",
1489                    cfg.max_concurrency,
1490                );
1491            }
1492        }
1493        assert_eq!(l.current(), cfg.max_concurrency);
1494    }
1495
1496    /// Slow-start doubles the cap; with `max_concurrency = usize::MAX/2`
1497    /// a naive `*2` would overflow. The controller must use saturating
1498    /// arithmetic and never panic. Also asserts the cap actually
1499    /// REACHED max — proving that "no panic" wasn't achieved by
1500    /// the cap getting stuck somewhere instead of growing.
1501    #[test]
1502    fn saturating_arithmetic_handles_extreme_config() {
1503        let cfg = LimiterConfig {
1504            max_concurrency: usize::MAX / 2,
1505            ..cfg_for_tests()
1506        };
1507        let start = usize::MAX / 4;
1508        let l = Limiter::new(start, cfg.clone());
1509        for _ in 0..(cfg.window_ops * 10) {
1510            l.observe(Outcome::Success, Duration::from_millis(1));
1511        }
1512        // First-iteration doubles start (which is max/4) to max/2 = ceiling.
1513        // The cap MUST have grown to the ceiling; if saturating math
1514        // were broken (panic) we'd never get here, but we'd also fail
1515        // if the cap got stuck at the start value.
1516        assert_eq!(
1517            l.current(),
1518            cfg.max_concurrency,
1519            "saturating math survived but cap did not grow to ceiling"
1520        );
1521    }
1522
1523    /// FIFO eviction: prove that a window of pure-timeout collapses
1524    /// the cap, and once enough successes flush ALL timeouts out of
1525    /// the window, the cap can rise. The earlier version of this test
1526    /// used an OR clause that made the assertion satisfiable trivially;
1527    /// this version asserts the strict invariant: after eviction, cap
1528    /// must be STRICTLY GREATER than the post-stress cap.
1529    #[test]
1530    fn window_eviction_is_fifo() {
1531        let cfg = LimiterConfig {
1532            window_ops: 10,
1533            min_window_ops: 5,
1534            success_target: 0.9,
1535            timeout_ceiling: 0.1,
1536            ..cfg_for_tests()
1537        };
1538        let l = Limiter::new(8, cfg.clone());
1539        // Fill the window with timeouts. With decrease-gating
1540        // (samples_since_decrease >= min_window_ops between halvings),
1541        // window_ops=10 + min_window_ops=5 timeouts allow at most
1542        // ~2 halvings: 8 -> 4 -> 2. Cap must DROP from 8.
1543        for _ in 0..cfg.window_ops {
1544            l.observe(Outcome::Timeout, Duration::from_millis(50));
1545        }
1546        let after_stress = l.current();
1547        assert!(
1548            after_stress < 8,
1549            "expected cap to drop from 8 after pure-timeout window, got {after_stress}"
1550        );
1551        // Push enough successes to fully evict the timeouts AND
1552        // accumulate at least one full window of fresh evidence for
1553        // an Increase. window_ops to evict + window_ops to gate first
1554        // +1 = 2 * window_ops minimum; use 3x for safety margin.
1555        for _ in 0..(cfg.window_ops * 3) {
1556            l.observe(Outcome::Success, Duration::from_millis(20));
1557        }
1558        let after_recovery = l.current();
1559        // Strict greater-than: FIFO MUST flush the timeouts so a
1560        // fresh-window Increase can fire.
1561        assert!(
1562            after_recovery > after_stress,
1563            "FIFO eviction broken: cap stayed at {after_stress} after recovery successes (expected > {after_stress}, got {after_recovery})"
1564        );
1565    }
1566
1567    /// With `enabled = false`, the controller is a no-op. Hot paths
1568    /// must see exactly `initial` at every check, no exceptions.
1569    #[test]
1570    fn disabled_controller_returns_initial_value_invariantly() {
1571        let cfg = LimiterConfig {
1572            enabled: false,
1573            ..cfg_for_tests()
1574        };
1575        let initial = 8;
1576        let l = Limiter::new(initial, cfg);
1577        for i in 0..1_000 {
1578            let outcome = match i % 4 {
1579                0 => Outcome::Success,
1580                1 => Outcome::Timeout,
1581                2 => Outcome::NetworkError,
1582                _ => Outcome::ApplicationError,
1583            };
1584            l.observe(outcome, Duration::from_millis(50));
1585            assert_eq!(
1586                l.current(),
1587                initial,
1588                "disabled controller moved at iter {i}",
1589            );
1590        }
1591    }
1592
1593    /// 100 tasks concurrently observing 100 successes each. The cap
1594    /// must remain a valid in-bounds value, no panic, no deadlock.
1595    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1596    async fn concurrent_observations_do_not_corrupt_window() {
1597        let cfg = cfg_for_tests();
1598        let l = Limiter::new(4, cfg.clone());
1599        let mut handles = Vec::with_capacity(100);
1600        for _ in 0..100 {
1601            let l_clone = l.clone();
1602            handles.push(tokio::spawn(async move {
1603                for _ in 0..100 {
1604                    l_clone.observe(Outcome::Success, Duration::from_millis(5));
1605                }
1606            }));
1607        }
1608        for h in handles {
1609            h.await.unwrap();
1610        }
1611        let cur = l.current();
1612        assert!(
1613            cur >= cfg.min_concurrency && cur <= cfg.max_concurrency,
1614            "cap out of bounds after concurrent observations: {cur}",
1615        );
1616    }
1617
1618    /// Persisted higher values from a prior run must beat low cold-start
1619    /// defaults. Otherwise warm-start would silently pessimize throughput.
1620    /// (Values BELOW cold-start are floored — see
1621    /// `warm_start_floors_at_cold_defaults`.)
1622    #[test]
1623    fn persisted_snapshot_warm_starts_above_cold_floor() {
1624        let dir = tempfile::tempdir().unwrap();
1625        let path = dir.path().join("client_adaptive.json");
1626        // All snapshot values ABOVE the production cold-start defaults
1627        // so the warm_start floor doesn't kick in.
1628        let saved = ChannelStart {
1629            quote: 64,
1630            store: 32,
1631            fetch: 128,
1632        };
1633        save_snapshot(&path, saved);
1634        let loaded = load_snapshot(&path).unwrap();
1635
1636        // Build a controller with intentionally low cold-start values
1637        // — these get overridden by warm_start.
1638        let low = ChannelStart {
1639            quote: 2,
1640            store: 2,
1641            fetch: 2,
1642        };
1643        let c = AdaptiveController::new(low, AdaptiveConfig::default());
1644        c.warm_start(loaded);
1645        assert_eq!(c.quote.current(), 64);
1646        assert_eq!(c.store.current(), 32);
1647        assert_eq!(c.fetch.current(), 128);
1648    }
1649
1650    /// Two threads racing on `save_snapshot` must never produce a
1651    /// half-written file. Atomic-rename guarantees we either see the
1652    /// old content or the new content, never a torn write.
1653    #[test]
1654    fn save_load_round_trip_with_concurrent_writes() {
1655        use std::thread;
1656        let dir = tempfile::tempdir().unwrap();
1657        let path = dir.path().join("client_adaptive.json");
1658        let path_a = path.clone();
1659        let path_b = path.clone();
1660        let snap_a = ChannelStart {
1661            quote: 10,
1662            store: 10,
1663            fetch: 10,
1664        };
1665        let snap_b = ChannelStart {
1666            quote: 99,
1667            store: 99,
1668            fetch: 99,
1669        };
1670        let h_a = thread::spawn(move || {
1671            for _ in 0..50 {
1672                save_snapshot(&path_a, snap_a);
1673            }
1674        });
1675        let h_b = thread::spawn(move || {
1676            for _ in 0..50 {
1677                save_snapshot(&path_b, snap_b);
1678            }
1679        });
1680        h_a.join().unwrap();
1681        h_b.join().unwrap();
1682        let loaded = load_snapshot(&path).expect("file must be a valid snapshot, not torn");
1683        let valid = (loaded.quote == snap_a.quote
1684            && loaded.store == snap_a.store
1685            && loaded.fetch == snap_a.fetch)
1686            || (loaded.quote == snap_b.quote
1687                && loaded.store == snap_b.store
1688                && loaded.fetch == snap_b.fetch);
1689        assert!(valid, "loaded snapshot is neither A nor B: {loaded:?}",);
1690    }
1691
1692    /// `save_snapshot` to an unwritable / impossible path must be a
1693    /// quiet no-op: best-effort, no panic, no error propagation.
1694    #[test]
1695    fn save_snapshot_to_unwritable_dir_does_not_panic() {
1696        // A path under a non-existent absolute root that the process
1697        // also cannot create. On macOS/Linux a write under "/" requires
1698        // root; create_dir_all will fail on this path.
1699        let path = PathBuf::from("/nonexistent_root_dir_xyz_for_test/sub/dir/client_adaptive.json");
1700        let snap = ChannelStart {
1701            quote: 1,
1702            store: 1,
1703            fetch: 1,
1704        };
1705        // No panic = pass. Function returns unit, errors are logged.
1706        save_snapshot(&path, snap);
1707        // File should not exist.
1708        assert!(!path.exists());
1709    }
1710
1711    /// A truncated/partial JSON file must not crash the loader; it must
1712    /// return None so the controller falls back to cold defaults.
1713    #[test]
1714    fn load_snapshot_from_truncated_file_returns_none() {
1715        let dir = tempfile::tempdir().unwrap();
1716        let path = dir.path().join("truncated.json");
1717        std::fs::write(&path, br#"{"schema":1,"channels":{"quote":"#).unwrap();
1718        assert!(load_snapshot(&path).is_none());
1719    }
1720
1721    /// Microbench: 100k observe+current pairs must complete in well
1722    /// under 100ms. Catches any accidental quadratic behaviour or
1723    /// massive lock contention introduced by future changes.
1724    #[test]
1725    fn controller_perf_overhead_is_bounded() {
1726        let cfg = cfg_for_tests();
1727        let l = Limiter::new(8, cfg);
1728        let started = Instant::now();
1729        for _ in 0..100_000 {
1730            let _ = l.current();
1731            l.observe(Outcome::Success, Duration::from_micros(1));
1732        }
1733        let elapsed = started.elapsed();
1734        // 1µs per pair on a modern machine is generous; allow 500ms to
1735        // tolerate slow CI runners while still catching real regressions.
1736        assert!(
1737            elapsed < Duration::from_millis(500),
1738            "100k observe+current pairs took {elapsed:?}, expected <500ms",
1739        );
1740    }
1741
1742    // ---- Regression tests for adversarial-review findings ----
1743
1744    /// M10 fix: hand-edited or future-schema configs may plant `NaN`
1745    /// or out-of-range values into the float fields. Constructing a
1746    /// controller and feeding observations must not panic.
1747    /// `Duration::from_secs_f64(NaN)` panics per std docs, so without
1748    /// `sanitize()` and the EWMA NaN guard this would crash.
1749    #[test]
1750    fn nan_and_out_of_range_config_does_not_panic() {
1751        let cfg = AdaptiveConfig {
1752            enabled: true,
1753            min_concurrency: 0, // sub-floor; sanitize raises to 1
1754            max: ChannelMax {
1755                quote: 0, // sub-min; sanitize raises to min
1756                store: 0,
1757                fetch: 0,
1758            },
1759            window_ops: 10,
1760            min_window_ops: 50, // > window_ops; sanitize clamps
1761            success_target: f64::NAN,
1762            timeout_ceiling: f64::INFINITY,
1763            latency_inflation_factor: f64::NEG_INFINITY,
1764            latency_ewma_alpha: f64::NAN,
1765        };
1766        let c = AdaptiveController::new(ChannelStart::default(), cfg);
1767        // Verify sanitize() ACTUALLY corrected the values (not just
1768        // that no panic occurred). Reading c.config back proves the
1769        // sanitization landed.
1770        let post = &c.config;
1771        assert_eq!(
1772            post.min_concurrency, 1,
1773            "sanitize did not raise min_concurrency from 0"
1774        );
1775        assert!(
1776            post.success_target.is_finite() && (0.0..=1.0).contains(&post.success_target),
1777            "sanitize did not clamp success_target from NaN: {}",
1778            post.success_target
1779        );
1780        assert!(
1781            post.timeout_ceiling.is_finite() && (0.0..=1.0).contains(&post.timeout_ceiling),
1782            "sanitize did not clamp timeout_ceiling from Inf: {}",
1783            post.timeout_ceiling
1784        );
1785        assert!(
1786            post.latency_inflation_factor.is_finite() && post.latency_inflation_factor > 0.0,
1787            "sanitize did not fix latency_inflation_factor from -Inf: {}",
1788            post.latency_inflation_factor
1789        );
1790        assert!(
1791            post.latency_ewma_alpha.is_finite() && (0.0..=1.0).contains(&post.latency_ewma_alpha),
1792            "sanitize did not fix latency_ewma_alpha from NaN: {}",
1793            post.latency_ewma_alpha
1794        );
1795        assert!(
1796            post.min_window_ops <= post.window_ops,
1797            "sanitize did not clamp min_window_ops <= window_ops: min={} window={}",
1798            post.min_window_ops,
1799            post.window_ops
1800        );
1801        assert!(
1802            post.max.quote >= post.min_concurrency,
1803            "max.quote below min_concurrency"
1804        );
1805        // Now exercise the runtime under hostile latencies — must
1806        // not panic.
1807        for _ in 0..200 {
1808            c.store
1809                .observe(Outcome::Success, Duration::from_secs(99_999));
1810            c.store.observe(Outcome::Timeout, Duration::ZERO);
1811        }
1812        let cur = c.store.current();
1813        assert!(cur >= 1, "cap below floor: {cur}");
1814    }
1815
1816    /// M3+M6 fix: a burst of N concurrent in-flight chunks all
1817    /// observing stress at almost the same time used to pile-drive
1818    /// the cap from N to 1 in N back-to-back ticks. After the fix,
1819    /// decreases require `min_window_ops` of FRESH evidence between
1820    /// successive Decreases, so a single transient burst can drop
1821    /// the cap by at most one halving.
1822    #[test]
1823    fn transient_burst_does_not_pile_drive_to_floor() {
1824        let cfg = LimiterConfig {
1825            window_ops: 32,
1826            min_window_ops: 8,
1827            success_target: 0.95,
1828            timeout_ceiling: 0.10,
1829            ..cfg_for_tests()
1830        };
1831        let l = Limiter::new(32, cfg);
1832        // Simulate 8 concurrent ops all completing as Timeout in a
1833        // back-to-back burst (the kind of event that previously
1834        // floor-slammed the cap).
1835        for _ in 0..8 {
1836            l.observe(Outcome::Timeout, Duration::from_millis(10));
1837        }
1838        // After one burst, cap should have decreased AT MOST once
1839        // (32 -> 16). Pile-driving would land at 1 or 2.
1840        let after_burst = l.current();
1841        assert!(
1842            after_burst >= 16,
1843            "transient burst pile-drove cap from 32 to {after_burst}; expected >= 16",
1844        );
1845    }
1846
1847    /// M2 fix: classifier must map transport-related errors to
1848    /// `NetworkError`, not `ApplicationError`. Test EACH transport
1849    /// variant separately so a regression in any one variant is
1850    /// caught by name.
1851    #[tokio::test]
1852    async fn transport_errors_classify_as_capacity_signal() {
1853        use crate::data::client::classify_error;
1854        use crate::data::error::Error;
1855        let make_cfg = || LimiterConfig {
1856            window_ops: 16,
1857            min_window_ops: 5,
1858            success_target: 0.5,
1859            timeout_ceiling: 0.5,
1860            ..cfg_for_tests()
1861        };
1862        // Cases: (variant_name, error_factory)
1863        type ErrFactory = Box<dyn Fn() -> Error>;
1864        let cases: Vec<(&str, ErrFactory)> = vec![
1865            ("Network", Box::new(|| Error::Network("net".to_string()))),
1866            (
1867                "InsufficientPeers",
1868                Box::new(|| Error::InsufficientPeers("ip".to_string())),
1869            ),
1870            ("Io", Box::new(|| Error::Io(std::io::Error::other("io")))),
1871            ("Protocol", Box::new(|| Error::Protocol("p".to_string()))),
1872            ("Storage", Box::new(|| Error::Storage("s".to_string()))),
1873            (
1874                "PartialUpload",
1875                Box::new(|| Error::PartialUpload {
1876                    stored: vec![],
1877                    stored_count: 0,
1878                    failed: vec![],
1879                    failed_count: 0,
1880                    total_chunks: 0,
1881                    reason: "r".to_string(),
1882                }),
1883            ),
1884        ];
1885        for (name, mk) in &cases {
1886            let l = Limiter::new(8, make_cfg());
1887            for _ in 0..16 {
1888                let _: std::result::Result<(), Error> =
1889                    observe_op(&l, || async { Err(mk()) }, classify_error).await;
1890            }
1891            // Each variant alone must drive the cap STRICTLY below
1892            // the start (8 -> 4 via one halving). If a variant maps
1893            // to ApplicationError, cap stays at 8.
1894            let cur = l.current();
1895            assert!(
1896                cur < 8,
1897                "{name} not classified as capacity signal: cap stayed at {cur}",
1898            );
1899        }
1900    }
1901
1902    /// C4 fix: per-channel max ceilings. Confirm that a `LimiterConfig`
1903    /// with a constrained `max_concurrency` does not bleed into other
1904    /// channels. The ceilings are independent.
1905    #[test]
1906    fn per_channel_ceilings_are_independent() {
1907        let cfg = AdaptiveConfig {
1908            max: ChannelMax {
1909                quote: 4,    // tightly capped
1910                store: 8,    // moderate
1911                fetch: 1024, // very high
1912            },
1913            ..AdaptiveConfig::default()
1914        };
1915        let c = AdaptiveController::new(
1916            ChannelStart {
1917                quote: 4,
1918                store: 8,
1919                fetch: 64,
1920            },
1921            cfg,
1922        );
1923        // Feed 1000 successes to each channel; each must respect its
1924        // own ceiling and never one another's.
1925        for _ in 0..1000 {
1926            c.quote.observe(Outcome::Success, Duration::from_micros(10));
1927            c.store.observe(Outcome::Success, Duration::from_micros(10));
1928            c.fetch.observe(Outcome::Success, Duration::from_micros(10));
1929        }
1930        assert_eq!(c.quote.current(), 4, "quote should cap at 4");
1931        assert_eq!(c.store.current(), 8, "store should cap at 8");
1932        // fetch starts at 64, slow-start doubles each window. With
1933        // 1000 successes and window_ops=32, ~31 windows fire; cap
1934        // doubles 64 -> 128 -> 256 -> 512 -> 1024 = 4 doublings. Cap
1935        // MUST reach the channel's max of 1024.
1936        assert_eq!(
1937            c.fetch.current(),
1938            1024,
1939            "fetch did not reach its independent max of 1024; got {}",
1940            c.fetch.current()
1941        );
1942    }
1943
1944    /// Cold-start equals the prior static defaults so the FIRST batch
1945    /// on a fresh install behaves identically. Guards against future
1946    /// commits silently dropping cold-start values below the prior
1947    /// statics.
1948    #[test]
1949    fn cold_start_at_least_prior_static_defaults() {
1950        let cs = ChannelStart::default();
1951        // Prior statics: quote=32, store=8. Fetch was effectively
1952        // unbounded (the entire self_encryption batch was fired at
1953        // once); we picked 64 as a conservative substitute so the
1954        // first batch of <= 64 chunks is indistinguishable.
1955        assert!(cs.quote >= 32, "quote cold-start regressed: {}", cs.quote);
1956        assert!(cs.store >= 8, "store cold-start regressed: {}", cs.store);
1957        assert!(cs.fetch >= 64, "fetch cold-start regressed: {}", cs.fetch);
1958    }
1959
1960    /// Reviewer N-M5 guard: with the new gated-decrease semantics
1961    /// (decreases require `min_window_ops` of fresh evidence), the
1962    /// controller must STILL reach the floor under sustained stress
1963    /// within a bounded number of observations. Otherwise we've made
1964    /// the controller too sluggish to react to a real network
1965    /// outage.
1966    ///
1967    /// From start = 64 with `min_window_ops = 8`, reaching floor 1
1968    /// takes log2(64) = 6 halvings, each gated on 8 fresh samples,
1969    /// so the upper bound is roughly `6 * 8 + min_window_ops = ~56`
1970    /// observations. We allow 200 to absorb the warm-up window and
1971    /// any per-sample evaluation slack.
1972    #[test]
1973    fn sustained_stress_reaches_floor_within_bounded_ops() {
1974        let cfg = LimiterConfig {
1975            window_ops: 32,
1976            min_window_ops: 8,
1977            success_target: 0.95,
1978            timeout_ceiling: 0.10,
1979            max_concurrency: 64,
1980            ..cfg_for_tests()
1981        };
1982        let l = Limiter::new(64, cfg);
1983        let mut ops = 0usize;
1984        while l.current() > 1 && ops < 200 {
1985            l.observe(Outcome::Timeout, Duration::from_millis(10));
1986            ops += 1;
1987        }
1988        assert_eq!(
1989            l.current(),
1990            1,
1991            "controller did not reach floor within 200 observations under \
1992             sustained timeout stress; took {ops} ops, ended at cap {}",
1993            l.current()
1994        );
1995    }
1996
1997    /// The default `AdaptiveController` (production defaults) starts
1998    /// each channel at the documented cold-start value, with each
1999    /// per-channel max strictly above the start (so the controller
2000    /// has room to grow).
2001    #[test]
2002    fn default_controller_has_growth_headroom() {
2003        let c = AdaptiveController::default();
2004        let cs = ChannelStart::default();
2005        let max = ChannelMax::default();
2006        assert_eq!(c.quote.current(), cs.quote);
2007        assert_eq!(c.store.current(), cs.store);
2008        assert_eq!(c.fetch.current(), cs.fetch);
2009        assert!(
2010            max.quote > cs.quote,
2011            "no growth headroom for quote: max={} start={}",
2012            max.quote,
2013            cs.quote
2014        );
2015        assert!(
2016            max.store > cs.store,
2017            "no growth headroom for store: max={} start={}",
2018            max.store,
2019            cs.store
2020        );
2021        assert!(
2022            max.fetch > cs.fetch,
2023            "no growth headroom for fetch: max={} start={}",
2024            max.fetch,
2025            cs.fetch
2026        );
2027    }
2028
2029    // ---- Codex review (round 3) regression tests ----
2030
2031    /// Codex CRITICAL: warm_start was blindly restoring caps below the
2032    /// cold-start floor. A prior bad run that drove store=1 would
2033    /// pessimize every subsequent run forever. The fix floors warm
2034    /// values at `ChannelStart::default()` per channel.
2035    #[test]
2036    fn warm_start_floors_at_cold_defaults() {
2037        let c = AdaptiveController::default();
2038        let cold = ChannelStart::default();
2039        // Snapshot from a "bad prior run" — every channel pinned to 1.
2040        let bad_snap = ChannelStart {
2041            quote: 1,
2042            store: 1,
2043            fetch: 1,
2044        };
2045        c.warm_start(bad_snap);
2046        // After warm_start, each channel should be AT LEAST the
2047        // cold-start value, not the persisted 1.
2048        assert_eq!(
2049            c.quote.current(),
2050            cold.quote,
2051            "quote warm_start did not floor at cold default"
2052        );
2053        assert_eq!(
2054            c.store.current(),
2055            cold.store,
2056            "store warm_start did not floor at cold default"
2057        );
2058        assert_eq!(
2059            c.fetch.current(),
2060            cold.fetch,
2061            "fetch warm_start did not floor at cold default"
2062        );
2063    }
2064
2065    /// Warm values ABOVE the cold-start floor must still be honored —
2066    /// the floor is a one-sided lower bound, not a clamp.
2067    #[test]
2068    fn warm_start_honors_values_above_cold_floor() {
2069        let c = AdaptiveController::default();
2070        let cold = ChannelStart::default();
2071        let snap = ChannelStart {
2072            quote: cold.quote * 2,
2073            store: cold.store * 4,
2074            fetch: cold.fetch * 2,
2075        };
2076        c.warm_start(snap);
2077        assert_eq!(c.quote.current(), snap.quote);
2078        assert_eq!(c.store.current(), snap.store);
2079        assert_eq!(c.fetch.current(), snap.fetch);
2080    }
2081
2082    /// Codex MAJOR: long pipelines used to capture the cap once via
2083    /// `buffer_unordered(N)`. `rebucketed` re-reads the cap at each
2084    /// batch boundary so adaptive growth/decay actually takes effect
2085    /// mid-stream. Test: fire 200 items at start cap=4, then halfway
2086    /// through bump the cap manually via warm_start to 16, and assert
2087    /// the LATER batches see the new cap.
2088    #[tokio::test]
2089    async fn rebucketed_picks_up_cap_changes_mid_stream() {
2090        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
2091        use std::sync::Arc as StdArc;
2092        let cfg = LimiterConfig {
2093            min_concurrency: 1,
2094            max_concurrency: 32,
2095            ..cfg_for_tests()
2096        };
2097        let l = Limiter::new(4, cfg);
2098        let max_seen = StdArc::new(AtomicUsize::new(0));
2099        let in_flight = StdArc::new(AtomicUsize::new(0));
2100        let processed = StdArc::new(AtomicUsize::new(0));
2101        let l_for_bump = l.clone();
2102        let processed_for_bump = processed.clone();
2103        // Spawn a watcher that bumps the cap once enough items have
2104        // started to "warm up".
2105        let bump_handle = tokio::spawn(async move {
2106            loop {
2107                tokio::time::sleep(Duration::from_millis(2)).await;
2108                if processed_for_bump.load(AtomicOrdering::Relaxed) >= 16 {
2109                    l_for_bump.warm_start(16);
2110                    return;
2111                }
2112            }
2113        });
2114        let _: Vec<()> = rebucketed(&l, 0..200usize, false, |_i| {
2115            let max_seen = max_seen.clone();
2116            let in_flight = in_flight.clone();
2117            let processed = processed.clone();
2118            async move {
2119                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
2120                max_seen.fetch_max(cur, AtomicOrdering::Relaxed);
2121                tokio::time::sleep(Duration::from_millis(1)).await;
2122                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
2123                processed.fetch_add(1, AtomicOrdering::Relaxed);
2124                Ok::<(), &'static str>(())
2125            }
2126        })
2127        .await
2128        .unwrap();
2129        bump_handle.await.unwrap();
2130        // The cap was bumped to 16 mid-stream. If rebucketing actually
2131        // picks up cap changes, max_seen should reach above the
2132        // initial 4.
2133        let peak = max_seen.load(AtomicOrdering::Relaxed);
2134        assert!(
2135            peak > 4,
2136            "rebucketed did not pick up the mid-stream cap bump (peak in-flight = {peak})"
2137        );
2138    }
2139
2140    /// Codex MAJOR: `observe_op` cancellation safety. If the wrapper
2141    /// future is dropped before the inner op completes, no outcome is
2142    /// recorded (intentional — dropped work was never observed by
2143    /// the network). This test asserts the contract: completed ops
2144    /// land observations, dropped ops do not corrupt the window.
2145    /// Two-sided: confirms cancellation is a NO-OP, AND confirms
2146    /// post-cancellation observations DO land normally (proving the
2147    /// limiter's internal state was not corrupted).
2148    #[tokio::test]
2149    async fn observe_op_cancellation_drops_silently() {
2150        let cfg = LimiterConfig {
2151            window_ops: 16,
2152            min_window_ops: 4,
2153            ..cfg_for_tests()
2154        };
2155        let l = Limiter::new(4, cfg);
2156        // Build a future that never completes, then drop it before
2157        // awaiting. observe_op must not panic on drop and must not
2158        // record an outcome.
2159        let l_clone = l.clone();
2160        let fut = observe_op(
2161            &l_clone,
2162            || async {
2163                std::future::pending::<()>().await;
2164                Ok::<(), &'static str>(())
2165            },
2166            |_| Outcome::Timeout,
2167        );
2168        drop(fut);
2169        // Cap unchanged: no observation was recorded.
2170        assert_eq!(l.current(), 4, "cancelled op moved the cap");
2171        // Now feed observations that ACTUALLY count as Success (the
2172        // Ok branch of observe_op is always Outcome::Success — the
2173        // classifier only runs on Err). Cold-start at 4 + a full
2174        // window of healthy successes = double to 8.
2175        for _ in 0..16 {
2176            let _: Result<(), &'static str> = observe_op(
2177                &l,
2178                || async { Ok(()) },
2179                // classifier only fires on Err; Ok is always Success
2180                |_| Outcome::NetworkError,
2181            )
2182            .await;
2183        }
2184        // STRICT: cap must have GROWN, not just held. If cancellation
2185        // had corrupted internal counters, slow-start might be stuck.
2186        assert!(
2187            l.current() > 4,
2188            "cap did not grow after 16 successes; controller corrupted by cancellation? cap={}",
2189            l.current(),
2190        );
2191    }
2192
2193    /// Codex MAJOR: Drop persistence must be reliable. The CLI relies
2194    /// on Client::drop firing a synchronous save. If save_snapshot
2195    /// were dispatched via fire-and-forget spawn_blocking, runtime
2196    /// teardown would silently lose the snapshot. This test asserts
2197    /// that calling save_snapshot synchronously from a normal context
2198    /// (not Drop, but the same code path) actually writes.
2199    #[test]
2200    fn save_snapshot_is_synchronous_and_durable() {
2201        let dir = tempfile::tempdir().unwrap();
2202        let path = dir.path().join("client_adaptive.json");
2203        let snap = ChannelStart {
2204            quote: 100,
2205            store: 50,
2206            fetch: 200,
2207        };
2208        save_snapshot(&path, snap);
2209        // The file must exist immediately after save_snapshot returns.
2210        // No async waiting, no spawn_blocking, no eventual consistency.
2211        assert!(
2212            path.exists(),
2213            "save_snapshot did not write file synchronously"
2214        );
2215        let loaded = load_snapshot(&path).unwrap();
2216        assert_eq!(loaded.quote, 100);
2217        assert_eq!(loaded.store, 50);
2218        assert_eq!(loaded.fetch, 200);
2219    }
2220
2221    // ---- Codex round 4 regression tests ----
2222
2223    /// Codex CR-2 fix: warm_start marks the limiter as having
2224    /// already-left-slow-start, so a single healthy window does NOT
2225    /// double the cap (which would be over-aggressive resume from a
2226    /// learned value).
2227    #[tokio::test]
2228    async fn warm_start_disables_slow_start_doubling() {
2229        let cfg = LimiterConfig {
2230            window_ops: 8,
2231            min_window_ops: 4,
2232            success_target: 0.9,
2233            ..cfg_for_tests()
2234        };
2235        let l = Limiter::new(2, cfg.clone());
2236        // Warm-start to a learned value of 16. This must not be
2237        // treated as a fresh slow-start.
2238        l.warm_start(16);
2239        assert_eq!(l.current(), 16);
2240        // One full healthy window: in slow-start would double to 32;
2241        // post-warm-start it should add +1 to 17.
2242        for _ in 0..cfg.window_ops {
2243            l.observe(Outcome::Success, Duration::from_millis(10));
2244        }
2245        assert_eq!(
2246            l.current(),
2247            17,
2248            "warm-start triggered slow-start doubling instead of additive +1"
2249        );
2250    }
2251
2252    /// Codex CR-3 fix: warm_start floors against the per-instance
2253    /// cold-start, NOT the global ChannelStart::default. A controller
2254    /// built with custom low starts must stay faithful to its
2255    /// construction parameters even after warm_start.
2256    #[test]
2257    fn controller_warm_start_floors_at_per_instance_cold_start() {
2258        let custom_cold = ChannelStart {
2259            quote: 2,
2260            store: 1,
2261            fetch: 4,
2262        };
2263        let c = AdaptiveController::new(custom_cold, AdaptiveConfig::default());
2264        // Snapshot below the per-instance cold-start floors at custom values.
2265        c.warm_start(ChannelStart {
2266            quote: 1,
2267            store: 1,
2268            fetch: 1,
2269        });
2270        assert_eq!(c.quote.current(), 2);
2271        assert_eq!(c.store.current(), 1);
2272        assert_eq!(c.fetch.current(), 4);
2273        // Snapshot above the per-instance cold-start uses the snapshot.
2274        c.warm_start(ChannelStart {
2275            quote: 10,
2276            store: 10,
2277            fetch: 10,
2278        });
2279        assert_eq!(c.quote.current(), 10);
2280        assert_eq!(c.store.current(), 10);
2281        assert_eq!(c.fetch.current(), 10);
2282    }
2283
2284    /// Codex CR-3 fix: when adaptive.enabled = false, warm_start is
2285    /// a no-op — fixed-concurrency mode means the user wants exactly
2286    /// the cold start, not a learned value from a prior run.
2287    #[test]
2288    fn warm_start_is_noop_when_adaptive_disabled() {
2289        let cfg = AdaptiveConfig {
2290            enabled: false,
2291            ..AdaptiveConfig::default()
2292        };
2293        let custom_cold = ChannelStart {
2294            quote: 5,
2295            store: 5,
2296            fetch: 5,
2297        };
2298        let c = AdaptiveController::new(custom_cold, cfg);
2299        c.warm_start(ChannelStart {
2300            quote: 100,
2301            store: 100,
2302            fetch: 100,
2303        });
2304        assert_eq!(c.quote.current(), 5, "warm_start moved cap when disabled");
2305        assert_eq!(c.store.current(), 5, "warm_start moved cap when disabled");
2306        assert_eq!(c.fetch.current(), 5, "warm_start moved cap when disabled");
2307    }
2308
2309    /// Codex CR-4 fix: rebucketed_unordered is rolling, not batch-fenced.
2310    /// One slow item must NOT block other items in the same logical
2311    /// "wave" — the in-flight set should refill as fast items complete.
2312    #[tokio::test]
2313    async fn rebucketed_unordered_is_rolling_not_fenced() {
2314        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
2315        use std::sync::Arc as StdArc;
2316        let cfg = LimiterConfig {
2317            min_concurrency: 1,
2318            max_concurrency: 8,
2319            window_ops: 100,
2320            min_window_ops: 50,
2321            ..cfg_for_tests()
2322        };
2323        let l = Limiter::new(4, cfg);
2324        let in_flight = StdArc::new(AtomicUsize::new(0));
2325        let max_in_flight = StdArc::new(AtomicUsize::new(0));
2326        let started = StdArc::new(AtomicUsize::new(0));
2327        let _: Vec<()> = rebucketed_unordered(&l, 0..20usize, |i| {
2328            let in_flight = in_flight.clone();
2329            let max_in_flight = max_in_flight.clone();
2330            let started = started.clone();
2331            async move {
2332                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
2333                max_in_flight.fetch_max(cur, AtomicOrdering::Relaxed);
2334                started.fetch_add(1, AtomicOrdering::Relaxed);
2335                // Item 0 is intentionally slow; items 1..20 are fast.
2336                // In a batch-fenced scheduler, item 0 would gate the
2337                // start of items in the next batch. In a rolling
2338                // scheduler, items 1..N can start as soon as their
2339                // slot frees from a fast completion.
2340                if i == 0 {
2341                    tokio::time::sleep(Duration::from_millis(50)).await;
2342                } else {
2343                    tokio::time::sleep(Duration::from_millis(1)).await;
2344                }
2345                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
2346                Ok::<(), &'static str>(())
2347            }
2348        })
2349        .await
2350        .unwrap();
2351        // All 20 items must have started; in a rolling scheduler the
2352        // peak in-flight should reach at least 4 (the cap).
2353        assert_eq!(started.load(AtomicOrdering::Relaxed), 20);
2354        let peak = max_in_flight.load(AtomicOrdering::Relaxed);
2355        assert!(
2356            peak >= 4,
2357            "rolling scheduler did not fill cap; peak in-flight = {peak}"
2358        );
2359    }
2360
2361    /// Codex CR-4 fix: rebucketed_ordered preserves input order.
2362    #[tokio::test]
2363    async fn rebucketed_ordered_preserves_input_order() {
2364        let cfg = LimiterConfig {
2365            min_concurrency: 1,
2366            max_concurrency: 4,
2367            ..cfg_for_tests()
2368        };
2369        let l = Limiter::new(4, cfg);
2370        let items: Vec<usize> = (0..50).collect();
2371        let result: Vec<usize> = rebucketed_ordered(
2372            &l,
2373            items.iter().copied().enumerate(),
2374            |(idx, v)| async move {
2375                // Reverse-bias delay so out-of-order completion is likely.
2376                let delay = (50 - v) as u64;
2377                tokio::time::sleep(Duration::from_micros(delay)).await;
2378                Ok::<_, &'static str>((idx, v * 10))
2379            },
2380        )
2381        .await
2382        .unwrap();
2383        assert_eq!(result.len(), 50);
2384        for (i, v) in result.iter().enumerate() {
2385            assert_eq!(*v, i * 10, "out of order at index {i}: got {v}");
2386        }
2387    }
2388
2389    /// Codex CR-1 regression guard (logical, not the actual data path):
2390    /// rebucketed_ordered with a payload of (idx, hash) must always
2391    /// pair the right hash with the right chunk content even under
2392    /// adversarial out-of-order completion.
2393    #[tokio::test]
2394    async fn rebucketed_ordered_pairs_idx_with_payload_correctly() {
2395        let cfg = LimiterConfig {
2396            min_concurrency: 1,
2397            max_concurrency: 8,
2398            ..cfg_for_tests()
2399        };
2400        let l = Limiter::new(8, cfg);
2401        // Each item is (idx, fake_hash). The "fetch" returns
2402        // (idx, content_for_hash). We adversarially out-of-order them
2403        // and assert that the post-sort puts content with the right
2404        // index.
2405        let items: Vec<(usize, u64)> = (0..40).map(|i| (i, 1000u64 + i as u64)).collect();
2406        let result: Vec<u64> = rebucketed_ordered(&l, items, |(idx, hash)| async move {
2407            let delay = (40 - idx) as u64; // reverse delay
2408            tokio::time::sleep(Duration::from_micros(delay)).await;
2409            // "content_for_hash" derived from the hash.
2410            Ok::<_, &'static str>((idx, hash * 7))
2411        })
2412        .await
2413        .unwrap();
2414        for (i, v) in result.iter().enumerate() {
2415            let expected = (1000 + i as u64) * 7;
2416            assert_eq!(
2417                *v, expected,
2418                "idx {i} paired with wrong content: {v}, expected {expected}"
2419            );
2420        }
2421    }
2422
2423    /// Codex CR-5 fix: snapshot temp file is unique per save call,
2424    /// not just per-PID. Two save_snapshot calls in the SAME process
2425    /// must not collide on the temp file.
2426    #[test]
2427    fn save_snapshot_temp_file_is_unique_per_call() {
2428        let dir = tempfile::tempdir().unwrap();
2429        let path = dir.path().join("client_adaptive.json");
2430        // Fire many saves back-to-back in the same process. Without
2431        // a per-call unique suffix, the temp file would be the same
2432        // for every call (PID is constant), and any partial write +
2433        // crash window would expose the race. We can't simulate the
2434        // exact race in a unit test, but we can confirm no panic and
2435        // the final file is correct after many calls.
2436        for i in 0..100 {
2437            save_snapshot(
2438                &path,
2439                ChannelStart {
2440                    quote: i + 1,
2441                    store: i + 1,
2442                    fetch: i + 1,
2443                },
2444            );
2445        }
2446        let loaded = load_snapshot(&path).unwrap();
2447        assert_eq!(loaded.quote, 100);
2448        assert_eq!(loaded.store, 100);
2449        assert_eq!(loaded.fetch, 100);
2450        // Confirm no leftover .tmp files.
2451        let leftover: Vec<_> = std::fs::read_dir(dir.path())
2452            .unwrap()
2453            .filter_map(|e| e.ok())
2454            .filter(|e| e.file_name().to_string_lossy().contains(".tmp."))
2455            .collect();
2456        assert!(
2457            leftover.is_empty(),
2458            "temp files leaked: {:?}",
2459            leftover.iter().map(|e| e.file_name()).collect::<Vec<_>>()
2460        );
2461    }
2462
2463    // ---- Edge case tests ----
2464
2465    /// Edge case: rebucketed_unordered with EMPTY input returns empty
2466    /// Vec immediately, no panic, no work scheduled.
2467    #[tokio::test]
2468    async fn rebucketed_empty_input_returns_empty() {
2469        let cfg = cfg_for_tests();
2470        let l = Limiter::new(4, cfg);
2471        let v: Vec<usize> = rebucketed_unordered(&l, std::iter::empty::<usize>(), |_| async {
2472            Ok::<_, &'static str>(42usize)
2473        })
2474        .await
2475        .unwrap();
2476        assert!(v.is_empty());
2477        let v: Vec<usize> = rebucketed_ordered(
2478            &l,
2479            std::iter::empty::<(usize, ())>(),
2480            |(idx, _)| async move { Ok::<_, &'static str>((idx, 42usize)) },
2481        )
2482        .await
2483        .unwrap();
2484        assert!(v.is_empty());
2485    }
2486
2487    /// Edge case: rebucketed_unordered with EXACTLY cap items.
2488    #[tokio::test]
2489    async fn rebucketed_exactly_cap_items() {
2490        let cfg = LimiterConfig {
2491            min_concurrency: 1,
2492            max_concurrency: 4,
2493            ..cfg_for_tests()
2494        };
2495        let l = Limiter::new(4, cfg);
2496        let v: Vec<usize> =
2497            rebucketed_unordered(
2498                &l,
2499                0..4usize,
2500                |i| async move { Ok::<_, &'static str>(i * 2) },
2501            )
2502            .await
2503            .unwrap();
2504        assert_eq!(v.len(), 4);
2505    }
2506
2507    /// Edge case: rebucketed_unordered preserves the FIRST error and
2508    /// discards subsequent ones, draining in-flight work first.
2509    #[tokio::test]
2510    async fn rebucketed_preserves_first_error() {
2511        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
2512        use std::sync::Arc as StdArc;
2513        let cfg = LimiterConfig {
2514            min_concurrency: 1,
2515            max_concurrency: 4,
2516            ..cfg_for_tests()
2517        };
2518        let l = Limiter::new(4, cfg);
2519        let started = StdArc::new(AtomicUsize::new(0));
2520        let started_clone = started.clone();
2521        let result: Result<Vec<()>, &'static str> = rebucketed_unordered(&l, 0..20usize, |i| {
2522            let started = started_clone.clone();
2523            async move {
2524                started.fetch_add(1, AtomicOrdering::Relaxed);
2525                if i == 5 {
2526                    // Slight delay so item 6, 7 also start before
2527                    // this error propagates.
2528                    tokio::time::sleep(Duration::from_micros(100)).await;
2529                    return Err("first error");
2530                }
2531                if i == 10 {
2532                    return Err("second error - should be ignored");
2533                }
2534                tokio::time::sleep(Duration::from_micros(50)).await;
2535                Ok(())
2536            }
2537        })
2538        .await;
2539        match result {
2540            Err(e) => assert_eq!(e, "first error", "wrong error preserved"),
2541            Ok(_) => panic!("expected error, got ok"),
2542        }
2543        // The first error stops new launches, but in-flight items
2544        // drain. We don't assert exact count (nondeterministic) — only
2545        // that we did not launch ALL 20 items (proving error-stop
2546        // works) and we did launch more than just item 5 (proving
2547        // in-flight drain happens).
2548        let total = started.load(AtomicOrdering::Relaxed);
2549        assert!(
2550            (5..20).contains(&total),
2551            "started count out of range: {total}"
2552        );
2553    }
2554
2555    /// Edge case: limiter with min == max (degenerate single-value).
2556    /// Cap stays at the single value regardless of observations.
2557    #[test]
2558    fn limiter_with_min_equal_max_is_pinned() {
2559        let cfg = LimiterConfig {
2560            min_concurrency: 5,
2561            max_concurrency: 5,
2562            ..cfg_for_tests()
2563        };
2564        let l = Limiter::new(5, cfg);
2565        for _ in 0..1000 {
2566            l.observe(Outcome::Success, Duration::from_millis(1));
2567        }
2568        assert_eq!(l.current(), 5, "cap moved despite min==max");
2569        for _ in 0..1000 {
2570            l.observe(Outcome::Timeout, Duration::from_millis(50));
2571        }
2572        assert_eq!(l.current(), 5, "cap moved despite min==max");
2573    }
2574
2575    /// Direct test of `ewma()` math: alpha = 0 means new value =
2576    /// prev (the baseline never updates from new samples).
2577    #[test]
2578    fn ewma_alpha_zero_returns_prev() {
2579        let prev = Duration::from_millis(100);
2580        let sample = Duration::from_millis(500);
2581        let result = ewma(prev, sample, 0.0);
2582        assert_eq!(result, prev, "alpha=0 must return prev unchanged");
2583    }
2584
2585    /// Direct test of `ewma()` math: alpha = 1 means new value =
2586    /// sample (full overwrite).
2587    #[test]
2588    fn ewma_alpha_one_returns_sample() {
2589        let prev = Duration::from_millis(100);
2590        let sample = Duration::from_millis(500);
2591        let result = ewma(prev, sample, 1.0);
2592        // Allow 1ms of float-conversion slop.
2593        let diff = result.abs_diff(sample);
2594        assert!(
2595            diff <= Duration::from_millis(1),
2596            "alpha=1 should return sample; got {result:?}, expected ~{sample:?}"
2597        );
2598    }
2599
2600    /// Direct test of `ewma()`: alpha = 0.5 should give the midpoint.
2601    #[test]
2602    fn ewma_alpha_half_returns_midpoint() {
2603        let prev = Duration::from_millis(200);
2604        let sample = Duration::from_millis(400);
2605        let result = ewma(prev, sample, 0.5);
2606        let expected = Duration::from_millis(300);
2607        let diff = result.abs_diff(expected);
2608        assert!(
2609            diff <= Duration::from_millis(1),
2610            "alpha=0.5 midpoint: got {result:?}, expected ~{expected:?}"
2611        );
2612    }
2613
2614    /// Direct test of `ewma()`: NaN alpha must NOT panic and must
2615    /// preserve the previous value (defense against
2616    /// `Duration::from_secs_f64(NaN)` panic).
2617    #[test]
2618    fn ewma_nan_alpha_returns_prev() {
2619        let prev = Duration::from_millis(100);
2620        let sample = Duration::from_millis(500);
2621        let result = ewma(prev, sample, f64::NAN);
2622        assert_eq!(result, prev);
2623        let result = ewma(prev, sample, f64::INFINITY);
2624        assert_eq!(result, prev);
2625        let result = ewma(prev, sample, f64::NEG_INFINITY);
2626        assert_eq!(result, prev);
2627    }
2628
2629    /// Out-of-range alpha (e.g. 2.5) must clamp to [0,1] and NOT
2630    /// produce a negative result.
2631    #[test]
2632    fn ewma_clamps_alpha_above_one() {
2633        let prev = Duration::from_millis(100);
2634        let sample = Duration::from_millis(500);
2635        let result = ewma(prev, sample, 2.5);
2636        // Clamped to 1.0 -> should equal sample (~500ms).
2637        assert!(result >= Duration::from_millis(499));
2638        assert!(result <= Duration::from_millis(501));
2639    }
2640
2641    /// Edge case: window contains ONLY ApplicationErrors. Controller
2642    /// must HOLD (not move at all), because there are zero
2643    /// capacity-relevant samples.
2644    #[test]
2645    fn window_full_of_application_errors_does_not_move_cap() {
2646        let cfg = cfg_for_tests();
2647        let l = Limiter::new(8, cfg.clone());
2648        for _ in 0..(cfg.window_ops * 5) {
2649            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
2650        }
2651        assert_eq!(
2652            l.current(),
2653            8,
2654            "cap moved on pure-app-error window; should hold"
2655        );
2656    }
2657
2658    /// Edge case: AdaptiveController with `enabled = false` plus
2659    /// observations does not move and does not interact with the
2660    /// observation window.
2661    #[test]
2662    fn disabled_adaptive_controller_truly_inert() {
2663        let cfg = AdaptiveConfig {
2664            enabled: false,
2665            ..AdaptiveConfig::default()
2666        };
2667        let c = AdaptiveController::new(ChannelStart::default(), cfg);
2668        let baseline_quote = c.quote.current();
2669        let baseline_store = c.store.current();
2670        let baseline_fetch = c.fetch.current();
2671        for _ in 0..10000 {
2672            c.quote.observe(Outcome::Timeout, Duration::from_millis(1));
2673            c.store.observe(Outcome::Timeout, Duration::from_millis(1));
2674            c.fetch.observe(Outcome::Timeout, Duration::from_millis(1));
2675        }
2676        assert_eq!(c.quote.current(), baseline_quote);
2677        assert_eq!(c.store.current(), baseline_store);
2678        assert_eq!(c.fetch.current(), baseline_fetch);
2679    }
2680
2681    /// Edge case: per-channel limiters share NO state. Hammering one
2682    /// channel must not move another. Two-sided: assert store DROPS
2683    /// to the floor (proving observations landed) AND quote/fetch
2684    /// are EXACTLY unchanged (proving zero cross-channel leakage).
2685    #[test]
2686    fn channel_state_is_independent() {
2687        let c = AdaptiveController::default();
2688        let q0 = c.quote.current();
2689        let f0 = c.fetch.current();
2690        let s0 = c.store.current();
2691        for _ in 0..1000 {
2692            c.store.observe(Outcome::Timeout, Duration::from_millis(1));
2693        }
2694        // Strict: store reached the floor (observations landed).
2695        assert_eq!(
2696            c.store.current(),
2697            c.config.min_concurrency,
2698            "store did not reach floor after 1000 timeouts; cap={}",
2699            c.store.current()
2700        );
2701        assert!(c.store.current() < s0, "store cap did not move at all");
2702        // Strict: quote and fetch unchanged.
2703        assert_eq!(c.quote.current(), q0, "quote leaked from store stress");
2704        assert_eq!(c.fetch.current(), f0, "fetch leaked from store stress");
2705    }
2706
2707    // ---- Round-5 test reviewer suggestions ----
2708
2709    /// Direct unit test for `AdaptiveConfig::sanitize`. Verifies that
2710    /// every clamped field is correctly fixed up, not merely that
2711    /// the controller doesn't crash.
2712    #[test]
2713    fn sanitize_corrects_pathological_floats() {
2714        let mut cfg = AdaptiveConfig {
2715            success_target: f64::NAN,
2716            timeout_ceiling: 5.0,
2717            latency_inflation_factor: f64::NEG_INFINITY,
2718            latency_ewma_alpha: 2.5,
2719            window_ops: 4,
2720            min_window_ops: 10,
2721            ..AdaptiveConfig::default()
2722        };
2723        cfg.sanitize();
2724        assert!(cfg.success_target.is_finite());
2725        assert!((0.0..=1.0).contains(&cfg.success_target));
2726        assert!((0.0..=1.0).contains(&cfg.timeout_ceiling));
2727        assert!(cfg.latency_inflation_factor.is_finite());
2728        assert!(cfg.latency_inflation_factor > 0.0);
2729        assert!((0.0..=1.0).contains(&cfg.latency_ewma_alpha));
2730        assert!(
2731            cfg.min_window_ops <= cfg.window_ops,
2732            "min_window_ops {} > window_ops {}",
2733            cfg.min_window_ops,
2734            cfg.window_ops
2735        );
2736    }
2737
2738    /// Snapshot persistence relies on serde for ChannelStart and
2739    /// ChannelMax. A field rename in either type would silently
2740    /// break warm-start across binary upgrades — this test catches
2741    /// that.
2742    #[test]
2743    fn channel_max_serde_round_trips() {
2744        let m = ChannelMax {
2745            quote: 7,
2746            store: 13,
2747            fetch: 200,
2748        };
2749        let json = serde_json::to_string(&m).unwrap();
2750        let back: ChannelMax = serde_json::from_str(&json).unwrap();
2751        assert_eq!(back.quote, 7);
2752        assert_eq!(back.store, 13);
2753        assert_eq!(back.fetch, 200);
2754    }
2755
2756    #[test]
2757    fn channel_start_serde_round_trips() {
2758        let s = ChannelStart {
2759            quote: 11,
2760            store: 22,
2761            fetch: 33,
2762        };
2763        let json = serde_json::to_string(&s).unwrap();
2764        let back: ChannelStart = serde_json::from_str(&json).unwrap();
2765        assert_eq!(back.quote, 11);
2766        assert_eq!(back.store, 22);
2767        assert_eq!(back.fetch, 33);
2768    }
2769
2770    /// Mid-flight cap SHRINKAGE: `rebucketed_picks_up_cap_changes_mid_stream`
2771    /// only proves growth. Overload protection requires the reverse —
2772    /// when the controller halves the cap mid-pipeline, in-flight
2773    /// must respect the new lower cap on the next refill.
2774    #[tokio::test]
2775    async fn rebucketed_honors_cap_shrinkage_mid_stream() {
2776        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
2777        use std::sync::Arc as StdArc;
2778        let cfg = LimiterConfig {
2779            min_concurrency: 1,
2780            max_concurrency: 16,
2781            ..cfg_for_tests()
2782        };
2783        let l = Limiter::new(16, cfg);
2784        let in_flight = StdArc::new(AtomicUsize::new(0));
2785        let max_after_shrink = StdArc::new(AtomicUsize::new(0));
2786        let processed = StdArc::new(AtomicUsize::new(0));
2787        let shrunk = StdArc::new(std::sync::atomic::AtomicBool::new(false));
2788        let l_for_shrink = l.clone();
2789        let p_for_shrink = processed.clone();
2790        let shrunk_for_shrink = shrunk.clone();
2791        let shrink_handle = tokio::spawn(async move {
2792            // Bump down the cap once 50 items have completed.
2793            loop {
2794                tokio::time::sleep(Duration::from_millis(2)).await;
2795                if p_for_shrink.load(AtomicOrdering::Relaxed) >= 50 {
2796                    l_for_shrink.warm_start(2);
2797                    shrunk_for_shrink.store(true, AtomicOrdering::Relaxed);
2798                    return;
2799                }
2800            }
2801        });
2802        let _: Vec<()> = rebucketed_unordered(&l, 0..400usize, |_i| {
2803            let in_flight = in_flight.clone();
2804            let max_after_shrink = max_after_shrink.clone();
2805            let processed = processed.clone();
2806            let shrunk = shrunk.clone();
2807            async move {
2808                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
2809                if shrunk.load(AtomicOrdering::Relaxed) {
2810                    max_after_shrink.fetch_max(cur, AtomicOrdering::Relaxed);
2811                }
2812                tokio::time::sleep(Duration::from_millis(1)).await;
2813                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
2814                processed.fetch_add(1, AtomicOrdering::Relaxed);
2815                Ok::<(), &'static str>(())
2816            }
2817        })
2818        .await
2819        .unwrap();
2820        shrink_handle.await.unwrap();
2821        let peak = max_after_shrink.load(AtomicOrdering::Relaxed);
2822        // After the shrink to cap=2, no NEW launches should put us
2823        // above 2. Already-launched in-flight may still be draining
2824        // briefly, so allow a small overshoot for the natural
2825        // refill-after-completion lag.
2826        assert!(
2827            peak <= 4,
2828            "rebucketed exceeded shrunk cap of 2: peak post-shrink in-flight = {peak}"
2829        );
2830    }
2831
2832    /// Mixed `ApplicationError` + capacity-relevant items in one
2833    /// window. ApplicationError must NOT contribute to the success
2834    /// rate denominator — otherwise a wave with some AppErrors and
2835    /// some healthy successes would falsely look like a stressed
2836    /// window.
2837    #[test]
2838    fn mixed_window_app_errors_with_capacity_signal() {
2839        let cfg = LimiterConfig {
2840            window_ops: 10,
2841            min_window_ops: 5,
2842            timeout_ceiling: 0.2,
2843            success_target: 0.9,
2844            ..cfg_for_tests()
2845        };
2846        // Case 1: 5 AppErrors + 5 Successes. Capacity-relevant
2847        // success_rate = 5/5 = 100%. Cap must NOT decrease (it may
2848        // hold at 8 or grow via slow-start; both prove the AppErrors
2849        // didn't poison the success-rate denominator).
2850        let l = Limiter::new(8, cfg.clone());
2851        for _ in 0..5 {
2852            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
2853        }
2854        for _ in 0..5 {
2855            l.observe(Outcome::Success, Duration::from_millis(50));
2856        }
2857        assert!(
2858            l.current() >= 8,
2859            "AppErrors falsely depressed the success rate; cap dropped from 8 to {}",
2860            l.current()
2861        );
2862        // Case 2: 5 AppErrors + 5 Timeouts. Capacity-relevant
2863        // success_rate = 0/5 = 0%. Cap MUST decrease.
2864        let l2 = Limiter::new(8, cfg);
2865        for _ in 0..5 {
2866            l2.observe(Outcome::ApplicationError, Duration::from_millis(50));
2867        }
2868        for _ in 0..5 {
2869            l2.observe(Outcome::Timeout, Duration::from_millis(50));
2870        }
2871        assert!(
2872            l2.current() < 8,
2873            "all-timeouts (with AppError padding) did not decrease cap; got {}",
2874            l2.current()
2875        );
2876    }
2877
2878    /// Real concurrent torn-read test for save/load. The previous
2879    /// concurrent-write test only reads after both writers join;
2880    /// this version interleaves a reader thread with writers and
2881    /// asserts every successful load returns a coherent (non-torn)
2882    /// snapshot.
2883    #[test]
2884    fn concurrent_save_load_no_torn_reads() {
2885        use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
2886        use std::thread;
2887        let dir = tempfile::tempdir().unwrap();
2888        let path = dir.path().join("snap.json");
2889        // Seed the file so the reader doesn't get a None on first read.
2890        save_snapshot(
2891            &path,
2892            ChannelStart {
2893                quote: 1,
2894                store: 1,
2895                fetch: 1,
2896            },
2897        );
2898        let stop = std::sync::Arc::new(AtomicBool::new(false));
2899        let p_w = path.clone();
2900        let s_w = stop.clone();
2901        let writer = thread::spawn(move || {
2902            let mut i = 1usize;
2903            while !s_w.load(AtomicOrdering::Relaxed) {
2904                save_snapshot(
2905                    &p_w,
2906                    ChannelStart {
2907                        quote: i,
2908                        store: i,
2909                        fetch: i,
2910                    },
2911                );
2912                i = i.wrapping_add(1).max(1);
2913            }
2914        });
2915        let p_r = path.clone();
2916        let reader = thread::spawn(move || {
2917            let mut torn = 0usize;
2918            for _ in 0..2_000 {
2919                if let Some(snap) = load_snapshot(&p_r) {
2920                    // Coherent snapshots have all three channels equal
2921                    // (writer always saves equal values).
2922                    if snap.quote != snap.store || snap.store != snap.fetch {
2923                        torn += 1;
2924                    }
2925                }
2926            }
2927            torn
2928        });
2929        let torn = reader.join().unwrap();
2930        stop.store(true, AtomicOrdering::Relaxed);
2931        writer.join().unwrap();
2932        assert_eq!(
2933            torn, 0,
2934            "observed {torn} torn reads under concurrent writes"
2935        );
2936    }
2937
2938    /// Round-5 follow-up: `save_snapshot_with_timeout` returns
2939    /// promptly even when the underlying write would otherwise hang.
2940    /// Use a path under a non-existent root that mkdir cannot create
2941    /// to simulate a slow/failing filesystem (mkdir returns Err
2942    /// quickly so this isn't a real hang test, but it confirms the
2943    /// timeout wrapper does not block longer than the deadline on a
2944    /// fast-failing operation either).
2945    #[test]
2946    fn save_with_timeout_returns_promptly_on_fast_failure() {
2947        let path = std::path::PathBuf::from("/nonexistent_root_xyz_test/snap.json");
2948        let snap = ChannelStart {
2949            quote: 1,
2950            store: 1,
2951            fetch: 1,
2952        };
2953        let started = Instant::now();
2954        save_snapshot_with_timeout(path, snap, Duration::from_secs(5));
2955        let elapsed = started.elapsed();
2956        // Fast-failing mkdir returns immediately. The timeout
2957        // wrapper should not add measurable overhead.
2958        assert!(
2959            elapsed < Duration::from_secs(1),
2960            "save_snapshot_with_timeout took {elapsed:?} on fast-failing path"
2961        );
2962    }
2963
2964    /// Round-5 follow-up: a hung writer thread (simulated by a path
2965    /// the writer never returns from). The wrapper must time out and
2966    /// return without joining; the test must complete near the
2967    /// deadline, not hang.
2968    #[test]
2969    fn save_with_timeout_bounds_wall_time_on_hang() {
2970        // Use a real-but-slow-write simulation: hand the writer a
2971        // path that the OS will accept but with a synthetic delay
2972        // baked into a wrapping thread. Since save_snapshot itself
2973        // does no sleep, we instead test that the timeout wrapper
2974        // exits within deadline + small slack when the inner work
2975        // takes longer than the deadline. We approximate by giving
2976        // the wrapper a deadline shorter than any plausible local
2977        // disk write (1ms is too tight; 0ms is too tight). Use
2978        // 1ms deadline and assert wall time < 100ms — proving the
2979        // wrapper does NOT wait for the writer to actually finish
2980        // (the inner write to a tempdir takes a few ms typically).
2981        let dir = tempfile::tempdir().unwrap();
2982        let path = dir.path().join("snap.json");
2983        let snap = ChannelStart {
2984            quote: 1,
2985            store: 1,
2986            fetch: 1,
2987        };
2988        let started = Instant::now();
2989        // Deadline so short that on most machines the writer is
2990        // still running. The wrapper must NOT wait for it.
2991        save_snapshot_with_timeout(path, snap, Duration::from_micros(1));
2992        let elapsed = started.elapsed();
2993        assert!(
2994            elapsed < Duration::from_millis(200),
2995            "timeout wrapper did not bound wall time: {elapsed:?}"
2996        );
2997    }
2998}