#![allow(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, PoisonError};
use std::time::{Duration, Instant};
use snapdir_core::resources::{resident_set_bytes, CpuSampler};
use snapdir_core::Meter;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
/// Permit accounting for the thread-blocking side of [`AdaptiveGate`].
///
/// Lives inside `GateInner::blocking`; threads in
/// [`AdaptiveGate::acquire_blocking`] wait on `GateInner::blocking_cv` until
/// `available` becomes non-zero.
#[derive(Debug)]
struct BlockingState {
    /// Permits that can be handed out right now.
    available: usize,
    /// Concurrency limit for blocking callers; `set_limit` writes the same
    /// value it stores in `GateInner::limit`.
    limit: usize,
    /// Blocking permits currently held by callers.
    in_flight: usize,
}
/// State shared by every clone of [`AdaptiveGate`] and every permit it issues.
///
/// Two independent permit pools follow the same limit: a tokio `Semaphore` for
/// async callers and a mutex/condvar pair for blocking callers.
#[derive(Debug)]
struct GateInner {
    /// Upper bound that every limit is clamped to; fixed at construction.
    ceiling: usize,
    /// Current concurrency limit, always in `1..=ceiling`.
    limit: AtomicUsize,
    /// Async permit pool. Its total size (idle + held) may exceed `limit` by
    /// `async_debt` until held permits are dropped after a shrink.
    sem: Arc<Semaphore>,
    /// Permits still owed back after a shrink that could not remove enough idle
    /// permits. Settled by forgetting permits as `GatePermit`s drop, or
    /// cancelled by a later grow.
    async_debt: AtomicUsize,
    /// Permit accounting for blocking callers.
    blocking: Mutex<BlockingState>,
    /// Notified when blocking permits become available.
    blocking_cv: Condvar,
}
/// A concurrency limiter whose limit can be changed at runtime.
///
/// Async tasks use [`AdaptiveGate::acquire`] and threads use
/// [`AdaptiveGate::acquire_blocking`]. Both pools honour the same limit but are
/// counted separately, so async and blocking holders do not compete for the
/// same permits. Clones are cheap and share one gate.
#[derive(Clone, Debug)]
pub struct AdaptiveGate {
    inner: Arc<GateInner>,
}
/// An async permit obtained from [`AdaptiveGate::acquire`]; released on drop.
#[derive(Debug)]
pub struct GatePermit {
    inner: Arc<GateInner>,
    /// `Some` from construction until `Drop` takes it.
    permit: Option<OwnedSemaphorePermit>,
}
impl Drop for GatePermit {
    /// Returns the permit to the semaphore.
    ///
    /// If the gate shrank while the permit was held, one unit of `async_debt`
    /// is settled instead: the permit is forgotten, which removes it from the
    /// pool permanently.
    fn drop(&mut self) {
        let Some(permit) = self.permit.take() else {
            return;
        };
        let settled_debt = self
            .inner
            .async_debt
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |owed| owed.checked_sub(1))
            .is_ok();
        if settled_debt {
            permit.forget();
        }
        // Otherwise `permit` is dropped here and goes back to the semaphore.
    }
}
/// A thread-blocking permit obtained from [`AdaptiveGate::acquire_blocking`];
/// released on drop.
#[derive(Debug)]
pub struct BlockingGatePermit {
    inner: Arc<GateInner>,
}
impl Drop for BlockingGatePermit {
    /// Frees this caller's slot. If the (possibly shrunk) limit leaves room,
    /// one more permit is made available and a single waiter is woken.
    fn drop(&mut self) {
        let mut guard = self
            .inner
            .blocking
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        guard.in_flight = guard.in_flight.saturating_sub(1);
        let has_room = guard.available + guard.in_flight < guard.limit;
        if !has_room {
            return;
        }
        guard.available += 1;
        // Unlock before notifying so the woken thread can take the lock at once.
        drop(guard);
        self.inner.blocking_cv.notify_one();
    }
}
impl AdaptiveGate {
    /// Creates a gate that starts with `start` permits and can later be resized
    /// up to `ceiling`.
    ///
    /// `ceiling` is raised to at least 1 and `start` is clamped into
    /// `1..=ceiling`, so the gate always admits at least one caller.
    #[must_use]
    pub fn new(start: usize, ceiling: usize) -> Self {
        let ceiling = ceiling.max(1);
        let start = start.clamp(1, ceiling);
        Self {
            inner: Arc::new(GateInner {
                ceiling,
                limit: AtomicUsize::new(start),
                sem: Arc::new(Semaphore::new(start)),
                async_debt: AtomicUsize::new(0),
                blocking: Mutex::new(BlockingState {
                    available: start,
                    limit: start,
                    in_flight: 0,
                }),
                blocking_cv: Condvar::new(),
            }),
        }
    }

    /// The largest limit [`set_limit`](Self::set_limit) will apply.
    #[must_use]
    pub fn ceiling(&self) -> usize {
        self.inner.ceiling
    }

    /// The current concurrency limit.
    #[must_use]
    pub fn limit(&self) -> usize {
        self.inner.limit.load(Ordering::SeqCst)
    }

