local_sync/oneshot.rs
1//! Oneshot borrowed from tokio.
2//!
3//! A one-shot channel is used for sending a single message between
4//! asynchronous tasks. The [`channel`] function is used to create a
5//! [`Sender`] and [`Receiver`] handle pair that form the channel.
6//!
7//! The `Sender` handle is used by the producer to send the value.
8//! The `Receiver` handle is used by the consumer to receive the value.
9//!
10//! Each handle can be used on separate tasks.
11//!
12//! # Examples
13//!
14//! ```
15//! use local_sync::oneshot;
16//!
17//! #[monoio::main]
18//! async fn main() {
19//! let (tx, rx) = oneshot::channel();
20//!
21//! monoio::spawn(async move {
22//! if let Err(_) = tx.send(3) {
23//! println!("the receiver dropped");
24//! }
25//! });
26//!
27//! match rx.await {
28//! Ok(v) => println!("got = {:?}", v),
29//! Err(_) => println!("the sender dropped"),
30//! }
31//! }
32//! ```
33//!
34//! If the sender is dropped without sending, the receiver will fail with
35//! [`error::RecvError`]:
36//!
37//! ```
38//! use local_sync::oneshot;
39//!
40//! #[monoio::main]
41//! async fn main() {
42//! let (tx, rx) = oneshot::channel::<u32>();
43//!
44//! monoio::spawn(async move {
45//! drop(tx);
46//! });
47//!
48//! match rx.await {
49//! Ok(_) => panic!("This doesn't happen"),
50//! Err(_) => println!("the sender dropped"),
51//! }
52//! }
53//! ```
54
55use std::cell::{RefCell, UnsafeCell};
56use std::fmt;
57use std::future::Future;
58use std::mem::MaybeUninit;
59use std::pin::Pin;
60use std::rc::Rc;
61use std::task::Poll::{Pending, Ready};
62use std::task::{Context, Poll, Waker};
63
64/// Sends a value to the associated [`Receiver`].
65///
66/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
67/// [`channel`](fn@channel) function.
68#[derive(Debug)]
69pub struct Sender<T> {
70 inner: Option<Rc<Inner<T>>>,
71}
72
73/// Receive a value from the associated [`Sender`].
74///
75/// A pair of both a [`Sender`] and a [`Receiver`] are created by the
76/// [`channel`](fn@channel) function.
77///
78/// # Examples
79///
80/// ```
81/// use local_sync::oneshot;
82///
83/// #[monoio::main]
84/// async fn main() {
85/// let (tx, rx) = oneshot::channel();
86///
87/// monoio::spawn(async move {
88/// if let Err(_) = tx.send(3) {
89/// println!("the receiver dropped");
90/// }
91/// });
92///
93/// match rx.await {
94/// Ok(v) => println!("got = {:?}", v),
95/// Err(_) => println!("the sender dropped"),
96/// }
97/// }
98/// ```
99///
100/// If the sender is dropped without sending, the receiver will fail with
101/// [`error::RecvError`]:
102///
103/// ```
104/// use local_sync::oneshot;
105///
106/// #[monoio::main]
107/// async fn main() {
108/// let (tx, rx) = oneshot::channel::<u32>();
109///
110/// monoio::spawn(async move {
111/// drop(tx);
112/// });
113///
114/// match rx.await {
115/// Ok(_) => panic!("This doesn't happen"),
116/// Err(_) => println!("the sender dropped"),
117/// }
118/// }
119/// ```
120#[derive(Debug)]
121pub struct Receiver<T> {
122 inner: Option<Rc<Inner<T>>>,
123}
124
125pub mod error {
126 //! Oneshot error types
127
128 use std::fmt;
129
130 /// Error returned by the `Future` implementation for `Receiver`.
131 #[derive(Debug, Eq, PartialEq)]
132 pub struct RecvError(pub(super) ());
133
134 /// Error returned by the `try_recv` function on `Receiver`.
135 #[derive(Debug, Eq, PartialEq)]
136 pub enum TryRecvError {
137 /// The send half of the channel has not yet sent a value.
138 Empty,
139
140 /// The send half of the channel was dropped without sending a value.
141 Closed,
142 }
143
144 // ===== impl RecvError =====
145
146 impl fmt::Display for RecvError {
147 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
148 write!(fmt, "channel closed")
149 }
150 }
151
152 impl std::error::Error for RecvError {}
153
154 // ===== impl TryRecvError =====
155
156 impl fmt::Display for TryRecvError {
157 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
158 match self {
159 TryRecvError::Empty => write!(fmt, "channel empty"),
160 TryRecvError::Closed => write!(fmt, "channel closed"),
161 }
162 }
163 }
164
165 impl std::error::Error for TryRecvError {}
166}
167
168use futures_core::ready;
169
170use self::error::*;
171
172struct Inner<T> {
173 /// Manages the state of the inner cell
174 state: RefCell<usize>,
175
176 /// The value. This is set by `Sender` and read by `Receiver`. The state of
177 /// the cell is tracked by `state`.
178 value: UnsafeCell<Option<T>>,
179
180 /// The task to notify when the receiver drops without consuming the value.
181 tx_task: Task,
182
183 /// The task to notify when the value is sent.
184 rx_task: Task,
185}
186
187struct Task(UnsafeCell<MaybeUninit<Waker>>);
188
189impl Task {
190 unsafe fn will_wake(&self, cx: &mut Context<'_>) -> bool {
191 self.with_task(|w| w.will_wake(cx.waker()))
192 }
193
194 unsafe fn with_task<F, R>(&self, f: F) -> R
195 where
196 F: FnOnce(&Waker) -> R,
197 {
198 let ptr = self.0.get();
199 let waker: *const Waker = (&*ptr).as_ptr();
200 f(&*waker)
201 }
202
203 unsafe fn drop_task(&self) {
204 let ptr: *mut Waker = (&mut *self.0.get()).as_mut_ptr();
205 ptr.drop_in_place();
206 }
207
208 unsafe fn set_task(&self, cx: &mut Context<'_>) {
209 let ptr: *mut Waker = (&mut *self.0.get()).as_mut_ptr();
210 ptr.write(cx.waker().clone());
211 }
212}
213
214#[derive(Clone, Copy)]
215struct State(usize);
216
217/// Create a new one-shot channel for sending single values across asynchronous
218/// tasks.
219///
220/// The function returns separate "send" and "receive" handles. The `Sender`
221/// handle is used by the producer to send the value. The `Receiver` handle is
222/// used by the consumer to receive the value.
223///
224/// Each handle can be used on separate tasks.
225///
226/// # Examples
227///
228/// ```
229/// use local_sync::oneshot;
230///
231/// #[monoio::main]
232/// async fn main() {
233/// let (tx, rx) = oneshot::channel();
234///
235/// monoio::spawn(async move {
236/// if let Err(_) = tx.send(3) {
237/// println!("the receiver dropped");
238/// }
239/// });
240///
241/// match rx.await {
242/// Ok(v) => println!("got = {:?}", v),
243/// Err(_) => println!("the sender dropped"),
244/// }
245/// }
246/// ```
247pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
248 let inner = Rc::new(Inner {
249 state: RefCell::new(State::new().as_usize()),
250 value: UnsafeCell::new(None),
251 tx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
252 rx_task: Task(UnsafeCell::new(MaybeUninit::uninit())),
253 });
254
255 let tx = Sender {
256 inner: Some(inner.clone()),
257 };
258 let rx = Receiver { inner: Some(inner) };
259
260 (tx, rx)
261}
262
263impl<T> Sender<T> {
264 /// Attempts to send a value on this channel, returning it back if it could
265 /// not be sent.
266 ///
267 /// This method consumes `self` as only one value may ever be sent on a oneshot
268 /// channel. It is not marked async because sending a message to an oneshot
269 /// channel never requires any form of waiting. Because of this, the `send`
270 /// method can be used in both synchronous and asynchronous code without
271 /// problems.
272 ///
273 /// A successful send occurs when it is determined that the other end of the
274 /// channel has not hung up already. An unsuccessful send would be one where
275 /// the corresponding receiver has already been deallocated. Note that a
276 /// return value of `Err` means that the data will never be received, but
277 /// a return value of `Ok` does *not* mean that the data will be received.
278 /// It is possible for the corresponding receiver to hang up immediately
279 /// after this function returns `Ok`.
280 ///
281 /// # Examples
282 ///
283 /// Send a value to another task
284 ///
285 /// ```
286 /// use local_sync::oneshot;
287 ///
288 /// #[monoio::main]
289 /// async fn main() {
290 /// let (tx, rx) = oneshot::channel();
291 ///
292 /// monoio::spawn(async move {
293 /// if let Err(_) = tx.send(3) {
294 /// println!("the receiver dropped");
295 /// }
296 /// });
297 ///
298 /// match rx.await {
299 /// Ok(v) => println!("got = {:?}", v),
300 /// Err(_) => println!("the sender dropped"),
301 /// }
302 /// }
303 /// ```
304 pub fn send(mut self, t: T) -> Result<(), T> {
305 let inner = self.inner.take().unwrap();
306 let ptr = inner.value.get();
307 unsafe {
308 *ptr = Some(t);
309 }
310
311 if !inner.complete() {
312 unsafe {
313 return Err(inner.consume_value().unwrap());
314 }
315 }
316
317 Ok(())
318 }
319
320 /// Waits for the associated [`Receiver`] handle to close.
321 ///
322 /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
323 /// [`Receiver`] value is dropped.
324 ///
325 /// This function is useful when paired with `select!` to abort a
326 /// computation when the receiver is no longer interested in the result.
327 ///
328 /// # Return
329 ///
330 /// Returns a `Future` which must be awaited on.
331 ///
332 /// [`Receiver`]: Receiver
333 /// [`close`]: Receiver::close
334 ///
335 /// # Examples
336 ///
337 /// Basic usage
338 ///
339 /// ```
340 /// use local_sync::oneshot;
341 ///
342 /// #[monoio::main]
343 /// async fn main() {
344 /// let (mut tx, rx) = oneshot::channel::<()>();
345 ///
346 /// monoio::spawn(async move {
347 /// drop(rx);
348 /// });
349 ///
350 /// tx.closed().await;
351 /// println!("the receiver dropped");
352 /// }
353 /// ```
354 ///
355 /// Paired with select
356 ///
357 /// ```
358 /// use local_sync::oneshot;
359 /// use monoio::time::{self, Duration};
360 ///
361 /// async fn compute() -> String {
362 /// // Complex computation returning a `String`
363 /// # "hello".to_string()
364 /// }
365 ///
366 /// #[monoio::main]
367 /// async fn main() {
368 /// let (mut tx, rx) = oneshot::channel();
369 ///
370 /// monoio::spawn(async move {
371 /// monoio::select! {
372 /// _ = tx.closed() => {
373 /// // The receiver dropped, no need to do any further work
374 /// }
375 /// value = compute() => {
376 /// // The send can fail if the channel was closed at the exact same
377 /// // time as when compute() finished, so just ignore the failure.
378 /// let _ = tx.send(value);
379 /// }
380 /// }
381 /// });
382 ///
383 /// // Wait for up to 10 seconds
384 /// let _ = time::timeout(Duration::from_secs(10), rx).await;
385 /// }
386 /// ```
387 pub async fn closed(&mut self) {
388 use futures_util::future::poll_fn;
389
390 poll_fn(|cx| self.poll_closed(cx)).await
391 }
392
393 /// Returns `true` if the associated [`Receiver`] handle has been dropped.
394 ///
395 /// A [`Receiver`] is closed by either calling [`close`] explicitly or the
396 /// [`Receiver`] value is dropped.
397 ///
398 /// If `true` is returned, a call to `send` will always result in an error.
399 ///
400 /// [`Receiver`]: Receiver
401 /// [`close`]: Receiver::close
402 ///
403 /// # Examples
404 ///
405 /// ```
406 /// use local_sync::oneshot;
407 ///
408 /// #[monoio::main]
409 /// async fn main() {
410 /// let (tx, rx) = oneshot::channel();
411 ///
412 /// assert!(!tx.is_closed());
413 ///
414 /// drop(rx);
415 ///
416 /// assert!(tx.is_closed());
417 /// assert!(tx.send("never received").is_err());
418 /// }
419 /// ```
420 pub fn is_closed(&self) -> bool {
421 let inner = self.inner.as_ref().unwrap();
422
423 let state = State(*inner.state.borrow());
424 state.is_closed()
425 }
426
427 /// Check whether the oneshot channel has been closed, and if not, schedules the
428 /// `Waker` in the provided `Context` to receive a notification when the channel is
429 /// closed.
430 ///
431 /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
432 /// [`Receiver`] value is dropped.
433 ///
434 /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
435 /// to the most recent call will be scheduled to receive a wakeup.
436 ///
437 /// [`Receiver`]: struct@crate::sync::oneshot::Receiver
438 /// [`close`]: fn@crate::sync::oneshot::Receiver::close
439 ///
440 /// # Return value
441 ///
442 /// This function returns:
443 ///
444 /// * `Poll::Pending` if the channel is still open.
445 /// * `Poll::Ready(())` if the channel is closed.
446 ///
447 /// # Examples
448 ///
449 /// ```
450 /// use local_sync::oneshot;
451 ///
452 /// use futures_util::future::poll_fn;
453 ///
454 /// #[monoio::main]
455 /// async fn main() {
456 /// let (mut tx, mut rx) = oneshot::channel::<()>();
457 ///
458 /// monoio::spawn(async move {
459 /// rx.close();
460 /// });
461 ///
462 /// poll_fn(|cx| tx.poll_closed(cx)).await;
463 ///
464 /// println!("the receiver dropped");
465 /// }
466 /// ```
467 pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
468 let inner = self.inner.as_ref().unwrap();
469
470 let mut state = State(*inner.state.borrow());
471
472 if state.is_closed() {
473 return Poll::Ready(());
474 }
475
476 if state.is_tx_task_set() {
477 let will_notify = unsafe { inner.tx_task.will_wake(cx) };
478
479 if !will_notify {
480 state = State::unset_tx_task(&inner.state);
481
482 if state.is_closed() {
483 // Set the flag again so that the waker is released in drop
484 State::set_tx_task(&inner.state);
485 return Ready(());
486 } else {
487 unsafe { inner.tx_task.drop_task() };
488 }
489 }
490 }
491
492 if !state.is_tx_task_set() {
493 // Attempt to set the task
494 unsafe {
495 inner.tx_task.set_task(cx);
496 }
497
498 // Update the state
499 state = State::set_tx_task(&inner.state);
500
501 if state.is_closed() {
502 return Ready(());
503 }
504 }
505
506 Pending
507 }
508}
509
510impl<T> Drop for Sender<T> {
511 fn drop(&mut self) {
512 if let Some(inner) = self.inner.as_ref() {
513 inner.complete();
514 }
515 }
516}
517
518impl<T> Receiver<T> {
519 /// Prevents the associated [`Sender`] handle from sending a value.
520 ///
521 /// Any `send` operation which happens after calling `close` is guaranteed
522 /// to fail. After calling `close`, [`try_recv`] should be called to
523 /// receive a value if one was sent **before** the call to `close`
524 /// completed.
525 ///
526 /// This function is useful to perform a graceful shutdown and ensure that a
527 /// value will not be sent into the channel and never received.
528 ///
529 /// `close` is no-op if a message is already received or the channel
530 /// is already closed.
531 ///
532 /// [`Sender`]: Sender
533 /// [`try_recv`]: Receiver::try_recv
534 ///
535 /// # Examples
536 ///
537 /// Prevent a value from being sent
538 ///
539 /// ```
540 /// use local_sync::oneshot;
541 /// use local_sync::oneshot::error::TryRecvError;
542 ///
543 /// #[monoio::main]
544 /// async fn main() {
545 /// let (tx, mut rx) = oneshot::channel();
546 ///
547 /// assert!(!tx.is_closed());
548 ///
549 /// rx.close();
550 ///
551 /// assert!(tx.is_closed());
552 /// assert!(tx.send("never received").is_err());
553 ///
554 /// match rx.try_recv() {
555 /// Err(TryRecvError::Closed) => {}
556 /// _ => unreachable!(),
557 /// }
558 /// }
559 /// ```
560 ///
561 /// Receive a value sent **before** calling `close`
562 ///
563 /// ```
564 /// use local_sync::oneshot;
565 ///
566 /// #[monoio::main]
567 /// async fn main() {
568 /// let (tx, mut rx) = oneshot::channel();
569 ///
570 /// assert!(tx.send("will receive").is_ok());
571 ///
572 /// rx.close();
573 ///
574 /// let msg = rx.try_recv().unwrap();
575 /// assert_eq!(msg, "will receive");
576 /// }
577 /// ```
578 pub fn close(&mut self) {
579 if let Some(inner) = self.inner.as_ref() {
580 inner.close();
581 }
582 }
583
584 /// Attempts to receive a value.
585 ///
586 /// If a pending value exists in the channel, it is returned. If no value
587 /// has been sent, the current task **will not** be registered for
588 /// future notification.
589 ///
590 /// This function is useful to call from outside the context of an
591 /// asynchronous task.
592 ///
593 /// # Return
594 ///
595 /// - `Ok(T)` if a value is pending in the channel.
596 /// - `Err(TryRecvError::Empty)` if no value has been sent yet.
597 /// - `Err(TryRecvError::Closed)` if the sender has dropped without sending
598 /// a value.
599 ///
600 /// # Examples
601 ///
602 /// `try_recv` before a value is sent, then after.
603 ///
604 /// ```
605 /// use local_sync::oneshot;
606 /// use local_sync::oneshot::error::TryRecvError;
607 ///
608 /// #[monoio::main]
609 /// async fn main() {
610 /// let (tx, mut rx) = oneshot::channel();
611 ///
612 /// match rx.try_recv() {
613 /// // The channel is currently empty
614 /// Err(TryRecvError::Empty) => {}
615 /// _ => unreachable!(),
616 /// }
617 ///
618 /// // Send a value
619 /// tx.send("hello").unwrap();
620 ///
621 /// match rx.try_recv() {
622 /// Ok(value) => assert_eq!(value, "hello"),
623 /// _ => unreachable!(),
624 /// }
625 /// }
626 /// ```
627 ///
628 /// `try_recv` when the sender dropped before sending a value
629 ///
630 /// ```
631 /// use local_sync::oneshot;
632 /// use local_sync::oneshot::error::TryRecvError;
633 ///
634 /// #[monoio::main]
635 /// async fn main() {
636 /// let (tx, mut rx) = oneshot::channel::<()>();
637 ///
638 /// drop(tx);
639 ///
640 /// match rx.try_recv() {
641 /// // The channel will never receive a value.
642 /// Err(TryRecvError::Closed) => {}
643 /// _ => unreachable!(),
644 /// }
645 /// }
646 /// ```
647 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
648 let result = if let Some(inner) = self.inner.as_ref() {
649 let state = State(*inner.state.borrow());
650
651 if state.is_complete() {
652 match unsafe { inner.consume_value() } {
653 Some(value) => Ok(value),
654 None => Err(TryRecvError::Closed),
655 }
656 } else if state.is_closed() {
657 Err(TryRecvError::Closed)
658 } else {
659 // Not ready, this does not clear `inner`
660 return Err(TryRecvError::Empty);
661 }
662 } else {
663 Err(TryRecvError::Closed)
664 };
665
666 self.inner = None;
667 result
668 }
669}
670
671impl<T> Drop for Receiver<T> {
672 fn drop(&mut self) {
673 if let Some(inner) = self.inner.as_ref() {
674 inner.close();
675 }
676 }
677}
678
679impl<T> Future for Receiver<T> {
680 type Output = Result<T, RecvError>;
681
682 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
683 // If `inner` is `None`, then `poll()` has already completed.
684 let ret = if let Some(inner) = self.as_ref().get_ref().inner.as_ref() {
685 ready!(inner.poll_recv(cx))?
686 } else {
687 panic!("called after complete");
688 };
689
690 self.inner = None;
691 Ready(Ok(ret))
692 }
693}
694
695impl<T> Inner<T> {
696 fn complete(&self) -> bool {
697 let prev = State::set_complete(&self.state);
698
699 if prev.is_closed() {
700 return false;
701 }
702
703 if prev.is_rx_task_set() {
704 // TODO: Consume waker?
705 unsafe {
706 self.rx_task.with_task(Waker::wake_by_ref);
707 }
708 }
709
710 true
711 }
712
713 fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
714 // Load the state
715 let mut state = State(*self.state.borrow());
716
717 if state.is_complete() {
718 match unsafe { self.consume_value() } {
719 Some(value) => Ready(Ok(value)),
720 None => Ready(Err(RecvError(()))),
721 }
722 } else if state.is_closed() {
723 Ready(Err(RecvError(())))
724 } else {
725 if state.is_rx_task_set() {
726 let will_notify = unsafe { self.rx_task.will_wake(cx) };
727
728 // Check if the task is still the same
729 if !will_notify {
730 // Unset the task
731 state = State::unset_rx_task(&self.state);
732 if state.is_complete() {
733 // Set the flag again so that the waker is released in drop
734 State::set_rx_task(&self.state);
735
736 return match unsafe { self.consume_value() } {
737 Some(value) => Ready(Ok(value)),
738 None => Ready(Err(RecvError(()))),
739 };
740 } else {
741 unsafe { self.rx_task.drop_task() };
742 }
743 }
744 }
745
746 if !state.is_rx_task_set() {
747 // Attempt to set the task
748 unsafe {
749 self.rx_task.set_task(cx);
750 }
751
752 // Update the state
753 state = State::set_rx_task(&self.state);
754
755 if state.is_complete() {
756 match unsafe { self.consume_value() } {
757 Some(value) => Ready(Ok(value)),
758 None => Ready(Err(RecvError(()))),
759 }
760 } else {
761 Pending
762 }
763 } else {
764 Pending
765 }
766 }
767 }
768
769 /// Called by `Receiver` to indicate that the value will never be received.
770 fn close(&self) {
771 let prev = State::set_closed(&self.state);
772
773 if prev.is_tx_task_set() && !prev.is_complete() {
774 unsafe {
775 self.tx_task.with_task(Waker::wake_by_ref);
776 }
777 }
778 }
779
780 /// Consumes the value. This function does not check `state`.
781 unsafe fn consume_value(&self) -> Option<T> {
782 let ptr = self.value.get();
783 (*ptr).take()
784 }
785}
786
787impl<T> Drop for Inner<T> {
788 fn drop(&mut self) {
789 let state = State(*self.state.borrow());
790
791 if state.is_rx_task_set() {
792 unsafe {
793 self.rx_task.drop_task();
794 }
795 }
796
797 if state.is_tx_task_set() {
798 unsafe {
799 self.tx_task.drop_task();
800 }
801 }
802 }
803}
804
805impl<T: fmt::Debug> fmt::Debug for Inner<T> {
806 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
807 fmt.debug_struct("Inner")
808 .field("state", &self.state.borrow())
809 .finish()
810 }
811}
812
813const RX_TASK_SET: usize = 0b00001;
814const VALUE_SENT: usize = 0b00010;
815const CLOSED: usize = 0b00100;
816const TX_TASK_SET: usize = 0b01000;
817
818impl State {
819 fn new() -> State {
820 State(0)
821 }
822
823 fn is_complete(self) -> bool {
824 self.0 & VALUE_SENT == VALUE_SENT
825 }
826
827 fn set_complete(cell: &RefCell<usize>) -> State {
828 let mut val = cell.borrow_mut();
829 *val |= VALUE_SENT;
830 State(*val)
831 }
832
833 fn is_rx_task_set(self) -> bool {
834 self.0 & RX_TASK_SET == RX_TASK_SET
835 }
836
837 fn set_rx_task(cell: &RefCell<usize>) -> State {
838 let mut val = cell.borrow_mut();
839 *val |= RX_TASK_SET;
840 State(*val)
841 }
842
843 fn unset_rx_task(cell: &RefCell<usize>) -> State {
844 let mut val = cell.borrow_mut();
845 *val &= !RX_TASK_SET;
846 State(*val)
847 }
848
849 fn is_closed(self) -> bool {
850 self.0 & CLOSED == CLOSED
851 }
852
853 fn set_closed(cell: &RefCell<usize>) -> State {
854 // Acquire because we want all later writes (attempting to poll) to be
855 // ordered after this.
856 let mut val = cell.borrow_mut();
857 *val |= CLOSED;
858 State(*val)
859 }
860
861 fn set_tx_task(cell: &RefCell<usize>) -> State {
862 let mut val = cell.borrow_mut();
863 *val |= TX_TASK_SET;
864 State(*val)
865 }
866
867 fn unset_tx_task(cell: &RefCell<usize>) -> State {
868 let mut val = cell.borrow_mut();
869 *val &= !TX_TASK_SET;
870 State(*val)
871 }
872
873 fn is_tx_task_set(self) -> bool {
874 self.0 & TX_TASK_SET == TX_TASK_SET
875 }
876
877 fn as_usize(self) -> usize {
878 self.0
879 }
880}
881
882impl fmt::Debug for State {
883 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
884 fmt.debug_struct("State")
885 .field("is_complete", &self.is_complete())
886 .field("is_closed", &self.is_closed())
887 .field("is_rx_task_set", &self.is_rx_task_set())
888 .field("is_tx_task_set", &self.is_tx_task_set())
889 .finish()
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use super::channel;
896
897 #[monoio::test]
898 async fn it_works() {
899 let (tx, rx) = channel();
900 let join = monoio::spawn(async move { rx.await });
901 tx.send(1).unwrap();
902 assert_eq!(join.await.unwrap(), 1);
903 }
904}