use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use snapdir_core::store::StoreError;
use snapdir_stores::transfer::{AdaptivePolicy as TransferAdaptivePolicy, TransferConfig};
use snapdir_stores::{
classify_error, run_adaptive, run_concurrent, AdaptiveGate, AdaptivePolicy, ControllerDriver,
OpResult, OpSample,
};
/// Builds the Tokio runtime shared by the async tests in this file.
///
/// The runtime is single-threaded (`new_current_thread`) and has only the time
/// driver enabled, which is what the `tokio::time::sleep` calls in the tests need.
///
/// # Panics
///
/// Panics with "build tokio runtime" if the runtime cannot be constructed.
fn runtime() -> tokio::runtime::Runtime {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_time();
    builder.build().expect("build tokio runtime")
}
/// Records one operation sample on `driver`, translating a store outcome into
/// the `(bytes, result)` pair carried by an `OpSample`.
///
/// * `Ok(())` is recorded as `OpResult::Ok` with the supplied `bytes`.
/// * `Err(err)` is recorded with zero bytes and whatever `classify_error(err)`
///   returns.
///
/// Every sample carries the same fixed 40 ms latency.
fn record_like_live(driver: &ControllerDriver, bytes: u64, outcome: &Result<(), StoreError>) {
    let latency = Duration::from_millis(40);
    let sample = match outcome {
        // A failed op moved no payload; its result is derived from the error.
        Err(err) => OpSample {
            bytes: 0,
            latency,
            result: classify_error(err),
        },
        Ok(()) => OpSample {
            bytes,
            latency,
            result: OpResult::Ok,
        },
    };
    driver.record_op(sample);
}
/// Builds a `StoreError::Backend` carrying `msg` as its message and no
/// underlying source error.
///
/// Despite the name, the tests also use it for non-transient messages
/// (e.g. "permission denied"); how the error is classified depends only on
/// what `classify_error` makes of it.
fn transient_err(msg: &str) -> StoreError {
    let message = String::from(msg);
    StoreError::Backend {
        message,
        source: None,
    }
}
/// Checks that `classify_error` maps each injected transient backend message
/// to `OpResult::Throttle`, and that "permission denied" maps to
/// `OpResult::HardErr`.
#[test]
fn adaptive_wire_classify_injected_transient_is_throttle() {
    // Backend messages the other tests in this file inject as "transient".
    let throttled_messages = [
        "GET object failed: 503 Service Unavailable",
        "S3 PUT failed: SlowDown, reduce your request rate",
        "request timeout while downloading object",
        "connection reset by peer",
    ];
    for msg in throttled_messages {
        let classified = classify_error(&transient_err(msg));
        assert_eq!(
            classified,
            OpResult::Throttle,
            "live wiring relies on {msg:?} classifying as Throttle",
        );
    }

    // Counter-example: a message that must not be classified as a throttle.
    let hard = classify_error(&transient_err("permission denied"));
    assert_eq!(hard, OpResult::HardErr);
}
/// Drives a `ControllerDriver` with a synthetic clock through four phases and
/// checks the gate limit after each:
///
/// 1. ten ticks of healthy traffic: the limit must rise above the seed of 2;
/// 2. one throttled op followed by a tick: the limit must drop to at most
///    `grown / 2 + 1`;
/// 3. five more healthy ticks, one second apart: the limit must not rise above
///    the post-throttle value;
/// 4. a 20 s jump followed by twelve healthy ticks: the limit must rise again.
///
/// Every tick is fed the same CPU and RSS readings (`CALM_CPU`, `NO_RSS`).
#[test]
fn adaptive_wire_aimd_shrinks_on_throttle_then_recovers() {
    const CALM_CPU: Option<f64> = Some(20.0);
    const NO_RSS: Option<u64> = Some(0);

    let gate = AdaptiveGate::new(2, 32);
    let driver = ControllerDriver::new(
        AdaptivePolicy::new(0.8, 32, u64::MAX, None),
        gate.clone(),
        4096,
        None,
        None,
    );

    // One synthetic interval of healthy traffic: four successful ops of
    // `bytes` each, then a controller tick stamped `at`.
    let healthy_interval = |at: Duration, bytes: u64| {
        for _ in 0..4 {
            record_like_live(&driver, bytes, &Ok(()));
        }
        driver.tick_with(at, CALM_CPU, NO_RSS);
    };

    let one_second = Duration::from_secs(1);
    let mut clock = Duration::ZERO;

    // Phase 1: healthy traffic.
    for _ in 0..10 {
        healthy_interval(clock, 2_000_000);
        clock += one_second;
    }
    let grown = gate.limit();
    assert!(
        grown > 2,
        "a healthy stream should grow the live gate above the seed of 2, got {grown}",
    );

    // Phase 2: a single throttled op, then a tick.
    record_like_live(&driver, 0, &Err(transient_err("503 Service Unavailable")));
    driver.tick_with(clock, CALM_CPU, NO_RSS);
    let after_throttle = gate.limit();
    clock += one_second;
    assert!(
        after_throttle < grown,
        "sustained Throttle must multiplicatively shrink the live gate: {after_throttle} >= {grown}",
    );
    assert!(
        after_throttle <= grown / 2 + 1,
        "Throttle backoff should at least halve the gate: before {grown}, after {after_throttle}",
    );

    // Phase 3: healthy traffic right after the throttle must not raise the limit.
    for _ in 0..5 {
        healthy_interval(clock, 3_000_000);
        clock += one_second;
        let during_cooldown = gate.limit();
        assert!(
            during_cooldown <= after_throttle,
            "no increase during the 15s cooldown: {} > {after_throttle}",
            during_cooldown,
        );
    }

    // Phase 4: jump the clock well past the cooldown, then feed healthy traffic.
    clock += Duration::from_secs(20);
    for _ in 0..12 {
        healthy_interval(clock, 3_000_000);
        clock += one_second;
    }
    let recovered = gate.limit();
    assert!(
        recovered > after_throttle,
        "after the cooldown, sustained Success must additively grow the live gate back up: {} <= {after_throttle}",
        recovered,
    );
}
/// Runs eight always-failing ops through `run_adaptive`, each recording its
/// outcome on the driver the way the live closure would, then ticks the
/// driver once and checks that the gate limit fell below its starting value.
///
/// Also checks that the batch surfaces a `StoreError::Backend` whose message
/// mentions "503".
#[test]
fn adaptive_wire_run_adaptive_closure_records_throttle_and_shrinks_gate() {
    let rt = runtime();
    let gate = AdaptiveGate::new(4, 16);
    let policy = AdaptivePolicy::new(0.8, 16, u64::MAX, None);
    let driver = ControllerDriver::new(policy, gate.clone(), 4096, None, None);
    let before = gate.limit();
    assert_eq!(before, 4, "gate seeds at the configured concurrency");

    // The batch future owns its own handles to the gate and the driver.
    let batch = {
        let batch_gate = gate.clone();
        let batch_driver = driver.clone();
        async move {
            run_adaptive(0..8, &batch_gate, |item| {
                let recorder = &batch_driver;
                async move {
                    let outcome: Result<(), StoreError> =
                        Err(transient_err("got HTTP 503 from backend"));
                    // The item index doubles as the byte count; it is dropped
                    // for failed ops by `record_like_live` anyway.
                    record_like_live(recorder, item, &outcome);
                    outcome
                }
            })
            .await
        }
    };
    let result: Result<Vec<()>, StoreError> = rt.block_on(batch);

    let err = result.expect_err("an all-error batch must surface the first error");
    assert!(
        matches!(err, StoreError::Backend { ref message, .. } if message.contains("503")),
        "unexpected error: {err:?}",
    );

    // Samples recorded through the cloned driver must be visible to this tick.
    driver.tick();
    let after = gate.limit();
    assert!(
        after < before,
        "throttled ops recorded through the live closure must shrink the gate: {} >= {before}",
        after,
    );
}
/// Checks the non-adaptive ("Off") path.
///
/// * `TransferConfig::default()` and `TransferConfig::new(8, None)` must both
///   report `TransferAdaptivePolicy::Off`.
/// * `run_concurrent` over 24 items with a concurrency of 6 must reach a peak
///   of exactly 6 ops in flight, measured with an in-flight counter and a
///   high-water mark updated by each op.
#[test]
fn adaptive_wire_off_path_uses_run_concurrent_no_gate() {
    assert_eq!(
        TransferConfig::default().adaptive,
        TransferAdaptivePolicy::Off,
        "the default transfer policy MUST stay Off (adaptive is opt-in)",
    );
    assert_eq!(
        TransferConfig::new(8, None).adaptive,
        TransferAdaptivePolicy::Off,
    );

    let concurrency = NonZeroUsize::new(6).unwrap();
    let items = 24usize;
    // `in_flight` counts ops currently running; `high` keeps its maximum.
    let in_flight = Arc::new(AtomicUsize::new(0));
    let high = Arc::new(AtomicUsize::new(0));
    let rt = runtime();
    let result: Result<Vec<()>, StoreError> = rt.block_on({
        let in_flight = Arc::clone(&in_flight);
        let high = Arc::clone(&high);
        async move {
            run_concurrent(0..items, concurrency, move |_item| {
                let in_flight = Arc::clone(&in_flight);
                let high = Arc::clone(&high);
                async move {
                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    high.fetch_max(cur, Ordering::SeqCst);
                    // Hold the slot long enough for the other ops to overlap.
                    tokio::time::sleep(Duration::from_millis(20)).await;
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                    Ok::<(), StoreError>(())
                }
            })
            .await
        }
    });

    // Include the result in the message so a failure shows the StoreError
    // instead of a bare "assertion failed: result.is_ok()".
    assert!(
        result.is_ok(),
        "the Off-path batch must succeed, got {result:?}",
    );
    assert_eq!(
        high.load(Ordering::SeqCst),
        concurrency.get(),
        "the Off path (run_concurrent) runs at the full fixed concurrency, not a gated limit",
    );
}
/// Runs twelve ops through `run_adaptive` where item 5 fails immediately with
/// "permission denied" and every other item succeeds after a 5 ms sleep.
///
/// The batch must return that exact backend error, and `classify_error` must
/// classify the same error as `OpResult::HardErr`.
#[test]
fn adaptive_wire_on_path_first_error_wins() {
    let rt = runtime();
    let gate = AdaptiveGate::new(3, 8);

    let batch = {
        let batch_gate = gate.clone();
        async move {
            run_adaptive(0..12, &batch_gate, |item| async move {
                if item != 5 {
                    tokio::time::sleep(Duration::from_millis(5)).await;
                    Ok::<(), StoreError>(())
                } else {
                    // The single injected failure; it does not sleep first.
                    Err(transient_err("permission denied"))
                }
            })
            .await
        }
    };
    let result: Result<Vec<()>, StoreError> = rt.block_on(batch);

    let err = result.expect_err("the hard error must abort the adaptive batch");
    assert!(
        matches!(err, StoreError::Backend { ref message, .. } if message == "permission denied"),
        "first-error-wins must surface the injected hard error, got {err:?}",
    );

    let classified = classify_error(&transient_err("permission denied"));
    assert_eq!(classified, OpResult::HardErr);
}
/// Runs eight ops through `run_adaptive`, each returning its own index after
/// a sleep of `(8 - index) * 5` ms, so higher indices sleep for less time.
///
/// The collected results, once sorted, must be exactly `0..8`: every item
/// appears once, whatever order the ops finished in.
#[test]
fn adaptive_wire_on_path_completion_independent_collection() {
    let rt = runtime();
    let gate = AdaptiveGate::new(8, 8);

    let batch = {
        let batch_gate = gate.clone();
        async move {
            run_adaptive(0..8usize, &batch_gate, |item| async move {
                // Item 0 sleeps 40 ms, item 7 sleeps 5 ms.
                let delay_ms = 5 * (8 - item as u64);
                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                Ok::<usize, StoreError>(item)
            })
            .await
            .expect("all ops succeed")
        }
    };
    let mut collected: Vec<usize> = rt.block_on(batch);

    assert_eq!(
        collected.len(),
        8,
        "all items must be collected exactly once"
    );

    // Sort before comparing: the test makes no claim about result order.
    collected.sort_unstable();
    let expected: Vec<usize> = (0..8).collect();
    assert_eq!(
        collected, expected,
        "run_adaptive must collect every item's result regardless of completion order",
    );
}
/// Runs 24 ops through `run_adaptive` behind a gate created with
/// `AdaptiveGate::new(2, 16)` and checks that no more than 2 ops were ever
/// in flight at once.
///
/// Peak concurrency is measured with an in-flight counter and a high-water
/// mark updated by each op.
#[test]
fn adaptive_wire_on_path_respects_gate_limit() {
    let rt = runtime();
    let gate = AdaptiveGate::new(2, 16);
    // `in_flight` counts ops currently running; `high` keeps its maximum.
    let in_flight = Arc::new(AtomicUsize::new(0));
    let high = Arc::new(AtomicUsize::new(0));
    let result: Result<Vec<()>, StoreError> = rt.block_on({
        let gate = gate.clone();
        let in_flight = Arc::clone(&in_flight);
        let high = Arc::clone(&high);
        async move {
            run_adaptive(0..24, &gate, move |_item| {
                let in_flight = Arc::clone(&in_flight);
                let high = Arc::clone(&high);
                async move {
                    let cur = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    high.fetch_max(cur, Ordering::SeqCst);
                    // Hold the slot long enough for overlapping ops to be observed.
                    tokio::time::sleep(Duration::from_millis(15)).await;
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                    Ok::<(), StoreError>(())
                }
            })
            .await
        }
    });

    // Include the result in the message so a failure shows the StoreError
    // instead of a bare "assertion failed: result.is_ok()".
    assert!(
        result.is_ok(),
        "the gated batch must succeed, got {result:?}",
    );

    let peak = high.load(Ordering::SeqCst);
    assert!(
        peak <= 2,
        "the live gate limit (2) must bound effective concurrency despite the 16-wide window, got {}",
        peak,
    );
}
/// Feeds a driver eight rounds of four successful ops, ticking after each
/// round, and checks two things:
///
/// * the gate limit never ends up below the value it had before the rounds;
/// * the rate-applier callback handed to the driver was invoked at least once
///   (the callback stores its argument in a shared slot the test inspects).
#[test]
fn adaptive_wire_healthy_stream_does_not_spuriously_shrink() {
    let gate = AdaptiveGate::new(4, 32);

    // Slot holding the last value passed to the rate applier; it stays `None`
    // until the callback runs for the first time.
    let applied: Arc<Mutex<Option<Option<u64>>>> = Arc::new(Mutex::new(None));
    let rate_applier: Arc<dyn Fn(Option<u64>) + Send + Sync> = {
        let sink = Arc::clone(&applied);
        Arc::new(move |rate: Option<u64>| {
            *sink.lock().unwrap() = Some(rate);
        })
    };

    let policy = AdaptivePolicy::new(0.8, 32, u64::MAX, None);
    let driver = ControllerDriver::new(policy, gate.clone(), 4096, Some(rate_applier), None);
    let seed = gate.limit();

    for _ in 0..8 {
        (0..4).for_each(|_| record_like_live(&driver, 2_000_000, &Ok(())));
        driver.tick();
    }

    let final_limit = gate.limit();
    assert!(
        final_limit >= seed,
        "a purely healthy stream must never shrink the live gate below its seed: {} < {seed}",
        final_limit,
    );

    let applier_called = applied.lock().unwrap().is_some();
    assert!(
        applier_called,
        "the live rate applier must be invoked by the driver's tick",
    );
}