1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/// inner module, used to group feature-specific imports
#[cfg(async_channel_impl = "tokio")]
mod inner {
    pub use tokio::sync::oneshot::{error::TryRecvError as OneShotTryRecvError, Receiver, Sender};

    // There is a `RecvError` in tokio but we don't use it because we can't create an instance of `RecvError` so we
    // can't turn a `TryRecvError` into a `RecvError`.

    /// Error that occurs when the [`OneShotReceiver`] could not receive a message.
    #[derive(Debug, PartialEq, Eq)]
    pub struct OneShotRecvError;

    impl From<tokio::sync::oneshot::error::RecvError> for OneShotRecvError {
        fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
            Self
        }
    }

    impl std::fmt::Display for OneShotRecvError {
        fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(fmt, stringify!(OneShotRecvError))
        }
    }

    impl std::error::Error for OneShotRecvError {}

    /// A single-use sender, created with [`oneshot`]
    pub struct OneShotSender<T>(pub(super) Sender<T>);
    /// A single-use sender, created with [`oneshot`]
    pub struct OneShotReceiver<T>(pub(super) Receiver<T>);

    /// Create a single-use channel. Under water this might be implemented as a `bounded(1)`, but more efficient implementations might exist.
    ///
    /// Both the sender and receiver take `self` instead of `&self` or `&mut self`, so they can only be used once.
    #[must_use]
    pub fn oneshot<T>() -> (OneShotSender<T>, OneShotReceiver<T>) {
        let (sender, receiver) = tokio::sync::oneshot::channel();
        (OneShotSender(sender), OneShotReceiver(receiver))
    }
}

/// inner module, used to group feature-specific imports
#[cfg(async_channel_impl = "flume")]
mod inner {
    use flume::{Receiver, Sender};
    pub use flume::{RecvError as OneShotRecvError, TryRecvError as OneShotTryRecvError};

    /// A single-use sender, created with [`oneshot`]
    pub struct OneShotSender<T>(pub(super) Sender<T>);
    /// A single-use sender, created with [`oneshot`]
    pub struct OneShotReceiver<T>(pub(super) Receiver<T>);

    /// Create a single-use channel. Under water this might be implemented as a `bounded(1)`, but more efficient implementations might exist.
    ///
    /// Both the sender and receiver take `self` instead of `&self` or `&mut self`, so they can only be used once.
    #[must_use]
    pub fn oneshot<T>() -> (OneShotSender<T>, OneShotReceiver<T>) {
        let (sender, receiver) = flume::bounded(1);
        (OneShotSender(sender), OneShotReceiver(receiver))
    }
}

/// inner module, used to group feature-specific imports
#[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
mod inner {
    use async_std::channel::{Receiver, Sender};
    pub use async_std::channel::{
        RecvError as OneShotRecvError, TryRecvError as OneShotTryRecvError,
    };

    /// A single-use sender, created with [`oneshot`]
    pub struct OneShotSender<T>(pub(super) Sender<T>);
    /// A single-use sender, created with [`oneshot`]
    pub struct OneShotReceiver<T>(pub(super) Receiver<T>);

    /// Create a single-use channel. Under water this might be implemented as a `bounded(1)`, but more efficient implementations might exist.
    ///
    /// Both the sender and receiver take `self` instead of `&self` or `&mut self`, so they can only be used once.
    #[must_use]
    pub fn oneshot<T>() -> (OneShotSender<T>, OneShotReceiver<T>) {
        let (sender, receiver) = async_std::channel::bounded(1);
        (OneShotSender(sender), OneShotReceiver(receiver))
    }
}

pub use inner::*;

impl<T> OneShotSender<T> {
    /// Send a message on this sender.
    ///
    /// If this fails because the receiver is dropped, a warning will be printed.
    pub fn send(self, msg: T) {
        #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
        if self.0.try_send(msg).is_err() {
            tracing::warn!("Could not send msg on OneShotSender, did the receiver drop?");
        }
        #[cfg(any(async_channel_impl = "flume", async_channel_impl = "tokio"))]
        if self.0.send(msg).is_err() {
            tracing::warn!("Could not send msg on OneShotSender, did the receiver drop?");
        }
    }
}

impl<T> OneShotReceiver<T> {
    /// Receive a value from this oneshot receiver. If the sender is dropped, an error is returned.
    ///
    /// # Errors
    ///
    /// Will return an error if the sender channel is dropped
    pub async fn recv(self) -> Result<T, OneShotRecvError> {
        #[cfg(async_channel_impl = "tokio")]
        let result = self.0.await.map_err(Into::into);
        #[cfg(async_channel_impl = "flume")]
        let result = self.0.recv_async().await;
        #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
        let result = self.0.recv().await;

        result
    }
}

// Debug impl
impl<T> std::fmt::Debug for OneShotSender<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OneShotSender").finish()
    }
}
impl<T> std::fmt::Debug for OneShotReceiver<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OneShotReceiver").finish()
    }
}