use crate::observability::{QueueOverflowAction, QueuePolicy};
use std::{
collections::{HashSet, VecDeque},
sync::Mutex,
};
/// Identifier attached to a single queue entry.
///
/// A thin newtype over `u64`; it is `Copy`, comparable and hashable, so it can
/// be stored in ordered and hashed collections alike.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct QueueEntryId(
    /// Raw numeric value of the identifier.
    pub(crate) u64,
);
/// A message together with the identifier of the queue entry that carries it.
#[derive(Debug)]
pub(crate) struct QueuedMessage<Msg> {
    /// Identifier of this entry.
    pub(crate) id: QueueEntryId,
    /// The payload itself.
    pub(crate) message: Msg,
}
/// Buffer of queued messages plus the state that gates automatic draining.
pub(crate) struct MessageQueue<Msg> {
    /// Entries that have been queued and not yet taken out.
    pub(crate) pending: VecDeque<QueuedMessage<Msg>>,
    /// While `true`, `should_auto_drain` reports `false`.
    pub(crate) is_draining: bool,
    /// Count of `suspend_auto_drain` calls not yet matched by
    /// `resume_auto_drain`; auto-drain is only allowed while this is zero.
    auto_drain_suspended_depth: usize,
}
impl<Msg> Default for MessageQueue<Msg> {
    /// Builds an empty queue that is not draining and has no auto-drain
    /// suspension outstanding.
    ///
    /// The impl places no `Default` bound on `Msg`.
    fn default() -> Self {
        let pending = VecDeque::new();
        Self {
            auto_drain_suspended_depth: 0,
            is_draining: false,
            pending,
        }
    }
}
impl<Msg> MessageQueue<Msg> {
    /// Adds one level of auto-drain suspension.
    ///
    /// Suspensions nest: `should_auto_drain` stays `false` until every call to
    /// this method has been matched by a call to `resume_auto_drain`.
    ///
    /// # Panics
    ///
    /// Panics if the suspension depth would exceed `usize::MAX`.
    pub(crate) fn suspend_auto_drain(&mut self) {
        self.auto_drain_suspended_depth = self
            .auto_drain_suspended_depth
            .checked_add(1)
            .expect("auto-drain suspension depth overflow");
    }

    /// Removes one level of auto-drain suspension.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions enabled, panics when no suspension is
    /// outstanding. In other builds an unbalanced call leaves the depth at
    /// zero.
    pub(crate) fn resume_auto_drain(&mut self) {
        debug_assert!(
            self.auto_drain_suspended_depth > 0,
            "auto-drain suspension depth must stay balanced",
        );
        // Saturate rather than subtract: without overflow checks a plain
        // `-= 1` at depth zero would wrap to `usize::MAX` and keep auto-drain
        // switched off indefinitely.
        self.auto_drain_suspended_depth = self.auto_drain_suspended_depth.saturating_sub(1);
    }

