use std::collections::VecDeque;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use crate::factory::DiscardHandler;
use crate::factory::DiscardReason;
use crate::factory::Job;
use crate::factory::JobKey;
use crate::Message;
/// Abstraction over a queue of [`Job`]s.
///
/// The descriptions below reflect how the implementations in this file
/// ([`DefaultQueue`] and [`PriorityQueue`]) behave.
pub trait Queue<TKey, TMsg>: Send + 'static
where
TKey: JobKey,
TMsg: Message,
{
/// Number of jobs currently held in the queue.
fn len(&self) -> usize;
/// Returns `true` when the queue holds no jobs.
fn is_empty(&self) -> bool;
/// Removes and returns the next job to hand out, or `None` when the queue is empty.
fn pop_front(&mut self) -> Option<Job<TKey, TMsg>>;
/// Removes and returns the job chosen for discarding, or `None` when the queue is empty.
///
/// Which job is chosen is implementation-specific.
fn discard_oldest(&mut self) -> Option<Job<TKey, TMsg>>;
/// Borrows the job that `pop_front` would return next, without removing it.
fn peek(&self) -> Option<&Job<TKey, TMsg>>;
/// Adds `job` to the queue.
fn push_back(&mut self, job: Job<TKey, TMsg>);
/// Removes jobs that report themselves as expired and returns how many were removed.
///
/// When `discard_handler` is `Some`, the implementations in this file call it with
/// `DiscardReason::TtlExpired` for each removed job.
fn remove_expired_items(
&mut self,
discard_handler: &Option<Arc<dyn DiscardHandler<TKey, TMsg>>>,
) -> usize;
/// Whether the job identified by `_key` may be discarded.
///
/// The default implementation treats every job as discardable.
fn is_job_discardable(&self, _key: &TKey) -> bool {
true
}
}
/// A job priority that can be mapped onto a queue index.
///
/// [`PriorityQueue`] uses the index to pick which of its internal queues a job
/// is stored in, and serves lower indices first.
pub trait Priority: Default + From<usize> + Send + 'static {
/// Index of the internal queue this priority maps to.
fn get_index(&self) -> usize;
}
/// Built-in set of five priority levels, each with an explicit `usize` discriminant.
///
/// The discriminant doubles as the queue index (see the `Priority` impl), so when
/// used with [`PriorityQueue`] lower-valued variants are served first.
#[derive(strum::FromRepr, Default, Debug, Clone, Copy, Eq, PartialEq, Hash)]
#[repr(usize)]
pub enum StandardPriority {
/// Discriminant 0.
Highest = 0,
/// Discriminant 1.
High = 1,
/// Discriminant 2.
Important = 2,
/// Discriminant 3; this is the `Default` value.
#[default]
Normal = 3,
/// Discriminant 4.
BestEffort = 4,
}
#[cfg(feature = "cluster")]
impl crate::BytesConvertable for StandardPriority {
    /// Decodes a priority from the byte encoding of its `u64` discriminant.
    ///
    /// Any value that is not a valid discriminant decodes to the default
    /// priority. That includes values too large for `usize` on the current target.
    fn from_bytes(bytes: Vec<u8>) -> Self {
        let raw = u64::from_bytes(bytes);
        // A plain `as usize` cast silently truncates on 32-bit targets, which could
        // alias an out-of-range value onto a valid discriminant. The checked
        // conversion is spelled fully-qualified so it does not depend on `TryFrom`
        // being in the prelude.
        match <usize as std::convert::TryFrom<u64>>::try_from(raw) {
            Ok(index) => Self::from(index),
            Err(_) => Self::default(),
        }
    }