    /// Sets the concurrency limit to `n` clamped into `1..=ceiling` and returns
    /// the value actually applied.
    ///
    /// Growing first cancels any outstanding shrink debt and then releases new
    /// async permits, waking blocked threads. Shrinking removes idle async
    /// permits immediately. Permits that are currently held are retired as
    /// their holders drop them, so in-flight work is never interrupted.
    ///
    /// Concurrent calls are serialized. The async pool and the blocking pool
    /// always end up agreeing with the last limit stored.
    pub fn set_limit(&self, n: usize) -> usize {
        let new = n.clamp(1, self.inner.ceiling);
        // Hold the blocking-state lock for the whole update. Without it, two
        // concurrent calls could swap `limit` in one order but write
        // `BlockingState::limit` in the other, leaving the blocking pool out of
        // step with the async pool. Nothing below waits: the semaphore calls
        // are all non-blocking.
        let mut state = self
            .inner
            .blocking
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        let old = self.inner.limit.swap(new, Ordering::SeqCst);
        if new == old {
            return new;
        }
        if new > old {
            self.grow_async(new - old);
        } else {
            self.shrink_async(old - new);
        }
        state.limit = new;
        state.available = new.saturating_sub(state.in_flight);
        drop(state);
        if new > old {
            // Several waiters may fit under the larger limit.
            self.inner.blocking_cv.notify_all();
        }
        new
    }

    /// Adds `grow` async permits, first cancelling outstanding shrink debt.
    fn grow_async(&self, grow: usize) {
        let paid = self.take_debt(grow);
        let remainder = grow - paid;
        if remainder > 0 {
            self.inner.sem.add_permits(remainder);
        }
    }

    /// Removes `shrink` async permits. Idle permits are forgotten now; any
    /// shortfall becomes debt that is settled as held `GatePermit`s drop.
    fn shrink_async(&self, shrink: usize) {
        let mut to_remove = shrink;
        while to_remove > 0 {
            match self.inner.sem.try_acquire() {
                Ok(permit) => {
                    permit.forget();
                    to_remove -= 1;
                }
                // No idle permit left: the rest must come from held permits.
                Err(_) => break,
            }
        }
        if to_remove > 0 {
            self.inner.async_debt.fetch_add(to_remove, Ordering::SeqCst);
        }
    }

    /// Cancels up to `n` units of shrink debt and returns how many were cancelled.
    fn take_debt(&self, n: usize) -> usize {
        self.inner
            .async_debt
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |debt| {
                let pay = debt.min(n);
                (pay > 0).then_some(debt - pay)
            })
            .map_or(0, |prev| prev.min(n))
    }

    /// Waits for an async permit. The permit is released when the returned
    /// [`GatePermit`] is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the internal semaphore has been closed. Nothing in this module
    /// closes it.
    pub async fn acquire(&self) -> GatePermit {
        let permit = Arc::clone(&self.inner.sem)
            .acquire_owned()
            .await
            .expect("AdaptiveGate semaphore is never closed while the gate is alive");
        GatePermit {
            inner: Arc::clone(&self.inner),
            permit: Some(permit),
        }
    }

    /// Blocks the current thread until a blocking permit is available. The
    /// permit is released when the returned [`BlockingGatePermit`] is dropped.
    ///
    /// This parks the thread on a condition variable, so it should not be
    /// called from an async task.
    #[must_use]
    pub fn acquire_blocking(&self) -> BlockingGatePermit {
        let mut state = self
            .inner
            .blocking
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        while state.available == 0 {
            state = self
                .inner
                .blocking_cv
                .wait(state)
                .unwrap_or_else(PoisonError::into_inner);
        }
        state.available -= 1;
        state.in_flight += 1;
        BlockingGatePermit {
            inner: Arc::clone(&self.inner),
        }
    }

    /// Idle permits in the async pool. Blocking permits are tracked separately
    /// and are not included.
    #[must_use]
    pub fn available_permits(&self) -> usize {
        self.inner.sem.available_permits()
    }
}
/// Outcome of one operation, as reported to [`AdaptiveController::record_op`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum OpResult {
    /// Completed successfully.
    Ok,
    /// Throttled. The controller treats this as a congestion signal.
    Throttle,
    /// Failed with a hard error. The controller also treats this as congestion.
    HardErr,
}
/// One completed operation fed to [`AdaptiveController::record_op`].
#[derive(Clone, Copy, Debug)]
pub struct OpSample {
    /// Bytes moved by the operation; summed into the window's goodput estimate.
    pub bytes: u64,
    /// How long the operation took. Feeds the latency (RTT) estimates and the
    /// goodput estimate.
    pub latency: Duration,
    /// Success or the kind of failure.
    pub result: OpResult,
}
/// Tuning knobs for the adaptive controller.
#[derive(Clone, Copy, Debug)]
pub struct AdaptivePolicy {
    /// Safety fraction in `(0, 1]`. It scales `total_ram` to form the memory
    /// budget and scales the best observed goodput to form the target rate.
    pub fraction: f64,
    /// Hard upper bound on the concurrency limit.
    pub ceiling: usize,
    /// Memory basis for the budget, in the same unit as the object sizes
    /// (presumably bytes). `0` disables the memory cap.
    pub total_ram: u64,
    /// Optional hard cap on the advertised target rate.
    pub max_rate: Option<u64>,
}

impl AdaptivePolicy {
    /// Builds a policy.
    ///
    /// A `fraction` that is non-finite, `<= 0` or `> 1` is replaced by `0.8`,
    /// and `ceiling` is raised to at least 1.
    #[must_use]
    pub fn new(fraction: f64, ceiling: usize, total_ram: u64, max_rate: Option<u64>) -> Self {
        let acceptable = |f: f64| f.is_finite() && f > 0.0 && f <= 1.0;
        Self {
            fraction: if acceptable(fraction) { fraction } else { 0.8 },
            ceiling: ceiling.max(1),
            total_ram,
            max_rate,
        }
    }
}

