bacnet_network/
priority_channel.rs1use std::collections::VecDeque;
9use std::sync::{Arc, Mutex, Weak};
10use tokio::sync::Notify;
11
12use bacnet_types::enums::NetworkPriority;
13
14#[derive(Debug, Clone)]
16pub struct PrioritizedItem<T> {
17 pub priority: NetworkPriority,
18 pub data: T,
19}
20
21pub struct PrioritySender<T> {
23 queues: Arc<Mutex<[VecDeque<PrioritizedItem<T>>; 4]>>,
24 notify: Arc<Notify>,
25 capacity: usize,
26 _token: Arc<()>,
28}
29
30impl<T> Clone for PrioritySender<T> {
31 fn clone(&self) -> Self {
32 Self {
33 queues: Arc::clone(&self.queues),
34 notify: Arc::clone(&self.notify),
35 capacity: self.capacity,
36 _token: Arc::clone(&self._token),
37 }
38 }
39}
40
41impl<T> Drop for PrioritySender<T> {
42 fn drop(&mut self) {
43 self.notify.notify_one();
45 }
46}
47
48pub struct PriorityReceiver<T> {
50 queues: Arc<Mutex<[VecDeque<PrioritizedItem<T>>; 4]>>,
51 notify: Arc<Notify>,
52 sender_token: Weak<()>,
53}
54
55pub fn priority_index(p: NetworkPriority) -> usize {
57 if p == NetworkPriority::LIFE_SAFETY {
58 0
59 } else if p == NetworkPriority::CRITICAL_EQUIPMENT {
60 1
61 } else if p == NetworkPriority::URGENT {
62 2
63 } else {
64 3 }
66}
67
68pub fn priority_channel<T>(capacity: usize) -> (PrioritySender<T>, PriorityReceiver<T>) {
72 let queues = Arc::new(Mutex::new([
73 VecDeque::with_capacity(capacity),
74 VecDeque::with_capacity(capacity),
75 VecDeque::with_capacity(capacity),
76 VecDeque::with_capacity(capacity),
77 ]));
78 let notify = Arc::new(Notify::new());
79 let token = Arc::new(());
80 let weak = Arc::downgrade(&token);
81
82 let tx = PrioritySender {
83 queues: Arc::clone(&queues),
84 notify: Arc::clone(¬ify),
85 capacity,
86 _token: token,
87 };
88
89 let rx = PriorityReceiver {
90 queues,
91 notify,
92 sender_token: weak,
93 };
94
95 (tx, rx)
96}
97
98impl<T> PrioritySender<T> {
99 pub async fn send(&self, item: PrioritizedItem<T>) -> Result<(), PrioritizedItem<T>> {
104 {
105 let mut queues = self.queues.lock().unwrap_or_else(|e| e.into_inner());
106 let idx = priority_index(item.priority);
107 let q = &mut queues[idx];
108 if q.len() >= self.capacity {
109 return Err(item);
110 }
111 q.push_back(item);
112 }
113 self.notify.notify_one();
114 Ok(())
115 }
116}
117
118impl<T> PriorityReceiver<T> {
119 pub async fn recv(&mut self) -> Option<PrioritizedItem<T>> {
124 loop {
125 {
127 let mut queues = self.queues.lock().unwrap_or_else(|e| e.into_inner());
128 for q in queues.iter_mut() {
129 if let Some(item) = q.pop_front() {
130 return Some(item);
131 }
132 }
133 }
134
135 if self.sender_token.strong_count() == 0 {
137 let queues = self.queues.lock().unwrap_or_else(|e| e.into_inner());
138 if queues.iter().all(|q| q.is_empty()) {
139 return None;
140 }
141 }
142
143 self.notify.notified().await;
145 }
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a queue entry so the test bodies can focus on ordering rather than struct syntax.
    fn item<T>(priority: NetworkPriority, data: T) -> PrioritizedItem<T> {
        PrioritizedItem { priority, data }
    }

    /// Items are received highest priority first, regardless of the order they were sent in.
    #[tokio::test]
    async fn higher_priority_dequeued_first() {
        let (tx, mut rx) = priority_channel::<Vec<u8>>(16);

        let sent = [
            (NetworkPriority::NORMAL, 1u8),
            (NetworkPriority::LIFE_SAFETY, 2),
            (NetworkPriority::URGENT, 3),
        ];
        for (priority, byte) in sent {
            tx.send(item(priority, vec![byte])).await.unwrap();
        }

        let expected = [
            (NetworkPriority::LIFE_SAFETY, 2u8),
            (NetworkPriority::URGENT, 3),
            (NetworkPriority::NORMAL, 1),
        ];
        for (priority, byte) in expected {
            let received = rx.recv().await.unwrap();
            assert_eq!(received.priority, priority);
            assert_eq!(received.data, vec![byte]);
        }
    }

    /// Items of equal priority keep their send order.
    #[tokio::test]
    async fn same_priority_fifo() {
        let (tx, mut rx) = priority_channel::<Vec<u8>>(16);

        for byte in [1u8, 2] {
            tx.send(item(NetworkPriority::NORMAL, vec![byte]))
                .await
                .unwrap();
        }

        for byte in [1u8, 2] {
            assert_eq!(rx.recv().await.unwrap().data, vec![byte]);
        }
    }

    /// After the only sender is dropped, queued items are still delivered and then `None`.
    #[tokio::test]
    async fn sender_drop_closes_channel() {
        let (tx, mut rx) = priority_channel::<Vec<u8>>(16);
        tx.send(item(NetworkPriority::NORMAL, vec![1]))
            .await
            .unwrap();
        drop(tx);

        assert_eq!(rx.recv().await.unwrap().data, vec![1]);
        assert!(rx.recv().await.is_none());
    }

    /// The capacity limit applies to each priority queue separately.
    #[tokio::test]
    async fn capacity_limit() {
        let (tx, _rx) = priority_channel::<u8>(2);

        for value in [1u8, 2] {
            tx.send(item(NetworkPriority::NORMAL, value))
                .await
                .unwrap();
        }

        // The NORMAL queue is now full, so a third NORMAL item is handed back.
        let overflow = tx.send(item(NetworkPriority::NORMAL, 3)).await;
        assert!(overflow.is_err());

        // A different priority still has room.
        tx.send(item(NetworkPriority::URGENT, 4)).await.unwrap();
    }

    /// Each named priority maps to its expected queue index.
    #[test]
    fn priority_index_ordering() {
        let expected = [
            (NetworkPriority::LIFE_SAFETY, 0),
            (NetworkPriority::CRITICAL_EQUIPMENT, 1),
            (NetworkPriority::URGENT, 2),
            (NetworkPriority::NORMAL, 3),
        ];
        for (priority, index) in expected {
            assert_eq!(priority_index(priority), index);
        }
    }
}