sans_io_runtime/bus/
leg.rs

1use parking_lot::Mutex;
2use std::sync::Arc;
3
4use crate::{backend::Awaker, collections::DynamicDeque};
5
/// Identifies where an event delivered over the bus originated.
///
/// Every queued message is stored together with one of these values, so the
/// receiver can tell how the message reached it.
///
/// NOTE(review): the meaning of the `usize` payloads is not established in this
/// file — presumably the index of the originating leg; confirm against the bus
/// implementation.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub enum BusEventSource<ChannelId> {
    /// Event associated with the channel identified by the `ChannelId` value.
    Channel(usize, ChannelId),
    /// Event tagged as a broadcast.
    Broadcast(usize),
    /// Event tagged as a direct (point-to-point) message.
    Direct(usize),
    /// Event tagged as coming from outside the bus.
    External,
}
14
/// Error returned when a message cannot be queued on a bus leg.
///
/// Implements [`std::error::Error`] so it can be boxed or propagated with `?`
/// into generic error types.
#[derive(Debug, PartialEq, Eq)]
pub enum BusLegSenderErr {
    /// The queue rejected the message because it had no room for it.
    ChannelFull,
}

impl std::fmt::Display for BusLegSenderErr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ChannelFull => f.write_str("bus leg channel is full"),
        }
    }
}

