embassy_sync/
zerocopy_channel.rs1use core::cell::RefCell;
18use core::future::{Future, poll_fn};
19use core::marker::PhantomData;
20use core::task::{Context, Poll};
21
22use crate::blocking_mutex::Mutex;
23use crate::blocking_mutex::raw::RawMutex;
24use crate::waitqueue::WakerRegistration;
25
26#[derive(Debug)]
38pub struct Channel<'a, M: RawMutex, T> {
39 buf: BufferPtr<T>,
40 phantom: PhantomData<&'a mut T>,
41 state: Mutex<M, RefCell<State>>,
42}
43
44impl<'a, M: RawMutex, T> Channel<'a, M, T> {
45 pub fn new(buf: &'a mut [T]) -> Self {
50 let len = buf.len();
51 assert!(len != 0);
52
53 Self {
54 buf: BufferPtr(buf.as_mut_ptr()),
55 phantom: PhantomData,
56 state: Mutex::new(RefCell::new(State {
57 capacity: len,
58 front: 0,
59 back: 0,
60 full: false,
61 send_waker: WakerRegistration::new(),
62 receive_waker: WakerRegistration::new(),
63 })),
64 }
65 }
66
67 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
72 (Sender { channel: self }, Receiver { channel: self })
73 }
74
75 pub fn clear(&mut self) {
77 self.state.lock(|s| {
78 s.borrow_mut().clear();
79 });
80 }
81
82 pub fn len(&self) -> usize {
84 self.state.lock(|s| s.borrow().len())
85 }
86
87 pub fn is_empty(&self) -> bool {
89 self.state.lock(|s| s.borrow().is_empty())
90 }
91
92 pub fn is_full(&self) -> bool {
94 self.state.lock(|s| s.borrow().is_full())
95 }
96}
97
/// Thin wrapper around the raw pointer to the first slot of a channel buffer.
///
/// Raw pointers are neither `Send` nor `Sync`; wrapping the pointer in its own
/// type lets those traits be implemented for it explicitly (see below).
#[repr(transparent)]
#[derive(Debug)]
struct BufferPtr<T>(*mut T);

impl<T> BufferPtr<T> {
    /// Returns a pointer to the slot `count` elements past the start of the buffer.
    ///
    /// # Safety
    ///
    /// The caller must uphold the contract of the raw-pointer `add` method: in
    /// particular, `count` must not exceed the number of elements in the
    /// allocation the wrapped pointer was derived from.
    unsafe fn add(&self, count: usize) -> *mut T {
        // SAFETY: the caller guarantees the offset stays within the buffer,
        // as required by this function's `# Safety` section.
        unsafe { self.0.add(count) }
    }
}

