use std::{
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use tokio::sync::Semaphore;
/// Admission gate combining a concurrency-limiting semaphore with a bounded
/// queue-depth counter.
///
/// Callers obtain an `AdmissionPermit` through the acquisition methods; both
/// of them refuse outright once `queue_depth` has reached `max_queue_depth`.
pub struct AdmissionController {
/// Semaphore created with `max_concurrent` permits; shared via `Arc` so that
/// owned permits can be taken from a clone of the handle.
semaphore: Arc<Semaphore>,
/// Counter that is compared against `max_queue_depth` before every
/// acquisition attempt and reported by `queue_depth()`. Intended to track
/// callers waiting for a permit.
queue_depth: AtomicU64,
/// Threshold at which acquisition attempts are rejected without touching the
/// semaphore.
max_queue_depth: u64,
}
impl AdmissionController {
pub fn new(max_concurrent: usize, max_queue_depth: u64) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(max_concurrent)),
queue_depth: AtomicU64::new(0),
max_queue_depth,
}
}
pub fn try_acquire(&self) -> Option<AdmissionPermit<'static>> {
let current_depth = self.queue_depth.load(Ordering::Relaxed);
if current_depth >= self.max_queue_depth {
return None;
}
match self.semaphore.clone().try_acquire_owned() {
Ok(permit) => Some(AdmissionPermit {
_permit: permit,
_controller: self as *const Self as usize,
_phantom: std::marker::PhantomData,
}),
Err(_) => {
self.queue_depth.fetch_add(1, Ordering::Relaxed);
None
},
}
}
pub async fn acquire_timeout(&self, timeout: Duration) -> Option<AdmissionPermit<'static>> {
let current_depth = self.queue_depth.load(Ordering::Relaxed);
if current_depth >= self.max_queue_depth {
return None;
}
self.queue_depth.fetch_add(1, Ordering::Relaxed);
match tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()).await {
Ok(Ok(permit)) => {
self.queue_depth.fetch_sub(1, Ordering::Relaxed);
Some(AdmissionPermit {
_permit: permit,
_controller: self as *const Self as usize,
_phantom: std::marker::PhantomData,
})
},
_ => {
self.queue_depth.fetch_sub(1, Ordering::Relaxed);
None
},
}
}
pub fn queue_depth(&self) -> u64 {
self.queue_depth.load(Ordering::Relaxed)
}
}
/// Handle returned by the controller's acquisition methods; it owns the
/// underlying semaphore permit for as long as it is alive.
///
/// The lifetime parameter `'a` is carried only by `_phantom`; the acquisition
/// methods visible in this file always produce `AdmissionPermit<'static>`.
pub struct AdmissionPermit<'a> {
/// Owned semaphore permit held for the lifetime of this value.
_permit: tokio::sync::OwnedSemaphorePermit,
// `_controller`: address of the issuing controller stored as a plain integer;
// nothing in the visible code converts it back to a pointer or dereferences it.
// `_phantom`: zero-sized marker that ties the struct to the lifetime `'a`.
_controller: usize, _phantom: std::marker::PhantomData<&'a ()>,
}
impl<'a> AdmissionPermit<'a> {
fn _new(_permit: tokio::sync::OwnedSemaphorePermit, controller: usize) -> Self {
Self {
_permit,
_controller: controller,
_phantom: std::marker::PhantomData,
}
}
}
// Explicit `Drop` implementation with an empty body: it performs no work of
// its own.
impl Drop for AdmissionPermit<'_> {
fn drop(&mut self) {
// Intentionally empty. The struct's fields, including the wrapped
// `OwnedSemaphorePermit`, are dropped automatically after this method
// returns; per tokio's documentation that is what returns the slot to the
// semaphore.
// NOTE(review): `_controller` is not used here — confirm no per-permit
// bookkeeping on the controller was meant to happen at drop time.
}
}