suspend_channel/
channel.rs

1use core::{
2    fmt::{self, Debug, Formatter},
3    future::Future,
4    marker::PhantomData,
5    mem::ManuallyDrop,
6    pin::Pin,
7    sync::atomic::{fence, spin_loop_hint, AtomicU8, Ordering},
8    task::{Context, Poll, Waker},
9};
10use std::time::Instant;
11
12use futures_core::{FusedFuture, FusedStream, Stream};
13use suspend_core::{listen::block_on_poll, Expiry};
14
15use super::error::{Incomplete, TimedOut};
16use super::util::{BoxPtr, Maybe, MaybeCopy};
17
// Bit flags stored in `Channel::state`.

// No flags set. Written when one side cancels or drops (see `cancel_send` and
// `cancel_recv`); the other side reports it through its "dropped" flag.
const STATE_DONE: u8 = 0b0000;
// Initial state set by `Channel::new`; the receiver also resets to it after
// taking a value while the sender is still waiting.
const STATE_INIT: u8 = 0b0001;
// Set with `fetch_or` by `wait_for_lock` while one side works on the slots.
const STATE_LOCKED: u8 = 0b0010;
// A value has been written to `Channel::value`.
const STATE_LOADED: u8 = 0b0100;
// A waker has been written to `Channel::waker`.
const STATE_WAKE: u8 = 0b1000;
// The stored waker belongs to the receiver (no value is loaded).
const WAKE_RECV: u8 = STATE_WAKE;
// A value is loaded and the stored waker belongs to the sender.
const WAKE_SEND: u8 = STATE_WAKE | STATE_LOADED;
25
26/// Create a channel for sending a single value between a producer and consumer.
27pub fn send_once<T>() -> (SendOnce<T>, ReceiveOnce<T>) {
28    let channel = BoxPtr::new(Box::new(Channel::new()));
29    (
30        SendOnce { channel },
31        ReceiveOnce {
32            channel: channel.into(),
33        },
34    )
35}
36
37/// Create a channel for sending multiple values between a producer and consumer,
38/// with synchronization between each consecutive result.
39pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
40    let channel = BoxPtr::new(Box::new(Channel::new()));
41    (
42        Sender {
43            channel: Some(channel),
44        },
45        Receiver {
46            channel: Some(channel).into(),
47        },
48    )
49}
50
/// Shared state behind a sender/receiver pair: a single value slot and a
/// single waker slot, coordinated through the flag bits in `state`.
pub(crate) struct Channel<T> {
    /// Combination of the `STATE_*` flags describing what the slots contain.
    state: AtomicU8,
    /// Slot for the value in transit from sender to receiver.
    value: Maybe<T>,
    /// Slot for one waker: the receiver's (`WAKE_RECV`) or the sender's (`WAKE_SEND`).
    waker: Maybe<Waker>,
}
56
57impl<T> Channel<T> {
58    pub const fn new() -> Self {
59        Self {
60            state: AtomicU8::new(STATE_INIT),
61            value: Maybe::empty(),
62            waker: Maybe::empty(),
63        }
64    }
65
66    #[inline]
67    fn is_done(&self) -> bool {
68        self.state.load(Ordering::Relaxed) == STATE_DONE
69    }
70
    /// Try to receive a value, registering a waker if none is stored.
    /// Ready value is (stored value, dropped flag).
    ///
    /// The state lock is taken first via `wait_for_lock`. A `true` dropped flag
    /// tells the caller that the sender is gone; the callers in this file free
    /// the channel allocation in that case.
    ///
    /// # Panics
    /// Panics with "Invalid state" if the observed state is not one of the
    /// combinations handled below.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<(Option<T>, bool)> {
        match self.wait_for_lock() {
            STATE_DONE => {
                // sender dropped without storing a value
                // (the store clears the lock bit set by wait_for_lock)
                self.state.store(STATE_DONE, Ordering::Relaxed);
                return Poll::Ready((None, true));
            }
            STATE_LOADED => {
                // sender dropped after storing a value
                let value = unsafe { self.value.load() };
                self.state.store(STATE_DONE, Ordering::Relaxed);
                return Poll::Ready((Some(value), true));
            }
            WAKE_SEND => {
                // sender stored a value and a waker
                let value = Some(unsafe { self.value.load() });
                let send_waker = unsafe { self.waker.load() };
                // unlock by resetting to INIT; a previous value of DONE means the
                // sender cancelled while the lock was held here
                if self.state.swap(STATE_INIT, Ordering::Release) == STATE_DONE {
                    // sender dropped
                    drop(send_waker);
                    return Poll::Ready((value, true));
                }
                // notify the sender that its value was taken
                send_waker.wake();
                return Poll::Ready((value, false));
            }
            WAKE_RECV => {
                // drop previous recv waker
                unsafe { self.waker.clear() };
            }
            STATE_INIT => {
                // waiting for sender
            }
            _ => panic!("Invalid state"),
        }

        // no value yet: register the current waker and release the lock
        unsafe { self.waker.store(cx.waker().clone()) };
        if self.state.swap(WAKE_RECV, Ordering::Release) == STATE_DONE {
            // sender dropped
            unsafe { self.waker.clear() };
            return Poll::Ready((None, true));
        }
        Poll::Pending
    }
