use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, PoisonError};
use std::time::{Duration, Instant};
use tracing::{debug, warn};
/// Process-wide counter used to make snapshot temp-file names unique across
/// concurrent `save_snapshot` calls within the same process.
static SAVE_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Acquire `m`, recovering the guard even if a previous holder panicked.
///
/// Poisoning is deliberately ignored: limiter state is always left internally
/// consistent before unlocking, so a poisoned mutex carries no extra meaning.
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Classification of a single completed operation, fed to a [`Limiter`] to
/// decide whether the concurrency cap should grow or shrink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// Operation completed; its latency contributes to the p95 baseline.
    Success,
    /// Operation timed out; counts as a capacity signal in `evaluate`.
    Timeout,
    /// Transport-level failure; counts as a capacity signal in `evaluate`.
    NetworkError,
    /// Application-level failure; excluded from capacity math entirely.
    ApplicationError,
}
/// Per-channel upper bounds on concurrency; the adaptive controller never
/// raises a channel's cap above these values.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelMax {
    /// Ceiling for the quote channel.
    pub quote: usize,
    /// Ceiling for the store channel.
    pub store: usize,
    /// Ceiling for the fetch channel.
    pub fetch: usize,
}
impl Default for ChannelMax {
fn default() -> Self {
Self {
quote: 128,
store: 64,
fetch: 256,
}
}
}
/// Tunables for the adaptive concurrency controller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdaptiveConfig {
    /// When false, limiters record nothing and hold their initial cap.
    pub enabled: bool,
    /// Lower bound for every channel's cap (raised to at least 1 by `sanitize`).
    pub min_concurrency: usize,
    /// Per-channel upper bounds.
    pub max: ChannelMax,
    /// Sliding-window size in operations; also the pacing interval between
    /// cap increases (see `apply_decision`).
    pub window_ops: usize,
    /// Minimum samples before any decision is made (clamped to `window_ops`).
    pub min_window_ops: usize,
    /// Success-rate threshold below which the cap is decreased (0.0..=1.0).
    pub success_target: f64,
    /// Timeout-rate threshold above which the cap is decreased (0.0..=1.0).
    pub timeout_ceiling: f64,
    /// Decrease when the windowed p95 latency exceeds baseline * this factor.
    pub latency_inflation_factor: f64,
    /// EWMA smoothing factor for the latency baseline (0.0..=1.0).
    pub latency_ewma_alpha: f64,
}
impl AdaptiveConfig {
    /// Clamp every field into a usable range so the controller math cannot
    /// panic or misbehave on NaN / infinite / zero configuration values.
    pub fn sanitize(&mut self) {
        // Normalize a ratio field: replace a non-finite value with `default`,
        // then clamp into [0, 1].
        fn ratio(value: &mut f64, default: f64) {
            if !value.is_finite() {
                *value = default;
            }
            *value = value.clamp(0.0, 1.0);
        }
        ratio(&mut self.latency_ewma_alpha, 0.2);
        ratio(&mut self.success_target, 0.95);
        ratio(&mut self.timeout_ceiling, 0.10);
        // The inflation factor must be a strictly positive finite multiplier.
        if !self.latency_inflation_factor.is_finite() || self.latency_inflation_factor <= 0.0 {
            self.latency_inflation_factor = 2.0;
        }
        self.min_concurrency = self.min_concurrency.max(1);
        self.window_ops = self.window_ops.max(1);
        // window_ops >= 1 at this point, so the clamp bounds are ordered.
        self.min_window_ops = self.min_window_ops.clamp(1, self.window_ops);
        // Per-channel ceilings can never sit below the global floor.
        self.max.quote = self.max.quote.max(self.min_concurrency);
        self.max.store = self.max.store.max(self.min_concurrency);
        self.max.fetch = self.max.fetch.max(self.min_concurrency);
    }
}
impl Default for AdaptiveConfig {
fn default() -> Self {
Self {
enabled: true,
min_concurrency: 1,
max: ChannelMax::default(),
window_ops: 32,
min_window_ops: 8,
success_target: 0.95,
timeout_ceiling: 0.10,
latency_inflation_factor: 2.0,
latency_ewma_alpha: 0.2,
}
}
}
/// Initial (cold-start) concurrency for each channel; also the unit that is
/// persisted to disk and restored via `warm_start`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ChannelStart {
    /// Starting cap for the quote channel.
    pub quote: usize,
    /// Starting cap for the store channel.
    pub store: usize,
    /// Starting cap for the fetch channel.
    pub fetch: usize,
}
impl Default for ChannelStart {
fn default() -> Self {
Self {
quote: 32,
store: 8,
fetch: 64,
}
}
}
/// One recorded operation: its classification and how long it took.
#[derive(Debug, Clone, Copy)]
struct Sample {
    outcome: Outcome,
    // Latency is only consulted for Success samples (see `evaluate`).
    latency: Duration,
}
/// Flattened, per-channel view of [`AdaptiveConfig`]: a single ceiling
/// instead of three. Built via `from_adaptive`; not serialized.
#[derive(Debug, Clone)]
pub struct LimiterConfig {
    /// When false, `observe` is a no-op and the cap never moves.
    pub enabled: bool,
    /// Lower bound of the cap (raised to at least 1 by `sanitize`).
    pub min_concurrency: usize,
    /// Upper bound of the cap for this channel.
    pub max_concurrency: usize,
    /// Sliding-window size in operations; also paces increases.
    pub window_ops: usize,
    /// Minimum samples before any decision; also paces decreases.
    pub min_window_ops: usize,
    /// Success-rate threshold below which the cap is decreased.
    pub success_target: f64,
    /// Timeout-rate threshold above which the cap is decreased.
    pub timeout_ceiling: f64,
    /// Decrease when windowed p95 exceeds baseline * this factor.
    pub latency_inflation_factor: f64,
    /// EWMA smoothing factor for the latency baseline.
    pub latency_ewma_alpha: f64,
}
impl LimiterConfig {
    /// Build a per-channel config from the shared adaptive settings, using
    /// `max_for_channel` as this channel's ceiling (never below the floor).
    fn from_adaptive(cfg: &AdaptiveConfig, max_for_channel: usize) -> Self {
        Self {
            enabled: cfg.enabled,
            min_concurrency: cfg.min_concurrency,
            max_concurrency: max_for_channel.max(cfg.min_concurrency),
            window_ops: cfg.window_ops,
            min_window_ops: cfg.min_window_ops,
            success_target: cfg.success_target,
            timeout_ceiling: cfg.timeout_ceiling,
            latency_inflation_factor: cfg.latency_inflation_factor,
            latency_ewma_alpha: cfg.latency_ewma_alpha,
        }
    }
    /// Clamp every field into a usable range; same rules as
    /// `AdaptiveConfig::sanitize`, applied to the per-channel view.
    fn sanitize(&mut self) {
        // Normalize a ratio field: non-finite -> default, then clamp to [0, 1].
        fn ratio(value: &mut f64, default: f64) {
            if !value.is_finite() {
                *value = default;
            }
            *value = value.clamp(0.0, 1.0);
        }
        ratio(&mut self.latency_ewma_alpha, 0.2);
        ratio(&mut self.success_target, 0.95);
        ratio(&mut self.timeout_ceiling, 0.10);
        // The inflation factor must be strictly positive and finite.
        if !(self.latency_inflation_factor.is_finite() && self.latency_inflation_factor > 0.0) {
            self.latency_inflation_factor = 2.0;
        }
        self.min_concurrency = self.min_concurrency.max(1);
        self.window_ops = self.window_ops.max(1);
        // window_ops >= 1 here, so the clamp bounds are ordered.
        self.min_window_ops = self.min_window_ops.clamp(1, self.window_ops);
        self.max_concurrency = self.max_concurrency.max(self.min_concurrency);
    }
}
/// Adaptive concurrency limiter for a single channel. Cloning is cheap and
/// shares state: all clones observe into, and read from, the same window.
#[derive(Debug, Clone)]
pub struct Limiter {
    /// Mutable window/cap state, shared across clones.
    inner: Arc<Mutex<LimiterInner>>,
    /// Immutable (post-`sanitize`) configuration.
    config: Arc<LimiterConfig>,
}
/// Mutable limiter state guarded by the mutex in [`Limiter`].
#[derive(Debug)]
struct LimiterInner {
    /// Concurrency cap currently in effect.
    current: usize,
    /// FIFO sliding window of the most recent samples (at most `window_ops`).
    window: VecDeque<Sample>,
    /// Samples recorded since the cap last changed; gates increase pacing.
    samples_since_increase: usize,
    /// Samples recorded since the cap last changed; gates decrease pacing.
    samples_since_decrease: usize,
    /// EWMA of healthy-window p95 latency; `None` until the first increase.
    latency_baseline: Option<Duration>,
    /// Set by the first decrease or a warm start; switches growth from
    /// doubling to +1 per step.
    left_slow_start: bool,
}
impl Limiter {
    /// Create a limiter starting at `start`, clamped into the sanitized
    /// config's `[min_concurrency, max_concurrency]` range.
    #[must_use]
    pub fn new(start: usize, config: LimiterConfig) -> Self {
        let mut config = config;
        config.sanitize();
        // After sanitize, max_concurrency >= min_concurrency >= 1, so the
        // clamp cannot panic; the extra `.max(1)` is belt-and-braces.
        let clamped = start.clamp(config.min_concurrency, config.max_concurrency.max(1));
        let window_cap = config.window_ops;
        Self {
            inner: Arc::new(Mutex::new(LimiterInner {
                current: clamped,
                window: VecDeque::with_capacity(window_cap),
                samples_since_increase: 0,
                samples_since_decrease: 0,
                latency_baseline: None,
                left_slow_start: false,
            })),
            config: Arc::new(config),
        }
    }
    /// The concurrency cap currently in effect.
    #[must_use]
    pub fn current(&self) -> usize {
        lock(&self.inner).current
    }
    /// Record one completed operation and, once enough samples have
    /// accumulated, re-evaluate the cap. No-op when disabled.
    pub fn observe(&self, outcome: Outcome, latency: Duration) {
        if !self.config.enabled {
            return;
        }
        let mut g = lock(&self.inner);
        // FIFO eviction: keep at most window_ops samples.
        if g.window.len() == self.config.window_ops {
            g.window.pop_front();
        }
        g.window.push_back(Sample { outcome, latency });
        g.samples_since_increase = g.samples_since_increase.saturating_add(1);
        g.samples_since_decrease = g.samples_since_decrease.saturating_add(1);
        // Wait for a minimally meaningful window before deciding anything.
        if g.window.len() < self.config.min_window_ops {
            return;
        }
        let decision = evaluate(&g.window, &self.config, g.latency_baseline);
        apply_decision(&mut g, decision, &self.config);
    }
    /// Seed the cap from a persisted value (clamped into bounds). Also marks
    /// slow start as finished so future growth is additive, not doubling.
    pub fn warm_start(&self, start: usize) {
        let clamped = start.clamp(
            self.config.min_concurrency,
            self.config.max_concurrency.max(1),
        );
        let mut g = lock(&self.inner);
        g.current = clamped;
        g.left_slow_start = true;
    }
    /// Current cap, for persistence; identical to [`Limiter::current`].
    #[must_use]
    pub fn snapshot(&self) -> usize {
        lock(&self.inner).current
    }
}
/// Verdict produced by `evaluate` for one look at the sliding window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Decision {
    /// Window is healthy: the cap may grow (subject to pacing).
    Increase,
    /// Window shows stress: halve the cap (subject to pacing).
    Decrease,
    /// Not enough signal either way.
    Hold,
}
/// Inspect the sliding window and decide how the cap should move.
///
/// Only Success/Timeout/NetworkError samples count toward the capacity math;
/// ApplicationError is excluded entirely. Success latencies are compared
/// (p95, nearest rank) against `baseline * latency_inflation_factor` when a
/// baseline exists.
fn evaluate(
    window: &VecDeque<Sample>,
    cfg: &LimiterConfig,
    baseline: Option<Duration>,
) -> Decision {
    let (mut successes, mut timeouts, mut net_errors) = (0usize, 0usize, 0usize);
    let mut latencies: Vec<Duration> = Vec::with_capacity(window.len());
    for sample in window {
        match sample.outcome {
            Outcome::Success => {
                successes += 1;
                latencies.push(sample.latency);
            }
            Outcome::Timeout => timeouts += 1,
            Outcome::NetworkError => net_errors += 1,
            Outcome::ApplicationError => {}
        }
    }
    // Require enough capacity-relevant samples before deciding anything.
    let counted = successes + timeouts + net_errors;
    if counted < cfg.min_window_ops {
        return Decision::Hold;
    }
    let denom = counted as f64;
    let success_rate = successes as f64 / denom;
    let timeout_rate = timeouts as f64 / denom;
    if success_rate < cfg.success_target || timeout_rate > cfg.timeout_ceiling {
        return Decision::Decrease;
    }
    // No successful latencies means no evidence either way.
    let p95 = match p95_of(&mut latencies) {
        Some(p) => p,
        None => return Decision::Hold,
    };
    if let Some(base) = baseline {
        if p95 > base.mul_f64(cfg.latency_inflation_factor) {
            return Decision::Decrease;
        }
    }
    Decision::Increase
}
/// Apply a window decision to the limiter state, honoring pacing gates.
///
/// Increases are rate-limited to one per full window (`window_ops` samples
/// since the last cap change); decreases are allowed after only
/// `min_window_ops` samples, so backoff reacts faster than growth.
fn apply_decision(inner: &mut LimiterInner, decision: Decision, cfg: &LimiterConfig) {
    match decision {
        Decision::Increase => {
            if inner.samples_since_increase < cfg.window_ops {
                return;
            }
            // Fold this healthy window's p95 into the latency baseline
            // (seeding it on the first ever increase).
            let p95 = window_p95(&inner.window);
            inner.latency_baseline = Some(match inner.latency_baseline {
                None => p95,
                Some(prev) => ewma(prev, p95, cfg.latency_ewma_alpha),
            });
            // Slow start doubles the cap; once slow start has been left
            // (first decrease or a warm start), growth becomes additive.
            let next = if inner.left_slow_start {
                inner.current.saturating_add(1)
            } else {
                inner.current.saturating_mul(2)
            };
            let next = next.min(cfg.max_concurrency).max(cfg.min_concurrency);
            if next != inner.current {
                debug!(
                    from = inner.current,
                    to = next,
                    slow_start = !inner.left_slow_start,
                    "adaptive: increase",
                );
            }
            inner.current = next;
            inner.samples_since_increase = 0;
            inner.samples_since_decrease = 0;
        }
        Decision::Decrease => {
            if inner.samples_since_decrease < cfg.min_window_ops {
                return;
            }
            // Any decrease permanently exits slow start.
            inner.left_slow_start = true;
            // Multiplicative backoff, floored at min_concurrency.
            let next = (inner.current / 2).max(cfg.min_concurrency);
            if next != inner.current {
                debug!(from = inner.current, to = next, "adaptive: decrease");
            }
            inner.current = next;
            inner.samples_since_increase = 0;
            inner.samples_since_decrease = 0;
        }
        Decision::Hold => {}
    }
}
/// Nearest-rank p95 of `latencies` (sorted in place); `None` when empty.
fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
    let n = latencies.len();
    if n == 0 {
        return None;
    }
    latencies.sort_unstable();
    // Nearest-rank: ceil(0.95 * n) as a 1-based rank, converted to a
    // 0-based index and kept in bounds.
    let rank = ((n as f64) * 0.95).ceil() as usize;
    let idx = rank.saturating_sub(1).min(n - 1);
    latencies.get(idx).copied()
}
/// p95 over only the successful samples in `window`; `Duration::ZERO` when
/// there are none.
fn window_p95(window: &VecDeque<Sample>) -> Duration {
    let mut ok_latencies = Vec::with_capacity(window.len());
    for sample in window {
        if sample.outcome == Outcome::Success {
            ok_latencies.push(sample.latency);
        }
    }
    p95_of(&mut ok_latencies).unwrap_or(Duration::ZERO)
}
/// Exponentially weighted moving average of two durations, computed in
/// milliseconds. Returns `prev` unchanged when `alpha` is non-finite or the
/// blended value is not a representable non-negative duration.
fn ewma(prev: Duration, sample: Duration, alpha: f64) -> Duration {
    if !alpha.is_finite() {
        return prev;
    }
    let alpha = alpha.clamp(0.0, 1.0);
    let blended_ms =
        (1.0 - alpha) * (prev.as_secs_f64() * 1000.0) + alpha * (sample.as_secs_f64() * 1000.0);
    if blended_ms.is_finite() && blended_ms >= 0.0 {
        Duration::from_secs_f64(blended_ms / 1000.0)
    } else {
        prev
    }
}
/// Bundle of per-channel limiters sharing one sanitized configuration.
/// Cloning shares limiter state (the limiters are `Arc`-backed).
#[derive(Debug, Clone)]
pub struct AdaptiveController {
    pub quote: Limiter,
    pub store: Limiter,
    pub fetch: Limiter,
    /// Sanitized configuration applied to all three limiters.
    pub(crate) config: AdaptiveConfig,
    /// Initial caps; `warm_start` never seeds a channel below these.
    cold_start: ChannelStart,
}
impl AdaptiveController {
    /// Build one limiter per channel from `config` (sanitized first), each
    /// starting at its `start` value and capped by its per-channel maximum.
    #[must_use]
    pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
        let mut config = config;
        config.sanitize();
        // Per-channel configs differ only in their ceiling.
        let limiter_cfg = |ceiling| LimiterConfig::from_adaptive(&config, ceiling);
        Self {
            quote: Limiter::new(start.quote, limiter_cfg(config.max.quote)),
            store: Limiter::new(start.store, limiter_cfg(config.max.store)),
            fetch: Limiter::new(start.fetch, limiter_cfg(config.max.fetch)),
            cold_start: start,
            config,
        }
    }
    /// Current cap of every channel, e.g. for persistence.
    #[must_use]
    pub fn snapshot(&self) -> ChannelStart {
        ChannelStart {
            quote: self.quote.snapshot(),
            store: self.store.snapshot(),
            fetch: self.fetch.snapshot(),
        }
    }
    /// The sanitized configuration in effect.
    #[must_use]
    pub fn config(&self) -> &AdaptiveConfig {
        &self.config
    }
    /// Seed every channel from a persisted snapshot, never dropping below
    /// the cold-start values. No-op when the controller is disabled.
    pub fn warm_start(&self, snapshot: ChannelStart) {
        if !self.config.enabled {
            return;
        }
        let floor = self.cold_start;
        self.quote.warm_start(snapshot.quote.max(floor.quote));
        self.store.warm_start(snapshot.store.max(floor.store));
        self.fetch.warm_start(snapshot.fetch.max(floor.fetch));
    }
}
impl Default for AdaptiveController {
fn default() -> Self {
Self::new(ChannelStart::default(), AdaptiveConfig::default())
}
}
/// Drop-based recorder: times an operation and reports its outcome to the
/// limiter on drop, but only if `finish` was called first.
struct ObserveGuard<'a> {
    limiter: &'a Limiter,
    /// When the guarded operation started.
    started: Instant,
    /// Set by `finish`; `None` means "record nothing on drop".
    outcome: Option<(Outcome, Duration)>,
}
impl<'a> ObserveGuard<'a> {
fn new(limiter: &'a Limiter) -> Self {
Self {
limiter,
started: Instant::now(),
outcome: None,
}
}
fn finish(&mut self, outcome: Outcome) {
self.outcome = Some((outcome, self.started.elapsed()));
}
}
impl Drop for ObserveGuard<'_> {
    /// Report the recorded outcome, if any. A guard dropped without a prior
    /// `finish` call (e.g. the wrapping future was cancelled) reports nothing.
    fn drop(&mut self) {
        match self.outcome.take() {
            Some((outcome, latency)) => self.limiter.observe(outcome, latency),
            None => {}
        }
    }
}
/// Run `op`, time it, and report the result to `limiter`.
///
/// `Ok` results are recorded as [`Outcome::Success`]; `Err` results are
/// classified by `classify`. If the returned future is dropped before the
/// operation completes, nothing is recorded.
pub async fn observe_op<T, E, F, Fut, C>(limiter: &Limiter, op: F, classify: C) -> Result<T, E>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
    C: FnOnce(&E) -> Outcome,
{
    let mut guard = ObserveGuard::new(limiter);
    let result = op().await;
    guard.finish(match &result {
        Ok(_) => Outcome::Success,
        Err(err) => classify(err),
    });
    drop(guard);
    result
}
/// Drive `op` over `items` with a concurrency cap that is re-read from
/// `limiter` on every scheduling pass, collecting results in completion
/// order.
///
/// After the first error, no further items are started; already-in-flight
/// futures are drained to completion and the first error is returned
/// (later errors are dropped, and collected successes are discarded).
pub async fn rebucketed_unordered<I, T, E, F, Fut>(
    limiter: &Limiter,
    items: I,
    mut op: F,
) -> Result<Vec<T>, E>
where
    I: IntoIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    use futures::stream::{FuturesUnordered, StreamExt};
    // NOTE(review): `.peekable()` is never peeked; a plain iterator would do.
    let mut iter = items.into_iter().peekable();
    let mut in_flight: FuturesUnordered<Fut> = FuturesUnordered::new();
    let mut results = Vec::new();
    let mut pending_err: Option<E> = None;
    loop {
        // Top up to the limiter's current cap, unless draining after an error.
        if pending_err.is_none() {
            let cap = limiter.current().max(1);
            while in_flight.len() < cap {
                match iter.next() {
                    Some(item) => in_flight.push(op(item)),
                    None => break,
                }
            }
        }
        if in_flight.is_empty() {
            break;
        }
        match in_flight.next().await {
            Some(Ok(v)) => results.push(v),
            Some(Err(e)) => {
                // Keep only the first error.
                if pending_err.is_none() {
                    pending_err = Some(e);
                }
            }
            None => break,
        }
    }
    match pending_err {
        Some(e) => Err(e),
        None => Ok(results),
    }
}
/// Run `op` over `items` with adaptive concurrency, then restore the
/// caller-supplied ordering. Each future must yield its original index
/// alongside its value.
pub async fn rebucketed_ordered<I, U, E, F, Fut>(
    limiter: &Limiter,
    items: I,
    op: F,
) -> Result<Vec<U>, E>
where
    I: IntoIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: std::future::Future<Output = Result<(usize, U), E>>,
{
    let mut pairs = rebucketed_unordered(limiter, items, op).await?;
    pairs.sort_by_key(|pair| pair.0);
    let values = pairs.into_iter().map(|(_, value)| value).collect();
    Ok(values)
}
/// Adaptive-concurrency driver that optionally preserves input order.
///
/// With `ordered == false` this delegates to [`rebucketed_unordered`].
/// Ordered mode runs items in batches sized by the limiter's current cap via
/// `buffered`, which yields results in input order; the cap is re-read
/// between batches (not within one). On error, the current batch is still
/// drained, no new batch starts, and the first error is returned.
pub async fn rebucketed<I, T, E, F, Fut>(
    limiter: &Limiter,
    items: I,
    ordered: bool,
    mut op: F,
) -> Result<Vec<T>, E>
where
    I: IntoIterator,
    F: FnMut(I::Item) -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    if !ordered {
        return rebucketed_unordered(limiter, items, op).await;
    }
    use futures::stream::{self, StreamExt};
    let mut iter = items.into_iter();
    let mut results = Vec::new();
    let mut pending_err: Option<E> = None;
    loop {
        if pending_err.is_some() {
            break;
        }
        // Build the next batch at the cap currently in effect.
        let cap = limiter.current().max(1);
        let mut batch = Vec::with_capacity(cap);
        for item in iter.by_ref().take(cap) {
            batch.push(op(item));
        }
        if batch.is_empty() {
            break;
        }
        // `buffered` runs at most `cap` futures concurrently and yields
        // results in input order.
        let mut s = stream::iter(batch).buffered(cap);
        while let Some(r) = s.next().await {
            match r {
                Ok(v) => results.push(v),
                Err(e) => {
                    // Keep only the first error; the rest of the batch is
                    // still awaited above.
                    if pending_err.is_none() {
                        pending_err = Some(e);
                    }
                }
            }
        }
    }
    match pending_err {
        Some(e) => Err(e),
        None => Ok(results),
    }
}
/// On-disk representation of a concurrency snapshot, versioned by `schema`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PersistedState {
    /// Must equal `PERSIST_SCHEMA` for the file to be accepted on load.
    schema: u32,
    /// The persisted per-channel caps.
    channels: ChannelStart,
}
/// Snapshot schema version; bump when `PersistedState` changes shape.
const PERSIST_SCHEMA: u32 = 1;
/// File name used inside the data directory for the persisted snapshot.
const PERSIST_FILENAME: &str = "client_adaptive.json";
/// Default location of the persisted snapshot inside the data directory, or
/// `None` when the data directory cannot be resolved.
#[must_use]
pub fn default_persist_path() -> Option<PathBuf> {
    let dir = crate::config::data_dir().ok()?;
    Some(dir.join(PERSIST_FILENAME))
}
/// Read and validate a persisted snapshot from `path`.
///
/// Returns `None` (never an error) when the file is missing or unreadable,
/// when it is not valid JSON, or when its schema version does not match.
#[must_use]
pub fn load_snapshot(path: &Path) -> Option<ChannelStart> {
    let bytes = std::fs::read(path).ok()?;
    let state: PersistedState = serde_json::from_slice(&bytes)
        .map_err(|e| {
            warn!(path = %path.display(), error = %e, "adaptive: corrupt snapshot, ignoring");
        })
        .ok()?;
    if state.schema == PERSIST_SCHEMA {
        Some(state.channels)
    } else {
        debug!(
            path = %path.display(),
            schema = state.schema,
            expected = PERSIST_SCHEMA,
            "adaptive: snapshot schema mismatch, ignoring",
        );
        None
    }
}
/// Persist `channels` to `path` atomically, best effort (never panics).
///
/// Serializes to a uniquely named temp file next to `path`, then renames it
/// over `path`, so readers never observe a torn file. Every failure is
/// logged and swallowed.
pub fn save_snapshot(path: &Path, channels: ChannelStart) {
    let state = PersistedState {
        schema: PERSIST_SCHEMA,
        channels,
    };
    let bytes = match serde_json::to_vec_pretty(&state) {
        Ok(b) => b,
        Err(e) => {
            warn!(error = %e, "adaptive: snapshot serialize failed");
            return;
        }
    };
    if let Some(parent) = path.parent() {
        if let Err(e) = std::fs::create_dir_all(parent) {
            warn!(path = %parent.display(), error = %e, "adaptive: snapshot mkdir failed");
            return;
        }
    }
    // The temp-file name mixes process id, a per-process counter, and a
    // sub-second timestamp so concurrent writers never share a temp file.
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.subsec_nanos())
        .unwrap_or(0);
    let counter = SAVE_COUNTER.fetch_add(1, Ordering::Relaxed);
    let tmp = path.with_extension(format!(
        "json.tmp.{}.{}.{}",
        std::process::id(),
        counter,
        nanos
    ));
    if let Err(e) = std::fs::write(&tmp, &bytes) {
        warn!(path = %tmp.display(), error = %e, "adaptive: snapshot write failed");
        return;
    }
    // rename replaces the target in one step on POSIX filesystems (same
    // directory, same filesystem); clean up the temp file on failure.
    if let Err(e) = std::fs::rename(&tmp, path) {
        warn!(
            from = %tmp.display(),
            to = %path.display(),
            error = %e,
            "adaptive: snapshot rename failed",
        );
        let _ = std::fs::remove_file(&tmp);
    }
}
/// Run `save_snapshot` on a background thread, waiting at most `timeout` for
/// it to finish. If the write is still running when the budget expires, the
/// thread is detached (left to finish or die on its own) so a slow or hung
/// data directory cannot stall the caller.
pub fn save_snapshot_with_timeout(path: PathBuf, channels: ChannelStart, timeout: Duration) {
    let handle = std::thread::spawn(move || {
        save_snapshot(&path, channels);
    });
    let started = Instant::now();
    let poll = Duration::from_millis(5);
    loop {
        if handle.is_finished() {
            // A panic inside save_snapshot is deliberately swallowed here;
            // persistence is best effort.
            let _ = handle.join();
            return;
        }
        let elapsed = started.elapsed();
        if elapsed >= timeout {
            break;
        }
        // Cap each sleep at the remaining budget so the total wait stays
        // close to `timeout` instead of overshooting by up to one poll
        // interval (the previous version always slept the full 5 ms).
        std::thread::sleep(poll.min(timeout - elapsed));
    }
    warn!(
        timeout_ms = timeout.as_millis() as u64,
        "adaptive: snapshot save timed out (data dir slow?); detaching writer thread"
    );
    drop(handle);
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
fn cfg_for_tests() -> LimiterConfig {
LimiterConfig {
enabled: true,
min_concurrency: 1,
max_concurrency: 64,
window_ops: 10,
min_window_ops: 5,
success_target: 0.9,
timeout_ceiling: 0.2,
latency_inflation_factor: 2.0,
latency_ewma_alpha: 0.5,
}
}
fn adaptive_cfg_for_tests() -> AdaptiveConfig {
let l = cfg_for_tests();
AdaptiveConfig {
enabled: l.enabled,
min_concurrency: l.min_concurrency,
max: ChannelMax {
quote: l.max_concurrency,
store: l.max_concurrency,
fetch: l.max_concurrency,
},
window_ops: l.window_ops,
min_window_ops: l.min_window_ops,
success_target: l.success_target,
timeout_ceiling: l.timeout_ceiling,
latency_inflation_factor: l.latency_inflation_factor,
latency_ewma_alpha: l.latency_ewma_alpha,
}
}
#[test]
fn cold_start_clamps_into_bounds() {
let cfg = cfg_for_tests();
let l = Limiter::new(1000, cfg.clone());
assert_eq!(l.current(), cfg.max_concurrency);
let l = Limiter::new(0, cfg.clone());
assert_eq!(l.current(), cfg.min_concurrency);
}
#[test]
fn slow_start_doubles_then_caps() {
let cfg = cfg_for_tests();
let l = Limiter::new(2, cfg.clone());
for _ in 0..cfg.window_ops {
l.observe(Outcome::Success, Duration::from_millis(50));
}
assert_eq!(l.current(), 4);
for _ in 0..cfg.window_ops {
l.observe(Outcome::Success, Duration::from_millis(50));
}
assert_eq!(l.current(), 8);
}
#[test]
fn first_failure_exits_slow_start() {
let cfg = cfg_for_tests();
let l = Limiter::new(4, cfg.clone());
for _ in 0..6 {
l.observe(Outcome::Success, Duration::from_millis(50));
}
for _ in 0..4 {
l.observe(Outcome::Timeout, Duration::from_millis(50));
}
let after_stress = l.current();
assert!(
after_stress < 4,
"stress should reduce concurrency from 4, got {after_stress}",
);
for _ in 0..(cfg.window_ops * 5) {
l.observe(Outcome::Success, Duration::from_millis(50));
}
assert!(
l.current() > after_stress,
"expected recovery above {after_stress}, got {}",
l.current(),
);
}
#[test]
fn floor_holds_at_one() {
let cfg = cfg_for_tests();
let l = Limiter::new(2, cfg);
for _ in 0..30 {
l.observe(Outcome::Timeout, Duration::from_millis(50));
}
assert_eq!(l.current(), 1);
}
#[test]
fn application_errors_do_not_punish() {
let cfg = cfg_for_tests();
let l = Limiter::new(4, cfg.clone());
for _ in 0..cfg.window_ops * 5 {
l.observe(Outcome::ApplicationError, Duration::from_millis(50));
}
assert_eq!(
l.current(),
4,
"ApplicationError must not move the cap; got {}",
l.current()
);
}
#[test]
fn latency_inflation_triggers_decrease() {
let cfg = LimiterConfig {
window_ops: 20,
min_window_ops: 5,
..cfg_for_tests()
};
let l = Limiter::new(4, cfg.clone());
for _ in 0..cfg.window_ops {
l.observe(Outcome::Success, Duration::from_millis(50));
}
let after_baseline = l.current();
for _ in 0..cfg.window_ops {
l.observe(Outcome::Success, Duration::from_millis(500));
}
assert!(
l.current() < after_baseline,
"expected decrease from {after_baseline}, got {}",
l.current(),
);
}
#[test]
fn warm_start_overrides_current() {
let cfg = cfg_for_tests();
let l = Limiter::new(2, cfg);
l.warm_start(20);
assert_eq!(l.current(), 20);
}
#[test]
fn warm_start_clamps() {
let cfg = cfg_for_tests();
let l = Limiter::new(2, cfg.clone());
l.warm_start(1_000_000);
assert_eq!(l.current(), cfg.max_concurrency);
}
#[test]
fn disabled_controller_holds_steady() {
let cfg = LimiterConfig {
enabled: false,
..cfg_for_tests()
};
let l = Limiter::new(8, cfg);
for _ in 0..50 {
l.observe(Outcome::Timeout, Duration::from_millis(50));
}
assert_eq!(l.current(), 8);
}
#[test]
fn controller_snapshot_round_trips() {
let c = AdaptiveController::new(
ChannelStart {
quote: 64,
store: 16,
fetch: 64,
},
adaptive_cfg_for_tests(),
);
let snap = c.snapshot();
assert_eq!(snap.quote, 64);
assert_eq!(snap.store, 16);
assert_eq!(snap.fetch, 64);
let c2 = AdaptiveController::default();
c2.warm_start(snap);
assert_eq!(c2.quote.current(), 64);
assert_eq!(c2.store.current(), 16);
assert_eq!(c2.fetch.current(), 64);
}
#[tokio::test]
async fn observe_op_records_success() {
let cfg = cfg_for_tests();
let l = Limiter::new(4, cfg.clone());
for _ in 0..cfg.window_ops {
let _: Result<(), &str> =
observe_op(&l, || async { Ok(()) }, |_e: &&str| Outcome::NetworkError).await;
}
assert_eq!(l.current(), 8);
}
#[test]
fn snapshot_round_trips_through_disk() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("client_adaptive.json");
let snap = ChannelStart {
quote: 24,
store: 6,
fetch: 12,
};
save_snapshot(&path, snap);
let loaded = load_snapshot(&path).unwrap();
assert_eq!(loaded.quote, 24);
assert_eq!(loaded.store, 6);
assert_eq!(loaded.fetch, 12);
}
#[test]
fn load_missing_returns_none() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("does_not_exist.json");
assert!(load_snapshot(&path).is_none());
}
#[test]
fn load_corrupt_returns_none() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("bad.json");
std::fs::write(&path, b"not valid json{{{").unwrap();
assert!(load_snapshot(&path).is_none());
}
#[test]
fn load_wrong_schema_returns_none() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("future.json");
let payload = r#"{"schema":999,"channels":{"quote":1,"store":1,"fetch":1}}"#;
std::fs::write(&path, payload).unwrap();
assert!(load_snapshot(&path).is_none());
}
#[tokio::test]
async fn observe_op_records_classified_error() {
let cfg = cfg_for_tests();
let l = Limiter::new(4, cfg.clone());
for _ in 0..cfg.window_ops {
let _: Result<(), &str> =
observe_op(&l, || async { Err("boom") }, |_e: &&str| Outcome::Timeout).await;
}
assert!(l.current() < 4);
}
#[test]
fn no_regression_cold_start_at_least_static_defaults() {
let s = ChannelStart::default();
assert!(
s.quote >= 32,
"quote cold-start regressed: got {}, prior static was 32",
s.quote,
);
assert!(
s.store >= 8,
"store cold-start regressed: got {}, prior static was 8",
s.store,
);
assert!(
s.fetch >= 64,
"fetch cold-start regressed: got {}, prior static was 64 (unbounded before)",
s.fetch,
);
}
#[test]
fn controller_default_config_is_sane() {
let c = AdaptiveController::default();
let starts = ChannelStart::default();
assert_eq!(c.quote.current(), starts.quote);
assert_eq!(c.store.current(), starts.store);
assert_eq!(c.fetch.current(), starts.fetch);
assert_eq!(lock(&c.quote.inner).window.len(), 0);
assert_eq!(lock(&c.store.inner).window.len(), 0);
assert_eq!(lock(&c.fetch.inner).window.len(), 0);
}
#[test]
fn alternating_success_failure_collapses_to_floor() {
let cfg = cfg_for_tests();
let l = Limiter::new(8, cfg.clone());
let mut min_observed = usize::MAX;
let mut max_observed = 0usize;
let mut floor_visits = 0usize;
for i in 0..1000 {
let outcome = if i % 2 == 0 {
Outcome::Success
} else {
Outcome::Timeout
};
l.observe(outcome, Duration::from_millis(50));
let cur = l.current();
assert!(
cur >= cfg.min_concurrency,
"cap underflowed floor at iter {i}: got {cur}",
);
min_observed = min_observed.min(cur);
max_observed = max_observed.max(cur);
if cur == cfg.min_concurrency {
floor_visits += 1;
}
}
assert_eq!(
min_observed, cfg.min_concurrency,
"cap never reached the floor under 50% timeout rate"
);
assert!(
max_observed >= 8,
"cap never visited the start value: max_observed={max_observed}"
);
assert!(
floor_visits > 500,
"cap spent only {floor_visits}/1000 ticks at floor; expected mostly at floor"
);
assert_eq!(
l.current(),
cfg.min_concurrency,
"controller did not settle at floor after 1000 alternations"
);
}
#[test]
fn pure_success_stream_recovers_to_max() {
let cfg = cfg_for_tests();
let l = Limiter::new(cfg.min_concurrency, cfg.clone());
for _ in 0..10_000 {
l.observe(Outcome::Success, Duration::from_millis(5));
}
assert_eq!(
l.current(),
cfg.max_concurrency,
"expected recovery to max ({}), got {}",
cfg.max_concurrency,
l.current(),
);
}
#[test]
fn stress_then_heal_drives_floor_then_recovery() {
let cfg = cfg_for_tests();
let l = Limiter::new(8, cfg.clone());
for _ in 0..100 {
l.observe(Outcome::Timeout, Duration::from_millis(50));
}
let after_stress = l.current();
assert_eq!(
after_stress, cfg.min_concurrency,
"stress should drive cap to floor, got {after_stress}",
);
for _ in 0..1_000 {
l.observe(Outcome::Success, Duration::from_millis(10));
}
let after_heal = l.current();
assert!(
after_heal >= cfg.min_concurrency.saturating_add(4),
"expected substantial recovery from floor, got {after_heal}",
);
}
#[test]
fn baseline_does_not_grow_unbounded_under_slow_links() {
let cfg = cfg_for_tests();
let l = Limiter::new(2, cfg.clone());
for _ in 0..(cfg.window_ops * 10) {
l.observe(Outcome::Success, Duration::from_millis(500));
}
let baseline = lock(&l.inner).latency_baseline;
let base = baseline.expect("baseline should be set after many healthy windows");
assert!(
base > Duration::ZERO,
"baseline must not stay at ZERO, got {base:?}",
);
let lo = Duration::from_millis(250);
let hi = Duration::from_millis(1000);
assert!(
base >= lo && base <= hi,
"baseline drifted out of [{lo:?}, {hi:?}]: {base:?}",
);
}
#[test]
fn baseline_initialized_only_after_first_healthy_window() {
let cfg = cfg_for_tests();
let l = Limiter::new(8, cfg.clone());
for _ in 0..50 {
l.observe(Outcome::Timeout, Duration::from_millis(50));
}
assert!(
lock(&l.inner).latency_baseline.is_none(),
"baseline must be None before any healthy window",
);
for _ in 0..(cfg.window_ops * 5) {
l.observe(Outcome::Success, Duration::from_millis(20));
}
let baseline = lock(&l.inner).latency_baseline;
assert!(
baseline.is_some(),
"baseline must be Some after healthy windows",
);
let base = baseline.unwrap_or_default();
assert!(
base > Duration::ZERO,
"baseline must reflect real latency, got {base:?}",
);
}
#[test]
fn min_concurrency_floor_holds_under_torrent_of_errors() {
let cfg = cfg_for_tests();
let l = Limiter::new(8, cfg.clone());
for i in 0..50_000 {
l.observe(Outcome::Timeout, Duration::from_millis(50));
if i == 100 || i == 1_000 || i == 49_999 {
let cur = l.current();
assert_eq!(
cur, cfg.min_concurrency,
"floor breached at iter {i}: got {cur}",
);
}
}
}
#[test]
fn max_concurrency_ceiling_holds_under_torrent_of_successes() {
let cfg = cfg_for_tests();
let start = cfg
.max_concurrency
.saturating_sub(1)
.max(cfg.min_concurrency);
let l = Limiter::new(start, cfg.clone());
for i in 0..50_000 {
l.observe(Outcome::Success, Duration::from_millis(5));
if i == 100 || i == 1_000 || i == 49_999 {
let cur = l.current();
assert!(
cur <= cfg.max_concurrency,
"ceiling breached at iter {i}: got {cur} > {}",
cfg.max_concurrency,
);
}
}
assert_eq!(l.current(), cfg.max_concurrency);
}
#[test]
fn saturating_arithmetic_handles_extreme_config() {
let cfg = LimiterConfig {
max_concurrency: usize::MAX / 2,
..cfg_for_tests()
};
let start = usize::MAX / 4;
let l = Limiter::new(start, cfg.clone());
for _ in 0..(cfg.window_ops * 10) {
l.observe(Outcome::Success, Duration::from_millis(1));
}
assert_eq!(
l.current(),
cfg.max_concurrency,
"saturating math survived but cap did not grow to ceiling"
);
}
#[test]
fn window_eviction_is_fifo() {
let cfg = LimiterConfig {
window_ops: 10,
min_window_ops: 5,
success_target: 0.9,
timeout_ceiling: 0.1,
..cfg_for_tests()
};
let l = Limiter::new(8, cfg.clone());
for _ in 0..cfg.window_ops {
l.observe(Outcome::Timeout, Duration::from_millis(50));
}
let after_stress = l.current();
assert!(
after_stress < 8,
"expected cap to drop from 8 after pure-timeout window, got {after_stress}"
);
for _ in 0..(cfg.window_ops * 3) {
l.observe(Outcome::Success, Duration::from_millis(20));
}
let after_recovery = l.current();
assert!(
after_recovery > after_stress,
"FIFO eviction broken: cap stayed at {after_stress} after recovery successes (expected > {after_stress}, got {after_recovery})"
);
}
#[test]
fn disabled_controller_returns_initial_value_invariantly() {
let cfg = LimiterConfig {
enabled: false,
..cfg_for_tests()
};
let initial = 8;
let l = Limiter::new(initial, cfg);
for i in 0..1_000 {
let outcome = match i % 4 {
0 => Outcome::Success,
1 => Outcome::Timeout,
2 => Outcome::NetworkError,
_ => Outcome::ApplicationError,
};
l.observe(outcome, Duration::from_millis(50));
assert_eq!(
l.current(),
initial,
"disabled controller moved at iter {i}",
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_observations_do_not_corrupt_window() {
let cfg = cfg_for_tests();
let l = Limiter::new(4, cfg.clone());
let mut handles = Vec::with_capacity(100);
for _ in 0..100 {
let l_clone = l.clone();
handles.push(tokio::spawn(async move {
for _ in 0..100 {
l_clone.observe(Outcome::Success, Duration::from_millis(5));
}
}));
}
for h in handles {
h.await.unwrap();
}
let cur = l.current();
assert!(
cur >= cfg.min_concurrency && cur <= cfg.max_concurrency,
"cap out of bounds after concurrent observations: {cur}",
);
}
#[test]
fn persisted_snapshot_warm_starts_above_cold_floor() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("client_adaptive.json");
let saved = ChannelStart {
quote: 64,
store: 32,
fetch: 128,
};
save_snapshot(&path, saved);
let loaded = load_snapshot(&path).unwrap();
let low = ChannelStart {
quote: 2,
store: 2,
fetch: 2,
};
let c = AdaptiveController::new(low, AdaptiveConfig::default());
c.warm_start(loaded);
assert_eq!(c.quote.current(), 64);
assert_eq!(c.store.current(), 32);
assert_eq!(c.fetch.current(), 128);
}
#[test]
fn save_load_round_trip_with_concurrent_writes() {
use std::thread;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("client_adaptive.json");
let path_a = path.clone();
let path_b = path.clone();
let snap_a = ChannelStart {
quote: 10,
store: 10,
fetch: 10,
};
let snap_b = ChannelStart {
quote: 99,
store: 99,
fetch: 99,
};
let h_a = thread::spawn(move || {
for _ in 0..50 {
save_snapshot(&path_a, snap_a);
}
});
let h_b = thread::spawn(move || {
for _ in 0..50 {
save_snapshot(&path_b, snap_b);
}
});
h_a.join().unwrap();
h_b.join().unwrap();
let loaded = load_snapshot(&path).expect("file must be a valid snapshot, not torn");
let valid = (loaded.quote == snap_a.quote
&& loaded.store == snap_a.store
&& loaded.fetch == snap_a.fetch)
|| (loaded.quote == snap_b.quote
&& loaded.store == snap_b.store
&& loaded.fetch == snap_b.fetch);
assert!(valid, "loaded snapshot is neither A nor B: {loaded:?}",);
}
#[test]
fn save_snapshot_to_unwritable_dir_does_not_panic() {
let path = PathBuf::from("/nonexistent_root_dir_xyz_for_test/sub/dir/client_adaptive.json");
let snap = ChannelStart {
quote: 1,
store: 1,
fetch: 1,
};
save_snapshot(&path, snap);
assert!(!path.exists());
}
#[test]
fn load_snapshot_from_truncated_file_returns_none() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("truncated.json");
std::fs::write(&path, br#"{"schema":1,"channels":{"quote":"#).unwrap();
assert!(load_snapshot(&path).is_none());
}
#[test]
fn controller_perf_overhead_is_bounded() {
let cfg = cfg_for_tests();
let l = Limiter::new(8, cfg);
let started = Instant::now();
for _ in 0..100_000 {
let _ = l.current();
l.observe(Outcome::Success, Duration::from_micros(1));
}
let elapsed = started.elapsed();
assert!(
elapsed < Duration::from_millis(500),
"100k observe+current pairs took {elapsed:?}, expected <500ms",
);
}
#[test]
fn nan_and_out_of_range_config_does_not_panic() {
    // Construct a controller from a config where every field is pathological
    // (zeros, NaN, infinities, inverted window bounds) and verify that
    // construction sanitizes everything instead of panicking.
    let cfg = AdaptiveConfig {
        enabled: true,
        min_concurrency: 0,
        max: ChannelMax {
            quote: 0,
            store: 0,
            fetch: 0,
        },
        window_ops: 10,
        min_window_ops: 50,
        success_target: f64::NAN,
        timeout_ceiling: f64::INFINITY,
        latency_inflation_factor: f64::NEG_INFINITY,
        latency_ewma_alpha: f64::NAN,
    };
    let c = AdaptiveController::new(ChannelStart::default(), cfg);
    let post = &c.config;
    let unit = 0.0..=1.0;
    assert_eq!(
        post.min_concurrency, 1,
        "sanitize did not raise min_concurrency from 0"
    );
    assert!(
        post.success_target.is_finite() && unit.contains(&post.success_target),
        "sanitize did not clamp success_target from NaN: {}",
        post.success_target
    );
    assert!(
        post.timeout_ceiling.is_finite() && unit.contains(&post.timeout_ceiling),
        "sanitize did not clamp timeout_ceiling from Inf: {}",
        post.timeout_ceiling
    );
    assert!(
        post.latency_inflation_factor.is_finite() && post.latency_inflation_factor > 0.0,
        "sanitize did not fix latency_inflation_factor from -Inf: {}",
        post.latency_inflation_factor
    );
    assert!(
        post.latency_ewma_alpha.is_finite() && unit.contains(&post.latency_ewma_alpha),
        "sanitize did not fix latency_ewma_alpha from NaN: {}",
        post.latency_ewma_alpha
    );
    assert!(
        post.min_window_ops <= post.window_ops,
        "sanitize did not clamp min_window_ops <= window_ops: min={} window={}",
        post.min_window_ops,
        post.window_ops
    );
    assert!(
        post.max.quote >= post.min_concurrency,
        "max.quote below min_concurrency"
    );
    // The sanitized controller must also hold the floor under a stream of
    // extreme observations (huge latencies alternating with instant timeouts).
    for _ in 0..200 {
        c.store
            .observe(Outcome::Success, Duration::from_secs(99_999));
        c.store.observe(Outcome::Timeout, Duration::ZERO);
    }
    let cur = c.store.current();
    assert!(cur >= 1, "cap below floor: {cur}");
}
#[test]
fn transient_burst_does_not_pile_drive_to_floor() {
    // A short burst of timeouts must not drive the cap below half of its
    // starting value; it should cost at most a bounded reduction.
    let cfg = LimiterConfig {
        window_ops: 32,
        min_window_ops: 8,
        success_target: 0.95,
        timeout_ceiling: 0.10,
        ..cfg_for_tests()
    };
    let limiter = Limiter::new(32, cfg);
    (0..8).for_each(|_| limiter.observe(Outcome::Timeout, Duration::from_millis(10)));
    let after_burst = limiter.current();
    assert!(
        after_burst >= 16,
        "transient burst pile-drove cap from 32 to {after_burst}; expected >= 16",
    );
}
#[tokio::test]
async fn transport_errors_classify_as_capacity_signal() {
    use crate::data::client::classify_error;
    use crate::data::error::Error;
    // Window and thresholds tuned so that 16 consecutive failures are enough
    // to trip the backoff logic within a single window.
    let make_cfg = || LimiterConfig {
        window_ops: 16,
        min_window_ops: 5,
        success_target: 0.5,
        timeout_ceiling: 0.5,
        ..cfg_for_tests()
    };
    type ErrFactory = Box<dyn Fn() -> Error>;
    // One factory per transport-flavored error variant; each case below gets
    // a fresh limiter so classifications are judged independently.
    let cases: Vec<(&str, ErrFactory)> = vec![
        ("Network", Box::new(|| Error::Network("net".to_string()))),
        (
            "InsufficientPeers",
            Box::new(|| Error::InsufficientPeers("ip".to_string())),
        ),
        ("Io", Box::new(|| Error::Io(std::io::Error::other("io")))),
        ("Protocol", Box::new(|| Error::Protocol("p".to_string()))),
        ("Storage", Box::new(|| Error::Storage("s".to_string()))),
        (
            "PartialUpload",
            Box::new(|| Error::PartialUpload {
                stored: vec![],
                stored_count: 0,
                failed: vec![],
                failed_count: 0,
                total_chunks: 0,
                reason: "r".to_string(),
            }),
        ),
    ];
    for (name, mk) in &cases {
        let l = Limiter::new(8, make_cfg());
        // Run a full window of ops that all fail with this variant, routed
        // through the production classifier.
        for _ in 0..16 {
            let _: std::result::Result<(), Error> =
                observe_op(&l, || async { Err(mk()) }, classify_error).await;
        }
        // If classify_error treats the variant as a capacity signal, the cap
        // must have dropped below its starting value of 8.
        let cur = l.current();
        assert!(
            cur < 8,
            "{name} not classified as capacity signal: cap stayed at {cur}",
        );
    }
}
#[test]
fn per_channel_ceilings_are_independent() {
    // Drive every channel with pure success and verify each one saturates at
    // its own configured ceiling rather than some shared global maximum.
    let cfg = AdaptiveConfig {
        max: ChannelMax {
            quote: 4,
            store: 8,
            fetch: 1024,
        },
        ..AdaptiveConfig::default()
    };
    let start = ChannelStart {
        quote: 4,
        store: 8,
        fetch: 64,
    };
    let c = AdaptiveController::new(start, cfg);
    for _ in 0..1000 {
        for ch in [&c.quote, &c.store, &c.fetch] {
            ch.observe(Outcome::Success, Duration::from_micros(10));
        }
    }
    assert_eq!(c.quote.current(), 4, "quote should cap at 4");
    assert_eq!(c.store.current(), 8, "store should cap at 8");
    assert_eq!(
        c.fetch.current(),
        1024,
        "fetch did not reach its independent max of 1024; got {}",
        c.fetch.current()
    );
}
#[test]
fn cold_start_at_least_prior_static_defaults() {
    // Cold-start concurrency must never regress below the previous static
    // values (quote 32, store 8, fetch 64).
    let cs = ChannelStart::default();
    for (name, got, floor) in [
        ("quote", cs.quote, 32),
        ("store", cs.store, 8),
        ("fetch", cs.fetch, 64),
    ] {
        assert!(got >= floor, "{name} cold-start regressed: {got}");
    }
}
#[test]
fn sustained_stress_reaches_floor_within_bounded_ops() {
    // Under nonstop timeouts the cap must decay from 64 all the way down to
    // the floor of 1 within a bounded number of observations.
    let cfg = LimiterConfig {
        window_ops: 32,
        min_window_ops: 8,
        success_target: 0.95,
        timeout_ceiling: 0.10,
        max_concurrency: 64,
        ..cfg_for_tests()
    };
    let limiter = Limiter::new(64, cfg);
    let mut ops = 0usize;
    while limiter.current() > 1 && ops < 200 {
        limiter.observe(Outcome::Timeout, Duration::from_millis(10));
        ops += 1;
    }
    assert_eq!(
        limiter.current(),
        1,
        "controller did not reach floor within 200 observations under \
         sustained timeout stress; took {ops} ops, ended at cap {}",
        limiter.current()
    );
}
#[test]
fn default_controller_has_growth_headroom() {
    // A default controller starts each channel at its cold-start value, and
    // every default ceiling must sit strictly above that start so the
    // adaptive logic has room to grow.
    let c = AdaptiveController::default();
    let cs = ChannelStart::default();
    let max = ChannelMax::default();
    assert_eq!(c.quote.current(), cs.quote);
    assert_eq!(c.store.current(), cs.store);
    assert_eq!(c.fetch.current(), cs.fetch);
    for (name, ceiling, start) in [
        ("quote", max.quote, cs.quote),
        ("store", max.store, cs.store),
        ("fetch", max.fetch, cs.fetch),
    ] {
        assert!(
            ceiling > start,
            "no growth headroom for {name}: max={ceiling} start={start}"
        );
    }
}
#[test]
fn warm_start_floors_at_cold_defaults() {
    // A snapshot below the cold-start values must be ignored in favor of the
    // cold defaults: warm-start may only raise the starting point.
    let c = AdaptiveController::default();
    let cold = ChannelStart::default();
    c.warm_start(ChannelStart {
        quote: 1,
        store: 1,
        fetch: 1,
    });
    for (name, got, want) in [
        ("quote", c.quote.current(), cold.quote),
        ("store", c.store.current(), cold.store),
        ("fetch", c.fetch.current(), cold.fetch),
    ] {
        assert_eq!(got, want, "{name} warm_start did not floor at cold default");
    }
}
#[test]
fn warm_start_honors_values_above_cold_floor() {
    // A snapshot strictly above the cold-start floor must be adopted as-is.
    //
    // Fix: the snapshot is derived by multiplying the cold defaults, which
    // silently coupled this test to the relative magnitudes of two unrelated
    // default constants — if a cold default ever grew past half its channel
    // ceiling, warm_start could legitimately clamp and the test would fail
    // for the wrong reason. Clamp the derived values at each channel's
    // default ceiling, and assert the above-floor precondition explicitly.
    let c = AdaptiveController::default();
    let cold = ChannelStart::default();
    let max = ChannelMax::default();
    let snap = ChannelStart {
        quote: (cold.quote * 2).min(max.quote),
        store: (cold.store * 4).min(max.store),
        fetch: (cold.fetch * 2).min(max.fetch),
    };
    // Precondition: the derived values really are above the cold floor,
    // otherwise the assertions below would not exercise adoption at all.
    assert!(
        snap.quote > cold.quote && snap.store > cold.store && snap.fetch > cold.fetch,
        "derived snapshot not strictly above cold floor; defaults changed?"
    );
    c.warm_start(snap);
    assert_eq!(c.quote.current(), snap.quote);
    assert_eq!(c.store.current(), snap.store);
    assert_eq!(c.fetch.current(), snap.fetch);
}
#[tokio::test]
async fn rebucketed_picks_up_cap_changes_mid_stream() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 32,
        ..cfg_for_tests()
    };
    // Start with a cap of 4; a background task raises it to 16 once some
    // items have completed, and the scheduler must widen mid-stream.
    let l = Limiter::new(4, cfg);
    let max_seen = StdArc::new(AtomicUsize::new(0));
    let in_flight = StdArc::new(AtomicUsize::new(0));
    let processed = StdArc::new(AtomicUsize::new(0));
    let l_for_bump = l.clone();
    let processed_for_bump = processed.clone();
    // Poll every 2ms; once 16 items have completed, bump the cap to 16 and
    // exit. 200 items at ~1ms each leave plenty of stream after the bump.
    let bump_handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if processed_for_bump.load(AtomicOrdering::Relaxed) >= 16 {
                l_for_bump.warm_start(16);
                return;
            }
        }
    });
    // Each item records the peak concurrent in-flight count it observes.
    let _: Vec<()> = rebucketed(&l, 0..200usize, false, |_i| {
        let max_seen = max_seen.clone();
        let in_flight = in_flight.clone();
        let processed = processed.clone();
        async move {
            let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
            max_seen.fetch_max(cur, AtomicOrdering::Relaxed);
            tokio::time::sleep(Duration::from_millis(1)).await;
            in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
            processed.fetch_add(1, AtomicOrdering::Relaxed);
            Ok::<(), &'static str>(())
        }
    })
    .await
    .unwrap();
    bump_handle.await.unwrap();
    // A peak above the initial cap of 4 proves the bump was observed while
    // the stream was still being scheduled.
    let peak = max_seen.load(AtomicOrdering::Relaxed);
    assert!(
        peak > 4,
        "rebucketed did not pick up the mid-stream cap bump (peak in-flight = {peak})"
    );
}
#[tokio::test]
async fn observe_op_cancellation_drops_silently() {
    let cfg = LimiterConfig {
        window_ops: 16,
        min_window_ops: 4,
        ..cfg_for_tests()
    };
    let l = Limiter::new(4, cfg);
    let l_clone = l.clone();
    // Build (but never await) an observe_op around a future that can never
    // complete, then drop it: cancellation must not record any observation.
    let fut = observe_op(
        &l_clone,
        || async {
            std::future::pending::<()>().await;
            Ok::<(), &'static str>(())
        },
        |_| Outcome::Timeout,
    );
    drop(fut);
    assert_eq!(l.current(), 4, "cancelled op moved the cap");
    // A full window of successful ops should now grow the cap, proving the
    // limiter's internal state was not corrupted by the dropped future.
    // NOTE(review): the classifier returns NetworkError, yet the assertion
    // below expects growth from "successes" — this relies on observe_op
    // consulting the classifier only for Err results; confirm.
    for _ in 0..16 {
        let _: Result<(), &'static str> = observe_op(
            &l,
            || async { Ok(()) },
            |_| Outcome::NetworkError,
        )
        .await;
    }
    assert!(
        l.current() > 4,
        "cap did not grow after 16 successes; controller corrupted by cancellation? cap={}",
        l.current(),
    );
}
#[test]
fn save_snapshot_is_synchronous_and_durable() {
    // By the time save_snapshot returns, the file must already exist on disk
    // and round-trip through load_snapshot with identical values.
    let dir = tempfile::tempdir().unwrap();
    let target = dir.path().join("client_adaptive.json");
    save_snapshot(
        &target,
        ChannelStart {
            quote: 100,
            store: 50,
            fetch: 200,
        },
    );
    assert!(
        target.exists(),
        "save_snapshot did not write file synchronously"
    );
    let loaded = load_snapshot(&target).unwrap();
    assert_eq!(loaded.quote, 100);
    assert_eq!(loaded.store, 50);
    assert_eq!(loaded.fetch, 200);
}
#[tokio::test]
async fn warm_start_disables_slow_start_doubling() {
    // After warm-starting to 16, a full window of successes must grow the
    // cap additively (+1), not double it as a cold slow-start would.
    let cfg = LimiterConfig {
        window_ops: 8,
        min_window_ops: 4,
        success_target: 0.9,
        ..cfg_for_tests()
    };
    let limiter = Limiter::new(2, cfg.clone());
    limiter.warm_start(16);
    assert_eq!(limiter.current(), 16);
    (0..cfg.window_ops)
        .for_each(|_| limiter.observe(Outcome::Success, Duration::from_millis(10)));
    assert_eq!(
        limiter.current(),
        17,
        "warm-start triggered slow-start doubling instead of additive +1"
    );
}
#[test]
fn controller_warm_start_floors_at_per_instance_cold_start() {
    // The warm-start floor is this instance's own cold-start values, not the
    // global defaults: a too-low snapshot floors per channel, a higher one
    // is adopted wholesale.
    let custom_cold = ChannelStart {
        quote: 2,
        store: 1,
        fetch: 4,
    };
    let c = AdaptiveController::new(custom_cold, AdaptiveConfig::default());
    let uniform = |v| ChannelStart {
        quote: v,
        store: v,
        fetch: v,
    };
    c.warm_start(uniform(1));
    assert_eq!(c.quote.current(), 2);
    assert_eq!(c.store.current(), 1);
    assert_eq!(c.fetch.current(), 4);
    c.warm_start(uniform(10));
    assert_eq!(c.quote.current(), 10);
    assert_eq!(c.store.current(), 10);
    assert_eq!(c.fetch.current(), 10);
}
#[test]
fn warm_start_is_noop_when_adaptive_disabled() {
    // With adaptation disabled, warm_start must leave every channel pinned
    // at its cold-start value.
    let c = AdaptiveController::new(
        ChannelStart {
            quote: 5,
            store: 5,
            fetch: 5,
        },
        AdaptiveConfig {
            enabled: false,
            ..AdaptiveConfig::default()
        },
    );
    c.warm_start(ChannelStart {
        quote: 100,
        store: 100,
        fetch: 100,
    });
    assert_eq!(c.quote.current(), 5, "warm_start moved cap when disabled");
    assert_eq!(c.store.current(), 5, "warm_start moved cap when disabled");
    assert_eq!(c.fetch.current(), 5, "warm_start moved cap when disabled");
}
#[tokio::test]
async fn rebucketed_unordered_is_rolling_not_fenced() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    // Large window so the cap itself does not move during the test.
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 8,
        window_ops: 100,
        min_window_ops: 50,
        ..cfg_for_tests()
    };
    let l = Limiter::new(4, cfg);
    let in_flight = StdArc::new(AtomicUsize::new(0));
    let max_in_flight = StdArc::new(AtomicUsize::new(0));
    let started = StdArc::new(AtomicUsize::new(0));
    // Item 0 holds its slot for 50ms while every other item takes ~1ms; a
    // rolling scheduler keeps refilling the free slots around the straggler.
    let _: Vec<()> = rebucketed_unordered(&l, 0..20usize, |i| {
        let in_flight = in_flight.clone();
        let max_in_flight = max_in_flight.clone();
        let started = started.clone();
        async move {
            let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
            max_in_flight.fetch_max(cur, AtomicOrdering::Relaxed);
            started.fetch_add(1, AtomicOrdering::Relaxed);
            if i == 0 {
                tokio::time::sleep(Duration::from_millis(50)).await;
            } else {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
            Ok::<(), &'static str>(())
        }
    })
    .await
    .unwrap();
    // Every item must have started, and the cap of 4 must have been filled
    // at some point despite the long-running item 0.
    assert_eq!(started.load(AtomicOrdering::Relaxed), 20);
    let peak = max_in_flight.load(AtomicOrdering::Relaxed);
    assert!(
        peak >= 4,
        "rolling scheduler did not fill cap; peak in-flight = {peak}"
    );
}
#[tokio::test]
async fn rebucketed_ordered_preserves_input_order() {
    // Later items finish sooner (the delay shrinks as the value grows), yet
    // the output must come back in input order.
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 4,
        ..cfg_for_tests()
    };
    let limiter = Limiter::new(4, cfg);
    let inputs: Vec<usize> = (0..50).collect();
    let result: Vec<usize> = rebucketed_ordered(
        &limiter,
        inputs.iter().copied().enumerate(),
        |(idx, v)| async move {
            let delay = (50 - v) as u64;
            tokio::time::sleep(Duration::from_micros(delay)).await;
            Ok::<_, &'static str>((idx, v * 10))
        },
    )
    .await
    .unwrap();
    assert_eq!(result.len(), 50);
    for (i, v) in result.iter().enumerate() {
        assert_eq!(*v, i * 10, "out of order at index {i}: got {v}");
    }
}
#[tokio::test]
async fn rebucketed_ordered_pairs_idx_with_payload_correctly() {
    // Each index carries a distinct payload; verify no (idx, payload) pair
    // gets swapped even though completion order is scrambled by delays.
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 8,
        ..cfg_for_tests()
    };
    let limiter = Limiter::new(8, cfg);
    let pairs: Vec<(usize, u64)> = (0..40).map(|i| (i, 1000u64 + i as u64)).collect();
    let result: Vec<u64> = rebucketed_ordered(&limiter, pairs, |(idx, hash)| async move {
        // Earlier indices sleep longer, so later items tend to finish first.
        let delay = (40 - idx) as u64;
        tokio::time::sleep(Duration::from_micros(delay)).await;
        Ok::<_, &'static str>((idx, hash * 7))
    })
    .await
    .unwrap();
    for (i, v) in result.iter().enumerate() {
        let expected = (1000 + i as u64) * 7;
        assert_eq!(
            *v, expected,
            "idx {i} paired with wrong content: {v}, expected {expected}"
        );
    }
}
#[test]
fn save_snapshot_temp_file_is_unique_per_call() {
    // Hammer the same destination 100 times, then confirm the final write
    // won and no ".tmp." intermediates were left behind.
    let dir = tempfile::tempdir().unwrap();
    let target = dir.path().join("client_adaptive.json");
    for i in 1..=100 {
        save_snapshot(
            &target,
            ChannelStart {
                quote: i,
                store: i,
                fetch: i,
            },
        );
    }
    let loaded = load_snapshot(&target).unwrap();
    assert_eq!(loaded.quote, 100);
    assert_eq!(loaded.store, 100);
    assert_eq!(loaded.fetch, 100);
    let leftover: Vec<_> = std::fs::read_dir(dir.path())
        .unwrap()
        .filter_map(Result::ok)
        .filter(|e| e.file_name().to_string_lossy().contains(".tmp."))
        .collect();
    assert!(
        leftover.is_empty(),
        "temp files leaked: {:?}",
        leftover.iter().map(|e| e.file_name()).collect::<Vec<_>>()
    );
}
#[tokio::test]
async fn rebucketed_empty_input_returns_empty() {
    // Both combinators must handle a zero-item stream without hanging or
    // erroring.
    let limiter = Limiter::new(4, cfg_for_tests());
    let unordered: Vec<usize> =
        rebucketed_unordered(&limiter, std::iter::empty::<usize>(), |_| async {
            Ok::<_, &'static str>(42usize)
        })
        .await
        .unwrap();
    assert!(unordered.is_empty());
    let ordered: Vec<usize> = rebucketed_ordered(
        &limiter,
        std::iter::empty::<(usize, ())>(),
        |(idx, _)| async move { Ok::<_, &'static str>((idx, 42usize)) },
    )
    .await
    .unwrap();
    assert!(ordered.is_empty());
}
#[tokio::test]
async fn rebucketed_exactly_cap_items() {
    // An input whose length equals the concurrency cap is the boundary case;
    // every item must still be processed.
    let limiter = Limiter::new(
        4,
        LimiterConfig {
            min_concurrency: 1,
            max_concurrency: 4,
            ..cfg_for_tests()
        },
    );
    let out: Vec<usize> = rebucketed_unordered(&limiter, 0..4usize, |i| async move {
        Ok::<_, &'static str>(i * 2)
    })
    .await
    .unwrap();
    assert_eq!(out.len(), 4);
}
#[tokio::test]
async fn rebucketed_preserves_first_error() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 4,
        ..cfg_for_tests()
    };
    let l = Limiter::new(4, cfg);
    let started = StdArc::new(AtomicUsize::new(0));
    let started_clone = started.clone();
    // Two failing items: i == 5 (first in input order) is deliberately
    // delayed so that i == 10's error is likely to complete earlier in wall
    // time; the combinator must still surface the input-order-first error.
    let result: Result<Vec<()>, &'static str> = rebucketed_unordered(&l, 0..20usize, |i| {
        let started = started_clone.clone();
        async move {
            started.fetch_add(1, AtomicOrdering::Relaxed);
            if i == 5 {
                tokio::time::sleep(Duration::from_micros(100)).await;
                return Err("first error");
            }
            if i == 10 {
                return Err("second error - should be ignored");
            }
            tokio::time::sleep(Duration::from_micros(50)).await;
            Ok(())
        }
    })
    .await;
    match result {
        Err(e) => assert_eq!(e, "first error", "wrong error preserved"),
        Ok(_) => panic!("expected error, got ok"),
    }
    // Items after the error need not be scheduled, so only a range of start
    // counts can be asserted. NOTE(review): the exclusive upper bound of 20
    // requires at least one item to be skipped once the error is seen —
    // timing-dependent; confirm the scheduler stops launching on error.
    let total = started.load(AtomicOrdering::Relaxed);
    assert!(
        (5..20).contains(&total),
        "started count out of range: {total}"
    );
}
#[test]
fn limiter_with_min_equal_max_is_pinned() {
    // With a degenerate min == max range the cap must never move, no matter
    // which outcomes are observed.
    let limiter = Limiter::new(
        5,
        LimiterConfig {
            min_concurrency: 5,
            max_concurrency: 5,
            ..cfg_for_tests()
        },
    );
    (0..1000).for_each(|_| limiter.observe(Outcome::Success, Duration::from_millis(1)));
    assert_eq!(limiter.current(), 5, "cap moved despite min==max");
    (0..1000).for_each(|_| limiter.observe(Outcome::Timeout, Duration::from_millis(50)));
    assert_eq!(limiter.current(), 5, "cap moved despite min==max");
}
#[test]
fn ewma_alpha_zero_returns_prev() {
    // alpha = 0 means "ignore the sample entirely".
    let prev = Duration::from_millis(100);
    let result = ewma(prev, Duration::from_millis(500), 0.0);
    assert_eq!(result, prev, "alpha=0 must return prev unchanged");
}
#[test]
fn ewma_alpha_one_returns_sample() {
    // alpha = 1 means "take the sample entirely"; allow 1ms of float slop.
    let prev = Duration::from_millis(100);
    let sample = Duration::from_millis(500);
    let result = ewma(prev, sample, 1.0);
    assert!(
        result.abs_diff(sample) <= Duration::from_millis(1),
        "alpha=1 should return sample; got {result:?}, expected ~{sample:?}"
    );
}
#[test]
fn ewma_alpha_half_returns_midpoint() {
    // alpha = 0.5 blends prev and sample equally: 200ms / 400ms -> ~300ms.
    let result = ewma(Duration::from_millis(200), Duration::from_millis(400), 0.5);
    let expected = Duration::from_millis(300);
    assert!(
        result.abs_diff(expected) <= Duration::from_millis(1),
        "alpha=0.5 midpoint: got {result:?}, expected ~{expected:?}"
    );
}
#[test]
fn ewma_nan_alpha_returns_prev() {
    // Any non-finite alpha (NaN or either infinity) must fall back to prev.
    let prev = Duration::from_millis(100);
    let sample = Duration::from_millis(500);
    for alpha in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
        assert_eq!(ewma(prev, sample, alpha), prev);
    }
}
#[test]
fn ewma_clamps_alpha_above_one() {
    // A finite alpha above 1 is clamped to 1, so the result tracks the
    // sample (within 1ms of float slop).
    let result = ewma(Duration::from_millis(100), Duration::from_millis(500), 2.5);
    assert!(result >= Duration::from_millis(499));
    assert!(result <= Duration::from_millis(501));
}
#[test]
fn window_full_of_application_errors_does_not_move_cap() {
    // Application-level failures carry no capacity signal, so a window made
    // entirely of them must leave the cap untouched.
    let cfg = cfg_for_tests();
    let limiter = Limiter::new(8, cfg.clone());
    (0..cfg.window_ops * 5)
        .for_each(|_| limiter.observe(Outcome::ApplicationError, Duration::from_millis(50)));
    assert_eq!(
        limiter.current(),
        8,
        "cap moved on pure-app-error window; should hold"
    );
}
#[test]
fn disabled_adaptive_controller_truly_inert() {
    // With enabled=false, even 10k timeout observations per channel must
    // leave every cap at its baseline.
    let c = AdaptiveController::new(
        ChannelStart::default(),
        AdaptiveConfig {
            enabled: false,
            ..AdaptiveConfig::default()
        },
    );
    let baseline_quote = c.quote.current();
    let baseline_store = c.store.current();
    let baseline_fetch = c.fetch.current();
    for _ in 0..10000 {
        for ch in [&c.quote, &c.store, &c.fetch] {
            ch.observe(Outcome::Timeout, Duration::from_millis(1));
        }
    }
    assert_eq!(c.quote.current(), baseline_quote);
    assert_eq!(c.store.current(), baseline_store);
    assert_eq!(c.fetch.current(), baseline_fetch);
}
#[test]
fn channel_state_is_independent() {
    // Stress only the store channel and verify the quote/fetch caps do not
    // move: each channel keeps its own window and cap.
    let c = AdaptiveController::default();
    let q0 = c.quote.current();
    let f0 = c.fetch.current();
    let s0 = c.store.current();
    (0..1000).for_each(|_| c.store.observe(Outcome::Timeout, Duration::from_millis(1)));
    assert_eq!(
        c.store.current(),
        c.config.min_concurrency,
        "store did not reach floor after 1000 timeouts; cap={}",
        c.store.current()
    );
    assert!(c.store.current() < s0, "store cap did not move at all");
    assert_eq!(c.quote.current(), q0, "quote leaked from store stress");
    assert_eq!(c.fetch.current(), f0, "fetch leaked from store stress");
}
#[test]
fn sanitize_corrects_pathological_floats() {
    // Every float field set to a non-finite or out-of-range value must come
    // out of sanitize() finite and inside its documented range.
    let mut cfg = AdaptiveConfig {
        success_target: f64::NAN,
        timeout_ceiling: 5.0,
        latency_inflation_factor: f64::NEG_INFINITY,
        latency_ewma_alpha: 2.5,
        window_ops: 4,
        min_window_ops: 10,
        ..AdaptiveConfig::default()
    };
    cfg.sanitize();
    let unit = 0.0..=1.0;
    assert!(cfg.success_target.is_finite());
    assert!(unit.contains(&cfg.success_target));
    assert!(unit.contains(&cfg.timeout_ceiling));
    assert!(cfg.latency_inflation_factor.is_finite());
    assert!(cfg.latency_inflation_factor > 0.0);
    assert!(unit.contains(&cfg.latency_ewma_alpha));
    assert!(
        cfg.min_window_ops <= cfg.window_ops,
        "min_window_ops {} > window_ops {}",
        cfg.min_window_ops,
        cfg.window_ops
    );
}
#[test]
fn channel_max_serde_round_trips() {
    // ChannelMax must survive a JSON round trip unchanged.
    let original = ChannelMax {
        quote: 7,
        store: 13,
        fetch: 200,
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: ChannelMax = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded.quote, 7);
    assert_eq!(decoded.store, 13);
    assert_eq!(decoded.fetch, 200);
}
#[test]
fn channel_start_serde_round_trips() {
    // ChannelStart must survive a JSON round trip unchanged.
    let original = ChannelStart {
        quote: 11,
        store: 22,
        fetch: 33,
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: ChannelStart = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded.quote, 11);
    assert_eq!(decoded.store, 22);
    assert_eq!(decoded.fetch, 33);
}
#[tokio::test]
async fn rebucketed_honors_cap_shrinkage_mid_stream() {
    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
    use std::sync::Arc as StdArc;
    let cfg = LimiterConfig {
        min_concurrency: 1,
        max_concurrency: 16,
        ..cfg_for_tests()
    };
    // Start wide (16); a background task shrinks the cap to 2 after 50 items
    // complete, and the scheduler must drain down instead of staying at 16.
    let l = Limiter::new(16, cfg);
    let in_flight = StdArc::new(AtomicUsize::new(0));
    let max_after_shrink = StdArc::new(AtomicUsize::new(0));
    let processed = StdArc::new(AtomicUsize::new(0));
    let shrunk = StdArc::new(std::sync::atomic::AtomicBool::new(false));
    let l_for_shrink = l.clone();
    let p_for_shrink = processed.clone();
    let shrunk_for_shrink = shrunk.clone();
    // Poll every 2ms; once 50 items have completed, drop the cap to 2 and
    // raise the flag that arms the post-shrink peak tracking below.
    let shrink_handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(2)).await;
            if p_for_shrink.load(AtomicOrdering::Relaxed) >= 50 {
                l_for_shrink.warm_start(2);
                shrunk_for_shrink.store(true, AtomicOrdering::Relaxed);
                return;
            }
        }
    });
    // Only peaks observed after the shrink flag is set are recorded.
    let _: Vec<()> = rebucketed_unordered(&l, 0..400usize, |_i| {
        let in_flight = in_flight.clone();
        let max_after_shrink = max_after_shrink.clone();
        let processed = processed.clone();
        let shrunk = shrunk.clone();
        async move {
            let cur = in_flight.fetch_add(1, AtomicOrdering::Relaxed) + 1;
            if shrunk.load(AtomicOrdering::Relaxed) {
                max_after_shrink.fetch_max(cur, AtomicOrdering::Relaxed);
            }
            tokio::time::sleep(Duration::from_millis(1)).await;
            in_flight.fetch_sub(1, AtomicOrdering::Relaxed);
            processed.fetch_add(1, AtomicOrdering::Relaxed);
            Ok::<(), &'static str>(())
        }
    })
    .await
    .unwrap();
    shrink_handle.await.unwrap();
    // The bound allows a small overshoot (<= 4) for tasks that were already
    // in flight at the moment the cap dropped to 2.
    let peak = max_after_shrink.load(AtomicOrdering::Relaxed);
    assert!(
        peak <= 4,
        "rebucketed exceeded shrunk cap of 2: peak post-shrink in-flight = {peak}"
    );
}
#[test]
fn mixed_window_app_errors_with_capacity_signal() {
    // App errors must be neutral padding: mixed with successes the cap
    // holds, while mixed with timeouts the remaining signal still shrinks it.
    let cfg = LimiterConfig {
        window_ops: 10,
        min_window_ops: 5,
        timeout_ceiling: 0.2,
        success_target: 0.9,
        ..cfg_for_tests()
    };
    let l = Limiter::new(8, cfg.clone());
    for outcome in [Outcome::ApplicationError, Outcome::Success] {
        for _ in 0..5 {
            l.observe(outcome, Duration::from_millis(50));
        }
    }
    assert!(
        l.current() >= 8,
        "AppErrors falsely depressed the success rate; cap dropped from 8 to {}",
        l.current()
    );
    let l2 = Limiter::new(8, cfg);
    for outcome in [Outcome::ApplicationError, Outcome::Timeout] {
        for _ in 0..5 {
            l2.observe(outcome, Duration::from_millis(50));
        }
    }
    assert!(
        l2.current() < 8,
        "all-timeouts (with AppError padding) did not decrease cap; got {}",
        l2.current()
    );
}
#[test]
fn concurrent_save_load_no_torn_reads() {
    use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
    use std::thread;
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("snap.json");
    // Seed the file so the reader always has something to load.
    save_snapshot(
        &path,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
    );
    let stop = std::sync::Arc::new(AtomicBool::new(false));
    let p_w = path.clone();
    let s_w = stop.clone();
    // Writer: continuously saves snapshots whose three fields are always
    // equal, so any loaded snapshot with unequal fields must be a torn read.
    let writer = thread::spawn(move || {
        let mut i = 1usize;
        while !s_w.load(AtomicOrdering::Relaxed) {
            save_snapshot(
                &p_w,
                ChannelStart {
                    quote: i,
                    store: i,
                    fetch: i,
                },
            );
            // .max(1) keeps the counter nonzero across wraparound.
            i = i.wrapping_add(1).max(1);
        }
    });
    let p_r = path.clone();
    // Reader: 2000 loads concurrent with the writer, counting any snapshot
    // whose fields disagree.
    let reader = thread::spawn(move || {
        let mut torn = 0usize;
        for _ in 0..2_000 {
            if let Some(snap) = load_snapshot(&p_r) {
                if snap.quote != snap.store || snap.store != snap.fetch {
                    torn += 1;
                }
            }
        }
        torn
    });
    // Join the reader first, then signal the writer to stop.
    let torn = reader.join().unwrap();
    stop.store(true, AtomicOrdering::Relaxed);
    writer.join().unwrap();
    assert_eq!(
        torn, 0,
        "observed {torn} torn reads under concurrent writes"
    );
}
#[test]
fn save_with_timeout_returns_promptly_on_fast_failure() {
    // A path that fails immediately must not burn the full 5s budget.
    let target = std::path::PathBuf::from("/nonexistent_root_xyz_test/snap.json");
    let begun = Instant::now();
    save_snapshot_with_timeout(
        target,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
        Duration::from_secs(5),
    );
    let elapsed = begun.elapsed();
    assert!(
        elapsed < Duration::from_secs(1),
        "save_snapshot_with_timeout took {elapsed:?} on fast-failing path"
    );
}
#[test]
fn save_with_timeout_bounds_wall_time_on_hang() {
    // With a 1us budget the wrapper must return almost immediately instead
    // of waiting for the write to complete.
    let dir = tempfile::tempdir().unwrap();
    let target = dir.path().join("snap.json");
    let begun = Instant::now();
    save_snapshot_with_timeout(
        target,
        ChannelStart {
            quote: 1,
            store: 1,
            fetch: 1,
        },
        Duration::from_micros(1),
    );
    let elapsed = begun.elapsed();
    assert!(
        elapsed < Duration::from_millis(200),
        "timeout wrapper did not bound wall time: {elapsed:?}"
    );
}
}