use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Weak};
use tokio::sync::Notify;
use bacnet_types::enums::NetworkPriority;
/// A payload paired with the network priority that selects the queue it is placed in.
#[derive(Clone, Debug)]
pub struct PrioritizedItem<T> {
    /// Priority class of the item; [`priority_index`] turns it into a queue slot.
    pub priority: NetworkPriority,
    /// Caller-supplied payload carried alongside the priority.
    pub data: T,
}
/// Sending half of the channel built by [`priority_channel`].
///
/// Clones share the same queues, notifier and liveness token. The receiver
/// reports the channel as closed once no strong reference to `_token` remains
/// and every queue has been drained.
pub struct PrioritySender<T> {
    /// One FIFO queue per priority slot, indexed by [`priority_index`]
    /// (slot 0 is drained first).
    queues: Arc<Mutex<[VecDeque<PrioritizedItem<T>>; 4]>>,
    /// Signalled after each successful send and whenever a sender is dropped.
    notify: Arc<Notify>,
    /// Maximum number of items each individual queue may hold.
    capacity: usize,
    /// Liveness marker; the receiver keeps a `Weak` to it and checks the
    /// strong count to detect that all senders are gone.
    _token: Arc<()>,
}
impl<T> Clone for PrioritySender<T> {
    /// Returns another handle onto the same channel.
    ///
    /// Only reference counts are bumped: the queues, the notifier and the
    /// liveness token are shared with `self`, and the per-queue capacity is
    /// copied as-is.
    fn clone(&self) -> Self {
        let Self {
            queues,
            notify,
            capacity,
            _token: token,
        } = self;

        Self {
            queues: Arc::clone(queues),
            notify: Arc::clone(notify),
            capacity: *capacity,
            _token: Arc::clone(token),
        }
    }
}
impl<T> Drop for PrioritySender<T> {
fn drop(&mut self) {
self.notify.notify_one();
}
}
/// Receiving half of the channel built by [`priority_channel`].
pub struct PriorityReceiver<T> {
    /// The same four per-priority FIFO queues the senders push into.
    queues: Arc<Mutex<[VecDeque<PrioritizedItem<T>>; 4]>>,
    /// Awaited by `recv` when nothing is queued; signalled by the senders.
    notify: Arc<Notify>,
    /// Weak view of the senders' shared token; a strong count of zero means
    /// every sender handle has been dropped.
    sender_token: Weak<()>,
}
/// Maps a network priority to the index of the queue that carries it.
///
/// `LIFE_SAFETY`, `CRITICAL_EQUIPMENT` and `URGENT` map to slots 0, 1 and 2
/// respectively; any other priority falls through to slot 3.
pub fn priority_index(p: NetworkPriority) -> usize {
    // Listed in slot order, so the position of a match is the queue index.
    let ranked = [
        NetworkPriority::LIFE_SAFETY,
        NetworkPriority::CRITICAL_EQUIPMENT,
        NetworkPriority::URGENT,
    ];

    ranked
        .iter()
        .position(|level| p == *level)
        .unwrap_or(3)
}
/// Creates a bounded priority channel and returns its `(sender, receiver)` halves.
///
/// `capacity` is the maximum number of items allowed in *each* of the four
/// priority queues, and each queue is pre-allocated with that many slots.
/// Both halves share the queues and the notifier. The sender owns the strong
/// liveness token while the receiver keeps only a weak reference to it.
pub fn priority_channel<T>(capacity: usize) -> (PrioritySender<T>, PriorityReceiver<T>) {
    let queues = Arc::new(Mutex::new([
        VecDeque::with_capacity(capacity),
        VecDeque::with_capacity(capacity),
        VecDeque::with_capacity(capacity),
        VecDeque::with_capacity(capacity),
    ]));
    let notify = Arc::new(Notify::new());

    // Senders hold the strong side of the token; the receiver watches the
    // strong count through a `Weak` to learn when all senders are gone.
    let token = Arc::new(());
    let sender_token = Arc::downgrade(&token);

    let tx = PrioritySender {
        queues: Arc::clone(&queues),
        notify: Arc::clone(&notify),
        capacity,
        _token: token,
    };
    let rx = PriorityReceiver {
        queues,
        notify,
        sender_token,
    };
    (tx, rx)
}
impl<T> PrioritySender<T> {
    /// Enqueues `item` on the queue selected by its priority.
    ///
    /// # Errors
    ///
    /// Hands the item back as `Err(item)` when that queue already holds
    /// `capacity` items. The call never waits for space; the body contains no
    /// await point.
    ///
    /// A poisoned mutex is recovered rather than propagated, so a panic in
    /// another lock holder does not make sending fail.
    pub async fn send(&self, item: PrioritizedItem<T>) -> Result<(), PrioritizedItem<T>> {
        let outcome = {
            let mut lanes = self
                .queues
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let lane = &mut lanes[priority_index(item.priority)];

            if lane.len() < self.capacity {
                lane.push_back(item);
                Ok(())
            } else {
                Err(item)
            }
        };

        // The lock is released by now; wake the receiver only if something
        // was actually queued.
        if outcome.is_ok() {
            self.notify.notify_one();
        }
        outcome
    }
}
impl<T> PriorityReceiver<T> {
    /// Waits for the next item, always taking from the lowest-indexed
    /// non-empty queue and in FIFO order within that queue.
    ///
    /// Returns `None` once no sender handle remains and every queue is empty.
    /// A poisoned mutex is recovered rather than propagated.
    pub async fn recv(&mut self) -> Option<PrioritizedItem<T>> {
        loop {
            if let Some(item) = self.pop_highest() {
                return Some(item);
            }

            // Look at the queues again only when the senders are gone: an item
            // may have been pushed between the pop attempt and this check.
            let senders_gone = self.sender_token.strong_count() == 0;
            if senders_gone && self.all_queues_empty() {
                return None;
            }

            self.notify.notified().await;
        }
    }

