local_sync/
oneshot.rs

1//! Oneshot borrowed from tokio.
2//!
3//! A one-shot channel is used for sending a single message between
4//! asynchronous tasks. The [`channel`] function is used to create a
5//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6//!
7//! The `Sender` handle is used by the producer to send the value.
8//! The `Receiver` handle is used by the consumer to receive the value.
9//!
10//! Each handle can be used on separate tasks.
11//!
12//! # Examples
13//!
14//! ```
15//! use local_sync::oneshot;
16//!
17//! #[monoio::main]
18//! async fn main() {
19//!     let (tx, rx) = oneshot::channel();
20//!
21//!     monoio::spawn(async move {
22//!         if let Err(_) = tx.send(3) {
23//!             println!("the receiver dropped");
24//!         }
25//!     });
26//!
27//!     match rx.await {
28//!         Ok(v) => println!("got = {:?}", v),
29//!         Err(_) => println!("the sender dropped"),
30//!     }
31//! }
32//! ```
33//!
34//! If the sender is dropped without sending, the receiver will fail with
35//! [`error::RecvError`]:
36//!
37//! ```
38//! use local_sync::oneshot;
39//!
40//! #[monoio::main]
41//! async fn main() {
42//!     let (tx, rx) = oneshot::channel::<u32>();
43//!
44//!     monoio::spawn(async move {
45//!         drop(tx);
46//!     });
47//!
48//!     match rx.await {
49//!         Ok(_) => panic!("This doesn't happen"),
50//!         Err(_) => println!("the sender dropped"),
51//!     }
52//! }
53//! ```
54
55use std::cell::{RefCell, UnsafeCell};
56use std::fmt;
57use std::future::Future;
58use std::mem::MaybeUninit;
59use std::pin::Pin;
60use std::rc::Rc;
61use std::task::Poll::{Pending, Ready};
62use std::task::{Context, Poll, Waker};
63
64/// Sends a value to the associated [`Receiver`].
65///
66/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
67/// [`channel`](fn@channel) function.
68#[derive(Debug)]
69pub struct Sender<T> {
70    inner: Option<Rc<Inner<T>>>,
71}
72
73/// Receive a value from the associated [`Sender`].
74///
75/// A pair of both a [`Sender`] and a [`Receiver`]  are created by the
76/// [`channel`](fn@channel) function.
77///
78/// # Examples
79///
80/// ```
81/// use local_sync::oneshot;
82///
83/// #[monoio::main]
84/// async fn main() {
85///     let (tx, rx) = oneshot::channel();
86///
87///     monoio::spawn(async move {
88///         if let Err(_) = tx.send(3) {
89///             println!("the receiver dropped");
90///         }
91///     });
92///
93///     match rx.await {
94///         Ok(v) => println!("got = {:?}", v),
95///         Err(_) => println!("the sender dropped"),
96///     }
97/// }
98/// ```
99///
100/// If the sender is dropped without sending, the receiver will fail with
101/// [`error::RecvError`]:
102///
103/// ```
104/// use local_sync::oneshot;
105///
106/// #[monoio::main]
107/// async fn main() {
108///     let (tx, rx) = oneshot::channel::<u32>();
109///
110///     monoio::spawn(async move {
111///         drop(tx);
112///     });
113///
114///     match rx.await {
115///         Ok(_) => panic!("This doesn't happen"),
116///         Err(_) => println!("the sender dropped"),
117///     }
118/// }
119/// ```
120#[derive(Debug)]
121pub struct Receiver<T> {
122    inner: Option<Rc<Inner<T>>>,
123}
124
125pub mod error {
126    //! Oneshot error types
127
128    use std::fmt;
129
130    /// Error returned by the `Future` implementation for `Receiver`.
131    #[derive(Debug, Eq, PartialEq)]
132    pub struct RecvError(pub(super) ());
133
134    /// Error returned by the `try_recv` function on `Receiver`.
135    #[derive(Debug, Eq, PartialEq)]
136    pub enum TryRecvError {
137        /// The send half of the channel has not yet sent a value.
138        Empty,
139
140        /// The send half of the channel was dropped without sending a value.
141        Closed,
142    }
143
144    // ===== impl RecvError =====
145
146    impl fmt::Display for RecvError {
147        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
148            write!(fmt, "channel closed")
149        }
150    }
151
152    impl std::error::Error for RecvError {}
153
154    // ===== impl TryRecvError =====
155
156    impl fmt::Display for TryRecvError {
157        fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
158            match self {
159                TryRecvError::Empty => write!(fmt, "channel empty"),
160                TryRecvError::Closed => write!(fmt, "channel closed"),
161            }
162        }
163    }
164
165    impl std::error::Error for TryRecvError {}
166}
167
168use futures_core::ready;
169
170use self::error::*;
171
172struct Inner<T> {
173    /// Manages the state of the inner cell
174    state: RefCell<usize>,
175
176    /// The value. This is set by `Sender` and read by `Receiver`. The state of
177    /// the cell is tracked by `state`.
178    value: UnsafeCell<Option<T>>,
179
180    /// The task to notify when the receiver drops without consuming the value.
181    tx_task: Task,
182
183    /// The task to notify when the value is sent.
184    rx_task: Task,
185}
186
187struct Task(UnsafeCell<MaybeUninit<Waker>>);
188
189impl Task {
190    unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
191        self.with_task(|w| w.will_wake(cx.waker()))
192    }
193
194    unsafe fn with_task<F, R>(&self, f: F) -> R
195    where
196        F: FnOnce(&Waker) -> R,
197    {
198        let ptr = self.0.get();
199        let waker: *const Waker = (&*ptr).as_ptr();
200        f(&*waker)
201    }
202
203    unsafe fn drop_task(&self) {
204        let ptr: *mut Waker = (&mut *self.0.get()).as_mut_ptr();
205        ptr.drop_in_place();
206    }
207
208    unsafe fn set_task(&self, cx: &mut Context<'_>) {
209        let ptr: *mut Waker = (&mut *self.0.get()).as_mut_ptr();
210        ptr.write(cx.waker().clone());
211    }
212}
213
214#[derive(Clone, Copy)]
215struct State(usize);
216
217/// Create a new one-shot channel for sending single values across asynchronous
218/// tasks.
219///
220/// The function returns separate "send" and "receive" handles. The `Sender`
221/// handle is used by the producer to send the value. The `Receiver` handle is
222/// used by the consumer to receive the value.
223///
224/// Each handle can be used on separate tasks.
225///
226/// # Examples
227///
228/// ```
229/// use local_sync::oneshot;
230///
231/// #[monoio::main]
232/// async fn main() {
233///     let (tx, rx) = oneshot::channel();
234///
235///     monoio::spawn(async move {
236///         if let Err(_) = tx.send(3) {
237///             println!("the receiver dropped");
238///         }
239///     });
240///
241///     match rx.await {
242///         Ok(v) => println!("got = {:?}", v),
243///         Err(_) => println!("the sender dropped"),
244///     }
245/// }
246/// ```
247pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
248    let inner = Rc::new(Inner {
249        state: RefCell::new(State::new().as_usize()),
250        value: UnsafeCell::new(None),
251        tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
252        rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
253    });
254
255    let tx = Sender {
256        inner: Some(inner.clone()),
257    };
258    let rx = Receiver { inner: Some(inner) };
259
260    (tx, rx)
261}
262
263impl<T> Sender<T> {
264    /// Attempts to send a value on this channel, returning it back if it could
265    /// not be sent.
266    ///
267    /// This method consumes `self` as only one value may ever be sent on a oneshot
268    /// channel. It is not marked async because sending a message to an oneshot
269    /// channel never requires any form of waiting.  Because of this, the `send`
270    /// method can be used in both synchronous and asynchronous code without
271    /// problems.
272    ///
273    /// A successful send occurs when it is determined that the other end of the
274    /// channel has not hung up already. An unsuccessful send would be one where
275    /// the corresponding receiver has already been deallocated. Note that a
276    /// return value of `Err` means that the data will never be received, but
277    /// a return value of `Ok` does *not* mean that the data will be received.
278    /// It is possible for the corresponding receiver to hang up immediately
279    /// after this function returns `Ok`.
280    ///
281    /// # Examples
282    ///
283    /// Send a value to another task
284    ///
285    /// ```
286    /// use local_sync::oneshot;
287    ///
288    /// #[monoio::main]
289    /// async fn main() {
290    ///     let (tx, rx) = oneshot::channel();
291    ///
292    ///     monoio::spawn(async move {
293    ///         if let Err(_) = tx.send(3) {
294    ///             println!("the receiver dropped");
295    ///         }
296    ///     });
297    ///
298    ///     match rx.await {
299    ///         Ok(v) => println!("got = {:?}", v),
300    ///         Err(_) => println!("the sender dropped"),
301    ///     }
302    /// }
303    /// ```
304    pub fn send(mut self, t: T) -> Result<(), T> {
305        let inner = self.inner.take().unwrap();
306        let ptr = inner.value.get();
307        unsafe {
308            *ptr = Some(t);
309        }
310
311        if !inner.complete() {
312            unsafe {
313                return Err(inner.consume_value().unwrap());
314            }
315        }
316
317        Ok(())
318    }
319
320    /// Waits for the associated [`Receiver`] handle to close.
321    ///
322    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
323    /// [`Receiver`] value is dropped.
324    ///
325    /// This function is useful when paired with `select!` to abort a
326    /// computation when the receiver is no longer interested in the result.
327    ///
328    /// # Return
329    ///
330    /// Returns a `Future` which must be awaited on.
331    ///
332    /// [`Receiver`]: Receiver
333    /// [`close`]: Receiver::close
334    ///
335    /// # Examples
336    ///
337    /// Basic usage
338    ///
339    /// ```
340    /// use local_sync::oneshot;
341    ///
342    /// #[monoio::main]
343    /// async fn main() {
344    ///     let (mut tx, rx) = oneshot::channel::<()>();
345    ///
346    ///     monoio::spawn(async move {
347    ///         drop(rx);
348    ///     });
349    ///
350    ///     tx.closed().await;
351    ///     println!("the receiver dropped");
352    /// }
353    /// ```
354    ///
355    /// Paired with select
356    ///
357    /// ```
358    /// use local_sync::oneshot;
359    /// use monoio::time::{self, Duration};
360    ///
361    /// async fn compute() -> String {
362    ///     // Complex computation returning a `String`
363    /// # "hello".to_string()
364    /// }
365    ///
366    /// #[monoio::main]
367    /// async fn main() {
368    ///     let (mut tx, rx) = oneshot::channel();
369    ///
370    ///     monoio::spawn(async move {
371    ///         monoio::select! {
372    ///             _ = tx.closed() => {
373    ///                 // The receiver dropped, no need to do any further work
374    ///             }
375    ///             value = compute() => {
376    ///                 // The send can fail if the channel was closed at the exact same
377    ///                 // time as when compute() finished, so just ignore the failure.
378    ///                 let _ = tx.send(value);
379    ///             }
380    ///         }
381    ///     });
382    ///
383    ///     // Wait for up to 10 seconds
384    ///     let _ = time::timeout(Duration::from_secs(10), rx).await;
385    /// }
386    /// ```
387    pub async fn closed(&mut self) {
388        use futures_util::future::poll_fn;
389
390        poll_fn(|cx| self.poll_closed(cx)).await
391    }
392
393    /// Returns `true` if the associated [`Receiver`] handle has been dropped.
394    ///
395    /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
396    /// [`Receiver`] value is dropped.
397    ///
398    /// If `true` is returned, a call to `send` will always result in an error.
399    ///
400    /// [`Receiver`]: Receiver
401    /// [`close`]: Receiver::close
402    ///
403    /// # Examples
404    ///
405    /// ```
406    /// use local_sync::oneshot;
407    ///
408    /// #[monoio::main]
409    /// async fn main() {
410    ///     let (tx, rx) = oneshot::channel();
411    ///
412    ///     assert!(!tx.is_closed());
413    ///
414    ///     drop(rx);
415    ///
416    ///     assert!(tx.is_closed());
417    ///     assert!(tx.send("never received").is_err());
418    /// }
419    /// ```
420    pub fn is_closed(&self) -> bool {
421        let inner = self.inner.as_ref().unwrap();
422
423        let state = State(*inner.state.borrow());
424        state.is_closed()
425    }
426
427    /// Check whether the oneshot channel has been closed, and if not, schedules the
428    /// `Waker` in the provided `Context` to receive a notification when the channel is
429    /// closed.
430    ///
431    /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
432    /// [`Receiver`] value is dropped.
433    ///
434    /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
435    /// to the most recent call will be scheduled to receive a wakeup.
436    ///
437    /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
438    /// [`close`]: fn@crate::sync::oneshot::Receiver::close
439    ///
440    /// # Return value
441    ///
442    /// This function returns:
443    ///
444    ///  * `Poll::Pending` if the channel is still open.
445    ///  * `Poll::Ready(())` if the channel is closed.
446    ///
447    /// # Examples
448    ///
449    /// ```
450    /// use local_sync::oneshot;
451    ///
452    /// use futures_util::future::poll_fn;
453    ///
454    /// #[monoio::main]
455    /// async fn main() {
456    ///     let (mut tx, mut rx) = oneshot::channel::<()>();
457    ///
458    ///     monoio::spawn(async move {
459    ///         rx.close();
460    ///     });
461    ///
462    ///     poll_fn(|cx| tx.poll_closed(cx)).await;
463    ///
464    ///     println!("the receiver dropped");
465    /// }
466    /// ```
467    pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
468        let inner = self.inner.as_ref().unwrap();
469
470        let mut state = State(*inner.state.borrow());
471
472        if state.is_closed() {
473            return Poll::Ready(());
474        }
475
476        if state.is_tx_task_set() {
477            let will_notify = unsafe { inner.tx_task.will_wake(cx) };
478
479            if !will_notify {
480                state = State::unset_tx_task(&inner.state);
481
482                if state.is_closed() {
483                    // Set the flag again so that the waker is released in drop
484                    State::set_tx_task(&inner.state);
485                    return Ready(());
486                } else {
487                    unsafe { inner.tx_task.drop_task() };
488                }
489            }
490        }
491
492        if !state.is_tx_task_set() {
493            // Attempt to set the task
494            unsafe {
495                inner.tx_task.set_task(cx);
496            }
497
498            // Update the state
499            state = State::set_tx_task(&inner.state);
500
501            if state.is_closed() {
502                return Ready(());
503            }
504        }
505
506        Pending
507    }
508}
509
510impl<T> Drop for Sender<T> {
511    fn drop(&mut self) {
512        if let Some(inner) = self.inner.as_ref() {
513            inner.complete();
514        }
515    }
516}
517
518impl<T> Receiver<T> {
519    /// Prevents the associated [`Sender`] handle from sending a value.
520    ///
521    /// Any `send` operation which happens after calling `close` is guaranteed
522    /// to fail. After calling `close`, [`try_recv`] should be called to
523    /// receive a value if one was sent **before** the call to `close`
524    /// completed.
525    ///
526    /// This function is useful to perform a graceful shutdown and ensure that a
527    /// value will not be sent into the channel and never received.
528    ///
529    /// `close` is no-op if a message is already received or the channel
530    /// is already closed.
531    ///
532    /// [`Sender`]: Sender
533    /// [`try_recv`]: Receiver::try_recv
534    ///
535    /// # Examples
536    ///
537    /// Prevent a value from being sent
538    ///
539    /// ```
540    /// use local_sync::oneshot;
541    /// use local_sync::oneshot::error::TryRecvError;
542    ///
543    /// #[monoio::main]
544    /// async fn main() {
545    ///     let (tx, mut rx) = oneshot::channel();
546    ///
547    ///     assert!(!tx.is_closed());
548    ///
549    ///     rx.close();
550    ///
551    ///     assert!(tx.is_closed());
552    ///     assert!(tx.send("never received").is_err());
553    ///
554    ///     match rx.try_recv() {
555    ///         Err(TryRecvError::Closed) => {}
556    ///         _ => unreachable!(),
557    ///     }
558    /// }
559    /// ```
560    ///
561    /// Receive a value sent **before** calling `close`
562    ///
563    /// ```
564    /// use local_sync::oneshot;
565    ///
566    /// #[monoio::main]
567    /// async fn main() {
568    ///     let (tx, mut rx) = oneshot::channel();
569    ///
570    ///     assert!(tx.send("will receive").is_ok());
571    ///
572    ///     rx.close();
573    ///
574    ///     let msg = rx.try_recv().unwrap();
575    ///     assert_eq!(msg, "will receive");
576    /// }
577    /// ```
578    pub fn close(&mut self) {
579        if let Some(inner) = self.inner.as_ref() {
580            inner.close();
581        }
582    }
583
584    /// Attempts to receive a value.
585    ///
586    /// If a pending value exists in the channel, it is returned. If no value
587    /// has been sent, the current task **will not** be registered for
588    /// future notification.
589    ///
590    /// This function is useful to call from outside the context of an
591    /// asynchronous task.
592    ///
593    /// # Return
594    ///
595    /// - `Ok(T)` if a value is pending in the channel.
596    /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
597    /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
598    ///   a value.
599    ///
600    /// # Examples
601    ///
602    /// `try_recv` before a value is sent, then after.
603    ///
604    /// ```
605    /// use local_sync::oneshot;
606    /// use local_sync::oneshot::error::TryRecvError;
607    ///
608    /// #[monoio::main]
609    /// async fn main() {
610    ///     let (tx, mut rx) = oneshot::channel();
611    ///
612    ///     match rx.try_recv() {
613    ///         // The channel is currently empty
614    ///         Err(TryRecvError::Empty) => {}
615    ///         _ => unreachable!(),
616    ///     }
617    ///
618    ///     // Send a value
619    ///     tx.send("hello").unwrap();
620    ///
621    ///     match rx.try_recv() {
622    ///         Ok(value) => assert_eq!(value, "hello"),
623    ///         _ => unreachable!(),
624    ///     }
625    /// }
626    /// ```
627    ///
628    /// `try_recv` when the sender dropped before sending a value
629    ///
630    /// ```
631    /// use local_sync::oneshot;
632    /// use local_sync::oneshot::error::TryRecvError;
633    ///
634    /// #[monoio::main]
635    /// async fn main() {
636    ///     let (tx, mut rx) = oneshot::channel::<()>();
637    ///
638    ///     drop(tx);
639    ///
640    ///     match rx.try_recv() {
641    ///         // The channel will never receive a value.
642    ///         Err(TryRecvError::Closed) => {}
643    ///         _ => unreachable!(),
644    ///     }
645    /// }
646    /// ```
647    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
648        let result = if let Some(inner) = self.inner.as_ref() {
649            let state = State(*inner.state.borrow());
650
651            if state.is_complete() {
652                match unsafe { inner.consume_value() } {
653                    Some(value) => Ok(value),
654                    None => Err(TryRecvError::Closed),
655                }
656            } else if state.is_closed() {
657                Err(TryRecvError::Closed)
658            } else {
659                // Not ready, this does not clear `inner`
660                return Err(TryRecvError::Empty);
661            }
662        } else {
663            Err(TryRecvError::Closed)
664        };
665
666        self.inner = None;
667        result
668    }
669}
670
671impl<T> Drop for Receiver<T> {
672    fn drop(&mut self) {
673        if let Some(inner) = self.inner.as_ref() {
674            inner.close();
675        }
676    }
677}
678
679impl<T> Future for Receiver<T> {
680    type Output = Result<T, RecvError>;
681
682    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
683        // If `inner` is `None`, then `poll()` has already completed.
684        let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
685            ready!(inner.poll_recv(cx))?
686        } else {
687            panic!("called after complete");
688        };
689
690        self.inner = None;
691        Ready(Ok(ret))
692    }
693}
694
695impl<T> Inner<T> {
696    fn complete(&self) -> bool {
697        let prev = State::set_complete(&self.state);
698
699        if prev.is_closed() {
700            return false;
701        }
702
703        if prev.is_rx_task_set() {
704            // TODO: Consume waker?
705            unsafe {
706                self.rx_task.with_task(Waker::wake_by_ref);
707            }
708        }
709
710        true
711    }
712
713    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
714        // Load the state
715        let mut state = State(*self.state.borrow());
716
717        if state.is_complete() {
718            match unsafe { self.consume_value() } {
719                Some(value) => Ready(Ok(value)),
720                None => Ready(Err(RecvError(()))),
721            }
722        } else if state.is_closed() {
723            Ready(Err(RecvError(())))
724        } else {
725            if state.is_rx_task_set() {
726                let will_notify = unsafe { self.rx_task.will_wake(cx) };
727
728                // Check if the task is still the same
729                if !will_notify {
730                    // Unset the task
731                    state = State::unset_rx_task(&self.state);
732                    if state.is_complete() {
733                        // Set the flag again so that the waker is released in drop
734                        State::set_rx_task(&self.state);
735
736                        return match unsafe { self.consume_value() } {
737                            Some(value) => Ready(Ok(value)),
738                            None => Ready(Err(RecvError(()))),
739                        };
740                    } else {
741                        unsafe { self.rx_task.drop_task() };
742                    }
743                }
744            }
745
746            if !state.is_rx_task_set() {
747                // Attempt to set the task
748                unsafe {
749                    self.rx_task.set_task(cx);
750                }
751
752                // Update the state
753                state = State::set_rx_task(&self.state);
754
755                if state.is_complete() {
756                    match unsafe { self.consume_value() } {
757                        Some(value) => Ready(Ok(value)),
758                        None => Ready(Err(RecvError(()))),
759                    }
760                } else {
761                    Pending
762                }
763            } else {
764                Pending
765            }
766        }
767    }
768
769    /// Called by `Receiver` to indicate that the value will never be received.
770    fn close(&self) {
771        let prev = State::set_closed(&self.state);
772
773        if prev.is_tx_task_set() && !prev.is_complete() {
774            unsafe {
775                self.tx_task.with_task(Waker::wake_by_ref);
776            }
777        }
778    }
779
780    /// Consumes the value. This function does not check `state`.
781    unsafe fn consume_value(&self) -> Option<T> {
782        let ptr = self.value.get();
783        (*ptr).take()
784    }
785}
786
787impl<T> Drop for Inner<T> {
788    fn drop(&mut self) {
789        let state = State(*self.state.borrow());
790
791        if state.is_rx_task_set() {
792            unsafe {
793                self.rx_task.drop_task();
794            }
795        }
796
797        if state.is_tx_task_set() {
798            unsafe {
799                self.tx_task.drop_task();
800            }
801        }
802    }
803}
804
805impl<T: fmt::Debug> fmt::Debug for Inner<T> {
806    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
807        fmt.debug_struct("Inner")
808            .field("state", &self.state.borrow())
809            .finish()
810    }
811}
812
813const RX_TASK_SET: usize = 0b00001;
814const VALUE_SENT: usize = 0b00010;
815const CLOSED: usize = 0b00100;
816const TX_TASK_SET: usize = 0b01000;
817
818impl State {
819    fn new() -> State {
820        State(0)
821    }
822
823    fn is_complete(self) -> bool {
824        self.0 & VALUE_SENT == VALUE_SENT
825    }
826
827    fn set_complete(cell: &RefCell<usize>) -> State {
828        let mut val = cell.borrow_mut();
829        *val |= VALUE_SENT;
830        State(*val)
831    }
832
833    fn is_rx_task_set(self) -> bool {
834        self.0 & RX_TASK_SET == RX_TASK_SET
835    }
836
837    fn set_rx_task(cell: &RefCell<usize>) -> State {
838        let mut val = cell.borrow_mut();
839        *val |= RX_TASK_SET;
840        State(*val)
841    }
842
843    fn unset_rx_task(cell: &RefCell<usize>) -> State {
844        let mut val = cell.borrow_mut();
845        *val &= !RX_TASK_SET;
846        State(*val)
847    }
848
849    fn is_closed(self) -> bool {
850        self.0 & CLOSED == CLOSED
851    }
852
853    fn set_closed(cell: &RefCell<usize>) -> State {
854        // Acquire because we want all later writes (attempting to poll) to be
855        // ordered after this.
856        let mut val = cell.borrow_mut();
857        *val |= CLOSED;
858        State(*val)
859    }
860
861    fn set_tx_task(cell: &RefCell<usize>) -> State {
862        let mut val = cell.borrow_mut();
863        *val |= TX_TASK_SET;
864        State(*val)
865    }
866
867    fn unset_tx_task(cell: &RefCell<usize>) -> State {
868        let mut val = cell.borrow_mut();
869        *val &= !TX_TASK_SET;
870        State(*val)
871    }
872
873    fn is_tx_task_set(self) -> bool {
874        self.0 & TX_TASK_SET == TX_TASK_SET
875    }
876
877    fn as_usize(self) -> usize {
878        self.0
879    }
880}
881
882impl fmt::Debug for State {
883    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
884        fmt.debug_struct("State")
885            .field("is_complete", &self.is_complete())
886            .field("is_closed", &self.is_closed())
887            .field("is_rx_task_set", &self.is_rx_task_set())
888            .field("is_tx_task_set", &self.is_tx_task_set())
889            .finish()
890    }
891}
892
893#[cfg(test)]
894mod tests {
895    use super::channel;
896
897    #[monoio::test]
898    async fn it_works() {
899        let (tx, rx) = channel();
900        let join = monoio::spawn(async move { rx.await });
901        tx.send(1).unwrap();
902        assert_eq!(join.await.unwrap(), 1);
903    }
904}