1use core::{
2 fmt::{self, Debug, Formatter},
3 future::Future,
4 marker::PhantomData,
5 mem::ManuallyDrop,
6 pin::Pin,
7 sync::atomic::{fence, spin_loop_hint, AtomicU8, Ordering},
8 task::{Context, Poll, Waker},
9};
10use std::time::Instant;
11
12use futures_core::{FusedFuture, FusedStream, Stream};
13use suspend_core::{listen::block_on_poll, Expiry};
14
15use super::error::{Incomplete, TimedOut};
16use super::util::{BoxPtr, Maybe, MaybeCopy};
17
18const STATE_DONE: u8 = 0b0000;
19const STATE_INIT: u8 = 0b0001;
20const STATE_LOCKED: u8 = 0b0010;
21const STATE_LOADED: u8 = 0b0100;
22const STATE_WAKE: u8 = 0b1000;
23const WAKE_RECV: u8 = STATE_WAKE;
24const WAKE_SEND: u8 = STATE_WAKE | STATE_LOADED;
25
26pub fn send_once<T>() -> (SendOnce<T>, ReceiveOnce<T>) {
28 let channel = BoxPtr::new(Box::new(Channel::new()));
29 (
30 SendOnce { channel },
31 ReceiveOnce {
32 channel: channel.into(),
33 },
34 )
35}
36
37pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
40 let channel = BoxPtr::new(Box::new(Channel::new()));
41 (
42 Sender {
43 channel: Some(channel),
44 },
45 Receiver {
46 channel: Some(channel).into(),
47 },
48 )
49}
50
51pub(crate) struct Channel<T> {
52 state: AtomicU8,
53 value: Maybe<T>,
54 waker: Maybe<Waker>,
55}
56
57impl<T> Channel<T> {
58 pub const fn new() -> Self {
59 Self {
60 state: AtomicU8::new(STATE_INIT),
61 value: Maybe::empty(),
62 waker: Maybe::empty(),
63 }
64 }
65
66 #[inline]
67 fn is_done(&self) -> bool {
68 self.state.load(Ordering::Relaxed) == STATE_DONE
69 }
70
71 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<(Option<T>, bool)> {
74 match self.wait_for_lock() {
75 STATE_DONE => {
76 self.state.store(STATE_DONE, Ordering::Relaxed);
78 return Poll::Ready((None, true));
79 }
80 STATE_LOADED => {
81 let value = unsafe { self.value.load() };
83 self.state.store(STATE_DONE, Ordering::Relaxed);
84 return Poll::Ready((Some(value), true));
85 }
86 WAKE_SEND => {
87 let value = Some(unsafe { self.value.load() });
89 let send_waker = unsafe { self.waker.load() };
90 if self.state.swap(STATE_INIT, Ordering::Release) == STATE_DONE {
91 drop(send_waker);
93 return Poll::Ready((value, true));
94 }
95 send_waker.wake();
96 return Poll::Ready((value, false));
97 }
98 WAKE_RECV => {
99 unsafe { self.waker.clear() };
101 }
102 STATE_INIT => {
103 }
105 _ => panic!("Invalid state"),
106 }
107
108 unsafe { self.waker.store(cx.waker().clone()) };
109 if self.state.swap(WAKE_RECV, Ordering::Release) == STATE_DONE {
110 unsafe { self.waker.clear() };
112 return Poll::Ready((None, true));
113 }
114 Poll::Pending
115 }
116
117 pub fn try_recv(&mut self) -> Poll<(Option<T>, bool)> {
120 let mut locked = false;
121 let mut state = self.state.load(Ordering::Relaxed);
122 loop {
123 match state {
124 STATE_INIT | WAKE_RECV => {
125 return Poll::Pending;
126 }
127 STATE_DONE => {
128 if locked {
130 self.state.store(STATE_DONE, Ordering::Relaxed);
131 }
132 return Poll::Ready((None, true));
133 }
134 STATE_LOADED => {
135 let value = unsafe { self.value.load() };
137 self.state.store(STATE_DONE, Ordering::Relaxed);
138 return Poll::Ready((Some(value), true));
139 }
140 WAKE_SEND => {
141 if !locked {
143 state = self.wait_for_lock();
145 locked = true;
146 continue;
147 }
148 let value = Some(unsafe { self.value.load() });
149 let send_waker = unsafe { self.waker.load() };
150 if self.state.swap(STATE_INIT, Ordering::Release) == STATE_DONE {
151 drop(send_waker);
153 return Poll::Ready((value, true));
154 }
155 send_waker.wake();
156 return Poll::Ready((value, false));
157 }
158 _ => panic!("Invalid state"),
159 }
160 }
161 }
162
163 pub fn cancel_recv(&mut self) -> (Option<T>, bool) {
165 match self.state.fetch_and(STATE_LOADED, Ordering::Release) {
166 prev if prev & STATE_LOCKED != 0 => {
167 (None, false)
169 }
170 STATE_INIT => {
171 (None, false)
173 }
174 STATE_DONE => {
175 (None, true)
177 }
178 STATE_LOADED => {
179 (Some(unsafe { self.value.load() }), true)
181 }
182 WAKE_SEND => {
183 unsafe { self.waker.load() }.wake();
186 (None, false)
187 }
188 WAKE_RECV => {
189 unsafe { self.waker.clear() };
191 (None, false)
192 }
193 _ => panic!("Invalid state"),
194 }
195 }
196
197 pub fn cancel_recv_poll(&mut self) -> (Option<T>, bool) {
199 match self.wait_for_lock() {
200 prev if prev & STATE_LOCKED != 0 => {
201 (None, false)
203 }
204 STATE_DONE => {
205 (None, true)
207 }
208 STATE_LOADED => {
209 (Some(unsafe { self.value.load() }), true)
211 }
212 WAKE_SEND => {
213 let value = Some(unsafe { self.value.load() });
215 let send_waker = unsafe { self.waker.load() };
216 self.state.store(STATE_INIT, Ordering::Release);
217 send_waker.wake();
218 (value, false)
219 }
220 WAKE_RECV => {
221 unsafe { self.waker.clear() };
223 self.state.store(STATE_INIT, Ordering::Release);
224 (None, false)
225 }
226 _ => panic!("Invalid state"),
227 }
228 }
229
230 pub fn send(&mut self, value: T, cx: Option<&mut Context<'_>>) -> Result<(), (T, bool)> {
232 let recv_waker = match self.wait_for_lock() {
233 STATE_INIT => {
234 None
236 }
237 STATE_DONE => {
238 self.state.store(STATE_DONE, Ordering::Relaxed);
240 return Err((value, true));
241 }
242 WAKE_RECV => {
243 Some(unsafe { self.waker.load() })
245 }
246 _ => panic!("Invalid state"),
247 };
248
249 unsafe { self.value.store(value) };
250 let state = if let Some(cx) = cx {
251 unsafe { self.waker.store(cx.waker().clone()) };
252 WAKE_SEND
253 } else {
254 STATE_LOADED
255 };
256 if self.state.swap(state, Ordering::Release) == STATE_DONE {
257 drop(recv_waker);
259 if state == WAKE_SEND {
260 unsafe { self.waker.clear() };
261 }
262 return Err((unsafe { self.value.load() }, true));
263 }
264 recv_waker.map(Waker::wake);
265 Ok(())
266 }
267
268 fn poll_send(&mut self, cx: &mut Context<'_>) -> Poll<(Option<T>, bool)> {
269 match self.wait_for_lock() {
270 STATE_DONE => {
271 Poll::Ready((None, true))
273 }
274 prev @ STATE_INIT | prev @ WAKE_RECV => {
275 self.state.store(prev, Ordering::Release);
277 Poll::Ready((None, false))
278 }
279 STATE_LOADED => {
280 Poll::Ready((Some(unsafe { self.value.load() }), true))
282 }
283 WAKE_SEND => {
284 unsafe { self.waker.replace(cx.waker().clone()) };
286 self.state.store(WAKE_SEND, Ordering::Release);
287 Poll::Pending
288 }
289 _ => panic!("Invalid state"),
290 }
291 }
292
293 pub fn cancel_send(&mut self) -> bool {
295 match self.state.swap(STATE_DONE, Ordering::Release) {
296 prev if prev & STATE_LOCKED != 0 => {
297 false
299 }
300 STATE_INIT => {
301 false
303 }
304 STATE_DONE => {
305 true
307 }
308 WAKE_RECV => {
309 unsafe { self.waker.load() }.wake();
311 false
312 }
313 _ => panic!("Invalid state"),
314 }
315 }
316
317 fn cancel_send_poll(&mut self) -> bool {
318 match self.state.fetch_or(STATE_LOCKED, Ordering::AcqRel) {
319 prev if prev & STATE_LOCKED != 0 => {
320 false
322 }
323 STATE_INIT => {
324 self.state.store(STATE_DONE, Ordering::Release);
326 false
327 }
328 STATE_DONE => {
329 true
331 }
332 WAKE_SEND => {
333 unsafe { self.waker.clear() };
335 self.state.store(STATE_LOADED, Ordering::Release);
336 false
337 }
338 _ => panic!("Invalid state"),
339 }
340 }
341
342 pub fn wait_recv(&mut self) -> (Option<T>, bool) {
343 if let Poll::Ready(result) = self.wait_recv_poll(None) {
344 result
345 } else {
346 unreachable!()
347 }
348 }
349
350 pub fn wait_recv_timeout(&mut self, timeout: Option<Instant>) -> (Option<T>, bool) {
351 match self.wait_recv_poll(timeout) {
352 Poll::Ready(result) => result,
353 Poll::Pending => self.cancel_recv_poll(),
354 }
355 }
356
357 #[inline]
358 fn wait_for_lock(&mut self) -> u8 {
359 loop {
360 let prev = self.state.fetch_or(STATE_LOCKED, Ordering::Relaxed);
361 if prev & STATE_LOCKED == 0 {
362 fence(Ordering::Acquire);
363 return prev;
364 }
365 spin_loop_hint();
366 }
367 }
368
369 #[inline]
370 fn wait_recv_poll(&mut self, timeout: impl Expiry) -> Poll<(Option<T>, bool)> {
371 let mut first = true;
372 let timeout: Option<Instant> = timeout.into_expire();
373 block_on_poll(
374 |cx| {
375 if first {
376 first = false;
377 self.poll_recv(cx)
378 } else {
379 self.try_recv()
381 }
382 },
383 timeout,
384 )
385 }
386}
387
388impl<T> Debug for Channel<T> {
389 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
390 f.debug_struct("Channel")
391 .field("done", &self.is_done())
392 .finish()
393 }
394}
395
396#[derive(Debug)]
398pub struct TrackSend<'a, T> {
399 channel: MaybeCopy<Option<BoxPtr<Channel<T>>>>,
400 value: Maybe<Option<T>>,
401 drops: bool,
402 _marker: PhantomData<&'a mut T>,
403}
404
405impl<T> Drop for TrackSend<'_, T> {
406 fn drop(&mut self) {
407 unsafe {
408 let channel = self.channel.load();
409 channel.map(|mut c| c.cancel_send_poll());
410 self.value.clear();
411 }
412 }
413}
414
415impl<T> Future for TrackSend<'_, T> {
416 type Output = Result<(), T>;
417
418 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
419 if let Some(value) = unsafe { self.value.replace(None) } {
420 if let Some(mut channel) = unsafe { self.channel.load() } {
421 if let Err((value, dropped)) = channel.send(value, Some(cx)) {
422 if dropped && self.drops {
423 drop(channel.into_box());
424 }
425 unsafe { self.channel.store(None) };
426 Poll::Ready(Err(value))
427 } else {
428 Poll::Pending
429 }
430 } else {
431 Poll::Ready(Err(value))
432 }
433 } else if let Some(mut channel) = unsafe { self.channel.load() } {
434 channel.poll_send(cx).map(|(result, dropped)| {
435 if dropped && self.drops {
436 drop(channel.into_box());
437 }
438 unsafe { self.channel.store(None) };
439 result.map(Err).unwrap_or(Ok(()))
440 })
441 } else {
442 Poll::Ready(Ok(()))
443 }
444 }
445}
446
447impl<T> FusedFuture for TrackSend<'_, T> {
448 fn is_terminated(&self) -> bool {
449 unsafe { self.channel.as_ref() }.is_none()
450 }
451}
452
453#[derive(Debug)]
456pub struct SendOnce<T> {
457 channel: BoxPtr<Channel<T>>,
458}
459
460unsafe impl<T: Send> Send for SendOnce<T> {}
461unsafe impl<T: Send> Sync for SendOnce<T> {}
462
463impl<T> SendOnce<T> {
464 pub fn is_canceled(&self) -> bool {
466 self.channel.is_done()
467 }
468
469 pub fn send(self, value: T) -> Result<(), T> {
471 let mut channel = ManuallyDrop::new(self).channel;
472 channel.send(value, None).map_err(|(value, drop_channel)| {
473 if drop_channel {
474 drop(channel.into_box());
475 }
476 value
477 })
478 }
479
480 pub fn track_send(self, value: T) -> TrackSend<'static, T> {
483 let channel = ManuallyDrop::new(self).channel;
484 TrackSend {
485 channel: Some(channel).into(),
486 value: Some(value).into(),
487 drops: true,
488 _marker: PhantomData,
489 }
490 }
491}
492
493impl<T> Drop for SendOnce<T> {
494 fn drop(&mut self) {
495 if self.channel.cancel_send() {
496 drop(self.channel.into_box());
497 }
498 }
499}
500
501#[derive(Debug)]
504pub struct ReceiveOnce<T> {
505 channel: MaybeCopy<BoxPtr<Channel<T>>>,
506}
507
508unsafe impl<T: Send> Send for ReceiveOnce<T> {}
509unsafe impl<T: Send> Sync for ReceiveOnce<T> {}
510
511impl<T> ReceiveOnce<T> {
512 pub fn cancel(self) -> Option<T> {
514 let mut channel = unsafe { ManuallyDrop::new(self).channel.load() };
515 let (result, dropped) = channel.cancel_recv();
516 if dropped {
517 drop(channel.into_box());
518 }
519 result
520 }
521
522 pub fn try_recv(self) -> Result<Result<T, Incomplete>, Self> {
525 let mut channel = unsafe { self.channel.load() };
526 match channel.try_recv() {
527 Poll::Ready((result, dropped)) => {
528 let _ = ManuallyDrop::new(self);
529 if dropped {
530 drop(channel.into_box());
531 }
532 Ok(result.ok_or(Incomplete))
533 }
534 Poll::Pending => {
535 unsafe { self.channel.store(channel) };
536 Err(self)
537 }
538 }
539 }
540
541 pub fn wait(self) -> Result<T, Incomplete> {
543 let mut channel = unsafe { ManuallyDrop::new(self).channel.load() };
544 let (result, dropped) = channel.wait_recv();
545 if dropped {
546 drop(channel.into_box());
547 }
548 result.ok_or(Incomplete)
549 }
550
551 pub fn wait_timeout(self, timeout: impl Expiry) -> Result<Result<T, Incomplete>, Self> {
554 let mut channel = unsafe { self.channel.load() };
555 match channel.wait_recv_timeout(timeout.into_expire()) {
556 (Some(result), true) => {
557 let _ = ManuallyDrop::new(self);
558 drop(channel.into_box());
559 Ok(Ok(result))
560 }
561 (None, true) => {
562 let _ = ManuallyDrop::new(self);
563 drop(channel.into_box());
564 Ok(Err(Incomplete))
565 }
566 (Some(result), false) => {
567 let _ = ManuallyDrop::new(self);
568 Ok(Ok(result))
569 }
570 (None, false) => Err(self),
571 }
572 }
573}
574
575impl<T> Drop for ReceiveOnce<T> {
576 fn drop(&mut self) {
577 let mut channel = unsafe { self.channel.load() };
578 if let (_, true) = channel.cancel_recv() {
579 drop(channel.into_box());
580 }
581 }
582}
583
584impl<T> Future for ReceiveOnce<T> {
585 type Output = Result<T, Incomplete>;
586
587 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
588 let mut channel = unsafe { self.channel.load() };
589 channel.poll_recv(cx).map(|r| r.0.ok_or(Incomplete))
590 }
591}
592
593impl<T> FusedFuture for ReceiveOnce<T> {
594 fn is_terminated(&self) -> bool {
595 unsafe { self.channel.as_ref() }.is_done()
596 }
597}
598
599impl<T> Stream for ReceiveOnce<T> {
600 type Item = T;
601
602 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
603 let mut channel = unsafe { self.channel.load() };
604 channel.poll_recv(cx).map(|r| r.0)
605 }
606}
607
608impl<T> FusedStream for ReceiveOnce<T> {
609 fn is_terminated(&self) -> bool {
610 unsafe { self.channel.as_ref() }.is_done()
611 }
612}
613
614#[derive(Debug)]
617pub struct Sender<T> {
618 channel: Option<BoxPtr<Channel<T>>>,
619}
620
621unsafe impl<T: Send> Send for Sender<T> {}
622unsafe impl<T: Send> Sync for Sender<T> {}
623
624impl<T> Sender<T> {
625 pub fn is_canceled(&self) -> bool {
627 self.channel.map(|c| c.is_done()).unwrap_or(true)
628 }
629
630 pub fn send(&mut self, value: T) -> TrackSend<'_, T> {
632 TrackSend {
633 channel: self.channel.into(),
634 value: Some(value).into(),
635 drops: false,
636 _marker: PhantomData,
637 }
638 }
639
640 pub fn into_send(self, value: T) -> Result<(), T> {
642 if let Some(mut channel) = ManuallyDrop::new(self).channel {
643 channel.send(value, None).map_err(|(result, drop_channel)| {
644 if drop_channel {
645 drop(channel.into_box());
646 }
647 result
648 })
649 } else {
650 Err(value)
651 }
652 }
653}
654
655impl<T> Drop for Sender<T> {
656 fn drop(&mut self) {
657 if let Some(mut channel) = self.channel.take() {
658 if channel.cancel_send() {
659 drop(channel.into_box());
660 }
661 }
662 }
663}
664
665#[derive(Debug)]
668pub struct Receiver<T> {
669 channel: MaybeCopy<Option<BoxPtr<Channel<T>>>>,
670}
671
672unsafe impl<T: Send> Send for Receiver<T> {}
673unsafe impl<T: Send> Sync for Receiver<T> {}
674
675impl<T> Receiver<T> {
676 pub fn cancel(self) -> Option<T> {
678 if let Some(mut channel) = unsafe { ManuallyDrop::new(self).channel.load() } {
679 let (result, dropped) = channel.cancel_recv();
680 if dropped {
681 drop(channel.into_box());
682 }
683 result
684 } else {
685 None
686 }
687 }
688
689 pub fn try_recv(&mut self) -> Poll<Option<T>> {
691 if let Some(mut channel) = unsafe { self.channel.load() } {
692 channel.try_recv().map(|(result, dropped)| {
693 if dropped || result.is_some() {
694 if dropped {
695 drop(channel.into_box());
696 unsafe { self.channel.store(None) };
697 }
698 }
699 result
700 })
701 } else {
702 Poll::Ready(None)
703 }
704 }
705
706 pub fn wait_next(&mut self) -> Option<T> {
709 if let Some(mut channel) = unsafe { self.channel.load() } {
710 let (result, dropped) = channel.wait_recv();
711 if dropped {
712 drop(channel.into_box());
713 unsafe { self.channel.replace(None) };
714 }
715 result
716 } else {
717 None
718 }
719 }
720
721 pub fn wait_next_timeout(&mut self, timeout: impl Expiry) -> Result<Option<T>, TimedOut> {
725 if let Some(mut channel) = unsafe { self.channel.load() } {
726 let (result, dropped) = channel.wait_recv_timeout(timeout.into_expire());
727 if dropped {
728 drop(channel.into_box());
729 unsafe { self.channel.replace(None) };
730 Ok(result)
731 } else if result.is_none() {
732 Err(TimedOut)
733 } else {
734 Ok(result)
735 }
736 } else {
737 Ok(None)
738 }
739 }
740}
741
742impl<T> Drop for Receiver<T> {
743 fn drop(&mut self) {
744 if let Some(mut channel) = unsafe { self.channel.load() } {
745 if channel.cancel_recv().1 {
746 drop(channel.into_box());
747 }
748 }
749 }
750}
751
752impl<T> Stream for Receiver<T> {
753 type Item = T;
754
755 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
756 if let Some(mut channel) = unsafe { self.channel.load() } {
757 channel.poll_recv(cx).map(|(result, dropped)| {
758 if dropped {
759 drop(channel.into_box());
760 unsafe { self.channel.store(None) };
761 }
762 result
763 })
764 } else {
765 Poll::Ready(None)
766 }
767 }
768}
769
770impl<T> FusedStream for Receiver<T> {
771 fn is_terminated(&self) -> bool {
772 unsafe { self.channel.load() }
773 .map(|c| c.is_done())
774 .unwrap_or(true)
775 }
776}