1use alloc::sync::Arc;
2use core::time::Duration;
3use owner_monad::OwnerMut;
4
5use super::{
6 handle_event, Event, EventHandle, GenericSleep, Instant, Mutex, Selectable, Semaphore,
7 TIMEOUT_MAX,
8};
9use crate::error::Error;
10
11pub struct SendChannel<T>(Arc<ChannelShared<T>>);
13
14impl<T> SendChannel<T> {
15 pub fn select(&self, value: T) -> impl '_ + Selectable {
20 struct SendSelect<'b, T> {
21 value: T,
22 data: &'b ChannelShared<T>,
23 handle: EventHandle<SendWrapper<'b, T>>,
24 }
25
26 impl<'b, T> Selectable for SendSelect<'b, T> {
27 fn poll(self) -> Result<(), Self> {
28 let _send_lock = self.data.send_mutex.lock();
30
31 assert_eq!(self.data.ack_sem.count(), 0);
32
33 let n = {
34 let mut lock = self.data.data.lock();
35 lock.value = Some(self.value);
36 lock.seq = !lock.seq;
37 lock.receive_event.notify();
38 lock.receive_event.task_count()
39 };
40
41 for _ in 0..n {
43 self.data
45 .ack_sem
46 .wait(Duration::from_millis(TIMEOUT_MAX as u64))
47 .unwrap_or_else(|err| panic!("failed to synchronize on channel: {}", err));
48 }
49
50 if let Some(value) = self.data.data.lock().value.take() {
52 Err(Self {
53 value,
54 data: self.data,
55 handle: self.handle,
56 })
57 } else {
58 Ok(())
59 }
60 }
61
62 fn sleep(&self) -> GenericSleep {
63 if self.data.data.lock().receive_event.task_count() == 0 {
64 GenericSleep::NotifyTake(None)
65 } else {
66 GenericSleep::Timestamp(Instant::from_millis(0))
67 }
68 }
69 }
70
71 SendSelect {
72 value,
73 data: &self.0,
74 handle: handle_event(SendWrapper(&*self.0)),
75 }
76 }
77}
78
79impl<T> Clone for SendChannel<T> {
80 fn clone(&self) -> Self {
81 Self(self.0.clone())
82 }
83}
84
85pub struct ReceiveChannel<T>(Arc<ChannelShared<T>>);
87
88impl<T> ReceiveChannel<T> {
89 pub fn select(&self) -> impl '_ + Selectable<T> {
92 struct ReceiveSelect<'b, T> {
93 data: &'b ChannelShared<T>,
94 handle: EventHandle<ReceiveWrapper<'b, T>>,
95 seq: bool,
96 }
97
98 impl<'b, T> Selectable<T> for ReceiveSelect<'b, T> {
99 fn poll(mut self) -> core::result::Result<T, Self> {
100 let mut lock = self.data.data.lock();
101
102 if self.seq != lock.seq {
103 self.data.ack_sem.post().unwrap_or(());
105 self.seq = lock.seq;
106 }
107
108 if let Some(value) = lock.value.take() {
109 self.handle.clear();
110 Ok(value)
111 } else {
112 lock.send_event.notify();
113 Err(self)
114 }
115 }
116
117 fn sleep(&self) -> GenericSleep {
118 if self.data.data.lock().send_event.task_count() == 0 {
119 GenericSleep::NotifyTake(None)
120 } else {
121 GenericSleep::Timestamp(Instant::from_millis(0))
122 }
123 }
124 }
125
126 impl<'b, T> Drop for ReceiveSelect<'b, T> {
127 fn drop(&mut self) {
128 let lock = self.data.data.lock();
130
131 if self.seq != lock.seq && !self.handle.is_done() {
132 self.data.ack_sem.post().unwrap_or(());
134 }
135
136 self.handle.clear();
137 }
138 }
139
140 let lock = self.0.data.lock();
141
142 ReceiveSelect {
143 data: &self.0,
144 handle: handle_event(ReceiveWrapper(&*self.0)),
145 seq: lock.seq,
146 }
147 }
148}
149
150impl<T> Clone for ReceiveChannel<T> {
151 fn clone(&self) -> Self {
152 Self(self.0.clone())
153 }
154}
155
156pub fn channel<T>() -> (SendChannel<T>, ReceiveChannel<T>) {
159 try_channel().unwrap_or_else(|err| panic!("failed to create channel: {}", err))
160}
161
162pub fn try_channel<T>() -> Result<(SendChannel<T>, ReceiveChannel<T>), Error> {
164 let data = Arc::new(ChannelShared {
165 data: Mutex::try_new(ChannelData {
166 send_event: Event::new(),
167 receive_event: Event::new(),
168 value: None,
169 seq: false,
170 })?,
171 send_mutex: Mutex::try_new(())?,
172 ack_sem: Semaphore::try_new(u32::MAX, 0)?,
173 });
174 let send = SendChannel(data.clone());
175 let receive = ReceiveChannel(data);
176 Ok((send, receive))
177}
178
179struct ChannelShared<T> {
180 data: Mutex<ChannelData<T>>,
181 send_mutex: Mutex<()>,
182 ack_sem: Semaphore,
183}
184
185struct ChannelData<T> {
186 send_event: Event,
187 receive_event: Event,
188 value: Option<T>,
189 seq: bool,
190}
191
192struct SendWrapper<'b, T>(&'b ChannelShared<T>);
193
194impl<'b, T> OwnerMut<Event> for SendWrapper<'b, T> {
195 fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
196 where
197 Event: 'a,
198 {
199 Some(f(&mut self.0.data.try_lock().ok()?.send_event))
200 }
201}
202
203struct ReceiveWrapper<'b, T>(&'b ChannelShared<T>);
204
205impl<'b, T> OwnerMut<Event> for ReceiveWrapper<'b, T> {
206 fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
207 where
208 Event: 'a,
209 {
210 Some(f(&mut self.0.data.try_lock().ok()?.receive_event))
211 }
212}