vex_rt/rtos/
queue.rs

1use alloc::sync::Arc;
2use owner_monad::OwnerMut;
3use queue_model::QueueModel;
4
5use super::{handle_event, Event, EventHandle, GenericSleep, Instant, Mutex, Selectable};
6use crate::error::Error;
7
8/// Represents the sending end of a message-passing queue.
9pub struct SendQueue<T>(Arc<dyn QueueShared<T> + Send + Sync>);
10
11impl<T> SendQueue<T> {
12    #[inline]
13    /// Attempts to send an item on a queue.
14    pub fn send(&self, item: T) -> bool {
15        self.0.send(item)
16    }
17}
18
19impl<T> Clone for SendQueue<T> {
20    fn clone(&self) -> Self {
21        Self(self.0.clone())
22    }
23}
24
25/// Represents the receive end of a message-passing queue.
26pub struct ReceiveQueue<T>(Arc<dyn QueueShared<T> + Send + Sync>);
27
28impl<T> ReceiveQueue<T> {
29    /// A [`Selectable`] event which resolves when a value is received on the
30    /// message-passing queue.
31    pub fn select(&self) -> impl '_ + Selectable<T> {
32        struct ReceiveSelect<'b, T> {
33            data: &'b dyn QueueShared<T>,
34            _handle: EventHandle<ReceiveWrapper<'b, T>>,
35        }
36
37        impl<'b, T> Selectable<T> for ReceiveSelect<'b, T> {
38            fn poll(self) -> Result<T, Self> {
39                self.data.receive().ok_or(self)
40            }
41
42            fn sleep(&self) -> GenericSleep {
43                if self.data.is_empty() {
44                    GenericSleep::NotifyTake(None)
45                } else {
46                    GenericSleep::Timestamp(Instant::from_millis(0))
47                }
48            }
49        }
50
51        ReceiveSelect {
52            data: &*self.0,
53            _handle: handle_event(ReceiveWrapper(&*self.0)),
54        }
55    }
56}
57
58impl<T> Clone for ReceiveQueue<T> {
59    fn clone(&self) -> Self {
60        Self(self.0.clone())
61    }
62}
63
64/// The send/receive pair type returned by [`queue()`] and [`try_queue()`] for a
65/// given queue type.
66pub type QueuePair<Q> = (
67    SendQueue<<Q as QueueModel>::Item>,
68    ReceiveQueue<<Q as QueueModel>::Item>,
69);
70
71/// Creates a new send-receive pair together representing a message-passing
72/// queue, based on the given underlying queue structure. Panics on failure; see
73/// [`try_queue`].
74pub fn queue<Q: 'static + QueueModel + Send + Sync>(queue: Q) -> QueuePair<Q> {
75    try_queue(queue).unwrap_or_else(|err| panic!("failed to create channel: {}", err))
76}
77
78/// Creates a new send-receive pair together representing a message-passing
79/// queue, based on the given underlying queue structure.
80pub fn try_queue<Q: 'static + QueueModel + Send + Sync>(queue: Q) -> Result<QueuePair<Q>, Error> {
81    struct Queue<Q: QueueModel>(Mutex<QueueData<Q>>);
82
83    impl<Q: QueueModel> QueueShared<Q::Item> for Queue<Q> {
84        fn send(&self, item: Q::Item) -> bool {
85            let mut lock = self.0.lock();
86
87            if lock.queue.enqueue(item) {
88                lock.event.notify();
89                true
90            } else {
91                false
92            }
93        }
94
95        fn receive(&self) -> Option<Q::Item> {
96            self.0.lock().queue.dequeue()
97        }
98
99        fn is_empty(&self) -> bool {
100            self.0.lock().queue.is_empty()
101        }
102
103        fn with_event<'a>(&'a self, f: &'a mut dyn FnMut(&mut Event)) {
104            f(&mut self.0.lock().event);
105        }
106    }
107
108    struct QueueData<Q: QueueModel> {
109        event: Event,
110        queue: Q,
111    }
112
113    let data = Arc::new(Queue(Mutex::try_new(QueueData {
114        event: Event::new(),
115        queue,
116    })?));
117    let send = SendQueue(data.clone());
118    let receive = ReceiveQueue(data);
119    Ok((send, receive))
120}
121
122trait QueueShared<T> {
123    fn send(&self, item: T) -> bool;
124    fn receive(&self) -> Option<T>;
125    fn is_empty(&self) -> bool;
126    fn with_event<'a>(&'a self, f: &'a mut dyn FnMut(&mut Event));
127}
128
129struct ReceiveWrapper<'b, T>(&'b dyn QueueShared<T>);
130
131impl<'b, T> OwnerMut<Event> for ReceiveWrapper<'b, T> {
132    fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
133    where
134        Event: 'a,
135    {
136        let mut f = Some(f);
137        let mut out: Option<U> = None;
138        self.0.with_event(&mut |e| out = Some(f.take().unwrap()(e)));
139        out
140    }
141}