impl Default for AdaptivePolicy {
    /// 80% fraction, ceiling 16, no memory cap and no rate cap.
    fn default() -> Self {
        Self::new(0.8, 16, 0, None)
    }
}
/// What the controller wants applied after a tick.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Decision {
    /// New concurrency limit, already clamped to the ceiling and the memory cap.
    pub limit: usize,
    /// Suggested rate cap, or `None` when neither a goodput knee nor a
    /// `max_rate` is known.
    pub target_rate: Option<u64>,
}
/// Growth regime of the adaptive controller.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Phase {
    /// Multiplicative growth (`SLOW_START_MULT` per tick) while goodput keeps rising.
    SlowStart,
    /// Additive increase (+1 per tick), with multiplicative backoff on congestion.
    Aimd,
}
/// Weight given to each new value in the latency and goodput EWMAs.
const EWMA_ALPHA: f64 = 0.3;
/// Factor applied to the limit on each rising slow-start tick.
const SLOW_START_MULT: f64 = 1.5;
/// Factor applied to the limit on congestion or CPU overload.
const BACKOFF_MULT: f64 = 0.5;
/// `rtt_min / rtt_ewma` below this counts as queueing, i.e. smoothed latency
/// above roughly 1.43x the best latency seen.
const GRADIENT_THRESHOLD: f64 = 0.7;
/// CPU percentage above which the limit is not increased.
const CPU_NO_INCREASE_PCT: f64 = 85.0;
/// CPU percentage above which the limit is multiplied by `BACKOFF_MULT`.
const CPU_DECREASE_PCT: f64 = 95.0;
/// How long increases are suppressed after a congestion backoff.
const COOLDOWN: Duration = Duration::from_secs(15);
/// Minimum spacing between probes in the AIMD phase.
const REPROBE_INTERVAL: Duration = Duration::from_secs(15);
/// Relative goodput gain (2%) required to count as an improvement.
const IMPROVE_EPS: f64 = 0.02;
/// Monotonic timestamp, expressed as time elapsed since a fixed epoch
/// (`ControllerDriver` uses the instant it was created).
pub type MonoTime = Duration;
/// Adaptive concurrency controller. It runs slow start, then AIMD, with guards
/// for congestion signals, latency inflation, CPU load and a memory budget.
///
/// Feed completed operations with `record_op` and call `tick` periodically.
/// Each tick consumes the samples recorded since the previous one.
#[derive(Debug)]
pub struct AdaptiveController {
    policy: AdaptivePolicy,
    // Current limit, kept as f64 for the growth arithmetic; rounded back to an
    // integer (and capped) at the end of every tick.
    limit: f64,
    phase: Phase,
    // Smoothed aggregate goodput estimate.
    goodput_ewma: f64,
    // Highest goodput_ewma seen so far; basis of the target rate.
    goodput_knee: f64,
    // goodput_ewma at the end of the previous tick (slow-start rising check).
    goodput_prev_tick: f64,
    // Smoothed per-operation latency in seconds; 0 until the first non-zero sample.
    rtt_ewma: f64,
    // Lowest per-operation latency in seconds; INFINITY until the first non-zero sample.
    rtt_min: f64,
    // Window accumulators, reset at the end of every tick.
    acc_bytes: u64,
    acc_latency_secs: f64,
    acc_count: u64,
    // A throttle or hard error was recorded in the current window.
    congestion_seen: bool,
    // Increases are suppressed until this time after a congestion backoff.
    cooldown_until: Option<MonoTime>,
    // Time of the most recent AIMD probe (initialized on the first tick).
    last_reprobe: MonoTime,
    // (limit, goodput_ewma) from just before an outstanding probe; judged on the next tick.
    probe_pending: Option<(f64, f64)>,
    // Set by the first tick.
    started: bool,
}
impl AdaptiveController {
    /// Creates a controller in slow start. The initial limit is 2, or the
    /// policy ceiling if that is lower.
    #[must_use]
    pub fn new(policy: AdaptivePolicy) -> Self {
        let start = 2.0_f64.min(policy.ceiling as f64).max(1.0);
        Self {
            policy,
            limit: start,
            phase: Phase::SlowStart,
            goodput_ewma: 0.0,
            goodput_knee: 0.0,
            goodput_prev_tick: 0.0,
            rtt_ewma: 0.0,
            rtt_min: f64::INFINITY,
            acc_bytes: 0,
            acc_latency_secs: 0.0,
            acc_count: 0,
            congestion_seen: false,
            cooldown_until: None,
            last_reprobe: Duration::ZERO,
            probe_pending: None,
            started: false,
        }
    }

    /// The current limit, rounded and clamped into `1..=ceiling`.
    #[must_use]
    pub fn current_limit(&self) -> usize {
        (self.limit.round() as usize).clamp(1, self.policy.ceiling)
    }

