use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum SubmitError {
#[error("queue full")]
QueueFull,
#[error("queue closed")]
Closed,
}
/// Non-blocking admission gate backed by a shared Tokio [`Semaphore`].
///
/// Every successful [`Admission::try_admit`] hands out one owned permit; the
/// slot becomes available again when that permit is dropped. Clones share the
/// same underlying semaphore, so they all draw from the same pool of slots.
#[derive(Clone)]
pub struct Admission {
    /// Semaphore shared by every clone of this gate.
    inner: Arc<Semaphore>,
    /// Number of permits the semaphore was created with.
    capacity: usize,
}

impl Admission {
    /// Creates a gate with `active_permits + queue_depth` slots.
    ///
    /// `active_permits` is raised to at least 1 so the gate can always admit
    /// one caller. The sum saturates instead of overflowing and is capped at
    /// [`Semaphore::MAX_PERMITS`], because `Semaphore::new` panics when asked
    /// for more permits than that.
    pub fn new(active_permits: usize, queue_depth: usize) -> Self {
        let total = active_permits
            .max(1)
            // A plain `+` would panic in debug builds and wrap in release
            // builds (possibly to zero permits) for very large inputs.
            .saturating_add(queue_depth)
            .min(Semaphore::MAX_PERMITS);
        Self {
            inner: Arc::new(Semaphore::new(total)),
            capacity: total,
        }
    }

    /// Tries to take one slot without waiting.
    ///
    /// On success the returned permit holds the slot until it is dropped.
    ///
    /// # Errors
    ///
    /// * [`SubmitError::QueueFull`] if no permit is currently available.
    /// * [`SubmitError::Closed`] if the semaphore has been closed.
    pub fn try_admit(&self) -> Result<OwnedSemaphorePermit, SubmitError> {
        // `try_acquire_owned` consumes an `Arc`, so hand it a fresh handle.
        match Arc::clone(&self.inner).try_acquire_owned() {
            Ok(permit) => Ok(permit),
            Err(TryAcquireError::NoPermits) => Err(SubmitError::QueueFull),
            Err(TryAcquireError::Closed) => Err(SubmitError::Closed),
        }
    }

    /// Returns the total number of slots this gate was created with.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the number of permits currently available on the semaphore.
    pub fn available_permits(&self) -> usize {
        self.inner.available_permits()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// All slots (active plus queued) can be taken once; the next attempt
    /// reports a full queue.
    #[test]
    fn admit_succeeds_until_capacity() {
        let gate = Admission::new(1, 2);
        assert_eq!(gate.capacity(), 3);

        // Keep every permit alive so the gate stays saturated.
        let _held: Vec<_> = (0..3).map(|_| gate.try_admit().unwrap()).collect();

        assert_eq!(gate.try_admit().err(), Some(SubmitError::QueueFull));
    }

    /// Dropping a permit makes its slot available again.
    #[test]
    fn dropping_permit_frees_slot() {
        let gate = Admission::new(1, 1);
        let first = gate.try_admit().unwrap();
        let _second = gate.try_admit().unwrap();
        assert!(gate.try_admit().is_err());

        drop(first);

        let _third = gate.try_admit().unwrap();
    }

    /// A request for zero active permits still yields a one-slot gate.
    #[test]
    fn zero_active_permits_treated_as_one() {
        let gate = Admission::new(0, 0);
        assert_eq!(gate.capacity(), 1);

        let _only = gate.try_admit().unwrap();
        assert!(gate.try_admit().is_err());
    }

    /// The available-permit count goes down by one for each permit held.
    #[test]
    fn available_permits_reflects_holds() {
        let gate = Admission::new(1, 2);
        assert_eq!(gate.available_permits(), 3);

        let mut held = Vec::new();
        for remaining in [2, 1] {
            held.push(gate.try_admit().unwrap());
            assert_eq!(gate.available_permits(), remaining);
        }
    }

    /// A clone can be moved into a spawned task, and the permit it takes
    /// there is released once the task has finished.
    #[tokio::test]
    async fn admission_is_clone_safe_across_tasks() {
        let gate = Admission::new(1, 2);

        let worker = tokio::spawn({
            let shared = gate.clone();
            async move {
                let _permit = shared.try_admit().unwrap();
            }
        });
        worker.await.unwrap();

        // The task's permit is gone, so all three slots are free again.
        let held: Vec<_> = (0..3).map(|_| gate.try_admit().unwrap()).collect();
        drop(held);
    }
}