116
117    /// Try to receive a value without registering a waker.
118    /// Ready value is (stored value, dropped flag).
119    pub fn try_recv(&mut self) -> Poll<(Option<T>, bool)> {
120        let mut locked = false;
121        let mut state = self.state.load(Ordering::Relaxed);
122        loop {
123            match state {
124                STATE_INIT | WAKE_RECV => {
125                    return Poll::Pending;
126                }
127                STATE_DONE => {
128                    // sender dropped without storing a value
129                    if locked {
130                        self.state.store(STATE_DONE, Ordering::Relaxed);
131                    }
132                    return Poll::Ready((None, true));
133                }
134                STATE_LOADED => {
135                    // sender dropped after storing a value
136                    let value = unsafe { self.value.load() };
137                    self.state.store(STATE_DONE, Ordering::Relaxed);
138                    return Poll::Ready((Some(value), true));
139                }
140                WAKE_SEND => {
141                    // sender stored a value and a waker
142                    if !locked {
143                        // need to lock the state now to avoid conflict if the sender is dropped
144                        state = self.wait_for_lock();
145                        locked = true;
146                        continue;
147                    }
148                    let value = Some(unsafe { self.value.load() });
149                    let send_waker = unsafe { self.waker.load() };
150                    if self.state.swap(STATE_INIT, Ordering::Release) == STATE_DONE {
151                        // sender dropped
152                        drop(send_waker);
153                        return Poll::Ready((value, true));
154                    }
155                    send_waker.wake();
156                    return Poll::Ready((value, false));
157                }
158                _ => panic!("Invalid state"),
159            }
160        }
161    }
162
163    /// Returns (stored value, dropped flag).
164    pub fn cancel_recv(&mut self) -> (Option<T>, bool) {
165        match self.state.fetch_and(STATE_LOADED, Ordering::Release) {
166            prev if prev & STATE_LOCKED != 0 => {
167                // sender was holding the lock, will handle drop
168                (None, false)
169            }
170            STATE_INIT => {
171                // sender still active
172                (None, false)
173            }
174            STATE_DONE => {
175                // sender dropped
176                (None, true)
177            }
178            STATE_LOADED => {
179                // sender loaded a value and dropped
180                (Some(unsafe { self.value.load() }), true)
181            }
182            WAKE_SEND => {
183                // sender loaded a value, waiting for receiver
184                // the state is now STATE_LOADED
185                unsafe { self.waker.load() }.wake();
186                (None, false)
187            }
188            WAKE_RECV => {
189                // drop previous waker
190                unsafe { self.waker.clear() };
191                (None, false)
192            }
193            _ => panic!("Invalid state"),
194        }
195    }
196
197    // Returns (stored value, dropped flag).
198    pub fn cancel_recv_poll(&mut self) -> (Option<T>, bool) {
199        match self.wait_for_lock() {
200            prev if prev & STATE_LOCKED != 0 => {
201                // sender was holding the lock, will handle drop
202                (None, false)
203            }
204            STATE_DONE => {
205                // sender dropped
206                (None, true)
207            }
208            STATE_LOADED => {
209                // sender loaded a value and dropped
210                (Some(unsafe { self.value.load() }), true)
211            }
212            WAKE_SEND => {
213                // sender loaded a value, waiting for receiver
214                let value = Some(unsafe { self.value.load() });
215                let send_waker = unsafe { self.waker.load() };
216                self.state.store(STATE_INIT, Ordering::Release);
217                send_waker.wake();
218                (value, false)
219            }
220            WAKE_RECV => {
221                // drop previous waker
222                unsafe { self.waker.clear() };
223                self.state.store(STATE_INIT, Ordering::Release);
224                (None, false)
225            }
226            _ => panic!("Invalid state"),
227        }
228    }
229
230    /// Error value is (unstored value, dropped flag).
231    pub fn send(&mut self, value: T, cx: Option<&mut Context<'_>>) -> Result<(), (T, bool)> {
232        let recv_waker = match self.wait_for_lock() {
233            STATE_INIT => {
234                // receiver is waiting
235                None
236            }
237            STATE_DONE => {
238                // receiver dropped
239                self.state.store(STATE_DONE, Ordering::Relaxed);
240                return Err((value, true));
241            }
242            WAKE_RECV => {
243                // receiver stored a waker
244                Some(unsafe { self.waker.load() })
245            }
246            _ => panic!("Invalid state"),
247        };
248
249        unsafe { self.value.store(value) };
250        let state = if let Some(cx) = cx {
251            unsafe { self.waker.store(cx.waker().clone()) };
252            WAKE_SEND
253        } else {
254            STATE_LOADED
255        };
256        if self.state.swap(state, Ordering::Release) == STATE_DONE {
257            // receiver dropped
258            drop(recv_waker);
259            if state == WAKE_SEND {
260                unsafe { self.waker.clear() };
261            }
262            return Err((unsafe { self.value.load() }, true));
263        }
264        recv_waker.map(Waker::wake);
265        Ok(())
266    }
267
268    fn poll_send(&mut self, cx: &mut Context<'_>) -> Poll<(Option<T>, bool)> {
269        match self.wait_for_lock() {
270            STATE_DONE => {
271                // receiver completed and dropped
272                Poll::Ready((None, true))
273            }
274            prev @ STATE_INIT | prev @ WAKE_RECV => {
275                // receiver completed and reset
276                self.state.store(prev, Ordering::Release);
277                Poll::Ready((None, false))
278            }
279            STATE_LOADED => {
280                // receiver dropped and left the result
281                Poll::Ready((Some(unsafe { self.value.load() }), true))
282            }
283            WAKE_SEND => {
284                // still waiting for receiver
285                unsafe { self.waker.replace(cx.waker().clone()) };
286                self.state.store(WAKE_SEND, Ordering::Release);
287                Poll::Pending
288            }
289            _ => panic!("Invalid state"),
290        }
291    }
292
293    /// Returns dropped flag.
294    pub fn cancel_send(&mut self) -> bool {
295        match self.state.swap(STATE_DONE, Ordering::Release) {
296            prev if prev & STATE_LOCKED != 0 => {
297                // receiver has the lock, will handle drop
298                false
299            }
300            STATE_INIT => {
301                // receiver still active
302                false
303            }
304            STATE_DONE => {
305                // receiver already dropped
306                true
307            }
308            WAKE_RECV => {
309                // receiver loaded a waker
310                unsafe { self.waker.load() }.wake();
311                false
312            }
313            _ => panic!("Invalid state"),
314        }
315    }
316
317    fn cancel_send_poll(&mut self) -> bool {
318        match self.state.fetch_or(STATE_LOCKED, Ordering::AcqRel) {
319            prev if prev & STATE_LOCKED != 0 => {
320                // lock held by receiver
321                false
322            }
323            STATE_INIT => {
324                // receiver completed and reset
325                self.state.store(STATE_DONE, Ordering::Release);
326                false
327            }
328            STATE_DONE => {
329                // receiver completed and dropped
330                true
331            }
332            WAKE_SEND => {
333                // still waiting for receiver
334                unsafe { self.waker.clear() };
335                self.state.store(STATE_LOADED, Ordering::Release);
336                false
337            }
338            _ => panic!("Invalid state"),
339        }
340    }
341
342    pub fn wait_recv(&mut self) -> (Option<T>, bool) {
343        if let Poll::Ready(result) = self.wait_recv_poll(None) {
344            result
345        } else {
346            unreachable!()
347        }
348    }
349
350    pub fn wait_recv_timeout(&mut self, timeout: Option<Instant>) -> (Option<T>, bool) {
351        match self.wait_recv_poll(timeout) {
352            Poll::Ready(result) => result,
353            Poll::Pending => self.cancel_recv_poll(),
354        }
355    }
356
357    #[inline]
358    fn wait_for_lock(&mut self) -> u8 {
359        loop {
360            let prev = self.state.fetch_or(STATE_LOCKED, Ordering::Relaxed);
361            if prev & STATE_LOCKED == 0 {
362                fence(Ordering::Acquire);
363                return prev;
364            }
365            spin_loop_hint();
366        }
367    }
368
    /// Drive a receive to completion on the current thread using `block_on_poll`.
    ///
    /// The first poll goes through `poll_recv`, which registers the waker from
    /// the supplied context; later polls use `try_recv`, which does not touch
    /// the waker slot. Returns whatever `block_on_poll` returns.
    // NOTE(review): presumably `Poll::Pending` is returned only when the expiry
    // is reached — confirm against `block_on_poll`.
    #[inline]
    fn wait_recv_poll(&mut self, timeout: impl Expiry) -> Poll<(Option<T>, bool)> {
        let mut first = true;
        let timeout: Option<Instant> = timeout.into_expire();
        block_on_poll(
            |cx| {
                if first {
                    first = false;
                    self.poll_recv(cx)
                } else {
                    // no need to update the waker after the first poll
                    self.try_recv()
                }
            },
            timeout,
        )
    }