    /// Adds one completed operation to the current sample window.
    ///
    /// A non-zero latency updates the latency EWMA and minimum. Bytes and
    /// latency accumulate for the window's goodput estimate. A throttle or
    /// hard error marks the window as congested.
    pub fn record_op(&mut self, sample: OpSample) {
        let secs = sample.latency.as_secs_f64().max(0.0);
        if secs > 0.0 {
            if self.rtt_ewma <= 0.0 {
                self.rtt_ewma = secs;
            } else {
                self.rtt_ewma = EWMA_ALPHA.mul_add(secs - self.rtt_ewma, self.rtt_ewma);
            }
            if secs < self.rtt_min {
                self.rtt_min = secs;
            }
        }
        self.acc_bytes = self.acc_bytes.saturating_add(sample.bytes);
        self.acc_latency_secs += secs;
        self.acc_count += 1;
        match sample.result {
            OpResult::Throttle | OpResult::HardErr => self.congestion_seen = true,
            OpResult::Ok => {}
        }
    }

    /// Closes the current sample window and returns the next limit and target rate.
    ///
    /// Steps run in priority order, and the first guard that applies decides the tick:
    /// 1. A pending probe that did not improve goodput, or that ran into
    ///    congestion or queueing, is rolled back.
    /// 2. Congestion (throttle or hard error): halve the limit, switch to AIMD,
    ///    start the cooldown.
    /// 3. CPU above `CPU_DECREASE_PCT`: halve the limit.
    /// 4. No operation completed in the window: hold. With no evidence either
    ///    way, the controller neither grows on idle ticks nor acts on stale latency.
    /// 5. Latency inflated (queueing): decrease by one and leave slow start.
    /// 6. Cooldown active or CPU above `CPU_NO_INCREASE_PCT`: hold.
    /// 7. Otherwise grow: multiply by `SLOW_START_MULT` while slow-start goodput
    ///    rises, else add one.
    ///
    /// The result is always rounded and capped by the ceiling and by the memory
    /// budget derived from `p95_obj_size`. `_rss` is currently unused.
    pub fn tick(
        &mut self,
        now: MonoTime,
        cpu_pct: Option<f64>,
        _rss: Option<u64>,
        p95_obj_size: u64,
    ) -> Decision {
        if !self.started {
            self.started = true;
            self.last_reprobe = now;
        }
        // Read before `finish` clears the window.
        let had_samples = self.acc_count > 0;
        let interval_goodput = self.window_goodput();
        if interval_goodput > 0.0 {
            if self.goodput_ewma <= 0.0 {
                self.goodput_ewma = interval_goodput;
            } else {
                self.goodput_ewma =
                    EWMA_ALPHA.mul_add(interval_goodput - self.goodput_ewma, self.goodput_ewma);
            }
            if self.goodput_ewma > self.goodput_knee {
                self.goodput_knee = self.goodput_ewma;
            }
        }
        let congestion = self.congestion_seen;
        let gradient = self.latency_gradient();
        let cpu_blocks_increase = cpu_pct.is_some_and(|c| c > CPU_NO_INCREASE_PCT);
        let cpu_forces_decrease = cpu_pct.is_some_and(|c| c > CPU_DECREASE_PCT);
        let in_cooldown = self.cooldown_until.is_some_and(|d| now < d);
        let queueing = gradient.is_some_and(|g| g < GRADIENT_THRESHOLD);

        if let Some((prev_limit, prev_goodput)) = self.probe_pending.take() {
            let improved = self.goodput_ewma > prev_goodput * (1.0 + IMPROVE_EPS);
            if !improved || congestion || queueing {
                // Roll the probe back. If no guard below returns early, the
                // regular AIMD +1 is still applied on this same tick.
                self.limit = prev_limit;
            }
        }
        if congestion {
            self.limit = (self.limit * BACKOFF_MULT).max(1.0);
            self.phase = Phase::Aimd;
            self.cooldown_until = Some(now + COOLDOWN);
            return self.end_tick(now, p95_obj_size);
        }
        if cpu_forces_decrease {
            self.limit = (self.limit * BACKOFF_MULT).max(1.0);
            return self.end_tick(now, p95_obj_size);
        }
        if !had_samples {
            return self.end_tick(now, p95_obj_size);
        }
        if queueing {
            self.phase = Phase::Aimd;
            self.limit = (self.limit - 1.0).max(1.0);
            return self.end_tick(now, p95_obj_size);
        }
        if in_cooldown || cpu_blocks_increase {
            return self.end_tick(now, p95_obj_size);
        }
        match self.phase {
            Phase::SlowStart => {
                let rising = self.goodput_ewma > self.goodput_prev_tick * (1.0 + IMPROVE_EPS)
                    || self.goodput_prev_tick <= 0.0;
                if rising {
                    self.limit *= SLOW_START_MULT;
                } else {
                    self.phase = Phase::Aimd;
                    self.limit += 1.0;
                }
            }
            Phase::Aimd => {
                if now >= self.last_reprobe + REPROBE_INTERVAL {
                    // Remember the pre-increase state so the next tick can judge
                    // (and possibly roll back) this step.
                    self.last_reprobe = now;
                    self.probe_pending = Some((self.limit, self.goodput_ewma));
                }
                self.limit += 1.0;
            }
        }
        self.end_tick(now, p95_obj_size)
    }

    /// Records this tick's goodput for the next slow-start comparison, then
    /// finalizes the decision.
    fn end_tick(&mut self, now: MonoTime, p95_obj_size: u64) -> Decision {
        self.goodput_prev_tick = self.goodput_ewma;
        self.finish(now, p95_obj_size)
    }

    /// Estimated aggregate goodput for the current window.
    ///
    /// This is accumulated bytes divided by accumulated operation latency,
    /// scaled by the current limit (presumably assuming about `limit`
    /// operations ran at once). Returns 0 when there are no samples or no
    /// measurable latency.
    fn window_goodput(&self) -> f64 {
        if self.acc_count == 0 || self.acc_latency_secs <= 0.0 {
            return 0.0;
        }
        let per_op = self.acc_bytes as f64 / self.acc_latency_secs;
        per_op * self.limit
    }

