use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::Notify;
// --- RTT smoothing -------------------------------------------------------
/// Shift for the fast RTT EWMA: each sample moves it by 1/4 of the gap.
const EWMA_FAST_SHIFT: u32 = 2;
/// Shift for the slow RTT EWMA and the regime baseline: 1/32 of the gap.
const EWMA_SLOW_SHIFT: u32 = 5;

// --- Multiplicative decrease --------------------------------------------
/// Factor applied when the regime detector reports `Rising` (7/10).
const SHRINK_NUM_FACTOR: u64 = 7;
const SHRINK_DEN_FACTOR: u64 = 10;
/// Factor applied on an explicit throttle (1/2).
const THROTTLE_NUM: u64 = 1;
const THROTTLE_DEN: u64 = 2;
/// Growth is suspended for this many nanoseconds (1 s) after a throttle.
const THROTTLE_COOLDOWN_NS: u64 = 1_000 * 1_000 * 1_000;

// --- Slow start ----------------------------------------------------------
/// Number of successes during which the limit doubles, unless a throttle
/// ends slow start earlier.
const SLOW_START_SAMPLES: u64 = 32;
/// Combines the in-flight count (low 32 bits) and the limit (high 32 bits)
/// into a single word so both can be changed by one compare-exchange.
#[inline(always)]
fn pack(inflight: u32, limit: u32) -> u64 {
    let high = u64::from(limit) << 32;
    let low = u64::from(inflight);
    high | low
}
/// Splits a state word produced by `pack` back into `(inflight, limit)`.
#[inline(always)]
fn unpack(s: u64) -> (u32, u32) {
    let limit = (s >> 32) as u32;
    let inflight = (s & u64::from(u32::MAX)) as u32;
    (inflight, limit)
}
/// Atomics touched on every `acquire` / outcome report, padded to a 64-byte
/// boundary via `repr(align(64))`.
#[repr(align(64))]
struct HotState {
    /// `inflight` (low 32 bits) and `limit` (high 32 bits), encoded with `pack`.
    state: AtomicU64,
    /// Smallest RTT observed, in µs; `u64::MAX` until the first sample.
    min_rtt_us: AtomicU64,
    /// Fast RTT EWMA in µs (shift `EWMA_FAST_SHIFT`); 0 until the first sample.
    ewma_fast_us: AtomicU64,
    /// Slow RTT EWMA in µs (shift `EWMA_SLOW_SHIFT`); 0 until the first sample.
    ewma_slow_us: AtomicU64,
    /// Nanoseconds since `ColdState::created_at` until which growth is suspended
    /// after a throttle.
    throttle_until_ns: AtomicU64,
    /// Count of recorded successes; compared with `SLOW_START_SAMPLES`.
    successes: AtomicU64,
    /// Set to 1 by the first throttle, which permanently ends slow start.
    first_throttle: AtomicU64,
}
/// Wakeup notifier, configuration and regime detector, kept in a separately
/// aligned struct from `HotState` (presumably to avoid false sharing — confirm).
#[repr(align(64))]
struct ColdState {
    /// Wakes tasks blocked in `acquire` when a slot is released or the limit grows.
    available: Notify,
    /// `(min_limit, max_limit)` the limit is always clamped into.
    bounds: (u32, u32),
    /// Epoch for the nanosecond timestamps stored in `HotState::throttle_until_ns`.
    created_at: Instant,
    /// CUSUM-style detector fed with every successful RTT sample (in µs).
    regime: RegimeDetector,
    /// When true, a `Rising` regime signal no longer shrinks the limit.
    cusum_shrink_disabled: AtomicBool,
}
/// Adaptive concurrency limiter.
///
/// Callers `acquire` an [`AdaptivePermit`] and report the request outcome on
/// it; successes grow the limit, throttles and sustained latency rises shrink
/// it, always within the bounds passed to `new`.
#[derive(Debug)]
pub struct AdaptiveLimit {
    /// Packed in-flight/limit word plus RTT statistics and throttle state.
    hot: HotState,
    /// Notifier, bounds, creation time and regime detector.
    cold: ColdState,
}
impl std::fmt::Debug for HotState {
    /// Prints a relaxed (possibly torn across fields) snapshot of every counter,
    /// with the packed `state` word decoded into `inflight` and `limit`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (inflight, limit) = unpack(self.state.load(Ordering::Relaxed));
        f.debug_struct("HotState")
            .field("inflight", &inflight)
            .field("limit", &limit)
            .field("min_rtt_us", &self.min_rtt_us.load(Ordering::Relaxed))
            .field("ewma_fast_us", &self.ewma_fast_us.load(Ordering::Relaxed))
            .field("ewma_slow_us", &self.ewma_slow_us.load(Ordering::Relaxed))
            // Previously omitted: needed to tell whether growth is paused by a
            // throttle cooldown or still in slow start.
            .field(
                "throttle_until_ns",
                &self.throttle_until_ns.load(Ordering::Relaxed),
            )
            .field("successes", &self.successes.load(Ordering::Relaxed))
            .field("first_throttle", &self.first_throttle.load(Ordering::Relaxed))
            .finish()
    }
}
impl std::fmt::Debug for ColdState {
    /// Prints the bounds, the regime detector and the CUSUM-shrink flag.
    ///
    /// `available` and `created_at` are left out; `finish_non_exhaustive`
    /// makes that omission visible in the output (`..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ColdState")
            .field("bounds", &self.bounds)
            .field("regime", &self.regime)
            .field(
                "cusum_shrink_disabled",
                &self.cusum_shrink_disabled.load(Ordering::Relaxed),
            )
            .finish_non_exhaustive()
    }
}
impl AdaptiveLimit {
    /// Creates a limiter whose limit starts at `initial` (clamped into
    /// `[min_limit, max_limit]`) with nothing in flight.
    ///
    /// # Panics
    ///
    /// Panics if `min_limit` is 0, if `min_limit > max_limit`, or if
    /// `max_limit` does not fit in a `u32`.
    pub fn new(initial: usize, min_limit: usize, max_limit: usize) -> Arc<Self> {
        assert!(min_limit >= 1, "min_limit must be at least 1");
        assert!(min_limit <= max_limit, "min_limit must be <= max_limit");
        assert!(max_limit <= u32::MAX as usize, "max_limit fits in u32");
        // Lossless: clamped to `max_limit`, which was just checked to fit in u32.
        let initial = initial.clamp(min_limit, max_limit) as u32;
        Arc::new(Self {
            hot: HotState {
                state: AtomicU64::new(pack(0, initial)),
                // Sentinel replaced by the first sample in `ratchet_min`.
                min_rtt_us: AtomicU64::new(u64::MAX),
                // 0 means "no sample yet" to `ewma_update`.
                ewma_fast_us: AtomicU64::new(0),
                ewma_slow_us: AtomicU64::new(0),
                throttle_until_ns: AtomicU64::new(0),
                successes: AtomicU64::new(0),
                first_throttle: AtomicU64::new(0),
            },
            cold: ColdState {
                available: Notify::new(),
                bounds: (min_limit as u32, max_limit as u32),
                created_at: Instant::now(),
                // Fires after 5_000_000 µs (5 s) of accumulated one-sided RTT deviation.
                regime: RegimeDetector::new(5_000_000),
                cusum_shrink_disabled: AtomicBool::new(false),
            },
        })
    }

