1use alloc::sync::Arc;
2use owner_monad::OwnerMut;
3use queue_model::QueueModel;
4
5use super::{handle_event, Event, EventHandle, GenericSleep, Instant, Mutex, Selectable};
6use crate::error::Error;
7
8pub struct SendQueue<T>(Arc<dyn QueueShared<T> + Send + Sync>);
10
11impl<T> SendQueue<T> {
12 #[inline]
13 pub fn send(&self, item: T) -> bool {
15 self.0.send(item)
16 }
17}
18
19impl<T> Clone for SendQueue<T> {
20 fn clone(&self) -> Self {
21 Self(self.0.clone())
22 }
23}
24
25pub struct ReceiveQueue<T>(Arc<dyn QueueShared<T> + Send + Sync>);
27
28impl<T> ReceiveQueue<T> {
29 pub fn select(&self) -> impl '_ + Selectable<T> {
32 struct ReceiveSelect<'b, T> {
33 data: &'b dyn QueueShared<T>,
34 _handle: EventHandle<ReceiveWrapper<'b, T>>,
35 }
36
37 impl<'b, T> Selectable<T> for ReceiveSelect<'b, T> {
38 fn poll(self) -> Result<T, Self> {
39 self.data.receive().ok_or(self)
40 }
41
42 fn sleep(&self) -> GenericSleep {
43 if self.data.is_empty() {
44 GenericSleep::NotifyTake(None)
45 } else {
46 GenericSleep::Timestamp(Instant::from_millis(0))
47 }
48 }
49 }
50
51 ReceiveSelect {
52 data: &*self.0,
53 _handle: handle_event(ReceiveWrapper(&*self.0)),
54 }
55 }
56}
57
58impl<T> Clone for ReceiveQueue<T> {
59 fn clone(&self) -> Self {
60 Self(self.0.clone())
61 }
62}
63
64pub type QueuePair<Q> = (
67 SendQueue<<Q as QueueModel>::Item>,
68 ReceiveQueue<<Q as QueueModel>::Item>,
69);
70
71pub fn queue<Q: 'static + QueueModel + Send + Sync>(queue: Q) -> QueuePair<Q> {
75 try_queue(queue).unwrap_or_else(|err| panic!("failed to create channel: {}", err))
76}
77
78pub fn try_queue<Q: 'static + QueueModel + Send + Sync>(queue: Q) -> Result<QueuePair<Q>, Error> {
81 struct Queue<Q: QueueModel>(Mutex<QueueData<Q>>);
82
83 impl<Q: QueueModel> QueueShared<Q::Item> for Queue<Q> {
84 fn send(&self, item: Q::Item) -> bool {
85 let mut lock = self.0.lock();
86
87 if lock.queue.enqueue(item) {
88 lock.event.notify();
89 true
90 } else {
91 false
92 }
93 }
94
95 fn receive(&self) -> Option<Q::Item> {
96 self.0.lock().queue.dequeue()
97 }
98
99 fn is_empty(&self) -> bool {
100 self.0.lock().queue.is_empty()
101 }
102
103 fn with_event<'a>(&'a self, f: &'a mut dyn FnMut(&mut Event)) {
104 f(&mut self.0.lock().event);
105 }
106 }
107
108 struct QueueData<Q: QueueModel> {
109 event: Event,
110 queue: Q,
111 }
112
113 let data = Arc::new(Queue(Mutex::try_new(QueueData {
114 event: Event::new(),
115 queue,
116 })?));
117 let send = SendQueue(data.clone());
118 let receive = ReceiveQueue(data);
119 Ok((send, receive))
120}
121
122trait QueueShared<T> {
123 fn send(&self, item: T) -> bool;
124 fn receive(&self) -> Option<T>;
125 fn is_empty(&self) -> bool;
126 fn with_event<'a>(&'a self, f: &'a mut dyn FnMut(&mut Event));
127}
128
129struct ReceiveWrapper<'b, T>(&'b dyn QueueShared<T>);
130
131impl<'b, T> OwnerMut<Event> for ReceiveWrapper<'b, T> {
132 fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
133 where
134 Event: 'a,
135 {
136 let mut f = Some(f);
137 let mut out: Option<U> = None;
138 self.0.with_event(&mut |e| out = Some(f.take().unwrap()(e)));
139 out
140 }
141}