    /// `rtt_min / rtt_ewma` clamped to `[0, 1]`; 1 means no latency inflation.
    /// Returns `None` until a non-zero latency has been recorded.
    fn latency_gradient(&self) -> Option<f64> {
        if self.rtt_ewma > 0.0 && self.rtt_min.is_finite() && self.rtt_min > 0.0 {
            Some((self.rtt_min / self.rtt_ewma).clamp(0.0, 1.0))
        } else {
            None
        }
    }

    /// Rounds and caps the limit, resets the sample window and builds the decision.
    fn finish(&mut self, _now: MonoTime, p95_obj_size: u64) -> Decision {
        let mem_cap = self.memory_cap(p95_obj_size);
        let ceiling = self.policy.ceiling;
        let capped = (self.limit.round() as usize).clamp(1, ceiling).min(mem_cap);
        self.limit = capped as f64;
        self.acc_bytes = 0;
        self.acc_latency_secs = 0.0;
        self.acc_count = 0;
        self.congestion_seen = false;
        Decision {
            limit: capped,
            target_rate: self.target_rate(),
        }
    }

    /// Largest limit whose `p95_obj_size` objects fit in `fraction * total_ram`,
    /// kept in `1..=ceiling`. Returns the ceiling when either input is 0.
    fn memory_cap(&self, p95_obj_size: u64) -> usize {
        if self.policy.total_ram == 0 || p95_obj_size == 0 {
            return self.policy.ceiling;
        }
        let budget = self.policy.fraction * self.policy.total_ram as f64;
        let cap = (budget / p95_obj_size as f64).floor();
        if cap < 1.0 {
            1
        } else {
            (cap as usize).min(self.policy.ceiling)
        }
    }

    /// `fraction` of the best goodput seen (at least 1), capped by `max_rate`.
    /// Falls back to `max_rate` alone while no goodput has been observed.
    fn target_rate(&self) -> Option<u64> {
        let knee_rate = if self.goodput_knee > 0.0 {
            Some((self.policy.fraction * self.goodput_knee).max(1.0) as u64)
        } else {
            None
        };
        match (knee_rate, self.policy.max_rate) {
            (Some(k), Some(m)) => Some(k.min(m)),
            (Some(k), None) => Some(k),
            (None, Some(m)) => Some(m),
            (None, None) => None,
        }
    }
}
/// Returns the 95th-percentile value of `sizes` using the nearest-rank method,
/// or `0` for an empty slice (which disables the controller's memory cap).
///
/// The rank `ceil(0.95 * n)` is computed exactly in integer arithmetic as
/// `n - floor(n / 20)`. The element is located with an O(n) selection instead
/// of a full sort.
#[must_use]
pub fn p95_object_size(sizes: &[u64]) -> u64 {
    let n = sizes.len();
    if n == 0 {
        return 0;
    }
    // 1-based rank, always in 1..=n for n >= 1; convert to a 0-based index.
    let index = n - n / 20 - 1;
    let mut scratch = sizes.to_vec();
    let (_, nth, _) = scratch.select_nth_unstable(index);
    *nth
}
/// Runs an [`AdaptiveController`] against a live [`AdaptiveGate`].
///
/// Each tick produces a decision, resizes the gate, and forwards the decision
/// to the optional rate applier and meter. Clones share the same controller,
/// gate and CPU sampler.
#[derive(Clone)]
pub struct ControllerDriver {
    controller: Arc<Mutex<AdaptiveController>>,
    gate: AdaptiveGate,
    // Shared so every clone polls the same sampler.
    cpu: Arc<Mutex<CpuSampler>>,
    // Object size passed to the controller's memory cap on every tick.
    p95_obj_size: u64,
    // Origin of the MonoTime timestamps produced by `tick`.
    epoch: Instant,
    // Called after every tick with the decision's target rate.
    rate_applier: Option<Arc<dyn Fn(Option<u64>) + Send + Sync>>,
    // Receives the limit and target rate (0 when none) after every tick.
    meter: Option<Arc<Meter>>,
}
// Only a summary is printed. The rate applier is a `dyn Fn`, which is not
// `Debug`, so its presence is reported instead of its value.
#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for ControllerDriver {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_rate_applier = self.rate_applier.is_some();
        let has_meter = self.meter.is_some();
        let mut summary = f.debug_struct("ControllerDriver");
        summary.field("gate_limit", &self.gate.limit());
        summary.field("ceiling", &self.gate.ceiling());
        summary.field("p95_obj_size", &self.p95_obj_size);
        summary.field("has_rate_applier", &has_rate_applier);
        summary.field("has_meter", &has_meter);
        summary.finish()
    }
}
impl ControllerDriver {
    /// Builds a driver around a fresh controller for `policy`.
    ///
    /// `p95_obj_size` feeds the memory cap on every tick. `rate_applier` and
    /// `meter` are optional sinks for each decision. The driver's clock starts now.
    #[must_use]
    pub fn new(
        policy: AdaptivePolicy,
        gate: AdaptiveGate,
        p95_obj_size: u64,
        rate_applier: Option<Arc<dyn Fn(Option<u64>) + Send + Sync>>,
        meter: Option<Arc<Meter>>,
    ) -> Self {
        let controller = AdaptiveController::new(policy);
        let sampler = CpuSampler::new();
        Self {
            controller: Arc::new(Mutex::new(controller)),
            gate,
            cpu: Arc::new(Mutex::new(sampler)),
            p95_obj_size,
            epoch: Instant::now(),
            rate_applier,
            meter,
        }
    }

