Skip to main content

ant_core/data/client/
adaptive.rs

1//! Adaptive concurrency controller for client data operations.
2//!
3//! Replaces hard-coded `quote_concurrency` / `store_concurrency` /
4//! download fan-out with a per-channel AIMD limiter that ramps up when
5//! the network is healthy and ramps down on stress signals (timeouts,
6//! errors, latency inflation). The goal is to give every machine and
7//! every connection profile a single client codebase that finds its
8//! own steady state without the user tweaking flags.
9//!
10//! ## Channels
11//!
12//! Three independent limiters share the same algorithm but track state
13//! separately, because their workloads have different cost profiles:
14//!
15//! - `quote`  — small DHT request/response messages, cheap per op
16//! - `store`  — multi-MB chunk PUTs to a close group, expensive per op
17//! - `fetch`  — multi-MB chunk GETs from peers, asymmetric to `store`
18//!
19//! ## Algorithms
20//!
21//! Quote and store use TCP-style AIMD with slow-start:
22//!
23//! - **Slow-start**: starting concurrency doubles after each healthy
24//!   window until first stress signal or until the configured ceiling.
25//! - **Steady state**: additive +1 per healthy window (>= success_target
26//!   success rate AND p95 latency within `latency_inflation_factor` of
27//!   the rolling baseline).
28//! - **Stress**: multiplicative decrease (current / 2, floor 1) on any
29//!   of: success rate < success_target, timeout rate > timeout_ceiling,
30//!   or p95 latency above `latency_inflation_factor * baseline`.
31//!
32//! Decisions evaluate over a sliding window of the last `window_ops`
33//! observed outcomes per channel. Below `min_window_ops` outcomes the
34//! controller holds steady — too few samples to act on.
35//!
36//! Fetch uses a throughput-seeking hill climber instead. It measures
37//! bytes/sec over epochs, probes nearby concurrency values, accepts
38//! higher caps only when goodput improves materially, and accepts lower
39//! caps when goodput is effectively unchanged. Stress signals still cut
40//! concurrency immediately.
41//!
42//! ## What this is not
43//!
44//! - Not a payment-batching controller. Wave / batch sizes are
45//!   orthogonal (gas-economics tradeoff, not throughput).
46//! - Not a persistent peer-quality scorer. Bootstrap cache scoring was
47//!   removed from saorsa-core; this controller only tunes client
48//!   concurrency.
49
50use futures::stream::{self, FuturesUnordered, StreamExt};
51use serde::{Deserialize, Serialize};
52use std::collections::VecDeque;
53use std::path::{Path, PathBuf};
54use std::sync::atomic::{AtomicU64, Ordering};
55use std::sync::{Arc, Mutex, PoisonError};
56use std::time::{Duration, Instant};
57use tracing::{debug, warn};
58
/// Counter used to build unique temp filenames for snapshot saves.
///
/// Monotonic for the lifetime of the process. Together with the PID
/// and a nanosecond timestamp it makes a filename collision between
/// concurrent `save_snapshot` calls effectively impossible.
static SAVE_COUNTER: AtomicU64 = AtomicU64::new(0);
63
/// Cold-start concurrency for the fetch channel: the
/// residential-saturation floor validated in production. Machines and
/// networks that can actually use a higher cap are expected to reach
/// it through the hill climber.
const FETCH_COLD_START_CONCURRENCY: usize = 4;

/// Divisor applied to the current best cap to size a hill-climb probe
/// step; a divisor of 4 means probes move by roughly 25% of the cap.
const HILL_PROBE_STEP_DIVISOR: usize = 4;

/// Smallest probe step, so that low caps can still explore.
const HILL_MIN_PROBE_STEP: usize = 1;

/// An upward probe is accepted only if measured goodput improves by at
/// least 5% (ratio >= 1.05).
const HILL_UP_PROBE_ACCEPT_RATIO: f64 = 1.05;

/// A downward probe is accepted if goodput stays within 2% of the best
/// (ratio >= 0.98).
const HILL_DOWN_PROBE_ACCEPT_RATIO: f64 = 0.98;

/// Number of epochs to wait after a rejected probe before probing
/// again.
const HILL_REJECT_COOLDOWN_EPOCHS: usize = 2;

/// Number of epochs spent at a stable best cap before the neighbor is
/// probed again, so the controller can adapt when machine/network
/// conditions change.
const HILL_STABLE_PROBE_EPOCHS: usize = 3;

/// Divisor applied to fetch concurrency on stress (2 = cut in half).
const HILL_STRESS_DECREASE_DIVISOR: usize = 2;