impl std::error::Error for BusLegSenderErr {}
19
20struct QueueInternal<ChannelId, MSG, const STATIC_SIZE: usize> {
21    awaker: Option<Arc<dyn Awaker>>,
22    queue: DynamicDeque<(BusEventSource<ChannelId>, MSG), STATIC_SIZE>,
23}
24
25impl<ChannelId, MSG, const STATIC_SIZE: usize> Default
26    for QueueInternal<ChannelId, MSG, STATIC_SIZE>
27{
28    fn default() -> Self {
29        Self {
30            awaker: None,
31            queue: DynamicDeque::default(),
32        }
33    }
34}
35
36impl<ChannelId, MSG, const STATIC_SIZE: usize> QueueInternal<ChannelId, MSG, STATIC_SIZE> {
37    pub fn push_safe(&mut self, source: BusEventSource<ChannelId>, msg: MSG) -> usize {
38        self.queue.push_back((source, msg));
39        let after = self.queue.len();
40        if after == 1 {
41            if let Some(awaker) = self.awaker.as_ref() {
42                awaker.awake();
43            }
44        }
45        after
46    }
47
48    pub fn push_generic(
49        &mut self,
50        safe: bool,
51        source: BusEventSource<ChannelId>,
52        msg: MSG,
53    ) -> Result<usize, ()> {
54        if safe {
55            self.queue.push_back((source, msg));
56        } else {
57            self.queue.push_back_stack((source, msg)).map_err(|_| ())?;
58        }
59        let after = self.queue.len();
60        if after == 1 {
61            if let Some(awaker) = self.awaker.as_ref() {
62                awaker.awake();
63            }
64        }
65        Ok(after)
66    }
67
68    pub fn pop_front(&mut self) -> Option<(BusEventSource<ChannelId>, MSG)> {
69        self.queue.pop_front()
70    }
71}
72
73#[derive(Default)]
74struct SharedBusQueue<ChannelId, MSG, const STATIC_SIZE: usize> {
75    queue: Arc<Mutex<QueueInternal<ChannelId, MSG, STATIC_SIZE>>>,
76}
77
78impl<ChannelId, MSG, const STATIC_SIZE: usize> Clone
79    for SharedBusQueue<ChannelId, MSG, STATIC_SIZE>
80{
81    fn clone(&self) -> Self {
82        Self {
83            queue: self.queue.clone(),
84        }
85    }
86}
87
88impl<ChannelId, MSG, const STATIC_SIZE: usize> SharedBusQueue<ChannelId, MSG, STATIC_SIZE> {
89    fn send_safe(&self, source: BusEventSource<ChannelId>, msg: MSG) -> usize {
90        self.queue.lock().push_safe(source, msg)
91    }
92
93    fn send(
94        &self,
95        source: BusEventSource<ChannelId>,
96        safe: bool,
97        msg: MSG,
98    ) -> Result<usize, BusLegSenderErr> {
99        self.queue
100            .lock()
101            .push_generic(safe, source, msg)
102            .map_err(|_| BusLegSenderErr::ChannelFull)
103    }
104
105    fn recv(&self) -> Option<(BusEventSource<ChannelId>, MSG)> {
106        self.queue.lock().pop_front()
107    }
108
109    fn set_awaker(&self, awaker: Arc<dyn Awaker>) {
110        self.queue.lock().awaker = Some(awaker);
111    }
112}
113
114/// A sender for a bus leg.
115///
116/// This struct is used to send messages through a bus leg. It holds a queue of messages
117/// that are waiting to be processed by the bus leg.
118pub struct BusLegSender<ChannelId, MSG, const STATIC_SIZE: usize> {
119    queue: SharedBusQueue<ChannelId, MSG, STATIC_SIZE>,
120}
121
122impl<ChannelId, MSG, const STATIC_SIZE: usize> Clone for BusLegSender<ChannelId, MSG, STATIC_SIZE> {
123    fn clone(&self) -> Self {
124        Self {
125            queue: self.queue.clone(),
126        }
127    }
128}
129
130impl<ChannelId, MSG, const STATIC_SIZE: usize> BusLegSender<ChannelId, MSG, STATIC_SIZE> {
131    /// Sends a message through the bus leg with safe, if stack full, it will append to heap.
132    ///
133    /// # Arguments
134    ///
135    /// * `source` - The source of the bus event.
136    /// * `msg` - The message to be sent.
137    ///
138    /// # Returns
139    ///
140    /// Returns `Ok(len)` if the message is successfully sent, where `len` is the new length of the queue.
141    /// Returns `Err(ChannelFull)` if the queue is already full and the message cannot be sent.
142    pub fn send_safe(&self, source: BusEventSource<ChannelId>, msg: MSG) -> usize {
143        self.queue.send_safe(source, msg)
144    }
145
146    /// Sends a message through the bus leg.
147    ///
148    /// # Arguments
149    ///
150    /// * `source` - The source of the bus event.
151    /// * `msg` - The message to be sent.
152    ///
153    /// # Returns
154    ///
155    /// Returns `Ok(len)` if the message is successfully sent, where `len` is the new length of the queue.
156    /// Returns `Err(ChannelFull)` if the queue is already full and the message cannot be sent.
157    pub fn send(
158        &self,
159        source: BusEventSource<ChannelId>,
160        safe: bool,
161        msg: MSG,
162    ) -> Result<usize, BusLegSenderErr> {
163        self.queue.send(source, safe, msg)
164    }
165}
166
167/// A receiver for a bus leg.
168///
169/// This struct is used to receive messages from a bus leg. It holds a queue of messages
170/// that have been sent to the bus leg.
171pub struct BusLegReceiver<ChannelId, MSG, const STATIC_SIZE: usize> {
172    queue: SharedBusQueue<ChannelId, MSG, STATIC_SIZE>,
173}
174
175impl<ChannelId, MSG, const STATIC_SIZE: usize> BusLegReceiver<ChannelId, MSG, STATIC_SIZE> {
176    /// Receives a message from the bus leg.
177    ///
178    /// # Returns
179    ///
180    /// Returns `Some((source, msg))` if there is a message in the queue, where `source` is the source of the bus event
181    /// and `msg` is the received message.
182    /// Returns `None` if the queue is empty.
183    pub fn recv(&self) -> Option<(BusEventSource<ChannelId>, MSG)> {
184        self.queue.recv()
185    }
186
187    pub fn set_awaker(&self, awaker: Arc<dyn Awaker>) {
188        self.queue.set_awaker(awaker);
189    }
190}
191
192/// Creates a pair of sender and receiver for a bus leg.
193///
194/// # Returns
195///
196/// Returns a tuple `(sender, receiver)` where `sender` is a `BusLegSender` and `receiver` is a `BusLegReceiver`.
197pub fn create_bus_leg<ChannelId, MSG, const STATIC_SIZE: usize>() -> (
198    BusLegSender<ChannelId, MSG, STATIC_SIZE>,
199    BusLegReceiver<ChannelId, MSG, STATIC_SIZE>,
200) {
201    let queue = SharedBusQueue {
202        queue: Default::default(),
203    };
204    let sender = BusLegSender {
205        queue: queue.clone(),
206    };
207    let receiver = BusLegReceiver { queue };
208    (sender, receiver)
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// A message sent on a leg comes back unchanged, together with its source.
    #[test]
    fn test_send_and_recv() {
        let (sender, receiver) = create_bus_leg::<u32, String, 10>();

        let source = BusEventSource::Channel(0, 42);
        let msg = "Hello, world!".to_string();

        // First message in an empty queue: reported length is 1.
        assert_eq!(sender.send(source.clone(), true, msg.clone()), Ok(1));

        assert_eq!(receiver.recv(), Some((source, msg)));
    }

    /// With a static size of 1, a second non-safe send is rejected while a
    /// safe send still succeeds; rejected messages are never delivered.
    #[test]
    fn test_send_channel_full() {
        let (sender, receiver) = create_bus_leg::<u32, &'static str, 1>();
        let source = BusEventSource::Channel(0, 42);

        // Fits into the static part of the queue.
        let first = sender.send(source.clone(), false, "Message 1");
        assert_eq!(first, Ok(1));

        // Static part is full and `safe` is false: must fail.
        let second = sender.send(source.clone(), false, "Message 2");
        assert_eq!(second, Err(BusLegSenderErr::ChannelFull));

        // Same situation with `safe` set: must succeed.
        let third = sender.send(source.clone(), true, "Message 3");
        assert_eq!(third, Ok(2));

        // Only the accepted messages come out, in send order.
        assert_eq!(receiver.recv(), Some((source.clone(), "Message 1")));
        assert_eq!(receiver.recv(), Some((source, "Message 3")));
        assert_eq!(receiver.recv(), None);
    }

    /// Receiving from a leg that never got a message yields `None`.
    #[test]
    fn test_recv_empty_queue() {
        let (_, receiver) = create_bus_leg::<u32, String, 10>();

        assert_eq!(receiver.recv(), None);
    }
}