    /// Encodes the priority as the bytes of its discriminant widened to `u64`.
    fn into_bytes(self) -> Vec<u8> {
        (self as u64).into_bytes()
    }
}
impl StandardPriority {
    /// Number of priority levels defined by [`StandardPriority`].
    pub const fn size() -> usize {
        // Discriminants are contiguous starting at 0, so the count is the last
        // discriminant plus one.
        Self::BestEffort as usize + 1
    }
}
impl Priority for StandardPriority {
    /// Queue index for this priority: the variant's `usize` discriminant.
    fn get_index(&self) -> usize {
        let level: Self = *self;
        level as usize
    }
}
impl From<usize> for StandardPriority {
    /// Converts a raw index into a priority.
    ///
    /// Falls back to the default priority when `value` is not a valid
    /// discriminant.
    fn from(value: usize) -> Self {
        if let Some(priority) = Self::from_repr(value) {
            priority
        } else {
            Self::default()
        }
    }
}
/// Policy object that decides, per job key, which priority a job gets and
/// whether it may be discarded.
///
/// [`PriorityQueue`] consults it when pushing a job and when answering
/// `is_job_discardable`.
pub trait PriorityManager<TKey, TPriority>: Send + Sync + 'static
where
TKey: JobKey,
TPriority: Priority,
{
/// Whether the job identified by `job` may be discarded.
fn is_discardable(&self, job: &TKey) -> bool;
/// Priority for the job identified by `job`.
///
/// [`PriorityQueue`] falls back to `TPriority::default()` when this returns `None`.
fn get_priority(&self, job: &TKey) -> Option<TPriority>;
}
/// Plain first-in, first-out [`Queue`] backed by a single `VecDeque`.
pub struct DefaultQueue<TKey, TMsg>
where
TKey: JobKey,
TMsg: Message,
{
/// Queued jobs; new jobs are pushed at the back and taken from the front.
q: VecDeque<Job<TKey, TMsg>>,
}
impl<TKey, TMsg> Debug for DefaultQueue<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Formats the queue as its type name plus the number of queued jobs; the
    /// jobs themselves are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let queued = self.q.len();
        f.write_fmt(format_args!("DefaultQueue({} items)", queued))
    }
}
impl<TKey, TMsg> Default for DefaultQueue<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Creates an empty queue.
    fn default() -> Self {
        let q = VecDeque::new();
        Self { q }
    }
}
impl<TKey, TMsg> Queue<TKey, TMsg> for DefaultQueue<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Number of queued jobs.
    fn len(&self) -> usize {
        self.q.len()
    }

    /// Returns `true` when no jobs are queued.
    fn is_empty(&self) -> bool {
        self.q.is_empty()
    }

    /// Removes and returns the job at the front of the queue.
    fn pop_front(&mut self) -> Option<Job<TKey, TMsg>> {
        self.q.pop_front()
    }

    /// Removes and returns the job at the front of the queue; with a single
    /// FIFO the front is the longest-queued job.
    fn discard_oldest(&mut self) -> Option<Job<TKey, TMsg>> {
        self.q.pop_front()
    }

    /// Borrows the job at the front of the queue without removing it.
    fn peek(&self) -> Option<&Job<TKey, TMsg>> {
        self.q.front()
    }

    /// Appends `job` at the back of the queue.
    fn push_back(&mut self, job: Job<TKey, TMsg>) {
        self.q.push_back(job)
    }

    /// Drops every job that reports itself as expired and returns how many
    /// were dropped.
    ///
    /// When `discard_handler` is `Some`, it is called with
    /// `DiscardReason::TtlExpired` for each dropped job before removal.
    fn remove_expired_items(
        &mut self,
        discard_handler: &Option<Arc<dyn DiscardHandler<TKey, TMsg>>>,
    ) -> usize {
        let mut removed = 0usize;
        self.q.retain_mut(|job| {
            let expired = job.is_expired();
            if expired {
                if let Some(handler) = discard_handler.as_ref() {
                    handler.discard(DiscardReason::TtlExpired, job);
                }
                removed += 1;
            }
            // `retain_mut` keeps the entries for which the closure returns `true`.
            !expired
        });
        removed
    }
}
/// [`Queue`] that keeps one FIFO per priority level.
///
/// `NUM_PRIORITIES` is the number of levels. A job's level is the index its
/// priority reports via `Priority::get_index`, and lower indices are popped first.
pub struct PriorityQueue<TKey, TMsg, TPriority, TPriorityManager, const NUM_PRIORITIES: usize>
where
TKey: JobKey,
TMsg: Message,
TPriority: Priority,
TPriorityManager: PriorityManager<TKey, TPriority>,
{
/// One FIFO per priority level; index 0 is served first.
queues: [VecDeque<Job<TKey, TMsg>>; NUM_PRIORITIES],
/// Supplies each job's priority and discardability from its key.
priority_manager: TPriorityManager,
/// Ties `TPriority` to the type; no value of that type is stored.
_p: PhantomData<fn() -> TPriority>,
}
impl<TKey, TMsg, TPriority, TPriorityManager, const NUM_PRIORITIES: usize> Debug
for PriorityQueue<TKey, TMsg, TPriority, TPriorityManager, NUM_PRIORITIES>
where
TKey: JobKey,
TMsg: Message,
TPriority: Priority,
TPriorityManager: PriorityManager<TKey, TPriority>,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PriorityQueue({} items)", self.len())
}
}
impl<TKey, TMsg, TPriority, TPriorityManager, const NUM_PRIORITIES: usize>
    PriorityQueue<TKey, TMsg, TPriority, TPriorityManager, NUM_PRIORITIES>
