use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
/// Point-in-time statistics snapshot produced by `OrderedQueue::collect_stats`.
///
/// All counters are reset when collected, so each snapshot covers only the
/// interval since the previous `collect_stats` call.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Mean buffered byte count over the samples recorded via `record_sample`
    /// (0 when no samples were recorded in the interval).
    pub avg_bytes: u64,
    /// Highest buffered byte count observed during the interval.
    pub peak_bytes: u64,
    /// Total blocked time reported via `record_blocked`, converted to
    /// milliseconds (truncating division of nanoseconds).
    pub time_blocked_ms: u64,
}
/// A bounded, byte-accounted reorder buffer: items arrive tagged with a
/// sequence number ("serial") and are released strictly in ascending serial
/// order via `try_pop_next`.
///
/// The byte limit is a *soft* backpressure bound: it is only enforced while
/// the next in-order item is already buffered (i.e. the consumer can make
/// progress); while the queue is still waiting for a missing serial it must
/// keep accepting items, or the gap could never be filled.
pub struct OrderedQueue<T> {
    /// Lock-protected buffer plus the in-order release cursor.
    inner: Mutex<OrderedQueueInner<T>>,
    /// Sum of `heap_size` across all currently buffered items.
    current_bytes: AtomicU64,
    /// Soft byte limit; adjustable at runtime via `set_limit`.
    limit_bytes: AtomicU64,
    /// Lock-free mirror of `inner.next_seq`, refreshed on each pop.
    next_seq: AtomicU64,
    /// Lock-free mirror of "is the next in-order item buffered?",
    /// refreshed on each insert and pop.
    has_next: AtomicBool,
    /// High-water mark of `current_bytes`; reset by `collect_stats`.
    peak_bytes: AtomicU64,
    /// Running sum of `record_sample` readings; reset by `collect_stats`.
    samples_sum: AtomicU64,
    /// Number of `record_sample` readings; reset by `collect_stats`.
    samples_count: AtomicU64,
    /// Accumulated blocked time in nanoseconds; reset by `collect_stats`.
    blocked_ns: AtomicU64,
}
/// Mutex-protected state of the queue: the out-of-order buffer and the
/// cursor of the next serial to release.
struct OrderedQueueInner<T> {
    /// serial -> (item, heap byte size) for every item not yet released.
    buffer: HashMap<u64, (T, usize)>,
    /// Serial that the next `try_pop_next` will release.
    next_seq: u64,
}
impl<T> OrderedQueue<T> {
    /// Creates an empty queue that starts releasing at serial 0 and applies
    /// backpressure once buffered bytes would exceed `limit_bytes`
    /// (see [`OrderedQueue::insert`] for exactly when the limit is enforced).
    #[must_use]
    pub fn new(limit_bytes: u64) -> Self {
        Self {
            inner: Mutex::new(OrderedQueueInner { buffer: HashMap::new(), next_seq: 0 }),
            current_bytes: AtomicU64::new(0),
            limit_bytes: AtomicU64::new(limit_bytes),
            next_seq: AtomicU64::new(0),
            has_next: AtomicBool::new(false),
            peak_bytes: AtomicU64::new(0),
            samples_sum: AtomicU64::new(0),
            samples_count: AtomicU64::new(0),
            blocked_ns: AtomicU64::new(0),
        }
    }

    /// Lock-free advisory check: would an item of `heap_size` bytes be
    /// accepted right now?
    ///
    /// The limit only applies while the next in-order item is buffered;
    /// otherwise the queue must keep accepting out-of-order items so the
    /// missing serial can eventually arrive. Inherently racy — `insert`
    /// re-evaluates the same condition under the lock.
    pub fn can_accept(&self, heap_size: usize) -> bool {
        if !self.has_next.load(Ordering::Acquire) {
            return true;
        }
        let current = self.current_bytes.load(Ordering::Acquire);
        let limit = self.limit_bytes.load(Ordering::Acquire);
        // saturating_add: an enormous heap_size must be rejected, not
        // overflow (which would panic in debug builds and wrap in release).
        current.saturating_add(heap_size as u64) <= limit
    }