    /// Stops `Rising` regime signals from shrinking the limit. Throttles still shrink it.
    pub fn disable_cusum_shrink(&self) {
        self.cold
            .cusum_shrink_disabled
            .store(true, Ordering::Relaxed);
    }

    /// Current concurrency limit (relaxed snapshot).
    pub fn current_limit(&self) -> usize {
        unpack(self.hot.state.load(Ordering::Relaxed)).1 as usize
    }

    /// Number of permits currently outstanding (relaxed snapshot).
    pub fn inflight(&self) -> usize {
        unpack(self.hot.state.load(Ordering::Relaxed)).0 as usize
    }

    /// Creates a limiter seeded from the value persisted under `key`
    /// (see `PersistentState::load_seed`), falling back to `default_initial`.
    pub fn from_persistent(
        state: &PersistentState,
        key: &str,
        default_initial: usize,
        min_limit: usize,
        max_limit: usize,
    ) -> Arc<Self> {
        let seed = state.load_seed(key, default_initial);
        Self::new(seed, min_limit, max_limit)
    }

    /// Persists the current limit under `key` for seeding future limiters.
    pub fn persist(&self, state: &PersistentState, key: &str) {
        state.save_observed(key, self.current_limit());
    }

    /// Waits until `inflight < limit`, then claims a slot and returns a permit for it.
    pub async fn acquire(self: &Arc<Self>) -> AdaptivePermit {
        loop {
            let waiter = self.cold.available.notified();
            tokio::pin!(waiter);
            // Register on the wait list *before* inspecting the state. An unpolled
            // `Notified` is not yet a waiter, so `notify_one` calls landing between
            // the check below and the first poll would collapse into a single
            // stored permit; with several tasks in that window, freed slots could
            // go unclaimed while tasks sleep. If this waiter is notified but then
            // wins a slot without awaiting, dropping it passes the notification on.
            waiter.as_mut().enable();
            let s = self.hot.state.load(Ordering::Acquire);
            let (inflight, limit) = unpack(s);
            if inflight < limit {
                match self.hot.state.compare_exchange(
                    s,
                    pack(inflight + 1, limit),
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        return AdaptivePermit {
                            limiter: Arc::clone(self),
                            started: Instant::now(),
                            consumed: false,
                        };
                    }
                    // Lost a race with another acquire/release/resize: re-evaluate.
                    Err(_) => continue,
                }
            }
            waiter.as_mut().await;
        }
    }

    /// Nanoseconds since the limiter was created, saturating at `u64::MAX`.
    fn now_ns(&self) -> u64 {
        u64::try_from(self.cold.created_at.elapsed().as_nanos()).unwrap_or(u64::MAX)
    }

    /// Handles a successful request with round-trip time `rtt`, adjusts the
    /// limit, and releases the request's slot.
    fn record_success(&self, rtt: Duration) {
        // Saturate absurd RTTs and keep samples >= 1 µs: the EWMA slots and the
        // regime baseline use 0 to mean "no sample yet".
        let rtt_us = u64::try_from(rtt.as_micros()).unwrap_or(u64::MAX).max(1);
        Self::ratchet_min(&self.hot.min_rtt_us, rtt_us);
        // The EWMAs are kept for observability (they appear in `Debug`); no
        // decision below reads them.
        Self::ewma_update(&self.hot.ewma_fast_us, rtt_us, EWMA_FAST_SHIFT);
        Self::ewma_update(&self.hot.ewma_slow_us, rtt_us, EWMA_SLOW_SHIFT);
        let cooldown_active = self.now_ns() < self.hot.throttle_until_ns.load(Ordering::Relaxed);
        let n_succ = self.hot.successes.fetch_add(1, Ordering::Relaxed);
        // Slow start covers the first SLOW_START_SAMPLES successes, unless a
        // throttle has already been seen.
        let in_slow_start =
            n_succ < SLOW_START_SAMPLES && self.hot.first_throttle.load(Ordering::Acquire) == 0;
        let regime = self.cold.regime.record(rtt_us);
        if regime == RegimeSignal::Rising
            && !self.cold.cusum_shrink_disabled.load(Ordering::Relaxed)
        {
            // Sustained latency increase: multiplicative decrease.
            self.scale_limit(SHRINK_NUM_FACTOR, SHRINK_DEN_FACTOR);
        } else if !cooldown_active {
            if in_slow_start {
                // Probe for capacity quickly by doubling.
                self.scale_limit_grow(2, 1);
            } else {
                // Additive increase once out of slow start.
                self.bump_limit_by(1);
            }
        }
        self.release();
    }

    /// Handles a throttled request: halve the limit, pause growth for
    /// `THROTTLE_COOLDOWN_NS`, end slow start, and release the slot.
    fn record_throttle(&self) {
        self.scale_limit(THROTTLE_NUM, THROTTLE_DEN);
        let cooldown_end = self.now_ns().saturating_add(THROTTLE_COOLDOWN_NS);
        self.hot
            .throttle_until_ns
            .store(cooldown_end, Ordering::Relaxed);
        self.hot.first_throttle.store(1, Ordering::Release);
        self.release();
    }

    /// Releases the slot without influencing the limit.
    fn record_cancelled(&self) {
        self.release();
    }

    /// Decrements `inflight` and wakes one waiter.
    fn release(&self) {
        // Subtracting 1 only touches the low (inflight) half while inflight > 0.
        let s = self.hot.state.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(unpack(s).0 > 0, "release without matching acquire");
        self.cold.available.notify_one();
    }

    /// Lowers `slot` to `sample` if `sample` is smaller (lock-free min).
    #[inline]
    fn ratchet_min(slot: &AtomicU64, sample: u64) {
        let mut current = slot.load(Ordering::Relaxed);
        while sample < current {
            match slot.compare_exchange_weak(current, sample, Ordering::Relaxed, Ordering::Relaxed)
            {
                Ok(_) => return,
                Err(observed) => current = observed,
            }
        }
    }

    /// Moves the EWMA in `slot` toward `sample` by `1 / 2^shift` of the gap
    /// (seeding it with `sample` when the slot is still 0) and returns the new value.
    #[inline]
    fn ewma_update(slot: &AtomicU64, sample: u64, shift: u32) -> u64 {
        let mut current = slot.load(Ordering::Relaxed);
        loop {
            let next = if current == 0 {
                sample
            } else {
                let diff = sample as i128 - current as i128;
                let step = diff >> shift;
                ((current as i128).saturating_add(step)).max(1) as u64
            };
            match slot.compare_exchange_weak(current, next, Ordering::Relaxed, Ordering::Relaxed) {
                Ok(_) => return next,
                Err(observed) => current = observed,
            }
        }
    }

    /// Wakes up to `added` tasks blocked in `acquire` after the limit grew by `added`.
    fn notify_capacity(&self, added: u32) {
        for _ in 0..added {
            self.cold.available.notify_one();
        }
    }

    /// Computes `limit * num / den`, saturated to `u32` (a plain `as u32` would
    /// wrap) and clamped into `[lo, hi]`. A zero `den` is treated as 1.
    fn scaled_within(limit: u32, num: u64, den: u64, lo: u32, hi: u32) -> u32 {
        let scaled = u64::from(limit).saturating_mul(num) / den.max(1);
        u32::try_from(scaled).unwrap_or(u32::MAX).clamp(lo, hi)
    }

    /// Raises the limit by `delta` (capped at the upper bound) and wakes waiters
    /// for the new slots.
    fn bump_limit_by(&self, delta: u32) {
        let mut s = self.hot.state.load(Ordering::Relaxed);
        loop {
            let (inflight, limit) = unpack(s);
            let next_limit = limit.saturating_add(delta).min(self.cold.bounds.1);
            if next_limit <= limit {
                return;
            }
            match self.hot.state.compare_exchange_weak(
                s,
                pack(inflight, next_limit),
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    self.notify_capacity(next_limit - limit);
                    return;
                }
                Err(observed) => s = observed,
            }
        }
    }

    /// Multiplies the limit by `num / den` within bounds, but only if that grows
    /// it; wakes waiters for the new slots.
    fn scale_limit_grow(&self, num: u64, den: u64) {
        let (lo, hi) = self.cold.bounds;
        let mut s = self.hot.state.load(Ordering::Relaxed);
        loop {
            let (inflight, limit) = unpack(s);
            let next_limit = Self::scaled_within(limit, num, den, lo, hi);
            // Grow-only: a non-growing result is a no-op, which also guarantees
            // `next_limit - limit` below cannot underflow.
            if next_limit <= limit {
                return;
            }
            match self.hot.state.compare_exchange_weak(
                s,
                pack(inflight, next_limit),
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    self.notify_capacity(next_limit - limit);
                    return;
                }
                Err(observed) => s = observed,
            }
        }
    }

    /// Multiplies the limit by `num / den` within bounds. Intended for shrinking:
    /// it does not wake waiters.
    fn scale_limit(&self, num: u64, den: u64) {
        let (lo, hi) = self.cold.bounds;
        let mut s = self.hot.state.load(Ordering::Relaxed);
        loop {
            let (inflight, limit) = unpack(s);
            let next_limit = Self::scaled_within(limit, num, den, lo, hi);
            if next_limit == limit {
                return;
            }
            match self.hot.state.compare_exchange_weak(
                s,
                pack(inflight, next_limit),
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(observed) => s = observed,
            }
        }
    }
}
/// One claimed concurrency slot.
///
/// Report the outcome with `record_success`, `record_throttle` or
/// `record_cancelled`. A permit dropped without a report is treated as
/// cancelled, so its slot is always returned.
#[must_use = "permit must be recorded with record_success/record_throttle, or dropped to cancel"]
pub struct AdaptivePermit {
    /// Limiter the slot belongs to.
    limiter: Arc<AdaptiveLimit>,
    /// When the permit was issued; start of the RTT measurement.
    started: Instant,
    /// Set once an outcome was reported, so `Drop` does not release again.
    consumed: bool,
}
impl AdaptivePermit {
pub fn record_success(mut self) {
self.consumed = true;
let rtt = self.started.elapsed();
self.limiter.record_success(rtt);
}
pub fn record_throttle(mut self) {
self.consumed = true;
self.limiter.record_throttle();
}
pub fn record_cancelled(mut self) {
self.consumed = true;
self.limiter.record_cancelled();
}
#[cfg(test)]
pub(crate) fn record_success_with_rtt(mut self, rtt: Duration) {
self.consumed = true;
self.limiter.record_success(rtt);
}
}
impl Drop for AdaptivePermit {
    /// An unreported permit counts as cancelled so its slot is never leaked.
    fn drop(&mut self) {
        if self.consumed {
            // An outcome was already reported and the slot released there.
            return;
        }
        self.limiter.record_cancelled();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Constant-RTT successes should grow the limit well beyond its seed.
    #[tokio::test]
    async fn grows_under_stable_latency() {
        let limit = AdaptiveLimit::new(8, 4, 64);
        let baseline = Duration::from_millis(50);
        for _ in 0..200 {
            let permit = limit.acquire().await;
            permit.record_success_with_rtt(baseline);
        }
        assert!(
            limit.current_limit() > 16,
            "expected growth, got {}",
            limit.current_limit()
        );
    }

    /// A single throttle halves the limit.
    #[tokio::test]
    async fn shrinks_on_throttle() {
        let limit = AdaptiveLimit::new(32, 4, 64);
        let permit = limit.acquire().await;
        permit.record_throttle();
        assert_eq!(limit.current_limit(), 16);
    }

    /// Repeated throttles never push the limit below `min_limit`.
    #[tokio::test]
    async fn floor_holds_under_continuous_throttle() {
        let limit = AdaptiveLimit::new(32, 4, 64);
        for _ in 0..20 {
            let permit = limit.acquire().await;
            permit.record_throttle();
        }
        assert_eq!(limit.current_limit(), 4);
    }

    /// Additive growth never pushes the limit above `max_limit`.
    #[test]
    fn ceiling_holds_under_runaway_growth() {
        let limit = AdaptiveLimit::new(8, 4, 16);
        for _ in 0..1000 {
            limit.bump_limit_by(1);
        }
        assert_eq!(limit.current_limit(), 16);
    }

    /// Dropping permits without reporting an outcome still releases their slots.
    #[tokio::test]
    async fn permit_drop_releases_without_signal() {
        let limit = AdaptiveLimit::new(2, 1, 4);
        let p1 = limit.acquire().await;
        let p2 = limit.acquire().await;
        assert_eq!(limit.inflight(), 2);
        drop(p1);
        assert_eq!(limit.inflight(), 1);
        drop(p2);
        assert_eq!(limit.inflight(), 0);
    }

    /// A second acquire blocks while the only slot is held and is woken once it
    /// is released. The timeout turns a lost wakeup into a failure instead of a hang.
    #[tokio::test]
    async fn awaits_when_saturated() {
        let limit = AdaptiveLimit::new(1, 1, 4);
        let hold = limit.acquire().await;
        let limit_clone = Arc::clone(&limit);
        let task = tokio::spawn(async move {
            let p = limit_clone.acquire().await;
            p.record_cancelled();
        });
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(
            !task.is_finished(),
            "second acquire must block while the only slot is held"
        );
        drop(hold);
        tokio::time::timeout(Duration::from_secs(5), task)
            .await
            .expect("blocked acquire was never woken after release")
            .unwrap();
    }

    /// Feeding a constant sample settles the EWMA at that sample.
    #[test]
    fn ewma_converges_toward_sample_mean() {
        let slot = AtomicU64::new(0);
        for _ in 0..100 {
            AdaptiveLimit::ewma_update(&slot, 1000, EWMA_FAST_SHIFT);
        }
        let v = slot.load(Ordering::Relaxed);
        assert!((900..=1100).contains(&v), "ewma settled at {v}");
    }

    /// Without any throttle, a latency step from 20 ms to 100 ms must not shrink the limit.
    #[tokio::test]
    async fn shrink_only_on_explicit_throttle() {
        let limit = AdaptiveLimit::new(64, 4, 256);
        for i in 0..200 {
            let p = limit.acquire().await;
            let rtt = if i == 0 {
                Duration::from_millis(20)
            } else {
                Duration::from_millis(100)
            };
            p.record_success_with_rtt(rtt);
        }
        assert!(
            limit.current_limit() >= 64,
            "no shrink without throttle, got {}",
            limit.current_limit()
        );
    }
}
/// Detects sustained shifts in a stream of samples by accumulating one-sided
/// deviations from a slow EWMA baseline (a CUSUM-style test).
pub struct RegimeDetector {
    /// Accumulated deviation above the baseline since the last reset.
    pos: AtomicU64,
    /// Accumulated deviation below the baseline since the last reset.
    neg: AtomicU64,
    /// Slow EWMA of the samples (shift `EWMA_SLOW_SHIFT`); 0 until the first sample.
    baseline_us: AtomicU64,
    /// Accumulated deviation at which `record` reports `Rising` or `Falling`.
    threshold_us: u64,
}
impl std::fmt::Debug for RegimeDetector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RegimeDetector")
.field("pos", &self.pos.load(Ordering::Relaxed))
.field("neg", &self.neg.load(Ordering::Relaxed))
.field("baseline_us", &self.baseline_us.load(Ordering::Relaxed))
.field("threshold_us", &self.threshold_us)
.finish()
}
}
impl RegimeDetector {
    /// Creates a detector that signals once the accumulated one-sided deviation
    /// reaches `threshold_us` (same unit as the samples passed to `record`).
    pub fn new(threshold_us: u64) -> Self {
        Self {
            pos: AtomicU64::new(0),
            neg: AtomicU64::new(0),
            baseline_us: AtomicU64::new(0),
            threshold_us,
        }
    }

