use futures::stream::{self, FuturesUnordered, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, PoisonError};
use std::time::{Duration, Instant};
use tracing::{debug, warn};
/// Process-wide counter bumped once per `save_snapshot` call; its value is embedded in
/// the temporary file name so successive saves from this process get distinct temp paths.
static SAVE_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Fetch concurrency used by `ChannelStart::default` and when a schema-1 snapshot is
/// migrated by `load_snapshot`.
const FETCH_COLD_START_CONCURRENCY: usize = 4;
/// A hill-climb probe moves by `best_concurrency / HILL_PROBE_STEP_DIVISOR`.
const HILL_PROBE_STEP_DIVISOR: usize = 4;
/// Lower bound on the hill-climb probe step.
const HILL_MIN_PROBE_STEP: usize = 1;
/// An upward probe is accepted only if its goodput is at least this multiple of the
/// reference goodput.
const HILL_UP_PROBE_ACCEPT_RATIO: f64 = 1.05;
/// A downward probe is accepted if its goodput stays at or above this multiple of the
/// reference goodput.
const HILL_DOWN_PROBE_ACCEPT_RATIO: f64 = 0.98;
/// Number of non-probe epochs that do not count toward stability after a rejected probe
/// or a stress decrease.
const HILL_REJECT_COOLDOWN_EPOCHS: usize = 2;
/// Number of stable (non-probe, non-cooldown) epochs required before the next probe.
const HILL_STABLE_PROBE_EPOCHS: usize = 3;
/// Divisor applied to the current concurrency when an epoch is judged stressed.
const HILL_STRESS_DECREASE_DIVISOR: usize = 2;
/// An epoch lasts at least `current * HILL_EPOCH_FULL_WAVES` observations.
const HILL_EPOCH_FULL_WAVES: usize = 2;
/// Acquires `m`, recovering the guard even if a previous holder panicked.
///
/// Poisoning is deliberately ignored: the protected value is handed back as-is.
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => PoisonError::into_inner(poisoned),
    }
}
/// Classification of a finished operation as seen by a limiter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
/// The operation succeeded; its latency (and bytes, for hill-climb) are recorded.
Success,
/// The operation timed out; counts toward both the failure and the timeout rate.
Timeout,
/// The operation failed at the network level; counts against the success rate.
NetworkError,
/// The operation failed for an application reason; excluded from the success and
/// timeout rate computations.
ApplicationError,
}
/// Smallest `min_concurrency` that `AdaptiveController::new` allows for the fetch limiter.
const FETCH_MIN_FLOOR: usize = 4;
/// Per-channel concurrency ceilings.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelMax {
/// Ceiling for the quote channel.
pub quote: usize,
/// Ceiling for the store channel.
pub store: usize,
/// Ceiling for the fetch channel.
pub fetch: usize,
}
impl Default for ChannelMax {
    /// Default ceilings: quote 128, store 64, fetch 256.
    fn default() -> Self {
        let (quote, store, fetch) = (128, 64, 256);
        Self {
            quote,
            store,
            fetch,
        }
    }
}
/// Tuning shared by all channels of an `AdaptiveController`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
/// When `false`, limiters ignore observations and `AdaptiveController::warm_start`
/// does nothing.
pub enabled: bool,
/// Lower bound for every channel's concurrency (raised to at least 1 by `sanitize`).
pub min_concurrency: usize,
/// Per-channel upper bounds.
pub max: ChannelMax,
/// Capacity of the sliding sample window.
pub window_ops: usize,
/// Minimum number of samples required before a window is evaluated.
pub min_window_ops: usize,
/// Success rate below which concurrency is decreased.
pub success_target: f64,
/// Timeout rate above which concurrency is decreased.
pub timeout_ceiling: f64,
/// Multiplier applied to the latency baseline to obtain the tolerated p95 latency.
pub latency_inflation_factor: f64,
/// Smoothing factor (weight of the newest sample) for exponentially weighted averages.
pub latency_ewma_alpha: f64,
}
impl AdaptiveConfig {
    /// Normalises the configuration in place so that every field is usable.
    ///
    /// * Non-finite fractions fall back to their defaults (alpha 0.2, success target
    ///   0.95, timeout ceiling 0.10); all three are then clamped to `[0, 1]`.
    /// * A non-finite or non-positive latency inflation factor becomes 4.0.
    /// * `min_concurrency` and `window_ops` are raised to at least 1,
    ///   `min_window_ops` is forced into `1..=window_ops`, and each channel ceiling is
    ///   raised to at least `min_concurrency`.
    pub fn sanitize(&mut self) {
        let fractions = [
            (&mut self.latency_ewma_alpha, 0.2),
            (&mut self.success_target, 0.95),
            (&mut self.timeout_ceiling, 0.10),
        ];
        for (field, fallback) in fractions {
            let usable = if field.is_finite() { *field } else { fallback };
            *field = usable.clamp(0.0, 1.0);
        }

        let inflation = self.latency_inflation_factor;
        if !(inflation.is_finite() && inflation > 0.0) {
            self.latency_inflation_factor = 4.0;
        }

        self.min_concurrency = self.min_concurrency.max(1);
        self.window_ops = self.window_ops.max(1);
        // `window_ops` is at least 1 here, so the clamp bounds are always ordered.
        self.min_window_ops = self.min_window_ops.clamp(1, self.window_ops);

        let floor = self.min_concurrency;
        for ceiling in [
            &mut self.max.quote,
            &mut self.max.store,
            &mut self.max.fetch,
        ] {
            *ceiling = (*ceiling).max(floor);
        }
    }
}
impl Default for AdaptiveConfig {
fn default() -> Self {
Self {
enabled: true,
min_concurrency: 1,
max: ChannelMax::default(),
window_ops: 32,
min_window_ops: 8,
success_target: 0.95,
timeout_ceiling: 0.10,
latency_inflation_factor: 4.0,
latency_ewma_alpha: 0.2,
}
}
}
/// Per-channel starting concurrency; also the payload stored in persisted snapshots.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelStart {
/// Starting concurrency for the quote channel.
pub quote: usize,
/// Starting concurrency for the store channel.
pub store: usize,
/// Starting concurrency for the fetch channel.
pub fetch: usize,
}
impl Default for ChannelStart {
fn default() -> Self {
Self {
quote: 32,
store: 8,
fetch: FETCH_COLD_START_CONCURRENCY,
}
}
}
/// One observation kept in a limiter's sliding window.
#[derive(Debug, Clone, Copy)]
struct Sample {
/// How the operation ended.
outcome: Outcome,
/// How long the operation took, as reported by the caller.
latency: Duration,
}
/// Strategy a `Limiter` uses to adjust its concurrency.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LimiterAlgorithm {
/// Window-based evaluation via `evaluate` / `apply_decision` (the `Limiter::new` default).
Aimd,
/// Epoch-based goodput probing via `observe_hill_climb`.
ThroughputHillClimb,
}
/// Direction of a hill-climb probe relative to the best known concurrency.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProbeDirection {
/// Try a higher concurrency.
Up,
/// Try a lower concurrency.
Down,
}
/// Bookkeeping for the throughput hill-climb algorithm: counters for the epoch in
/// progress plus the reference point that probes are compared against.
#[derive(Debug)]
struct HillClimbState {
/// Earliest operation start seen in the current epoch; `None` while the epoch is empty.
epoch_started: Option<Instant>,
/// Number of observations in the current epoch, regardless of outcome.
epoch_samples: usize,
/// Successful observations in the current epoch.
epoch_successes: usize,
/// Timed-out observations in the current epoch.
epoch_timeouts: usize,
/// Network-error observations in the current epoch.
epoch_net_errors: usize,
/// Bytes reported by successful observations in the current epoch.
epoch_bytes: u64,
/// Latencies of successful observations in the current epoch.
epoch_latencies: Vec<Duration>,
/// Reference goodput; `None` until a first epoch completes or after a stress decrease.
best_goodput_per_sec: Option<f64>,
/// Reference p95 latency belonging to the reference goodput.
best_latency_p95: Option<Duration>,
/// Concurrency the reference belongs to; reported by `Limiter::snapshot` for hill-climb.
best_concurrency: usize,
/// Consecutive non-probe epochs counted toward the next probe.
stable_epochs: usize,
/// Non-probe epochs still to be skipped before stability counting resumes.
cooldown_epochs: usize,
/// Direction the next stability-triggered probe will take.
next_probe: ProbeDirection,
/// `Some(direction)` while the current concurrency is a probe candidate.
active_probe: Option<ProbeDirection>,
}
impl HillClimbState {
fn new(start: usize, epoch_capacity: usize) -> Self {
Self {
epoch_started: None,
epoch_samples: 0,
epoch_successes: 0,
epoch_timeouts: 0,
epoch_net_errors: 0,
epoch_bytes: 0,
epoch_latencies: Vec::with_capacity(epoch_capacity),
best_goodput_per_sec: None,
best_latency_p95: None,
best_concurrency: start,
stable_epochs: 0,
cooldown_epochs: 0,
next_probe: ProbeDirection::Up,
active_probe: None,
}
}
fn reset_epoch(&mut self) {
self.epoch_started = None;
self.epoch_samples = 0;
self.epoch_successes = 0;
self.epoch_timeouts = 0;
self.epoch_net_errors = 0;
self.epoch_bytes = 0;
self.epoch_latencies.clear();
}
fn capacity_total(&self) -> usize {
self.epoch_successes + self.epoch_timeouts + self.epoch_net_errors
}
}
/// Configuration of a single `Limiter`.
#[derive(Debug, Clone)]
pub struct LimiterConfig {
/// When `false`, observations are ignored and the concurrency never changes.
pub enabled: bool,
/// Lower bound for the concurrency.
pub min_concurrency: usize,
/// Upper bound for the concurrency.
pub max_concurrency: usize,
/// Capacity of the sliding sample window; also the minimum number of samples
/// between two AIMD increases.
pub window_ops: usize,
/// Minimum number of samples before evaluation, and between two AIMD decreases.
pub min_window_ops: usize,
/// Success rate below which concurrency is decreased.
pub success_target: f64,
/// Timeout rate above which concurrency is decreased.
pub timeout_ceiling: f64,
/// Multiplier applied to a latency baseline to obtain the tolerated p95 latency.
pub latency_inflation_factor: f64,
/// Smoothing factor (weight of the newest sample) for exponentially weighted averages.
pub latency_ewma_alpha: f64,
/// On an AIMD decrease, doubling growth is abandoned for +1 growth only when the
/// concurrency at that moment is at least this value; `warm_start` applies the same
/// comparison to its starting point.
pub slow_start_ramp_threshold: usize,
/// Whether AIMD evaluation may decrease because p95 latency exceeded the tolerated limit.
pub latency_decrease_enabled: bool,
}
impl LimiterConfig {
fn from_adaptive(cfg: &AdaptiveConfig, max_for_channel: usize) -> Self {
Self {
enabled: cfg.enabled,
min_concurrency: cfg.min_concurrency,
max_concurrency: max_for_channel.max(cfg.min_concurrency),
window_ops: cfg.window_ops,
min_window_ops: cfg.min_window_ops,
success_target: cfg.success_target,
timeout_ceiling: cfg.timeout_ceiling,
latency_inflation_factor: cfg.latency_inflation_factor,
latency_ewma_alpha: cfg.latency_ewma_alpha,
slow_start_ramp_threshold: 0,
latency_decrease_enabled: true,
}
}
fn sanitize(&mut self) {
if !self.latency_ewma_alpha.is_finite() {
self.latency_ewma_alpha = 0.2;
}
self.latency_ewma_alpha = self.latency_ewma_alpha.clamp(0.0, 1.0);
if !self.success_target.is_finite() {
self.success_target = 0.95;
}
self.success_target = self.success_target.clamp(0.0, 1.0);
if !self.timeout_ceiling.is_finite() {
self.timeout_ceiling = 0.10;
}
self.timeout_ceiling = self.timeout_ceiling.clamp(0.0, 1.0);
if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
self.latency_inflation_factor = 4.0;
}
self.min_concurrency = self.min_concurrency.max(1);
self.window_ops = self.window_ops.max(1);
self.min_window_ops = self.min_window_ops.max(1).min(self.window_ops);
self.max_concurrency = self.max_concurrency.max(self.min_concurrency);
}
}
/// Adaptive concurrency limiter. Clones share the same mutable state and configuration
/// because both are held behind `Arc`.
#[derive(Debug, Clone)]
pub struct Limiter {
/// Mutable state, shared between clones.
inner: Arc<Mutex<LimiterInner>>,
/// Sanitized configuration, shared between clones.
config: Arc<LimiterConfig>,
/// Adjustment strategy chosen at construction.
algorithm: LimiterAlgorithm,
}
/// Mutable limiter state guarded by the `Limiter` mutex.
#[derive(Debug)]
struct LimiterInner {
/// Concurrency currently allowed.
current: usize,
/// Sliding window of the most recent samples (at most `window_ops` entries).
window: VecDeque<Sample>,
/// Samples observed since the last applied AIMD change; gates increases.
samples_since_increase: usize,
/// Samples observed since the last applied AIMD change; gates decreases.
samples_since_decrease: usize,
/// Smoothed p95 latency recorded at AIMD increases; `None` until the first increase.
latency_baseline: Option<Duration>,
/// `true` once AIMD growth is +1 per step instead of doubling.
left_slow_start: bool,
/// State of the hill-climb algorithm.
hill: HillClimbState,
}
impl Limiter {
#[must_use]
pub fn new(start: usize, config: LimiterConfig) -> Self {
Self::new_with_algorithm(start, config, LimiterAlgorithm::Aimd)
}
fn new_with_algorithm(
start: usize,
config: LimiterConfig,
algorithm: LimiterAlgorithm,
) -> Self {
let mut config = config;
config.sanitize();
let clamped = start.clamp(config.min_concurrency, config.max_concurrency.max(1));
let window_cap = config.window_ops;
Self {
inner: Arc::new(Mutex::new(LimiterInner {
current: clamped,
window: VecDeque::with_capacity(window_cap),
samples_since_increase: 0,
samples_since_decrease: 0,
latency_baseline: None,
left_slow_start: false,
hill: HillClimbState::new(clamped, window_cap),
})),
config: Arc::new(config),
algorithm,
}
}
#[must_use]
pub fn current(&self) -> usize {
lock(&self.inner).current
}
pub fn observe(&self, outcome: Outcome, latency: Duration) {
self.observe_with_bytes(outcome, latency, 0);
}
pub fn observe_with_bytes(&self, outcome: Outcome, latency: Duration, bytes: u64) {
let observed_at = Instant::now();
let operation_started = observed_at.checked_sub(latency).unwrap_or(observed_at);
self.observe_with_timing(outcome, latency, bytes, operation_started);
}
fn observe_with_timing(
&self,
outcome: Outcome,
latency: Duration,
bytes: u64,
operation_started: Instant,
) {
if !self.config.enabled {
return;
}
let mut g = lock(&self.inner);
if g.window.len() == self.config.window_ops {
g.window.pop_front();
}
g.window.push_back(Sample { outcome, latency });
if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
observe_hill_climb(
&mut g,
outcome,
latency,
bytes,
operation_started,
&self.config,
);
return;
}
g.samples_since_increase = g.samples_since_increase.saturating_add(1);
g.samples_since_decrease = g.samples_since_decrease.saturating_add(1);
if g.window.len() < self.config.min_window_ops {
return;
}
let decision = evaluate(&g.window, &self.config, g.latency_baseline);
apply_decision(&mut g, decision, &self.config);
}
pub fn warm_start(&self, start: usize) {
let clamped = start.clamp(
self.config.min_concurrency,
self.config.max_concurrency.max(1),
);
let mut g = lock(&self.inner);
g.current = clamped;
g.left_slow_start = clamped >= self.config.slow_start_ramp_threshold;
g.hill = HillClimbState::new(clamped, self.config.window_ops);
}
#[must_use]
pub fn snapshot(&self) -> usize {
let g = lock(&self.inner);
if self.algorithm == LimiterAlgorithm::ThroughputHillClimb {
g.hill.best_concurrency
} else {
g.current
}
}
}
/// Summary of one completed hill-climb epoch.
#[derive(Debug, Clone, Copy)]
struct HillEpochStats {
/// Bytes per second when any bytes were reported, otherwise successes per second.
goodput_per_sec: f64,
/// p95 latency of the epoch's successful operations, if there were any.
latency_p95: Option<Duration>,
}
/// Verdict of an AIMD window evaluation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
/// The window looks healthy; concurrency may grow.
Increase,
/// The window shows failures, timeouts or inflated latency; concurrency should shrink.
Decrease,
/// Not enough usable data; leave concurrency unchanged.
Hold,
}
/// Judges the sliding window and returns the AIMD verdict.
///
/// * `Hold` when fewer than `min_window_ops` capacity-relevant samples (successes,
///   timeouts, network errors) are present, or when there is no successful sample
///   to take a latency from.
/// * `Decrease` when the success rate is below `success_target`, the timeout rate is
///   above `timeout_ceiling`, or (if enabled and a baseline exists) the p95 latency
///   exceeds `baseline * latency_inflation_factor`.
/// * `Increase` otherwise.
///
/// Application errors are ignored entirely.
fn evaluate(
    window: &VecDeque<Sample>,
    cfg: &LimiterConfig,
    baseline: Option<Duration>,
) -> Decision {
    let mut successes = 0usize;
    let mut timeouts = 0usize;
    let mut net_errors = 0usize;
    let mut latencies: Vec<Duration> = Vec::with_capacity(window.len());
    for s in window {
        match s.outcome {
            Outcome::Success => {
                successes += 1;
                latencies.push(s.latency);
            }
            Outcome::Timeout => timeouts += 1,
            Outcome::NetworkError => net_errors += 1,
            Outcome::ApplicationError => {}
        }
    }

    let capacity_total = successes + timeouts + net_errors;
    if capacity_total < cfg.min_window_ops {
        return Decision::Hold;
    }
    let total_f = capacity_total as f64;
    let success_rate = successes as f64 / total_f;
    let timeout_rate = timeouts as f64 / total_f;
    if success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling {
        return Decision::Decrease;
    }

    let Some(p95) = p95_of(&mut latencies) else {
        return Decision::Hold;
    };
    if cfg.latency_decrease_enabled {
        if let Some(base) = baseline {
            // `Duration::mul_f64` panics when the product does not fit in a `Duration`
            // (a large but finite inflation factor is enough). Compute the same
            // product fallibly and treat an unrepresentable limit as "no limit".
            let limit =
                Duration::try_from_secs_f64(base.as_secs_f64() * cfg.latency_inflation_factor)
                    .unwrap_or(Duration::MAX);
            if p95 > limit {
                return Decision::Decrease;
            }
        }
    }
    Decision::Increase
}
/// Applies an AIMD verdict to the limiter state.
///
/// * `Increase` is honoured only after `window_ops` samples since the last change.
///   It folds the window's p95 latency into the baseline, then doubles the
///   concurrency while still in slow start or adds one afterwards.
/// * `Decrease` is honoured only after `min_window_ops` samples since the last
///   change. It halves the concurrency and ends slow start if the concurrency had
///   reached `slow_start_ramp_threshold`.
/// * `Hold` changes nothing.
///
/// Every applied change resets both cadence counters.
fn apply_decision(inner: &mut LimiterInner, decision: Decision, cfg: &LimiterConfig) {
    let target = match decision {
        Decision::Hold => return,
        Decision::Decrease => {
            if inner.samples_since_decrease < cfg.min_window_ops {
                return;
            }
            if inner.current >= cfg.slow_start_ramp_threshold {
                inner.left_slow_start = true;
            }
            let halved = (inner.current / 2).max(cfg.min_concurrency);
            if halved != inner.current {
                debug!(from = inner.current, to = halved, "adaptive: decrease");
            }
            halved
        }
        Decision::Increase => {
            if inner.samples_since_increase < cfg.window_ops {
                return;
            }
            // Refresh the latency baseline from the window that justified the increase.
            let observed = window_p95(&inner.window);
            let baseline = match inner.latency_baseline {
                Some(previous) => ewma(previous, observed, cfg.latency_ewma_alpha),
                None => observed,
            };
            inner.latency_baseline = Some(baseline);

            let grown = if inner.left_slow_start {
                inner.current.saturating_add(1)
            } else {
                inner.current.saturating_mul(2)
            };
            let bounded = grown.min(cfg.max_concurrency).max(cfg.min_concurrency);
            if bounded != inner.current {
                debug!(
                    from = inner.current,
                    to = bounded,
                    slow_start = !inner.left_slow_start,
                    "adaptive: increase",
                );
            }
            bounded
        }
    };
    inner.current = target;
    inner.samples_since_increase = 0;
    inner.samples_since_decrease = 0;
}
/// Returns the 95th-percentile latency (nearest-rank: element `ceil(0.95 * n)` of the
/// ascending order), or `None` for an empty slice.
///
/// The slice is sorted in place as a side effect.
fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
    let count = latencies.len();
    if count == 0 {
        return None;
    }
    latencies.sort_unstable();
    let rank = (count as f64 * 0.95).ceil() as usize;
    // Convert the 1-based rank to an index and keep it inside the slice.
    let position = rank.saturating_sub(1).min(count - 1);
    Some(latencies[position])
}
/// p95 latency over the successful samples of `window`; zero when there are none.
fn window_p95(window: &VecDeque<Sample>) -> Duration {
    let mut successes: Vec<Duration> = window
        .iter()
        .filter_map(|sample| match sample.outcome {
            Outcome::Success => Some(sample.latency),
            _ => None,
        })
        .collect();
    match p95_of(&mut successes) {
        Some(p95) => p95,
        None => Duration::ZERO,
    }
}
/// Exponentially weighted moving average of two durations.
///
/// Returns `(1 - alpha) * prev + alpha * sample`, with `alpha` clamped to `[0, 1]`.
/// `prev` is returned unchanged when `alpha` is not finite or when the blended value
/// is not representable as a `Duration` (non-finite, negative, or too large).
fn ewma(prev: Duration, sample: Duration, alpha: f64) -> Duration {
    if !alpha.is_finite() {
        return prev;
    }
    let alpha = alpha.clamp(0.0, 1.0);
    let prev_ms = prev.as_secs_f64() * 1000.0;
    let sample_ms = sample.as_secs_f64() * 1000.0;
    let new_ms = (1.0 - alpha) * prev_ms + alpha * sample_ms;
    if !new_ms.is_finite() || new_ms < 0.0 {
        return prev;
    }
    // `Duration::from_secs_f64` panics on overflow, which floating-point rounding can
    // trigger for inputs near `Duration::MAX`; the fallible conversion keeps `prev`.
    Duration::try_from_secs_f64(new_ms / 1000.0).unwrap_or(prev)
}
/// Feeds one observation into the hill-climb algorithm.
///
/// The observation is added to the current epoch. If the epoch already looks stressed
/// the concurrency is cut immediately; otherwise, once the epoch has collected its
/// target number of samples, it is summarised, acted upon and reset.
fn observe_hill_climb(
    inner: &mut LimiterInner,
    outcome: Outcome,
    latency: Duration,
    bytes: u64,
    operation_started: Instant,
    cfg: &LimiterConfig,
) {
    let hill = &mut inner.hill;

    // The epoch begins at the earliest operation start seen so far.
    let epoch_start = match hill.epoch_started {
        Some(existing) => existing.min(operation_started),
        None => operation_started,
    };
    hill.epoch_started = Some(epoch_start);

    hill.epoch_samples = hill.epoch_samples.saturating_add(1);
    match outcome {
        Outcome::ApplicationError => {}
        Outcome::NetworkError => {
            hill.epoch_net_errors = hill.epoch_net_errors.saturating_add(1);
        }
        Outcome::Timeout => {
            hill.epoch_timeouts = hill.epoch_timeouts.saturating_add(1);
        }
        Outcome::Success => {
            hill.epoch_successes = hill.epoch_successes.saturating_add(1);
            hill.epoch_bytes = hill.epoch_bytes.saturating_add(bytes);
            hill.epoch_latencies.push(latency);
        }
    }

    // Stress is acted on right away, without waiting for the epoch to fill.
    if hill_epoch_stressed(&inner.hill, cfg) {
        apply_hill_stress(inner, cfg);
        return;
    }

    let target = hill_epoch_target_samples(inner.current, cfg);
    if inner.hill.epoch_samples >= target {
        if let Some(stats) = hill_epoch_stats(&inner.hill, cfg) {
            apply_hill_epoch(inner, stats, cfg);
        }
        inner.hill.reset_epoch();
    }
}
/// Number of observations an epoch must collect before it is evaluated: the largest of
/// `window_ops`, `min_window_ops` and `HILL_EPOCH_FULL_WAVES` full waves of `current`.
fn hill_epoch_target_samples(current: usize, cfg: &LimiterConfig) -> usize {
    let full_waves = current.saturating_mul(HILL_EPOCH_FULL_WAVES);
    let window_floor = cfg.window_ops.max(cfg.min_window_ops);
    full_waves.max(window_floor)
}
/// Whether the epoch so far shows stress: after at least `min_window_ops`
/// capacity-relevant samples, the success rate is below target or the timeout rate is
/// above the ceiling.
fn hill_epoch_stressed(hill: &HillClimbState, cfg: &LimiterConfig) -> bool {
    let relevant = hill.capacity_total();
    if relevant < cfg.min_window_ops {
        return false;
    }
    let denominator = relevant as f64;
    let too_many_failures = (hill.epoch_successes as f64 / denominator) < cfg.success_target;
    let too_many_timeouts = (hill.epoch_timeouts as f64 / denominator) > cfg.timeout_ceiling;
    too_many_failures || too_many_timeouts
}
/// Summarises the epoch, or returns `None` when it cannot be measured (too few
/// capacity-relevant samples, no success, or no positive elapsed time).
///
/// Elapsed time is the wall time since the epoch's earliest operation start, but never
/// less than the slowest successful operation. Goodput is bytes per second when bytes
/// were reported and successes per second otherwise.
fn hill_epoch_stats(hill: &HillClimbState, cfg: &LimiterConfig) -> Option<HillEpochStats> {
    if hill.epoch_successes == 0 || hill.capacity_total() < cfg.min_window_ops {
        return None;
    }

    let mut sorted = hill.epoch_latencies.clone();
    // `p95_of` leaves the buffer sorted ascending, so the slowest sample is the last one.
    let latency_p95 = p95_of(&mut sorted);
    let slowest = sorted.last().copied().unwrap_or(Duration::ZERO);

    let wall = match hill.epoch_started {
        Some(started) => started.elapsed(),
        None => Duration::ZERO,
    };
    let seconds = wall.max(slowest).as_secs_f64();
    if !(seconds.is_finite() && seconds > 0.0) {
        return None;
    }

    let work_done = match hill.epoch_bytes {
        0 => hill.epoch_successes as f64,
        bytes => bytes as f64,
    };
    Some(HillEpochStats {
        goodput_per_sec: work_done / seconds,
        latency_p95,
    })
}
/// Reacts to a stressed epoch: cuts the concurrency by `HILL_STRESS_DECREASE_DIVISOR`
/// (within the configured bounds), forgets the reference measurements, cancels any
/// probe, starts a cooldown and begins a fresh epoch.
fn apply_hill_stress(inner: &mut LimiterInner, cfg: &LimiterConfig) {
    let reduced = (inner.current / HILL_STRESS_DECREASE_DIVISOR)
        .max(cfg.min_concurrency)
        .min(cfg.max_concurrency);
    if reduced != inner.current {
        debug!(
            from = inner.current,
            to = reduced,
            "adaptive: fetch hill stress decrease"
        );
    }
    inner.current = reduced;

    let hill = &mut inner.hill;
    hill.best_concurrency = reduced;
    hill.best_goodput_per_sec = None;
    hill.best_latency_p95 = None;
    hill.active_probe = None;
    hill.next_probe = ProbeDirection::Up;
    hill.stable_epochs = 0;
    hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
    hill.reset_epoch();
}
/// Acts on a completed epoch.
///
/// * Without a reference yet, the epoch becomes the reference and an upward probe
///   starts.
/// * During a probe, the candidate is accepted when its goodput meets the direction's
///   acceptance ratio and its p95 latency is tolerable; otherwise it is rejected. An
///   accepted upward probe immediately probes further up; an accepted downward probe
///   schedules the next probe upward.
/// * Otherwise the reference is refreshed, and after the cooldown plus
///   `HILL_STABLE_PROBE_EPOCHS` stable epochs a probe starts in the scheduled
///   direction, with the following probe scheduled in the opposite one.
fn apply_hill_epoch(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
    let reference_goodput = match inner.hill.best_goodput_per_sec {
        Some(goodput) => goodput,
        None => {
            inner.hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
            inner.hill.best_latency_p95 = stats.latency_p95;
            inner.hill.best_concurrency = inner.current;
            probe_hill_neighbor(inner, ProbeDirection::Up, cfg);
            return;
        }
    };

    let Some(probing) = inner.hill.active_probe else {
        // Steady state: track the reference and wait for enough stable epochs.
        refresh_hill_best(inner, stats, cfg);
        if inner.hill.cooldown_epochs > 0 {
            inner.hill.cooldown_epochs -= 1;
            return;
        }
        inner.hill.stable_epochs = inner.hill.stable_epochs.saturating_add(1);
        if inner.hill.stable_epochs < HILL_STABLE_PROBE_EPOCHS {
            return;
        }
        let direction = inner.hill.next_probe;
        inner.hill.next_probe = match direction {
            ProbeDirection::Up => ProbeDirection::Down,
            ProbeDirection::Down => ProbeDirection::Up,
        };
        probe_hill_neighbor(inner, direction, cfg);
        return;
    };

    let required_ratio = match probing {
        ProbeDirection::Up => HILL_UP_PROBE_ACCEPT_RATIO,
        ProbeDirection::Down => HILL_DOWN_PROBE_ACCEPT_RATIO,
    };
    let goodput_ok = stats.goodput_per_sec >= reference_goodput * required_ratio;
    let accepted =
        goodput_ok && hill_latency_acceptable(stats.latency_p95, inner.hill.best_latency_p95, cfg);
    if !accepted {
        reject_hill_probe(inner);
        return;
    }

    accept_hill_probe(inner, stats, cfg);
    match probing {
        ProbeDirection::Up => probe_hill_neighbor(inner, ProbeDirection::Up, cfg),
        ProbeDirection::Down => inner.hill.next_probe = ProbeDirection::Up,
    }
}
/// Blends the epoch's measurements into the reference using `latency_ewma_alpha`.
///
/// A missing reference value is simply replaced by the new measurement; the latency
/// reference is left alone when the epoch has no p95.
fn refresh_hill_best(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
    let alpha = cfg.latency_ewma_alpha;
    let hill = &mut inner.hill;

    let goodput = match hill.best_goodput_per_sec {
        None => stats.goodput_per_sec,
        Some(previous) => ewma_f64(previous, stats.goodput_per_sec, alpha),
    };
    hill.best_goodput_per_sec = Some(goodput);

    let Some(observed_p95) = stats.latency_p95 else {
        return;
    };
    let latency = match hill.best_latency_p95 {
        None => observed_p95,
        Some(previous) => ewma(previous, observed_p95, alpha),
    };
    hill.best_latency_p95 = Some(latency);
}
/// Whether a probe's p95 latency is tolerable relative to the reference.
///
/// Returns `true` when either value is missing, otherwise `candidate <= best *
/// latency_inflation_factor`. If that product cannot be represented as a `Duration`
/// the limit is treated as unbounded.
fn hill_latency_acceptable(
    candidate: Option<Duration>,
    best: Option<Duration>,
    cfg: &LimiterConfig,
) -> bool {
    let (Some(candidate), Some(best)) = (candidate, best) else {
        return true;
    };
    // `Duration::mul_f64` panics when the product overflows `Duration` (a large but
    // finite inflation factor is enough); compute the same product fallibly instead.
    let limit = Duration::try_from_secs_f64(best.as_secs_f64() * cfg.latency_inflation_factor)
        .unwrap_or(Duration::MAX);
    candidate <= limit
}
/// Exponentially weighted moving average of two numbers.
///
/// Returns `(1 - alpha) * prev + alpha * sample` with `alpha` clamped to `[0, 1]`.
/// `prev` is returned unchanged when `alpha` is not finite or when the blended value is
/// not a finite, non-negative number.
fn ewma_f64(prev: f64, sample: f64, alpha: f64) -> f64 {
    if !alpha.is_finite() {
        return prev;
    }
    let weight = alpha.clamp(0.0, 1.0);
    let blended = (1.0 - weight) * prev + weight * sample;
    match blended.is_finite() && blended >= 0.0 {
        true => blended,
        false => prev,
    }
}
/// Promotes the probed concurrency to the new reference point.
///
/// The epoch's measurements replace the reference outright, the probe, cooldown and
/// stability counters are cleared, and the current concurrency is re-clamped into the
/// configured bounds.
fn accept_hill_probe(inner: &mut LimiterInner, stats: HillEpochStats, cfg: &LimiterConfig) {
    debug!(
        concurrency = inner.current,
        goodput_per_sec = stats.goodput_per_sec,
        "adaptive: fetch hill accepted probe"
    );
    let promoted = inner.current;

    let hill = &mut inner.hill;
    hill.best_concurrency = promoted;
    hill.best_goodput_per_sec = Some(stats.goodput_per_sec);
    hill.best_latency_p95 = stats.latency_p95;
    hill.active_probe = None;
    hill.stable_epochs = 0;
    hill.cooldown_epochs = 0;

    inner.current = promoted.clamp(cfg.min_concurrency, cfg.max_concurrency);
}
/// Abandons the active probe: returns to the best known concurrency, schedules the
/// next probe in the opposite direction of the rejected one, and starts a cooldown.
fn reject_hill_probe(inner: &mut LimiterInner) {
    let fallback = inner.hill.best_concurrency;
    if inner.current != fallback {
        debug!(
            from = inner.current,
            to = fallback,
            "adaptive: fetch hill rejected probe"
        );
    }
    inner.current = fallback;

    let hill = &mut inner.hill;
    // `take` both clears the active probe and tells us which direction failed.
    match hill.active_probe.take() {
        Some(ProbeDirection::Up) => hill.next_probe = ProbeDirection::Down,
        Some(ProbeDirection::Down) => hill.next_probe = ProbeDirection::Up,
        None => {}
    }
    hill.stable_epochs = 0;
    hill.cooldown_epochs = HILL_REJECT_COOLDOWN_EPOCHS;
}
/// Starts probing the neighbour of the best known concurrency in `direction`.
///
/// The step is `best / HILL_PROBE_STEP_DIVISOR`, at least `HILL_MIN_PROBE_STEP`, and
/// the candidate is kept within the configured bounds. When the bounds leave no room
/// to move, no probe is started and the limiter stays at the best concurrency. The
/// stability counter is reset either way.
fn probe_hill_neighbor(inner: &mut LimiterInner, direction: ProbeDirection, cfg: &LimiterConfig) {
    let anchor = inner.hill.best_concurrency;
    let stride = (anchor / HILL_PROBE_STEP_DIVISOR).max(HILL_MIN_PROBE_STEP);
    let target = match direction {
        ProbeDirection::Down => anchor.saturating_sub(stride).max(cfg.min_concurrency),
        ProbeDirection::Up => anchor.saturating_add(stride).min(cfg.max_concurrency),
    };

    inner.hill.stable_epochs = 0;
    if target == anchor {
        inner.current = anchor;
        inner.hill.active_probe = None;
        return;
    }

    debug!(
        from = anchor,
        to = target,
        ?direction,
        "adaptive: fetch hill probing"
    );
    inner.current = target;
    inner.hill.active_probe = Some(direction);
}
/// Bundle of the three per-channel limiters together with the configuration and
/// cold-start values they were built from.
#[derive(Debug, Clone)]
pub struct AdaptiveController {
/// Limiter for the quote channel (AIMD).
pub quote: Limiter,
/// Limiter for the store channel (AIMD).
pub store: Limiter,
/// Limiter for the fetch channel (throughput hill-climb).
pub fetch: Limiter,
/// Sanitized shared configuration.
pub(crate) config: AdaptiveConfig,
/// Starting values passed at construction; used as a floor by `warm_start`.
cold_start: ChannelStart,
}
impl AdaptiveController {
#[must_use]
pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
let mut config = config;
config.sanitize();
let quote_cfg = LimiterConfig::from_adaptive(&config, config.max.quote);
let mut store_cfg = LimiterConfig::from_adaptive(&config, config.max.store);
store_cfg.latency_decrease_enabled = false;
store_cfg.slow_start_ramp_threshold = usize::MAX;
let mut fetch_cfg = LimiterConfig::from_adaptive(&config, config.max.fetch);
fetch_cfg.min_concurrency = fetch_cfg.min_concurrency.max(FETCH_MIN_FLOOR);
fetch_cfg.max_concurrency = fetch_cfg.max_concurrency.max(fetch_cfg.min_concurrency);
fetch_cfg.slow_start_ramp_threshold = usize::MAX;
fetch_cfg.latency_decrease_enabled = false;
Self {
quote: Limiter::new(start.quote, quote_cfg),
store: Limiter::new(start.store, store_cfg),
fetch: Limiter::new_with_algorithm(
start.fetch,
fetch_cfg,
LimiterAlgorithm::ThroughputHillClimb,
),
config,
cold_start: start,
}
}
#[must_use]
pub fn snapshot(&self) -> ChannelStart {
ChannelStart {
quote: self.quote.snapshot(),
store: self.store.snapshot(),
fetch: self.fetch.snapshot(),
}
}
#[must_use]
pub fn config(&self) -> &AdaptiveConfig {
&self.config
}
pub fn warm_start(&self, snapshot: ChannelStart) {
if !self.config.enabled {
return;
}
self.quote
.warm_start(snapshot.quote.max(self.cold_start.quote));
self.store
.warm_start(snapshot.store.max(self.cold_start.store));
self.fetch
.warm_start(snapshot.fetch.max(self.cold_start.fetch));
}
}
impl Default for AdaptiveController {
    /// Controller built from the default cold-start values and default configuration.
    fn default() -> Self {
        let start = ChannelStart::default();
        let config = AdaptiveConfig::default();
        Self::new(start, config)
    }
}
/// Drop guard that reports one operation to a limiter. Nothing is reported unless
/// `finish` or `finish_with_bytes` was called before the guard is dropped.
struct ObserveGuard<'a> {
/// Limiter that receives the observation.
limiter: &'a Limiter,
/// When the guard was created; used as the operation's start time.
started: Instant,
/// Recorded result `(outcome, latency, bytes)`; `None` until finished.
outcome: Option<(Outcome, Duration, u64)>,
}
impl<'a> ObserveGuard<'a> {
fn new(limiter: &'a Limiter) -> Self {
Self {
limiter,
started: Instant::now(),
outcome: None,
}
}
fn finish(&mut self, outcome: Outcome) {
self.finish_with_bytes(outcome, 0);
}
fn finish_with_bytes(&mut self, outcome: Outcome, bytes: u64) {
self.outcome = Some((outcome, self.started.elapsed(), bytes));
}
}
impl Drop for ObserveGuard<'_> {
    /// Forwards the recorded result, if any, to the limiter. A guard that was never
    /// finished reports nothing.
    fn drop(&mut self) {
        let Some((outcome, latency, bytes)) = self.outcome.take() else {
            return;
        };
        self.limiter
            .observe_with_timing(outcome, latency, bytes, self.started);
    }
}
/// Runs `op` and reports its outcome and duration to `limiter`.
///
/// `Ok` counts as `Outcome::Success`; an error is mapped through `classify`. The
/// result of `op` is returned unchanged. If the returned future is dropped before `op`
/// completes, nothing is reported.
pub async fn observe_op<T, E, F, Fut, C>(limiter: &Limiter, op: F, classify: C) -> Result<T, E>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    C: FnOnce(&E) -> Outcome,
{
    let mut guard = ObserveGuard::new(limiter);
    let result = op().await;
    let outcome = if let Err(error) = &result {
        classify(error)
    } else {
        Outcome::Success
    };
    guard.finish(outcome);
    // Report to the limiter before handing the result back.
    drop(guard);
    result
}
/// Runs `op` and reports its outcome, duration and transferred bytes to `limiter`.
///
/// On success the byte count comes from `success_bytes`; on error the outcome comes
/// from `classify` and the byte count is zero. The result of `op` is returned
/// unchanged. If the returned future is dropped before `op` completes, nothing is
/// reported.
pub async fn observe_op_with_success_bytes<T, E, F, Fut, C, B>(
    limiter: &Limiter,
    op: F,
    classify: C,
    success_bytes: B,
) -> Result<T, E>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    C: FnOnce(&E) -> Outcome,
    B: FnOnce(&T) -> u64,
{
    let mut guard = ObserveGuard::new(limiter);
    let result = op().await;
    let (outcome, bytes) = match &result {
        Err(error) => (classify(error), 0),
        Ok(value) => (Outcome::Success, success_bytes(value)),
    };
    guard.finish_with_bytes(outcome, bytes);
    // Report to the limiter before handing the result back.
    drop(guard);
    result
}
/// Runs `op` over `items` with at most `limiter.current()` (minimum 1) operations in
/// flight, re-reading the limit every time a slot frees up.
///
/// Results are returned in completion order. After the first error no new items are
/// started; operations already in flight are allowed to finish, and the first error is
/// returned.
pub async fn rebucketed_unordered<I, T, E, F, Fut>(
    limiter: &Limiter,
    items: I,
    mut op: F,
) -> Result<Vec<T>, E>
where
    I: IntoIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut remaining = items.into_iter();
    let mut running: FuturesUnordered<Fut> = FuturesUnordered::new();
    let mut collected = Vec::new();
    let mut first_error: Option<E> = None;

    loop {
        // Top up to the current limit, unless a failure already stopped new work.
        if first_error.is_none() {
            let limit = limiter.current().max(1);
            while running.len() < limit {
                let Some(item) = remaining.next() else {
                    break;
                };
                running.push(op(item));
            }
        }
        if running.is_empty() {
            break;
        }
        match running.next().await {
            None => break,
            Some(Ok(value)) => collected.push(value),
            Some(Err(error)) => {
                // Only the first error is kept; later ones are dropped.
                first_error.get_or_insert(error);
            }
        }
    }

    if let Some(error) = first_error {
        return Err(error);
    }
    Ok(collected)
}
/// Like `rebucketed_unordered`, for operations that return `(index, value)`: the values
/// are returned sorted by index (stable for equal indices).
pub async fn rebucketed_ordered<I, U, E, F, Fut>(
    limiter: &Limiter,
    items: I,
    op: F,
) -> Result<Vec<U>, E>
where
    I: IntoIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: std::future::Future<Output = Result<(usize, U), E>>,
{
    let mut tagged = rebucketed_unordered(limiter, items, op).await?;
    tagged.sort_by(|left, right| left.0.cmp(&right.0));
    let mut values = Vec::with_capacity(tagged.len());
    values.extend(tagged.into_iter().map(|(_, value)| value));
    Ok(values)
}
/// Runs `op` over `items` under the limiter's concurrency.
///
/// With `ordered == false` this is `rebucketed_unordered`. With `ordered == true` the
/// items are processed in waves of `limiter.current()` (minimum 1) operations; each
/// wave runs concurrently and yields its results in input order, and the limit is
/// re-read between waves. After a wave containing an error no further wave is started
/// and the first error is returned.
pub async fn rebucketed<I, T, E, F, Fut>(
    limiter: &Limiter,
    items: I,
    ordered: bool,
    mut op: F,
) -> Result<Vec<T>, E>
where
    I: IntoIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    if !ordered {
        return rebucketed_unordered(limiter, items, op).await;
    }

    let mut remaining = items.into_iter();
    let mut collected = Vec::new();
    let mut first_error: Option<E> = None;

    while first_error.is_none() {
        let limit = limiter.current().max(1);
        let wave: Vec<Fut> = remaining.by_ref().take(limit).map(&mut op).collect();
        if wave.is_empty() {
            break;
        }
        // The whole wave is drained even after an error so its futures run to completion.
        let mut outputs = stream::iter(wave).buffered(limit);
        while let Some(output) = outputs.next().await {
            match output {
                Ok(value) => collected.push(value),
                Err(error) => {
                    first_error.get_or_insert(error);
                }
            }
        }
    }

    if let Some(error) = first_error {
        return Err(error);
    }
    Ok(collected)
}
/// On-disk representation of a controller snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedState {
/// Format version; compared against `PERSIST_SCHEMA` on load.
schema: u32,
/// Persisted per-channel concurrency values.
channels: ChannelStart,
}
/// Schema version written by `save_snapshot` and accepted as-is by `load_snapshot`.
const PERSIST_SCHEMA: u32 = 2;
/// Older schema version that `load_snapshot` still accepts, resetting only the fetch value.
const PERSIST_SCHEMA_AIMD_FETCH: u32 = 1;
/// File name of the snapshot inside the data directory.
const PERSIST_FILENAME: &str = "client_adaptive.json";
/// Default snapshot location: `PERSIST_FILENAME` inside the directory returned by
/// `crate::config::data_dir()`, or `None` when that lookup fails.
#[must_use]
pub fn default_persist_path() -> Option<PathBuf> {
    let data_dir = crate::config::data_dir().ok()?;
    Some(data_dir.join(PERSIST_FILENAME))
}
#[must_use]
pub fn load_snapshot(path: &Path) -> Option<ChannelStart> {
let bytes = std::fs::read(path).ok()?;
let state: PersistedState = match serde_json::from_slice(&bytes) {
Ok(s) => s,
Err(e) => {
warn!(path = %path.display(), error = %e, "adaptive: corrupt snapshot, ignoring");
return None;
}
};
match state.schema {
PERSIST_SCHEMA => Some(state.channels),
PERSIST_SCHEMA_AIMD_FETCH => {
debug!(
path = %path.display(),
"adaptive: migrating schema-1 snapshot, preserving quote/store and resetting fetch",
);
Some(ChannelStart {
fetch: FETCH_COLD_START_CONCURRENCY,
..state.channels
})
}
schema => {
debug!(
path = %path.display(),
schema,
expected = PERSIST_SCHEMA,
"adaptive: snapshot schema mismatch, ignoring",
);
None
}
}
}
/// Persists `channels` to `path` as pretty-printed JSON tagged with `PERSIST_SCHEMA`.
///
/// The data is written to a uniquely named temporary file next to `path` (process id,
/// save counter and sub-second nanoseconds in the name) and then renamed over `path`.
/// This is best-effort: every failure is logged as a warning and otherwise ignored.
/// The temporary file is removed when either the write or the rename fails.
pub fn save_snapshot(path: &Path, channels: ChannelStart) {
    let state = PersistedState {
        schema: PERSIST_SCHEMA,
        channels,
    };
    let bytes = match serde_json::to_vec_pretty(&state) {
        Ok(b) => b,
        Err(e) => {
            warn!(error = %e, "adaptive: snapshot serialize failed");
            return;
        }
    };
    if let Some(parent) = path.parent() {
        if let Err(e) = std::fs::create_dir_all(parent) {
            warn!(path = %parent.display(), error = %e, "adaptive: snapshot mkdir failed");
            return;
        }
    }
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.subsec_nanos())
        .unwrap_or(0);
    let counter = SAVE_COUNTER.fetch_add(1, Ordering::Relaxed);
    let tmp = path.with_extension(format!(
        "json.tmp.{}.{}.{}",
        std::process::id(),
        counter,
        nanos
    ));
    if let Err(e) = std::fs::write(&tmp, &bytes) {
        warn!(path = %tmp.display(), error = %e, "adaptive: snapshot write failed");
        // A failed write may still have created (and partially filled) the file;
        // do not leave it behind. The file may not exist, so the result is ignored.
        let _ = std::fs::remove_file(&tmp);
        return;
    }
    if let Err(e) = std::fs::rename(&tmp, path) {
        warn!(
            from = %tmp.display(),
            to = %path.display(),
            error = %e,
            "adaptive: snapshot rename failed",
        );
        let _ = std::fs::remove_file(&tmp);
    }
}
/// Saves a snapshot on a helper thread and waits at most `timeout` for it to finish.
///
/// The calling thread polls for completion every 5 ms (or less, when less time
/// remains) and blocks while doing so. If the writer has not finished by the time the
/// timeout elapses, a warning is logged and the thread is detached; it keeps running
/// in the background.
pub fn save_snapshot_with_timeout(path: PathBuf, channels: ChannelStart, timeout: Duration) {
    let handle = std::thread::spawn(move || {
        save_snapshot(&path, channels);
    });
    let started = Instant::now();
    let poll = Duration::from_millis(5);
    loop {
        // Checked at the top of every iteration, so completion during the final
        // sleep is still noticed instead of being reported as a timeout.
        if handle.is_finished() {
            let _ = handle.join();
            return;
        }
        let Some(remaining) = timeout.checked_sub(started.elapsed()) else {
            break;
        };
        if remaining.is_zero() {
            break;
        }
        // Never sleep past the deadline.
        std::thread::sleep(poll.min(remaining));
    }
    warn!(
        timeout_ms = timeout.as_millis() as u64,
        "adaptive: snapshot save timed out (data dir slow?); detaching writer thread"
    );
    drop(handle);
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
const HILL_TEST_START_CAP: usize = 16;
const HILL_TEST_UP_PROBE_CAP: usize = 20;
const HILL_TEST_NEXT_UP_PROBE_CAP: usize = 25;
const HILL_TEST_DOWN_PROBE_CAP: usize = 12;
const HILL_TEST_CHUNK_BYTES: u64 = 1_000;
const HILL_TEST_BASE_LATENCY_MS: u64 = 100;
const HILL_TEST_REJECT_LATENCY_MS: u64 = 130;
const HILL_TEST_RETAINED_DOWN_LATENCY_MS: u64 = 75;
const HILL_TEST_ASYNC_LATENCY_MS: u64 = 10;
fn cfg_for_tests() -> LimiterConfig {
LimiterConfig {
enabled: true,
min_concurrency: 1,
max_concurrency: 64,
window_ops: 10,
min_window_ops: 5,
success_target: 0.9,
timeout_ceiling: 0.2,
latency_inflation_factor: 2.0,
latency_ewma_alpha: 0.5,
slow_start_ramp_threshold: 0,
latency_decrease_enabled: true,
}
}
fn hill_cfg_for_tests() -> LimiterConfig {
LimiterConfig {
window_ops: 4,
min_window_ops: 2,
max_concurrency: 64,
success_target: 0.9,
timeout_ceiling: 0.2,
..cfg_for_tests()
}
}
fn fetch_hill_for_tests(start: usize, cfg: LimiterConfig) -> Limiter {
Limiter::new_with_algorithm(start, cfg, LimiterAlgorithm::ThroughputHillClimb)
}
fn observe_hill_success_epoch_with_latency(
limiter: &Limiter,
cfg: &LimiterConfig,
bytes: u64,
latency: Duration,
) {
let samples = hill_epoch_target_samples(limiter.current(), cfg);
for _ in 0..samples {
limiter.observe_with_bytes(Outcome::Success, latency, bytes);
}
}
fn observe_hill_success_epoch(limiter: &Limiter, cfg: &LimiterConfig, bytes: u64) {
observe_hill_success_epoch_with_latency(
limiter,
cfg,
bytes,
Duration::from_millis(HILL_TEST_BASE_LATENCY_MS),
);
}
fn adaptive_cfg_for_tests() -> AdaptiveConfig {
let l = cfg_for_tests();
AdaptiveConfig {
enabled: l.enabled,
min_concurrency: l.min_concurrency,
max: ChannelMax {
quote: l.max_concurrency,
store: l.max_concurrency,
fetch: l.max_concurrency,
},
window_ops: l.window_ops,
min_window_ops: l.min_window_ops,
success_target: l.success_target,
timeout_ceiling: l.timeout_ceiling,
latency_inflation_factor: l.latency_inflation_factor,
latency_ewma_alpha: l.latency_ewma_alpha,
}
}
#[test]
fn warm_start_keeps_slow_start_armed_below_protected_threshold() {
let cfg = LimiterConfig {
max_concurrency: 256,
slow_start_ramp_threshold: 256,
latency_decrease_enabled: false,
..cfg_for_tests()
};
let l = Limiter::new(64, cfg.clone());
l.warm_start(20);
assert_eq!(l.current(), 20);
for _ in 0..cfg.window_ops {
l.observe(Outcome::Success, Duration::from_millis(10));
}
assert_eq!(
l.current(),
40,
"protected channel must double after warm_start, not crawl +1",
);
let default_cfg = LimiterConfig {
max_concurrency: 256,
..cfg_for_tests()
};
let d = Limiter::new(64, default_cfg.clone());
d.warm_start(20);
for _ in 0..default_cfg.window_ops {
d.observe(Outcome::Success, Duration::from_millis(10));
}
assert_eq!(
d.current(),
21,
"default channel must stay additive after warm_start",
);
}
#[test]
fn slow_start_stays_armed_at_ceiling_with_max_threshold() {
let base = LimiterConfig {
max_concurrency: 256,
latency_decrease_enabled: false,
..cfg_for_tests()
};
let fixed = Limiter::new(
256,
LimiterConfig {
slow_start_ramp_threshold: usize::MAX,
..base.clone()
},
);
let buggy = Limiter::new(
256,
LimiterConfig {
slow_start_ramp_threshold: 256,
..base.clone()
},
);
for l in [&fixed, &buggy] {
for _ in 0..base.window_ops {
l.observe(Outcome::Timeout, Duration::from_millis(10));
}
for _ in 0..(base.window_ops * 10) {
l.observe(Outcome::Success, Duration::from_millis(10));
}
}
assert!(
fixed.current() > buggy.current(),
"MAX-threshold limiter ({}) must out-recover the ceiling-threshold one ({})",
fixed.current(),
buggy.current(),
);
}
/// Stresses two limiters with one full window of timeouts, then heals both
/// with ten windows of successes. The limiter whose
/// `slow_start_ramp_threshold` is 256 must end with a higher cap than the one
/// whose threshold is 0.
#[test]
fn protected_slow_start_recovers_faster_than_additive() {
    let base = LimiterConfig {
        max_concurrency: 256,
        latency_decrease_enabled: false,
        ..cfg_for_tests()
    };
    let window = base.window_ops;
    let tick = Duration::from_millis(10);
    let starting_at_64 = |threshold| {
        Limiter::new(
            64,
            LimiterConfig {
                slow_start_ramp_threshold: threshold,
                ..base.clone()
            },
        )
    };
    let protected = starting_at_64(256);
    let unprotected = starting_at_64(0);
    let pair = [&protected, &unprotected];

    // Stress phase: one window of timeouts each.
    for limiter in pair {
        (0..window).for_each(|_| {
            limiter.observe(Outcome::Timeout, tick);
        });
    }
    // Heal phase: ten windows of successes each.
    for limiter in pair {
        (0..(window * 10)).for_each(|_| {
            limiter.observe(Outcome::Success, tick);
        });
    }

    let protected_cap = protected.current();
    let unprotected_cap = unprotected.current();
    assert!(
        protected_cap > unprotected_cap,
        "protected slow-start ({}) should recover faster than additive ({})",
        protected_cap,
        unprotected_cap,
    );
}
/// With `latency_decrease_enabled: false`, a window of 500 ms successes
/// following a window of 5 ms successes must not lower the cap.
#[test]
fn latency_decrease_disabled_ignores_p95_inflation() {
    let cfg = LimiterConfig {
        max_concurrency: 256,
        slow_start_ramp_threshold: 256,
        latency_decrease_enabled: false,
        ..cfg_for_tests()
    };
    let window = cfg.window_ops;
    let limiter = Limiter::new(16, cfg);

    // Fast window first.
    (0..window).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(5));
    });
    let after_baseline = limiter.current();

    // Then a window that is 100x slower but still successful.
    (0..window).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(500));
    });
    let after_inflation = limiter.current();

    assert!(
        after_inflation >= after_baseline,
        "latency inflation must not shrink the cap when the check is disabled: {} < {}",
        after_inflation,
        after_baseline,
    );
}
/// Pins the per-channel limiter tuning of a default-constructed controller:
/// fetch and store have the latency-decrease check off and a `usize::MAX`
/// ramp threshold, quote keeps the check on with a threshold of 0, and the
/// store cap starts at `ChannelStart::default().store`.
#[test]
fn controller_sets_fetch_channel_download_tuning() {
    let controller = AdaptiveController::new(ChannelStart::default(), AdaptiveConfig::default());
    let fetch_cfg = &controller.fetch.config;
    let quote_cfg = &controller.quote.config;
    let store_cfg = &controller.store.config;

    // Fetch channel.
    assert!(
        !fetch_cfg.latency_decrease_enabled,
        "fetch latency-decrease must be disabled",
    );
    assert_eq!(
        fetch_cfg.slow_start_ramp_threshold,
        usize::MAX,
        "fetch slow-start must never exit (armed at every cap incl. ceiling)",
    );

    // Quote channel.
    assert!(
        quote_cfg.latency_decrease_enabled,
        "quote must keep the latency-decrease check",
    );
    assert_eq!(
        quote_cfg.slow_start_ramp_threshold, 0,
        "quote must keep classic AIMD slow-start exit",
    );

    // Store channel.
    assert!(
        !store_cfg.latency_decrease_enabled,
        "store latency-decrease must be disabled (verification variance is not congestion)",
    );
    assert_eq!(
        store_cfg.slow_start_ramp_threshold,
        usize::MAX,
        "store slow-start must never exit so a transient Decrease re-doubles",
    );
    assert_eq!(
        controller.store.current(),
        ChannelStart::default().store,
        "store cold-start floor must remain unchanged at 8",
    );
}
/// Walks the store channel through four phases: a fast healthy window, a
/// window of 30 s successes (cap must not shrink), a window of timeouts (cap
/// must drop), and eight healthy windows (cap must return to at least its
/// pre-stress value).
#[test]
fn store_channel_ramps_and_recovers_under_v2_468_tuning() {
    let mut adaptive = adaptive_cfg_for_tests();
    adaptive.max.store = 256;
    let starts = ChannelStart {
        quote: 8,
        store: 8,
        fetch: 8,
    };
    let controller = AdaptiveController::new(starts, adaptive);
    let win = controller.config().window_ops;
    let store = &controller.store;
    let fast = Duration::from_millis(5);

    // Phase 1: one healthy window.
    (0..win).for_each(|_| {
        store.observe(Outcome::Success, fast);
    });
    let after_baseline = store.current();
    assert!(after_baseline >= 8, "store should ramp on healthy windows");

    // Phase 2: successes with very high latency.
    (0..win).for_each(|_| {
        store.observe(Outcome::Success, Duration::from_secs(30));
    });
    let after_slow = store.current();
    assert!(
        after_slow >= after_baseline,
        "verification-latency p95 must not shrink store cap: {} < {}",
        after_slow,
        after_baseline,
    );

    // Phase 3: one window of timeouts.
    let before_stress = store.current();
    (0..win).for_each(|_| {
        store.observe(Outcome::Timeout, Duration::from_millis(50));
    });
    let after_stress = store.current();
    assert!(
        after_stress < before_stress,
        "timeout-rate breach must still cut the store cap: {after_stress} !< {before_stress}",
    );

    // Phase 4: eight healthy windows.
    (0..(win * 8)).for_each(|_| {
        store.observe(Outcome::Success, fast);
    });
    let recovered = store.current();
    assert!(
        recovered >= before_stress,
        "store must re-double back to {before_stress} after a transient Decrease, got {}",
        recovered,
    );
}
/// Feeds five windows of `Outcome::ApplicationError` (30 s latency each) into
/// the store channel and expects its cap to stay where it started.
#[test]
fn store_application_rejections_do_not_move_cap() {
    let mut adaptive = adaptive_cfg_for_tests();
    adaptive.max.store = 256;
    let starts = ChannelStart {
        quote: 8,
        store: 8,
        fetch: 8,
    };
    let controller = AdaptiveController::new(starts, adaptive);
    let store = &controller.store;
    let start = store.current();

    let rejections = controller.config().window_ops * 5;
    (0..rejections).for_each(|_| {
        store.observe(Outcome::ApplicationError, Duration::from_secs(30));
    });

    assert_eq!(
        store.current(),
        start,
        "remote app-rejections must not move the store cap",
    );
}
/// A start value above `max_concurrency` or below `min_concurrency` is
/// expected to be clamped to the respective bound by `Limiter::new`.
#[test]
fn cold_start_clamps_into_bounds() {
    let cfg = cfg_for_tests();
    let floor = cfg.min_concurrency;
    let ceiling = cfg.max_concurrency;

    let oversized = Limiter::new(1000, cfg.clone());
    assert_eq!(oversized.current(), ceiling);

    let undersized = Limiter::new(0, cfg);
    assert_eq!(undersized.current(), floor);
}
/// Starting at 2, each full window of successes is expected to double the
/// cap: 4 after the first window, 8 after the second.
#[test]
fn slow_start_doubles_then_caps() {
    let cfg = cfg_for_tests();
    let window = cfg.window_ops;
    let limiter = Limiter::new(2, cfg);

    for expected in [4, 8] {
        (0..window).for_each(|_| {
            limiter.observe(Outcome::Success, Duration::from_millis(50));
        });
        assert_eq!(limiter.current(), expected);
    }
}
/// Six successes followed by four timeouts must push the cap below its start
/// of 4; five windows of successes afterwards must lift it above that
/// stressed value again.
#[test]
fn first_failure_exits_slow_start() {
    let cfg = cfg_for_tests();
    let heal_ops = cfg.window_ops * 5;
    let limiter = Limiter::new(4, cfg);
    let latency = Duration::from_millis(50);

    (0..6).for_each(|_| {
        limiter.observe(Outcome::Success, latency);
    });
    (0..4).for_each(|_| {
        limiter.observe(Outcome::Timeout, latency);
    });
    let after_stress = limiter.current();
    assert!(
        after_stress < 4,
        "stress should reduce concurrency from 4, got {after_stress}",
    );

    (0..heal_ops).for_each(|_| {
        limiter.observe(Outcome::Success, latency);
    });
    let recovered = limiter.current();
    assert!(
        recovered > after_stress,
        "expected recovery above {after_stress}, got {}",
        recovered,
    );
}
/// Thirty consecutive timeouts from a start of 2 must leave the cap at 1.
#[test]
fn floor_holds_at_one() {
    let limiter = Limiter::new(2, cfg_for_tests());
    (0..30).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(50));
    });
    assert_eq!(limiter.current(), 1);
}
/// Five windows of `Outcome::ApplicationError` must leave a limiter that
/// started at 4 still at 4.
#[test]
fn application_errors_do_not_punish() {
    let cfg = cfg_for_tests();
    let total = cfg.window_ops * 5;
    let limiter = Limiter::new(4, cfg);

    (0..total).for_each(|_| {
        limiter.observe(Outcome::ApplicationError, Duration::from_millis(50));
    });

    assert_eq!(
        limiter.current(),
        4,
        "ApplicationError must not move the cap; got {}",
        limiter.current()
    );
}
/// With the test-default latency settings, a window of 500 ms successes after
/// a window of 50 ms successes must lower the cap.
#[test]
fn latency_inflation_triggers_decrease() {
    let cfg = LimiterConfig {
        window_ops: 20,
        min_window_ops: 5,
        ..cfg_for_tests()
    };
    let window = cfg.window_ops;
    let limiter = Limiter::new(4, cfg);

    // Baseline window at 50 ms.
    (0..window).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(50));
    });
    let after_baseline = limiter.current();

    // Inflated window at 500 ms.
    (0..window).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(500));
    });
    let after_inflation = limiter.current();

    assert!(
        after_inflation < after_baseline,
        "expected decrease from {after_baseline}, got {}",
        after_inflation,
    );
}
/// `warm_start(20)` on a limiter created at 2 must make `current()` report 20.
#[test]
fn warm_start_overrides_current() {
    let limiter = Limiter::new(2, cfg_for_tests());
    limiter.warm_start(20);
    assert_eq!(limiter.current(), 20);
}
/// A warm-start value far above the ceiling must be clamped to
/// `max_concurrency`.
#[test]
fn warm_start_clamps() {
    let cfg = cfg_for_tests();
    let ceiling = cfg.max_concurrency;
    let limiter = Limiter::new(2, cfg);
    limiter.warm_start(1_000_000);
    assert_eq!(limiter.current(), ceiling);
}
/// With `enabled: false`, fifty timeouts must leave the cap at its start
/// value of 8.
#[test]
fn disabled_controller_holds_steady() {
    let disabled = LimiterConfig {
        enabled: false,
        ..cfg_for_tests()
    };
    let limiter = Limiter::new(8, disabled);
    (0..50).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(50));
    });
    assert_eq!(limiter.current(), 8);
}
/// A controller's `snapshot()` must report its start values, and feeding that
/// snapshot to `warm_start` on a default controller must reproduce them.
#[test]
fn controller_snapshot_round_trips() {
    let starts = ChannelStart {
        quote: 64,
        store: 16,
        fetch: 64,
    };
    let source = AdaptiveController::new(starts, adaptive_cfg_for_tests());
    let snap = source.snapshot();
    assert_eq!(snap.quote, 64);
    assert_eq!(snap.store, 16);
    assert_eq!(snap.fetch, 64);

    let restored = AdaptiveController::default();
    restored.warm_start(snap);
    assert_eq!(restored.quote.current(), 64);
    assert_eq!(restored.store.current(), 16);
    assert_eq!(restored.fetch.current(), 64);
}
/// One window of `Ok` operations run through `observe_op` must move a limiter
/// that started at 4 up to 8.
#[tokio::test]
async fn observe_op_records_success() {
    let cfg = cfg_for_tests();
    let window = cfg.window_ops;
    let limiter = Limiter::new(4, cfg);

    for _ in 0..window {
        let _: Result<(), &str> = observe_op(
            &limiter,
            || async { Ok(()) },
            |_e: &&str| Outcome::NetworkError,
        )
        .await;
    }

    assert_eq!(limiter.current(), 8);
}
/// A snapshot written with `save_snapshot` must load back with the same three
/// channel values.
#[test]
fn snapshot_round_trips_through_disk() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("client_adaptive.json");

    save_snapshot(
        &path,
        ChannelStart {
            quote: 24,
            store: 6,
            fetch: 12,
        },
    );

    let loaded = load_snapshot(&path).unwrap();
    assert_eq!(loaded.quote, 24);
    assert_eq!(loaded.store, 6);
    assert_eq!(loaded.fetch, 12);
}
/// Loading a path that was never written must yield `None`.
#[test]
fn load_missing_returns_none() {
    // Keep the temp dir alive for the duration of the lookup.
    let dir = tempfile::tempdir().unwrap();
    let missing = dir.path().join("does_not_exist.json");
    let loaded = load_snapshot(&missing);
    assert!(loaded.is_none());
}
/// Loading a file whose contents are not valid JSON must yield `None`.
#[test]
fn load_corrupt_returns_none() {
    let dir = tempfile::tempdir().unwrap();
    let corrupt = dir.path().join("bad.json");
    std::fs::write(&corrupt, b"not valid json{{{").unwrap();
    let loaded = load_snapshot(&corrupt);
    assert!(loaded.is_none());
}
/// A well-formed snapshot carrying schema number 999 must yield `None`.
#[test]
fn load_wrong_schema_returns_none() {
    let dir = tempfile::tempdir().unwrap();
    let future_file = dir.path().join("future.json");
    std::fs::write(
        &future_file,
        r#"{"schema":999,"channels":{"quote":1,"store":1,"fetch":1}}"#,
    )
    .unwrap();
    let loaded = load_snapshot(&future_file);
    assert!(loaded.is_none());
}
/// A snapshot written under `PERSIST_SCHEMA_AIMD_FETCH` must load with its
/// quote and store caps intact, while the fetch cap comes back as
/// `FETCH_COLD_START_CONCURRENCY` instead of the persisted value.
#[test]
fn load_schema_one_preserves_quote_store_and_resets_fetch() {
    const LEGACY_QUOTE_CAP: usize = 48;
    const LEGACY_STORE_CAP: usize = 24;
    const LEGACY_FETCH_CAP: usize = 96;

    let dir = tempfile::tempdir().unwrap();
    let legacy_file = dir.path().join("legacy.json");
    let legacy_json = format!(
        r#"{{"schema":{},"channels":{{"quote":{},"store":{},"fetch":{}}}}}"#,
        PERSIST_SCHEMA_AIMD_FETCH, LEGACY_QUOTE_CAP, LEGACY_STORE_CAP, LEGACY_FETCH_CAP,
    );
    std::fs::write(&legacy_file, legacy_json).unwrap();

    let restored = load_snapshot(&legacy_file).unwrap();
    assert_eq!(restored.quote, LEGACY_QUOTE_CAP);
    assert_eq!(restored.store, LEGACY_STORE_CAP);
    assert_eq!(restored.fetch, FETCH_COLD_START_CONCURRENCY);
}
/// One window of failing operations that the classifier maps to
/// `Outcome::Timeout` must push a limiter that started at 4 below 4.
#[tokio::test]
async fn observe_op_records_classified_error() {
    let cfg = cfg_for_tests();
    let window = cfg.window_ops;
    let limiter = Limiter::new(4, cfg);

    for _ in 0..window {
        let _: Result<(), &str> = observe_op(
            &limiter,
            || async { Err("boom") },
            |_e: &&str| Outcome::Timeout,
        )
        .await;
    }

    assert!(limiter.current() < 4);
}
/// Guards the default cold-start values: quote at least 32, store at least 8,
/// fetch exactly `FETCH_COLD_START_CONCURRENCY`.
#[test]
fn no_regression_cold_start_at_least_static_defaults() {
    let ChannelStart {
        quote,
        store,
        fetch,
    } = ChannelStart::default();

    assert!(
        quote >= 32,
        "quote cold-start regressed: got {}, prior static was 32",
        quote,
    );
    assert!(
        store >= 8,
        "store cold-start regressed: got {}, prior static was 8",
        store,
    );
    assert_eq!(
        fetch, FETCH_COLD_START_CONCURRENCY,
        "fetch cold-start changed unexpectedly: got {}, expected {}",
        fetch, FETCH_COLD_START_CONCURRENCY,
    );
}
/// A default controller must start each channel at the matching
/// `ChannelStart::default()` value and with an empty observation window.
#[test]
fn controller_default_config_is_sane() {
    let controller = AdaptiveController::default();
    let starts = ChannelStart::default();

    // Caps match the cold-start defaults.
    assert_eq!(controller.quote.current(), starts.quote);
    assert_eq!(controller.store.current(), starts.store);
    assert_eq!(controller.fetch.current(), starts.fetch);

    // Nothing has been observed yet.
    let quote_samples = lock(&controller.quote.inner).window.len();
    assert_eq!(quote_samples, 0);
    let store_samples = lock(&controller.store.inner).window.len();
    assert_eq!(store_samples, 0);
    let fetch_samples = lock(&controller.fetch.inner).window.len();
    assert_eq!(fetch_samples, 0);
}
/// Alternates success and timeout for 1000 observations starting at 8. The
/// cap must never drop below `min_concurrency`, must reach it, must have been
/// seen at 8 or above, must sit at the floor for more than 500 ticks, and
/// must end at the floor.
#[test]
fn alternating_success_failure_collapses_to_floor() {
    let cfg = cfg_for_tests();
    let floor = cfg.min_concurrency;
    let l = Limiter::new(8, cfg);

    let mut min_observed = usize::MAX;
    let mut max_observed = 0usize;
    let mut floor_visits = 0usize;

    for i in 0..1000 {
        // Even ticks succeed, odd ticks time out.
        let outcome = match i % 2 {
            0 => Outcome::Success,
            _ => Outcome::Timeout,
        };
        l.observe(outcome, Duration::from_millis(50));

        let cur = l.current();
        assert!(
            cur >= floor,
            "cap underflowed floor at iter {i}: got {cur}",
        );
        if cur < min_observed {
            min_observed = cur;
        }
        if cur > max_observed {
            max_observed = cur;
        }
        if cur == floor {
            floor_visits += 1;
        }
    }

    assert_eq!(
        min_observed, floor,
        "cap never reached the floor under 50% timeout rate"
    );
    assert!(
        max_observed >= 8,
        "cap never visited the start value: max_observed={max_observed}"
    );
    assert!(
        floor_visits > 500,
        "cap spent only {floor_visits}/1000 ticks at floor; expected mostly at floor"
    );
    assert_eq!(
        l.current(),
        floor,
        "controller did not settle at floor after 1000 alternations"
    );
}
/// Ten thousand successes from a start at `min_concurrency` must bring the
/// cap all the way to `max_concurrency`.
#[test]
fn pure_success_stream_recovers_to_max() {
    let cfg = cfg_for_tests();
    let ceiling = cfg.max_concurrency;
    let limiter = Limiter::new(cfg.min_concurrency, cfg);

    (0..10_000).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(5));
    });

    assert_eq!(
        limiter.current(),
        ceiling,
        "expected recovery to max ({}), got {}",
        ceiling,
        limiter.current(),
    );
}
/// One hundred timeouts must drive the cap to `min_concurrency`; a thousand
/// successes afterwards must lift it at least four above that floor.
#[test]
fn stress_then_heal_drives_floor_then_recovery() {
    let cfg = cfg_for_tests();
    let floor = cfg.min_concurrency;
    let limiter = Limiter::new(8, cfg);

    (0..100).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(50));
    });
    let after_stress = limiter.current();
    assert_eq!(
        after_stress, floor,
        "stress should drive cap to floor, got {after_stress}",
    );

    (0..1_000).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(10));
    });
    let after_heal = limiter.current();
    let recovery_target = floor.saturating_add(4);
    assert!(
        after_heal >= recovery_target,
        "expected substantial recovery from floor, got {after_heal}",
    );
}
/// After ten windows of 500 ms successes the recorded latency baseline must
/// be set, non-zero, and within 250 ms..=1000 ms.
#[test]
fn baseline_does_not_grow_unbounded_under_slow_links() {
    let cfg = cfg_for_tests();
    let samples = cfg.window_ops * 10;
    let limiter = Limiter::new(2, cfg);

    (0..samples).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(500));
    });

    let base = lock(&limiter.inner)
        .latency_baseline
        .expect("baseline should be set after many healthy windows");
    assert!(
        base > Duration::ZERO,
        "baseline must not stay at ZERO, got {base:?}",
    );

    let lo = Duration::from_millis(250);
    let hi = Duration::from_millis(1000);
    assert!(
        (lo..=hi).contains(&base),
        "baseline drifted out of [{lo:?}, {hi:?}]: {base:?}",
    );
}
/// The latency baseline must still be `None` after fifty timeouts and must
/// become a non-zero `Some` after five windows of 20 ms successes.
#[test]
fn baseline_initialized_only_after_first_healthy_window() {
    let cfg = cfg_for_tests();
    let healthy_ops = cfg.window_ops * 5;
    let limiter = Limiter::new(8, cfg);

    (0..50).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(50));
    });
    let before_healthy = lock(&limiter.inner).latency_baseline;
    assert!(
        before_healthy.is_none(),
        "baseline must be None before any healthy window",
    );

    (0..healthy_ops).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(20));
    });
    let after_healthy = lock(&limiter.inner).latency_baseline;
    assert!(
        after_healthy.is_some(),
        "baseline must be Some after healthy windows",
    );

    let base = after_healthy.unwrap_or_default();
    assert!(
        base > Duration::ZERO,
        "baseline must reflect real latency, got {base:?}",
    );
}
/// Under 50,000 consecutive timeouts the cap must equal `min_concurrency` at
/// each checkpoint (iterations 100, 1,000 and 49,999).
#[test]
fn min_concurrency_floor_holds_under_torrent_of_errors() {
    let cfg = cfg_for_tests();
    let floor = cfg.min_concurrency;
    let limiter = Limiter::new(8, cfg);

    for i in 0..50_000 {
        limiter.observe(Outcome::Timeout, Duration::from_millis(50));
        if !matches!(i, 100 | 1_000 | 49_999) {
            continue;
        }
        let cur = limiter.current();
        assert_eq!(
            cur, floor,
            "floor breached at iter {i}: got {cur}",
        );
    }
}
/// Starting one below the ceiling (but not below the floor), 50,000 successes
/// must never push the cap above `max_concurrency` at the checkpoints, and
/// the cap must end exactly at the ceiling.
#[test]
fn max_concurrency_ceiling_holds_under_torrent_of_successes() {
    let cfg = cfg_for_tests();
    let ceiling = cfg.max_concurrency;
    let start = ceiling.saturating_sub(1).max(cfg.min_concurrency);
    let limiter = Limiter::new(start, cfg);

    for i in 0..50_000 {
        limiter.observe(Outcome::Success, Duration::from_millis(5));
        if !matches!(i, 100 | 1_000 | 49_999) {
            continue;
        }
        let cur = limiter.current();
        assert!(
            cur <= ceiling,
            "ceiling breached at iter {i}: got {cur} > {}",
            ceiling,
        );
    }

    assert_eq!(limiter.current(), ceiling);
}
/// With a ceiling of `usize::MAX / 2` and a start of `usize::MAX / 4`, ten
/// windows of successes must bring the cap to the ceiling.
#[test]
fn saturating_arithmetic_handles_extreme_config() {
    let extreme = LimiterConfig {
        max_concurrency: usize::MAX / 2,
        ..cfg_for_tests()
    };
    let ceiling = extreme.max_concurrency;
    let samples = extreme.window_ops * 10;
    let limiter = Limiter::new(usize::MAX / 4, extreme);

    (0..samples).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(1));
    });

    assert_eq!(
        limiter.current(),
        ceiling,
        "saturating math survived but cap did not grow to ceiling"
    );
}
/// A full window of timeouts must drop the cap below 8; three windows of
/// successes afterwards must raise it above that stressed value.
#[test]
fn window_eviction_is_fifo() {
    let cfg = LimiterConfig {
        window_ops: 10,
        min_window_ops: 5,
        success_target: 0.9,
        timeout_ceiling: 0.1,
        ..cfg_for_tests()
    };
    let window = cfg.window_ops;
    let limiter = Limiter::new(8, cfg);

    (0..window).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(50));
    });
    let after_stress = limiter.current();
    assert!(
        after_stress < 8,
        "expected cap to drop from 8 after pure-timeout window, got {after_stress}"
    );

    (0..(window * 3)).for_each(|_| {
        limiter.observe(Outcome::Success, Duration::from_millis(20));
    });
    let after_recovery = limiter.current();
    assert!(
        after_recovery > after_stress,
        "FIFO eviction broken: cap stayed at {after_stress} after recovery successes (expected > {after_stress}, got {after_recovery})"
    );
}
/// With `enabled: false`, cycling through all four outcomes for 1,000
/// observations must leave the cap at its initial value after every one.
#[test]
fn disabled_controller_returns_initial_value_invariantly() {
    let disabled = LimiterConfig {
        enabled: false,
        ..cfg_for_tests()
    };
    let initial = 8;
    let limiter = Limiter::new(initial, disabled);

    // Same rotation as `i % 4`: Success, Timeout, NetworkError, ApplicationError.
    let rotation = [
        Outcome::Success,
        Outcome::Timeout,
        Outcome::NetworkError,
        Outcome::ApplicationError,
    ];
    for (i, outcome) in rotation.iter().cycle().take(1_000).enumerate() {
        limiter.observe(*outcome, Duration::from_millis(50));
        assert_eq!(
            limiter.current(),
            initial,
            "disabled controller moved at iter {i}",
        );
    }
}
/// Spawns 100 tasks on a four-worker runtime, each recording 100 successes on
/// a clone of the same limiter, and checks the final cap lies within
/// `[min_concurrency, max_concurrency]`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_observations_do_not_corrupt_window() {
    let cfg = cfg_for_tests();
    let floor = cfg.min_concurrency;
    let ceiling = cfg.max_concurrency;
    let limiter = Limiter::new(4, cfg);

    let mut workers = Vec::with_capacity(100);
    for _ in 0..100 {
        let shared = limiter.clone();
        let worker = tokio::spawn(async move {
            (0..100).for_each(|_| {
                shared.observe(Outcome::Success, Duration::from_millis(5));
            });
        });
        workers.push(worker);
    }
    for worker in workers {
        worker.await.unwrap();
    }

    let cur = limiter.current();
    assert!(
        (floor..=ceiling).contains(&cur),
        "cap out of bounds after concurrent observations: {cur}",
    );
}
/// A snapshot saved to disk and loaded back must warm-start a controller that
/// was created with low start values (2/2/2) up to the persisted caps.
#[test]
fn persisted_snapshot_warm_starts_above_cold_floor() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("client_adaptive.json");
    save_snapshot(
        &path,
        ChannelStart {
            quote: 64,
            store: 32,
            fetch: 128,
        },
    );
    let persisted = load_snapshot(&path).unwrap();

    let cold = ChannelStart {
        quote: 2,
        store: 2,
        fetch: 2,
    };
    let controller = AdaptiveController::new(cold, AdaptiveConfig::default());
    controller.warm_start(persisted);

    assert_eq!(controller.quote.current(), 64);
    assert_eq!(controller.store.current(), 32);
    assert_eq!(controller.fetch.current(), 128);
}
/// Two threads each save their own snapshot to the same path 50 times. The
/// file left behind must load and must equal one of the two snapshots
/// field-for-field.
#[test]
fn save_load_round_trip_with_concurrent_writes() {
    use std::thread;

    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("client_adaptive.json");
    let snap_a = ChannelStart {
        quote: 10,
        store: 10,
        fetch: 10,
    };
    let snap_b = ChannelStart {
        quote: 99,
        store: 99,
        fetch: 99,
    };

    // Start both writers before joining either so they overlap.
    let mut writers = Vec::with_capacity(2);
    for snap in [snap_a, snap_b] {
        let target = path.clone();
        writers.push(thread::spawn(move || {
            for _ in 0..50 {
                save_snapshot(&target, snap);
            }
        }));
    }
    for writer in writers {
        writer.join().unwrap();
    }

    let loaded = load_snapshot(&path).expect("file must be a valid snapshot, not torn");
    let same_as = |expected: ChannelStart| {
        loaded.quote == expected.quote
            && loaded.store == expected.store
            && loaded.fetch == expected.fetch
    };
    let valid = same_as(snap_a) || same_as(snap_b);
    assert!(valid, "loaded snapshot is neither A nor B: {loaded:?}",);
}
/// Saving under a root directory that does not exist must return without
/// panicking and must not create the file.
#[test]
fn save_snapshot_to_unwritable_dir_does_not_panic() {
    let unreachable_path =
        PathBuf::from("/nonexistent_root_dir_xyz_for_test/sub/dir/client_adaptive.json");
    save_snapshot(
        &unreachable_path,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
    );
    assert!(!unreachable_path.exists());
}
/// A snapshot file cut off mid-object must load as `None`.
#[test]
fn load_snapshot_from_truncated_file_returns_none() {
    let dir = tempfile::tempdir().unwrap();
    let truncated = dir.path().join("truncated.json");
    std::fs::write(&truncated, br#"{"schema":1,"channels":{"quote":"#).unwrap();
    let loaded = load_snapshot(&truncated);
    assert!(loaded.is_none());
}
/// Times 100,000 `current()` + `observe()` pairs and requires the total to be
/// under 500 ms of wall-clock time.
#[test]
fn controller_perf_overhead_is_bounded() {
    let limiter = Limiter::new(8, cfg_for_tests());
    let budget = Duration::from_millis(500);

    let started = Instant::now();
    (0..100_000).for_each(|_| {
        let _ = limiter.current();
        limiter.observe(Outcome::Success, Duration::from_micros(1));
    });
    let elapsed = started.elapsed();

    assert!(
        elapsed < budget,
        "100k observe+current pairs took {elapsed:?}, expected <500ms",
    );
}
/// Builds a controller from a config full of zeros, NaNs and infinities, then
/// checks the controller's stored config has been brought back into range and
/// that the store channel survives 200 success/timeout pairs with its cap at
/// 1 or above.
#[test]
fn nan_and_out_of_range_config_does_not_panic() {
    let hostile = AdaptiveConfig {
        enabled: true,
        min_concurrency: 0,
        max: ChannelMax {
            quote: 0,
            store: 0,
            fetch: 0,
        },
        window_ops: 10,
        min_window_ops: 50,
        success_target: f64::NAN,
        timeout_ceiling: f64::INFINITY,
        latency_inflation_factor: f64::NEG_INFINITY,
        latency_ewma_alpha: f64::NAN,
    };
    let controller = AdaptiveController::new(ChannelStart::default(), hostile);
    let sanitized = &controller.config;
    let unit_interval = 0.0..=1.0;

    assert_eq!(
        sanitized.min_concurrency, 1,
        "sanitize did not raise min_concurrency from 0"
    );
    assert!(
        sanitized.success_target.is_finite() && unit_interval.contains(&sanitized.success_target),
        "sanitize did not clamp success_target from NaN: {}",
        sanitized.success_target
    );
    assert!(
        sanitized.timeout_ceiling.is_finite()
            && unit_interval.contains(&sanitized.timeout_ceiling),
        "sanitize did not clamp timeout_ceiling from Inf: {}",
        sanitized.timeout_ceiling
    );
    assert!(
        sanitized.latency_inflation_factor.is_finite()
            && sanitized.latency_inflation_factor > 0.0,
        "sanitize did not fix latency_inflation_factor from -Inf: {}",
        sanitized.latency_inflation_factor
    );
    assert!(
        sanitized.latency_ewma_alpha.is_finite()
            && unit_interval.contains(&sanitized.latency_ewma_alpha),
        "sanitize did not fix latency_ewma_alpha from NaN: {}",
        sanitized.latency_ewma_alpha
    );
    assert!(
        sanitized.min_window_ops <= sanitized.window_ops,
        "sanitize did not clamp min_window_ops <= window_ops: min={} window={}",
        sanitized.min_window_ops,
        sanitized.window_ops
    );
    assert!(
        sanitized.max.quote >= sanitized.min_concurrency,
        "max.quote below min_concurrency"
    );

    // Exercise the store channel with extreme latencies on both outcomes.
    let store = &controller.store;
    (0..200).for_each(|_| {
        store.observe(Outcome::Success, Duration::from_secs(99_999));
        store.observe(Outcome::Timeout, Duration::ZERO);
    });
    let cur = store.current();
    assert!(cur >= 1, "cap below floor: {cur}");
}
/// Eight consecutive timeouts against a limiter at 32 (window 32, minimum
/// window 8) must leave the cap at 16 or above.
#[test]
fn transient_burst_does_not_pile_drive_to_floor() {
    let limiter = Limiter::new(
        32,
        LimiterConfig {
            window_ops: 32,
            min_window_ops: 8,
            success_target: 0.95,
            timeout_ceiling: 0.10,
            ..cfg_for_tests()
        },
    );

    (0..8).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(10));
    });

    let after_burst = limiter.current();
    assert!(
        after_burst >= 16,
        "transient burst pile-drove cap from 32 to {after_burst}; expected >= 16",
    );
}
/// For each listed `Error` variant, runs 16 failing operations through
/// `observe_op` with `classify_error` as the classifier and expects the cap
/// of a fresh limiter (start 8) to end below 8.
#[tokio::test]
async fn transport_errors_classify_as_capacity_signal() {
use crate::data::client::classify_error;
use crate::data::error::Error;
// Fresh config per case: window of 16, evaluation allowed from 5 samples.
let make_cfg = || LimiterConfig {
window_ops: 16,
min_window_ops: 5,
success_target: 0.5,
timeout_ceiling: 0.5,
..cfg_for_tests()
};
// Each case pairs a label (used in the failure message) with a factory that
// builds a new error value on every call.
type ErrFactory = Box<dyn Fn() -> Error>;
let cases: Vec<(&str, ErrFactory)> = vec![
("Network", Box::new(|| Error::Network("net".to_string()))),
(
"InsufficientPeers",
Box::new(|| Error::InsufficientPeers("ip".to_string())),
),
("Io", Box::new(|| Error::Io(std::io::Error::other("io")))),
("Protocol", Box::new(|| Error::Protocol("p".to_string()))),
("Storage", Box::new(|| Error::Storage("s".to_string()))),
(
"PartialUpload",
Box::new(|| Error::PartialUpload {
stored: vec![],
stored_count: 0,
failed: vec![],
failed_count: 0,
total_chunks: 0,
spend: Box::new(crate::data::error::PartialUploadSpend {
storage_cost_atto: "0".to_string(),
gas_cost_wei: 0,
}),
reason: "r".to_string(),
}),
),
];
for (name, mk) in &cases {
// A new limiter per case so one variant cannot mask another.
let l = Limiter::new(8, make_cfg());
for _ in 0..16 {
let _: std::result::Result<(), Error> =
observe_op(&l, || async { Err(mk()) }, classify_error).await;
}
let cur = l.current();
assert!(
cur < 8,
"{name} not classified as capacity signal: cap stayed at {cur}",
);
}
}
/// With per-channel ceilings of 4 / 8 / 1024 and starts of 4 / 8 / 64, a
/// thousand successes on every channel must leave quote at 4, store at 8, and
/// fetch somewhere above 8 and at most 1024.
#[test]
fn per_channel_ceilings_are_independent() {
    let ceilings = ChannelMax {
        quote: 4,
        store: 8,
        fetch: 1024,
    };
    let starts = ChannelStart {
        quote: 4,
        store: 8,
        fetch: 64,
    };
    let controller = AdaptiveController::new(
        starts,
        AdaptiveConfig {
            max: ceilings,
            ..AdaptiveConfig::default()
        },
    );

    let quick = Duration::from_micros(10);
    (0..1000).for_each(|_| {
        controller.quote.observe(Outcome::Success, quick);
        controller.store.observe(Outcome::Success, quick);
        controller.fetch.observe(Outcome::Success, quick);
    });

    assert_eq!(controller.quote.current(), 4, "quote should cap at 4");
    assert_eq!(controller.store.current(), 8, "store should cap at 8");
    let fetch_cap = controller.fetch.current();
    assert!(
        fetch_cap > 8 && fetch_cap <= 1024,
        "fetch did not use its independent ceiling; got {}",
        fetch_cap
    );
}
/// After one healthy epoch the fetch hill limiter must move to
/// `HILL_TEST_UP_PROBE_CAP`; a following epoch at
/// `HILL_TEST_REJECT_LATENCY_MS` latency must return both `current()` and
/// `snapshot()` to `HILL_TEST_START_CAP`.
#[test]
fn fetch_hill_rejects_upward_probe_without_goodput_gain() {
    let cfg = hill_cfg_for_tests();
    let limiter = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());

    // Epoch 1: healthy, should trigger an upward probe.
    observe_hill_success_epoch(&limiter, &cfg, HILL_TEST_CHUNK_BYTES);
    assert_eq!(
        limiter.current(),
        HILL_TEST_UP_PROBE_CAP,
        "first healthy epoch should probe upward"
    );

    // Epoch 2: same chunk size at the rejection latency.
    let slow = Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS);
    observe_hill_success_epoch_with_latency(&limiter, &cfg, HILL_TEST_CHUNK_BYTES, slow);
    assert_eq!(
        limiter.current(),
        HILL_TEST_START_CAP,
        "slower higher-cap wave should reject the upward probe"
    );
    assert_eq!(limiter.snapshot(), HILL_TEST_START_CAP);
}
/// Two healthy epochs in a row must promote `HILL_TEST_UP_PROBE_CAP` into
/// `snapshot()` and move `current()` on to `HILL_TEST_NEXT_UP_PROBE_CAP`.
#[test]
fn fetch_hill_accepts_upward_probe_with_goodput_gain() {
    let cfg = hill_cfg_for_tests();
    let limiter = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());

    // Epoch 1 at the start cap.
    observe_hill_success_epoch(&limiter, &cfg, HILL_TEST_CHUNK_BYTES);
    assert_eq!(limiter.current(), HILL_TEST_UP_PROBE_CAP);

    // Epoch 2 at the probed cap, same chunk size.
    observe_hill_success_epoch(&limiter, &cfg, HILL_TEST_CHUNK_BYTES);
    let accepted_best = limiter.snapshot();
    assert_eq!(
        accepted_best,
        HILL_TEST_UP_PROBE_CAP,
        "same-size chunks at same latency should promote the higher cap"
    );
    let next_probe = limiter.current();
    assert_eq!(
        next_probe,
        HILL_TEST_NEXT_UP_PROBE_CAP,
        "after accepting an upward probe, hill climber should probe higher"
    );
}
/// After a rejected upward probe and
/// `HILL_REJECT_COOLDOWN_EPOCHS + HILL_STABLE_PROBE_EPOCHS` healthy epochs,
/// the limiter must move to `HILL_TEST_DOWN_PROBE_CAP`; one epoch at
/// `HILL_TEST_RETAINED_DOWN_LATENCY_MS` must then make that cap the
/// `snapshot()` value.
#[test]
fn fetch_hill_accepts_lower_probe_when_goodput_is_retained() {
    let cfg = hill_cfg_for_tests();
    let limiter = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());

    // Upward probe, then reject it with a slow epoch.
    observe_hill_success_epoch(&limiter, &cfg, HILL_TEST_CHUNK_BYTES);
    let reject_latency = Duration::from_millis(HILL_TEST_REJECT_LATENCY_MS);
    observe_hill_success_epoch_with_latency(&limiter, &cfg, HILL_TEST_CHUNK_BYTES, reject_latency);
    assert_eq!(limiter.current(), HILL_TEST_START_CAP);

    // Healthy epochs covering the cooldown plus the stable stretch.
    let quiet_epochs = HILL_REJECT_COOLDOWN_EPOCHS + HILL_STABLE_PROBE_EPOCHS;
    (0..quiet_epochs).for_each(|_| {
        observe_hill_success_epoch(&limiter, &cfg, HILL_TEST_CHUNK_BYTES);
    });
    assert_eq!(
        limiter.current(),
        HILL_TEST_DOWN_PROBE_CAP,
        "stable best should eventually probe a lower cap"
    );

    // One epoch at the lower cap with the retained-goodput latency.
    let retained_latency = Duration::from_millis(HILL_TEST_RETAINED_DOWN_LATENCY_MS);
    observe_hill_success_epoch_with_latency(
        &limiter,
        &cfg,
        HILL_TEST_CHUNK_BYTES,
        retained_latency,
    );
    assert_eq!(
        limiter.snapshot(),
        HILL_TEST_DOWN_PROBE_CAP,
        "retained goodput at lower concurrency should become the new best"
    );
}
/// Drives the fetch hill limiter with real timed operations: every op sleeps
/// `HILL_TEST_ASYNC_LATENCY_MS`, succeeds, and reports `HILL_TEST_CHUNK_BYTES`.
/// Afterwards `snapshot()` must be the start cap or the up-probe cap, and
/// `current()` must be the start cap or the next up-probe cap.
#[tokio::test]
async fn fetch_hill_records_constant_size_timed_ops_without_stress() {
let cfg = hill_cfg_for_tests();
let l = fetch_hill_for_tests(HILL_TEST_START_CAP, cfg.clone());
// Op count is the sum of the epoch sample targets at the start cap and at
// the up-probe cap.
let total_ops = hill_epoch_target_samples(HILL_TEST_START_CAP, &cfg)
+ hill_epoch_target_samples(HILL_TEST_UP_PROBE_CAP, &cfg);
// A clone is moved into the item closure; `l` itself is passed by reference
// to `rebucketed_unordered`.
let limiter_for_ops = l.clone();
let result: std::result::Result<Vec<()>, ()> =
rebucketed_unordered(&l, 0..total_ops, move |_| {
let limiter = limiter_for_ops.clone();
async move {
observe_op_with_success_bytes(
&limiter,
|| async {
tokio::time::sleep(Duration::from_millis(HILL_TEST_ASYNC_LATENCY_MS))
.await;
Ok::<(), ()>(())
},
|_| Outcome::NetworkError,
|_| HILL_TEST_CHUNK_BYTES,
)
.await
}
})
.await;
result.unwrap();
let snapshot = l.snapshot();
assert!(
matches!(snapshot, HILL_TEST_START_CAP | HILL_TEST_UP_PROBE_CAP),
"timed successes should finish at the existing or accepted best cap, got {snapshot}"
);
let current = l.current();
assert!(
matches!(current, HILL_TEST_START_CAP | HILL_TEST_NEXT_UP_PROBE_CAP),
"timed successes should leave the controller unstressed, got {current}"
);
}
/// `min_window_ops` timeouts (4, with a window of 8) against a fetch hill
/// limiter at 16 must already halve the cap to 8.
#[test]
fn fetch_hill_stress_cuts_before_full_epoch() {
    let cfg = LimiterConfig {
        window_ops: 8,
        min_window_ops: 4,
        ..hill_cfg_for_tests()
    };
    let early_samples = cfg.min_window_ops;
    let limiter = fetch_hill_for_tests(16, cfg);

    (0..early_samples).for_each(|_| {
        limiter.observe(Outcome::Timeout, Duration::from_millis(10));
    });

    assert_eq!(
        limiter.current(),
        8,
        "fetch hill climber should halve on early stress"
    );
}
/// Guards the default cold-start values: quote at least 32, store at least 8,
/// fetch exactly `FETCH_COLD_START_CONCURRENCY`.
#[test]
fn cold_start_at_least_prior_static_defaults() {
    let ChannelStart {
        quote,
        store,
        fetch,
    } = ChannelStart::default();

    assert!(quote >= 32, "quote cold-start regressed: {}", quote);
    assert!(store >= 8, "store cold-start regressed: {}", store);
    assert_eq!(
        fetch, FETCH_COLD_START_CONCURRENCY,
        "fetch cold-start changed unexpectedly"
    );
}
/// Feeds timeouts to a limiter at 64 until the cap reaches 1 or 200
/// observations have been made, then requires the cap to be 1.
#[test]
fn sustained_stress_reaches_floor_within_bounded_ops() {
    let limiter = Limiter::new(
        64,
        LimiterConfig {
            window_ops: 32,
            min_window_ops: 8,
            success_target: 0.95,
            timeout_ceiling: 0.10,
            max_concurrency: 64,
            ..cfg_for_tests()
        },
    );

    let mut ops = 0usize;
    loop {
        // Stop once the floor is reached or the observation budget is spent.
        if limiter.current() <= 1 || ops >= 200 {
            break;
        }
        limiter.observe(Outcome::Timeout, Duration::from_millis(10));
        ops += 1;
    }

    assert_eq!(
        limiter.current(),
        1,
        "controller did not reach floor within 200 observations under \
         sustained timeout stress; took {ops} ops, ended at cap {}",
        limiter.current()
    );
}
/// A default controller must start at the default cold-start caps, and every
/// default ceiling must be strictly above its cold-start value.
#[test]
fn default_controller_has_growth_headroom() {
    let controller = AdaptiveController::default();
    let cs = ChannelStart::default();
    let max = ChannelMax::default();

    // Starting caps.
    assert_eq!(controller.quote.current(), cs.quote);
    assert_eq!(controller.store.current(), cs.store);
    assert_eq!(controller.fetch.current(), cs.fetch);

    // Headroom per channel.
    let (quote_max, quote_start) = (max.quote, cs.quote);
    assert!(
        quote_max > quote_start,
        "no growth headroom for quote: max={} start={}",
        quote_max,
        quote_start
    );
    let (store_max, store_start) = (max.store, cs.store);
    assert!(
        store_max > store_start,
        "no growth headroom for store: max={} start={}",
        store_max,
        store_start
    );
    let (fetch_max, fetch_start) = (max.fetch, cs.fetch);
    assert!(
        fetch_max > fetch_start,
        "no growth headroom for fetch: max={} start={}",
        fetch_max,
        fetch_start
    );
}
/// Warm-starting a default controller with 1/1/1 must leave every channel at
/// its `ChannelStart::default()` value.
#[test]
fn warm_start_floors_at_cold_defaults() {
    let controller = AdaptiveController::default();
    let cold = ChannelStart::default();

    controller.warm_start(ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    });

    let quote_cap = controller.quote.current();
    assert_eq!(
        quote_cap,
        cold.quote,
        "quote warm_start did not floor at cold default"
    );
    let store_cap = controller.store.current();
    assert_eq!(
        store_cap,
        cold.store,
        "store warm_start did not floor at cold default"
    );
    let fetch_cap = controller.fetch.current();
    assert_eq!(
        fetch_cap,
        cold.fetch,
        "fetch warm_start did not floor at cold default"
    );
}
/// Warm-starting a default controller with multiples of the cold-start values
/// (2x quote, 4x store, 2x fetch) must apply them as given.
#[test]
fn warm_start_honors_values_above_cold_floor() {
    let controller = AdaptiveController::default();
    let cold = ChannelStart::default();
    let (quote, store, fetch) = (cold.quote * 2, cold.store * 4, cold.fetch * 2);

    controller.warm_start(ChannelStart {
        quote,
        store,
        fetch,
    });

    assert_eq!(controller.quote.current(), quote);
    assert_eq!(controller.store.current(), store);
    assert_eq!(controller.fetch.current(), fetch);
}
/// Runs 200 one-millisecond items through `rebucketed` on a limiter that
/// starts at 4 while a background task warm-starts the limiter to 16 once at
/// least 16 items have been processed. The peak number of simultaneously
/// running items must exceed 4.
#[tokio::test]
async fn rebucketed_picks_up_cap_changes_mid_stream() {
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::Arc as StdArc;
let cfg = LimiterConfig {
min_concurrency: 1,
max_concurrency: 32,
..cfg_for_tests()
};
let l = Limiter::new(4, cfg);
// Shared counters: highest in-flight count seen, current in-flight count,
// and number of completed items.
let max_seen = StdArc::new(AtomicUsize::new(0));
let in_flight = StdArc::new(AtomicUsize::new(0));
let processed = StdArc::new(AtomicUsize::new(0));
let l_for_bump = l.clone();
let processed_for_bump = processed.clone();
// Poll every 2 ms; raise the cap exactly once and exit.
let bump_handle = tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(2)).await;
if processed_for_bump.load(AtomicOrdering::Relaxed) >= 16 {
l_for_bump.warm_start(16);
return;
}
}
});
let _: Vec<()> = rebucketed(&l, 0..200usize, false, |_i| {
let max_seen = max_seen.clone();
let in_flight = in_flight.clone();
let processed = processed.clone();
async move {
// Record this item as in flight and update the observed peak.
let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
max_seen.fetch_max(cur, AtomicOrdering::Relaxed);
tokio::time::sleep(Duration::from_millis(1)).await;
in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
processed.fetch_add(1, AtomicOrdering::Relaxed);
Ok::<(), &'static str>(())
}
})
.await
.unwrap();
bump_handle.await.unwrap();
let peak = max_seen.load(AtomicOrdering::Relaxed);
assert!(
peak > 4,
"rebucketed did not pick up the mid-stream cap bump (peak in-flight = {peak})"
);
}
/// Creating an `observe_op` future and dropping it without polling must not
/// move the cap, and 16 successful operations afterwards must still raise the
/// cap above 4.
#[tokio::test]
async fn observe_op_cancellation_drops_silently() {
    let limiter = Limiter::new(
        4,
        LimiterConfig {
            window_ops: 16,
            min_window_ops: 4,
            ..cfg_for_tests()
        },
    );

    // Build the future against a clone of the limiter and discard it unpolled.
    let handle = limiter.clone();
    let never_polled = observe_op(
        &handle,
        || async {
            std::future::pending::<()>().await;
            Ok::<(), &'static str>(())
        },
        |_| Outcome::Timeout,
    );
    drop(never_polled);
    assert_eq!(limiter.current(), 4, "cancelled op moved the cap");

    // The limiter must still respond to a full window of successes.
    for _ in 0..16 {
        let _: Result<(), &'static str> =
            observe_op(&limiter, || async { Ok(()) }, |_| Outcome::NetworkError).await;
    }
    let grown = limiter.current();
    assert!(
        grown > 4,
        "cap did not grow after 16 successes; controller corrupted by cancellation? cap={}",
        grown,
    );
}
/// The snapshot file must exist as soon as `save_snapshot` returns and must
/// load back with the saved values.
#[test]
fn save_snapshot_is_synchronous_and_durable() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("client_adaptive.json");

    save_snapshot(
        &path,
        ChannelStart {
            quote: 100,
            store: 50,
            fetch: 200,
        },
    );
    assert!(
        path.exists(),
        "save_snapshot did not write file synchronously"
    );

    let restored = load_snapshot(&path).unwrap();
    assert_eq!(restored.quote, 100);
    assert_eq!(restored.store, 50);
    assert_eq!(restored.fetch, 200);
}
/// After `warm_start(16)`, one full window of successes must raise the cap by
/// exactly one (16 -> 17) rather than doubling it.
///
/// The body never awaits anything, so this is a plain synchronous `#[test]`
/// and does not need an async runtime.
#[test]
fn warm_start_disables_slow_start_doubling() {
    let cfg = LimiterConfig {
        window_ops: 8,
        min_window_ops: 4,
        success_target: 0.9,
        ..cfg_for_tests()
    };
    // Read the window size before `cfg` is moved into the limiter.
    let window = cfg.window_ops;
    let l = Limiter::new(2, cfg);

    l.warm_start(16);
    assert_eq!(l.current(), 16);

    for _ in 0..window {
        l.observe(Outcome::Success, Duration::from_millis(10));
    }
    assert_eq!(
        l.current(),
        17,
        "warm-start triggered slow-start doubling instead of additive +1"
    );
}
/// A controller built with custom start values (2/1/4) must keep those values
/// when warm-started with 1/1/1, and must accept 10/10/10 as given.
#[test]
fn controller_warm_start_floors_at_per_instance_cold_start() {
    let controller = AdaptiveController::new(
        ChannelStart {
            quote: 2,
            store: 1,
            fetch: 4,
        },
        AdaptiveConfig::default(),
    );

    // A snapshot at or below this instance's start values.
    let below = ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    };
    controller.warm_start(below);
    assert_eq!(controller.quote.current(), 2);
    assert_eq!(controller.store.current(), 1);
    assert_eq!(controller.fetch.current(), 4);

    // A snapshot above them.
    let above = ChannelStart {
        quote: 10,
        store: 10,
        fetch: 10,
    };
    controller.warm_start(above);
    assert_eq!(controller.quote.current(), 10);
    assert_eq!(controller.store.current(), 10);
    assert_eq!(controller.fetch.current(), 10);
}
/// With `enabled: false`, `warm_start` must not move any channel away from
/// the value the controller was constructed with.
#[test]
fn warm_start_is_noop_when_adaptive_disabled() {
    let controller = AdaptiveController::new(
        ChannelStart {
            quote: 5,
            store: 5,
            fetch: 5,
        },
        AdaptiveConfig {
            enabled: false,
            ..AdaptiveConfig::default()
        },
    );
    let requested = ChannelStart {
        quote: 100,
        store: 100,
        fetch: 100,
    };
    controller.warm_start(requested);
    // All three channels are expected to still report the constructor value.
    assert_eq!(
        controller.quote.current(),
        5,
        "warm_start moved cap when disabled"
    );
    assert_eq!(
        controller.store.current(),
        5,
        "warm_start moved cap when disabled"
    );
    assert_eq!(
        controller.fetch.current(),
        5,
        "warm_start moved cap when disabled"
    );
}
/// Runs 20 operations through `rebucketed_unordered`, where item 0 sleeps
/// 50 ms and every other item sleeps 1 ms, then checks that all 20 started
/// and that at least 4 were in flight at the same time.
#[tokio::test]
async fn rebucketed_unordered_is_rolling_not_fenced() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    let limiter = Limiter::new(
        4,
        LimiterConfig {
            min_concurrency: 1,
            max_concurrency: 8,
            window_ops: 100,
            min_window_ops: 50,
            ..cfg_for_tests()
        },
    );
    let running = StdArc::new(AtomicUsize::new(0));
    let high_water = StdArc::new(AtomicUsize::new(0));
    let launched = StdArc::new(AtomicUsize::new(0));
    let _: Vec<()> = rebucketed_unordered(&limiter, 0..20usize, |i| {
        let (running, high_water, launched) =
            (running.clone(), high_water.clone(), launched.clone());
        async move {
            // Record how many operations are active, counting this one.
            let now_running = running.fetch_add(1, AtomicOrdering::Relaxed) + 1;
            high_water.fetch_max(now_running, AtomicOrdering::Relaxed);
            launched.fetch_add(1, AtomicOrdering::Relaxed);
            // Item 0 is the single slow operation.
            let nap_ms = if i == 0 { 50 } else { 1 };
            tokio::time::sleep(Duration::from_millis(nap_ms)).await;
            running.fetch_sub(1, AtomicOrdering::Relaxed);
            Ok::<(), &'static str>(())
        }
    })
    .await
    .unwrap();
    assert_eq!(launched.load(AtomicOrdering::Relaxed), 20);
    let peak = high_water.load(AtomicOrdering::Relaxed);
    assert!(
        peak >= 4,
        "rolling scheduler did not fill cap; peak in-flight = {peak}"
    );
}
/// Feeds 50 indexed items through `rebucketed_ordered` with sleeps that get
/// shorter as the index grows, and expects position `i` of the output to
/// hold `i * 10`.
#[tokio::test]
async fn rebucketed_ordered_preserves_input_order() {
    let limiter = Limiter::new(
        4,
        LimiterConfig {
            min_concurrency: 1,
            max_concurrency: 4,
            ..cfg_for_tests()
        },
    );
    let items = (0..50).collect::<Vec<usize>>();
    let indexed = items.iter().copied().enumerate();
    let result: Vec<usize> = rebucketed_ordered(&limiter, indexed, |(idx, v)| async move {
        // Earlier items sleep longer (50 µs down to 1 µs requested).
        let delay = (50 - v) as u64;
        tokio::time::sleep(Duration::from_micros(delay)).await;
        Ok::<_, &'static str>((idx, v * 10))
    })
    .await
    .unwrap();
    assert_eq!(result.len(), 50);
    result.iter().enumerate().for_each(|(i, v)| {
        assert_eq!(*v, i * 10, "out of order at index {i}: got {v}");
    });
}
/// Each input is `(idx, 1000 + idx)`; the operation returns `(idx, hash * 7)`.
/// Output position `i` must therefore hold `(1000 + i) * 7`.
#[tokio::test]
async fn rebucketed_ordered_pairs_idx_with_payload_correctly() {
    let limiter = Limiter::new(
        8,
        LimiterConfig {
            min_concurrency: 1,
            max_concurrency: 8,
            ..cfg_for_tests()
        },
    );
    // Pair index i with payload 1000 + i.
    let items: Vec<(usize, u64)> = (0..40usize).zip(1000u64..).collect();
    let result: Vec<u64> = rebucketed_ordered(&limiter, items, |(idx, hash)| async move {
        // Earlier items sleep longer (40 µs down to 1 µs requested).
        let delay = (40 - idx) as u64;
        tokio::time::sleep(Duration::from_micros(delay)).await;
        Ok::<_, &'static str>((idx, hash * 7))
    })
    .await
    .unwrap();
    for ((i, v), payload) in result.iter().enumerate().zip(1000u64..) {
        let expected = payload * 7;
        assert_eq!(
            *v, expected,
            "idx {i} paired with wrong content: {v}, expected {expected}"
        );
    }
}
/// Saves 100 snapshots to the same path, then checks that the last one is
/// what loads back and that no directory entry containing `.tmp.` remains.
#[test]
fn save_snapshot_temp_file_is_unique_per_call() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("client_adaptive.json");
    for n in 1..=100 {
        let snap = ChannelStart {
            quote: n,
            store: n,
            fetch: n,
        };
        save_snapshot(&path, snap);
    }
    let loaded = load_snapshot(&path).unwrap();
    assert_eq!(loaded.quote, 100);
    assert_eq!(loaded.store, 100);
    assert_eq!(loaded.fetch, 100);
    // Collect the names of any entries that look like temp files; entries
    // that fail to read are ignored.
    let leftover: Vec<_> = std::fs::read_dir(dir.path())
        .unwrap()
        .flatten()
        .map(|entry| entry.file_name())
        .filter(|name| name.to_string_lossy().contains(".tmp."))
        .collect();
    assert!(leftover.is_empty(), "temp files leaked: {:?}", leftover);
}
/// Both `rebucketed_unordered` and `rebucketed_ordered` must return an empty
/// `Vec` when given an empty input iterator.
#[tokio::test]
async fn rebucketed_empty_input_returns_empty() {
    let limiter = Limiter::new(4, cfg_for_tests());

    let unordered: Vec<usize> =
        rebucketed_unordered(&limiter, std::iter::empty::<usize>(), |_| async {
            Ok::<_, &'static str>(42usize)
        })
        .await
        .unwrap();
    assert!(unordered.is_empty());

    let no_items = std::iter::empty::<(usize, ())>();
    let ordered: Vec<usize> = rebucketed_ordered(&limiter, no_items, |(idx, _)| async move {
        Ok::<_, &'static str>((idx, 42usize))
    })
    .await
    .unwrap();
    assert!(ordered.is_empty());
}
/// Boundary case: the number of input items (4) equals the limiter's cap (4).
///
/// Checks both that four results come back and that they are exactly the
/// doubled inputs, so dropped, duplicated or altered values are caught as
/// well as a wrong count.
#[tokio::test]
async fn rebucketed_exactly_cap_items() {
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 4,
        ..cfg_for_tests()
    };
    let l = Limiter::new(4, cfg);
    let mut v: Vec<usize> =
        rebucketed_unordered(
            &l,
            0..4usize,
            |i| async move { Ok::<_, &'static str>(i * 2) },
        )
        .await
        .unwrap();
    assert_eq!(v.len(), 4);
    // NOTE(review): the unordered variant presumably makes no promise about
    // result order, so sort before comparing contents.
    v.sort_unstable();
    assert_eq!(
        v,
        [0, 2, 4, 6],
        "results were dropped, duplicated, or altered"
    );
}
/// Items 5 and 10 both fail. The test expects the error from item 5 to be
/// the one returned, and the number of operations that started to fall in
/// `5..20`.
#[tokio::test]
async fn rebucketed_preserves_first_error() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    let limiter = Limiter::new(
        4,
        LimiterConfig {
            min_concurrency: 1,
            max_concurrency: 4,
            ..cfg_for_tests()
        },
    );
    let started = StdArc::new(AtomicUsize::new(0));
    let counter = started.clone();
    let result: Result<Vec<()>, &'static str> =
        rebucketed_unordered(&limiter, 0..20usize, |i| {
            let counter = counter.clone();
            async move {
                counter.fetch_add(1, AtomicOrdering::Relaxed);
                match i {
                    // Fails after a short sleep.
                    5 => {
                        tokio::time::sleep(Duration::from_micros(100)).await;
                        Err("first error")
                    }
                    // Fails immediately once it is started.
                    10 => Err("second error - should be ignored"),
                    _ => {
                        tokio::time::sleep(Duration::from_micros(50)).await;
                        Ok(())
                    }
                }
            }
        })
        .await;
    let Err(e) = result else {
        panic!("expected error, got ok")
    };
    assert_eq!(e, "first error", "wrong error preserved");
    let total = started.load(AtomicOrdering::Relaxed);
    assert!(
        (5..20).contains(&total),
        "started count out of range: {total}"
    );
}
/// With `min_concurrency == max_concurrency == 5`, neither 1000 successes
/// nor 1000 timeouts may move the cap away from 5.
#[test]
fn limiter_with_min_equal_max_is_pinned() {
    let limiter = Limiter::new(
        5,
        LimiterConfig {
            min_concurrency: 5,
            max_concurrency: 5,
            ..cfg_for_tests()
        },
    );
    // First phase: successes at 1 ms; second phase: timeouts at 50 ms.
    for (outcome, latency_ms) in [(Outcome::Success, 1), (Outcome::Timeout, 50)] {
        for _ in 0..1000 {
            limiter.observe(outcome, Duration::from_millis(latency_ms));
        }
        assert_eq!(limiter.current(), 5, "cap moved despite min==max");
    }
}
/// `ewma` with `alpha = 0.0` must return the previous value exactly.
#[test]
fn ewma_alpha_zero_returns_prev() {
    let (prev, sample) = (Duration::from_millis(100), Duration::from_millis(500));
    assert_eq!(
        ewma(prev, sample, 0.0),
        prev,
        "alpha=0 must return prev unchanged"
    );
}
/// `ewma` with `alpha = 1.0` must return the new sample, within 1 ms.
#[test]
fn ewma_alpha_one_returns_sample() {
    let (prev, sample) = (Duration::from_millis(100), Duration::from_millis(500));
    let result = ewma(prev, sample, 1.0);
    // A 1 ms tolerance is allowed rather than demanding exact equality.
    let tolerance = Duration::from_millis(1);
    assert!(
        sample.abs_diff(result) <= tolerance,
        "alpha=1 should return sample; got {result:?}, expected ~{sample:?}"
    );
}
/// `ewma` with `alpha = 0.5` over 200 ms and 400 ms must give 300 ms,
/// within 1 ms.
#[test]
fn ewma_alpha_half_returns_midpoint() {
    let result = ewma(
        Duration::from_millis(200),
        Duration::from_millis(400),
        0.5,
    );
    let expected = Duration::from_millis(300);
    // A 1 ms tolerance is allowed rather than demanding exact equality.
    let tolerance = Duration::from_millis(1);
    assert!(
        expected.abs_diff(result) <= tolerance,
        "alpha=0.5 midpoint: got {result:?}, expected ~{expected:?}"
    );
}
/// A non-finite `alpha` (NaN, +inf, -inf) must make `ewma` return the
/// previous value unchanged.
#[test]
fn ewma_nan_alpha_returns_prev() {
    let prev = Duration::from_millis(100);
    let sample = Duration::from_millis(500);
    for alpha in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
        assert_eq!(ewma(prev, sample, alpha), prev);
    }
}
/// With `alpha = 2.5` the result must land within 1 ms of the sample
/// (500 ms), i.e. behave as if `alpha` were 1.
#[test]
fn ewma_clamps_alpha_above_one() {
    let result = ewma(
        Duration::from_millis(100),
        Duration::from_millis(500),
        2.5,
    );
    let accepted = Duration::from_millis(499)..=Duration::from_millis(501);
    assert!(*accepted.start() <= result);
    assert!(result <= *accepted.end());
}
/// Five windows' worth of `ApplicationError` observations must leave the cap
/// at its starting value of 8.
#[test]
fn window_full_of_application_errors_does_not_move_cap() {
    let cfg = cfg_for_tests();
    // Read the window size before the config is moved into the limiter.
    let observations = cfg.window_ops * 5;
    let limiter = Limiter::new(8, cfg);
    (0..observations).for_each(|_| {
        limiter.observe(Outcome::ApplicationError, Duration::from_millis(50));
    });
    assert_eq!(
        limiter.current(),
        8,
        "cap moved on pure-app-error window; should hold"
    );
}
/// With `enabled: false`, 10 000 timeouts on every channel must not change
/// any channel's cap.
#[test]
fn disabled_adaptive_controller_truly_inert() {
    let controller = AdaptiveController::new(
        ChannelStart::default(),
        AdaptiveConfig {
            enabled: false,
            ..AdaptiveConfig::default()
        },
    );
    // Caps before any observation is fed in.
    let (quote_before, store_before, fetch_before) = (
        controller.quote.current(),
        controller.store.current(),
        controller.fetch.current(),
    );
    let latency = Duration::from_millis(1);
    for _ in 0..10000 {
        controller.quote.observe(Outcome::Timeout, latency);
        controller.store.observe(Outcome::Timeout, latency);
        controller.fetch.observe(Outcome::Timeout, latency);
    }
    assert_eq!(controller.quote.current(), quote_before);
    assert_eq!(controller.store.current(), store_before);
    assert_eq!(controller.fetch.current(), fetch_before);
}
/// Feeding 1000 timeouts to the store channel must push only that channel
/// down to `min_concurrency`; the quote and fetch caps must stay put.
#[test]
fn channel_state_is_independent() {
    let controller = AdaptiveController::default();
    let quote_before = controller.quote.current();
    let fetch_before = controller.fetch.current();
    let store_before = controller.store.current();
    // Only the store channel receives observations.
    (0..1000).for_each(|_| {
        controller
            .store
            .observe(Outcome::Timeout, Duration::from_millis(1));
    });
    assert_eq!(
        controller.store.current(),
        controller.config.min_concurrency,
        "store did not reach floor after 1000 timeouts; cap={}",
        controller.store.current()
    );
    assert!(
        controller.store.current() < store_before,
        "store cap did not move at all"
    );
    assert_eq!(
        controller.quote.current(),
        quote_before,
        "quote leaked from store stress"
    );
    assert_eq!(
        controller.fetch.current(),
        fetch_before,
        "fetch leaked from store stress"
    );
}
/// `sanitize` must bring NaN, infinite and out-of-range float settings back
/// into their valid ranges, and must leave `min_window_ops <= window_ops`.
#[test]
fn sanitize_corrects_pathological_floats() {
    // Every overridden field starts out invalid: NaN, above 1.0, negative
    // infinity, and a minimum window larger than the window itself.
    let mut cfg = AdaptiveConfig {
        success_target: f64::NAN,
        timeout_ceiling: 5.0,
        latency_inflation_factor: f64::NEG_INFINITY,
        latency_ewma_alpha: 2.5,
        window_ops: 4,
        min_window_ops: 10,
        ..AdaptiveConfig::default()
    };
    cfg.sanitize();

    let unit_interval = 0.0..=1.0;
    assert!(cfg.success_target.is_finite());
    assert!(unit_interval.contains(&cfg.success_target));
    assert!(unit_interval.contains(&cfg.timeout_ceiling));
    assert!(cfg.latency_inflation_factor.is_finite());
    assert!(cfg.latency_inflation_factor > 0.0);
    assert!(unit_interval.contains(&cfg.latency_ewma_alpha));
    assert!(
        cfg.min_window_ops <= cfg.window_ops,
        "min_window_ops {} > window_ops {}",
        cfg.min_window_ops,
        cfg.window_ops
    );
}
/// A `ChannelMax` serialized to JSON and parsed back must keep all three
/// field values.
#[test]
fn channel_max_serde_round_trips() {
    let original = ChannelMax {
        quote: 7,
        store: 13,
        fetch: 200,
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<ChannelMax>(&encoded).unwrap();
    assert_eq!(decoded.quote, 7);
    assert_eq!(decoded.store, 13);
    assert_eq!(decoded.fetch, 200);
}
/// A `ChannelStart` serialized to JSON and parsed back must keep all three
/// field values.
#[test]
fn channel_start_serde_round_trips() {
    let original = ChannelStart {
        quote: 11,
        store: 22,
        fetch: 33,
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<ChannelStart>(&encoded).unwrap();
    assert_eq!(decoded.quote, 11);
    assert_eq!(decoded.store, 22);
    assert_eq!(decoded.fetch, 33);
}
/// Shrinks the limiter from 16 to 2 part-way through a 400-item
/// `rebucketed_unordered` run, then bounds the in-flight count seen by
/// operations that start after the shrink.
///
/// A background task polls every 2 ms and calls `warm_start(2)` once at
/// least 50 operations have completed. Each operation that starts after the
/// `shrunk` flag is set records the in-flight count it observes.
#[tokio::test]
async fn rebucketed_honors_cap_shrinkage_mid_stream() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 16,
        ..cfg_for_tests()
    };
    let l = Limiter::new(16, cfg);
    let in_flight = StdArc::new(AtomicUsize::new(0));
    let max_after_shrink = StdArc::new(AtomicUsize::new(0));
    let processed = StdArc::new(AtomicUsize::new(0));
    let shrunk = StdArc::new(std::sync::atomic::AtomicBool::new(false));
    let l_for_shrink = l.clone();
    let p_for_shrink = processed.clone();
    let shrunk_for_shrink = shrunk.clone();
    let shrink_handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if p_for_shrink.load(AtomicOrdering::Relaxed) >= 50 {
                l_for_shrink.warm_start(2);
                // Set the flag only after the cap has been lowered.
                shrunk_for_shrink.store(true, AtomicOrdering::Relaxed);
                return;
            }
        }
    });
    let _: Vec<()> = rebucketed_unordered(&l, 0..400usize, |_i| {
        let in_flight = in_flight.clone();
        let max_after_shrink = max_after_shrink.clone();
        let processed = processed.clone();
        let shrunk = shrunk.clone();
        async move {
            let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
            // Only operations that start after the shrink contribute to the peak.
            if shrunk.load(AtomicOrdering::Relaxed) {
                max_after_shrink.fetch_max(cur, AtomicOrdering::Relaxed);
            }
            tokio::time::sleep(Duration::from_millis(1)).await;
            in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
            processed.fetch_add(1, AtomicOrdering::Relaxed);
            Ok::<(), &'static str>(())
        }
    })
    .await
    .unwrap();
    shrink_handle.await.unwrap();
    let peak = max_after_shrink.load(AtomicOrdering::Relaxed);
    // `peak` stays 0 unless some operation started after the shrink. Without
    // this guard the upper-bound check below would pass without having
    // measured anything.
    assert!(
        peak >= 1,
        "no operation started after the shrink; the cap check would be vacuous"
    );
    // NOTE(review): the bound is 4 although the new cap is 2 — presumably
    // deliberate slack for timing; confirm.
    assert!(
        peak <= 4,
        "rebucketed exceeded shrunk cap of 2: peak post-shrink in-flight = {peak}"
    );
}
/// A window of 5 application errors plus 5 successes must not lower the cap,
/// while 5 application errors plus 5 timeouts must lower it.
#[test]
fn mixed_window_app_errors_with_capacity_signal() {
    let cfg = LimiterConfig {
        window_ops: 10,
        min_window_ops: 5,
        timeout_ceiling: 0.2,
        success_target: 0.9,
        ..cfg_for_tests()
    };
    // Feeds five observations of one outcome, each at 50 ms latency.
    let feed_five = |limiter: &Limiter, outcome: Outcome| {
        for _ in 0..5 {
            limiter.observe(outcome, Duration::from_millis(50));
        }
    };

    let l = Limiter::new(8, cfg.clone());
    feed_five(&l, Outcome::ApplicationError);
    feed_five(&l, Outcome::Success);
    assert!(
        l.current() >= 8,
        "AppErrors falsely depressed the success rate; cap dropped from 8 to {}",
        l.current()
    );

    let l2 = Limiter::new(8, cfg);
    feed_five(&l2, Outcome::ApplicationError);
    feed_five(&l2, Outcome::Timeout);
    assert!(
        l2.current() < 8,
        "all-timeouts (with AppError padding) did not decrease cap; got {}",
        l2.current()
    );
}
/// Calls `save_snapshot` in a loop on one thread while another thread calls
/// `load_snapshot` 2000 times, and fails if any loaded snapshot has fields
/// that disagree with each other.
///
/// The writer always stores the same number in all three fields, so a
/// loaded snapshot with differing fields is counted as a torn read.
#[test]
fn concurrent_save_load_no_torn_reads() {
    use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
    use std::thread;
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("snap.json");
    // Write an initial snapshot before either thread starts.
    save_snapshot(
        &path,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
    );
    let stop = std::sync::Arc::new(AtomicBool::new(false));
    let p_w = path.clone();
    let s_w = stop.clone();
    let writer = thread::spawn(move || {
        let mut i = 1usize;
        while !s_w.load(AtomicOrdering::Relaxed) {
            save_snapshot(
                &p_w,
                ChannelStart {
                    quote: i,
                    store: i,
                    fetch: i,
                },
            );
            // Keeps the counter at 1 or above even if it wraps around.
            i = i.wrapping_add(1).max(1);
        }
    });
    let p_r = path.clone();
    let reader = thread::spawn(move || {
        // Loads that return `None` are skipped; only successful loads are
        // checked for consistency.
        (0..2_000)
            .filter_map(|_| load_snapshot(&p_r))
            .filter(|snap| snap.quote != snap.store || snap.store != snap.fetch)
            .count()
    });
    // Stop and join the writer before unwrapping the reader's result, so a
    // reader panic cannot leave the writer thread spinning in the background.
    let reader_result = reader.join();
    stop.store(true, AtomicOrdering::Relaxed);
    writer.join().unwrap();
    let torn = reader_result.unwrap();
    assert_eq!(
        torn, 0,
        "observed {torn} torn reads under concurrent writes"
    );
}
/// Calls `save_snapshot_with_timeout` with a 5 s timeout on a path under
/// `/nonexistent_root_xyz_test` and expects it to return in under 1 s.
#[test]
fn save_with_timeout_returns_promptly_on_fast_failure() {
    let target = std::path::PathBuf::from("/nonexistent_root_xyz_test/snap.json");
    let payload = ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    };
    let clock = Instant::now();
    save_snapshot_with_timeout(target, payload, Duration::from_secs(5));
    let elapsed = clock.elapsed();
    // The call must come back well before the 5 s timeout would expire.
    let budget = Duration::from_secs(1);
    assert!(
        elapsed < budget,
        "save_snapshot_with_timeout took {elapsed:?} on fast-failing path"
    );
}
/// Calls `save_snapshot_with_timeout` with a 1 µs timeout on a writable temp
/// path and expects the call to return in under 200 ms.
#[test]
fn save_with_timeout_bounds_wall_time_on_hang() {
    let tmp = tempfile::tempdir().unwrap();
    let target = tmp.path().join("snap.json");
    let payload = ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    };
    let clock = Instant::now();
    save_snapshot_with_timeout(target, payload, Duration::from_micros(1));
    let elapsed = clock.elapsed();
    let budget = Duration::from_millis(200);
    assert!(
        elapsed < budget,
        "timeout wrapper did not bound wall time: {elapsed:?}"
    );
}
}