use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::{Arc, Weak};
use std::time::Instant;
use tokio::runtime::Handle;
use tokio::sync::Notify;
use crate::concurrency::{ConcurrentRing, DrainMode, PopResult};
use crate::error::{CaducusError, CaducusErrorKind};
/// Forwards every popped entry that carries an expiry channel to that channel.
///
/// Entries whose `expiry_channel` is `None` are not reported; they are simply
/// dropped together with their item.
pub(crate) fn report_expired<T: 'static>(items: Vec<PopResult<T>>) {
    items.into_iter().for_each(|entry| match entry.expiry_channel {
        Some(channel) => report_one(channel, entry.item, "expiry"),
        // Nothing registered for expiry: let the entry fall out of scope.
        None => {}
    });
}
/// Forwards every popped entry that carries a shutdown channel to that channel.
///
/// Entries whose `shutdown_channel` is `None` are not reported; they are
/// simply dropped together with their item.
pub(crate) fn report_shutdown<T: 'static>(items: Vec<PopResult<T>>) {
    for entry in items {
        match entry.shutdown_channel {
            Some(channel) => report_one(channel, entry.item, "shutdown"),
            // Nothing registered for shutdown: move on to the next entry.
            None => continue,
        }
    }
}
/// Shuts the ring down and reports the items it handed back.
///
/// Steps, in order:
/// 1. `ring.shutdown()` returns the entries to report.
/// 2. The reclaimer reporting lock is acquired and released straight away.
/// 3. The returned entries are passed to `report_shutdown`.
pub(crate) fn shutdown_and_report<T: Send + 'static>(ring: &ConcurrentRing<T>) {
    let drained = ring.shutdown();
    {
        // Take the lock only to release it again at the end of this scope.
        // NOTE(review): presumably this waits for a reclaimer pass that is
        // still reporting under the same lock to finish first — confirm.
        let _barrier = ring.reclaimer_reporting_lock();
    }
    report_shutdown(drained);
}
/// Sends a single `item` over `ch`, never letting a failure escape.
///
/// The `send` call runs inside `catch_unwind`, so a panicking channel is
/// contained here. Both failure modes are logged at warn level, prefixed with
/// `label`, and the item is dropped:
/// - the channel returned `Err` (the value carried by the error is discarded);
/// - the channel panicked.
fn report_one<T: 'static>(ch: Arc<dyn crate::concurrency::ReportChannel<T>>, item: T, label: &str) {
    let outcome = catch_unwind(AssertUnwindSafe(|| ch.send(item)));
    match outcome {
        Err(_payload) => {
            log::warn!("{label} report channel panicked; dropping item");
        }
        Ok(Err(_rejected)) => {
            log::warn!("{label} report channel rejected item; dropping");
        }
        Ok(Ok(())) => {}
    }
}
/// Attempts to claim one live item from the ring without waiting.
///
/// The ring is drained in `DrainMode::DrainAndClaim` at the current instant,
/// and any expired entries from that drain are reported before the result is
/// decided.
///
/// Returns:
/// - `Ok(Some(item))` when the drain produced a live entry (even if the ring
///   also reports shutdown);
/// - `Err` with `CaducusErrorKind::Shutdown` when there is no live entry and
///   the ring reports shutdown;
/// - `Ok(None)` otherwise.
pub(crate) fn try_receive<T: Send + 'static>(
    ring: &ConcurrentRing<T>,
) -> Result<Option<T>, CaducusError> {
    let outcome = ring.drain(Instant::now(), DrainMode::DrainAndClaim);
    report_expired(outcome.expired);

    // A claimed item wins over the shutdown flag.
    if let Some(claimed) = outcome.live {
        return Ok(Some(claimed.item));
    }

    if outcome.is_shutdown {
        Err(CaducusError {
            kind: CaducusErrorKind::Shutdown(()),
        })
    } else {
        Ok(None)
    }
}
/// Starts the reclaimer loop as a task on the runtime behind `handle`.
///
/// The loop receives only a `Weak` reference to the ring plus the two
/// `Notify` handles. The join handle returned by `spawn` is discarded, so the
/// task is not awaited by the caller.
pub(crate) fn spawn_reclaimer<T: Send + 'static>(
    ring: Weak<ConcurrentRing<T>>,
    notify_reclaimer: Arc<Notify>,
    notify_receiver: Arc<Notify>,
    handle: &Handle,
) {
    let task = reclaimer_loop(ring, notify_reclaimer, notify_receiver);
    let _ = handle.spawn(task);
}
/// Body of the background reclaimer task.
///
/// Each pass drains `ring` in `DrainMode::DrainOnly`, reports the expired
/// entries, and then waits for either the next deadline returned by the drain
/// or a notification on `notify_reclaimer` before starting the next pass.
///
/// The loop ends when the `Weak` can no longer be upgraded or when a drain
/// reports `is_shutdown`.
async fn reclaimer_loop<T: 'static>(
ring: Weak<ConcurrentRing<T>>,
notify_reclaimer: Arc<Notify>,
notify_receiver: Arc<Notify>,
) {
loop {
// Only a temporary strong reference is taken per pass; if the ring is
// already gone there is nothing left to reclaim.
let strong = match ring.upgrade() {
Some(s) => s,
None => return,
};
// The notification future is created BEFORE the drain.
// NOTE(review): presumably so a wake-up sent while the drain is running is
// still observed by the wait at the bottom of the loop — confirm against
// the tokio `Notify` guarantees before reordering.
let waiter = notify_reclaimer.notified();
tokio::pin!(waiter);
// The reporting lock is held across both the drain and the expiry reports.
let report_guard = strong.reclaimer_reporting_lock();
let result = strong.drain(Instant::now(), DrainMode::DrainOnly);
// Remember whether anything expired; `result.expired` is moved just below.
let had_expired = !result.expired.is_empty();
{
report_expired(result.expired);
}
// Released before any await point below.
drop(report_guard);
if result.is_shutdown {
return;
}
// From here on the task holds only the `Weak` while it waits.
drop(strong);
// Receivers are woken only if this pass expired something AND the drain
// still reported a pending deadline.
// NOTE(review): the reason for requiring `next_deadline` is not visible
// here — confirm with the receiver side.
if had_expired && result.next_deadline.is_some() {
notify_receiver.notify_waiters();
}
match result.next_deadline {
Some(deadline) => {
let now = Instant::now();
// Deadline already reached: start the next pass without waiting.
if deadline <= now {
continue;
}
// Wake on whichever happens first: a notification or the deadline.
tokio::select! {
_ = &mut waiter => {}
_ = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {}
}
}
None => {
// No deadline reported: wait until notified.
waiter.await;
}
}
}
}