    /// Feeds one sample and reports whether a sustained shift was detected.
    ///
    /// The baseline is a slow EWMA (shift `EWMA_SLOW_SHIFT`) seeded by the
    /// first sample. The sample's deviation from the updated baseline is added
    /// to `pos` (at or above) or `neg` (below), and the opposite accumulator is
    /// reset. When the growing side reaches the threshold it is reset and
    /// `Rising`/`Falling` is returned; otherwise `Stable`.
    ///
    /// Concurrent callers may interleave the baseline load/store, which only
    /// perturbs this heuristic slightly.
    pub fn record(&self, sample_us: u64) -> RegimeSignal {
        let baseline = self.baseline_us.load(Ordering::Relaxed);
        let next_baseline = if baseline == 0 {
            sample_us
        } else {
            let diff = sample_us as i128 - baseline as i128;
            let step = diff >> EWMA_SLOW_SHIFT;
            ((baseline as i128).saturating_add(step)).max(1) as u64
        };
        self.baseline_us.store(next_baseline, Ordering::Relaxed);
        // Work in i128 like the baseline math above: `u64 as i64` would
        // reinterpret values above i64::MAX as negative (flipping the sign of a
        // huge rise), and negating i64::MIN would overflow.
        let dev = i128::from(sample_us) - i128::from(next_baseline);
        // |dev| <= u64::MAX because both operands are u64, so this is lossless.
        let magnitude = dev.unsigned_abs() as u64;
        let (grown, other, signal) = if dev >= 0 {
            (&self.pos, &self.neg, RegimeSignal::Rising)
        } else {
            (&self.neg, &self.pos, RegimeSignal::Falling)
        };
        let accumulated = grown
            .fetch_add(magnitude, Ordering::Relaxed)
            .saturating_add(magnitude);
        other.store(0, Ordering::Relaxed);
        if accumulated >= self.threshold_us {
            grown.store(0, Ordering::Relaxed);
            signal
        } else {
            RegimeSignal::Stable
        }
    }