// SAFETY: the wrapper itself never dereferences the pointer; every dereference
// in this file happens in `Sender`/`Receiver` at an index obtained from the
// mutex-protected `State`.
// NOTE(review): these impls are unconditional in `T`. `Channel` also stores a
// `PhantomData<&'a mut T>`, so its own auto traits still depend on `T`; confirm
// that this is the intended place for the `T`-dependent bound.
unsafe impl<T> Send for BufferPtr<T> {}
unsafe impl<T> Sync for BufferPtr<T> {}
110
111#[derive(Debug)]
113pub struct Sender<'a, M: RawMutex, T> {
114 channel: &'a Channel<'a, M, T>,
115}
116
117impl<'a, M: RawMutex, T> Sender<'a, M, T> {
118 pub fn borrow(&mut self) -> Sender<'_, M, T> {
120 Sender { channel: self.channel }
121 }
122
123 pub fn try_send(&mut self) -> Option<&mut T> {
125 self.channel.state.lock(|s| {
126 let s = &mut *s.borrow_mut();
127 match s.push_index() {
128 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
129 None => None,
130 }
131 })
132 }
133
134 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
136 self.channel.state.lock(|s| {
137 let s = &mut *s.borrow_mut();
138 match s.push_index() {
139 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
140 None => {
141 s.receive_waker.register(cx.waker());
142 Poll::Pending
143 }
144 }
145 })
146 }
147
148 pub fn send(&mut self) -> impl Future<Output = &mut T> {
150 poll_fn(|cx| {
151 self.channel.state.lock(|s| {
152 let s = &mut *s.borrow_mut();
153 match s.push_index() {
154 Some(i) => {
155 let r = unsafe { &mut *self.channel.buf.add(i) };
156 Poll::Ready(r)
157 }
158 None => {
159 s.receive_waker.register(cx.waker());
160 Poll::Pending
161 }
162 }
163 })
164 })
165 }
166
167 pub fn send_done(&mut self) {
169 self.channel.state.lock(|s| s.borrow_mut().push_done())
170 }
171
172 pub fn clear(&mut self) {
174 self.channel.state.lock(|s| {
175 s.borrow_mut().clear();
176 });
177 }
178
179 pub fn len(&self) -> usize {
181 self.channel.state.lock(|s| s.borrow().len())
182 }
183
184 pub fn is_empty(&self) -> bool {
186 self.channel.state.lock(|s| s.borrow().is_empty())
187 }
188
189 pub fn is_full(&self) -> bool {
191 self.channel.state.lock(|s| s.borrow().is_full())
192 }
193}
194
195#[derive(Debug)]
197pub struct Receiver<'a, M: RawMutex, T> {
198 channel: &'a Channel<'a, M, T>,
199}
200
201impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
202 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
204 Receiver { channel: self.channel }
205 }
206
207 pub fn try_receive(&mut self) -> Option<&mut T> {
209 self.channel.state.lock(|s| {
210 let s = &mut *s.borrow_mut();
211 match s.pop_index() {
212 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
213 None => None,
214 }
215 })
216 }
217
218 pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
220 self.channel.state.lock(|s| {
221 let s = &mut *s.borrow_mut();
222 match s.pop_index() {
223 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
224 None => {
225 s.send_waker.register(cx.waker());
226 Poll::Pending
227 }
228 }
229 })
230 }
231
232 pub fn receive(&mut self) -> impl Future<Output = &mut T> {
234 poll_fn(|cx| {
235 self.channel.state.lock(|s| {
236 let s = &mut *s.borrow_mut();
237 match s.pop_index() {
238 Some(i) => {
239 let r = unsafe { &mut *self.channel.buf.add(i) };
240 Poll::Ready(r)
241 }
242 None => {
243 s.send_waker.register(cx.waker());
244 Poll::Pending
245 }
246 }
247 })
248 })
249 }
250
251 pub fn receive_done(&mut self) {
253 self.channel.state.lock(|s| s.borrow_mut().pop_done())
254 }
255
256 pub fn clear(&mut self) {
258 self.channel.state.lock(|s| {
259 s.borrow_mut().clear();
260 });
261 }
262
263 pub fn len(&self) -> usize {
265 self.channel.state.lock(|s| s.borrow().len())
266 }
267
268 pub fn is_empty(&self) -> bool {
270 self.channel.state.lock(|s| s.borrow().is_empty())
271 }
272
273 pub fn is_full(&self) -> bool {
275 self.channel.state.lock(|s| s.borrow().is_full())
276 }
277}
278
279#[derive(Debug)]
280struct State {
281 capacity: usize,
283
284 front: usize,
286 back: usize,
288
289 full: bool,
292
293 send_waker: WakerRegistration,
294 receive_waker: WakerRegistration,
295}
296
297impl State {
298 fn increment(&self, i: usize) -> usize {
299 if i + 1 == self.capacity { 0 } else { i + 1 }
300 }
301
302 fn clear(&mut self) {
303 if self.full {
304 self.receive_waker.wake();
305 }
306 self.front = 0;
307 self.back = 0;
308 self.full = false;
309 }
310
311 fn len(&self) -> usize {
312 if !self.full {
313 if self.back >= self.front {
314 self.back - self.front
315 } else {
316 self.capacity + self.back - self.front
317 }
318 } else {
319 self.capacity
320 }
321 }
322
323 fn is_full(&self) -> bool {
324 self.full
325 }
326
327 fn is_empty(&self) -> bool {
328 self.front == self.back && !self.full
329 }
330
331 fn push_index(&mut self) -> Option<usize> {
332 match self.is_full() {
333 true => None,
334 false => Some(self.back),
335 }
336 }
337
338 fn push_done(&mut self) {
339 assert!(!self.is_full());
340 self.back = self.increment(self.back);
341 if self.back == self.front {
342 self.full = true;
343 }
344 self.send_waker.wake();
345 }
346
347 fn pop_index(&mut self) -> Option<usize> {
348 match self.is_empty() {
349 true => None,
350 false => Some(self.front),
351 }
352 }
353
354 fn pop_done(&mut self) {
355 assert!(!self.is_empty());
356 self.front = self.increment(self.front);
357 self.full = false;
358 self.receive_waker.wake();
359 }
360}