#![allow(clippy::unwrap_used)] #![allow(missing_docs)]
use std::{sync::Arc, time::Duration};
use fraiseql_server::resilience::backpressure::AdmissionController;
/// Runs `total_requests` OS threads against one shared `AdmissionController`
/// built from `admission_limit` and `max_queue_depth`.
///
/// Every thread calls `try_acquire` exactly once and then parks on a barrier
/// while still holding whatever it got back. Nothing is released until all
/// threads have made their attempt, so an early release cannot free capacity
/// for a thread that has not tried yet.
///
/// Returns `(acquired, rejected)`: how many threads saw `Some(_)` and how many
/// saw `None` from `try_acquire`.
///
/// # Panics
///
/// Panics if any worker thread panicked.
fn simulate_concurrent_requests(
    admission_limit: usize,
    max_queue_depth: u64,
    total_requests: usize,
) -> (usize, usize) {
    use std::sync::Barrier;

    let shared_controller = Arc::new(AdmissionController::new(admission_limit, max_queue_depth));
    let rendezvous = Arc::new(Barrier::new(total_requests));

    // `collect` is eager: every worker is spawned before the first `join`.
    // Joining lazily would deadlock, because each worker blocks on the barrier
    // until all of its peers have arrived.
    let workers: Vec<_> = (0..total_requests)
        .map(|_| {
            let controller = Arc::clone(&shared_controller);
            let rendezvous = Arc::clone(&rendezvous);
            std::thread::spawn(move || {
                let permit = controller.try_acquire();
                let admitted = permit.is_some();
                // Keep the permit (if any) alive until everyone has tried.
                rendezvous.wait();
                drop(permit);
                admitted
            })
        })
        .collect();

    workers
        .into_iter()
        .fold((0usize, 0usize), |(admitted, turned_away), worker| {
            if worker.join().unwrap() {
                (admitted + 1, turned_away)
            } else {
                (admitted, turned_away + 1)
            }
        })
}
/// With exactly `limit` simultaneous requests, and both the admission limit
/// and the queue depth set to `limit`, every request must be admitted.
#[test]
fn test_admission_controller_allows_up_to_limit() {
    let limit: usize = 5;

    // The rejected count is deliberately ignored; only admissions are checked.
    let (admitted, _) = simulate_concurrent_requests(limit, limit as u64, limit);

    assert_eq!(admitted, limit, "all {limit} requests within the limit must be admitted");
}
/// Sends twice as many simultaneous requests as the admission limit allows and
/// expects at least one of them to be turned away, with every request
/// accounted for as either admitted or rejected.
#[test]
fn test_admission_controller_rejects_over_limit() {
    let (limit, extra): (usize, usize) = (5, 5);
    let total = limit + extra;

    // `max_queue_depth` is set to `total` — presumably so that only the
    // admission limit can cause rejections; confirm against the controller.
    let outcome = simulate_concurrent_requests(limit, total as u64, total);
    let (acquired, rejected) = outcome;

    assert!(
        rejected > 0,
        "expected some rejections when {total} requests exceed limit {limit}, \
         got acquired={acquired} rejected={rejected}"
    );
    assert_eq!(acquired + rejected, total, "every request must be either admitted or rejected");
}
/// Fills the controller to its admission limit, checks that one more
/// `try_acquire` is refused, then releases everything and checks that a new
/// `try_acquire` succeeds again.
#[tokio::test]
async fn test_admission_controller_recovers_after_spike() {
    let limit = 3;
    let controller = Arc::new(AdmissionController::new(limit, 50));

    {
        // Hold `limit` permits for the duration of this scope.
        let _held: Vec<_> = (0..limit)
            .map(|_| controller.try_acquire().expect("must admit within limit"))
            .collect();

        assert!(controller.try_acquire().is_none(), "must reject when at capacity");
    } // all held permits are dropped here

    let readmitted = controller.try_acquire();
    assert!(
        readmitted.is_some(),
        "server must recover after spike: must admit new request after permits released"
    );
}
/// A controller built with a `max_queue_depth` of 0 must refuse `try_acquire`
/// even though its admission limit (10) is not exhausted.
#[tokio::test]
async fn test_zero_queue_depth_rejects_all() {
    let (admission_limit, max_queue_depth) = (10, 0);
    let controller = Arc::new(AdmissionController::new(admission_limit, max_queue_depth));

    assert!(
        controller.try_acquire().is_none(),
        "max_queue_depth=0 must reject all requests unconditionally"
    );
}
/// On a fresh controller (admission limit 5, queue depth 10) with nothing
/// held, `acquire_timeout` must yield a permit within the 100 ms budget.
#[tokio::test]
async fn test_acquire_timeout_succeeds_when_permits_available() {
    let controller = Arc::new(AdmissionController::new(5, 10));
    let budget = Duration::from_millis(100);

    let outcome = controller.acquire_timeout(budget).await;

    assert!(outcome.is_some(), "must admit when permits available");
}
/// With the single available permit already taken, `acquire_timeout` must give
/// up with `None` after its 20 ms budget, and `queue_depth()` must read 0
/// afterwards.
#[tokio::test]
async fn test_acquire_timeout_cleans_up_queue_depth_on_expiry() {
    let controller = Arc::new(AdmissionController::new(1, 10));

    // Occupy the only permit for the rest of the test so the waiter below
    // cannot be admitted.
    let _occupied = controller.try_acquire().expect("first permit");

    let waiter_outcome = controller.acquire_timeout(Duration::from_millis(20)).await;
    assert!(waiter_outcome.is_none(), "must return None on timeout");

    let depth_after_timeout = controller.queue_depth();
    assert_eq!(depth_after_timeout, 0, "queue depth must return to 0 after timeout expiry");
}
/// Launches `limit + 8` threads that all call `try_acquire` and then meet at a
/// barrier while still holding their result.
///
/// The test passes only if every thread joins cleanly (no panic, no hang),
/// every request resolves one way or the other, and at least one request is
/// rejected.
#[test]
fn test_concurrent_spike_no_panic_no_deadlock() {
    use std::sync::Barrier;

    let limit = 4;
    let total = limit + 8;
    let shared_controller = Arc::new(AdmissionController::new(limit, total as u64));
    let rendezvous = Arc::new(Barrier::new(total));

    // Spawn every worker up front; `collect` is eager, so no join happens
    // before all threads exist and can reach the barrier.
    let workers: Vec<_> = (0..total)
        .map(|_| {
            let controller = Arc::clone(&shared_controller);
            let rendezvous = Arc::clone(&rendezvous);
            std::thread::spawn(move || {
                let permit = controller.try_acquire();
                let admitted = permit.is_some();
                // Hold the permit (if any) until every thread has attempted.
                rendezvous.wait();
                drop(permit);
                admitted
            })
        })
        .collect();

    let outcomes: Vec<bool> = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect();

    let admitted = outcomes.iter().filter(|ok| **ok).count();
    let rejected = outcomes.len() - admitted;

    assert_eq!(admitted + rejected, total, "every request must resolve");
    assert!(
        rejected > 0,
        "overload spike ({total} requests > limit {limit}) must produce rejections"
    );
}