use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::concurrency::ConcurrentRing;
use crate::error::{CaducusError, CaducusErrorKind};
use crate::reclaimer;
/// Timeout applied by `Receiver::next` when the caller supplies no deadline:
/// the effective deadline becomes "now + this duration".
const DEFAULT_RECEIVE_TIMEOUT: Duration = Duration::from_secs(1);
/// Handle for taking items of type `T` out of a shared `ConcurrentRing`.
///
/// Items are obtained with `Receiver::next`. Dropping the handle calls
/// `reclaimer::shutdown_and_report` on the ring.
pub struct Receiver<T: Send + 'static> {
/// The shared ring that items are received from.
ring: Arc<ConcurrentRing<T>>,
/// Notification handle obtained from the ring via `notify_receiver_handle()`;
/// `next` awaits it while no item is available.
notify: Arc<tokio::sync::Notify>,
}
/// Opaque `Debug` representation: prints only the type name and marks the
/// fields as elided, so `T` does not need to implement `Debug`.
impl<T: Send + 'static> std::fmt::Debug for Receiver<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Receiver");
        // No fields are listed; the output is `Receiver { .. }`.
        builder.finish_non_exhaustive()
    }
}
impl<T: Send + 'static> Receiver<T> {
pub(crate) fn new(ring: Arc<ConcurrentRing<T>>) -> Self {
let notify = ring.notify_receiver_handle();
Self { ring, notify }
}
pub async fn next(&self, deadline: Option<Instant>) -> Result<T, CaducusError> {
let deadline = deadline.unwrap_or_else(|| Instant::now() + DEFAULT_RECEIVE_TIMEOUT);
loop {
if let Some(item) = reclaimer::try_receive(&self.ring)? {
return Ok(item);
}
let waiter = self.notify.notified();
tokio::pin!(waiter);
if let Some(item) = reclaimer::try_receive(&self.ring)? {
return Ok(item);
}
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
return Err(CaducusError {
kind: CaducusErrorKind::Timeout,
});
}
tokio::select! {
_ = &mut waiter => {}
_ = tokio::time::sleep(remaining) => {
return Err(CaducusError { kind: CaducusErrorKind::Timeout });
}
}
}
}
pub fn is_closed(&self) -> bool {
self.ring.is_shutdown()
}
}
/// Dropping the receiver hands its ring to `reclaimer::shutdown_and_report`.
impl<T: Send + 'static> Drop for Receiver<T> {
    fn drop(&mut self) {
        // NOTE(review): presumably this marks the ring as shut down and
        // reports anything left in it — confirm against `reclaimer`.
        let ring = &self.ring;
        reclaimer::shutdown_and_report(ring);
    }
}