    /// Records one completed operation with the shared controller.
    pub fn record_op(&self, sample: OpSample) {
        let mut controller = self
            .controller
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        controller.record_op(sample);
    }

    /// Samples CPU usage, RSS and the driver clock, then runs
    /// [`tick_with`](Self::tick_with) with those readings.
    #[allow(clippy::must_use_candidate)]
    pub fn tick(&self) -> Decision {
        let cpu_pct = self.cpu.lock().unwrap_or_else(PoisonError::into_inner).poll();
        let rss = resident_set_bytes();
        let now = self.epoch.elapsed();
        self.tick_with(now, cpu_pct, rss)
    }

    /// Runs one controller tick with explicit inputs and applies the result.
    /// It resizes the gate, then notifies the rate applier and the meter.
    #[allow(clippy::must_use_candidate)]
    pub fn tick_with(&self, now: MonoTime, cpu_pct: Option<f64>, rss: Option<u64>) -> Decision {
        // The temporary guard is released at the end of this statement, before
        // the gate and the sinks are touched.
        let decision = self
            .controller
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
            .tick(now, cpu_pct, rss, self.p95_obj_size);
        self.gate.set_limit(decision.limit);
        if let Some(apply) = self.rate_applier.as_deref() {
            apply(decision.target_rate);
        }
        if let Some(meter) = self.meter.as_deref() {
            meter.set_current_limit(decision.limit as u64);
            meter.set_target_rate(decision.target_rate.unwrap_or(0));
        }
        decision
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    /// Single-threaded runtime with timers; enough for the async gate tests.
    fn runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("build tokio runtime")
    }

    #[test]
    fn adaptive_gate_async_acquire_blocks_beyond_limit() {
        let rt = runtime();
        rt.block_on(async {
            let gate = AdaptiveGate::new(2, 8);
            let p1 = gate.acquire().await;
            let p2 = gate.acquire().await;
            assert_eq!(gate.available_permits(), 0, "both permits taken");
            let fut = gate.acquire();
            tokio::pin!(fut);
            let pending = futures::poll!(&mut fut);
            assert!(pending.is_pending(), "third acquire blocks at limit 2");
            gate.set_limit(3);
            let p3 = futures::poll!(&mut fut);
            assert!(p3.is_ready(), "set_limit(3) frees a permit for the waiter");
            drop((p1, p2));
        });
    }

    /// Runs `items` async tasks through a gate of size `limit` and returns the
    /// highest number that were inside the gate at the same time.
    fn peak_under_limit(limit: usize, items: usize) -> usize {
        let rt = runtime();
        rt.block_on(async {
            let gate = AdaptiveGate::new(limit, 32);
            let in_flight = Arc::new(AtomicUsize::new(0));
            let high = Arc::new(AtomicUsize::new(0));
            let mut handles = Vec::new();
            for _ in 0..items {
                let gate = gate.clone();
                let in_flight = Arc::clone(&in_flight);
                let high = Arc::clone(&high);
                handles.push(tokio::spawn(async move {
                    let _p = gate.acquire().await;
                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    high.fetch_max(cur, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(20)).await;
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                }));
            }
            for h in handles {
                h.await.unwrap();
            }
            high.load(Ordering::SeqCst)
        })
    }

