1use parking_lot::Mutex;
2use std::sync::Arc;
3
4use crate::{backend::Awaker, collections::DynamicDeque};
5
6#[derive(Debug, PartialEq, Eq, Clone)]
8pub enum BusEventSource<ChannelId> {
9 Channel(usize, ChannelId),
10 Broadcast(usize),
11 Direct(usize),
12 External,
13}
14
15#[derive(Debug, PartialEq, Eq)]
16pub enum BusLegSenderErr {
17 ChannelFull,
18}
19
20struct QueueInternal<ChannelId, MSG, const STATIC_SIZE: usize> {
21 awaker: Option<Arc<dyn Awaker>>,
22 queue: DynamicDeque<(BusEventSource<ChannelId>, MSG), STATIC_SIZE>,
23}
24
25impl<ChannelId, MSG, const STATIC_SIZE: usize> Default
26 for QueueInternal<ChannelId, MSG, STATIC_SIZE>
27{
28 fn default() -> Self {
29 Self {
30 awaker: None,
31 queue: DynamicDeque::default(),
32 }
33 }
34}
35
36impl<ChannelId, MSG, const STATIC_SIZE: usize> QueueInternal<ChannelId, MSG, STATIC_SIZE> {
37 pub fn push_safe(&mut self, source: BusEventSource<ChannelId>, msg: MSG) -> usize {
38 self.queue.push_back((source, msg));
39 let after = self.queue.len();
40 if after == 1 {
41 if let Some(awaker) = self.awaker.as_ref() {
42 awaker.awake();
43 }
44 }
45 after
46 }
47
48 pub fn push_generic(
49 &mut self,
50 safe: bool,
51 source: BusEventSource<ChannelId>,
52 msg: MSG,
53 ) -> Result<usize, ()> {
54 if safe {
55 self.queue.push_back((source, msg));
56 } else {
57 self.queue.push_back_stack((source, msg)).map_err(|_| ())?;
58 }
59 let after = self.queue.len();
60 if after == 1 {
61 if let Some(awaker) = self.awaker.as_ref() {
62 awaker.awake();
63 }
64 }
65 Ok(after)
66 }
67
68 pub fn pop_front(&mut self) -> Option<(BusEventSource<ChannelId>, MSG)> {
69 self.queue.pop_front()
70 }
71}
72
73#[derive(Default)]
74struct SharedBusQueue<ChannelId, MSG, const STATIC_SIZE: usize> {
75 queue: Arc<Mutex<QueueInternal<ChannelId, MSG, STATIC_SIZE>>>,
76}
77
78impl<ChannelId, MSG, const STATIC_SIZE: usize> Clone
79 for SharedBusQueue<ChannelId, MSG, STATIC_SIZE>
80{
81 fn clone(&self) -> Self {
82 Self {
83 queue: self.queue.clone(),
84 }
85 }
86}
87
88impl<ChannelId, MSG, const STATIC_SIZE: usize> SharedBusQueue<ChannelId, MSG, STATIC_SIZE> {
89 fn send_safe(&self, source: BusEventSource<ChannelId>, msg: MSG) -> usize {
90 self.queue.lock().push_safe(source, msg)
91 }
92
93 fn send(
94 &self,
95 source: BusEventSource<ChannelId>,
96 safe: bool,
97 msg: MSG,
98 ) -> Result<usize, BusLegSenderErr> {
99 self.queue
100 .lock()
101 .push_generic(safe, source, msg)
102 .map_err(|_| BusLegSenderErr::ChannelFull)
103 }
104
105 fn recv(&self) -> Option<(BusEventSource<ChannelId>, MSG)> {
106 self.queue.lock().pop_front()
107 }
108
109 fn set_awaker(&self, awaker: Arc<dyn Awaker>) {
110 self.queue.lock().awaker = Some(awaker);
111 }
112}
113
114pub struct BusLegSender<ChannelId, MSG, const STATIC_SIZE: usize> {
119 queue: SharedBusQueue<ChannelId, MSG, STATIC_SIZE>,
120}
121
122impl<ChannelId, MSG, const STATIC_SIZE: usize> Clone for BusLegSender<ChannelId, MSG, STATIC_SIZE> {
123 fn clone(&self) -> Self {
124 Self {
125 queue: self.queue.clone(),
126 }
127 }
128}
129
130impl<ChannelId, MSG, const STATIC_SIZE: usize> BusLegSender<ChannelId, MSG, STATIC_SIZE> {
131 pub fn send_safe(&self, source: BusEventSource<ChannelId>, msg: MSG) -> usize {
143 self.queue.send_safe(source, msg)
144 }
145
146 pub fn send(
158 &self,
159 source: BusEventSource<ChannelId>,
160 safe: bool,
161 msg: MSG,
162 ) -> Result<usize, BusLegSenderErr> {
163 self.queue.send(source, safe, msg)
164 }
165}
166
167pub struct BusLegReceiver<ChannelId, MSG, const STATIC_SIZE: usize> {
172 queue: SharedBusQueue<ChannelId, MSG, STATIC_SIZE>,
173}
174
175impl<ChannelId, MSG, const STATIC_SIZE: usize> BusLegReceiver<ChannelId, MSG, STATIC_SIZE> {
176 pub fn recv(&self) -> Option<(BusEventSource<ChannelId>, MSG)> {
184 self.queue.recv()
185 }
186
187 pub fn set_awaker(&self, awaker: Arc<dyn Awaker>) {
188 self.queue.set_awaker(awaker);
189 }
190}
191
192pub fn create_bus_leg<ChannelId, MSG, const STATIC_SIZE: usize>() -> (
198 BusLegSender<ChannelId, MSG, STATIC_SIZE>,
199 BusLegReceiver<ChannelId, MSG, STATIC_SIZE>,
200) {
201 let queue = SharedBusQueue {
202 queue: Default::default(),
203 };
204 let sender = BusLegSender {
205 queue: queue.clone(),
206 };
207 let receiver = BusLegReceiver { queue };
208 (sender, receiver)
209}
210
211#[cfg(test)]
212mod tests {
213 use super::*;
214
215 #[test]
216 fn test_send_and_recv() {
217 let (sender, receiver) = create_bus_leg::<u32, String, 10>();
219
220 let source = BusEventSource::Channel(0, 42);
222 let msg = "Hello, world!".to_string();
223 let result = sender.send(source.clone(), true, msg.clone());
224 assert_eq!(result, Ok(1));
225
226 let received = receiver.recv();
228 assert_eq!(received, Some((source, msg)));
229 }
230
231 #[test]
232 fn test_send_channel_full() {
233 let (sender, receiver) = create_bus_leg::<u32, &'static str, 1>();
235 let source = BusEventSource::Channel(0, 42);
236
237 let result = sender.send(source.clone(), false, "Message 1");
240 assert_eq!(result, Ok(1));
241
242 let result = sender.send(source.clone(), false, "Message 2");
244 assert_eq!(result, Err(BusLegSenderErr::ChannelFull));
245
246 let result = sender.send(source.clone(), true, "Message 3");
248 assert_eq!(result, Ok(2));
249
250 assert_eq!(receiver.recv(), Some((source.clone(), "Message 1")));
251 assert_eq!(receiver.recv(), Some((source, "Message 3")));
252 assert_eq!(receiver.recv(), None);
253 }
254
255 #[test]
256 fn test_recv_empty_queue() {
257 let (_, receiver) = create_bus_leg::<u32, String, 10>();
259
260 let received = receiver.recv();
262 assert_eq!(received, None);
263 }
264}