    /// Current baseline estimate (0 before the first sample).
    pub fn baseline_us(&self) -> u64 {
        self.baseline_us.load(Ordering::Relaxed)
    }
}
/// Outcome of a single [`RegimeDetector::record`] call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegimeSignal {
    /// Neither accumulator reached the threshold on this sample.
    Stable,
    /// Accumulated deviation above the baseline reached the threshold.
    Rising,
    /// Accumulated deviation below the baseline reached the threshold.
    Falling,
}
/// Per-key limits persisted to a JSON file and used to seed new limiters.
pub struct PersistentState {
    /// Location of the JSON snapshot file.
    path: PathBuf,
    /// In-memory copy of the snapshot; `None` until first loaded or saved.
    cache: std::sync::Mutex<Option<PersistedSnapshot>>,
}
/// On-disk JSON format. The field names are the serialized keys (serde
/// derive), so renaming a field changes the file format.
#[derive(serde::Serialize, serde::Deserialize, Clone, Default)]
struct PersistedSnapshot {
    /// Schema version; seeding ignores snapshots whose version differs from
    /// `PERSISTED_SCHEMA_VERSION`.
    version: u32,
    /// Last observed limit per key.
    values: std::collections::BTreeMap<String, u64>,
}
impl std::fmt::Debug for PersistentState {
    /// Shows only the backing file path; the cached snapshot is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PersistentState");
        out.field("path", &self.path);
        out.finish()
    }
}
/// Version stamped on every saved snapshot; others are ignored when seeding.
const PERSISTED_SCHEMA_VERSION: u32 = 1;
/// Denominator of the seed blend.
const PERSISTED_BLEND_DEN: u64 = 10;
/// Weight (out of `PERSISTED_BLEND_DEN`) given to the persisted value; the
/// caller's default receives the remainder.
const PERSISTED_BLEND_NUM: u64 = 7;
impl PersistentState {
    /// Creates a store backed by the JSON file at `path`.
    ///
    /// Nothing is read from disk until the first `load_seed` or `save_observed`.
    pub fn new(path: PathBuf) -> Self {
        Self {
            path,
            cache: std::sync::Mutex::new(None),
        }
    }

