use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
/// Priority level attached to a message in an [`AgeingPriorityQueue`].
///
/// A larger number means the message is handled sooner: `P8` maps to inner
/// queue index 0 (tried first by the receiver) and `P1` maps to the last index.
///
/// The `strum_macros::IntoStaticStr` derive provides a conversion of each
/// variant into a `&'static str`.
// NOTE(review): presumably not every variant is constructed inside the crate,
// which is why the `unused` lint is silenced here — confirm before removing.
#[allow(unused)]
#[derive(strum_macros::IntoStaticStr)]
pub(crate) enum Priority {
    P1 = 1,
    P2 = 2,
    P3 = 3,
    P4 = 4,
    P5 = 5,
    P6 = 6,
    P7 = 7,
    P8 = 8,
}
/// Number of inner queues: one per [`Priority`] level, `P1` through `P8` inclusive.
const INNER_QUEUES_COUNT: usize = 1 + (Priority::P8 as usize - Priority::P1 as usize);
/// Maps a [`Priority`] to the index of its inner queue.
///
/// Indices run opposite to priority numbers: `P8` yields 0 and `P1` yields 7,
/// so the most urgent queue comes first.
const fn index_from_priority(priority: Priority) -> usize {
    let highest = Priority::P8 as usize;
    let requested = priority as usize;
    highest - requested
}
/// Inverse of `index_from_priority`: maps an inner queue index back to its [`Priority`].
///
/// # Panics
///
/// Panics with `"invalid index"` when `idx` is greater than 7.
const fn priority_from_index(idx: usize) -> Priority {
    // Listed from least to most urgent; index 0 is the most urgent queue.
    match idx {
        7 => Priority::P1,
        6 => Priority::P2,
        5 => Priority::P3,
        4 => Priority::P4,
        3 => Priority::P5,
        2 => Priority::P6,
        1 => Priority::P7,
        0 => Priority::P8,
        _ => panic!("invalid index"),
    }
}
// Compile-time sanity checks for the priority <-> index mapping.
// A failing assertion here is a build error, not a runtime panic.
const _: () = {
    // The two ends of the mapping: most urgent first, least urgent last.
    assert!(index_from_priority(Priority::P8) == 0);
    assert!(index_from_priority(Priority::P1) == 7);

    // Round-trip every valid index, not just the endpoints, so that a swapped
    // or mistyped arm in the middle of `priority_from_index` is caught too.
    // (`for` loops are not available in const contexts, hence the `while`.)
    let mut idx = 0;
    while idx <= 7 {
        assert!(index_from_priority(priority_from_index(idx)) == idx);
        idx += 1;
    }
};
/// A queue with one inner channel per [`Priority`] level and "ageing" of
/// waiting messages.
///
/// Receivers try the inner queues in index order (most urgent first). Every
/// time a message is received, one message from each less urgent queue below
/// the consumed level may be promoted by one level.
pub(crate) struct AgeingPriorityQueue<T: Send + 'static> {
    /// Sender/receiver pair for each priority level; index 0 holds `P8`
    /// messages and the last index holds `P1` messages.
    inner_queues:
        [(crossbeam_channel::Sender<T>, crossbeam_channel::Receiver<T>); INNER_QUEUES_COUNT],
    /// Messages sent minus messages received, across all inner queues.
    queued_count: AtomicUsize,
    /// Threshold reported by `is_full`; `send` itself does not enforce it.
    soft_capacity: usize,
}
/// Receiving handle borrowed from an [`AgeingPriorityQueue`].
///
/// Created by `AgeingPriorityQueue::receiver`.
pub(crate) struct Receiver<'a, T: Send + 'static> {
    /// The queue this handle reads from and ages.
    shared: &'a AgeingPriorityQueue<T>,
    /// Selector with one receive operation registered per inner queue, in
    /// inner queue index order.
    select: crossbeam_channel::Select<'a>,
}
impl<T: Send + 'static> AgeingPriorityQueue<T> {
    /// Creates an empty queue whose `is_full` threshold is `soft_capacity`.
    ///
    /// Every inner channel is unbounded, so the capacity is advisory only:
    /// `send` keeps accepting messages past it.
    pub(crate) fn soft_bounded(soft_capacity: usize) -> Self {
        let inner_queues = std::array::from_fn(|_index| crossbeam_channel::unbounded());
        let queued_count = AtomicUsize::new(0);
        Self {
            inner_queues,
            queued_count,
            soft_capacity,
        }
    }

    /// Returns the current number of queued messages (relaxed atomic load).
    pub(crate) fn queued_count(&self) -> usize {
        let counter = &self.queued_count;
        counter.load(Ordering::Relaxed)
    }

    /// Returns `true` once the queued count has reached the soft capacity.
    pub(crate) fn is_full(&self) -> bool {
        let queued = self.queued_count();
        self.soft_capacity <= queued
    }

    /// Enqueues `message` on the inner queue matching `priority`.
    ///
    /// # Panics
    ///
    /// Panics with `"disconnected channel"` if the inner channel reports a
    /// disconnection. The queue owns both ends of every inner channel, so this
    /// is not expected while `self` is alive.
    pub(crate) fn send(&self, priority: Priority, message: T) {
        // The counter is bumped before the message becomes visible to receivers.
        self.queued_count.fetch_add(1, Ordering::Relaxed);
        let queue_index = index_from_priority(priority);
        let sender = &self.inner_queues[queue_index].0;
        sender.send(message).expect("disconnected channel");
    }

    /// Builds a [`Receiver`] borrowing this queue.
    ///
    /// Receive operations are registered in inner queue order on a biased
    /// selector, so an operation's index equals its inner queue index and the
    /// most urgent ready queue is picked first.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        let mut select = crossbeam_channel::Select::new_biased();
        for queue in self.inner_queues.iter() {
            select.recv(&queue.1);
        }
        Receiver {
            select,
            shared: self,
        }
    }
}
impl<T: Send + 'static> Receiver<'_, T> {
    /// Blocks until a message is available, then returns it together with the
    /// [`Priority`] of the inner queue it was taken from.
    ///
    /// Because of ageing, that priority may be higher than the one the message
    /// was originally sent with.
    ///
    /// # Panics
    ///
    /// Panics with `"disconnected channel"` if an inner channel reports a
    /// disconnection.
    pub(crate) fn blocking_recv(&mut self) -> (T, Priority) {
        let operation = self.select.select();
        // Operations were registered in inner queue order, so the operation
        // index doubles as the inner queue index.
        let queue_index = operation.index();
        let source = &self.shared.inner_queues[queue_index].1;
        let message = operation.recv(source).expect("disconnected channel");
        self.shared.queued_count.fetch_sub(1, Ordering::Relaxed);
        self.age(queue_index);
        let priority = priority_from_index(queue_index);
        (message, priority)
    }