386}
387
388impl<T> Debug for Channel<T> {
389    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
390        f.debug_struct("Channel")
391            .field("done", &self.is_done())
392            .finish()
393    }
394}
395
/// A [`Future`] which resolves once a send has completed.
///
/// Created by `SendOnce::track_send` (owning the sending half) or by
/// `Sender::send` (borrowing the sender for `'a`).
#[derive(Debug)]
pub struct TrackSend<'a, T> {
    /// Channel being sent on; set to `None` once the future has resolved.
    channel: MaybeCopy<Option<BoxPtr<Channel<T>>>>,
    /// Value not yet handed to the channel; replaced with `None` by the first poll.
    value: Maybe<Option<T>>,
    /// Whether this future frees the channel allocation when it finds the
    /// receiver dropped (`true` from `SendOnce::track_send`, `false` from `Sender::send`).
    drops: bool,
    /// Carries the `'a` lifetime; no data is stored.
    _marker: PhantomData<&'a mut T>,
}
404
405impl<T> Drop for TrackSend<'_, T> {
406    fn drop(&mut self) {
407        unsafe {
408            let channel = self.channel.load();
409            channel.map(|mut c| c.cancel_send_poll());
410            self.value.clear();
411        }
412    }
413}
414
415impl<T> Future for TrackSend<'_, T> {
416    type Output = Result<(), T>;
417
418    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
419        if let Some(value) = unsafe { self.value.replace(None) } {
420            if let Some(mut channel) = unsafe { self.channel.load() } {
421                if let Err((value, dropped)) = channel.send(value, Some(cx)) {
422                    if dropped && self.drops {
423                        drop(channel.into_box());
424                    }
425                    unsafe { self.channel.store(None) };
426                    Poll::Ready(Err(value))
427                } else {
428                    Poll::Pending
429                }
430            } else {
431                Poll::Ready(Err(value))
432            }
433        } else if let Some(mut channel) = unsafe { self.channel.load() } {
434            channel.poll_send(cx).map(|(result, dropped)| {
435                if dropped && self.drops {
436                    drop(channel.into_box());
437                }
438                unsafe { self.channel.store(None) };
439                result.map(Err).unwrap_or(Ok(()))
440            })
441        } else {
442            Poll::Ready(Ok(()))
443        }
444    }
445}
446
447impl<T> FusedFuture for TrackSend<'_, T> {
448    fn is_terminated(&self) -> bool {
449        unsafe { self.channel.as_ref() }.is_none()
450    }
451}
452
/// Created by [`send_once()`] and used to dispatch a single value
/// to an associated [`ReceiveOnce`] instance.
#[derive(Debug)]
pub struct SendOnce<T> {
    /// Pointer to the channel shared with the paired [`ReceiveOnce`].
    channel: BoxPtr<Channel<T>>,
}