    /// Returns the initial limit to use for `key`.
    ///
    /// If the snapshot carries the current schema version and has a value for
    /// `key`, that value is blended with `default`. The persisted value gets
    /// weight `PERSISTED_BLEND_NUM / PERSISTED_BLEND_DEN`. Otherwise `default`
    /// is returned unchanged. The arithmetic saturates instead of overflowing.
    pub fn load_seed(&self, key: &str, default: usize) -> usize {
        let snapshot = self.snapshot();
        if snapshot.version != PERSISTED_SCHEMA_VERSION {
            return default;
        }
        let Some(&persisted) = snapshot.values.get(key) else {
            return default;
        };
        let persisted_part = persisted.saturating_mul(PERSISTED_BLEND_NUM);
        let default_part =
            (default as u64).saturating_mul(PERSISTED_BLEND_DEN - PERSISTED_BLEND_NUM);
        // `saturating_add`: a plain `+` could overflow once either part saturated.
        let blended = persisted_part.saturating_add(default_part) / PERSISTED_BLEND_DEN;
        // Saturate rather than truncate where usize is narrower than u64.
        usize::try_from(blended).unwrap_or(usize::MAX)
    }

    /// Records `value` for `key` in the cache and rewrites the file (best effort;
    /// I/O and serialization errors are ignored).
    ///
    /// The cache lock is held across the whole read-modify-write *and* the file
    /// write. Releasing it in between would let two concurrent saves of
    /// different keys each start from a stale snapshot and overwrite each
    /// other's entry, both in memory and on disk. This blocks the calling
    /// thread for the duration of the file I/O.
    pub fn save_observed(&self, key: &str, value: usize) {
        let mut cached = self
            .cache
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let snapshot =
            cached.get_or_insert_with(|| read_snapshot(&self.path).unwrap_or_default());
        snapshot.version = PERSISTED_SCHEMA_VERSION;
        snapshot.values.insert(key.to_string(), value as u64);
        let Ok(json) = serde_json::to_vec_pretty(&*snapshot) else {
            return;
        };
        self.write_snapshot_file(&json);
    }

