vex_rt/rtos/
channel.rs

1use alloc::sync::Arc;
2use core::time::Duration;
3use owner_monad::OwnerMut;
4
5use super::{
6    handle_event, Event, EventHandle, GenericSleep, Instant, Mutex, Selectable, Semaphore,
7    TIMEOUT_MAX,
8};
9use crate::error::Error;
10
11/// Represents the sending end of a rendez-vous channel.
12pub struct SendChannel<T>(Arc<ChannelShared<T>>);
13
14impl<T> SendChannel<T> {
15    /// A [`Selectable`] event which resolves when `value` is sent on the
16    /// channel. Respects the atomicity and rendez-vous properties of the
17    /// operation; if the event occurs and is processed, then the value was
18    /// sent, and otherwise not.
19    pub fn select(&self, value: T) -> impl '_ + Selectable {
20        struct SendSelect<'b, T> {
21            value: T,
22            data: &'b ChannelShared<T>,
23            handle: EventHandle<SendWrapper<'b, T>>,
24        }
25
26        impl<'b, T> Selectable for SendSelect<'b, T> {
27            fn poll(self) -> Result<(), Self> {
28                // Send mutex is locked for the duration of the poll operation.
29                let _send_lock = self.data.send_mutex.lock();
30
31                assert_eq!(self.data.ack_sem.count(), 0);
32
33                let n = {
34                    let mut lock = self.data.data.lock();
35                    lock.value = Some(self.value);
36                    lock.seq = !lock.seq;
37                    lock.receive_event.notify();
38                    lock.receive_event.task_count()
39                };
40
41                // Wait for all receivers to process.
42                for _ in 0..n {
43                    // TODO: consider shortening this timeout to enforce a realtime guarantee.
44                    self.data
45                        .ack_sem
46                        .wait(Duration::from_millis(TIMEOUT_MAX as u64))
47                        .unwrap_or_else(|err| panic!("failed to synchronize on channel: {}", err));
48                }
49
50                // Check if the value remains.
51                if let Some(value) = self.data.data.lock().value.take() {
52                    Err(Self {
53                        value,
54                        data: self.data,
55                        handle: self.handle,
56                    })
57                } else {
58                    Ok(())
59                }
60            }
61
62            fn sleep(&self) -> GenericSleep {
63                if self.data.data.lock().receive_event.task_count() == 0 {
64                    GenericSleep::NotifyTake(None)
65                } else {
66                    GenericSleep::Timestamp(Instant::from_millis(0))
67                }
68            }
69        }
70
71        SendSelect {
72            value,
73            data: &self.0,
74            handle: handle_event(SendWrapper(&*self.0)),
75        }
76    }
77}
78
79impl<T> Clone for SendChannel<T> {
80    fn clone(&self) -> Self {
81        Self(self.0.clone())
82    }
83}
84
85/// Represents the receive end of a rendez-vous channel.
86pub struct ReceiveChannel<T>(Arc<ChannelShared<T>>);
87
88impl<T> ReceiveChannel<T> {
89    /// A [`Selectable`] event which resolves when a value is received on the
90    /// channel.
91    pub fn select(&self) -> impl '_ + Selectable<T> {
92        struct ReceiveSelect<'b, T> {
93            data: &'b ChannelShared<T>,
94            handle: EventHandle<ReceiveWrapper<'b, T>>,
95            seq: bool,
96        }
97
98        impl<'b, T> Selectable<T> for ReceiveSelect<'b, T> {
99            fn poll(mut self) -> core::result::Result<T, Self> {
100                let mut lock = self.data.data.lock();
101
102                if self.seq != lock.seq {
103                    // Ignore failure to post; we don't care.
104                    self.data.ack_sem.post().unwrap_or(());
105                    self.seq = lock.seq;
106                }
107
108                if let Some(value) = lock.value.take() {
109                    self.handle.clear();
110                    Ok(value)
111                } else {
112                    lock.send_event.notify();
113                    Err(self)
114                }
115            }
116
117            fn sleep(&self) -> GenericSleep {
118                if self.data.data.lock().send_event.task_count() == 0 {
119                    GenericSleep::NotifyTake(None)
120                } else {
121                    GenericSleep::Timestamp(Instant::from_millis(0))
122                }
123            }
124        }
125
126        impl<'b, T> Drop for ReceiveSelect<'b, T> {
127            fn drop(&mut self) {
128                // Keep mutex locked while dropping to avoid race condition.
129                let lock = self.data.data.lock();
130
131                if self.seq != lock.seq && !self.handle.is_done() {
132                    // Ignore failure to post; we don't care.
133                    self.data.ack_sem.post().unwrap_or(());
134                }
135
136                self.handle.clear();
137            }
138        }
139
140        let lock = self.0.data.lock();
141
142        ReceiveSelect {
143            data: &self.0,
144            handle: handle_event(ReceiveWrapper(&*self.0)),
145            seq: lock.seq,
146        }
147    }
148}
149
150impl<T> Clone for ReceiveChannel<T> {
151    fn clone(&self) -> Self {
152        Self(self.0.clone())
153    }
154}
155
156/// Creates a new send-receive pair together representing a rendez-vous channel.
157/// Panics on failure; see [`try_channel`].
158pub fn channel<T>() -> (SendChannel<T>, ReceiveChannel<T>) {
159    try_channel().unwrap_or_else(|err| panic!("failed to create channel: {}", err))
160}
161
162/// Creates a new send-receive pair together representing a rendez-vous channel.
163pub fn try_channel<T>() -> Result<(SendChannel<T>, ReceiveChannel<T>), Error> {
164    let data = Arc::new(ChannelShared {
165        data: Mutex::try_new(ChannelData {
166            send_event: Event::new(),
167            receive_event: Event::new(),
168            value: None,
169            seq: false,
170        })?,
171        send_mutex: Mutex::try_new(())?,
172        ack_sem: Semaphore::try_new(u32::MAX, 0)?,
173    });
174    let send = SendChannel(data.clone());
175    let receive = ReceiveChannel(data);
176    Ok((send, receive))
177}
178
179struct ChannelShared<T> {
180    data: Mutex<ChannelData<T>>,
181    send_mutex: Mutex<()>,
182    ack_sem: Semaphore,
183}
184
185struct ChannelData<T> {
186    send_event: Event,
187    receive_event: Event,
188    value: Option<T>,
189    seq: bool,
190}
191
192struct SendWrapper<'b, T>(&'b ChannelShared<T>);
193
194impl<'b, T> OwnerMut<Event> for SendWrapper<'b, T> {
195    fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
196    where
197        Event: 'a,
198    {
199        Some(f(&mut self.0.data.try_lock().ok()?.send_event))
200    }
201}
202
203struct ReceiveWrapper<'b, T>(&'b ChannelShared<T>);
204
205impl<'b, T> OwnerMut<Event> for ReceiveWrapper<'b, T> {
206    fn with<'a, U>(&'a mut self, f: impl FnOnce(&mut Event) -> U) -> Option<U>
207    where
208        Event: 'a,
209    {
210        Some(f(&mut self.0.data.try_lock().ok()?.receive_event))
211    }
212}