// NOTE(review): these impls presumably rely on all shared access going through
// the atomic state in `Channel` — confirm against `BoxPtr` and `Maybe`.
unsafe impl<T: Send> Send for SendOnce<T> {}
unsafe impl<T: Send> Sync for SendOnce<T> {}
462
463impl<T> SendOnce<T> {
464    /// Check if the receiver has already been dropped.
465    pub fn is_canceled(&self) -> bool {
466        self.channel.is_done()
467    }
468
469    /// Dispatch the result and consume the [`SendOnce`].
470    pub fn send(self, value: T) -> Result<(), T> {
471        let mut channel = ManuallyDrop::new(self).channel;
472        channel.send(value, None).map_err(|(value, drop_channel)| {
473            if drop_channel {
474                drop(channel.into_box());
475            }
476            value
477        })
478    }
479
480    /// Load a value to be sent, returning a [`Future`] which resolves when
481    /// the value is received or the [`ReceiveOnce`] is dropped.
482    pub fn track_send(self, value: T) -> TrackSend<'static, T> {
483        let channel = ManuallyDrop::new(self).channel;
484        TrackSend {
485            channel: Some(channel).into(),
486            value: Some(value).into(),
487            drops: true,
488            _marker: PhantomData,
489        }
490    }
491}
492
493impl<T> Drop for SendOnce<T> {
494    fn drop(&mut self) {
495        if self.channel.cancel_send() {
496            drop(self.channel.into_box());
497        }
498    }
499}
500
/// Created by [`send_once()`] and used to receive a single value from
/// an associated [`SendOnce`] instance.
#[derive(Debug)]
pub struct ReceiveOnce<T> {
    /// Pointer to the channel shared with the paired [`SendOnce`].
    channel: MaybeCopy<BoxPtr<Channel<T>>>,
}