    /// Returns `true` when the queue is not currently draining and no
    /// auto-drain suspension is outstanding.
    pub(crate) fn should_auto_drain(&self) -> bool {
        !self.is_draining && self.auto_drain_suspended_depth == 0
    }
}
/// Outcome of asking the queue bookkeeping for room for one more entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum QueueReservation {
    /// A slot was granted for the new entry.
    Accepted {
        /// Identifier allocated for the new entry.
        id: QueueEntryId,
        /// Overflow handling that was applied to make room, if any.
        overflow_action: Option<QueueOverflowAction>,
        /// Entry that was pushed out to make room, if any.
        displaced: Option<QueueEntryId>,
    },
    /// No slot was granted; produced for a full queue under the `RejectNew`
    /// policy.
    Rejected,
    /// No slot was granted; produced for a full queue under the `DropNewest`
    /// policy, and under `DropOldest` when there is no entry to evict.
    DroppedNewest,
}
/// Admission bookkeeping for a queue: the policy in force, the id counter,
/// and which entry ids are currently live or have been displaced.
#[derive(Debug)]
struct QueueControl {
    /// Overflow policy consulted on every reservation.
    policy: QueuePolicy,
    /// Most recently issued raw id; zero before any id has been issued.
    next_id: u64,
    /// Ids counted towards the queue depth, oldest at the front.
    active: VecDeque<QueueEntryId>,
    /// Ids evicted under `DropOldest` and not yet claimed via
    /// `take_if_dropped` or restored by a rollback.
    dropped: HashSet<QueueEntryId>,
}
impl QueueControl {
fn new(policy: QueuePolicy) -> Self {
Self {
policy,
next_id: 0,
active: VecDeque::new(),
dropped: HashSet::new(),
}
}
fn next_id(&mut self) -> QueueEntryId {
self.next_id = self.next_id.wrapping_add(1);
QueueEntryId(self.next_id)
}
fn reserve_enqueue(&mut self) -> QueueReservation {
let Some(capacity) = self.policy.capacity() else {
let id = self.next_id();
self.active.push_back(id);
return QueueReservation::Accepted {
id,
overflow_action: None,
displaced: None,
};
};
if self.active.len() < capacity {
let id = self.next_id();
self.active.push_back(id);
return QueueReservation::Accepted {
id,
overflow_action: None,
displaced: None,
};
}
match self.policy {
QueuePolicy::Unbounded => unreachable!("unbounded queues return early"),
QueuePolicy::RejectNew { .. } => QueueReservation::Rejected,
QueuePolicy::DropNewest { .. } => QueueReservation::DroppedNewest,
QueuePolicy::DropOldest { .. } => {
let Some(oldest) = self.active.pop_front() else {
return QueueReservation::DroppedNewest;
};
self.dropped.insert(oldest);
let id = self.next_id();
self.active.push_back(id);
QueueReservation::Accepted {
id,
overflow_action: Some(QueueOverflowAction::DroppedOldest),
displaced: Some(oldest),
}
}
}
}
fn rollback_enqueue(&mut self, reservation: QueueReservation) {
let QueueReservation::Accepted {
id,
overflow_action,
displaced,
} = reservation
else {
return;
};
if let Some(position) = self.active.iter().position(|entry| *entry == id) {
self.active.remove(position);
}
if overflow_action == Some(QueueOverflowAction::DroppedOldest)
&& let Some(displaced) = displaced
&& self.dropped.remove(&displaced)
{
self.active.push_front(displaced);
}
}
fn complete_processed(&mut self, id: QueueEntryId) {
if let Some(position) = self.active.iter().position(|entry| *entry == id) {
self.active.remove(position);
}
}
fn take_if_dropped(&mut self, id: QueueEntryId) -> bool {
self.dropped.remove(&id)
}
fn depth(&self) -> usize {
self.active.len()
}
}
/// Handle that keeps the queue bookkeeping behind a `Mutex`, so it can be
/// driven through a shared reference.
#[derive(Debug)]
pub(crate) struct QueueTracker {
    /// The guarded bookkeeping state.
    control: Mutex<QueueControl>,
}
impl QueueTracker {
pub(crate) fn new(policy: QueuePolicy) -> Self {
Self {
control: Mutex::new(QueueControl::new(policy)),
}
}
pub(crate) fn reserve_enqueue(&self) -> QueueReservation {
self.control.lock().unwrap().reserve_enqueue()
}
pub(crate) fn rollback_enqueue(&self, reservation: QueueReservation) {
self.control.lock().unwrap().rollback_enqueue(reservation);
}
pub(crate) fn complete_processed(&self, id: QueueEntryId) {
self.control.lock().unwrap().complete_processed(id);
}
pub(crate) fn take_if_dropped(&self, id: QueueEntryId) -> bool {
self.control.lock().unwrap().take_if_dropped(id)
}
pub(crate) fn depth(&self) -> usize {
self.control.lock().unwrap().depth()
}
}