    /// Runs `threads` blocking workers through `gate` and returns the highest
    /// number that were inside the gate at the same time.
    fn blocking_peak(gate: &AdaptiveGate, threads: usize) -> usize {
        let in_flight = Arc::new(AtomicUsize::new(0));
        let high = Arc::new(AtomicUsize::new(0));
        let handles: Vec<_> = (0..threads)
            .map(|_| {
                let gate = gate.clone();
                let in_flight = Arc::clone(&in_flight);
                let high = Arc::clone(&high);
                thread::spawn(move || {
                    let _p = gate.acquire_blocking();
                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    high.fetch_max(cur, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(30));
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        high.load(Ordering::SeqCst)
    }

    #[test]
    fn adaptive_gate_async_set_limit_changes_max_in_flight() {
        assert_eq!(peak_under_limit(3, 12), 3, "limit 3 caps in-flight at 3");
        assert_eq!(peak_under_limit(1, 5), 1, "limit 1 is strictly sequential");
    }

    #[test]
    fn adaptive_gate_async_shrink_does_not_deadlock_held_permits() {
        let rt = runtime();
        rt.block_on(async {
            let gate = AdaptiveGate::new(4, 8);
            let p1 = gate.acquire().await;
            let p2 = gate.acquire().await;
            let new = gate.set_limit(1);
            assert_eq!(new, 1);
            drop(p1);
            drop(p2);
            let _q = gate.acquire().await;
            let fut = gate.acquire();
            tokio::pin!(fut);
            assert!(
                futures::poll!(&mut fut).is_pending(),
                "after shrink to 1, only one permit is available"
            );
        });
    }

    #[test]
    fn adaptive_gate_blocking_acquire_and_set_limit() {
        let gate = AdaptiveGate::new(1, 8);
        assert_eq!(blocking_peak(&gate, 4), 1, "blocking limit 1 serializes");

        let gate2 = AdaptiveGate::new(1, 8);
        gate2.set_limit(3);
        let peak = blocking_peak(&gate2, 6);
        assert!(
            (2..=3).contains(&peak),
            "after set_limit(3) peak in-flight should reach ~3, got {peak}"
        );
    }

    #[test]
    fn adaptive_gate_blocking_shrink_does_not_deadlock() {
        let gate = AdaptiveGate::new(4, 8);
        let p1 = gate.acquire_blocking();
        let p2 = gate.acquire_blocking();
        gate.set_limit(1);
        drop(p1);
        drop(p2);
        let got = Arc::new(AtomicUsize::new(0));
        let g = gate.clone();
        let got2 = Arc::clone(&got);
        let h = thread::spawn(move || {
            let _p = g.acquire_blocking();
            got2.fetch_add(1, Ordering::SeqCst);
        });
        h.join().unwrap();
        assert_eq!(got.load(Ordering::SeqCst), 1, "acquire after shrink works");
    }

    /// A successful operation of `bytes` that took `latency_ms`.
    fn ok_op(bytes: u64, latency_ms: u64) -> OpSample {
        OpSample {
            bytes,
            latency: Duration::from_millis(latency_ms),
            result: OpResult::Ok,
        }
    }

    /// A policy whose memory cap never binds (ceiling 64, unlimited RAM).
    fn big_policy() -> AdaptivePolicy {
        AdaptivePolicy::new(0.8, 64, u64::MAX, None)
    }

    #[test]
    fn adaptive_controller_healthy_stream_ramps_up_then_caps() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        let mut last = c.current_limit();
        let mut max_seen = last;
        for i in 0..20u64 {
            for _ in 0..4 {
                c.record_op(ok_op(1_000_000 + i * 200_000, 50));
            }
            let d = c.tick(t, Some(30.0), Some(0), 4096);
            assert!(d.limit <= 64, "never exceed ceiling");
            max_seen = max_seen.max(d.limit);
            last = d.limit;
            t += Duration::from_secs(1);
        }
        assert!(
            max_seen > 2,
            "healthy stream should ramp the limit up from the start of 2, got max {max_seen}"
        );
        assert!(last <= 64, "stays within ceiling, got {last}");
    }

    #[test]
    fn adaptive_controller_throttle_backs_off_and_cooldown_holds() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        for _ in 0..6 {
            for _ in 0..4 {
                c.record_op(ok_op(2_000_000, 40));
            }
            c.tick(t, Some(20.0), Some(0), 4096);
            t += Duration::from_secs(1);
        }
        let before = c.current_limit();
        assert!(before > 2, "should have grown before the throttle");
        c.record_op(OpSample {
            bytes: 1000,
            latency: Duration::from_millis(40),
            result: OpResult::Throttle,
        });
        let d = c.tick(t, Some(20.0), Some(0), 4096);
        assert!(
            d.limit <= before / 2 + 1,
            "throttle should at least halve the limit: before {before}, after {}",
            d.limit
        );
        let after_backoff = d.limit;
        t += Duration::from_secs(1);
        for _ in 0..10 {
            for _ in 0..4 {
                c.record_op(ok_op(5_000_000, 20));
            }
            let d = c.tick(t, Some(10.0), Some(0), 4096);
            assert!(
                d.limit <= after_backoff,
                "no increase during cooldown: {} > {after_backoff}",
                d.limit
            );
            t += Duration::from_secs(1);
        }
        t += Duration::from_secs(6);
        for _ in 0..3 {
            for _ in 0..4 {
                c.record_op(ok_op(6_000_000, 20));
            }
            c.tick(t, Some(10.0), Some(0), 4096);
            t += Duration::from_secs(1);
        }
        assert!(
            c.current_limit() > after_backoff,
            "limit should recover after the cooldown expires"
        );
    }

    #[test]
    fn adaptive_controller_rising_latency_holds_without_error() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        for _ in 0..5 {
            for _ in 0..4 {
                c.record_op(ok_op(2_000_000, 10));
            }
            c.tick(t, Some(20.0), Some(0), 4096);
            t += Duration::from_secs(1);
        }
        let peak = c.current_limit();
        for _ in 0..6 {
            for _ in 0..4 {
                c.record_op(ok_op(2_000_000, 200));
            }
            let d = c.tick(t, Some(20.0), Some(0), 4096);
            assert!(
                d.limit <= peak,
                "high latency gradient must hold/decrease (no growth): {} > {peak}",
                d.limit
            );
            t += Duration::from_secs(1);
        }
        assert!(
            c.current_limit() <= peak,
            "latency-gradient guard held the limit at/below the peak"
        );
    }

    #[test]
    fn adaptive_controller_memory_budget_caps_limit() {
        let total_ram = 10 * 1024 * 1024;
        let p95 = 2 * 1024 * 1024;
        let policy = AdaptivePolicy::new(0.8, 64, total_ram, None);
        let mut c = AdaptiveController::new(policy);
        let mut t = Duration::ZERO;
        for _ in 0..30 {
            for _ in 0..4 {
                c.record_op(ok_op(10_000_000, 10));
            }
            let d = c.tick(t, Some(10.0), Some(0), p95);
            assert!(
                (d.limit as u64) * p95 <= ((0.8 * total_ram as f64) as u64),
                "memory budget violated: limit {} * p95 {} > budget",
                d.limit,
                p95
            );
            assert!(
                d.limit <= 4,
                "memory cap should pin limit at 4, got {}",
                d.limit
            );
            t += Duration::from_secs(1);
        }
    }

    #[test]
    fn adaptive_controller_high_cpu_prevents_increase() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        for _ in 0..3 {
            for _ in 0..4 {
                c.record_op(ok_op(2_000_000, 20));
            }
            c.tick(t, Some(20.0), Some(0), 4096);
            t += Duration::from_secs(1);
        }
        let before = c.current_limit();
        for _ in 0..8 {
            for _ in 0..4 {
                c.record_op(ok_op(5_000_000, 10));
            }
            let d = c.tick(t, Some(90.0), Some(0), 4096);
            assert!(
                d.limit <= before,
                "cpu>85 must block increases: {} > {before}",
                d.limit
            );
            t += Duration::from_secs(1);
        }
    }