// NOTE(review): these impls presumably rely on all shared access going through
// the atomic state in `Channel` — confirm against `MaybeCopy` and `BoxPtr`.
unsafe impl<T: Send> Send for ReceiveOnce<T> {}
unsafe impl<T: Send> Sync for ReceiveOnce<T> {}
510
511impl<T> ReceiveOnce<T> {
512    /// Safely cancel the receive operation, consuming the [`ReceiveOnce`].
513    pub fn cancel(self) -> Option<T> {
514        let mut channel = unsafe { ManuallyDrop::new(self).channel.load() };
515        let (result, dropped) = channel.cancel_recv();
516        if dropped {
517            drop(channel.into_box());
518        }
519        result
520    }
521
522    /// Try to receive the value, consuming the [`ReceiveOnce`] if the value
523    /// has been loaded or the [`SendOnce`] has been dropped.
524    pub fn try_recv(self) -> Result<Result<T, Incomplete>, Self> {
525        let mut channel = unsafe { self.channel.load() };
526        match channel.try_recv() {
527            Poll::Ready((result, dropped)) => {
528                let _ = ManuallyDrop::new(self);
529                if dropped {
530                    drop(channel.into_box());
531                }
532                Ok(result.ok_or(Incomplete))
533            }
534            Poll::Pending => {
535                unsafe { self.channel.store(channel) };
536                Err(self)
537            }
538        }
539    }
540
541    /// Block the current thread until a value is received or the [`SendOnce`] is dropped.
542    pub fn wait(self) -> Result<T, Incomplete> {
543        let mut channel = unsafe { ManuallyDrop::new(self).channel.load() };
544        let (result, dropped) = channel.wait_recv();
545        if dropped {
546            drop(channel.into_box());
547        }
548        result.ok_or(Incomplete)
549    }
550
551    /// Block the current thread until a value is received or the [`SendOnce`] is dropped,
552    /// returning `Err(Self)` if a timeout is reached.
553    pub fn wait_timeout(self, timeout: impl Expiry) -> Result<Result<T, Incomplete>, Self> {
554        let mut channel = unsafe { self.channel.load() };
555        match channel.wait_recv_timeout(timeout.into_expire()) {
556            (Some(result), true) => {
557                let _ = ManuallyDrop::new(self);
558                drop(channel.into_box());
559                Ok(Ok(result))
560            }
561            (None, true) => {
562                let _ = ManuallyDrop::new(self);
563                drop(channel.into_box());
564                Ok(Err(Incomplete))
565            }
566            (Some(result), false) => {
567                let _ = ManuallyDrop::new(self);
568                Ok(Ok(result))
569            }
570            (None, false) => Err(self),
571        }
572    }
573}
574
575impl<T> Drop for ReceiveOnce<T> {
576    fn drop(&mut self) {
577        let mut channel = unsafe { self.channel.load() };
578        if let (_, true) = channel.cancel_recv() {
579            drop(channel.into_box());
580        }
581    }
582}
583
584impl<T> Future for ReceiveOnce<T> {
585    type Output = Result<T, Incomplete>;
586
587    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
588        let mut channel = unsafe { self.channel.load() };
589        channel.poll_recv(cx).map(|r| r.0.ok_or(Incomplete))
590    }
591}
592
593impl<T> FusedFuture for ReceiveOnce<T> {
594    fn is_terminated(&self) -> bool {
595        unsafe { self.channel.as_ref() }.is_done()
596    }
597}
598
599impl<T> Stream for ReceiveOnce<T> {
600    type Item = T;
601
602    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
603        let mut channel = unsafe { self.channel.load() };
604        channel.poll_recv(cx).map(|r| r.0)
605    }
606}
607
608impl<T> FusedStream for ReceiveOnce<T> {
609    fn is_terminated(&self) -> bool {
610        unsafe { self.channel.as_ref() }.is_done()
611    }
612}
613
/// Created by [`channel()`] and used to dispatch a stream of values to
/// an associated [`Receiver`].
#[derive(Debug)]
pub struct Sender<T> {
    /// Pointer to the channel shared with the paired [`Receiver`]; taken
    /// (left as `None`) when the sender is dropped.
    channel: Option<BoxPtr<Channel<T>>>,
}

