use std::{
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use tokio::sync::Semaphore;
/// Gate that limits how many callers hold a permit at once and how many may
/// be counted as queued while waiting for one.
pub struct AdmissionController {
/// Semaphore created with `max_concurrent` permits; one permit is held per
/// admitted caller.
semaphore: Arc<Semaphore>,
/// Number of callers currently counted as queued. Compared against
/// `max_queue_depth` before every acquisition attempt.
queue_depth: AtomicU64,
/// Queue depth at or above which acquisition attempts are rejected outright.
max_queue_depth: u64,
}
impl AdmissionController {
#[must_use]
pub fn new(max_concurrent: usize, max_queue_depth: u64) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(max_concurrent)),
queue_depth: AtomicU64::new(0),
max_queue_depth,
}
}
pub fn try_acquire(&self) -> Option<AdmissionPermit<'_>> {
let current_depth = self.queue_depth.load(Ordering::Relaxed);
if current_depth >= self.max_queue_depth {
return None;
}
if let Ok(permit) = self.semaphore.clone().try_acquire_owned() {
Some(AdmissionPermit {
_permit: permit,
_phantom: std::marker::PhantomData,
})
} else {
self.queue_depth.fetch_add(1, Ordering::Relaxed);
None
}
}
pub async fn acquire_timeout(&self, timeout: Duration) -> Option<AdmissionPermit<'_>> {
let current_depth = self.queue_depth.load(Ordering::Relaxed);
if current_depth >= self.max_queue_depth {
return None;
}
self.queue_depth.fetch_add(1, Ordering::Relaxed);
let result = tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()).await;
self.queue_depth.fetch_sub(1, Ordering::Relaxed);
if let Ok(Ok(permit)) = result {
Some(AdmissionPermit {
_permit: permit,
_phantom: std::marker::PhantomData,
})
} else {
None
}
}
pub fn queue_depth(&self) -> u64 {
self.queue_depth.load(Ordering::Relaxed)
}
}
/// Proof of admission handed out by `AdmissionController`.
///
/// The lifetime `'a` ties the permit to a borrow of the controller that
/// issued it.
pub struct AdmissionPermit<'a> {
/// Owned tokio semaphore permit, kept only so that it lives as long as this
/// value; per tokio's documentation it is returned to the semaphore when
/// dropped.
_permit: tokio::sync::OwnedSemaphorePermit,
/// Zero-sized marker carrying the `&'a AdmissionController` borrow; stores
/// no data.
_phantom: std::marker::PhantomData<&'a AdmissionController>,
}
/// Explicit `Drop` implementation that performs no work of its own.
impl Drop for AdmissionPermit<'_> {
fn drop(&mut self) {
// Intentionally empty: the fields are dropped automatically after this
// returns, which is when the wrapped semaphore permit is released.
}
}