use std::{convert::TryInto, time::Duration};
use futures_intrusive::sync::{Semaphore, SemaphoreReleaser};
use crate::{
error::{ErrorKind, Result},
options::StreamAddress,
RUNTIME,
};
#[derive(Debug)]
pub(crate) struct WaitQueue {
max_permits: usize,
semaphore: Semaphore,
address: StreamAddress,
timeout: Option<Duration>,
}
impl WaitQueue {
pub(super) fn new(
address: StreamAddress,
max_connections: u32,
timeout: Option<Duration>,
) -> Self {
let max_permits = if max_connections == 0 {
usize::max_value()
} else {
max_connections.try_into().unwrap_or(usize::max_value())
};
Self {
semaphore: Semaphore::new(true, max_permits),
address,
timeout,
max_permits,
}
}
pub(super) async fn wait_until_at_front(&self) -> Result<WaitQueueHandle<'_>> {
let future = self.semaphore.acquire(1);
let releaser = if let Some(timeout) = self.timeout {
RUNTIME.timeout(timeout, future).await.map_err(|_| {
ErrorKind::WaitQueueTimeoutError {
address: self.address.clone(),
}
})?
} else {
future.await
};
Ok(WaitQueueHandle {
semaphore_releaser: releaser,
})
}
pub(super) fn wake_front(&self) {
if self.semaphore.permits() >= self.max_permits {
panic!(
"greater than {} connections checked back into pool with address {}",
self.max_permits,
self.address.clone()
);
}
self.semaphore.release(1);
}
}
pub(super) struct WaitQueueHandle<'a> {
semaphore_releaser: SemaphoreReleaser<'a>,
}
impl<'a> WaitQueueHandle<'a> {
pub(super) fn disarm(&mut self) {
self.semaphore_releaser.disarm();
}
}