    /// Pops the front item of the first non-empty queue, scanning from slot 0.
    fn pop_highest(&self) -> Option<PrioritizedItem<T>> {
        let mut lanes = self
            .queues
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        lanes.iter_mut().find_map(|lane| lane.pop_front())
    }

    /// Reports whether all four queues are currently empty.
    fn all_queues_empty(&self) -> bool {
        let lanes = self
            .queues
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        lanes.iter().all(|lane| lane.is_empty())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`PrioritizedItem`] so the tests need not repeat the struct literal.
    fn item<T>(priority: NetworkPriority, data: T) -> PrioritizedItem<T> {
        PrioritizedItem { priority, data }
    }

    /// Items are received by priority, regardless of the order they were sent in.
    #[tokio::test]
    async fn higher_priority_dequeued_first() {
        let (tx, mut rx) = priority_channel::<Vec<u8>>(16);

        let sent: [(NetworkPriority, u8); 3] = [
            (NetworkPriority::NORMAL, 1),
            (NetworkPriority::LIFE_SAFETY, 2),
            (NetworkPriority::URGENT, 3),
        ];
        for (priority, byte) in sent {
            tx.send(item(priority, vec![byte])).await.unwrap();
        }

        let expected: [(NetworkPriority, u8); 3] = [
            (NetworkPriority::LIFE_SAFETY, 2),
            (NetworkPriority::URGENT, 3),
            (NetworkPriority::NORMAL, 1),
        ];
        for (priority, byte) in expected {
            let received = rx.recv().await.unwrap();
            assert_eq!(received.priority, priority);
            assert_eq!(received.data, vec![byte]);
        }
    }

    /// Two items of equal priority come out in the order they went in.
    #[tokio::test]
    async fn same_priority_fifo() {
        let (tx, mut rx) = priority_channel::<Vec<u8>>(16);

        for byte in [1u8, 2] {
            tx.send(item(NetworkPriority::NORMAL, vec![byte]))
                .await
                .unwrap();
        }

        for byte in [1u8, 2] {
            assert_eq!(rx.recv().await.unwrap().data, vec![byte]);
        }
    }

    /// After the only sender is dropped, queued items are still delivered and
    /// then `recv` yields `None`.
    #[tokio::test]
    async fn sender_drop_closes_channel() {
        let (tx, mut rx) = priority_channel::<Vec<u8>>(16);

        tx.send(item(NetworkPriority::NORMAL, vec![1]))
            .await
            .unwrap();
        drop(tx);

        assert_eq!(rx.recv().await.unwrap().data, vec![1]);
        assert!(rx.recv().await.is_none());
    }

    /// The capacity bound applies per priority queue, not to the channel as a whole.
    #[tokio::test]
    async fn capacity_limit() {
        let (tx, _rx) = priority_channel::<u8>(2);

        for value in [1u8, 2] {
            tx.send(item(NetworkPriority::NORMAL, value)).await.unwrap();
        }

        // The NORMAL queue is full now, so a third NORMAL item is rejected...
        let overflow = tx.send(item(NetworkPriority::NORMAL, 3)).await;
        assert!(overflow.is_err());

        // ...while a different priority still has room.
        tx.send(item(NetworkPriority::URGENT, 4)).await.unwrap();
    }

    /// Each named priority maps to its expected queue slot.
    #[test]
    fn priority_index_ordering() {
        let expected = [
            (NetworkPriority::LIFE_SAFETY, 0),
            (NetworkPriority::CRITICAL_EQUIPMENT, 1),
            (NetworkPriority::URGENT, 2),
            (NetworkPriority::NORMAL, 3),
        ];
        for (priority, slot) in expected {
            assert_eq!(priority_index(priority), slot);
        }
    }
}