    /// Replaces `self.path` with `bytes` by writing a uniquely named temp file
    /// in the same directory and renaming it over the target. Errors are
    /// ignored, but the temp file is removed if either step fails.
    fn write_snapshot_file(&self, bytes: &[u8]) {
        let Some(parent) = self.path.parent() else {
            return;
        };
        let _ = std::fs::create_dir_all(parent);
        let nonce = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0);
        let tmp = parent.join(format!(
            ".adaptive-state.tmp.{}.{}",
            std::process::id(),
            nonce
        ));
        let outcome = std::fs::write(&tmp, bytes).and_then(|()| std::fs::rename(&tmp, &self.path));
        if outcome.is_err() {
            let _ = std::fs::remove_file(&tmp);
        }
    }

    /// Returns a copy of the cached snapshot, loading it from disk on first use.
    /// A missing or unparsable file yields an empty default snapshot.
    fn snapshot(&self) -> PersistedSnapshot {
        let mut cached = self
            .cache
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        cached
            .get_or_insert_with(|| read_snapshot(&self.path).unwrap_or_default())
            .clone()
    }
}
/// Loads and parses the snapshot at `path`; any read or parse failure gives `None`.
fn read_snapshot(path: &Path) -> Option<PersistedSnapshot> {
    std::fs::read(path)
        .ok()
        .and_then(|raw| serde_json::from_slice(&raw).ok())
}
/// Default location of the adaptive-state file,
/// `<cache dir>/<cache_namespace>/adaptive-state.json`, or `None` if no cache
/// directory can be determined.
///
/// `XDG_CACHE_HOME` wins on every platform when it holds an absolute path.
/// The XDG Base Directory spec requires relative values to be ignored.
/// Otherwise the cache directory is `%LOCALAPPDATA%` on Windows,
/// `$HOME/Library/Caches` on macOS, and `$HOME/.cache` elsewhere. Empty
/// variables are treated as unset, so an empty `HOME` cannot produce a path
/// relative to the working directory.
pub fn default_persistent_state_path() -> Option<PathBuf> {
    const FILE_NAME: &str = "adaptive-state.json";
    let ns = crate::embedder().cache_namespace;
    let non_empty_var = |name: &str| std::env::var(name).ok().filter(|v| !v.is_empty());
    if let Some(xdg) = non_empty_var("XDG_CACHE_HOME").map(PathBuf::from)
        && xdg.is_absolute()
    {
        return Some(xdg.join(ns).join(FILE_NAME));
    }
    let cache_dir = if cfg!(windows) {
        PathBuf::from(non_empty_var("LOCALAPPDATA")?)
    } else if cfg!(target_os = "macos") {
        PathBuf::from(non_empty_var("HOME")?)
            .join("Library")
            .join("Caches")
    } else {
        PathBuf::from(non_empty_var("HOME")?).join(".cache")
    };
    Some(cache_dir.join(ns).join(FILE_NAME))
}
/// Lazily created process-wide `PersistentState`.
static GLOBAL_PERSISTENT_STATE: std::sync::OnceLock<Arc<PersistentState>> =
    std::sync::OnceLock::new();