// NOTE(review): these impls presumably rely on all shared access going through
// the atomic state in `Channel` — confirm against `BoxPtr` and `Maybe`.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
623
624impl<T> Sender<T> {
625    /// Check if the receiver has already been dropped.
626    pub fn is_canceled(&self) -> bool {
627        self.channel.map(|c| c.is_done()).unwrap_or(true)
628    }
629
630    /// Dispatch the result and consume the [`Sender`].
631    pub fn send(&mut self, value: T) -> TrackSend<'_, T> {
632        TrackSend {
633            channel: self.channel.into(),
634            value: Some(value).into(),
635            drops: false,
636            _marker: PhantomData,
637        }
638    }
639
640    /// Send a single result and consume the [`Sender`].
641    pub fn into_send(self, value: T) -> Result<(), T> {
642        if let Some(mut channel) = ManuallyDrop::new(self).channel {
643            channel.send(value, None).map_err(|(result, drop_channel)| {
644                if drop_channel {
645                    drop(channel.into_box());
646                }
647                result
648            })
649        } else {
650            Err(value)
651        }
652    }
653}
654
655impl<T> Drop for Sender<T> {
656    fn drop(&mut self) {
657        if let Some(mut channel) = self.channel.take() {
658            if channel.cancel_send() {
659                drop(channel.into_box());
660            }
661        }
662    }
663}
664
/// Created by [`channel()`] and used to receive a stream of values from
/// an associated [`Sender`].
#[derive(Debug)]
pub struct Receiver<T> {
    /// Pointer to the channel shared with the paired [`Sender`]; set to `None`
    /// once the sender has been found dropped and the allocation freed.
    channel: MaybeCopy<Option<BoxPtr<Channel<T>>>>,
}

