/// Synchronisation primitives used by this file when built with `cfg(loom)`.
///
/// `Mutex` and the atomics are taken from `loom`; `Arc` and `Weak` are still
/// re-exported from `std` in this configuration.
#[cfg(loom)]
mod sync {
    pub use loom::sync::Mutex;
    pub use loom::sync::atomic::{AtomicUsize, Ordering};
    pub use std::sync::{Arc, Weak};
}
/// Synchronisation primitives used by this file in regular (non-loom) builds.
///
/// Every name is a plain re-export of the `std` item of the same name.
#[cfg(not(loom))]
mod sync {
    pub use std::sync::atomic::{AtomicUsize, Ordering};
    pub use std::sync::{Arc, Mutex, Weak};
}
use std::time::{Duration, Instant};
use leaky_bucket::RateLimiter;
use log::{debug, warn};
use tokio::{sync::mpsc, time::sleep};
use self::sync::{Arc, AtomicUsize, Mutex, Ordering, Weak};
use super::{FrameLike, PushError, PushPolicy, PushPriority};
/// Shared state that sits behind a [`PushHandle`] (held in an `Arc`).
pub(crate) struct PushHandleInner<F> {
/// Sender used for `PushPriority::High` pushes.
pub(crate) high_prio_tx: mpsc::Sender<F>,
/// Sender used for `PushPriority::Low` pushes.
pub(crate) low_prio_tx: mpsc::Sender<F>,
/// Optional rate limiter. When present, the async push methods wait for a
/// permit before sending; `try_push` does not consult it.
pub(crate) limiter: Option<RateLimiter>,
/// Optional dead-letter queue (DLQ) sender. Frames that `try_push` drops
/// because the target queue is full are offered to it.
pub(crate) dlq_tx: Option<mpsc::Sender<F>>,
/// Number of frames the DLQ itself refused (full or closed); reset to zero
/// whenever a drop warning is logged.
pub(crate) dlq_drops: AtomicUsize,
/// Time of the most recent DLQ drop warning, overwritten on each warning.
pub(crate) dlq_last_log: Mutex<Instant>,
/// Log a warning whenever the drop counter is a multiple of this value;
/// `0` disables the count-based trigger.
pub(crate) dlq_log_every_n: usize,
/// Log a warning when more than this much time has passed since the last one.
pub(crate) dlq_log_interval: Duration,
}
/// Cloneable handle for pushing frames onto the high- and low-priority queues.
///
/// All clones share one [`PushHandleInner`] through an `Arc`.
pub struct PushHandle<F>(Arc<PushHandleInner<F>>);

// Implemented by hand rather than derived: `#[derive(Clone)]` would add an
// `F: Clone` bound to the impl, even though cloning the handle only bumps the
// `Arc` reference count and never clones a frame.
impl<F> Clone for PushHandle<F> {
    fn clone(&self) -> Self { Self(Arc::clone(&self.0)) }
}
/// Read-only view onto a [`PushHandle`]'s shared state, compiled only under
/// `cfg(loom)`.
#[cfg(loom)]
pub struct PushHandleProbe<F> {
// Shared inner state, cloned from the handle that created the probe.
inner: Arc<PushHandleInner<F>>,
}
#[cfg(loom)]
impl<F> PushHandleProbe<F> {
    /// Returns the current value of the shared DLQ drop counter.
    ///
    /// The counter is loaded with `SeqCst` ordering. It is reset to zero each
    /// time a drop warning is logged, so it is not a lifetime total.
    #[must_use]
    pub fn dlq_drop_count(&self) -> usize {
        let counter = &self.inner.dlq_drops;
        counter.load(Ordering::SeqCst)
    }
}
impl<F: FrameLike> PushHandle<F> {
pub(crate) fn from_arc(arc: Arc<PushHandleInner<F>>) -> Self { Self(arc) }
#[cfg(loom)]
#[must_use]
pub fn probe(&self) -> PushHandleProbe<F> {
PushHandleProbe {
inner: self.0.clone(),
}
}
async fn push_with_priority(&self, frame: F, priority: PushPriority) -> Result<(), PushError> {
let tx = match priority {
PushPriority::High => &self.0.high_prio_tx,
PushPriority::Low => &self.0.low_prio_tx,
};
if let Some(ref limiter) = self.0.limiter {
self.wait_for_permit(limiter).await;
}
tx.clone()
.send(frame)
.await
.map_err(|_| PushError::Closed)?;
debug!("frame pushed: priority={priority:?}");
Ok(())
}
pub async fn push_high_priority(&self, frame: F) -> Result<(), PushError> {
self.push_with_priority(frame, PushPriority::High).await
}
pub async fn push_low_priority(&self, frame: F) -> Result<(), PushError> {
self.push_with_priority(frame, PushPriority::Low).await
}
fn route_to_dlq(&self, frame: F)
where
F: std::fmt::Debug,
{
if let Some(dlq) = &self.0.dlq_tx
&& let Err(mpsc::error::TrySendError::Full(f) | mpsc::error::TrySendError::Closed(f)) =
dlq.try_send(frame)
{
let dropped = self.0.dlq_drops.fetch_add(1, Ordering::Relaxed) + 1;
let mut last = match self.0.dlq_last_log.lock() {
Ok(guard) => guard,
Err(poisoned) => {
warn!("DLQ last-log mutex poisoned; continuing with stale state");
poisoned.into_inner()
}
};
self.log_dlq_drop(&f, dropped, &mut last);
}
}
const PERMIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
async fn wait_for_permit(&self, limiter: &RateLimiter) {
loop {
if limiter.try_acquire(1) {
break;
}
sleep(Self::PERMIT_POLL_INTERVAL).await;
}
}
fn log_dlq_drop(&self, frame: &F, dropped: usize, last_log: &mut Instant)
where
F: std::fmt::Debug,
{
let log_every_n = self.0.dlq_log_every_n;
let log_interval = self.0.dlq_log_interval;
let should_log = (log_every_n != 0 && dropped.is_multiple_of(log_every_n))
|| last_log.elapsed() > log_interval;
if should_log {
warn!(
"DLQ dropped frames (full or closed): frame={frame:?}, dropped={dropped}, \
log_every_n={log_every_n}, log_interval={log_interval:?}"
);
*last_log = Instant::now();
self.0.dlq_drops.store(0, Ordering::Relaxed);
}
}
pub fn try_push(
&self,
frame: F,
priority: PushPriority,
policy: PushPolicy,
) -> Result<(), PushError>
where
F: std::fmt::Debug,
{
let tx = match priority {
PushPriority::High => &self.0.high_prio_tx,
PushPriority::Low => &self.0.low_prio_tx,
};
match tx.try_send(frame) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(f)) => match policy {
PushPolicy::ReturnErrorIfFull => Err(PushError::QueueFull),
PushPolicy::DropIfFull | PushPolicy::WarnAndDropIfFull => {
if matches!(policy, PushPolicy::WarnAndDropIfFull) {
warn!(
"push queue full: priority={priority:?}, policy={policy:?}, dlq={dlq}",
dlq = self.0.dlq_tx.is_some()
);
}
self.route_to_dlq(f);
Ok(())
}
},
Err(mpsc::error::TrySendError::Closed(_)) => Err(PushError::Closed),
}
}
pub(crate) fn downgrade(&self) -> Weak<PushHandleInner<F>> { Arc::downgrade(&self.0) }
}