/// Shared process-wide persistent state, created on first use.
///
/// Returns `None` whenever `default_persistent_state_path()` cannot resolve a
/// path (checked on every call). The instance created by the first successful
/// call keeps that call's path.
pub fn global_persistent_state() -> Option<Arc<PersistentState>> {
    let path = default_persistent_state_path()?;
    let shared =
        GLOBAL_PERSISTENT_STATE.get_or_init(move || Arc::new(PersistentState::new(path)));
    Some(Arc::clone(shared))
}
#[cfg(test)]
mod additional_tests {
    use super::*;

    /// A sustained step from 10 ms to 50 ms must eventually be reported as `Rising`.
    #[test]
    fn cusum_detects_rising_shift() {
        let det = RegimeDetector::new(50_000);
        for _ in 0..100 {
            assert_eq!(det.record(10_000), RegimeSignal::Stable);
        }
        let mut saw_rising = false;
        for _ in 0..200 {
            if det.record(50_000) == RegimeSignal::Rising {
                saw_rising = true;
                break;
            }
        }
        assert!(saw_rising, "expected rising regime detection");
    }

    /// A single outlier whose own deviation exceeds the threshold fires immediately.
    #[test]
    fn cusum_fires_on_single_outlier_above_threshold() {
        let det = RegimeDetector::new(500_000);
        for _ in 0..100 {
            det.record(10_000);
        }
        let r = det.record(2_000_000);
        assert_eq!(
            r,
            RegimeSignal::Rising,
            "single big outlier above threshold fires"
        );
    }

    /// A handful of moderate outliers stays far below a large threshold.
    #[test]
    fn cusum_ignores_small_outliers() {
        let det = RegimeDetector::new(5_000_000);
        for _ in 0..100 {
            det.record(10_000);
        }
        for _ in 0..5 {
            assert_eq!(det.record(50_000), RegimeSignal::Stable);
        }
    }
}