    /// Buffers `item` under sequence number `serial`.
    ///
    /// Backpressure rule: while the next in-order item is already buffered
    /// (the consumer can make progress), inserts that would push the byte
    /// total over the limit are rejected. While the queue is still waiting
    /// for a missing serial, every insert is accepted regardless of size —
    /// rejecting there could stall the pipeline forever.
    ///
    /// Re-inserting an existing serial replaces the buffered item.
    ///
    /// # Errors
    /// Returns `Err((item, heap_size))`, handing the item back to the
    /// caller, when the byte limit would be exceeded.
    pub fn insert(&self, serial: u64, item: T, heap_size: usize) -> Result<(), (T, usize)> {
        let mut inner = self.inner.lock();
        let has_next = inner.buffer.contains_key(&inner.next_seq);
        if has_next {
            let current = self.current_bytes.load(Ordering::Acquire);
            let limit = self.limit_bytes.load(Ordering::Acquire);
            // saturating_add: see can_accept.
            if current.saturating_add(heap_size as u64) > limit {
                return Err((item, heap_size));
            }
        }
        // BUGFIX: `HashMap::insert` returns the displaced entry when the
        // serial was already present. The old code dropped it silently, so
        // the replaced item's bytes were never subtracted and
        // `current_bytes` drifted upward permanently, eventually wedging
        // the queue at its limit. Release the old entry's bytes here.
        if let Some((_, old_size)) = inner.buffer.insert(serial, (item, heap_size)) {
            self.current_bytes.fetch_sub(old_size as u64, Ordering::AcqRel);
        }
        let new_current =
            self.current_bytes.fetch_add(heap_size as u64, Ordering::AcqRel) + heap_size as u64;
        let new_has_next = inner.buffer.contains_key(&inner.next_seq);
        self.has_next.store(new_has_next, Ordering::Release);
        // Lock-free monotonic-max update of the peak watermark.
        let mut peak = self.peak_bytes.load(Ordering::Relaxed);
        while new_current > peak {
            match self.peak_bytes.compare_exchange_weak(
                peak,
                new_current,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(p) => peak = p,
            }
        }
        Ok(())
    }

    /// Removes and returns the next in-order item, or `None` if the serial
    /// at the release cursor has not been inserted yet. On success the
    /// cursor advances and the atomic mirrors are refreshed.
    pub fn try_pop_next(&self) -> Option<(T, usize)> {
        let mut inner = self.inner.lock();
        let next = inner.next_seq;
        if let Some((item, heap_size)) = inner.buffer.remove(&next) {
            inner.next_seq += 1;
            self.current_bytes.fetch_sub(heap_size as u64, Ordering::AcqRel);
            self.next_seq.store(inner.next_seq, Ordering::Release);
            let new_has_next = inner.buffer.contains_key(&inner.next_seq);
            self.has_next.store(new_has_next, Ordering::Release);
            Some((item, heap_size))
        } else {
            None
        }
    }

    /// Serial that the next successful `try_pop_next` will release
    /// (lock-free read of the mirror; may lag the locked value briefly).
    pub fn next_seq(&self) -> u64 {
        self.next_seq.load(Ordering::Acquire)
    }

    /// Whether the next in-order item is currently buffered (lock-free).
    pub fn can_pop(&self) -> bool {
        self.has_next.load(Ordering::Acquire)
    }

    /// Total heap bytes currently buffered.
    pub fn current_bytes(&self) -> u64 {
        self.current_bytes.load(Ordering::Acquire)
    }

    /// Replaces the soft byte limit; takes effect on subsequent inserts.
    pub fn set_limit(&self, new_limit: u64) {
        self.limit_bytes.store(new_limit, Ordering::Release);
    }

    /// Current soft byte limit.
    pub fn limit_bytes(&self) -> u64 {
        self.limit_bytes.load(Ordering::Acquire)
    }

    /// Number of buffered (not yet released) items. Takes the lock.
    pub fn len(&self) -> usize {
        self.inner.lock().buffer.len()
    }

