#[cfg(not(loom))]
use std::sync::{Mutex, atomic::AtomicUsize};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use leaky_bucket::RateLimiter;
#[cfg(loom)]
use loom::sync::Mutex;
#[cfg(loom)]
use loom::sync::atomic::AtomicUsize;
use static_assertions::const_assert;
use tokio::sync::mpsc;
mod builder;
mod errors;
mod handle;
pub use builder::PushQueuesBuilder;
pub use errors::{PushConfigError, PushError};
pub use handle::PushHandle;
pub(crate) use handle::PushHandleInner;
pub trait FrameLike: Send + 'static {}
impl<T> FrameLike for T where T: Send + 'static {}
const DEFAULT_PUSH_RATE: usize = 100;
pub const MAX_PUSH_RATE: usize = 10_000;
const_assert!(DEFAULT_PUSH_RATE <= MAX_PUSH_RATE);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PushPriority {
High,
Low,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PushPolicy {
ReturnErrorIfFull,
DropIfFull,
WarnAndDropIfFull,
}
pub struct PushQueues<F> {
pub(crate) high_priority_rx: mpsc::Receiver<F>,
pub(crate) low_priority_rx: mpsc::Receiver<F>,
}
#[derive(Debug, Clone)]
pub struct PushQueueConfig<F> {
pub high_capacity: usize,
pub low_capacity: usize,
pub rate: Option<usize>,
pub dlq: Option<mpsc::Sender<F>>,
pub dlq_log_every_n: usize,
pub dlq_log_interval: Duration,
}
impl<F> PushQueueConfig<F> {
#[must_use]
pub fn new(
high_capacity: usize,
low_capacity: usize,
rate: Option<usize>,
dlq: Option<mpsc::Sender<F>>,
) -> Self {
Self {
high_capacity,
low_capacity,
rate,
dlq,
dlq_log_every_n: 1,
dlq_log_interval: Duration::from_secs(10),
}
}
}
impl<F: FrameLike> PushQueues<F> {
#[must_use]
pub fn builder() -> PushQueuesBuilder<F> { PushQueuesBuilder::default() }
fn invalid_rate(rate: Option<usize>) -> Option<usize> {
match rate {
Some(r) if r == 0 || r > MAX_PUSH_RATE => Some(r),
_ => None,
}
}
pub(super) fn build_with_config(
config: PushQueueConfig<F>,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
let PushQueueConfig {
high_capacity,
low_capacity,
rate,
dlq,
dlq_log_every_n,
dlq_log_interval,
} = config;
if let Some(invalid) = Self::invalid_rate(rate) {
return Err(PushConfigError::InvalidRate(invalid));
}
if high_capacity == 0 || low_capacity == 0 {
return Err(PushConfigError::InvalidCapacity {
high: high_capacity,
low: low_capacity,
});
}
let (high_tx, high_rx) = mpsc::channel(high_capacity);
let (low_tx, low_rx) = mpsc::channel(low_capacity);
let limiter = rate.map(|r| {
RateLimiter::builder()
.initial(r)
.refill(r)
.interval(Duration::from_secs(1))
.max(r)
.build()
});
let inner = PushHandleInner {
high_prio_tx: high_tx,
low_prio_tx: low_tx,
limiter,
dlq_tx: dlq,
dlq_drops: AtomicUsize::new(0),
dlq_last_log: Mutex::new(Instant::now()),
dlq_log_every_n,
dlq_log_interval,
};
Ok((
Self {
high_priority_rx: high_rx,
low_priority_rx: low_rx,
},
PushHandle::from_arc(Arc::new(inner)),
))
}
fn build_via_builder(
high_capacity: usize,
low_capacity: usize,
rate: Option<usize>,
dlq: Option<mpsc::Sender<F>>,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
Self::builder()
.high_capacity(high_capacity)
.low_capacity(low_capacity)
.rate(rate)
.dlq(dlq)
.build()
}
#[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")]
pub fn bounded(
high_capacity: usize,
low_capacity: usize,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
Self::build_via_builder(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE), None)
}
#[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")]
pub fn bounded_no_rate_limit(
high_capacity: usize,
low_capacity: usize,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
Self::build_via_builder(high_capacity, low_capacity, None, None)
}
#[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")]
pub fn bounded_with_rate(
high_capacity: usize,
low_capacity: usize,
rate: Option<usize>,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
Self::build_via_builder(high_capacity, low_capacity, rate, None)
}
#[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")]
pub fn bounded_with_rate_dlq(
high_capacity: usize,
low_capacity: usize,
rate: Option<usize>,
dlq: Option<mpsc::Sender<F>>,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
Self::build_with_config(PushQueueConfig::new(high_capacity, low_capacity, rate, dlq))
}
#[expect(
clippy::integer_division_remainder_used,
reason = "tokio::select! expands to modulus internally"
)]
pub async fn recv(&mut self) -> Option<(PushPriority, F)> {
let mut high_closed = false;
let mut low_closed = false;
loop {
tokio::select! {
biased;
res = self.high_priority_rx.recv(), if !high_closed => match res {
Some(f) => return Some((PushPriority::High, f)),
None => high_closed = true,
},
res = self.low_priority_rx.recv(), if !low_closed => match res {
Some(f) => return Some((PushPriority::Low, f)),
None => low_closed = true,
},
else => return None,
}
}
}
pub fn close(&mut self) {
self.high_priority_rx.close();
self.low_priority_rx.close();
}
}