#[path = "concurrency/ring_buffer.rs"]
mod ring_buffer;
pub use ring_buffer::ReportChannel;
pub(crate) use ring_buffer::{ChannelMode, PopResult};
use crate::error::{CaducusError, CaducusErrorKind};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::{Duration, Instant};
use ring_buffer::Ring;
/// Selects what a single [`ConcurrentRing::drain`] pass does besides
/// collecting expired entries.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum DrainMode {
    /// Collect expired entries only; no live entry is claimed.
    DrainOnly,
    /// Collect expired entries, then additionally try to pop one entry.
    DrainAndClaim,
}
/// Snapshot produced by one `ConcurrentRing::drain` call. All four fields
/// are gathered while the ring mutex is held once, so they describe the
/// same locked state.
pub(crate) struct DrainResult<T> {
/// Entries returned by `Ring::drain_expired` for the `now` passed to `drain`.
pub expired: Vec<PopResult<T>>,
/// Result of `Ring::try_pop` when draining with `DrainMode::DrainAndClaim`;
/// always `None` for `DrainMode::DrainOnly`.
pub live: Option<PopResult<T>>,
/// Value of `Ring::peek_expires_at` read after the drain (and optional pop).
/// NOTE(review): presumably the expiry instant of the next remaining entry —
/// confirm against `Ring`.
pub next_deadline: Option<Instant>,
/// Value of `Ring::is_shutdown` read under the same lock.
pub is_shutdown: bool,
}
/// A `Ring<T>` behind a mutex, bundled with the `tokio::sync::Notify`
/// handles that are signalled after the ring is modified.
pub(crate) struct ConcurrentRing<T> {
/// The underlying ring; every access goes through the private `lock` helper,
/// which recovers from mutex poisoning instead of panicking.
ring: Mutex<Ring<T>>,
/// Unit mutex handed out by `reclaimer_reporting_lock`. It guards no data.
/// NOTE(review): presumably serializes the reclaimer's reporting phase —
/// confirm against the callers.
reclaimer_reporting: Mutex<()>,
/// Signalled (via `notify_waiters`) after pushes, capacity changes and shutdown.
notify_reclaimer: Arc<tokio::sync::Notify>,
/// Signalled together with `notify_reclaimer`, on the same events.
notify_receiver: Arc<tokio::sync::Notify>,
}
impl<T> ConcurrentRing<T> {
pub fn new(
capacity: usize,
ttl: Duration,
mode: ChannelMode,
expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
) -> Result<Self, CaducusError> {
Ok(Self {
ring: Mutex::new(Ring::new(
capacity,
ttl,
mode,
expiry_channel,
shutdown_channel,
)?),
reclaimer_reporting: Mutex::new(()),
notify_reclaimer: Arc::new(tokio::sync::Notify::new()),
notify_receiver: Arc::new(tokio::sync::Notify::new()),
})
}
pub fn reclaimer_reporting_lock(&self) -> MutexGuard<'_, ()> {
self.reclaimer_reporting
.lock()
.unwrap_or_else(PoisonError::into_inner)
}
pub fn notify_reclaimer_handle(&self) -> Arc<tokio::sync::Notify> {
Arc::clone(&self.notify_reclaimer)
}
pub fn notify_receiver_handle(&self) -> Arc<tokio::sync::Notify> {
Arc::clone(&self.notify_receiver)
}
pub fn send_spsc(&self, item: T) -> Result<(), CaducusError<T>> {
let mut ring = self.lock();
ring.try_push_spsc(item)?;
drop(ring);
self.notify_reclaimer.notify_waiters();
self.notify_receiver.notify_waiters();
Ok(())
}
pub fn send_mpsc(
&self,
item: T,
expiry_channel: Option<Arc<dyn ReportChannel<T>>>,
shutdown_channel: Option<Arc<dyn ReportChannel<T>>>,
) -> Result<(), CaducusError<T>> {
let mut ring = self.lock();
ring.try_push_mpsc(item, expiry_channel, shutdown_channel)?;
drop(ring);
self.notify_reclaimer.notify_waiters();
self.notify_receiver.notify_waiters();
Ok(())
}
pub fn drain(&self, now: Instant, mode: DrainMode) -> DrainResult<T> {
let mut ring = self.lock();
let expired = ring.drain_expired(now);
let live = match mode {
DrainMode::DrainAndClaim => ring.try_pop(),
DrainMode::DrainOnly => None,
};
let next_deadline = ring.peek_expires_at();
let is_shutdown = ring.is_shutdown();
drop(ring);
DrainResult {
expired,
live,
next_deadline,
is_shutdown,
}
}
pub fn update_ttl(&self, duration: Duration) -> Result<(), CaducusError> {
let mut ring = self.lock();
ring.set_ttl(duration)?;
Ok(())
}
pub fn update_capacity(&self, new: usize) {
let mut ring = self.lock();
ring.request_capacity(new);
drop(ring);
self.notify_reclaimer.notify_waiters();
self.notify_receiver.notify_waiters();
}
pub fn shutdown(&self) -> Vec<PopResult<T>> {
let mut ring = self.lock();
let items = ring.shutdown();
drop(ring);
self.notify_reclaimer.notify_waiters();
self.notify_receiver.notify_waiters();
items
}
pub fn is_shutdown(&self) -> bool {
let ring = self.lock();
ring.is_shutdown()
}
fn lock(&self) -> MutexGuard<'_, Ring<T>> {
self.ring.lock().unwrap_or_else(PoisonError::into_inner)
}
}