async_compatibility_layer/channel/
oneshot.rs

/// Tokio-backed implementation of the oneshot channel types.
#[cfg(async_channel_impl = "tokio")]
mod inner {
    pub use tokio::sync::oneshot::{error::TryRecvError as OneShotTryRecvError, Receiver, Sender};

    // tokio has its own `RecvError`, but it is not used here: an instance of it cannot be created
    // from outside tokio, so a `TryRecvError` could never be turned into a `RecvError`.

    /// Error that occurs when the [`OneShotReceiver`] could not receive a message.
    ///
    /// This is a field-less marker type. It derives `Clone` and `Copy` so that it offers the
    /// same capabilities as the `RecvError` types re-exported under this name by the flume and
    /// async-std backends; code that copies the error then builds under every backend.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct OneShotRecvError;

    impl From<tokio::sync::oneshot::error::RecvError> for OneShotRecvError {
        /// Discards tokio's error value and returns the local marker error.
        fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
            Self
        }
    }

    impl std::fmt::Display for OneShotRecvError {
        /// Writes the type name, `OneShotRecvError`, as the error message.
        fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            fmt.write_str("OneShotRecvError")
        }
    }

    impl std::error::Error for OneShotRecvError {}

    /// The sending half of a single-use channel, created with [`oneshot`].
    pub struct OneShotSender<T>(pub(super) Sender<T>);
    /// The receiving half of a single-use channel, created with [`oneshot`].
    pub struct OneShotReceiver<T>(pub(super) Receiver<T>);

    /// Create a single-use channel, backed by `tokio::sync::oneshot::channel`.
    ///
    /// Both the sender and receiver take `self` instead of `&self` or `&mut self`, so they can
    /// only be used once.
    #[must_use]
    pub fn oneshot<T>() -> (OneShotSender<T>, OneShotReceiver<T>) {
        let (sender, receiver) = tokio::sync::oneshot::channel();
        (OneShotSender(sender), OneShotReceiver(receiver))
    }
}
41
/// Flume-backed implementation of the oneshot channel types.
#[cfg(async_channel_impl = "flume")]
mod inner {
    pub use flume::{RecvError as OneShotRecvError, TryRecvError as OneShotTryRecvError};

    /// The sending half of a single-use channel, created with [`oneshot`].
    pub struct OneShotSender<T>(pub(super) flume::Sender<T>);
    /// The receiving half of a single-use channel, created with [`oneshot`].
    pub struct OneShotReceiver<T>(pub(super) flume::Receiver<T>);

    /// Create a single-use channel.
    ///
    /// This backend builds it on a flume channel bounded to a capacity of one message; a more
    /// efficient implementation might exist.
    ///
    /// Both halves are consumed (`self`, not `&self` or `&mut self`) when they are used, so each
    /// of them can be used only once.
    #[must_use]
    pub fn oneshot<T>() -> (OneShotSender<T>, OneShotReceiver<T>) {
        let (tx, rx) = flume::bounded(1);
        (OneShotSender(tx), OneShotReceiver(rx))
    }
}
62
63/// inner module, used to group feature-specific imports
64#[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
65mod inner {
66    use async_std::channel::{Receiver, Sender};
67    pub use async_std::channel::{
68        RecvError as OneShotRecvError, TryRecvError as OneShotTryRecvError,
69    };
70
71    /// A single-use sender, created with [`oneshot`]
72    pub struct OneShotSender<T>(pub(super) Sender<T>);
73    /// A single-use sender, created with [`oneshot`]
74    pub struct OneShotReceiver<T>(pub(super) Receiver<T>);
75
76    /// Create a single-use channel. Under water this might be implemented as a `bounded(1)`, but more efficient implementations might exist.
77    ///
78    /// Both the sender and receiver take `self` instead of `&self` or `&mut self`, so they can only be used once.
79    #[must_use]
80    pub fn oneshot<T>() -> (OneShotSender<T>, OneShotReceiver<T>) {
81        let (sender, receiver) = async_std::channel::bounded(1);
82        (OneShotSender(sender), OneShotReceiver(receiver))
83    }
84}
85
86pub use inner::*;
87
88impl<T> OneShotSender<T> {
89    /// Send a message on this sender.
90    ///
91    /// If this fails because the receiver is dropped, a warning will be printed.
92    pub fn send(self, msg: T) {
93        #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
94        if self.0.try_send(msg).is_err() {
95            tracing::warn!("Could not send msg on OneShotSender, did the receiver drop?");
96        }
97        #[cfg(any(async_channel_impl = "flume", async_channel_impl = "tokio"))]
98        if self.0.send(msg).is_err() {
99            tracing::warn!("Could not send msg on OneShotSender, did the receiver drop?");
100        }
101    }
102}
103
104impl<T> OneShotReceiver<T> {
105    /// Receive a value from this oneshot receiver. If the sender is dropped, an error is returned.
106    ///
107    /// # Errors
108    ///
109    /// Will return an error if the sender channel is dropped
110    pub async fn recv(self) -> Result<T, OneShotRecvError> {
111        #[cfg(async_channel_impl = "tokio")]
112        let result = self.0.await.map_err(Into::into);
113        #[cfg(async_channel_impl = "flume")]
114        let result = self.0.recv_async().await;
115        #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
116        let result = self.0.recv().await;
117
118        result
119    }
120}
121
122// Debug impl
123impl<T> std::fmt::Debug for OneShotSender<T> {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.debug_struct("OneShotSender").finish()
126    }
127}
128impl<T> std::fmt::Debug for OneShotReceiver<T> {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        f.debug_struct("OneShotReceiver").finish()
131    }
132}