    /// `true` when no items are buffered. Takes the lock.
    pub fn is_empty(&self) -> bool {
        self.inner.lock().buffer.is_empty()
    }

    /// Records one reading of the current byte level for the running
    /// average reported by `collect_stats`.
    pub fn record_sample(&self) {
        let current = self.current_bytes.load(Ordering::Relaxed);
        self.samples_sum.fetch_add(current, Ordering::Relaxed);
        self.samples_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `ns` nanoseconds to the accumulated blocked-time counter.
    pub fn record_blocked(&self, ns: u64) {
        self.blocked_ns.fetch_add(ns, Ordering::Relaxed);
    }

    /// Returns statistics for the interval since the previous call and
    /// resets all statistic counters (peak, sample sum/count, blocked time)
    /// to zero.
    pub fn collect_stats(&self) -> QueueStats {
        let peak = self.peak_bytes.swap(0, Ordering::Relaxed);
        let sum = self.samples_sum.swap(0, Ordering::Relaxed);
        let count = self.samples_count.swap(0, Ordering::Relaxed);
        let blocked = self.blocked_ns.swap(0, Ordering::Relaxed);
        QueueStats {
            avg_bytes: if count > 0 { sum / count } else { 0 },
            peak_bytes: peak,
            time_blocked_ms: blocked / 1_000_000,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Items inserted out of order must drain in ascending serial order.
    #[test]
    fn test_ordered_queue_basic() {
        let q: OrderedQueue<u32> = OrderedQueue::new(1000);
        for &(serial, value) in &[(2u64, 200u32), (0, 100), (1, 150)] {
            assert!(q.insert(serial, value, 10).is_ok());
        }
        for expected in [100u32, 150, 200] {
            let (got, _) = q.try_pop_next().expect("queue should have next element");
            assert_eq!(got, expected);
        }
        assert!(q.try_pop_next().is_none());
    }

    /// Once the head item is poppable, the byte limit is enforced.
    #[test]
    fn test_ordered_queue_backpressure_when_has_next() {
        let q: OrderedQueue<u32> = OrderedQueue::new(100);
        assert!(q.insert(0, 100, 50).is_ok());
        assert!(q.can_pop());
        // 50 + 60 would exceed the 100-byte limit.
        assert!(q.insert(1, 200, 60).is_err());
        // 50 + 40 fits exactly.
        assert!(q.insert(1, 200, 40).is_ok());
    }

    /// While serial 0 is missing, even oversized items must be accepted;
    /// the limit kicks back in once the gap is filled.
    #[test]
    fn test_ordered_queue_must_accept_when_waiting() {
        let q: OrderedQueue<u32> = OrderedQueue::new(100);
        assert!(q.insert(5, 500, 200).is_ok());
        assert!(!q.can_pop());
        assert!(q.insert(3, 300, 200).is_ok());
        assert!(q.insert(1, 100, 200).is_ok());
        assert!(q.insert(0, 0, 10).is_ok());
        assert!(q.can_pop());
        assert!(q.insert(2, 200, 200).is_err());
    }

    /// Under sustained pressure some inserts succeed, some are refused,
    /// and whatever was accepted in order can be popped back out.
    #[test]
    #[allow(clippy::cast_possible_truncation)]
    fn test_ordered_queue_backpressure_memory_bound() {
        let q: OrderedQueue<Vec<u8>> = OrderedQueue::new(500);
        assert!(q.insert(0, vec![0u8; 100], 100).is_ok());
        let (mut accepted, mut refused) = (0, 0);
        for serial in 1..20 {
            match q.insert(serial, vec![serial as u8; 100], 100) {
                Ok(()) => accepted += 1,
                Err(_) => refused += 1,
            }
        }
        assert!(accepted > 0, "Should accept some items");
        assert!(refused > 0, "Should reject items when over limit");
        let mut popped = 0;
        while q.try_pop_next().is_some() {
            popped += 1;
        }
        assert!(popped > 0, "Should pop the items we inserted");
    }
}