/// Number of complete concurrency waves a fetch goodput epoch should
/// cover. A fixed sample window can unfairly compare a full lower-cap
/// wave with a partial higher-cap wave.
const HILL_EPOCH_FULL_WAVES: usize = 2;
95
/// Acquire `m`, recovering the guard even if the mutex is poisoned.
///
/// Follows the project pattern (see `cache::ChunkCache`): a panic in
/// another thread while it held the lock must not cascade into a
/// panic here, so the inner state is handed back regardless.
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
101
/// Result classification for one observed operation on a channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// The operation completed successfully.
    Success,
    /// The operation did not finish within the per-op timeout.
    Timeout,
    /// The operation failed with a network/transport error (refused,
    /// reset, unreachable).
    NetworkError,
    /// The operation failed at the application level for a reason not
    /// attributable to the network (e.g. bad payment proof). It is
    /// recorded but is not a capacity signal, so it does not push the
    /// controller down.
    ApplicationError,
}
116
/// Floor for the `fetch` channel's adaptive cap.
///
/// Fetch concurrency is not meant to shrink below this value even
/// under sustained timeout pressure. The floor is fetch-specific:
/// residential downloads have a noise floor of peer-side timeouts
/// (NAT path issues, close-group peers that do not store the chunk)
/// which the controller would otherwise read as client saturation,
/// fully serializing the download and collapsing throughput. The
/// quote and store channels keep the global `min_concurrency` floor
/// of 1.
const FETCH_MIN_FLOOR: usize = 4;
127
128/// Per-channel concurrency ceilings. Each channel has its own cap so
129/// that constraining one (e.g. user pinned a low store concurrency for
130/// a slow uplink) never bleeds into another (download).
131#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
132pub struct ChannelMax {
133    pub quote: usize,
134    pub store: usize,
135    pub fetch: usize,
136}
137
138impl Default for ChannelMax {
139    fn default() -> Self {
140        // Generous ceilings that give the controller real headroom to
141        // grow on healthy connections. The cold-start values
142        // (`ChannelStart::default()`) are well below these so AIMD
143        // can actually do its job. Each ceiling is independent.
144        Self {
145            quote: 128,
146            store: 64,
147            fetch: 256,
148        }
149    }
150}
151
152/// Tunable knobs for the adaptive controller. Defaults are picked so
153/// that the controller behaves at least as well as the prior static
154/// defaults on a healthy network: starts at the previous static value
155/// and only deviates when signals demand it.
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct AdaptiveConfig {
158    /// Master switch. When `false`, channels report `initial` forever
159    /// and ignore observations. Useful for benchmarks / debugging.
160    pub enabled: bool,
161    /// Floor concurrency per channel. Never go below this.
162    pub min_concurrency: usize,
163    /// Per-channel ceiling concurrency. See `ChannelMax`.
164    pub max: ChannelMax,
165    /// Sliding window size in number of recent ops considered for
166    /// adaptation decisions.
167    pub window_ops: usize,
168    /// Below this count of outcomes in the window, hold steady.
169    pub min_window_ops: usize,
170    /// Required success rate to consider the window healthy. Healthy
171    /// windows trigger increase; unhealthy windows trigger decrease.
172    pub success_target: f64,
173    /// Timeout rate above which the window counts as stressed even if
174    /// the success rate would otherwise pass.
175    pub timeout_ceiling: f64,
176    /// p95 latency above `latency_inflation_factor * baseline` is a
177    /// stress signal. Baseline is an EWMA of healthy-window p95s.
178    pub latency_inflation_factor: f64,
179    /// EWMA smoothing factor for the latency baseline. 0 = never
180    /// updates, 1 = baseline = last sample. 0.2 trades responsiveness
181    /// for stability. Validated to `[0.0, 1.0]`; `NaN`/non-finite
182    /// values are sanitized to the default at controller construction.
183    pub latency_ewma_alpha: f64,
184}
185
186impl AdaptiveConfig {
187    /// Sanitize the config: clamp `latency_ewma_alpha` to `[0,1]`
188    /// (rejecting NaN/Inf which would otherwise panic in
189    /// `Duration::from_secs_f64`), enforce `min_concurrency >= 1`,
190    /// enforce per-channel max >= min_concurrency, enforce
191    /// `min_window_ops <= window_ops`. Idempotent.
192    pub fn sanitize(&mut self) {
193        if !self.latency_ewma_alpha.is_finite() {
194            self.latency_ewma_alpha = 0.2;
195        }
196        self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
197        if !self.success_target.is_finite() {
198            self.success_target = 0.95;
199        }
200        self.success_target = self.success_target.clamp(0.0, 1.0);
201        if !self.timeout_ceiling.is_finite() {
202            self.timeout_ceiling = 0.10;
203        }
204        self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
205        if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
206            self.latency_inflation_factor = 4.0;
207        }
208        self.min_concurrency = self.min_concurrency.max(1);
209        self.window_ops = self.window_ops.max(1);
210        self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
211        self.max.quote = self.max.quote.max(self.min_concurrency);
212        self.max.store = self.max.store.max(self.min_concurrency);
213        self.max.fetch = self.max.fetch.max(self.min_concurrency);
214    }
215}
216
217impl Default for AdaptiveConfig {
218    fn default() -> Self {
219        Self {
220            enabled: true,
221            min_concurrency: 1,
222            max: ChannelMax::default(),
223            window_ops: 32,
224            min_window_ops: 8,
225            success_target: 0.95,
226            timeout_ceiling: 0.10,
227            // p95 doubling is the normal signal on a per-chunk fetch with
228            // close-group fallback (one slow peer in a chunk's close group
229            // adds ~10s on top of a sub-second median); 2.0 mis-classified
230            // that as stress and halved the fetch cap mid-download. 4.0
231            // means p95 has to quadruple before we treat the network as
232            // degraded.
233            latency_inflation_factor: 4.0,
234            latency_ewma_alpha: 0.2,
235        }
236    }
237}
238
239/// Suggested starting concurrency per channel for a brand-new client
240/// with no persisted state:
241///
242/// - quote was statically 32 — start at 32.
243/// - store was statically 8 — start at 8.
244/// - fetch starts at 4, the residential-saturation floor validated
245///   after the old 64-wide cold burst saturated home links before
246///   any adaptive observations could land. The throughput hill
247///   climber then lets measured goodput justify growth on faster
248///   links.
249#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
250pub struct ChannelStart {
251    pub quote: usize,
252    pub store: usize,
253    pub fetch: usize,
254}
255
256impl Default for ChannelStart {
257    fn default() -> Self {
258        Self {
259            quote: 32,
260            store: 8,
261            fetch: FETCH_COLD_START_CONCURRENCY,
262        }
263    }
264}
265
266/// One observed sample retained in the sliding window.
267#[derive(Debug, Clone, Copy)]
268struct Sample {
269    outcome: Outcome,
270    latency: Duration,
271}
272
/// Which adaptation strategy a limiter runs.
///
/// Deliberately not a field of `LimiterConfig`, so external config
/// literals and persisted JSON do not grow a migration surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LimiterAlgorithm {
    /// Window-based increase/decrease driven by health signals.
    Aimd,
    /// Throughput-seeking hill climber.
    ThroughputHillClimb,
}
280
/// Which way a hill-climb probe moves the concurrency cap.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProbeDirection {
    /// Probe a higher cap.
    Up,
    /// Probe a lower cap.
    Down,
}
287
288/// Epoch-local stats for the throughput hill climber.
289#[derive(Debug)]
290struct HillClimbState {
291    epoch_started: Option<Instant>,
292    epoch_samples: usize,
293    epoch_successes: usize,
294    epoch_timeouts: usize,
295    epoch_net_errors: usize,
296    epoch_bytes: u64,
297    epoch_latencies: Vec<Duration>,
298    best_goodput_per_sec: Option<f64>,
299    best_latency_p95: Option<Duration>,
300    best_concurrency: usize,
301    stable_epochs: usize,
302    cooldown_epochs: usize,
303    next_probe: ProbeDirection,
304    active_probe: Option<ProbeDirection>,
305}
306
307impl HillClimbState {
308    fn new(start: usize, epoch_capacity: usize) -> Self {
309        Self {
310            epoch_started: None,
311            epoch_samples: 0,
312            epoch_successes: 0,
313            epoch_timeouts: 0,
314            epoch_net_errors: 0,
315            epoch_bytes: 0,
316            epoch_latencies: Vec::with_capacity(epoch_capacity),
317            best_goodput_per_sec: None,
318            best_latency_p95: None,
319            best_concurrency: start,
320            stable_epochs: 0,
321            cooldown_epochs: 0,
322            next_probe: ProbeDirection::Up,
323            active_probe: None,
324        }
325    }
326
327    fn reset_epoch(&mut self) {
328        self.epoch_started = None;
329        self.epoch_samples = 0;
330        self.epoch_successes = 0;
331        self.epoch_timeouts = 0;
332        self.epoch_net_errors = 0;
333        self.epoch_bytes = 0;
334        self.epoch_latencies.clear();
335    }
336
337    fn capacity_total(&self) -> usize {
338        self.epoch_successes + self.epoch_timeouts + self.epoch_net_errors
339    }
340}
341
/// Configuration of a single limiter: the shared adaptive parameters
/// plus the channel-specific `max_concurrency`.
///
/// A `Limiter` keeps it behind an `Arc`, so cloning a limiter bumps a
/// refcount instead of copying the struct (no `AdaptiveConfig`-sized
/// copy per chunk in hot loops).
#[derive(Debug, Clone)]
pub struct LimiterConfig {
    /// When `false`, the limiter ignores observations.
    pub enabled: bool,
    /// Lowest cap the limiter may report.
    pub min_concurrency: usize,
    /// Highest cap the limiter may report for this channel.
    pub max_concurrency: usize,
    /// Sliding-window size in operations.
    pub window_ops: usize,
    /// Minimum number of windowed outcomes before acting.
    pub min_window_ops: usize,
    /// Success rate required for a window to count as healthy.
    pub success_target: f64,
    /// Timeout rate above which a window counts as stressed.
    pub timeout_ceiling: f64,
    /// p95 above `latency_inflation_factor * baseline` is a stress
    /// signal.
    pub latency_inflation_factor: f64,
    /// EWMA smoothing factor for the latency baseline.
    pub latency_ewma_alpha: f64,
    /// Slow-start protection threshold. While
    /// `current < slow_start_ramp_threshold`, a Decrease still halves
    /// the cap but does NOT permanently leave slow-start, so the next
    /// healthy window can double the cap back up. At or above the
    /// threshold a Decrease leaves slow-start and the limiter moves to
    /// classic AIMD (+1 per healthy window).
    ///
    /// 0 (the default) gives the original behaviour: any Decrease at
    /// any cap permanently leaves slow-start. The fetch channel sets
    /// the threshold to its `max_concurrency` so download concurrency
    /// keeps doubling toward the ceiling rather than crawling +1 per
    /// window — additive growth cannot reach a useful cap on a
    /// fast-but-lossy link before the file finishes. See
    /// `AdaptiveController::new`.
    pub slow_start_ramp_threshold: usize,
    /// When `false`, comparing p95 latency against the baseline never
    /// causes a Decrease (the baseline is still updated). The fetch
    /// channel turns it off because the latency observed for
    /// `chunk_get` includes the internal retry sleep and the slow
    /// retry-sweep of the chunks that needed one; a window holding a
    /// couple of retry-path chunks then shows a wildly inflated p95
    /// that reflects retry variance, not congestion. Real fetch
    /// congestion still shows up as a rising `Ok(None)` (Timeout)
    /// rate, which the `timeout_ceiling` check catches.
    pub latency_decrease_enabled: bool,
    /// When `true`, a Decrease leaves `samples_since_increase` alone,
    /// so evidence already gathered toward the next Increase survives
    /// a transient Decrease.
    ///
    /// The default (`false`) is the original asymmetric gate: an
    /// Increase needs a full fresh `window_ops` of samples, a Decrease
    /// needs only `min_window_ops`, AND a Decrease resets the increase
    /// counter. Where decreases fire faster than a full increase
    /// window can accrue, that combination permanently starves growth:
    /// the store cap sat pinned at its cold-start floor of 8 for a
    /// whole 530-chunk upload (V2-554). The store channel sets this to
    /// `true` so that a few-percent shortfall rate can no longer deny
    /// the cap its next doubling; the accrued credit persists across
    /// the transient Decrease and the next genuinely healthy window
    /// (`evaluate` returns Increase) can act on it. Sustained
    /// unhealthiness still holds the cap down because `evaluate` keeps
    /// returning Decrease — only isolated dips stop zeroing progress.
    pub retain_increase_credit_on_decrease: bool,
}
400
401impl LimiterConfig {
402    fn from_adaptive(cfg: &AdaptiveConfig, max_for_channel: usize) -> Self {
403        Self {
404            enabled: cfg.enabled,
405            min_concurrency: cfg.min_concurrency,
406            max_concurrency: max_for_channel.max(cfg.min_concurrency),
407            window_ops: cfg.window_ops,
408            min_window_ops: cfg.min_window_ops,
409            success_target: cfg.success_target,
410            timeout_ceiling: cfg.timeout_ceiling,
411            latency_inflation_factor: cfg.latency_inflation_factor,
412            latency_ewma_alpha: cfg.latency_ewma_alpha,
413            // Defaults preserve the original AIMD behaviour; the fetch and
414            // store channels override these in `AdaptiveController::new`.
415            slow_start_ramp_threshold: 0,
416            latency_decrease_enabled: true,
417            retain_increase_credit_on_decrease: false,
418        }
419    }
420
421    /// Sanitize a directly-constructed `LimiterConfig`. External
422    /// callers (or tests) that build a `LimiterConfig` literal with
423    /// hostile values (`NaN`, sub-floor mins, inverted bounds) are
424    /// protected — `Limiter::new` calls this on every construction
425    /// so the controller never holds NaN or out-of-range floats.
426    fn sanitize(&mut self) {
427        if !self.latency_ewma_alpha.is_finite() {
428            self.latency_ewma_alpha = 0.2;
429        }
430        self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
431        if !self.success_target.is_finite() {
432            self.success_target = 0.95;
433        }
434        self.success_target = self.success_target.clamp(0.0, 1.0);
435        if !self.timeout_ceiling.is_finite() {
436            self.timeout_ceiling = 0.10;
437        }
438        self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
439        if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
440            self.latency_inflation_factor = 4.0;
441        }
442        self.min_concurrency = self.min_concurrency.max(1);
443        self.window_ops = self.window_ops.max(1);
444        self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
445        self.max_concurrency = self.max_concurrency.max(self.min_concurrency);
446    }
447}
448
449/// Per-channel adaptive limiter.
450///
451/// Cheap to clone — both fields are `Arc`. Pass clones into hot loops;
452/// do not hold the lock across `.await` points (call sites observe
453/// with short critical sections only).
454#[derive(Debug, Clone)]
455pub struct Limiter {
456    inner: Arc<Mutex<LimiterInner>>,
457    config: Arc<LimiterConfig>,
458    algorithm: LimiterAlgorithm,
459}
460
461#[derive(Debug)]
462struct LimiterInner {
463    /// Current concurrency cap returned by `current()`.
464    current: usize,
465    /// Sliding window of recent outcomes.
466    window: VecDeque<Sample>,
467    /// Samples observed since the last increase. Increases require a
468    /// fresh window's worth of evidence to avoid ramping on every
469    /// individual healthy sample.
470    samples_since_increase: usize,
471    /// Samples observed since the last decrease. Decreases require
472    /// `min_window_ops` of fresh evidence to avoid pile-driving the
473    /// cap to floor on a single bad burst when many in-flight ops all
474    /// observe stress nearly simultaneously.
475    samples_since_decrease: usize,
476    /// EWMA of p95 latency from past healthy windows. `None` until
477    /// the first healthy window completes.
478    latency_baseline: Option<Duration>,
479    /// `true` once we have observed a stress signal at least once.
480    /// Slow-start mode ends permanently after first stress.
481    left_slow_start: bool,
482    /// Fetch-only throughput optimizer state. Present for every limiter
483    /// to keep `Limiter` cheap to clone and avoid an enum around the
484    /// whole inner struct.
485    hill: HillClimbState,
486}
487
488impl Limiter {
489    /// Create a new limiter starting at `start`, clamped into
490    /// `[min_concurrency, max_concurrency]`. Sanitizes the config to
491    /// guard against directly-constructed `LimiterConfig` literals
492    /// with hostile float values (`NaN`, etc).
493    #[must_use]
494    pub fn new(start: usize, config: LimiterConfig) -> Self {
495        Self::new_with_algorithm(start, config, LimiterAlgorithm::Aimd)
496    }
497
498    fn new_with_algorithm(
499        start: usize,
500        config: LimiterConfig,
501        algorithm: LimiterAlgorithm,
502    ) -> Self {
503        let mut config = config;
504        config.sanitize();
505        let clamped = start.clamp(config.min_concurrency, config.max_concurrency.max(1));
506        let window_cap = config.window_ops;
507        Self {
508            inner: Arc::new(Mutex::new(LimiterInner {
509                current: clamped,
510                window: VecDeque::with_capacity(window_cap),
511                samples_since_increase: 0,
512                samples_since_decrease: 0,
513                latency_baseline: None,
514                left_slow_start: false,
515                hill: HillClimbState::new(clamped, window_cap),
516            })),
517            config: Arc::new(config),
518            algorithm,
519        }
520    }
521
522    /// Snapshot current concurrency cap. Hot-path call: the value may
523    /// change between this call and the next, but consumers
524    /// (`buffer_unordered(n)`) capture it once per pipeline build.
525    #[must_use]
526    pub fn current(&self) -> usize {
527        lock(&self.inner).current
528    }
529
530    /// Record one observed operation. Updates the sliding window and
531    /// re-evaluates the cap if the window is full enough.
532    pub fn observe(&self, outcome: Outcome, latency: Duration) {
533        self.observe_with_bytes(outcome, latency, 0);
534    }
535
536    /// Record one observed operation with a payload byte count. Bytes
537    /// are used by the fetch hill climber; AIMD channels ignore them.
538    pub fn observe_with_bytes(&self, outcome: Outcome, latency: Duration, bytes: u64) {
539        let observed_at = Instant::now();
540        let operation_started = observed_at.checked_sub(latency).unwrap_or(observed_at);
541        self.observe_with_timing(outcome, latency, bytes, operation_started);
542    }
543
544    fn observe_with_timing(
545        &self,
546        outcome: Outcome,
547        latency: Duration,
548        bytes: u64,
549        operation_started: Instant,
550    ) {
551        if !self.config.enabled {
552            return;
553        }
554        let mut g = lock(&self.inner);
555        if g.window.len() == self.config.window_ops {
556            g.window.pop_front();
557        }
558        g.window.push_back(Sample { outcome, latency });
559        if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
560            observe_hill_climb(
561                &mut g,
562                outcome,
563                latency,
564                bytes,
565                operation_started,
566                &self.config,
567            );
568            return;
569        }
570        g.samples_since_increase = g.samples_since_increase.saturating_add(1);
571        g.samples_since_decrease = g.samples_since_decrease.saturating_add(1);
572        if g.window.len() < self.config.min_window_ops {
573            return;
574        }
575        let decision = evaluate(&g.window, &self.config, g.latency_baseline);
576        apply_decision(&mut g, decision, &self.config);
577    }
578
579    /// Replace the current cap with `start`, clamped. Used for warm
580    /// loads from persisted state. Does not clear the sliding window —
581    /// fresh observations remain authoritative for adaptation
582    /// decisions.
583    ///
584    /// Slow-start state after a warm load depends on the channel's
585    /// `slow_start_ramp_threshold`:
586    ///
587    /// - Default (threshold 0, i.e. quote/store): mark slow-start as
588    ///   already-left so a single healthy window doesn't *double* a
589    ///   learned warm value — an over-aggressive jump. Subsequent
590    ///   increases are +1 per healthy window.
591    /// - Protected (threshold > clamped, i.e. fetch below the ceiling):
592    ///   keep slow-start armed. This is critical for the CLI usage
593    ///   pattern where every `ant file download` is a fresh process
594    ///   that warm-starts from the snapshot: if warm_start always
595    ///   exited slow-start, the fetch cap could only ever grow
596    ///   additively from the persisted value, which cannot climb back
597    ///   to the ceiling against an intermittent Decrease trickle (the
598    ///   exact pin-at-~20 behaviour observed on a fast-but-lossy VPS).
599    ///   Keeping slow-start armed lets the cap double back toward the
600    ///   capacity the connection can actually sustain.
601    pub fn warm_start(&self, start: usize) {
602        let clamped = start.clamp(
603            self.config.min_concurrency,
604            self.config.max_concurrency.max(1),
605        );
606        let mut g = lock(&self.inner);
607        g.current = clamped;
608        g.left_slow_start = clamped >= self.config.slow_start_ramp_threshold;
609        g.hill = HillClimbState::new(clamped, self.config.window_ops);
610    }
611
612    /// Snapshot of the current cap for persistence. Cheap, lock-only.
613    #[must_use]
614    pub fn snapshot(&self) -> usize {
615        let g = lock(&self.inner);
616        if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
617            g.hill.best_concurrency
618        } else {
619            g.current
620        }
621    }
622}
623
/// Summary figures for one hill-climb epoch.
#[derive(Debug, Clone, Copy)]
struct HillEpochStats {
    /// Goodput measured over the epoch, per second.
    goodput_per_sec: f64,
    /// p95 latency of the epoch, when one is available.
    latency_p95: Option<Duration>,
}
629
/// Verdict reached after evaluating one window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    /// The window was healthy: raise concurrency.
    Increase,
    /// The window was stressed: lower concurrency.
    Decrease,
    /// The window was inconclusive (e.g. mixed signals, baseline not
    /// yet set): keep the cap where it is.
    Hold,
}
640
641fn evaluate(
642    window: &VecDeque<Sample>,
643    cfg: &LimiterConfig,
644    baseline: Option<Duration>,
645) -> Decision {
646    // Capacity-relevant denominator: ApplicationError outcomes are
647    // explicitly NOT capacity signals (per `Outcome` docs) and are
648    // excluded from rate calculations. A wave of `AlreadyStored`
649    // errors must not punish concurrency.
650    let mut successes = 0usize;
651    let mut timeouts = 0usize;
652    let mut net_errors = 0usize;
653    let mut latencies: Vec<Duration> = Vec::with_capacity(window.len());
654    for s in window {
655        match s.outcome {
656            Outcome::Success => {
657                successes += 1;
658                latencies.push(s.latency);
659            }
660            Outcome::Timeout => timeouts += 1,
661            Outcome::NetworkError => net_errors += 1,
662            Outcome::ApplicationError => {}
663        }
664    }
665    let capacity_total = successes + timeouts + net_errors;
666    if capacity_total < cfg.min_window_ops {
667        // Not enough capacity-relevant evidence to act. Hold.
668        return Decision::Hold;
669    }
670    let total_f = capacity_total as f64;
671    let success_rate = successes as f64 / total_f;
672    let timeout_rate = timeouts as f64 / total_f;
673
674    if success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling {
675        return Decision::Decrease;
676    }
677
678    if let Some(p95) = p95_of(&mut latencies) {
679        if cfg.latency_decrease_enabled {
680            if let Some(base) = baseline {
681                let limit = base.mul_f64(cfg.latency_inflation_factor);
682                if p95 > limit {
683                    return Decision::Decrease;
684                }
685            }
686        }
687        Decision::Increase
688    } else {
689        Decision::Hold
690    }
691}
692
693fn apply_decision(inner: &mut LimiterInner, decision: Decision, cfg: &LimiterConfig) {
694    match decision {
695        Decision::Increase => {
696            // Gate increases on accumulating a fresh window's worth of
697            // evidence since the last bump.
698            if inner.samples_since_increase < cfg.window_ops {
699                return;
700            }
701            let p95 = window_p95(&inner.window);
702            inner.latency_baseline = Some(match inner.latency_baseline {
703                None => p95,
704                Some(prev) => ewma(prev, p95, cfg.latency_ewma_alpha),
705            });
706            let next = if inner.left_slow_start {
707                inner.current.saturating_add(1)
708            } else {
709                inner.current.saturating_mul(2)
710            };
711            let next = next.min(cfg.max_concurrency).max(cfg.min_concurrency);
712            if next != inner.current {
713                debug!(
714                    from = inner.current,
715                    to = next,
716                    slow_start = !inner.left_slow_start,
717                    "adaptive: increase",
718                );
719            }
720            inner.current = next;
721            inner.samples_since_increase = 0;
722            inner.samples_since_decrease = 0;
723        }
724        Decision::Decrease => {
725            // Gate decreases on `min_window_ops` of fresh evidence
726            // since the last decrease so a burst of concurrent
727            // observations from in-flight ops can't pile-drive the
728            // cap from N to 1 in a few back-to-back ticks.
729            if inner.samples_since_decrease < cfg.min_window_ops {
730                return;
731            }
732            // Below the ramp threshold we still halve (responsiveness is
733            // preserved) but keep slow-start armed, so the next healthy
734            // window can double the cap back up rather than crawling +1.
735            // Above the threshold a Decrease is the signal to settle into
736            // classic AIMD. With threshold=0 (quote/store) this is the
737            // original behaviour: any Decrease exits slow-start.
738            if inner.current >= cfg.slow_start_ramp_threshold {
739                inner.left_slow_start = true;
740            }
741            let next = (inner.current / 2).max(cfg.min_concurrency);
742            if next != inner.current {
743                debug!(from = inner.current, to = next, "adaptive: decrease");
744            }
745            inner.current = next;
746            // Retaining the increase credit (store channel, V2-554) keeps the
747            // evidence accrued toward the next Increase alive across a transient
748            // Decrease so a few-percent shortfall rate can't permanently starve
749            // growth. The default zeroes it, requiring a full fresh window.
750            if !cfg.retain_increase_credit_on_decrease {
751                inner.samples_since_increase = 0;
752            }
753            inner.samples_since_decrease = 0;
754        }
755        Decision::Hold => {}
756    }
757}
758
/// 95th-percentile element of `latencies`.
///
/// The slice is sorted in place as a side effect. An empty slice yields
/// `None`; otherwise the element at 1-based rank `ceil(0.95 * len)` is
/// returned, with the resulting index clamped into `0..len`.
fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
    // `checked_sub` doubles as the emptiness check: no last index, no p95.
    let last = latencies.len().checked_sub(1)?;
    latencies.sort_unstable();
    let rank = (0.95 * latencies.len() as f64).ceil() as usize;
    let position = rank.saturating_sub(1).min(last);
    Some(latencies[position])
}
771
772fn window_p95(window: &VecDeque<Sample>) -> Duration {
773    let mut latencies: Vec<Duration> = window
774        .iter()
775        .filter(|s| matches!(s.outcome, Outcome::Success))
776        .map(|s| s.latency)
777        .collect();
778    p95_of(&mut latencies).unwrap_or(Duration::ZERO)
779}
780
/// Exponentially weighted moving average of two durations:
/// `(1 - alpha) * prev + alpha * sample`.
///
/// `alpha` is clamped into `[0.0, 1.0]`. This function never panics:
/// `prev` is returned unchanged when `alpha` is not finite, when the
/// blended value is negative or not finite, or when the blended value is
/// too large to be represented as a `Duration`.
fn ewma(prev: Duration, sample: Duration, alpha: f64) -> Duration {
    if !alpha.is_finite() {
        return prev;
    }
    let alpha = alpha.clamp(0.0, 1.0);
    let prev_ms = prev.as_secs_f64() * 1000.0;
    let sample_ms = sample.as_secs_f64() * 1000.0;
    let new_ms = (1.0 - alpha) * prev_ms + alpha * sample_ms;
    if !new_ms.is_finite() || new_ms < 0.0 {
        return prev;
    }
    // `Duration::from_secs_f64` panics when the value overflows `Duration`
    // (reachable with inputs near `Duration::MAX`, whose f64 seconds round
    // up to 2^64). Use the fallible conversion and keep the previous
    // baseline instead of aborting the caller.
    Duration::try_from_secs_f64(new_ms / 1000.0).unwrap_or(prev)
}
795
796fn observe_hill_climb(
797    inner: &mut LimiterInner,
798    outcome: Outcome,
799    latency: Duration,
800    bytes: u64,
801    operation_started: Instant,
802    cfg: &LimiterConfig,
803) {
804    match inner.hill.epoch_started {
805        Some(epoch_started) if epoch_started <= operation_started => {}
806        _ => inner.hill.epoch_started = Some(operation_started),
807    }
808    inner.hill.epoch_samples = inner.hill.epoch_samples.saturating_add(1);
809    match outcome {
810        Outcome::Success => {
811            inner.hill.epoch_successes = inner.hill.epoch_successes.saturating_add(1);
812            inner.hill.epoch_bytes = inner.hill.epoch_bytes.saturating_add(bytes);
813            inner.hill.epoch_latencies.push(latency);
814        }
815        Outcome::Timeout => {
816            inner.hill.epoch_timeouts = inner.hill.epoch_timeouts.saturating_add(1);
817        }
818        Outcome::NetworkError => {
819            inner.hill.epoch_net_errors = inner.hill.epoch_net_errors.saturating_add(1);
820        }
821        Outcome::ApplicationError => {}
822    }
823
824    if hill_epoch_stressed(&inner.hill, cfg) {
825        apply_hill_stress(inner, cfg);
826        return;
827    }
828
829    if inner.hill.epoch_samples < hill_epoch_target_samples(inner.current, cfg) {
830        return;
831    }
832
833    if let Some(stats) = hill_epoch_stats(&inner.hill, cfg) {
834        apply_hill_epoch(inner, stats, cfg);
835    }
836    inner.hill.reset_epoch();
837}
838
839fn hill_epoch_target_samples(current: usize, cfg: &LimiterConfig) -> usize {
840    cfg.window_ops
841        .max(current.saturating_mul(HILL_EPOCH_FULL_WAVES))
842        .max(cfg.min_window_ops)
843}
844
845fn hill_epoch_stressed(hill: &HillClimbState, cfg: &LimiterConfig) -> bool {
846    let capacity_total = hill.capacity_total();
847    if capacity_total < cfg.min_window_ops {
848        return false;
849    }
850    let total_f = capacity_total as f64;
851    let success_rate = hill.epoch_successes as f64 / total_f;
852    let timeout_rate = hill.epoch_timeouts as f64 / total_f;
853    success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling
854}
855
856fn hill_epoch_stats(hill: &HillClimbState, cfg: &LimiterConfig) -> Option<HillEpochStats> {
857    let capacity_total = hill.capacity_total();
858    if capacity_total < cfg.min_window_ops || hill.epoch_successes == 0 {
859        return None;
860    }
861    let mut latencies = hill.epoch_latencies.clone();
862    let latency_p95 = p95_of(&mut latencies);
863    let max_latency = latencies.iter().copied().max().unwrap_or(Duration::ZERO);
864    let wall_elapsed = hill.epoch_started.map_or(Duration::ZERO, |s| s.elapsed());
865    let elapsed = wall_elapsed.max(max_latency);
866    let elapsed_secs = elapsed.as_secs_f64();
867    if !elapsed_secs.is_finite() || elapsed_secs <= 0.0 {
868        return None;
869    }
870
871    // Unit fallback keeps direct unit tests that call `observe(Success, ..)`
872    // meaningful; real download paths report bytes.
873    let units = if hill.epoch_bytes > 0 {
874        hill.epoch_bytes as f64
875    } else {
876        hill.epoch_successes as f64
877    };
878    Some(HillEpochStats {
879        goodput_per_sec: units / elapsed_secs,
880        latency_p95,
881    })
882}
883
884fn apply_hill_stress(inner: &mut LimiterInner, cfg: &LimiterConfig) {
885    let next = (inner.current / HILL_STRESS_DECREASE_DIVISOR)
886        .max(cfg.min_concurrency)
887        .min(cfg.max_concurrency);
888    if next != inner.current {
889        debug!(
890            from = inner.current,
891            to = next,
892            "adaptive: fetch hill stress decrease"
893        );
894    }
895    inner.current = next;
896    inner.hill.best_concurrency = next;
897    inner.hill.best_goodput_per_sec = None;
898    inner.hill.best_latency_p95 = None;
899    inner.hill.stable_epochs = 0;
900    inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
901    inner.hill.active_probe = None;
902    inner.hill.next_probe = ProbeDirection::Up;
903    inner.hill.reset_epoch();
904}
905
906fn apply_hill_epoch(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
907    let Some(best_goodput) = inner.hill.best_goodput_per_sec else {
908        inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
909        inner.hill.best_latency_p95 = stats.latency_p95;
910        inner.hill.best_concurrency = inner.current;
911        probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
912        return;
913    };
914
915    match inner.hill.active_probe {
916        Some(ProbeDirection::Up) => {
917            let improved = stats.goodput_per_sec >= best_goodput * HILL_UP_PROBE_ACCEPT_RATIO;
918            if improved
919                && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
920            {
921                accept_hill_probe(inner, stats, cfg);
922                probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
923            } else {
924                reject_hill_probe(inner);
925            }
926        }
927        Some(ProbeDirection::Down) => {
928            let retained = stats.goodput_per_sec >= best_goodput * HILL_DOWN_PROBE_ACCEPT_RATIO;
929            if retained
930                && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg)
931            {
932                accept_hill_probe(inner, stats, cfg);
933                inner.hill.next_probe = ProbeDirection::Up;
934            } else {
935                reject_hill_probe(inner);
936            }
937        }
938        None => {
939            refresh_hill_best(inner, stats, cfg);
940            if inner.hill.cooldown_epochs > 0 {
941                inner.hill.cooldown_epochs -= 1;
942                return;
943            }
944            inner.hill.stable_epochs = inner.hill.stable_epochs.saturating_add(1);
945            if inner.hill.stable_epochs >= HILL_STABLE_PROBE_EPOCHS {
946                let direction = inner.hill.next_probe;
947                inner.hill.next_probe = match direction {
948                    ProbeDirection::Up => ProbeDirection::Down,
949                    ProbeDirection::Down => ProbeDirection::Up,
950                };
951                probe_hill_neighbor(inner, direction, cfg);
952            }
953        }
954    }
955}
956
957fn refresh_hill_best(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
958    inner.hill.best_goodput_per_sec = Some(match inner.hill.best_goodput_per_sec {
959        Some(prev) => ewma_f64(prev, stats.goodput_per_sec, cfg.latency_ewma_alpha),
960        None => stats.goodput_per_sec,
961    });
962    if let Some(latency_p95) = stats.latency_p95 {
963        inner.hill.best_latency_p95 = Some(match inner.hill.best_latency_p95 {
964            Some(prev) => ewma(prev, latency_p95, cfg.latency_ewma_alpha),
965            None => latency_p95,
966        });
967    }
968}
969
970fn hill_latency_acceptable(
971    candidate: Option<Duration>,
972    best: Option<Duration>,
973    cfg: &LimiterConfig,
974) -> bool {
975    match (candidate, best) {
976        (Some(candidate), Some(best)) => candidate <= best.mul_f64(cfg.latency_inflation_factor),
977        _ => true,
978    }
979}
980
/// Exponentially weighted moving average on plain `f64` values:
/// `(1 - alpha) * prev + alpha * sample`, with `alpha` clamped into
/// `[0.0, 1.0]`. Falls back to `prev` when `alpha` is not finite or when
/// the blended value is negative or not finite.
fn ewma_f64(prev: f64, sample: f64, alpha: f64) -> f64 {
    if !alpha.is_finite() {
        return prev;
    }
    let weight = alpha.clamp(0.0, 1.0);
    let blended = (1.0 - weight) * prev + weight * sample;
    let usable = blended.is_finite() && blended >= 0.0;
    if usable {
        blended
    } else {
        prev
    }
}
994
995fn accept_hill_probe(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
996    debug!(
997        concurrency = inner.current,
998        goodput_per_sec = stats.goodput_per_sec,
999        "adaptive: fetch hill accepted probe"
1000    );
1001    inner.hill.best_concurrency = inner.current;
1002    inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
1003    inner.hill.best_latency_p95 = stats.latency_p95;
1004    inner.hill.active_probe = None;
1005    inner.hill.cooldown_epochs = 0;
1006    inner.hill.stable_epochs = 0;
1007    inner.current = inner
1008        .hill
1009        .best_concurrency
1010        .clamp(cfg.min_concurrency, cfg.max_concurrency);
1011}
1012
1013fn reject_hill_probe(inner: &mut LimiterInner) {
1014    let from = inner.current;
1015    let to = inner.hill.best_concurrency;
1016    let rejected_direction = inner.hill.active_probe;
1017    if from != to {
1018        debug!(from, to, "adaptive: fetch hill rejected probe");
1019    }
1020    inner.current = to;
1021    inner.hill.active_probe = None;
1022    if let Some(direction) = rejected_direction {
1023        inner.hill.next_probe = match direction {
1024            ProbeDirection::Up => ProbeDirection::Down,
1025            ProbeDirection::Down => ProbeDirection::Up,
1026        };
1027    }
1028    inner.hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
1029    inner.hill.stable_epochs = 0;
1030}
1031
1032fn probe_hill_neighbor(inner: &mut LimiterInner, direction: ProbeDirection, cfg: &LimiterConfig) {
1033    let best = inner.hill.best_concurrency;
1034    let step = (best / HILL_PROBE_STEP_DIVISOR).max(HILL_MIN_PROBE_STEP);
1035    let candidate = match direction {
1036        ProbeDirection::Up => best.saturating_add(step).min(cfg.max_concurrency),
1037        ProbeDirection::Down => best.saturating_sub(step).max(cfg.min_concurrency),
1038    };
1039    if candidate == best {
1040        inner.current = best;
1041        inner.hill.active_probe = None;
1042        inner.hill.stable_epochs = 0;
1043        return;
1044    }
1045    debug!(
1046        from = best,
1047        to = candidate,
1048        ?direction,
1049        "adaptive: fetch hill probing"
1050    );
1051    inner.current = candidate;
1052    inner.hill.active_probe = Some(direction);
1053    inner.hill.stable_epochs = 0;
1054}
1055
/// Bundle of per-channel limiters owned by the `Client`.
///
/// Each channel has its own `Limiter`, so each tracks its own state.
/// The bundle also keeps the sanitized config and the cold-start values
/// it was constructed with, both of which `warm_start` consults.
#[derive(Debug, Clone)]
pub struct AdaptiveController {
    /// Limiter for the quote channel (small DHT request/response
    /// messages).
    pub quote: Limiter,
    /// Limiter for the store channel (chunk PUTs to a close group).
    pub store: Limiter,
    /// Limiter for the fetch channel (chunk GETs). Constructed with
    /// `LimiterAlgorithm::ThroughputHillClimb` by `new`.
    pub fetch: Limiter,
    /// `pub(crate)` so external callers cannot mutate this
    /// post-construction. Each `Limiter` snapshots its own
    /// `Arc<LimiterConfig>` at construction time, so external
    /// mutation here would silently desync `warm_start`'s
    /// `enabled` check from the limiters' frozen copies. Read via
    /// `config()`.
    pub(crate) config: AdaptiveConfig,
    /// Per-instance cold-start values. `warm_start` floors snapshot
    /// values against THIS, not the global `ChannelStart::default()`,
    /// so a controller built with custom (e.g. low) starts stays
    /// faithful to its construction parameters. Constructed-once,
    /// never mutated.
    cold_start: ChannelStart,
}
1076
1077impl AdaptiveController {
1078    /// Create a controller with cold-start values per channel.
1079    /// Sanitizes the config (NaN guards, floor/ceiling enforcement)
1080    /// before constructing limiters. The supplied `start` is captured
1081    /// as the per-instance cold-start floor for `warm_start`.
1082    #[must_use]
1083    pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
1084        let mut config = config;
1085        config.sanitize();
1086        let quote_cfg = LimiterConfig::from_adaptive(&config, config.max.quote);
1087        let mut store_cfg = LimiterConfig::from_adaptive(&config, config.max.store);
1088        // Store-channel growth/decision tuning (V2-468). The store limiter
1089        // starts at 8 (correct — deliberately low for low-bandwidth uplinks)
1090        // but on the merkle upload path its health signals are polluted by two
1091        // things that are NOT local-capacity signals, so it never ramps and
1092        // gets crushed to a +1-per-window crawl. Both are the structural twin
1093        // of the fetch-channel overrides below (verification variance instead
1094        // of retry variance); the cold-start floor is deliberately untouched.
1095        //
1096        // - Disable the p95-latency Decrease. Node-side PUT latency is
1097        //   dominated by the ~28s synchronous merkle closeness lookup, giving a
1098        //   client-observed p95/median of ~3-6x that straddles
1099        //   `latency_inflation_factor` (4.0) and trips Decrease even though
1100        //   nothing about it is local congestion. Genuine store congestion
1101        //   still surfaces via the timeout-rate ceiling.
1102        // - Never exit slow-start. With the default threshold 0, any single
1103        //   Decrease at any cap permanently drops the store cap to additive
1104        //   +1-per-healthy-window growth, which cannot reach a useful cap
1105        //   before a file finishes (843 chunks stuck at effective ~5-9 in the
1106        //   PROD-UL-01 incident). `usize::MAX` keeps slow-start armed at every
1107        //   cap, so a transient Decrease still halves but the next healthy
1108        //   window doubles it back instead of condemning the rest of the file
1109        //   to a crawl. See the fetch override and `LimiterConfig` field docs.
1110        store_cfg.latency_decrease_enabled = false;
1111        store_cfg.slow_start_ramp_threshold = usize::MAX;
1112        // Break the asymmetric gate that pinned the store cap at its cold-start
1113        // floor (V2-554). Two coupled changes, both scoped to store:
1114        //
1115        // 1. Retain the increase credit across a Decrease. The default gate
1116        //    needs a full fresh `window_ops` of samples to earn an Increase but
1117        //    only `min_window_ops` to fire a Decrease, AND a Decrease zeroes the
1118        //    increase counter — so an isolated dip discards accrued growth.
1119        //    Retaining it lets the next healthy window act on that evidence.
1120        //
1121        // 2. Relax `success_target` from the global 0.95 to 0.88. Retaining the
1122        //    credit is not enough on its own: a Decrease still fires every
1123        //    `min_window_ops` (8) samples while eligible but an Increase only
1124        //    every `window_ops` (32), a 4:1 firing ratio that lets a sustained
1125        //    few-percent shortfall outpace growth and crush the cap regardless.
1126        //    At 0.95 just 2 shortfalls in a 32-op window (0.9375) trip a
1127        //    Decrease — normal per-chunk close-group noise. At 0.88 a Decrease
1128        //    needs ~4 shortfalls in 32 (~12%), so a few-percent shortfall reads
1129        //    as a healthy window and the cap can ramp. Genuine congestion (a
1130        //    larger shortfall rate, or the unchanged `timeout_ceiling`) still
1131        //    cuts the cap. Quote keeps the classic 0.95 gate.
1132        store_cfg.retain_increase_credit_on_decrease = true;
1133        store_cfg.success_target = 0.88;
1134        let mut fetch_cfg = LimiterConfig::from_adaptive(&config, config.max.fetch);
1135        // Lift the fetch channel's floor above the global
1136        // `min_concurrency`. Reasoning is specific to download: on
1137        // residential links, residual peer-side timeouts (NAT path
1138        // issues, peers in the close group that don't store the chunk,
1139        // peers under temporary load) continuously push the
1140        // controller's timeout_rate above ceiling. A global floor of 1
1141        // means the controller fully serializes chunk fetches on that
1142        // noise floor and gets stuck — observed on PROD-LOCAL-DL-03
1143        // where the download stayed stable but throughput collapsed to
1144        // ~330 KB/s on a multi-MB/s link.
1145        //
1146        // 4 is the smallest floor that keeps the download from fully
1147        // serializing; it also matches the validated cold-start floor.
1148        // Floor `quote` and `store` separately if a corresponding
1149        // pathology is identified for them; today's evidence is
1150        // download-only.
1151        fetch_cfg.min_concurrency = fetch_cfg.min_concurrency.max(FETCH_MIN_FLOOR);
1152        // Re-establish max >= min after the bump in case the channel
1153        // ceiling was somehow lower than the new floor.
1154        fetch_cfg.max_concurrency = fetch_cfg.max_concurrency.max(fetch_cfg.min_concurrency);
1155        // Download-specific growth/decision tuning (see the field docs
1156        // on `LimiterConfig`):
1157        //
1158        // - Never exit slow-start. Classic AIMD additive growth (+1 per
1159        //   healthy window) cannot reach a useful cap from a low base
1160        //   before a multi-GB file finishes — a fast-but-lossy
1161        //   connection (e.g. a VPS with a steady ~4% close-group-
1162        //   exhaustion trickle) was observed stuck at cap ~13-24 across
1163        //   36 files because every transient Decrease permanently
1164        //   dropped it to additive growth. `usize::MAX` keeps slow-start
1165        //   armed at every cap including the ceiling, so a Decrease
1166        //   (e.g. 256 -> 128) still halves but the next healthy window
1167        //   doubles it back. The cap therefore tracks the connection's
1168        //   real capacity instead of crawling, and a single transient
1169        //   Decrease near the ceiling can't re-pin the link to additive
1170        //   recovery. (A threshold == max_concurrency would NOT achieve
1171        //   this: `current >= threshold` is true at the ceiling, so a
1172        //   Decrease there would exit slow-start.)
1173        // - Disable the p95-latency Decrease. chunk_get's observed
1174        //   latency includes the internal retry sleep + slow retry
1175        //   sweep for chunks that needed one, so a window with a couple
1176        //   of retry-path chunks shows a hugely inflated p95 that is
1177        //   retry variance, not congestion. Genuine fetch congestion
1178        //   still drives Decrease via the Ok(None) -> Timeout rate.
1179        fetch_cfg.slow_start_ramp_threshold = usize::MAX;
1180        fetch_cfg.latency_decrease_enabled = false;
1181        Self {
1182            quote: Limiter::new(start.quote, quote_cfg),
1183            store: Limiter::new(start.store, store_cfg),
1184            fetch: Limiter::new_with_algorithm(
1185                start.fetch,
1186                fetch_cfg,
1187                LimiterAlgorithm::ThroughputHillClimb,
1188            ),
1189            config,
1190            cold_start: start,
1191        }
1192    }
1193
1194    /// Snapshot current per-channel caps for persistence.
1195    #[must_use]
1196    pub fn snapshot(&self) -> ChannelStart {
1197        ChannelStart {
1198            quote: self.quote.snapshot(),
1199            store: self.store.snapshot(),
1200            fetch: self.fetch.snapshot(),
1201        }
1202    }
1203
1204    /// Read-only access to the controller's adaptive config. Made
1205    /// read-only deliberately: each `Limiter` snapshots its own
1206    /// `Arc<LimiterConfig>` at construction, so post-hoc mutation
1207    /// would silently desync `warm_start`'s `enabled` check from
1208    /// the limiters' frozen copies.
1209    #[must_use]
1210    pub fn config(&self) -> &AdaptiveConfig {
1211        &self.config
1212    }
1213
1214    /// Apply a previously-saved snapshot as the warm-start cap.
1215    ///
1216    /// The effective warm value per channel is
1217    /// `max(snapshot, self.cold_start)` — flooring at the
1218    /// per-instance cold-start (NOT the global default) so:
1219    /// 1. A prior bad run that pinned cap=1 doesn't pessimize this
1220    ///    run forever.
1221    /// 2. A controller built with custom (e.g. low) cold starts for
1222    ///    benchmarking is not silently jumped above its construction
1223    ///    parameters.
1224    ///
1225    /// Does not clear sliding windows. When `enabled = false`, this
1226    /// is a no-op — fixed-concurrency mode means fixed-concurrency.
1227    pub fn warm_start(&self, snapshot: ChannelStart) {
1228        if !self.config.enabled {
1229            return;
1230        }
1231        self.quote
1232            .warm_start(snapshot.quote.max(self.cold_start.quote));
1233        self.store
1234            .warm_start(snapshot.store.max(self.cold_start.store));
1235        self.fetch
1236            .warm_start(snapshot.fetch.max(self.cold_start.fetch));
1237    }
1238}
1239
1240impl Default for AdaptiveController {
1241    fn default() -> Self {
1242        Self::new(ChannelStart::default(), AdaptiveConfig::default())
1243    }
1244}
1245
/// Cancel-on-drop guard: if the wrapping future is dropped before
/// completion, record no outcome. We don't synthesize a Cancelled
/// signal because (a) dropped work was never observed by the network
/// and (b) injecting fake outcomes would skew the sliding window
/// after a fail-fast burst. The intentional behavior is "silent on
/// cancel, observe on completion" — callers that need to keep
/// fail-fast batches drained for full signal use `rebucketed`.
struct ObserveGuard<'a> {
    /// Limiter that receives the observation when the guard drops.
    limiter: &'a Limiter,
    /// Instant captured when the guard was created; latency is measured
    /// from here and it is also reported as the operation start time.
    started: Instant,
    /// `(outcome, latency, bytes)` once `finish`/`finish_with_bytes` has
    /// been called; `None` means nothing is reported on drop.
    outcome: Option<(Outcome, Duration, u64)>,
}
1258
1259impl<'a> ObserveGuard<'a> {
1260    fn new(limiter: &'a Limiter) -> Self {
1261        Self {
1262            limiter,
1263            started: Instant::now(),
1264            outcome: None,
1265        }
1266    }
1267    fn finish(&mut self, outcome: Outcome) {
1268        self.finish_with_bytes(outcome, 0);
1269    }
1270
1271    fn finish_with_bytes(&mut self, outcome: Outcome, bytes: u64) {
1272        self.outcome = Some((outcome, self.started.elapsed(), bytes));
1273    }
1274}
1275
1276impl Drop for ObserveGuard<'_> {
1277    fn drop(&mut self) {
1278        if let Some((outcome, latency, bytes)) = self.outcome.take() {
1279            self.limiter
1280                .observe_with_timing(outcome, latency, bytes, self.started);
1281        }
1282    }
1283}
1284
1285/// Helper for instrumented call sites: time an async op, classify the
1286/// result, and report to a `Limiter`. Returns the original result.
1287///
1288/// ## Cancellation safety
1289///
1290/// Uses an internal `ObserveGuard` so the recorded outcome is
1291/// committed via `Drop` after the inner future returns. If the
1292/// wrapper future is itself dropped before `op().await` resolves
1293/// (caller cancellation, `buffer_unordered` fail-fast), no outcome
1294/// is recorded — this is intentional, see the guard's docs.
1295///
1296/// ```ignore
1297/// let res = observe_op(&controller.store, || async { do_put().await }, classify_put_err).await;
1298/// ```
1299pub async fn observe_op<T, E, F, Fut, C>(limiter: &Limiter, op: F, classify: C) -> Result<T, E>
1300where
1301    F: FnOnce() -> Fut,
1302    Fut: std::future::Future<Output = Result<T, E>>,
1303    C: FnOnce(&E) -> Outcome,
1304{
1305    let mut guard = ObserveGuard::new(limiter);
1306    let result = op().await;
1307    let outcome = match &result {
1308        Ok(_) => Outcome::Success,
1309        Err(e) => classify(e),
1310    };
1311    guard.finish(outcome);
1312    drop(guard); // commit observation explicitly so it lands before return
1313    result
1314}
1315
1316/// Byte-aware variant of [`observe_op`] for fetch paths. The success
1317/// byte extractor is called only for `Ok` results; errors still carry
1318/// zero bytes and are classified by the provided function.
1319pub async fn observe_op_with_success_bytes<T, E, F, Fut, C, B>(
1320    limiter: &Limiter,
1321    op: F,
1322    classify: C,
1323    success_bytes: B,
1324) -> Result<T, E>
1325where
1326    F: FnOnce() -> Fut,
1327    Fut: std::future::Future<Output = Result<T, E>>,
1328    C: FnOnce(&E) -> Outcome,
1329    B: FnOnce(&T) -> u64,
1330{
1331    let mut guard = ObserveGuard::new(limiter);
1332    let result = op().await;
1333    match &result {
1334        Ok(value) => guard.finish_with_bytes(Outcome::Success, success_bytes(value)),
1335        Err(e) => guard.finish_with_bytes(classify(e), 0),
1336    }
1337    drop(guard);
1338    result
1339}
1340
1341/// Process an iterator of items with a rolling scheduler whose cap
1342/// is re-read from the limiter as each slot frees. Replaces the
1343/// "snapshot the cap once at pipeline build" behavior of plain
1344/// `buffer_unordered(N)` so a long pipeline (e.g. 10 GB download =
1345/// ~2500 chunks) sees adaptive growth/decay mid-flight.
1346///
1347/// Output is unordered (first-completion). For an ordered result
1348/// (e.g. `data_download` feeds chunks in DataMap order to
1349/// self_encryption decrypt), wrap items with their index and sort
1350/// after collection — see `rebucketed_ordered`.
1351///
1352/// On error: in-flight work drains to completion (so observed
1353/// outcomes still feed the controller) but no new launches happen.
1354/// The first error is preserved; later errors are discarded.
1355pub async fn rebucketed_unordered<I, T, E, F, Fut>(
1356    limiter: &Limiter,
1357    items: I,
1358    mut op: F,
1359) -> Result<Vec<T>, E>
1360where
1361    I: IntoIterator,
1362    F: FnMut(I::Item) -> Fut,
1363    Fut: std::future::Future<Output = Result<T, E>>,
1364{
1365    let mut iter = items.into_iter().peekable();
1366    let mut in_flight: FuturesUnordered<Fut> = FuturesUnordered::new();
1367    let mut results = Vec::new();
1368    let mut pending_err: Option<E> = None;
1369    loop {
1370        // Refill: re-read the cap and launch up to `cap - in_flight.len()`
1371        // new items, but only if we are not already in error-stop.
1372        if pending_err.is_none() {
1373            let cap = limiter.current().max(1);
1374            while in_flight.len() < cap {
1375                match iter.next() {
1376                    Some(item) => in_flight.push(op(item)),
1377                    None => break,
1378                }
1379            }
1380        }
1381        if in_flight.is_empty() {
1382            break;
1383        }
1384        match in_flight.next().await {
1385            Some(Ok(v)) => results.push(v),
1386            Some(Err(e)) => {
1387                if pending_err.is_none() {
1388                    pending_err = Some(e);
1389                }
1390            }
1391            None => break,
1392        }
1393    }
1394    match pending_err {
1395        Some(e) => Err(e),
1396        None => Ok(results),
1397    }
1398}
1399
1400/// Ordered variant: items are tagged with a usize index by the
1401/// caller (typically by `iter.enumerate()`); after rolling
1402/// completion, results are sorted by index so output preserves
1403/// input order. Use this for callers that pass to APIs which
1404/// consume positionally (e.g. self_encryption's
1405/// `get_root_data_map_parallel` zips `Vec<(idx, Bytes)>` with input
1406/// hashes positionally and discards the idx — without a final sort
1407/// the bytes pair with the wrong hashes).
1408///
1409/// `op` is `FnMut(Item) -> Fut` where `Item` carries whatever
1410/// payload the caller needs; the closure must return
1411/// `Result<(usize, U), E>` so the wrapper can sort by the index.
1412pub async fn rebucketed_ordered<I, U, E, F, Fut>(
1413    limiter: &Limiter,
1414    items: I,
1415    op: F,
1416) -> Result<Vec<U>, E>
1417where
1418    I: IntoIterator,
1419    F: FnMut(I::Item) -> Fut,
1420    Fut: std::future::Future<Output = Result<(usize, U), E>>,
1421{
1422    let mut indexed = rebucketed_unordered(limiter, items, op).await?;
1423    indexed.sort_by_key(|(idx, _)| *idx);
1424    Ok(indexed.into_iter().map(|(_, v)| v).collect())
1425}
1426
1427/// Backward-compatible wrapper. `ordered = false` -> rolling
1428/// unordered. `ordered = true` -> the OLD batch-fence ordered path
1429/// (kept for tests that explicitly assert batch-fence semantics).
1430/// New call sites should use `rebucketed_unordered` or
1431/// `rebucketed_ordered` directly.
1432pub async fn rebucketed<I, T, E, F, Fut>(
1433    limiter: &Limiter,
1434    items: I,
1435    ordered: bool,
1436    mut op: F,
1437) -> Result<Vec<T>, E>
1438where
1439    I: IntoIterator,
1440    F: FnMut(I::Item) -> Fut,
1441    Fut: std::future::Future<Output = Result<T, E>>,
1442{
1443    if !ordered {
1444        return rebucketed_unordered(limiter, items, op).await;
1445    }
1446    let mut iter = items.into_iter();
1447    let mut results = Vec::new();
1448    let mut pending_err: Option<E> = None;
1449    loop {
1450        if pending_err.is_some() {
1451            break;
1452        }
1453        let cap = limiter.current().max(1);
1454        let mut batch = Vec::with_capacity(cap);
1455        for item in iter.by_ref().take(cap) {
1456            batch.push(op(item));
1457        }
1458        if batch.is_empty() {
1459            break;
1460        }
1461        let mut s = stream::iter(batch).buffered(cap);
1462        while let Some(r) = s.next().await {
1463            match r {
1464                Ok(v) => results.push(v),
1465                Err(e) => {
1466                    if pending_err.is_none() {
1467                        pending_err = Some(e);
1468                    }
1469                }
1470            }
1471        }
1472    }
1473    match pending_err {
1474        Some(e) => Err(e),
1475        None => Ok(results),
1476    }
1477}
1478
/// On-disk shape for the persisted adaptive state. Versioned so we
/// can evolve the controller without crashing on stale files — an
/// unknown future schema version simply causes a silent fallback to
/// cold defaults.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedState {
    /// Schema version of this file; the loader matches it against
    /// `PERSIST_SCHEMA` and the older versions it knows how to migrate.
    schema: u32,
    /// Persisted per-channel values.
    channels: ChannelStart,
}
1488
/// Schema version written by `save_snapshot` and accepted unchanged by
/// `load_snapshot`.
const PERSIST_SCHEMA: u32 = 2;
/// Older schema version that `load_snapshot` still accepts: the persisted
/// values are kept except `fetch`, which is reset to
/// `FETCH_COLD_START_CONCURRENCY`.
const PERSIST_SCHEMA_AIMD_FETCH: u32 = 1;
/// File name joined onto the data dir by `default_persist_path`.
const PERSIST_FILENAME: &str = "client_adaptive.json";
1492
1493/// Default persistence path: `<data_dir>/client_adaptive.json`. Falls
1494/// back to `None` if the platform data dir is not resolvable; in that
1495/// case the controller still works, it just won't persist.
1496#[must_use]
1497pub fn default_persist_path() -> Option<PathBuf> {
1498    crate::config::data_dir()
1499        .ok()
1500        .map(|d| d.join(PERSIST_FILENAME))
1501}
1502
1503/// Load a persisted snapshot from disk, returning `None` if the file
1504/// does not exist, is unreadable, contains malformed JSON, or has a
1505/// schema version this build does not understand. Persistence is best
1506/// effort — never propagate errors that would block the user's
1507/// operation.
1508#[must_use]
1509pub fn load_snapshot(path: &Path) -> Option<ChannelStart> {
1510    let bytes = std::fs::read(path).ok()?;
1511    let state: PersistedState = match serde_json::from_slice(&bytes) {
1512        Ok(s) => s,
1513        Err(e) => {
1514            warn!(path = %path.display(), error = %e, "adaptive: corrupt snapshot, ignoring");
1515            return None;
1516        }
1517    };
1518    match state.schema {
1519        PERSIST_SCHEMA => Some(state.channels),
1520        PERSIST_SCHEMA_AIMD_FETCH => {
1521            debug!(
1522                path = %path.display(),
1523                "adaptive: migrating schema-1 snapshot, preserving quote/store and resetting fetch",
1524            );
1525            Some(ChannelStart {
1526                fetch: FETCH_COLD_START_CONCURRENCY,
1527                ..state.channels
1528            })
1529        }
1530        schema => {
1531            debug!(
1532                path = %path.display(),
1533                schema,
1534                expected = PERSIST_SCHEMA,
1535                "adaptive: snapshot schema mismatch, ignoring",
1536            );
1537            None
1538        }
1539    }
1540}
1541
1542/// Save a snapshot to disk atomically (write to `<path>.tmp`, then
1543/// rename). Best effort — failures are logged at warn and discarded.
1544pub fn save_snapshot(path: &Path, channels: ChannelStart) {
1545    let state = PersistedState {
1546        schema: PERSIST_SCHEMA,
1547        channels,
1548    };
1549    let bytes = match serde_json::to_vec_pretty(&state) {
1550        Ok(b) => b,
1551        Err(e) => {
1552            warn!(error = %e, "adaptive: snapshot serialize failed");
1553            return;
1554        }
1555    };
1556    if let Some(parent) = path.parent() {
1557        if let Err(e) = std::fs::create_dir_all(parent) {
1558            warn!(path = %parent.display(), error = %e, "adaptive: snapshot mkdir failed");
1559            return;
1560        }
1561    }
1562    // Unique-per-save temp filename: PID + monotonic counter +
1563    // nanosecond timestamp guarantees no collision between concurrent
1564    // CLI invocations OR concurrent save_snapshot calls within one
1565    // process (e.g. multiple Client instances sharing the same data
1566    // dir). POSIX rename is atomic on the destination, so the rename
1567    // target overlap is fine — last writer wins.
1568    let nanos = std::time::SystemTime::now()
1569        .duration_since(std::time::UNIX_EPOCH)
1570        .map(|d| d.subsec_nanos())
1571        .unwrap_or(0);
1572    let counter = SAVE_COUNTER.fetch_add(1, Ordering::Relaxed);
1573    let tmp = path.with_extension(format!(
1574        "json.tmp.{}.{}.{}",
1575        std::process::id(),
1576        counter,
1577        nanos
1578    ));
1579    if let Err(e) = std::fs::write(&tmp, &bytes) {
1580        warn!(path = %tmp.display(), error = %e, "adaptive: snapshot write failed");
1581        return;
1582    }
1583    if let Err(e) = std::fs::rename(&tmp, path) {
1584        warn!(
1585            from = %tmp.display(),
1586            to = %path.display(),
1587            error = %e,
1588            "adaptive: snapshot rename failed",
1589        );
1590        // Try to clean up the temp on rename failure so we don't
1591        // leave junk in the data dir. Best effort.
1592        let _ = std::fs::remove_file(&tmp);
1593    }
1594}
1595
1596/// Save with a wall-clock deadline. Spawns the synchronous
1597/// `save_snapshot` on a detached thread and waits up to `timeout`
1598/// for it to finish. If the thread is still running past the
1599/// deadline (e.g. because the data dir is on a hung NFS mount),
1600/// returns without joining — the OS will clean up the thread when
1601/// the process exits.
1602///
1603/// Used by `Client::drop` so a stalled filesystem cannot block
1604/// process shutdown indefinitely.
1605pub fn save_snapshot_with_timeout(path: PathBuf, channels: ChannelStart, timeout: Duration) {
1606    let handle = std::thread::spawn(move || {
1607        save_snapshot(&path, channels);
1608    });
1609    // Park briefly waiting for the thread, polling its status. We
1610    // use a short polling interval rather than `join()` because
1611    // join() blocks indefinitely.
1612    let started = Instant::now();
1613    let poll = Duration::from_millis(5);
1614    while started.elapsed() < timeout {
1615        if handle.is_finished() {
1616            let _ = handle.join();
1617            return;
1618        }
1619        std::thread::sleep(poll);
1620    }
1621    // Deadline elapsed. Detach the thread; it will continue to run
1622    // in the background until process exit (its work is best-effort
1623    // anyway). Log so operators can see the slow filesystem.
1624    warn!(
1625        timeout_ms = timeout.as_millis() as u64,
1626        "adaptive: snapshot save timed out (data dir slow?); detaching writer thread"
1627    );
1628    drop(handle);
1629}
1630
1631#[cfg(test)]
1632#[allow(clippy::unwrap_used)]
1633mod tests {
1634    use super::*;
1635
// Shared fixtures for the throughput hill-climb tests.
// NOTE(review): only `HILL_TEST_BASE_LATENCY_MS` is referenced in the part
// of this module reviewed here (as the default latency used by
// `observe_hill_success_epoch`); the remaining constants are presumably
// consumed by hill-climb tests further down — confirm before changing
// any value.
const HILL_TEST_START_CAP: usize = 16;
const HILL_TEST_UP_PROBE_CAP: usize = 20;
const HILL_TEST_NEXT_UP_PROBE_CAP: usize = 25;
const HILL_TEST_DOWN_PROBE_CAP: usize = 12;
const HILL_TEST_CHUNK_BYTES: u64 = 1_000;
const HILL_TEST_BASE_LATENCY_MS: u64 = 100;
const HILL_TEST_REJECT_LATENCY_MS: u64 = 130;
const HILL_TEST_RETAINED_DOWN_LATENCY_MS: u64 = 75;
const HILL_TEST_ASYNC_LATENCY_MS: u64 = 10;
1645
1646    fn cfg_for_tests() -> LimiterConfig {
1647        LimiterConfig {
1648            enabled: true,
1649            min_concurrency: 1,
1650            max_concurrency: 64,
1651            window_ops: 10,
1652            min_window_ops: 5,
1653            success_target: 0.9,
1654            timeout_ceiling: 0.2,
1655            latency_inflation_factor: 2.0,
1656            latency_ewma_alpha: 0.5,
1657            slow_start_ramp_threshold: 0,
1658            latency_decrease_enabled: true,
1659            retain_increase_credit_on_decrease: false,
1660        }
1661    }
1662
1663    fn hill_cfg_for_tests() -> LimiterConfig {
1664        LimiterConfig {
1665            window_ops: 4,
1666            min_window_ops: 2,
1667            max_concurrency: 64,
1668            success_target: 0.9,
1669            timeout_ceiling: 0.2,
1670            ..cfg_for_tests()
1671        }
1672    }
1673
1674    fn fetch_hill_for_tests(start: usize, cfg: LimiterConfig) -> Limiter {
1675        Limiter::new_with_algorithm(start, cfg, LimiterAlgorithm::ThroughputHillClimb)
1676    }
1677
1678    fn observe_hill_success_epoch_with_latency(
1679        limiter: &Limiter,
1680        cfg: &LimiterConfig,
1681        bytes: u64,
1682        latency: Duration,
1683    ) {
1684        let samples = hill_epoch_target_samples(limiter.current(), cfg);
1685        for _ in 0..samples {
1686            limiter.observe_with_bytes(Outcome::Success, latency, bytes);
1687        }
1688    }
1689
1690    fn observe_hill_success_epoch(limiter: &Limiter, cfg: &LimiterConfig, bytes: u64) {
1691        observe_hill_success_epoch_with_latency(
1692            limiter,
1693            cfg,
1694            bytes,
1695            Duration::from_millis(HILL_TEST_BASE_LATENCY_MS),
1696        );
1697    }
1698
1699    /// Build an `AdaptiveConfig` for tests that need to construct a
1700    /// full `AdaptiveController`. Mirrors `cfg_for_tests()` defaults
1701    /// where they overlap, plus per-channel max derived from the same
1702    /// `max_concurrency` value.
1703    fn adaptive_cfg_for_tests() -> AdaptiveConfig {
1704        let l = cfg_for_tests();
1705        AdaptiveConfig {
1706            enabled: l.enabled,
1707            min_concurrency: l.min_concurrency,
1708            max: ChannelMax {
1709                quote: l.max_concurrency,
1710                store: l.max_concurrency,
1711                fetch: l.max_concurrency,
1712            },
1713            window_ops: l.window_ops,
1714            min_window_ops: l.min_window_ops,
1715            success_target: l.success_target,
1716            timeout_ceiling: l.timeout_ceiling,
1717            latency_inflation_factor: l.latency_inflation_factor,
1718            latency_ewma_alpha: l.latency_ewma_alpha,
1719        }
1720    }
1721
#[test]
fn warm_start_keeps_slow_start_armed_below_protected_threshold() {
    // Guards the CLI multi-file pattern: every `ant file download` is a
    // fresh process that warm-starts from the persisted snapshot. Were
    // warm_start to leave slow-start, the fetch cap could only grow
    // additively from the warm value and would never get back to the
    // ceiling against an intermittent Decrease trickle. So a protected
    // limiter (threshold == max) warm-started BELOW the ceiling has to
    // stay in slow-start and double its way back up.
    let mut protected_cfg = cfg_for_tests();
    protected_cfg.max_concurrency = 256;
    protected_cfg.slow_start_ramp_threshold = 256;
    protected_cfg.latency_decrease_enabled = false;
    let protected_window = protected_cfg.window_ops;

    let protected = Limiter::new(64, protected_cfg);
    protected.warm_start(20);
    assert_eq!(protected.current(), 20);
    // One healthy window must DOUBLE the cap, which shows slow-start
    // survived the warm start.
    (0..protected_window).for_each(|_| {
        protected.observe(Outcome::Success, Duration::from_millis(10));
    });
    assert_eq!(
        protected.current(),
        40,
        "protected channel must double after warm_start, not crawl +1",
    );

    // Contrast: with the default threshold of 0, warm_start leaves
    // slow-start and the same healthy window only adds 1.
    let mut default_cfg = cfg_for_tests();
    default_cfg.max_concurrency = 256;
    let default_window = default_cfg.window_ops;

    let unprotected = Limiter::new(64, default_cfg);
    unprotected.warm_start(20);
    (0..default_window).for_each(|_| {
        unprotected.observe(Outcome::Success, Duration::from_millis(10));
    });
    assert_eq!(
        unprotected.current(),
        21,
        "default channel must stay additive after warm_start",
    );
}
1769
#[test]
fn slow_start_stays_armed_at_ceiling_with_max_threshold() {
    // Regression for the "lost protection at the ceiling" bug. With
    // threshold == usize::MAX (the fetch setting) slow-start stays armed
    // even when a Decrease fires at the ceiling, so the cap doubles back.
    // With threshold == max_concurrency (the buggy setting) slow-start is
    // exited there — `current >= threshold` holds at the ceiling — and
    // recovery is additive only. Given identical stress-at-ceiling and
    // recovery, the MAX-threshold limiter must finish strictly higher.
    let shared = LimiterConfig {
        max_concurrency: 256,
        latency_decrease_enabled: false,
        ..cfg_for_tests()
    };
    // Both limiters start at the ceiling and differ only in threshold.
    let at_ceiling = |threshold: usize| {
        Limiter::new(
            256,
            LimiterConfig {
                slow_start_ramp_threshold: threshold,
                ..shared.clone()
            },
        )
    };
    let fixed = at_ceiling(usize::MAX);
    let buggy = at_ceiling(256);

    let window = shared.window_ops;
    for limiter in [&fixed, &buggy] {
        // One window of timeouts, then ten windows of healthy traffic.
        (0..window).for_each(|_| {
            limiter.observe(Outcome::Timeout, Duration::from_millis(10));
        });
        (0..(window * 10)).for_each(|_| {
            limiter.observe(Outcome::Success, Duration::from_millis(10));
        });
    }
    assert!(
        fixed.current() > buggy.current(),
        "MAX-threshold limiter ({}) must out-recover the ceiling-threshold one ({})",
        fixed.current(),
        buggy.current(),
    );
}
1814
#[test]
fn protected_slow_start_recovers_faster_than_additive() {
    // Given identical stress and recovery, a limiter whose slow-start is
    // protected up to the ceiling (fetch behaviour) has to finish at a
    // higher cap than one that leaves slow-start on its first Decrease
    // (quote/store behaviour): doubling beats +1 per window.
    let shared = LimiterConfig {
        max_concurrency: 256,
        latency_decrease_enabled: false,
        ..cfg_for_tests()
    };
    let starting_at_64 = |threshold: usize| {
        Limiter::new(
            64,
            LimiterConfig {
                slow_start_ramp_threshold: threshold,
                ..shared.clone()
            },
        )
    };
    let protected = starting_at_64(256);
    let unprotected = starting_at_64(0);
    let window = shared.window_ops;

    // Stress phase: one window of timeouts drives decreases on both.
    for limiter in [&protected, &unprotected] {
        (0..window).for_each(|_| {
            limiter.observe(Outcome::Timeout, Duration::from_millis(10));
        });
    }
    // Recovery phase: ten healthy windows for each. The protected
    // limiter doubles per window, the unprotected one gains 1.
    for limiter in [&protected, &unprotected] {
        (0..(window * 10)).for_each(|_| {
            limiter.observe(Outcome::Success, Duration::from_millis(10));
        });
    }
    assert!(
        protected.current() > unprotected.current(),
        "protected slow-start ({}) should recover faster than additive ({})",
        protected.current(),
        unprotected.current(),
    );
}
1862
#[test]
fn latency_decrease_disabled_ignores_p95_inflation() {
    // When latency_decrease_enabled is false, a window of successes with
    // p95 latency far above the baseline must NOT produce a Decrease —
    // only the success/timeout rates may. (Fetch turns the check off
    // because chunk_get's observed latency is polluted by retry-path
    // variance.)
    let mut cfg = cfg_for_tests();
    cfg.max_concurrency = 256;
    cfg.slow_start_ramp_threshold = 256;
    cfg.latency_decrease_enabled = false;
    let window = cfg.window_ops;
    let limiter = Limiter::new(16, cfg);

    // Fast window first, to establish the baseline.
    (0..window).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(5));
    });
    let after_baseline = limiter.current();

    // Then a window of successes at 100x the latency. With the check
    // disabled this still counts as healthy, so the cap may not fall.
    (0..window).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(500));
    });
    assert!(
        limiter.current() >= after_baseline,
        "latency inflation must not shrink the cap when the check is disabled: {} < {}",
        limiter.current(),
        after_baseline,
    );
}
1895
#[test]
fn controller_sets_fetch_channel_download_tuning() {
    // AdaptiveController::new has to apply the slow-start /
    // latency-decrease tuning to fetch AND store (V2-468) while quote
    // stays on classic AIMD.
    let c = AdaptiveController::new(ChannelStart::default(), AdaptiveConfig::default());
    let fetch_cfg = &c.fetch.config;
    let quote_cfg = &c.quote.config;
    let store_cfg = &c.store.config;

    assert!(
        !fetch_cfg.latency_decrease_enabled,
        "fetch latency-decrease must be disabled",
    );
    assert_eq!(
        fetch_cfg.slow_start_ramp_threshold,
        usize::MAX,
        "fetch slow-start must never exit (armed at every cap incl. ceiling)",
    );
    assert!(
        quote_cfg.latency_decrease_enabled,
        "quote must keep the latency-decrease check",
    );
    assert_eq!(
        quote_cfg.slow_start_ramp_threshold, 0,
        "quote must keep classic AIMD slow-start exit",
    );
    assert!(
        !quote_cfg.retain_increase_credit_on_decrease,
        "quote must keep the classic gate (Decrease resets the increase counter)",
    );
    assert!(
        store_cfg.retain_increase_credit_on_decrease,
        "store must retain increase credit across a Decrease (V2-554)",
    );
    assert!(
        (store_cfg.success_target - 0.88).abs() < f64::EPSILON,
        "store must relax success_target to 0.88 so a few-percent shortfall still ramps (V2-554), got {}",
        store_cfg.success_target,
    );
    assert!(
        (quote_cfg.success_target - c.config().success_target).abs() < f64::EPSILON,
        "quote must keep the global success_target",
    );
    // On these two knobs store follows fetch: node-side merkle
    // verification latency is not local congestion, and one transient
    // Decrease must not sentence the cap to a +1-per-window crawl.
    assert!(
        !store_cfg.latency_decrease_enabled,
        "store latency-decrease must be disabled (verification variance is not congestion)",
    );
    assert_eq!(
        store_cfg.slow_start_ramp_threshold,
        usize::MAX,
        "store slow-start must never exit so a transient Decrease re-doubles",
    );
    // V2-468 touches only the polluted ramp/decrease signals — the store
    // cold-start value itself has to stay where it was.
    assert_eq!(
        c.store.current(),
        ChannelStart::default().store,
        "store cold-start floor must remain unchanged at 8",
    );
}
1956
#[test]
fn store_channel_ramps_and_recovers_under_v2_468_tuning() {
    // End-to-end against the real `controller.store` limiter. Under the
    // V2-468 tuning: (a) p95 inflation from verification latency alone
    // must not shrink the cap, (b) a real timeout burst must still cut
    // it, and (c) the cap must re-double on following healthy windows
    // rather than crawl +1 (slow-start stays armed).
    let adaptive = {
        let mut tuned = adaptive_cfg_for_tests();
        // Leave the store channel plenty of headroom to ramp into.
        tuned.max.store = 256;
        tuned
    };
    let start = ChannelStart {
        quote: 8,
        store: 8,
        fetch: 8,
    };
    let c = AdaptiveController::new(start, adaptive);
    let store = &c.store;
    let win = c.config().window_ops;

    // (a) A fast window sets the baseline; a window of slow successes
    // (the ~28s verification tail) follows. The cap must not fall.
    (0..win).for_each(|_| {
        store.observe(Outcome::Success, Duration::from_millis(5));
    });
    let after_baseline = store.current();
    assert!(after_baseline >= 8, "store should ramp on healthy windows");
    (0..win).for_each(|_| {
        store.observe(Outcome::Success, Duration::from_secs(30));
    });
    assert!(
        store.current() >= after_baseline,
        "verification-latency p95 must not shrink store cap: {} < {}",
        store.current(),
        after_baseline,
    );

    // (b) A real local-congestion timeout burst still has to cut it.
    let before_stress = store.current();
    (0..win).for_each(|_| {
        store.observe(Outcome::Timeout, Duration::from_millis(50));
    });
    let after_stress = store.current();
    assert!(
        after_stress < before_stress,
        "timeout-rate breach must still cut the store cap: {after_stress} !< {before_stress}",
    );

    // (c) With slow-start still armed, healthy windows re-DOUBLE the cap
    // to its earlier level. Additive +1 recovery could not get from the
    // stressed floor back to `before_stress` in this many windows — only
    // multiplicative doubling can — so arriving there shows the crawl
    // pathology is gone.
    (0..(win * 8)).for_each(|_| {
        store.observe(Outcome::Success, Duration::from_millis(5));
    });
    assert!(
        store.current() >= before_stress,
        "store must re-double back to {before_stress} after a transient Decrease, got {}",
        store.current(),
    );
}
2020
#[test]
fn store_application_rejections_do_not_move_cap() {
    // The 397 remote app-rejections from the merkle incident (now
    // classified as ApplicationError via `Error::RemotePut`) are not
    // capacity signals and must not push the store cap down.
    let adaptive = {
        let mut tuned = adaptive_cfg_for_tests();
        tuned.max.store = 256;
        tuned
    };
    let initial = ChannelStart {
        quote: 8,
        store: 8,
        fetch: 8,
    };
    let c = AdaptiveController::new(initial, adaptive);
    let store = &c.store;
    let start = store.current();

    // Five windows' worth of rejections, each with a long latency.
    let rejections = c.config().window_ops * 5;
    (0..rejections).for_each(|_| {
        store.observe(Outcome::ApplicationError, Duration::from_secs(30));
    });
    assert_eq!(
        store.current(),
        start,
        "remote app-rejections must not move the store cap",
    );
}
2047
#[test]
fn store_gate_rebalance_ramps_where_classic_gate_pins() {
    // V2-554 gate rebalance, end-to-end. Two limiters see the SAME
    // borderline input — a steady ~5% shortfall (1 error per 20
    // successes), the normal per-chunk close-group noise floor:
    //
    // - `fixed`: the store tuning (success_target 0.88 + retained
    //   increase credit). A ~5% shortfall counts as healthy, so it ramps.
    // - `classic`: the pre-V2-554 store gate (success_target 0.95 + reset
    //   on Decrease). Only 2 shortfalls in a 32-op window (0.9375) trip a
    //   Decrease, which fires 4x faster than an Increase can be earned
    //   and zeroes the increase credit, pinning the cap at the floor.
    let build = |success_target: f64, retain: bool| {
        Limiter::new(
            8,
            LimiterConfig {
                min_concurrency: 1,
                max_concurrency: 256,
                window_ops: 32,
                min_window_ops: 8,
                success_target,
                timeout_ceiling: 0.10,
                // Slow-start stays armed and latency-decrease is off for
                // both, so the gate (target + retain) is the only
                // variable under test.
                slow_start_ramp_threshold: usize::MAX,
                latency_decrease_enabled: false,
                retain_increase_credit_on_decrease: retain,
                ..cfg_for_tests()
            },
        )
    };
    let fixed = build(0.88, true);
    let classic = build(0.95, false);

    // 80 rounds of 20 successes followed by one network error, fed to
    // both limiters in lock-step (`fixed` first, then `classic`).
    for _round in 0..80 {
        for _ in 0..20 {
            for limiter in [&fixed, &classic] {
                limiter.observe(Outcome::Success, Duration::from_millis(5));
            }
        }
        for limiter in [&fixed, &classic] {
            limiter.observe(Outcome::NetworkError, Duration::from_millis(5));
        }
    }

    // Under the noise floor that pinned the classic gate, the rebalanced
    // gate climbs well off the cold-start floor of 8.
    assert!(
        fixed.current() >= 32,
        "rebalanced store gate must ramp off the floor under ~5% shortfall, got {}",
        fixed.current(),
    );
    // Same input, classic gate: crushed at or near the floor — the
    // V2-554 pathology.
    assert!(
        classic.current() <= 8,
        "classic gate should stay pinned near the floor on the same input, got {}",
        classic.current(),
    );
    assert!(
        fixed.current() > classic.current(),
        "rebalanced gate {} must out-grow the classic gate {}",
        fixed.current(),
        classic.current(),
    );
}
2108
#[test]
fn cold_start_clamps_into_bounds() {
    // A start value outside [min_concurrency, max_concurrency] is pulled
    // back to the nearest bound at construction.
    let cfg = cfg_for_tests();
    let too_high = Limiter::new(1000, cfg.clone());
    assert_eq!(too_high.current(), cfg.max_concurrency);
    let too_low = Limiter::new(0, cfg.clone());
    assert_eq!(too_low.current(), cfg.min_concurrency);
}
2117
#[test]
fn slow_start_doubles_then_caps() {
    let cfg = cfg_for_tests();
    let window = cfg.window_ops;
    let limiter = Limiter::new(2, cfg);
    // Each full healthy window doubles the cap: 2 -> 4 -> 8.
    for expected in [4, 8] {
        (0..window).for_each(|_| {
            limiter.observe(Outcome::Success, Duration::from_millis(50));
        });
        assert_eq!(limiter.current(), expected);
    }
}
2132
#[test]
fn first_failure_exits_slow_start() {
    let cfg = cfg_for_tests();
    let window = cfg.window_ops;
    let limiter = Limiter::new(4, cfg);
    let latency = Duration::from_millis(50);

    // A window of 10 made of 6 successes then 4 timeouts. Decisions are
    // taken per sample once min_window_ops entries are present, so each
    // of the four timeouts drives a Decrease, flooring the cap.
    (0..6).for_each(|_| {
        limiter.observe(Outcome::Success, latency);
    });
    (0..4).for_each(|_| {
        limiter.observe(Outcome::Timeout, latency);
    });
    let after_stress = limiter.current();
    assert!(
        after_stress < 4,
        "stress should reduce concurrency from 4, got {after_stress}",
    );

    // Outside slow-start, recovery is +1 per fresh window rather than a
    // doubling. The first `window_ops` successes push the old timeouts
    // out of the sliding window. A Decrease additionally needs
    // `min_window_ops` of fresh evidence before it can fire again, and
    // an Increase needs `window_ops` of it. So feed enough successes to
    // both clear the window AND earn several increases.
    (0..(window * 5)).for_each(|_| {
        limiter.observe(Outcome::Success, latency);
    });
    assert!(
        limiter.current() > after_stress,
        "expected recovery above {after_stress}, got {}",
        limiter.current(),
    );
}
2167
#[test]
fn floor_holds_at_one() {
    // Thirty straight timeouts from a start of 2 must leave the cap at
    // 1, the configured minimum.
    let limiter = Limiter::new(2, cfg_for_tests());
    (0..30).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(50));
    });
    assert_eq!(limiter.current(), 1);
}
2177
#[test]
fn application_errors_do_not_punish() {
    let cfg = cfg_for_tests();
    let total = cfg.window_ops * 5;
    let limiter = Limiter::new(4, cfg);
    // ApplicationError is NOT a capacity signal (see the `Outcome` docs
    // and the reviewer's M1 finding). A burst of, say, `AlreadyStored`
    // errors says nothing about whether the network can take more load,
    // so it must not lower concurrency. Concretely the controller should
    // HOLD at 4, as there are no capacity-relevant samples to act on.
    (0..total).for_each(|_| {
        limiter.observe(Outcome::ApplicationError, Duration::from_millis(50));
    });
    assert_eq!(
        limiter.current(),
        4,
        "ApplicationError must not move the cap; got {}",
        limiter.current()
    );
}
2198
#[test]
fn latency_inflation_triggers_decrease() {
    let mut cfg = cfg_for_tests();
    cfg.window_ops = 20;
    cfg.min_window_ops = 5;
    let window = cfg.window_ops;
    let limiter = Limiter::new(4, cfg);

    // A full window of fast successes sets the latency baseline.
    (0..window).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(50));
    });
    let after_baseline = limiter.current();

    // A full window of slow successes follows — same outcome, 10x the
    // latency (50ms -> 500ms).
    (0..window).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(500));
    });
    // That exceeds the 2x inflation factor, so the cap has to drop.
    assert!(
        limiter.current() < after_baseline,
        "expected decrease from {after_baseline}, got {}",
        limiter.current(),
    );
}
2223
#[test]
fn warm_start_overrides_current() {
    // An in-bounds warm-start value replaces the construction-time cap.
    let limiter = Limiter::new(2, cfg_for_tests());
    limiter.warm_start(20);
    assert_eq!(limiter.current(), 20);
}
2231
#[test]
fn warm_start_clamps() {
    // A warm-start value above the ceiling is clamped to max_concurrency.
    let cfg = cfg_for_tests();
    let ceiling = cfg.max_concurrency;
    let limiter = Limiter::new(2, cfg);
    limiter.warm_start(1_000_000);
    assert_eq!(limiter.current(), ceiling);
}
2239
#[test]
fn disabled_controller_holds_steady() {
    // With `enabled: false`, fifty timeouts must leave the cap untouched.
    let mut cfg = cfg_for_tests();
    cfg.enabled = false;
    let limiter = Limiter::new(8, cfg);
    (0..50).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(50));
    });
    assert_eq!(limiter.current(), 8);
}
2252
#[test]
fn controller_snapshot_round_trips() {
    // Every channel in the test cfg has max=64 (cfg_for_tests's
    // max_concurrency=64 -> ChannelMax::{quote: 64, store: 64, fetch: 64}).
    // Start values are chosen <= 64 so construction-time clamping leaves
    // them alone, and >= the cold defaults (32/8/4) so the warm-start
    // floor leaves them alone too.
    let start = ChannelStart {
        quote: 64,
        store: 16,
        fetch: 64,
    };
    let source = AdaptiveController::new(start, adaptive_cfg_for_tests());
    let snap = source.snapshot();
    assert_eq!(snap.quote, 64);
    assert_eq!(snap.store, 16);
    assert_eq!(snap.fetch, 64);

    // A default controller warm-started from the snapshot must report
    // the same per-channel caps.
    let restored = AdaptiveController::default();
    restored.warm_start(snap);
    assert_eq!(restored.quote.current(), 64);
    assert_eq!(restored.store.current(), 16);
    assert_eq!(restored.fetch.current(), 64);
}
2279
2280    #[tokio::test]
2281    async fn observe_op_records_success() {
2282        let cfg = cfg_for_tests();
2283        let l = Limiter::new(4, cfg.clone());
2284        for _ in 0..cfg.window_ops {
2285            let _: Result<(), &str> =
2286                observe_op(&l, || async { Ok(()) }, |_e: &&str| Outcome::NetworkError).await;
2287        }
2288        // Healthy window from cold start doubles 4 -> 8.
2289        assert_eq!(l.current(), 8);
2290    }
2291
2292    #[test]
2293    fn snapshot_round_trips_through_disk() {
2294        let dir = tempfile::tempdir().unwrap();
2295        let path = dir.path().join("client_adaptive.json");
2296        let snap = ChannelStart {
2297            quote: 24,
2298            store: 6,
2299            fetch: 12,
2300        };
2301        save_snapshot(&path, snap);
2302        let loaded = load_snapshot(&path).unwrap();
2303        assert_eq!(loaded.quote, 24);
2304        assert_eq!(loaded.store, 6);
2305        assert_eq!(loaded.fetch, 12);
2306    }
2307
2308    #[test]
2309    fn load_missing_returns_none() {
2310        let dir = tempfile::tempdir().unwrap();
2311        let path = dir.path().join("does_not_exist.json");
2312        assert!(load_snapshot(&path).is_none());
2313    }
2314
2315    #[test]
2316    fn load_corrupt_returns_none() {
2317        let dir = tempfile::tempdir().unwrap();
2318        let path = dir.path().join("bad.json");
2319        std::fs::write(&path, b"not valid json{{{").unwrap();
2320        assert!(load_snapshot(&path).is_none());
2321    }
2322
2323    #[test]
2324    fn load_wrong_schema_returns_none() {
2325        let dir = tempfile::tempdir().unwrap();
2326        let path = dir.path().join("future.json");
2327        // Schema 999 is from a future build — current build must not
2328        // crash and must not act on it.
2329        let payload = r#"{"schema":999,"channels":{"quote":1,"store":1,"fetch":1}}"#;
2330        std::fs::write(&path, payload).unwrap();
2331        assert!(load_snapshot(&path).is_none());
2332    }
2333
2334    #[test]
2335    fn load_schema_one_preserves_quote_store_and_resets_fetch() {
2336        const LEGACY_QUOTE_CAP: usize = 48;
2337        const LEGACY_STORE_CAP: usize = 24;
2338        const LEGACY_FETCH_CAP: usize = 96;
2339
2340        let dir = tempfile::tempdir().unwrap();
2341        let path = dir.path().join("legacy.json");
2342        let payload = format!(
2343            r#"{{"schema":{},"channels":{{"quote":{},"store":{},"fetch":{}}}}}"#,
2344            PERSIST_SCHEMA_AIMD_FETCH, LEGACY_QUOTE_CAP, LEGACY_STORE_CAP, LEGACY_FETCH_CAP,
2345        );
2346        std::fs::write(&path, payload).unwrap();
2347
2348        let loaded = load_snapshot(&path).unwrap();
2349
2350        assert_eq!(loaded.quote, LEGACY_QUOTE_CAP);
2351        assert_eq!(loaded.store, LEGACY_STORE_CAP);
2352        assert_eq!(loaded.fetch, FETCH_COLD_START_CONCURRENCY);
2353    }
2354
2355    #[tokio::test]
2356    async fn observe_op_records_classified_error() {
2357        let cfg = cfg_for_tests();
2358        let l = Limiter::new(4, cfg.clone());
2359        for _ in 0..cfg.window_ops {
2360            let _: Result<(), &str> =
2361                observe_op(&l, || async { Err("boom") }, |_e: &&str| Outcome::Timeout).await;
2362        }
2363        assert!(l.current() < 4);
2364    }
2365
2366    // ----- Adversarial / regression-guard tests below ---------------------
2367    //
2368    // These exist primarily to prove the controller never silently regresses
2369    // upload/download throughput and never panics under hostile workloads.
2370
2371    /// Cold-start defaults for quote/store must preserve the prior static
2372    /// knobs. Fetch intentionally starts at the validated residential floor
2373    /// because the throughput hill climber now has to prove that higher
2374    /// fan-out improves goodput.
2375    #[test]
2376    fn no_regression_cold_start_at_least_static_defaults() {
2377        let s = ChannelStart::default();
2378        assert!(
2379            s.quote >= 32,
2380            "quote cold-start regressed: got {}, prior static was 32",
2381            s.quote,
2382        );
2383        assert!(
2384            s.store >= 8,
2385            "store cold-start regressed: got {}, prior static was 8",
2386            s.store,
2387        );
2388        assert_eq!(
2389            s.fetch, FETCH_COLD_START_CONCURRENCY,
2390            "fetch cold-start changed unexpectedly: got {}, expected {}",
2391            s.fetch, FETCH_COLD_START_CONCURRENCY,
2392        );
2393    }
2394
2395    /// The production `AdaptiveController::default()` (NOT the test cfg)
2396    /// must come up reporting the cold-start values immediately, with no
2397    /// observations recorded.
2398    #[test]
2399    fn controller_default_config_is_sane() {
2400        let c = AdaptiveController::default();
2401        let starts = ChannelStart::default();
2402        assert_eq!(c.quote.current(), starts.quote);
2403        assert_eq!(c.store.current(), starts.store);
2404        assert_eq!(c.fetch.current(), starts.fetch);
2405        // No observations made yet — internal windows must be empty.
2406        assert_eq!(lock(&c.quote.inner).window.len(), 0);
2407        assert_eq!(lock(&c.store.inner).window.len(), 0);
2408        assert_eq!(lock(&c.fetch.inner).window.len(), 0);
2409    }
2410
2411    /// Mixed signals (every other op fails) must not pin the controller
2412    /// at the floor for the whole run. The cap should oscillate or settle
2413    /// somewhere above the floor — collapse to 1 forever would be a bug.
2414    #[test]
2415    fn alternating_success_failure_collapses_to_floor() {
2416        // 50% timeout rate is far above `timeout_ceiling` (0.2 in test
2417        // config), so the window is always stressed. The controller
2418        // MUST collapse to the floor, and once there must NEVER go
2419        // below it. Assert both invariants explicitly: floor reached
2420        // and floor held.
2421        let cfg = cfg_for_tests();
2422        let l = Limiter::new(8, cfg.clone());
2423        let mut min_observed = usize::MAX;
2424        let mut max_observed = 0usize;
2425        let mut floor_visits = 0usize;
2426        for i in 0..1000 {
2427            let outcome = if i % 2 == 0 {
2428                Outcome::Success
2429            } else {
2430                Outcome::Timeout
2431            };
2432            l.observe(outcome, Duration::from_millis(50));
2433            let cur = l.current();
2434            assert!(
2435                cur >= cfg.min_concurrency,
2436                "cap underflowed floor at iter {i}: got {cur}",
2437            );
2438            min_observed = min_observed.min(cur);
2439            max_observed = max_observed.max(cur);
2440            if cur == cfg.min_concurrency {
2441                floor_visits += 1;
2442            }
2443        }
2444        assert_eq!(
2445            min_observed, cfg.min_concurrency,
2446            "cap never reached the floor under 50% timeout rate"
2447        );
2448        assert!(
2449            max_observed >= 8,
2450            "cap never visited the start value: max_observed={max_observed}"
2451        );
2452        // Should spend MOST of the run at the floor — a single
2453        // healthy window is not enough to climb back from a 50% loss
2454        // environment.
2455        assert!(
2456            floor_visits > 500,
2457            "cap spent only {floor_visits}/1000 ticks at floor; expected mostly at floor"
2458        );
2459        assert_eq!(
2460            l.current(),
2461            cfg.min_concurrency,
2462            "controller did not settle at floor after 1000 alternations"
2463        );
2464    }
2465
2466    /// From the floor, a long stream of healthy successes must walk the
2467    /// cap all the way back up to `max_concurrency`. Otherwise transient
2468    /// stress on a slow link would permanently penalize throughput.
2469    #[test]
2470    fn pure_success_stream_recovers_to_max() {
2471        let cfg = cfg_for_tests();
2472        let l = Limiter::new(cfg.min_concurrency, cfg.clone());
2473        for _ in 0..10_000 {
2474            l.observe(Outcome::Success, Duration::from_millis(5));
2475        }
2476        assert_eq!(
2477            l.current(),
2478            cfg.max_concurrency,
2479            "expected recovery to max ({}), got {}",
2480            cfg.max_concurrency,
2481            l.current(),
2482        );
2483    }
2484
2485    /// Heavy stress drives the cap to the floor; subsequent recovery
2486    /// must climb meaningfully higher than the floor with enough healthy
2487    /// evidence. No "permanent floor" failure mode allowed.
2488    #[test]
2489    fn stress_then_heal_drives_floor_then_recovery() {
2490        let cfg = cfg_for_tests();
2491        let l = Limiter::new(8, cfg.clone());
2492        for _ in 0..100 {
2493            l.observe(Outcome::Timeout, Duration::from_millis(50));
2494        }
2495        let after_stress = l.current();
2496        assert_eq!(
2497            after_stress, cfg.min_concurrency,
2498            "stress should drive cap to floor, got {after_stress}",
2499        );
2500        for _ in 0..1_000 {
2501            l.observe(Outcome::Success, Duration::from_millis(10));
2502        }
2503        let after_heal = l.current();
2504        assert!(
2505            after_heal >= cfg.min_concurrency.saturating_add(4),
2506            "expected substantial recovery from floor, got {after_heal}",
2507        );
2508    }
2509
2510    /// The latency baseline must track actual workload latency. If it
2511    /// stayed pinned at `Duration::ZERO`, every healthy sample would
2512    /// look like infinite inflation and inflate the decrease rate.
2513    #[test]
2514    fn baseline_does_not_grow_unbounded_under_slow_links() {
2515        let cfg = cfg_for_tests();
2516        let l = Limiter::new(2, cfg.clone());
2517        for _ in 0..(cfg.window_ops * 10) {
2518            l.observe(Outcome::Success, Duration::from_millis(500));
2519        }
2520        let baseline = lock(&l.inner).latency_baseline;
2521        let base = baseline.expect("baseline should be set after many healthy windows");
2522        assert!(
2523            base > Duration::ZERO,
2524            "baseline must not stay at ZERO, got {base:?}",
2525        );
2526        // Within 2x of the actual latency: 250ms..=1000ms.
2527        let lo = Duration::from_millis(250);
2528        let hi = Duration::from_millis(1000);
2529        assert!(
2530            base >= lo && base <= hi,
2531            "baseline drifted out of [{lo:?}, {hi:?}]: {base:?}",
2532        );
2533    }
2534
2535    /// Until the first healthy window completes, the latency baseline
2536    /// stays `None` (so no false-inflation alarms). Decreases during the
2537    /// stress phase are driven purely by success/timeout rate, not by
2538    /// inflated p95 vs a phantom zero baseline.
2539    #[test]
2540    fn baseline_initialized_only_after_first_healthy_window() {
2541        let cfg = cfg_for_tests();
2542        let l = Limiter::new(8, cfg.clone());
2543        for _ in 0..50 {
2544            l.observe(Outcome::Timeout, Duration::from_millis(50));
2545        }
2546        // Without any healthy window, baseline must still be None.
2547        assert!(
2548            lock(&l.inner).latency_baseline.is_none(),
2549            "baseline must be None before any healthy window",
2550        );
2551        // Now drain healthy windows.
2552        for _ in 0..(cfg.window_ops * 5) {
2553            l.observe(Outcome::Success, Duration::from_millis(20));
2554        }
2555        let baseline = lock(&l.inner).latency_baseline;
2556        assert!(
2557            baseline.is_some(),
2558            "baseline must be Some after healthy windows",
2559        );
2560        let base = baseline.unwrap_or_default();
2561        assert!(
2562            base > Duration::ZERO,
2563            "baseline must reflect real latency, got {base:?}",
2564        );
2565    }
2566
2567    /// A torrent of timeouts must not underflow the cap. Sample at
2568    /// several depths to catch any wraparound.
2569    #[test]
2570    fn min_concurrency_floor_holds_under_torrent_of_errors() {
2571        let cfg = cfg_for_tests();
2572        let l = Limiter::new(8, cfg.clone());
2573        for i in 0..50_000 {
2574            l.observe(Outcome::Timeout, Duration::from_millis(50));
2575            if i == 100 || i == 1_000 || i == 49_999 {
2576                let cur = l.current();
2577                assert_eq!(
2578                    cur, cfg.min_concurrency,
2579                    "floor breached at iter {i}: got {cur}",
2580                );
2581            }
2582        }
2583    }
2584
2585    /// Mirror: a torrent of successes must not exceed `max_concurrency`.
2586    #[test]
2587    fn max_concurrency_ceiling_holds_under_torrent_of_successes() {
2588        let cfg = cfg_for_tests();
2589        let start = cfg
2590            .max_concurrency
2591            .saturating_sub(1)
2592            .max(cfg.min_concurrency);
2593        let l = Limiter::new(start, cfg.clone());
2594        for i in 0..50_000 {
2595            l.observe(Outcome::Success, Duration::from_millis(5));
2596            if i == 100 || i == 1_000 || i == 49_999 {
2597                let cur = l.current();
2598                assert!(
2599                    cur <= cfg.max_concurrency,
2600                    "ceiling breached at iter {i}: got {cur} > {}",
2601                    cfg.max_concurrency,
2602                );
2603            }
2604        }
2605        assert_eq!(l.current(), cfg.max_concurrency);
2606    }
2607
2608    /// Slow-start doubles the cap; with `max_concurrency = usize::MAX/2`
2609    /// a naive `*2` would overflow. The controller must use saturating
2610    /// arithmetic and never panic. Also asserts the cap actually
2611    /// REACHED max — proving that "no panic" wasn't achieved by
2612    /// the cap getting stuck somewhere instead of growing.
2613    #[test]
2614    fn saturating_arithmetic_handles_extreme_config() {
2615        let cfg = LimiterConfig {
2616            max_concurrency: usize::MAX / 2,
2617            ..cfg_for_tests()
2618        };
2619        let start = usize::MAX / 4;
2620        let l = Limiter::new(start, cfg.clone());
2621        for _ in 0..(cfg.window_ops * 10) {
2622            l.observe(Outcome::Success, Duration::from_millis(1));
2623        }
2624        // First-iteration doubles start (which is max/4) to max/2 = ceiling.
2625        // The cap MUST have grown to the ceiling; if saturating math
2626        // were broken (panic) we'd never get here, but we'd also fail
2627        // if the cap got stuck at the start value.
2628        assert_eq!(
2629            l.current(),
2630            cfg.max_concurrency,
2631            "saturating math survived but cap did not grow to ceiling"
2632        );
2633    }
2634
2635    /// FIFO eviction: prove that a window of pure-timeout collapses
2636    /// the cap, and once enough successes flush ALL timeouts out of
2637    /// the window, the cap can rise. The earlier version of this test
2638    /// used an OR clause that made the assertion satisfiable trivially;
2639    /// this version asserts the strict invariant: after eviction, cap
2640    /// must be STRICTLY GREATER than the post-stress cap.
2641    #[test]
2642    fn window_eviction_is_fifo() {
2643        let cfg = LimiterConfig {
2644            window_ops: 10,
2645            min_window_ops: 5,
2646            success_target: 0.9,
2647            timeout_ceiling: 0.1,
2648            ..cfg_for_tests()
2649        };
2650        let l = Limiter::new(8, cfg.clone());
2651        // Fill the window with timeouts. With decrease-gating
2652        // (samples_since_decrease >= min_window_ops between halvings),
2653        // window_ops=10 + min_window_ops=5 timeouts allow at most
2654        // ~2 halvings: 8 -> 4 -> 2. Cap must DROP from 8.
2655        for _ in 0..cfg.window_ops {
2656            l.observe(Outcome::Timeout, Duration::from_millis(50));
2657        }
2658        let after_stress = l.current();
2659        assert!(
2660            after_stress < 8,
2661            "expected cap to drop from 8 after pure-timeout window, got {after_stress}"
2662        );
2663        // Push enough successes to fully evict the timeouts AND
2664        // accumulate at least one full window of fresh evidence for
2665        // an Increase. window_ops to evict + window_ops to gate first
2666        // +1 = 2 * window_ops minimum; use 3x for safety margin.
2667        for _ in 0..(cfg.window_ops * 3) {
2668            l.observe(Outcome::Success, Duration::from_millis(20));
2669        }
2670        let after_recovery = l.current();
2671        // Strict greater-than: FIFO MUST flush the timeouts so a
2672        // fresh-window Increase can fire.
2673        assert!(
2674            after_recovery > after_stress,
2675            "FIFO eviction broken: cap stayed at {after_stress} after recovery successes (expected > {after_stress}, got {after_recovery})"
2676        );
2677    }
2678
2679    /// With `enabled = false`, the controller is a no-op. Hot paths
2680    /// must see exactly `initial` at every check, no exceptions.
2681    #[test]
2682    fn disabled_controller_returns_initial_value_invariantly() {
2683        let cfg = LimiterConfig {
2684            enabled: false,
2685            ..cfg_for_tests()
2686        };
2687        let initial = 8;
2688        let l = Limiter::new(initial, cfg);
2689        for i in 0..1_000 {
2690            let outcome = match i % 4 {
2691                0 => Outcome::Success,
2692                1 => Outcome::Timeout,
2693                2 => Outcome::NetworkError,
2694                _ => Outcome::ApplicationError,
2695            };
2696            l.observe(outcome, Duration::from_millis(50));
2697            assert_eq!(
2698                l.current(),
2699                initial,
2700                "disabled controller moved at iter {i}",
2701            );
2702        }
2703    }
2704
2705    /// 100 tasks concurrently observing 100 successes each. The cap
2706    /// must remain a valid in-bounds value, no panic, no deadlock.
2707    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
2708    async fn concurrent_observations_do_not_corrupt_window() {
2709        let cfg = cfg_for_tests();
2710        let l = Limiter::new(4, cfg.clone());
2711        let mut handles = Vec::with_capacity(100);
2712        for _ in 0..100 {
2713            let l_clone = l.clone();
2714            handles.push(tokio::spawn(async move {
2715                for _ in 0..100 {
2716                    l_clone.observe(Outcome::Success, Duration::from_millis(5));
2717                }
2718            }));
2719        }
2720        for h in handles {
2721            h.await.unwrap();
2722        }
2723        let cur = l.current();
2724        assert!(
2725            cur >= cfg.min_concurrency && cur <= cfg.max_concurrency,
2726            "cap out of bounds after concurrent observations: {cur}",
2727        );
2728    }
2729
2730    /// Persisted higher values from a prior run must beat low cold-start
2731    /// defaults. Otherwise warm-start would silently pessimize throughput.
2732    /// (Values BELOW cold-start are floored — see
2733    /// `warm_start_floors_at_cold_defaults`.)
2734    #[test]
2735    fn persisted_snapshot_warm_starts_above_cold_floor() {
2736        let dir = tempfile::tempdir().unwrap();
2737        let path = dir.path().join("client_adaptive.json");
2738        // All snapshot values ABOVE the production cold-start defaults
2739        // so the warm_start floor doesn't kick in.
2740        let saved = ChannelStart {
2741            quote: 64,
2742            store: 32,
2743            fetch: 128,
2744        };
2745        save_snapshot(&path, saved);
2746        let loaded = load_snapshot(&path).unwrap();
2747
2748        // Build a controller with intentionally low cold-start values
2749        // — these get overridden by warm_start.
2750        let low = ChannelStart {
2751            quote: 2,
2752            store: 2,
2753            fetch: 2,
2754        };
2755        let c = AdaptiveController::new(low, AdaptiveConfig::default());
2756        c.warm_start(loaded);
2757        assert_eq!(c.quote.current(), 64);
2758        assert_eq!(c.store.current(), 32);
2759        assert_eq!(c.fetch.current(), 128);
2760    }
2761
2762    /// Two threads racing on `save_snapshot` must never produce a
2763    /// half-written file. Atomic-rename guarantees we either see the
2764    /// old content or the new content, never a torn write.
2765    #[test]
2766    fn save_load_round_trip_with_concurrent_writes() {
2767        use std::thread;
2768        let dir = tempfile::tempdir().unwrap();
2769        let path = dir.path().join("client_adaptive.json");
2770        let path_a = path.clone();
2771        let path_b = path.clone();
2772        let snap_a = ChannelStart {
2773            quote: 10,
2774            store: 10,
2775            fetch: 10,
2776        };
2777        let snap_b = ChannelStart {
2778            quote: 99,
2779            store: 99,
2780            fetch: 99,
2781        };
2782        let h_a = thread::spawn(move || {
2783            for _ in 0..50 {
2784                save_snapshot(&path_a, snap_a);
2785            }
2786        });
2787        let h_b = thread::spawn(move || {
2788            for _ in 0..50 {
2789                save_snapshot(&path_b, snap_b);
2790            }
2791        });
2792        h_a.join().unwrap();
2793        h_b.join().unwrap();
2794        let loaded = load_snapshot(&path).expect("file must be a valid snapshot, not torn");
2795        let valid = (loaded.quote == snap_a.quote
2796            && loaded.store == snap_a.store
2797            && loaded.fetch == snap_a.fetch)
2798            || (loaded.quote == snap_b.quote
2799                && loaded.store == snap_b.store
2800                && loaded.fetch == snap_b.fetch);
2801        assert!(valid, "loaded snapshot is neither A nor B: {loaded:?}",);
2802    }
2803
2804    /// `save_snapshot` to an unwritable / impossible path must be a
2805    /// quiet no-op: best-effort, no panic, no error propagation.
2806    #[test]
2807    fn save_snapshot_to_unwritable_dir_does_not_panic() {
2808        // A path under a non-existent absolute root that the process
2809        // also cannot create. On macOS/Linux a write under "/" requires
2810        // root; create_dir_all will fail on this path.
2811        let path = PathBuf::from("/nonexistent_root_dir_xyz_for_test/sub/dir/client_adaptive.json");
2812        let snap = ChannelStart {
2813            quote: 1,
2814            store: 1,
2815            fetch: 1,
2816        };
2817        // No panic = pass. Function returns unit, errors are logged.
2818        save_snapshot(&path, snap);
2819        // File should not exist.
2820        assert!(!path.exists());
2821    }
2822
2823    /// A truncated/partial JSON file must not crash the loader; it must
2824    /// return None so the controller falls back to cold defaults.
2825    #[test]
2826    fn load_snapshot_from_truncated_file_returns_none() {
2827        let dir = tempfile::tempdir().unwrap();
2828        let path = dir.path().join("truncated.json");
2829        std::fs::write(&path, br#"{"schema":1,"channels":{"quote":"#).unwrap();
2830        assert!(load_snapshot(&path).is_none());
2831    }
2832
2833    /// Microbench: 100k observe+current pairs must complete in well
2834    /// under 100ms. Catches any accidental quadratic behaviour or
2835    /// massive lock contention introduced by future changes.
2836    #[test]
2837    fn controller_perf_overhead_is_bounded() {
2838        let cfg = cfg_for_tests();
2839        let l = Limiter::new(8, cfg);
2840        let started = Instant::now();
2841        for _ in 0..100_000 {
2842            let _ = l.current();
2843            l.observe(Outcome::Success, Duration::from_micros(1));
2844        }
2845        let elapsed = started.elapsed();
2846        // 1µs per pair on a modern machine is generous; allow 500ms to
2847        // tolerate slow CI runners while still catching real regressions.
2848        assert!(
2849            elapsed < Duration::from_millis(500),
2850            "100k observe+current pairs took {elapsed:?}, expected <500ms",
2851        );
2852    }
2853
2854    // ---- Regression tests for adversarial-review findings ----
2855
2856    /// M10 fix: hand-edited or future-schema configs may plant `NaN`
2857    /// or out-of-range values into the float fields. Constructing a
2858    /// controller and feeding observations must not panic.
2859    /// `Duration::from_secs_f64(NaN)` panics per std docs, so without
2860    /// `sanitize()` and the EWMA NaN guard this would crash.
2861    #[test]
2862    fn nan_and_out_of_range_config_does_not_panic() {
2863        let cfg = AdaptiveConfig {
2864            enabled: true,
2865            min_concurrency: 0, // sub-floor; sanitize raises to 1
2866            max: ChannelMax {
2867                quote: 0, // sub-min; sanitize raises to min
2868                store: 0,
2869                fetch: 0,
2870            },
2871            window_ops: 10,
2872            min_window_ops: 50, // > window_ops; sanitize clamps
2873            success_target: f64::NAN,
2874            timeout_ceiling: f64::INFINITY,
2875            latency_inflation_factor: f64::NEG_INFINITY,
2876            latency_ewma_alpha: f64::NAN,
2877        };
2878        let c = AdaptiveController::new(ChannelStart::default(), cfg);
2879        // Verify sanitize() ACTUALLY corrected the values (not just
2880        // that no panic occurred). Reading c.config back proves the
2881        // sanitization landed.
2882        let post = &c.config;
2883        assert_eq!(
2884            post.min_concurrency, 1,
2885            "sanitize did not raise min_concurrency from 0"
2886        );
2887        assert!(
2888            post.success_target.is_finite() && (0.0..=1.0).contains(&post.success_target),
2889            "sanitize did not clamp success_target from NaN: {}",
2890            post.success_target
2891        );
2892        assert!(
2893            post.timeout_ceiling.is_finite() && (0.0..=1.0).contains(&post.timeout_ceiling),
2894            "sanitize did not clamp timeout_ceiling from Inf: {}",
2895            post.timeout_ceiling
2896        );
2897        assert!(
2898            post.latency_inflation_factor.is_finite() && post.latency_inflation_factor > 0.0,
2899            "sanitize did not fix latency_inflation_factor from -Inf: {}",
2900            post.latency_inflation_factor
2901        );
2902        assert!(
2903            post.latency_ewma_alpha.is_finite() && (0.0..=1.0).contains(&post.latency_ewma_alpha),
2904            "sanitize did not fix latency_ewma_alpha from NaN: {}",
2905            post.latency_ewma_alpha
2906        );
2907        assert!(
2908            post.min_window_ops <= post.window_ops,
2909            "sanitize did not clamp min_window_ops <= window_ops: min={} window={}",
2910            post.min_window_ops,
2911            post.window_ops
2912        );
2913        assert!(
2914            post.max.quote >= post.min_concurrency,
2915            "max.quote below min_concurrency"
2916        );
2917        // Now exercise the runtime under hostile latencies — must
2918        // not panic.
2919        for _ in 0..200 {
2920            c.store
2921                .observe(Outcome::Success, Duration::from_secs(99_999));
2922            c.store.observe(Outcome::Timeout, Duration::ZERO);
2923        }
2924        let cur = c.store.current();
2925        assert!(cur >= 1, "cap below floor: {cur}");
2926    }
2927
2928    /// M3+M6 fix: a burst of N concurrent in-flight chunks all
2929    /// observing stress at almost the same time used to pile-drive
2930    /// the cap from N to 1 in N back-to-back ticks. After the fix,
2931    /// decreases require `min_window_ops` of FRESH evidence between
2932    /// successive Decreases, so a single transient burst can drop
2933    /// the cap by at most one halving.
2934    #[test]
2935    fn transient_burst_does_not_pile_drive_to_floor() {
2936        let cfg = LimiterConfig {
2937            window_ops: 32,
2938            min_window_ops: 8,
2939            success_target: 0.95,
2940            timeout_ceiling: 0.10,
2941            ..cfg_for_tests()
2942        };
2943        let l = Limiter::new(32, cfg);
2944        // Simulate 8 concurrent ops all completing as Timeout in a
2945        // back-to-back burst (the kind of event that previously
2946        // floor-slammed the cap).
2947        for _ in 0..8 {
2948            l.observe(Outcome::Timeout, Duration::from_millis(10));
2949        }
2950        // After one burst, cap should have decreased AT MOST once
2951        // (32 -> 16). Pile-driving would land at 1 or 2.
2952        let after_burst = l.current();
2953        assert!(
2954            after_burst >= 16,
2955            "transient burst pile-drove cap from 32 to {after_burst}; expected >= 16",
2956        );
2957    }
2958
2959    /// M2 fix: classifier must map transport-related errors to
2960    /// `NetworkError`, not `ApplicationError`. Test EACH transport
2961    /// variant separately so a regression in any one variant is
2962    /// caught by name.
2963    #[tokio::test]
2964    async fn transport_errors_classify_as_capacity_signal() {
2965        use crate::data::client::classify_error;
2966        use crate::data::error::Error;
2967        let make_cfg = || LimiterConfig {
2968            window_ops: 16,
2969            min_window_ops: 5,
2970            success_target: 0.5,
2971            timeout_ceiling: 0.5,
2972            ..cfg_for_tests()
2973        };
2974        // Cases: (variant_name, error_factory)
2975        type ErrFactory = Box<dyn Fn() -> Error>;
2976        let cases: Vec<(&str, ErrFactory)> = vec![
2977            ("Network", Box::new(|| Error::Network("net".to_string()))),
2978            (
2979                "InsufficientPeers",
2980                Box::new(|| Error::InsufficientPeers("ip".to_string())),
2981            ),
2982            ("Io", Box::new(|| Error::Io(std::io::Error::other("io")))),
2983            ("Protocol", Box::new(|| Error::Protocol("p".to_string()))),
2984            ("Storage", Box::new(|| Error::Storage("s".to_string()))),
2985            (
2986                "PartialUpload",
2987                Box::new(|| Error::PartialUpload {
2988                    stored: vec![],
2989                    stored_count: 0,
2990                    failed: vec![],
2991                    failed_count: 0,
2992                    total_chunks: 0,
2993                    spend: Box::new(crate::data::error::PartialUploadSpend {
2994                        storage_cost_atto: "0".to_string(),
2995                        gas_cost_wei: 0,
2996                    }),
2997                    reason: "r".to_string(),
2998                }),
2999            ),
3000        ];
3001        for (name, mk) in &cases {
3002            let l = Limiter::new(8, make_cfg());
3003            for _ in 0..16 {
3004                let _: std::result::Result<(), Error> =
3005                    observe_op(&l, || async { Err(mk()) }, classify_error).await;
3006            }
3007            // Each variant alone must drive the cap STRICTLY below
3008            // the start (8 -> 4 via one halving). If a variant maps
3009            // to ApplicationError, cap stays at 8.
3010            let cur = l.current();
3011            assert!(
3012                cur < 8,
3013                "{name} not classified as capacity signal: cap stayed at {cur}",
3014            );
3015        }
3016    }
3017
/// C4 fix: per-channel max ceilings are independent. A tight ceiling on
/// one channel must not leak into the others.
///
/// Quote and store start at their ceilings (4 and 8) and must stay there
/// under a success-only load. Fetch starts at 64 with a 1024 ceiling; it
/// only has to remain above the quote/store caps and within its own
/// ceiling, because it is not expected to jump straight to its maximum.
#[test]
fn per_channel_ceilings_are_independent() {
    let ceilings = ChannelMax {
        quote: 4,    // tightly capped
        store: 8,    // moderate
        fetch: 1024, // very high
    };
    let starts = ChannelStart {
        quote: 4,
        store: 8,
        fetch: 64,
    };
    let c = AdaptiveController::new(
        starts,
        AdaptiveConfig {
            max: ceilings,
            ..AdaptiveConfig::default()
        },
    );

    // 1000 fast successes per channel: each must honour only its own ceiling.
    let fast = Duration::from_micros(10);
    (0..1000).for_each(|_| {
        c.quote.observe(Outcome::Success, fast);
        c.store.observe(Outcome::Success, fast);
        c.fetch.observe(Outcome::Success, fast);
    });

    assert_eq!(c.quote.current(), 4, "quote should cap at 4");
    assert_eq!(c.store.current(), 8, "store should cap at 8");
    let fetch_cap = c.fetch.current();
    assert!(
        (9..=1024).contains(&fetch_cap),
        "fetch did not use its independent ceiling; got {}",
        fetch_cap
    );
}
3057
/// After a healthy epoch the fetch hill climber probes a higher cap. If
/// the epoch at that higher cap is slower (`HILL_TEST_REJECT_LATENCY_MS`),
/// the probe is rejected and both the live cap and the snapshot return to
/// the starting cap.
#[test]
fn fetch_hill_rejects_upward_probe_without_goodput_gain() {
    let cfg = hill_cfg_for_tests();
    let limiter = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());

    // Epoch 1: healthy at the start cap -> controller moves to the up-probe.
    observe_hill_success_epoch(&limiter, &cfg, HILL_TEST_CHUNK_BYTES);
    assert_eq!(
        limiter.current(),
        HILL_TEST_UP_PROBE_CAP,
        "first healthy epoch should probe upward"
    );

    // Epoch 2: same chunk size but inflated latency at the probed cap.
    let slow = Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS);
    observe_hill_success_epoch_with_latency(&limiter, &cfg, HILL_TEST_CHUNK_BYTES, slow);
    assert_eq!(
        limiter.current(),
        HILL_TEST_START_CAP,
        "slower higher-cap wave should reject the upward probe"
    );
    assert_eq!(limiter.snapshot(), HILL_TEST_START_CAP);
}
3083
/// When the epoch at the probed higher cap is as healthy as the one
/// before it, the hill climber promotes that cap to the snapshot value
/// and immediately probes the next higher cap.
#[test]
fn fetch_hill_accepts_upward_probe_with_goodput_gain() {
    let cfg = hill_cfg_for_tests();
    let limiter = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());

    // First healthy epoch moves the live cap to the upward probe.
    observe_hill_success_epoch(&limiter, &cfg, HILL_TEST_CHUNK_BYTES);
    assert_eq!(limiter.current(), HILL_TEST_UP_PROBE_CAP);

    // Second healthy epoch, now running at the probed cap.
    observe_hill_success_epoch(&limiter, &cfg, HILL_TEST_CHUNK_BYTES);
    assert_eq!(
        limiter.snapshot(),
        HILL_TEST_UP_PROBE_CAP,
        "same-size chunks at same latency should promote the higher cap"
    );
    assert_eq!(
        limiter.current(),
        HILL_TEST_NEXT_UP_PROBE_CAP,
        "after accepting an upward probe, hill climber should probe higher"
    );
}
3104
/// After a rejected upward probe and enough stable epochs, the hill
/// climber probes a lower cap. If the epoch at that lower cap runs at
/// `HILL_TEST_RETAINED_DOWN_LATENCY_MS`, the lower cap becomes the
/// snapshot value.
#[test]
fn fetch_hill_accepts_lower_probe_when_goodput_is_retained() {
    let cfg = hill_cfg_for_tests();
    let limiter = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());

    // Probe upward, then reject that probe with a slow epoch.
    observe_hill_success_epoch(&limiter, &cfg, HILL_TEST_CHUNK_BYTES);
    let rejected = Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS);
    observe_hill_success_epoch_with_latency(&limiter, &cfg, HILL_TEST_CHUNK_BYTES, rejected);
    assert_eq!(limiter.current(), HILL_TEST_START_CAP);

    // Sit through the reject cooldown plus the stable-probe interval.
    let quiet_epochs = HILL_REJECT_COOLDOWN_EPOCHS + HILL_STABLE_PROBE_EPOCHS;
    (0..quiet_epochs).for_each(|_| {
        observe_hill_success_epoch(&limiter, &cfg, HILL_TEST_CHUNK_BYTES);
    });
    assert_eq!(
        limiter.current(),
        HILL_TEST_DOWN_PROBE_CAP,
        "stable best should eventually probe a lower cap"
    );

    // One epoch at the lower cap with the "retained goodput" latency.
    let retained = Duration::from_millis(HILL_TEST_RETAINED_DOWN_LATENCY_MS);
    observe_hill_success_epoch_with_latency(&limiter, &cfg, HILL_TEST_CHUNK_BYTES, retained);
    assert_eq!(
        limiter.snapshot(),
        HILL_TEST_DOWN_PROBE_CAP,
        "retained goodput at lower concurrency should become the new best"
    );
}
3140
3141    #[tokio::test]
3142    async fn fetch_hill_records_constant_size_timed_ops_without_stress() {
3143        let cfg = hill_cfg_for_tests();
3144        let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
3145        let total_ops = hill_epoch_target_samples(HILL_TEST_START_CAP, &cfg)
3146            + hill_epoch_target_samples(HILL_TEST_UP_PROBE_CAP, &cfg);
3147        let limiter_for_ops = l.clone();
3148
3149        let result: std::result::Result<Vec<()>, ()> =
3150            rebucketed_unordered(&l, 0..total_ops, move |_| {
3151                let limiter = limiter_for_ops.clone();
3152                async move {
3153                    observe_op_with_success_bytes(
3154                        &limiter,
3155                        || async {
3156                            tokio::time::sleep(Duration::from_millis(HILL_TEST_ASYNC_LATENCY_MS))
3157                                .await;
3158                            Ok::<(), ()>(())
3159                        },
3160                        |_| Outcome::NetworkError,
3161                        |_| HILL_TEST_CHUNK_BYTES,
3162                    )
3163                    .await
3164                }
3165            })
3166            .await;
3167        result.unwrap();
3168
3169        // The timed wrapper records real wall-clock latency. Loaded runners can make the
3170        // wider probe miss the deterministic gain covered by
3171        // `fetch_hill_accepts_upward_probe_with_goodput_gain`, so this test constrains
3172        // the async observation path to a non-stress outcome.
3173        let snapshot = l.snapshot();
3174        assert!(
3175            matches!(snapshot, HILL_TEST_START_CAP | HILL_TEST_UP_PROBE_CAP),
3176            "timed successes should finish at the existing or accepted best cap, got {snapshot}"
3177        );
3178        let current = l.current();
3179        assert!(
3180            matches!(current, HILL_TEST_START_CAP | HILL_TEST_NEXT_UP_PROBE_CAP),
3181            "timed successes should leave the controller unstressed, got {current}"
3182        );
3183    }
3184
/// Under stress the fetch hill climber must not wait for a full epoch:
/// `min_window_ops` timeouts are enough to halve the cap (16 -> 8).
#[test]
fn fetch_hill_stress_cuts_before_full_epoch() {
    let cfg = LimiterConfig {
        window_ops: 8,
        min_window_ops: 4,
        ..hill_cfg_for_tests()
    };
    // Read the sample count before the config is handed to the limiter.
    let stress_samples = cfg.min_window_ops;
    let limiter = fetch_hill_for_tests(16, cfg);

    (0..stress_samples).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(10));
    });

    assert_eq!(
        limiter.current(),
        8,
        "fetch hill climber should halve on early stress"
    );
}
3204
/// Guards the cold-start defaults: quote and store must not drop below
/// the prior static values (32 and 8), and fetch must equal
/// `FETCH_COLD_START_CONCURRENCY`, the conservative hill-climb default
/// chosen to avoid download overshoot on fresh installs.
#[test]
fn cold_start_at_least_prior_static_defaults() {
    let ChannelStart {
        quote,
        store,
        fetch,
    } = ChannelStart::default();
    assert!(quote >= 32, "quote cold-start regressed: {}", quote);
    assert!(store >= 8, "store cold-start regressed: {}", store);
    assert_eq!(
        fetch, FETCH_COLD_START_CONCURRENCY,
        "fetch cold-start changed unexpectedly"
    );
}
3218
/// Reviewer N-M5 guard: decreases are gated on `min_window_ops` of fresh
/// evidence, but the controller must still reach the floor under
/// sustained stress within a bounded number of observations. Otherwise
/// it would be too sluggish to react to a real network outage.
///
/// Starting at 64 with `min_window_ops = 8`, the floor of 1 is six
/// halvings away, each gated on 8 fresh samples, so roughly
/// `6 * 8 + min_window_ops = ~56` observations. The budget is 200 to
/// absorb the warm-up window and per-sample evaluation slack.
#[test]
fn sustained_stress_reaches_floor_within_bounded_ops() {
    let limiter = Limiter::new(
        64,
        LimiterConfig {
            window_ops: 32,
            min_window_ops: 8,
            success_target: 0.95,
            timeout_ceiling: 0.10,
            max_concurrency: 64,
            ..cfg_for_tests()
        },
    );

    // Feed timeouts until the floor is reached or the budget runs out.
    let mut ops = 0usize;
    for _ in 0..200 {
        if limiter.current() <= 1 {
            break;
        }
        limiter.observe(Outcome::Timeout, Duration::from_millis(10));
        ops += 1;
    }

    assert_eq!(
        limiter.current(),
        1,
        "controller did not reach floor within 200 observations under \
         sustained timeout stress; took {ops} ops, ended at cap {}",
        limiter.current()
    );
}
3255
/// A default `AdaptiveController` starts every channel at its
/// `ChannelStart::default()` value, and every default per-channel
/// maximum is strictly above that start, so the controller has room to
/// grow.
#[test]
fn default_controller_has_growth_headroom() {
    let controller = AdaptiveController::default();
    let start = ChannelStart::default();
    let ceiling = ChannelMax::default();

    // Live caps match the documented cold-start values.
    assert_eq!(controller.quote.current(), start.quote);
    assert_eq!(controller.store.current(), start.store);
    assert_eq!(controller.fetch.current(), start.fetch);

    // Every channel's ceiling leaves headroom above its start.
    assert!(
        ceiling.quote > start.quote,
        "no growth headroom for quote: max={} start={}",
        ceiling.quote,
        start.quote
    );
    assert!(
        ceiling.store > start.store,
        "no growth headroom for store: max={} start={}",
        ceiling.store,
        start.store
    );
    assert!(
        ceiling.fetch > start.fetch,
        "no growth headroom for fetch: max={} start={}",
        ceiling.fetch,
        start.fetch
    );
}
3287
3288    // ---- Codex review (round 3) regression tests ----
3289
/// Codex CRITICAL: `warm_start` used to restore caps below the
/// cold-start floor, so one bad run that drove a channel to 1 would
/// pessimize every later run. A snapshot with every channel at 1,
/// applied to a default controller, must leave each channel at its
/// cold-start default.
#[test]
fn warm_start_floors_at_cold_defaults() {
    let controller = AdaptiveController::default();
    let ChannelStart {
        quote: cold_quote,
        store: cold_store,
        fetch: cold_fetch,
    } = ChannelStart::default();

    // Snapshot from a "bad prior run": every channel pinned to 1.
    controller.warm_start(ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    });

    // The persisted 1s must have been lifted to the cold-start values.
    assert_eq!(
        controller.quote.current(),
        cold_quote,
        "quote warm_start did not floor at cold default"
    );
    assert_eq!(
        controller.store.current(),
        cold_store,
        "store warm_start did not floor at cold default"
    );
    assert_eq!(
        controller.fetch.current(),
        cold_fetch,
        "fetch warm_start did not floor at cold default"
    );
}
3323
/// Snapshot values above the cold-start floor must be applied as-is:
/// the floor is a one-sided lower bound, not a clamp.
#[test]
fn warm_start_honors_values_above_cold_floor() {
    let controller = AdaptiveController::default();
    let ChannelStart {
        quote,
        store,
        fetch,
    } = ChannelStart::default();

    // Multiples of the cold defaults, so each value is above its floor.
    let (warm_quote, warm_store, warm_fetch) = (quote * 2, store * 4, fetch * 2);
    controller.warm_start(ChannelStart {
        quote: warm_quote,
        store: warm_store,
        fetch: warm_fetch,
    });

    assert_eq!(controller.quote.current(), warm_quote);
    assert_eq!(controller.store.current(), warm_store);
    assert_eq!(controller.fetch.current(), warm_fetch);
}
3340
3341    /// Codex MAJOR: long pipelines used to capture the cap once via
3342    /// `buffer_unordered(N)`. `rebucketed` re-reads the cap at each
3343    /// batch boundary so adaptive growth/decay actually takes effect
3344    /// mid-stream. Test: fire 200 items at start cap=4, then halfway
3345    /// through bump the cap manually via warm_start to 16, and assert
3346    /// the LATER batches see the new cap.
3347    #[tokio::test]
3348    async fn rebucketed_picks_up_cap_changes_mid_stream() {
3349        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3350        use std::sync::Arc as StdArc;
3351        let cfg = LimiterConfig {
3352            min_concurrency: 1,
3353            max_concurrency: 32,
3354            ..cfg_for_tests()
3355        };
3356        let l = Limiter::new(4, cfg);
3357        let max_seen = StdArc::new(AtomicUsize::new(0));
3358        let in_flight = StdArc::new(AtomicUsize::new(0));
3359        let processed = StdArc::new(AtomicUsize::new(0));
3360        let l_for_bump = l.clone();
3361        let processed_for_bump = processed.clone();
3362        // Spawn a watcher that bumps the cap once enough items have
3363        // started to "warm up".
3364        let bump_handle = tokio::spawn(async move {
3365            loop {
3366                tokio::time::sleep(Duration::from_millis(2)).await;
3367                if processed_for_bump.load(AtomicOrdering::Relaxed) >= 16 {
3368                    l_for_bump.warm_start(16);
3369                    return;
3370                }
3371            }
3372        });
3373        let _: Vec<()> = rebucketed(&l, 0..200usize, false, |_i| {
3374            let max_seen = max_seen.clone();
3375            let in_flight = in_flight.clone();
3376            let processed = processed.clone();
3377            async move {
3378                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3379                max_seen.fetch_max(cur, AtomicOrdering::Relaxed);
3380                tokio::time::sleep(Duration::from_millis(1)).await;
3381                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3382                processed.fetch_add(1, AtomicOrdering::Relaxed);
3383                Ok::<(), &'static str>(())
3384            }
3385        })
3386        .await
3387        .unwrap();
3388        bump_handle.await.unwrap();
3389        // The cap was bumped to 16 mid-stream. If rebucketing actually
3390        // picks up cap changes, max_seen should reach above the
3391        // initial 4.
3392        let peak = max_seen.load(AtomicOrdering::Relaxed);
3393        assert!(
3394            peak > 4,
3395            "rebucketed did not pick up the mid-stream cap bump (peak in-flight = {peak})"
3396        );
3397    }
3398
3399    /// Codex MAJOR: `observe_op` cancellation safety. If the wrapper
3400    /// future is dropped before the inner op completes, no outcome is
3401    /// recorded (intentional — dropped work was never observed by
3402    /// the network). This test asserts the contract: completed ops
3403    /// land observations, dropped ops do not corrupt the window.
3404    /// Two-sided: confirms cancellation is a NO-OP, AND confirms
3405    /// post-cancellation observations DO land normally (proving the
3406    /// limiter's internal state was not corrupted).
3407    #[tokio::test]
3408    async fn observe_op_cancellation_drops_silently() {
3409        let cfg = LimiterConfig {
3410            window_ops: 16,
3411            min_window_ops: 4,
3412            ..cfg_for_tests()
3413        };
3414        let l = Limiter::new(4, cfg);
3415        // Build a future that never completes, then drop it before
3416        // awaiting. observe_op must not panic on drop and must not
3417        // record an outcome.
3418        let l_clone = l.clone();
3419        let fut = observe_op(
3420            &l_clone,
3421            || async {
3422                std::future::pending::<()>().await;
3423                Ok::<(), &'static str>(())
3424            },
3425            |_| Outcome::Timeout,
3426        );
3427        drop(fut);
3428        // Cap unchanged: no observation was recorded.
3429        assert_eq!(l.current(), 4, "cancelled op moved the cap");
3430        // Now feed observations that ACTUALLY count as Success (the
3431        // Ok branch of observe_op is always Outcome::Success — the
3432        // classifier only runs on Err). Cold-start at 4 + a full
3433        // window of healthy successes = double to 8.
3434        for _ in 0..16 {
3435            let _: Result<(), &'static str> = observe_op(
3436                &l,
3437                || async { Ok(()) },
3438                // classifier only fires on Err; Ok is always Success
3439                |_| Outcome::NetworkError,
3440            )
3441            .await;
3442        }
3443        // STRICT: cap must have GROWN, not just held. If cancellation
3444        // had corrupted internal counters, slow-start might be stuck.
3445        assert!(
3446            l.current() > 4,
3447            "cap did not grow after 16 successes; controller corrupted by cancellation? cap={}",
3448            l.current(),
3449        );
3450    }
3451
/// Codex MAJOR: Drop persistence must be reliable. The CLI relies on
/// `Client::drop` firing a synchronous save; a fire-and-forget
/// `spawn_blocking` would silently lose the snapshot at runtime
/// teardown. This test calls `save_snapshot` from a normal context (the
/// same code path, not Drop itself) and checks that the file exists as
/// soon as the call returns and round-trips through `load_snapshot`.
#[test]
fn save_snapshot_is_synchronous_and_durable() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("client_adaptive.json");

    save_snapshot(
        &path,
        ChannelStart {
            quote: 100,
            store: 50,
            fetch: 200,
        },
    );

    // No async waiting, no spawn_blocking, no eventual consistency: the
    // file must be there right after the call.
    assert!(
        path.exists(),
        "save_snapshot did not write file synchronously"
    );

    let restored = load_snapshot(&path).unwrap();
    assert_eq!(restored.quote, 100);
    assert_eq!(restored.store, 50);
    assert_eq!(restored.fetch, 200);
}
3479
3480    // ---- Codex round 4 regression tests ----
3481
3482    /// Codex CR-2 fix: warm_start marks the limiter as having
3483    /// already-left-slow-start, so a single healthy window does NOT
3484    /// double the cap (which would be over-aggressive resume from a
3485    /// learned value).
3486    #[tokio::test]
3487    async fn warm_start_disables_slow_start_doubling() {
3488        let cfg = LimiterConfig {
3489            window_ops: 8,
3490            min_window_ops: 4,
3491            success_target: 0.9,
3492            ..cfg_for_tests()
3493        };
3494        let l = Limiter::new(2, cfg.clone());
3495        // Warm-start to a learned value of 16. This must not be
3496        // treated as a fresh slow-start.
3497        l.warm_start(16);
3498        assert_eq!(l.current(), 16);
3499        // One full healthy window: in slow-start would double to 32;
3500        // post-warm-start it should add +1 to 17.
3501        for _ in 0..cfg.window_ops {
3502            l.observe(Outcome::Success, Duration::from_millis(10));
3503        }
3504        assert_eq!(
3505            l.current(),
3506            17,
3507            "warm-start triggered slow-start doubling instead of additive +1"
3508        );
3509    }
3510
/// Codex CR-3 fix: `warm_start` floors against the controller's own
/// cold-start values, NOT the global `ChannelStart::default()`. A
/// controller built with custom low starts stays faithful to them:
/// snapshots below the per-instance start are lifted to it, snapshots
/// above it are taken verbatim.
#[test]
fn controller_warm_start_floors_at_per_instance_cold_start() {
    let controller = AdaptiveController::new(
        ChannelStart {
            quote: 2,
            store: 1,
            fetch: 4,
        },
        AdaptiveConfig::default(),
    );

    // All-ones snapshot: at or below every per-instance start, so each
    // channel ends up at its custom cold-start value.
    controller.warm_start(ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    });
    assert_eq!(controller.quote.current(), 2);
    assert_eq!(controller.store.current(), 1);
    assert_eq!(controller.fetch.current(), 4);

    // All-tens snapshot: above every per-instance start, so it wins.
    controller.warm_start(ChannelStart {
        quote: 10,
        store: 10,
        fetch: 10,
    });
    assert_eq!(controller.quote.current(), 10);
    assert_eq!(controller.store.current(), 10);
    assert_eq!(controller.fetch.current(), 10);
}
3542
/// Codex CR-3 fix: with `enabled = false` the controller is in
/// fixed-concurrency mode and `warm_start` must be a no-op: the user
/// asked for exactly the cold-start values, not a value learned in a
/// prior run.
#[test]
fn warm_start_is_noop_when_adaptive_disabled() {
    let controller = AdaptiveController::new(
        ChannelStart {
            quote: 5,
            store: 5,
            fetch: 5,
        },
        AdaptiveConfig {
            enabled: false,
            ..AdaptiveConfig::default()
        },
    );

    // A much larger snapshot that must be ignored entirely.
    controller.warm_start(ChannelStart {
        quote: 100,
        store: 100,
        fetch: 100,
    });

    assert_eq!(controller.quote.current(), 5, "warm_start moved cap when disabled");
    assert_eq!(controller.store.current(), 5, "warm_start moved cap when disabled");
    assert_eq!(controller.fetch.current(), 5, "warm_start moved cap when disabled");
}
3567
3568    /// Codex CR-4 fix: rebucketed_unordered is rolling, not batch-fenced.
3569    /// One slow item must NOT block other items in the same logical
3570    /// "wave" — the in-flight set should refill as fast items complete.
3571    #[tokio::test]
3572    async fn rebucketed_unordered_is_rolling_not_fenced() {
3573        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3574        use std::sync::Arc as StdArc;
3575        let cfg = LimiterConfig {
3576            min_concurrency: 1,
3577            max_concurrency: 8,
3578            window_ops: 100,
3579            min_window_ops: 50,
3580            ..cfg_for_tests()
3581        };
3582        let l = Limiter::new(4, cfg);
3583        let in_flight = StdArc::new(AtomicUsize::new(0));
3584        let max_in_flight = StdArc::new(AtomicUsize::new(0));
3585        let started = StdArc::new(AtomicUsize::new(0));
3586        let _: Vec<()> = rebucketed_unordered(&l, 0..20usize, |i| {
3587            let in_flight = in_flight.clone();
3588            let max_in_flight = max_in_flight.clone();
3589            let started = started.clone();
3590            async move {
3591                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
3592                max_in_flight.fetch_max(cur, AtomicOrdering::Relaxed);
3593                started.fetch_add(1, AtomicOrdering::Relaxed);
3594                // Item 0 is intentionally slow; items 1..20 are fast.
3595                // In a batch-fenced scheduler, item 0 would gate the
3596                // start of items in the next batch. In a rolling
3597                // scheduler, items 1..N can start as soon as their
3598                // slot frees from a fast completion.
3599                if i == 0 {
3600                    tokio::time::sleep(Duration::from_millis(50)).await;
3601                } else {
3602                    tokio::time::sleep(Duration::from_millis(1)).await;
3603                }
3604                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
3605                Ok::<(), &'static str>(())
3606            }
3607        })
3608        .await
3609        .unwrap();
3610        // All 20 items must have started; in a rolling scheduler the
3611        // peak in-flight should reach at least 4 (the cap).
3612        assert_eq!(started.load(AtomicOrdering::Relaxed), 20);
3613        let peak = max_in_flight.load(AtomicOrdering::Relaxed);
3614        assert!(
3615            peak >= 4,
3616            "rolling scheduler did not fill cap; peak in-flight = {peak}"
3617        );
3618    }
3619
3620    /// Codex CR-4 fix: rebucketed_ordered preserves input order.
3621    #[tokio::test]
3622    async fn rebucketed_ordered_preserves_input_order() {
3623        let cfg = LimiterConfig {
3624            min_concurrency: 1,
3625            max_concurrency: 4,
3626            ..cfg_for_tests()
3627        };
3628        let l = Limiter::new(4, cfg);
3629        let items: Vec<usize> = (0..50).collect();
3630        let result: Vec<usize> = rebucketed_ordered(
3631            &l,
3632            items.iter().copied().enumerate(),
3633            |(idx, v)| async move {
3634                // Reverse-bias delay so out-of-order completion is likely.
3635                let delay = (50 - v) as u64;
3636                tokio::time::sleep(Duration::from_micros(delay)).await;
3637                Ok::<_, &'static str>((idx, v * 10))
3638            },
3639        )
3640        .await
3641        .unwrap();
3642        assert_eq!(result.len(), 50);
3643        for (i, v) in result.iter().enumerate() {
3644            assert_eq!(*v, i * 10, "out of order at index {i}: got {v}");
3645        }
3646    }
3647
3648    /// Codex CR-1 regression guard (logical, not the actual data path):
3649    /// rebucketed_ordered with a payload of (idx, hash) must always
3650    /// pair the right hash with the right chunk content even under
3651    /// adversarial out-of-order completion.
3652    #[tokio::test]
3653    async fn rebucketed_ordered_pairs_idx_with_payload_correctly() {
3654        let cfg = LimiterConfig {
3655            min_concurrency: 1,
3656            max_concurrency: 8,
3657            ..cfg_for_tests()
3658        };
3659        let l = Limiter::new(8, cfg);
3660        // Each item is (idx, fake_hash). The "fetch" returns
3661        // (idx, content_for_hash). We adversarially out-of-order them
3662        // and assert that the post-sort puts content with the right
3663        // index.
3664        let items: Vec<(usize, u64)> = (0..40).map(|i| (i, 1000u64 + i as u64)).collect();
3665        let result: Vec<u64> = rebucketed_ordered(&l, items, |(idx, hash)| async move {
3666            let delay = (40 - idx) as u64; // reverse delay
3667            tokio::time::sleep(Duration::from_micros(delay)).await;
3668            // "content_for_hash" derived from the hash.
3669            Ok::<_, &'static str>((idx, hash * 7))
3670        })
3671        .await
3672        .unwrap();
3673        for (i, v) in result.iter().enumerate() {
3674            let expected = (1000 + i as u64) * 7;
3675            assert_eq!(
3676                *v, expected,
3677                "idx {i} paired with wrong content: {v}, expected {expected}"
3678            );
3679        }
3680    }
3681
/// Codex CR-5 fix: the snapshot temp file must be unique per save call,
/// not just per PID, so two saves in the same process cannot collide.
///
/// The exact race cannot be simulated in a unit test. Instead this fires
/// 100 back-to-back saves and checks that nothing panics, that the last
/// write wins, and that no file with `.tmp.` in its name is left behind.
#[test]
fn save_snapshot_temp_file_is_unique_per_call() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("client_adaptive.json");

    // Saves carry 1, 2, ..., 100 on every channel; PID is constant
    // throughout, so only a per-call suffix can keep temp names distinct.
    for value in 1..=100 {
        save_snapshot(
            &path,
            ChannelStart {
                quote: value,
                store: value,
                fetch: value,
            },
        );
    }

    let restored = load_snapshot(&path).unwrap();
    assert_eq!(restored.quote, 100);
    assert_eq!(restored.store, 100);
    assert_eq!(restored.fetch, 100);

    // Confirm no leftover .tmp files.
    let leaked: Vec<_> = std::fs::read_dir(dir.path())
        .unwrap()
        .flatten()
        .map(|entry| entry.file_name())
        .filter(|file_name| file_name.to_string_lossy().contains(".tmp."))
        .collect();
    assert!(leaked.is_empty(), "temp files leaked: {:?}", leaked);
}
3721
3722    // ---- Edge case tests ----
3723
3724    /// Edge case: rebucketed_unordered with EMPTY input returns empty
3725    /// Vec immediately, no panic, no work scheduled.
3726    #[tokio::test]
3727    async fn rebucketed_empty_input_returns_empty() {
3728        let cfg = cfg_for_tests();
3729        let l = Limiter::new(4, cfg);
3730        let v: Vec<usize> = rebucketed_unordered(&l, std::iter::empty::<usize>(), |_| async {
3731            Ok::<_, &'static str>(42usize)
3732        })
3733        .await
3734        .unwrap();
3735        assert!(v.is_empty());
3736        let v: Vec<usize> = rebucketed_ordered(
3737            &l,
3738            std::iter::empty::<(usize, ())>(),
3739            |(idx, _)| async move { Ok::<_, &'static str>((idx, 42usize)) },
3740        )
3741        .await
3742        .unwrap();
3743        assert!(v.is_empty());
3744    }
3745
3746    /// Edge case: rebucketed_unordered with EXACTLY cap items.
3747    #[tokio::test]
3748    async fn rebucketed_exactly_cap_items() {
3749        let cfg = LimiterConfig {
3750            min_concurrency: 1,
3751            max_concurrency: 4,
3752            ..cfg_for_tests()
3753        };
3754        let l = Limiter::new(4, cfg);
3755        let v: Vec<usize> =
3756            rebucketed_unordered(
3757                &l,
3758                0..4usize,
3759                |i| async move { Ok::<_, &'static str>(i * 2) },
3760            )
3761            .await
3762            .unwrap();
3763        assert_eq!(v.len(), 4);
3764    }
3765
3766    /// Edge case: rebucketed_unordered preserves the FIRST error and
3767    /// discards subsequent ones, draining in-flight work first.
3768    #[tokio::test]
3769    async fn rebucketed_preserves_first_error() {
3770        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
3771        use std::sync::Arc as StdArc;
3772        let cfg = LimiterConfig {
3773            min_concurrency: 1,
3774            max_concurrency: 4,
3775            ..cfg_for_tests()
3776        };
3777        let l = Limiter::new(4, cfg);
3778        let started = StdArc::new(AtomicUsize::new(0));
3779        let started_clone = started.clone();
3780        let result: Result<Vec<()>, &'static str> = rebucketed_unordered(&l, 0..20usize, |i| {
3781            let started = started_clone.clone();
3782            async move {
3783                started.fetch_add(1, AtomicOrdering::Relaxed);
3784                if i == 5 {
3785                    // Slight delay so item 6, 7 also start before
3786                    // this error propagates.
3787                    tokio::time::sleep(Duration::from_micros(100)).await;
3788                    return Err("first error");
3789                }
3790                if i == 10 {
3791                    return Err("second error - should be ignored");
3792                }
3793                tokio::time::sleep(Duration::from_micros(50)).await;
3794                Ok(())
3795            }
3796        })
3797        .await;
3798        match result {
3799            Err(e) => assert_eq!(e, "first error", "wrong error preserved"),
3800            Ok(_) => panic!("expected error, got ok"),
3801        }
3802        // The first error stops new launches, but in-flight items
3803        // drain. We don't assert exact count (nondeterministic) — only
3804        // that we did not launch ALL 20 items (proving error-stop
3805        // works) and we did launch more than just item 5 (proving
3806        // in-flight drain happens).
3807        let total = started.load(AtomicOrdering::Relaxed);
3808        assert!(
3809            (5..20).contains(&total),
3810            "started count out of range: {total}"
3811        );
3812    }
3813
3814    /// Edge case: limiter with min == max (degenerate single-value).
3815    /// Cap stays at the single value regardless of observations.
3816    #[test]
3817    fn limiter_with_min_equal_max_is_pinned() {
3818        let cfg = LimiterConfig {
3819            min_concurrency: 5,
3820            max_concurrency: 5,
3821            ..cfg_for_tests()
3822        };
3823        let l = Limiter::new(5, cfg);
3824        for _ in 0..1000 {
3825            l.observe(Outcome::Success, Duration::from_millis(1));
3826        }
3827        assert_eq!(l.current(), 5, "cap moved despite min==max");
3828        for _ in 0..1000 {
3829            l.observe(Outcome::Timeout, Duration::from_millis(50));
3830        }
3831        assert_eq!(l.current(), 5, "cap moved despite min==max");
3832    }
3833
3834    /// Direct test of `ewma()` math: alpha = 0 means new value =
3835    /// prev (the baseline never updates from new samples).
3836    #[test]
3837    fn ewma_alpha_zero_returns_prev() {
3838        let prev = Duration::from_millis(100);
3839        let sample = Duration::from_millis(500);
3840        let result = ewma(prev, sample, 0.0);
3841        assert_eq!(result, prev, "alpha=0 must return prev unchanged");
3842    }
3843
3844    /// Direct test of `ewma()` math: alpha = 1 means new value =
3845    /// sample (full overwrite).
3846    #[test]
3847    fn ewma_alpha_one_returns_sample() {
3848        let prev = Duration::from_millis(100);
3849        let sample = Duration::from_millis(500);
3850        let result = ewma(prev, sample, 1.0);
3851        // Allow 1ms of float-conversion slop.
3852        let diff = result.abs_diff(sample);
3853        assert!(
3854            diff <= Duration::from_millis(1),
3855            "alpha=1 should return sample; got {result:?}, expected ~{sample:?}"
3856        );
3857    }
3858
3859    /// Direct test of `ewma()`: alpha = 0.5 should give the midpoint.
3860    #[test]
3861    fn ewma_alpha_half_returns_midpoint() {
3862        let prev = Duration::from_millis(200);
3863        let sample = Duration::from_millis(400);
3864        let result = ewma(prev, sample, 0.5);
3865        let expected = Duration::from_millis(300);
3866        let diff = result.abs_diff(expected);
3867        assert!(
3868            diff <= Duration::from_millis(1),
3869            "alpha=0.5 midpoint: got {result:?}, expected ~{expected:?}"
3870        );
3871    }
3872
3873    /// Direct test of `ewma()`: NaN alpha must NOT panic and must
3874    /// preserve the previous value (defense against
3875    /// `Duration::from_secs_f64(NaN)` panic).
3876    #[test]
3877    fn ewma_nan_alpha_returns_prev() {
3878        let prev = Duration::from_millis(100);
3879        let sample = Duration::from_millis(500);
3880        let result = ewma(prev, sample, f64::NAN);
3881        assert_eq!(result, prev);
3882        let result = ewma(prev, sample, f64::INFINITY);
3883        assert_eq!(result, prev);
3884        let result = ewma(prev, sample, f64::NEG_INFINITY);
3885        assert_eq!(result, prev);
3886    }
3887
3888    /// Out-of-range alpha (e.g. 2.5) must clamp to [0,1] and NOT
3889    /// produce a negative result.
3890    #[test]
3891    fn ewma_clamps_alpha_above_one() {
3892        let prev = Duration::from_millis(100);
3893        let sample = Duration::from_millis(500);
3894        let result = ewma(prev, sample, 2.5);
3895        // Clamped to 1.0 -> should equal sample (~500ms).
3896        assert!(result >= Duration::from_millis(499));
3897        assert!(result <= Duration::from_millis(501));
3898    }
3899
3900    /// Edge case: window contains ONLY ApplicationErrors. Controller
3901    /// must HOLD (not move at all), because there are zero
3902    /// capacity-relevant samples.
3903    #[test]
3904    fn window_full_of_application_errors_does_not_move_cap() {
3905        let cfg = cfg_for_tests();
3906        let l = Limiter::new(8, cfg.clone());
3907        for _ in 0..(cfg.window_ops * 5) {
3908            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
3909        }
3910        assert_eq!(
3911            l.current(),
3912            8,
3913            "cap moved on pure-app-error window; should hold"
3914        );
3915    }
3916
3917    /// Edge case: AdaptiveController with `enabled = false` plus
3918    /// observations does not move and does not interact with the
3919    /// observation window.
3920    #[test]
3921    fn disabled_adaptive_controller_truly_inert() {
3922        let cfg = AdaptiveConfig {
3923            enabled: false,
3924            ..AdaptiveConfig::default()
3925        };
3926        let c = AdaptiveController::new(ChannelStart::default(), cfg);
3927        let baseline_quote = c.quote.current();
3928        let baseline_store = c.store.current();
3929        let baseline_fetch = c.fetch.current();
3930        for _ in 0..10000 {
3931            c.quote.observe(Outcome::Timeout, Duration::from_millis(1));
3932            c.store.observe(Outcome::Timeout, Duration::from_millis(1));
3933            c.fetch.observe(Outcome::Timeout, Duration::from_millis(1));
3934        }
3935        assert_eq!(c.quote.current(), baseline_quote);
3936        assert_eq!(c.store.current(), baseline_store);
3937        assert_eq!(c.fetch.current(), baseline_fetch);
3938    }
3939
3940    /// Edge case: per-channel limiters share NO state. Hammering one
3941    /// channel must not move another. Two-sided: assert store DROPS
3942    /// to the floor (proving observations landed) AND quote/fetch
3943    /// are EXACTLY unchanged (proving zero cross-channel leakage).
3944    #[test]
3945    fn channel_state_is_independent() {
3946        let c = AdaptiveController::default();
3947        let q0 = c.quote.current();
3948        let f0 = c.fetch.current();
3949        let s0 = c.store.current();
3950        for _ in 0..1000 {
3951            c.store.observe(Outcome::Timeout, Duration::from_millis(1));
3952        }
3953        // Strict: store reached the floor (observations landed).
3954        assert_eq!(
3955            c.store.current(),
3956            c.config.min_concurrency,
3957            "store did not reach floor after 1000 timeouts; cap={}",
3958            c.store.current()
3959        );
3960        assert!(c.store.current() < s0, "store cap did not move at all");
3961        // Strict: quote and fetch unchanged.
3962        assert_eq!(c.quote.current(), q0, "quote leaked from store stress");
3963        assert_eq!(c.fetch.current(), f0, "fetch leaked from store stress");
3964    }
3965
3966    // ---- Round-5 test reviewer suggestions ----
3967
3968    /// Direct unit test for `AdaptiveConfig::sanitize`. Verifies that
3969    /// every clamped field is correctly fixed up, not merely that
3970    /// the controller doesn't crash.
3971    #[test]
3972    fn sanitize_corrects_pathological_floats() {
3973        let mut cfg = AdaptiveConfig {
3974            success_target: f64::NAN,
3975            timeout_ceiling: 5.0,
3976            latency_inflation_factor: f64::NEG_INFINITY,
3977            latency_ewma_alpha: 2.5,
3978            window_ops: 4,
3979            min_window_ops: 10,
3980            ..AdaptiveConfig::default()
3981        };
3982        cfg.sanitize();
3983        assert!(cfg.success_target.is_finite());
3984        assert!((0.0..=1.0).contains(&cfg.success_target));
3985        assert!((0.0..=1.0).contains(&cfg.timeout_ceiling));
3986        assert!(cfg.latency_inflation_factor.is_finite());
3987        assert!(cfg.latency_inflation_factor > 0.0);
3988        assert!((0.0..=1.0).contains(&cfg.latency_ewma_alpha));
3989        assert!(
3990            cfg.min_window_ops <= cfg.window_ops,
3991            "min_window_ops {} > window_ops {}",
3992            cfg.min_window_ops,
3993            cfg.window_ops
3994        );
3995    }
3996
3997    /// Snapshot persistence relies on serde for ChannelStart and
3998    /// ChannelMax. A field rename in either type would silently
3999    /// break warm-start across binary upgrades — this test catches
4000    /// that.
4001    #[test]
4002    fn channel_max_serde_round_trips() {
4003        let m = ChannelMax {
4004            quote: 7,
4005            store: 13,
4006            fetch: 200,
4007        };
4008        let json = serde_json::to_string(&m).unwrap();
4009        let back: ChannelMax = serde_json::from_str(&json).unwrap();
4010        assert_eq!(back.quote, 7);
4011        assert_eq!(back.store, 13);
4012        assert_eq!(back.fetch, 200);
4013    }
4014
4015    #[test]
4016    fn channel_start_serde_round_trips() {
4017        let s = ChannelStart {
4018            quote: 11,
4019            store: 22,
4020            fetch: 33,
4021        };
4022        let json = serde_json::to_string(&s).unwrap();
4023        let back: ChannelStart = serde_json::from_str(&json).unwrap();
4024        assert_eq!(back.quote, 11);
4025        assert_eq!(back.store, 22);
4026        assert_eq!(back.fetch, 33);
4027    }
4028
4029    /// Mid-flight cap SHRINKAGE: `rebucketed_picks_up_cap_changes_mid_stream`
4030    /// only proves growth. Overload protection requires the reverse —
4031    /// when the controller halves the cap mid-pipeline, in-flight
4032    /// must respect the new lower cap on the next refill.
4033    #[tokio::test]
4034    async fn rebucketed_honors_cap_shrinkage_mid_stream() {
4035        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
4036        use std::sync::Arc as StdArc;
4037        let cfg = LimiterConfig {
4038            min_concurrency: 1,
4039            max_concurrency: 16,
4040            ..cfg_for_tests()
4041        };
4042        let l = Limiter::new(16, cfg);
4043        let in_flight = StdArc::new(AtomicUsize::new(0));
4044        let max_after_shrink = StdArc::new(AtomicUsize::new(0));
4045        let processed = StdArc::new(AtomicUsize::new(0));
4046        let shrunk = StdArc::new(std::sync::atomic::AtomicBool::new(false));
4047        let l_for_shrink = l.clone();
4048        let p_for_shrink = processed.clone();
4049        let shrunk_for_shrink = shrunk.clone();
4050        let shrink_handle = tokio::spawn(async move {
4051            // Bump down the cap once 50 items have completed.
4052            loop {
4053                tokio::time::sleep(Duration::from_millis(2)).await;
4054                if p_for_shrink.load(AtomicOrdering::Relaxed) >= 50 {
4055                    l_for_shrink.warm_start(2);
4056                    shrunk_for_shrink.store(true, AtomicOrdering::Relaxed);
4057                    return;
4058                }
4059            }
4060        });
4061        let _: Vec<()> = rebucketed_unordered(&l, 0..400usize, |_i| {
4062            let in_flight = in_flight.clone();
4063            let max_after_shrink = max_after_shrink.clone();
4064            let processed = processed.clone();
4065            let shrunk = shrunk.clone();
4066            async move {
4067                let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
4068                if shrunk.load(AtomicOrdering::Relaxed) {
4069                    max_after_shrink.fetch_max(cur, AtomicOrdering::Relaxed);
4070                }
4071                tokio::time::sleep(Duration::from_millis(1)).await;
4072                in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
4073                processed.fetch_add(1, AtomicOrdering::Relaxed);
4074                Ok::<(), &'static str>(())
4075            }
4076        })
4077        .await
4078        .unwrap();
4079        shrink_handle.await.unwrap();
4080        let peak = max_after_shrink.load(AtomicOrdering::Relaxed);
4081        // After the shrink to cap=2, no NEW launches should put us
4082        // above 2. Already-launched in-flight may still be draining
4083        // briefly, so allow a small overshoot for the natural
4084        // refill-after-completion lag.
4085        assert!(
4086            peak <= 4,
4087            "rebucketed exceeded shrunk cap of 2: peak post-shrink in-flight = {peak}"
4088        );
4089    }
4090
4091    /// Mixed `ApplicationError` + capacity-relevant items in one
4092    /// window. ApplicationError must NOT contribute to the success
4093    /// rate denominator — otherwise a wave with some AppErrors and
4094    /// some healthy successes would falsely look like a stressed
4095    /// window.
4096    #[test]
4097    fn mixed_window_app_errors_with_capacity_signal() {
4098        let cfg = LimiterConfig {
4099            window_ops: 10,
4100            min_window_ops: 5,
4101            timeout_ceiling: 0.2,
4102            success_target: 0.9,
4103            ..cfg_for_tests()
4104        };
4105        // Case 1: 5 AppErrors + 5 Successes. Capacity-relevant
4106        // success_rate = 5/5 = 100%. Cap must NOT decrease (it may
4107        // hold at 8 or grow via slow-start; both prove the AppErrors
4108        // didn't poison the success-rate denominator).
4109        let l = Limiter::new(8, cfg.clone());
4110        for _ in 0..5 {
4111            l.observe(Outcome::ApplicationError, Duration::from_millis(50));
4112        }
4113        for _ in 0..5 {
4114            l.observe(Outcome::Success, Duration::from_millis(50));
4115        }
4116        assert!(
4117            l.current() >= 8,
4118            "AppErrors falsely depressed the success rate; cap dropped from 8 to {}",
4119            l.current()
4120        );
4121        // Case 2: 5 AppErrors + 5 Timeouts. Capacity-relevant
4122        // success_rate = 0/5 = 0%. Cap MUST decrease.
4123        let l2 = Limiter::new(8, cfg);
4124        for _ in 0..5 {
4125            l2.observe(Outcome::ApplicationError, Duration::from_millis(50));
4126        }
4127        for _ in 0..5 {
4128            l2.observe(Outcome::Timeout, Duration::from_millis(50));
4129        }
4130        assert!(
4131            l2.current() < 8,
4132            "all-timeouts (with AppError padding) did not decrease cap; got {}",
4133            l2.current()
4134        );
4135    }
4136
4137    /// Real concurrent torn-read test for save/load. The previous
4138    /// concurrent-write test only reads after both writers join;
4139    /// this version interleaves a reader thread with writers and
4140    /// asserts every successful load returns a coherent (non-torn)
4141    /// snapshot.
4142    #[test]
4143    fn concurrent_save_load_no_torn_reads() {
4144        use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
4145        use std::thread;
4146        let dir = tempfile::tempdir().unwrap();
4147        let path = dir.path().join("snap.json");
4148        // Seed the file so the reader doesn't get a None on first read.
4149        save_snapshot(
4150            &path,
4151            ChannelStart {
4152                quote: 1,
4153                store: 1,
4154                fetch: 1,
4155            },
4156        );
4157        let stop = std::sync::Arc::new(AtomicBool::new(false));
4158        let p_w = path.clone();
4159        let s_w = stop.clone();
4160        let writer = thread::spawn(move || {
4161            let mut i = 1usize;
4162            while !s_w.load(AtomicOrdering::Relaxed) {
4163                save_snapshot(
4164                    &p_w,
4165                    ChannelStart {
4166                        quote: i,
4167                        store: i,
4168                        fetch: i,
4169                    },
4170                );
4171                i = i.wrapping_add(1).max(1);
4172            }
4173        });
4174        let p_r = path.clone();
4175        let reader = thread::spawn(move || {
4176            let mut torn = 0usize;
4177            for _ in 0..2_000 {
4178                if let Some(snap) = load_snapshot(&p_r) {
4179                    // Coherent snapshots have all three channels equal
4180                    // (writer always saves equal values).
4181                    if snap.quote != snap.store || snap.store != snap.fetch {
4182                        torn += 1;
4183                    }
4184                }
4185            }
4186            torn
4187        });
4188        let torn = reader.join().unwrap();
4189        stop.store(true, AtomicOrdering::Relaxed);
4190        writer.join().unwrap();
4191        assert_eq!(
4192            torn, 0,
4193            "observed {torn} torn reads under concurrent writes"
4194        );
4195    }
4196
4197    /// Round-5 follow-up: `save_snapshot_with_timeout` returns
4198    /// promptly even when the underlying write would otherwise hang.
4199    /// Use a path under a non-existent root that mkdir cannot create
4200    /// to simulate a slow/failing filesystem (mkdir returns Err
4201    /// quickly so this isn't a real hang test, but it confirms the
4202    /// timeout wrapper does not block longer than the deadline on a
4203    /// fast-failing operation either).
4204    #[test]
4205    fn save_with_timeout_returns_promptly_on_fast_failure() {
4206        let path = std::path::PathBuf::from("/nonexistent_root_xyz_test/snap.json");
4207        let snap = ChannelStart {
4208            quote: 1,
4209            store: 1,
4210            fetch: 1,
4211        };
4212        let started = Instant::now();
4213        save_snapshot_with_timeout(path, snap, Duration::from_secs(5));
4214        let elapsed = started.elapsed();
4215        // Fast-failing mkdir returns immediately. The timeout
4216        // wrapper should not add measurable overhead.
4217        assert!(
4218            elapsed < Duration::from_secs(1),
4219            "save_snapshot_with_timeout took {elapsed:?} on fast-failing path"
4220        );
4221    }
4222
4223    /// Round-5 follow-up: a hung writer thread (simulated by a path
4224    /// the writer never returns from). The wrapper must time out and
4225    /// return without joining; the test must complete near the
4226    /// deadline, not hang.
4227    #[test]
4228    fn save_with_timeout_bounds_wall_time_on_hang() {
4229        // Use a real-but-slow-write simulation: hand the writer a
4230        // path that the OS will accept but with a synthetic delay
4231        // baked into a wrapping thread. Since save_snapshot itself
4232        // does no sleep, we instead test that the timeout wrapper
4233        // exits within deadline + small slack when the inner work
4234        // takes longer than the deadline. We approximate by giving
4235        // the wrapper a deadline shorter than any plausible local
4236        // disk write (1ms is too tight; 0ms is too tight). Use
4237        // 1ms deadline and assert wall time < 100ms — proving the
4238        // wrapper does NOT wait for the writer to actually finish
4239        // (the inner write to a tempdir takes a few ms typically).
4240        let dir = tempfile::tempdir().unwrap();
4241        let path = dir.path().join("snap.json");
4242        let snap = ChannelStart {
4243            quote: 1,
4244            store: 1,
4245            fetch: 1,
4246        };
4247        let started = Instant::now();
4248        // Deadline so short that on most machines the writer is
4249        // still running. The wrapper must NOT wait for it.
4250        save_snapshot_with_timeout(path, snap, Duration::from_micros(1));
4251        let elapsed = started.elapsed();
4252        assert!(
4253            elapsed < Duration::from_millis(200),
4254            "timeout wrapper did not bound wall time: {elapsed:?}"
4255        );
4256    }
4257}