embassy_sync/
zerocopy_channel.rs1use core::cell::RefCell;
18use core::future::{poll_fn, Future};
19use core::marker::PhantomData;
20use core::task::{Context, Poll};
21
22use crate::blocking_mutex::raw::RawMutex;
23use crate::blocking_mutex::Mutex;
24use crate::waitqueue::WakerRegistration;
25
26pub struct Channel<'a, M: RawMutex, T> {
38 buf: BufferPtr<T>,
39 phantom: PhantomData<&'a mut T>,
40 state: Mutex<M, RefCell<State>>,
41}
42
43impl<'a, M: RawMutex, T> Channel<'a, M, T> {
44 pub fn new(buf: &'a mut [T]) -> Self {
49 let len = buf.len();
50 assert!(len != 0);
51
52 Self {
53 buf: BufferPtr(buf.as_mut_ptr()),
54 phantom: PhantomData,
55 state: Mutex::new(RefCell::new(State {
56 capacity: len,
57 front: 0,
58 back: 0,
59 full: false,
60 send_waker: WakerRegistration::new(),
61 receive_waker: WakerRegistration::new(),
62 })),
63 }
64 }
65
66 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
71 (Sender { channel: self }, Receiver { channel: self })
72 }
73
74 pub fn clear(&mut self) {
76 self.state.lock(|s| {
77 s.borrow_mut().clear();
78 });
79 }
80
81 pub fn len(&self) -> usize {
83 self.state.lock(|s| s.borrow().len())
84 }
85
86 pub fn is_empty(&self) -> bool {
88 self.state.lock(|s| s.borrow().is_empty())
89 }
90
91 pub fn is_full(&self) -> bool {
93 self.state.lock(|s| s.borrow().is_full())
94 }
95}
96
97#[repr(transparent)]
98struct BufferPtr<T>(*mut T);
99
100impl<T> BufferPtr<T> {
101 unsafe fn add(&self, count: usize) -> *mut T {
102 self.0.add(count)
103 }
104}
105
106unsafe impl<T> Send for BufferPtr<T> {}
107unsafe impl<T> Sync for BufferPtr<T> {}
108
109pub struct Sender<'a, M: RawMutex, T> {
111 channel: &'a Channel<'a, M, T>,
112}
113
114impl<'a, M: RawMutex, T> Sender<'a, M, T> {
115 pub fn borrow(&mut self) -> Sender<'_, M, T> {
117 Sender { channel: self.channel }
118 }
119
120 pub fn try_send(&mut self) -> Option<&mut T> {
122 self.channel.state.lock(|s| {
123 let s = &mut *s.borrow_mut();
124 match s.push_index() {
125 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
126 None => None,
127 }
128 })
129 }
130
131 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
133 self.channel.state.lock(|s| {
134 let s = &mut *s.borrow_mut();
135 match s.push_index() {
136 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
137 None => {
138 s.receive_waker.register(cx.waker());
139 Poll::Pending
140 }
141 }
142 })
143 }
144
145 pub fn send(&mut self) -> impl Future<Output = &mut T> {
147 poll_fn(|cx| {
148 self.channel.state.lock(|s| {
149 let s = &mut *s.borrow_mut();
150 match s.push_index() {
151 Some(i) => {
152 let r = unsafe { &mut *self.channel.buf.add(i) };
153 Poll::Ready(r)
154 }
155 None => {
156 s.receive_waker.register(cx.waker());
157 Poll::Pending
158 }
159 }
160 })
161 })
162 }
163
164 pub fn send_done(&mut self) {
166 self.channel.state.lock(|s| s.borrow_mut().push_done())
167 }
168
169 pub fn clear(&mut self) {
171 self.channel.state.lock(|s| {
172 s.borrow_mut().clear();
173 });
174 }
175
176 pub fn len(&self) -> usize {
178 self.channel.state.lock(|s| s.borrow().len())
179 }
180
181 pub fn is_empty(&self) -> bool {
183 self.channel.state.lock(|s| s.borrow().is_empty())
184 }
185
186 pub fn is_full(&self) -> bool {
188 self.channel.state.lock(|s| s.borrow().is_full())
189 }
190}
191
192pub struct Receiver<'a, M: RawMutex, T> {
194 channel: &'a Channel<'a, M, T>,
195}
196
197impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
198 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
200 Receiver { channel: self.channel }
201 }
202
203 pub fn try_receive(&mut self) -> Option<&mut T> {
205 self.channel.state.lock(|s| {
206 let s = &mut *s.borrow_mut();
207 match s.pop_index() {
208 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
209 None => None,
210 }
211 })
212 }
213
214 pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
216 self.channel.state.lock(|s| {
217 let s = &mut *s.borrow_mut();
218 match s.pop_index() {
219 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
220 None => {
221 s.send_waker.register(cx.waker());
222 Poll::Pending
223 }
224 }
225 })
226 }
227
228 pub fn receive(&mut self) -> impl Future<Output = &mut T> {
230 poll_fn(|cx| {
231 self.channel.state.lock(|s| {
232 let s = &mut *s.borrow_mut();
233 match s.pop_index() {
234 Some(i) => {
235 let r = unsafe { &mut *self.channel.buf.add(i) };
236 Poll::Ready(r)
237 }
238 None => {
239 s.send_waker.register(cx.waker());
240 Poll::Pending
241 }
242 }
243 })
244 })
245 }
246
247 pub fn receive_done(&mut self) {
249 self.channel.state.lock(|s| s.borrow_mut().pop_done())
250 }
251
252 pub fn clear(&mut self) {
254 self.channel.state.lock(|s| {
255 s.borrow_mut().clear();
256 });
257 }
258
259 pub fn len(&self) -> usize {
261 self.channel.state.lock(|s| s.borrow().len())
262 }
263
264 pub fn is_empty(&self) -> bool {
266 self.channel.state.lock(|s| s.borrow().is_empty())
267 }
268
269 pub fn is_full(&self) -> bool {
271 self.channel.state.lock(|s| s.borrow().is_full())
272 }
273}
274
275struct State {
276 capacity: usize,
278
279 front: usize,
281 back: usize,
283
284 full: bool,
287
288 send_waker: WakerRegistration,
289 receive_waker: WakerRegistration,
290}
291
292impl State {
293 fn increment(&self, i: usize) -> usize {
294 if i + 1 == self.capacity {
295 0
296 } else {
297 i + 1
298 }
299 }
300
301 fn clear(&mut self) {
302 if self.full {
303 self.receive_waker.wake();
304 }
305 self.front = 0;
306 self.back = 0;
307 self.full = false;
308 }
309
310 fn len(&self) -> usize {
311 if !self.full {
312 if self.back >= self.front {
313 self.back - self.front
314 } else {
315 self.capacity + self.back - self.front
316 }
317 } else {
318 self.capacity
319 }
320 }
321
322 fn is_full(&self) -> bool {
323 self.full
324 }
325
326 fn is_empty(&self) -> bool {
327 self.front == self.back && !self.full
328 }
329
330 fn push_index(&mut self) -> Option<usize> {
331 match self.is_full() {
332 true => None,
333 false => Some(self.back),
334 }
335 }
336
337 fn push_done(&mut self) {
338 assert!(!self.is_full());
339 self.back = self.increment(self.back);
340 if self.back == self.front {
341 self.full = true;
342 }
343 self.send_waker.wake();
344 }
345
346 fn pop_index(&mut self) -> Option<usize> {
347 match self.is_empty() {
348 true => None,
349 false => Some(self.front),
350 }
351 }
352
353 fn pop_done(&mut self) {
354 assert!(!self.is_empty());
355 self.front = self.increment(self.front);
356 self.full = false;
357 self.receive_waker.wake();
358 }
359}