    /// Promotes waiting messages after one was consumed at `message_consumed_at_index`.
    ///
    /// Walks adjacent queue pairs from the consumed level towards the least
    /// urgent one, and moves at most one message per pair from the less urgent
    /// queue into the more urgent one. Queues more urgent than the consumed
    /// level are left untouched.
    fn age(&self, message_consumed_at_index: usize) {
        let remaining = &self.shared.inner_queues[message_consumed_at_index..];
        for pair in remaining.windows(2) {
            let (promote_into, promote_from) = match pair {
                [(sender, _), (_, receiver)] => (sender, receiver),
                _ => panic!("expected windows of length 2"),
            };
            // Non-blocking: an empty less urgent queue simply has nothing to promote.
            let Ok(message) = promote_from.try_recv() else {
                continue;
            };
            promote_into.send(message).expect("disconnected channel");
        }
    }
}
/// Checks the soft capacity reporting and the order in which messages of
/// mixed priorities come out of the queue.
#[test]
fn test_priorities() {
    let queue = AgeingPriorityQueue::soft_bounded(3);
    assert_eq!(queue.queued_count(), 0);
    assert!(!queue.is_full());

    // The queue only reports full once the third message has been sent.
    for (priority, message, full_afterwards) in [
        (Priority::P1, "p1", false),
        (Priority::P2, "p2", false),
        (Priority::P3, "p3", true),
    ] {
        queue.send(priority, message);
        assert_eq!(queue.is_full(), full_afterwards);
    }

    // The capacity is soft: sending beyond it is still accepted.
    queue.send(Priority::P2, "p2 again");
    assert_eq!(queue.queued_count(), 4);

    // Most urgent first; within the same priority, in sending order.
    let mut receiver = queue.receiver();
    for expected in ["p3", "p2", "p2 again", "p1"] {
        let (message, _priority) = receiver.blocking_recv();
        assert_eq!(message, expected);
    }
    assert_eq!(queue.queued_count(), 0);
}