where
    TKey: JobKey,
    TMsg: Message,
    TPriority: Priority,
    TPriorityManager: PriorityManager<TKey, TPriority>,
{
    /// Creates an empty priority queue with `NUM_PRIORITIES` levels that
    /// consults `priority_manager` for each job's priority.
    pub fn new(priority_manager: TPriorityManager) -> Self {
        // Build the array element by element, one fresh `VecDeque` per level.
        let queues = [(); NUM_PRIORITIES].map(|()| VecDeque::new());
        Self {
            queues,
            priority_manager,
            _p: PhantomData,
        }
    }
}
impl<TKey, TMsg, TPriority, TPriorityManager, const NUM_PRIORITIES: usize> Queue<TKey, TMsg>
for PriorityQueue<TKey, TMsg, TPriority, TPriorityManager, NUM_PRIORITIES>
where
TKey: JobKey,
TMsg: Message,
TPriority: Priority,
TPriorityManager: PriorityManager<TKey, TPriority>,
{
fn len(&self) -> usize {
self.queues.iter().map(|q| q.len()).sum()
}
fn is_empty(&self) -> bool {
self.queues.iter().all(|q| q.is_empty())
}
fn pop_front(&mut self) -> Option<Job<TKey, TMsg>> {
for i in 0..NUM_PRIORITIES {
if let Some(r) = self.queues[i].pop_front() {
return Some(r);
}
}
None
}
fn discard_oldest(&mut self) -> Option<Job<TKey, TMsg>> {
for i in (0..NUM_PRIORITIES).rev() {
if let Some(r) = self.queues[i].pop_front() {
return Some(r);
}
}
None
}
fn peek(&self) -> Option<&Job<TKey, TMsg>> {
for i in 0..NUM_PRIORITIES {
let maybe = self.queues[i].front();
if maybe.is_some() {
return maybe;
}
}
None
}
fn push_back(&mut self, job: Job<TKey, TMsg>) {
let priority = self
.priority_manager
.get_priority(&job.key)
.unwrap_or_else(Default::default);
let idx = priority.get_index();
self.queues[idx].push_back(job);
}
fn remove_expired_items(
&mut self,
discard_handler: &Option<Arc<dyn DiscardHandler<TKey, TMsg>>>,
) -> usize {
let mut num_removed = 0;
for i in 0..NUM_PRIORITIES {
self.queues[i].retain_mut(|queued_item| {
if queued_item.is_expired() {
if let Some(handler) = discard_handler {
handler.discard(DiscardReason::TtlExpired, queued_item);
}
num_removed += 1;
false
} else {
true
}
});
}
num_removed
}
fn is_job_discardable(&self, key: &TKey) -> bool {
self.priority_manager.is_discardable(key)
}
}
// Unit tests exercising `DefaultQueue` and `PriorityQueue` through the `Queue` trait.
#[cfg(test)]
mod tests {
use super::super::*;
use super::*;
use crate::concurrency::Duration;
/// Two-level priority used by the tests: `High` maps to queue index 0, `Low` to index 1.
#[derive(Default, Debug)]
enum BasicPriority {
#[default]
Low,
High,
}
impl Priority for BasicPriority {
fn get_index(&self) -> usize {
match self {
BasicPriority::Low => 1,
BasicPriority::High => 0,
}
}
}
impl From<usize> for BasicPriority {
fn from(value: usize) -> Self {
match value {
0 => BasicPriority::High,
_ => BasicPriority::Low,
}
}
}
/// Priority manager that gives even keys `High` and odd keys `Low`, and
/// reports no job as discardable.
struct BasicPriorityManager;
impl PriorityManager<u64, BasicPriority> for BasicPriorityManager {
fn get_priority(&self, _key: &u64) -> Option<BasicPriority> {
if *_key % 2 == 0 {
Some(BasicPriority::High)
} else {
Some(BasicPriority::Low)
}
}
fn is_discardable(&self, _key: &u64) -> bool {
false
}
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_basic_queueing() {
let mut queue = DefaultQueue::<u64, ()>::default();
// Keys 0..=98 with default options.
for i in 0..99 {
queue.push_back(Job {
key: i,
accepted: None,
msg: (),
options: JobOptions::default(),
});
}
// Key 99 is the only job built with a 1 ms duration option
// (presumably its TTL — confirm against `JobOptions::new`).
queue.push_back(Job {
key: 99,
accepted: None,
msg: (),
options: JobOptions::new(Some(Duration::from_millis(1))),
});
// With a single FIFO, the discarded job is the first one pushed.
let oldest = queue.discard_oldest();
assert!(matches!(oldest, Some(Job { key: 0, .. })));
let peeked = queue.peek();
assert!(matches!(peeked, Some(Job { key: 1, .. })));
let popped = queue.pop_front();
assert!(matches!(popped, Some(Job { key: 1, .. })));
// 100 pushed, 2 removed.
let len = queue.len();
assert_eq!(len, 98);
let is_empty = queue.is_empty();
assert!(!is_empty);
// Wait longer than the 1 ms duration on key 99 before sweeping expired jobs.
crate::concurrency::sleep(Duration::from_millis(2)).await;
// Handler that asserts only key 99 is ever discarded.
struct MyDiscardHandler;
impl DiscardHandler<u64, ()> for MyDiscardHandler {
fn discard(&self, _reason: DiscardReason, job: &mut Job<u64, ()>) {
tracing::info!("discarding job: {}", job.key);
assert_eq!(99, job.key);
}
}
_ = queue.remove_expired_items(&Some(Arc::new(MyDiscardHandler)));
// Exactly one job (key 99) is expected to have been removed.
let len = queue.len();
assert_eq!(len, 97);
}
#[crate::concurrency::test]
#[cfg_attr(
not(all(target_arch = "wasm32", target_os = "unknown")),
tracing_test::traced_test
)]
async fn test_priority_queueing() {
let mut queue = PriorityQueue::<u64, (), BasicPriority, BasicPriorityManager, 2>::new(
BasicPriorityManager,
);
// Keys 0..=98 with default options; even keys land in queue 0, odd keys in queue 1.
for i in 0..99 {
queue.push_back(Job {
key: i,
accepted: None,
msg: (),
options: JobOptions::default(),
});
}
// Key 99 (odd, so queue 1) is the only job built with a 1 ms duration option
// (presumably its TTL — confirm against `JobOptions::new`).
queue.push_back(Job {
key: 99,
accepted: None,
msg: (),
options: JobOptions::new(Some(Duration::from_millis(1))),
});
// Discarding takes from the last non-empty queue, whose front is the first odd key.
let oldest = queue.discard_oldest();
assert!(matches!(oldest, Some(Job { key: 1, .. })));
// Peeking and popping take from queue 0, whose front is the first even key.
let peeked = queue.peek();
assert!(matches!(peeked, Some(Job { key: 0, .. })));
let popped = queue.pop_front();
assert!(matches!(popped, Some(Job { key: 0, .. })));
// 100 pushed, 2 removed.
let len = queue.len();
assert_eq!(len, 98);
let is_empty = queue.is_empty();
assert!(!is_empty);
// Wait longer than the 1 ms duration on key 99 before sweeping expired jobs.
crate::concurrency::sleep(Duration::from_millis(2)).await;
// Handler that asserts only key 99 is ever discarded.
struct MyDiscardHandler;
impl DiscardHandler<u64, ()> for MyDiscardHandler {
fn discard(&self, _reason: DiscardReason, job: &mut Job<u64, ()>) {
tracing::info!("discarding job: {}", job.key);
assert_eq!(99, job.key);
}
}
_ = queue.remove_expired_items(&Some(Arc::new(MyDiscardHandler)));
// Exactly one job (key 99) is expected to have been removed.
let len = queue.len();
assert_eq!(len, 97);
}
}