    #[test]
    fn adaptive_controller_converges_on_steady_stream() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        let mut limits = Vec::new();
        for _ in 0..60 {
            for _ in 0..4 {
                c.record_op(ok_op(3_000_000, 25));
            }
            let d = c.tick(t, Some(40.0), Some(0), 4096);
            limits.push(d.limit);
            t += Duration::from_secs(1);
        }
        let tail = &limits[limits.len() - 15..];
        let min = *tail.iter().min().unwrap();
        let max = *tail.iter().max().unwrap();
        assert!(
            max - min <= 3,
            "steady stream should converge (small tail spread), got min {min} max {max} tail {tail:?}"
        );
    }

    #[test]
    fn p95_object_size_nearest_rank() {
        assert_eq!(p95_object_size(&[]), 0, "empty -> 0 (memory cap disabled)");
        assert_eq!(p95_object_size(&[42]), 42, "single element");
        let sizes: Vec<u64> = (1..=100).collect();
        assert_eq!(p95_object_size(&sizes), 95);
        assert_eq!(p95_object_size(&[1, 1, 1, 1_000_000]), 1_000_000);
    }

    #[test]
    fn controller_driver_throttle_drives_gate_decrease() {
        let gate = AdaptiveGate::new(2, 32);
        let applied_rate = Arc::new(Mutex::new(None::<Option<u64>>));
        let rate_sink = Arc::clone(&applied_rate);
        let rate_applier: Arc<dyn Fn(Option<u64>) + Send + Sync> = Arc::new(move |r| {
            *rate_sink.lock().unwrap() = Some(r);
        });
        let policy = AdaptivePolicy::new(0.8, 32, u64::MAX, None);
        let driver = ControllerDriver::new(policy, gate.clone(), 4096, Some(rate_applier), None);
        // Inject time and CPU explicitly. `tick()` samples the real process CPU,
        // and on a busy machine that alone would block growth (> 85%) or force
        // a backoff (> 95%), making this test flaky.
        let mut t = Duration::ZERO;
        for _ in 0..8 {
            for _ in 0..4 {
                driver.record_op(ok_op(2_000_000, 40));
            }
            driver.tick_with(t, Some(20.0), Some(0));
            t += Duration::from_secs(1);
        }
        let grown = gate.limit();
        assert!(
            grown > 2,
            "healthy stream should grow the gate, got {grown}"
        );
        driver.record_op(OpSample {
            bytes: 1000,
            latency: Duration::from_millis(40),
            result: OpResult::Throttle,
        });
        let decision = driver.tick_with(t, Some(20.0), Some(0));
        assert!(
            gate.limit() < grown,
            "throttle must shrink the gate: {} >= {grown}",
            gate.limit()
        );
        assert_eq!(
            gate.limit(),
            decision.limit,
            "the gate reflects the decision's limit"
        );
        assert!(
            applied_rate.lock().unwrap().is_some(),
            "the rate applier must have been invoked"
        );
    }

    #[test]
    fn controller_driver_tick_samples_real_clock_and_cpu() {
        let gate = AdaptiveGate::new(2, 8);
        let policy = AdaptivePolicy::new(0.8, 8, u64::MAX, None);
        let driver = ControllerDriver::new(policy, gate.clone(), 4096, None, None);
        driver.record_op(ok_op(1_000, 5));
        let decision = driver.tick();
        assert!(
            (1..=8).contains(&decision.limit),
            "decision stays within 1..=ceiling, got {}",
            decision.limit
        );
        assert_eq!(gate.limit(), decision.limit, "tick() applies its decision");
    }

    #[test]
    fn adaptive_controller_target_rate_respects_fraction() {
        let mut c = AdaptiveController::new(big_policy());
        let mut t = Duration::ZERO;
        let mut last_rate = None;
        for _ in 0..15 {
            for _ in 0..4 {
                c.record_op(ok_op(1_000_000, 100));
            }
            let d = c.tick(t, Some(30.0), Some(0), 4096);
            last_rate = d.target_rate;
            t += Duration::from_secs(1);
        }
        let rate = last_rate.expect("a knee should have produced a target_rate");
        assert!(
            rate > 0,
            "target_rate should be positive once a knee is known"
        );
        let policy = AdaptivePolicy::new(0.8, 64, u64::MAX, Some(1234));
        let mut c2 = AdaptiveController::new(policy);
        let mut t2 = Duration::ZERO;
        for _ in 0..10 {
            for _ in 0..4 {
                c2.record_op(ok_op(10_000_000, 10));
            }
            let d = c2.tick(t2, Some(30.0), Some(0), 4096);
            assert!(
                d.target_rate.unwrap_or(0) <= 1234,
                "target_rate must respect max_rate cap"
            );
            t2 += Duration::from_secs(1);
        }
    }
}