Skip to main content

bacnet_network/
priority_channel.rs

1//! Priority-aware channel for BACnet NPDU dispatch (Clause 6.2.2).
2//!
3//! BACnet defines four priority levels: Normal, Urgent, Critical Equipment,
4//! and Life Safety. Higher-priority messages must be dispatched before
5//! lower-priority ones. This module provides a multi-queue channel where
6//! the receiver always drains the highest-priority queue first.
7
8use std::collections::VecDeque;
9use std::sync::{Arc, Mutex, Weak};
10use tokio::sync::Notify;
11
12use bacnet_types::enums::NetworkPriority;
13
14/// An item tagged with a BACnet network priority.
15#[derive(Debug, Clone)]
16pub struct PrioritizedItem<T> {
17    pub priority: NetworkPriority,
18    pub data: T,
19}
20
21/// Cloneable sender half of a priority channel.
22pub struct PrioritySender<T> {
23    queues: Arc<Mutex<[VecDeque<PrioritizedItem<T>>; 4]>>,
24    notify: Arc<Notify>,
25    capacity: usize,
26    /// Shared token — receiver holds a `Weak` to detect all senders dropped.
27    _token: Arc<()>,
28}
29
30impl<T> Clone for PrioritySender<T> {
31    fn clone(&self) -> Self {
32        Self {
33            queues: Arc::clone(&self.queues),
34            notify: Arc::clone(&self.notify),
35            capacity: self.capacity,
36            _token: Arc::clone(&self._token),
37        }
38    }
39}
40
41impl<T> Drop for PrioritySender<T> {
42    fn drop(&mut self) {
43        // Wake the receiver so it can check the closed condition.
44        self.notify.notify_one();
45    }
46}
47
48/// Receiver half of a priority channel (not cloneable).
49pub struct PriorityReceiver<T> {
50    queues: Arc<Mutex<[VecDeque<PrioritizedItem<T>>; 4]>>,
51    notify: Arc<Notify>,
52    sender_token: Weak<()>,
53}
54
55/// Map a `NetworkPriority` to a queue index (0 = highest priority).
56pub fn priority_index(p: NetworkPriority) -> usize {
57    if p == NetworkPriority::LIFE_SAFETY {
58        0
59    } else if p == NetworkPriority::CRITICAL_EQUIPMENT {
60        1
61    } else if p == NetworkPriority::URGENT {
62        2
63    } else {
64        3 // NORMAL and any unknown/vendor values
65    }
66}
67
68/// Create a priority channel with `capacity` slots per priority level.
69///
70/// Returns a `(PrioritySender, PriorityReceiver)` pair.
71pub fn priority_channel<T>(capacity: usize) -> (PrioritySender<T>, PriorityReceiver<T>) {
72    let queues = Arc::new(Mutex::new([
73        VecDeque::with_capacity(capacity),
74        VecDeque::with_capacity(capacity),
75        VecDeque::with_capacity(capacity),
76        VecDeque::with_capacity(capacity),
77    ]));
78    let notify = Arc::new(Notify::new());
79    let token = Arc::new(());
80    let weak = Arc::downgrade(&token);
81
82    let tx = PrioritySender {
83        queues: Arc::clone(&queues),
84        notify: Arc::clone(&notify),
85        capacity,
86        _token: token,
87    };
88
89    let rx = PriorityReceiver {
90        queues,
91        notify,
92        sender_token: weak,
93    };
94
95    (tx, rx)
96}
97
98impl<T> PrioritySender<T> {
99    /// Enqueue an item into the appropriate priority queue.
100    ///
101    /// Returns `Err(item)` if that priority's queue is at capacity.
102    /// The method is async for API consistency but does not block.
103    pub async fn send(&self, item: PrioritizedItem<T>) -> Result<(), PrioritizedItem<T>> {
104        {
105            let mut queues = self.queues.lock().unwrap_or_else(|e| e.into_inner());
106            let idx = priority_index(item.priority);
107            let q = &mut queues[idx];
108            if q.len() >= self.capacity {
109                return Err(item);
110            }
111            q.push_back(item);
112        }
113        self.notify.notify_one();
114        Ok(())
115    }
116}
117
118impl<T> PriorityReceiver<T> {
119    /// Receive the next item, highest priority first.
120    ///
121    /// Returns `None` when all senders have been dropped and every queue is
122    /// empty — i.e. the channel is closed and fully drained.
123    pub async fn recv(&mut self) -> Option<PrioritizedItem<T>> {
124        loop {
125            // Try to dequeue in priority order (index 0 = highest).
126            {
127                let mut queues = self.queues.lock().unwrap_or_else(|e| e.into_inner());
128                for q in queues.iter_mut() {
129                    if let Some(item) = q.pop_front() {
130                        return Some(item);
131                    }
132                }
133            }
134
135            // All queues empty — check if senders are gone.
136            if self.sender_token.strong_count() == 0 {
137                let queues = self.queues.lock().unwrap_or_else(|e| e.into_inner());
138                if queues.iter().all(|q| q.is_empty()) {
139                    return None;
140                }
141            }
142
143            // Park until a sender enqueues or drops.
144            self.notify.notified().await;
145        }
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use super::*;
152
153    #[tokio::test]
154    async fn higher_priority_dequeued_first() {
155        let (tx, mut rx) = priority_channel::<Vec<u8>>(16);
156
157        tx.send(PrioritizedItem {
158            priority: NetworkPriority::NORMAL,
159            data: vec![1],
160        })
161        .await
162        .unwrap();
163        tx.send(PrioritizedItem {
164            priority: NetworkPriority::LIFE_SAFETY,
165            data: vec![2],
166        })
167        .await
168        .unwrap();
169        tx.send(PrioritizedItem {
170            priority: NetworkPriority::URGENT,
171            data: vec![3],
172        })
173        .await
174        .unwrap();
175
176        let first = rx.recv().await.unwrap();
177        assert_eq!(first.priority, NetworkPriority::LIFE_SAFETY);
178        assert_eq!(first.data, vec![2]);
179
180        let second = rx.recv().await.unwrap();
181        assert_eq!(second.priority, NetworkPriority::URGENT);
182        assert_eq!(second.data, vec![3]);
183
184        let third = rx.recv().await.unwrap();
185        assert_eq!(third.priority, NetworkPriority::NORMAL);
186        assert_eq!(third.data, vec![1]);
187    }
188
189    #[tokio::test]
190    async fn same_priority_fifo() {
191        let (tx, mut rx) = priority_channel::<Vec<u8>>(16);
192
193        tx.send(PrioritizedItem {
194            priority: NetworkPriority::NORMAL,
195            data: vec![1],
196        })
197        .await
198        .unwrap();
199        tx.send(PrioritizedItem {
200            priority: NetworkPriority::NORMAL,
201            data: vec![2],
202        })
203        .await
204        .unwrap();
205
206        assert_eq!(rx.recv().await.unwrap().data, vec![1]);
207        assert_eq!(rx.recv().await.unwrap().data, vec![2]);
208    }
209
210    #[tokio::test]
211    async fn sender_drop_closes_channel() {
212        let (tx, mut rx) = priority_channel::<Vec<u8>>(16);
213        tx.send(PrioritizedItem {
214            priority: NetworkPriority::NORMAL,
215            data: vec![1],
216        })
217        .await
218        .unwrap();
219        drop(tx);
220
221        assert_eq!(rx.recv().await.unwrap().data, vec![1]);
222        assert!(rx.recv().await.is_none());
223    }
224
225    #[tokio::test]
226    async fn capacity_limit() {
227        let (tx, mut _rx) = priority_channel::<u8>(2);
228        tx.send(PrioritizedItem {
229            priority: NetworkPriority::NORMAL,
230            data: 1,
231        })
232        .await
233        .unwrap();
234        tx.send(PrioritizedItem {
235            priority: NetworkPriority::NORMAL,
236            data: 2,
237        })
238        .await
239        .unwrap();
240        let result = tx
241            .send(PrioritizedItem {
242                priority: NetworkPriority::NORMAL,
243                data: 3,
244            })
245            .await;
246        assert!(result.is_err());
247
248        tx.send(PrioritizedItem {
249            priority: NetworkPriority::URGENT,
250            data: 4,
251        })
252        .await
253        .unwrap();
254    }
255
256    #[test]
257    fn priority_index_ordering() {
258        assert_eq!(priority_index(NetworkPriority::LIFE_SAFETY), 0);
259        assert_eq!(priority_index(NetworkPriority::CRITICAL_EQUIPMENT), 1);
260        assert_eq!(priority_index(NetworkPriority::URGENT), 2);
261        assert_eq!(priority_index(NetworkPriority::NORMAL), 3);
262    }
263}