// NOTE(review): these impls presumably rely on all shared access going through
// the atomic state in `Channel` — confirm against `MaybeCopy` and `BoxPtr`.
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
674
675impl<T> Receiver<T> {
676    /// Safely cancel the receive operation, consuming the [`Receiver`].
677    pub fn cancel(self) -> Option<T> {
678        if let Some(mut channel) = unsafe { ManuallyDrop::new(self).channel.load() } {
679            let (result, dropped) = channel.cancel_recv();
680            if dropped {
681                drop(channel.into_box());
682            }
683            result
684        } else {
685            None
686        }
687    }
688
689    /// Try to receive the next value from the stream.
690    pub fn try_recv(&mut self) -> Poll<Option<T>> {
691        if let Some(mut channel) = unsafe { self.channel.load() } {
692            channel.try_recv().map(|(result, dropped)| {
693                if dropped || result.is_some() {
694                    if dropped {
695                        drop(channel.into_box());
696                        unsafe { self.channel.store(None) };
697                    }
698                }
699                result
700            })
701        } else {
702            Poll::Ready(None)
703        }
704    }
705
706    /// Block the current thread on the next result from the [`Sender`], returning
707    /// [`None`] if the sender has been dropped.
708    pub fn wait_next(&mut self) -> Option<T> {
709        if let Some(mut channel) = unsafe { self.channel.load() } {
710            let (result, dropped) = channel.wait_recv();
711            if dropped {
712                drop(channel.into_box());
713                unsafe { self.channel.replace(None) };
714            }
715            result
716        } else {
717            None
718        }
719    }
720
721    /// Block the current thread on the next result from the [`Sender`], returning
722    /// `Ok(None)`] if the sender has been dropped and `Err(Incomplete)` if the
723    /// provided timeout is reached.
724    pub fn wait_next_timeout(&mut self, timeout: impl Expiry) -> Result<Option<T>, TimedOut> {
725        if let Some(mut channel) = unsafe { self.channel.load() } {
726            let (result, dropped) = channel.wait_recv_timeout(timeout.into_expire());
727            if dropped {
728                drop(channel.into_box());
729                unsafe { self.channel.replace(None) };
730                Ok(result)
731            } else if result.is_none() {
732                Err(TimedOut)
733            } else {
734                Ok(result)
735            }
736        } else {
737            Ok(None)
738        }
739    }
740}
741
742impl<T> Drop for Receiver<T> {
743    fn drop(&mut self) {
744        if let Some(mut channel) = unsafe { self.channel.load() } {
745            if channel.cancel_recv().1 {
746                drop(channel.into_box());
747            }
748        }
749    }
750}
751
752impl<T> Stream for Receiver<T> {
753    type Item = T;
754
755    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
756        if let Some(mut channel) = unsafe { self.channel.load() } {
757            channel.poll_recv(cx).map(|(result, dropped)| {
758                if dropped {
759                    drop(channel.into_box());
760                    unsafe { self.channel.store(None) };
761                }
762                result
763            })
764        } else {
765            Poll::Ready(None)
766        }
767    }
768}
769
770impl<T> FusedStream for Receiver<T> {
771    fn is_terminated(&self) -> bool {
772        unsafe { self.channel.load() }
773            .map(|c| c.is_done())
774            .unwrap_or(true)
775    }
776}