async_transmit/
transmit.rs

1use async_trait::async_trait;
2
3/// The `Transmit` trait allows for transmitting item to a peer.
4///
5/// Implementors of the `Transmit` trait are called 'transmitters'.
6///
7/// Transmitters are defined by one required method, [`transmit()`].
8/// The method will attempt to transmit some data to a peer asynchronously, returning
9/// if the transmission has succeeded.
10#[must_use = "transmit do nothing unless polled"]
11#[async_trait]
12pub trait Transmit {
13    type Item;
14    type Error;
15
16    /// Attempts to transmit a value to the peer asynchronously.
17    async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error>
18    where
19        Self::Item: 'async_trait;
20}
21
22#[async_trait]
23impl<T> Transmit for &mut T
24where
25    T: Transmit + Send + ?Sized,
26    T::Item: Send,
27{
28    type Item = T::Item;
29    type Error = T::Error;
30
31    async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error>
32    where
33        Self::Item: 'async_trait,
34    {
35        let this = &mut **self;
36        this.transmit(item).await
37    }
38}
39
40/// A helper function to make sure the value is 'Transmit'
41#[allow(dead_code)]
42pub(crate) fn assert_transmit<I, E, T>(t: T) -> T
43where
44    T: Transmit<Item = I, Error = E>,
45{
46    t
47}
48
49#[cfg(feature = "with-async-channel")]
50mod async_channel;
51#[cfg(feature = "with-async-channel")]
52pub use self::async_channel::*;
53
54#[cfg(feature = "with-tokio")]
55mod tokio;
56#[cfg(feature = "with-tokio")]
57pub use self::tokio::*;
58
59#[cfg(feature = "with-sink")]
60mod from_sink;
61#[cfg(feature = "with-sink")]
62pub use from_sink::FromSink;
63
64mod with;
65pub use with::With;
66
67mod transmit_map_err;
68pub use transmit_map_err::TransmitMapErr;
69
70impl<T: ?Sized> TransmitExt for T where T: Transmit {}
71
/// Wraps an object implementing `futures::sink::Sink` in a [`FromSink`],
/// which implements the [`Transmit`] trait.
///
/// The resulting transmitter has `Item = I` and `Error = S::Error`.
#[cfg(feature = "with-sink")]
pub fn from_sink<S, I>(sink: S) -> FromSink<S, I>
where
    S: futures_sink::Sink<I> + Unpin + Send,
    I: Send,
    S::Error: Send,
{
    let transmitter = FromSink::from(sink);
    // Compile-time check only: the wrapper must be a transmitter of `I`
    // whose error type is the sink's error type.
    assert_transmit::<I, S::Error, _>(transmitter)
}
83
84/// An extension trait for `Transmit`s that provides a variety of convenient
85/// functions.
86pub trait TransmitExt: Transmit {
87    fn with<F, U>(self, f: F) -> with::With<Self, F, Self::Item, U, Self::Error>
88    where
89        Self: Sized + Send,
90        Self::Item: Send,
91        Self::Error: Send,
92        F: FnMut(U) -> Self::Item + Send,
93        U: Send,
94    {
95        assert_transmit::<U, Self::Error, _>(with::With::new(self, f))
96    }
97
98    fn transmit_map_err<E, F>(self, f: F) -> TransmitMapErr<Self, F>
99    where
100        Self: Sized + Send,
101        Self::Item: Send,
102        F: FnOnce(Self::Error) -> E + Send,
103    {
104        assert_transmit::<Self::Item, E, _>(TransmitMapErr::new(self, f))
105    }
106}
107
#[cfg(test)]
mod tests {
    use super::assert_transmit;
    use super::*;

    use anyhow::Result;
    use futures::channel::mpsc;
    use futures::prelude::*;
    use futures_await_test::async_test;

    /// A `&mut T` must be accepted wherever a transmitter is required.
    #[test]
    fn transmit_mut_ref_ok() -> Result<()> {
        struct DummyTransmitter {}

        #[async_trait]
        impl Transmit for DummyTransmitter {
            type Item = String;
            type Error = anyhow::Error;

            async fn transmit(&mut self, _item: Self::Item) -> Result<(), Self::Error>
            where
                Self::Item: 'async_trait,
            {
                // Never called: this test is a compile-time check only.
                unimplemented!();
            }
        }

        let mut t = DummyTransmitter {};
        let mut_t = &mut t;

        assert_transmit(mut_t);

        Ok(())
    }

    /// `from_sink` yields a transmitter that forwards items to the sink in order.
    #[cfg(feature = "with-sink")]
    #[async_test]
    async fn transmit_ext_from_sink_is_transmit() -> Result<()> {
        let (s, mut r) = mpsc::unbounded::<&'static str>();

        let mut t = assert_transmit(from_sink(s));
        t.transmit("Hello").await?;
        t.transmit("World").await?;
        // Dropping the transmitter drops the sender, so the receiver sees the
        // end of the stream after the two items.
        drop(t);
        assert_eq!(r.next().await, Some("Hello"));
        assert_eq!(r.next().await, Some("World"));
        assert_eq!(r.next().await, None);

        Ok(())
    }

    /// `with` yields a transmitter that converts items before transmitting them.
    #[async_test]
    async fn transmit_ext_with_is_transmit() -> Result<()> {
        let (s, mut r) = mpsc::unbounded::<String>();

        struct DummyTransmitter<T> {
            sender: mpsc::UnboundedSender<T>,
        }

        #[async_trait]
        impl<T> Transmit for DummyTransmitter<T>
        where
            T: Send,
        {
            type Item = T;
            type Error = anyhow::Error;

            async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error>
            where
                Self::Item: 'async_trait,
            {
                self.sender.send(item).await.map_err(Into::into)
            }
        }

        let t = assert_transmit(DummyTransmitter { sender: s });
        let mut t = t.with(|s| format!("!!!{}!!!", s));
        t.transmit("Hello").await?;
        t.transmit("World").await?;
        // Dropping the adapter drops the sender, which closes the channel.
        drop(t);
        assert_eq!(r.next().await, Some("!!!Hello!!!".to_string()));
        assert_eq!(r.next().await, Some("!!!World!!!".to_string()));
        assert_eq!(r.next().await, None);

        Ok(())
    }

    /// `transmit_map_err` yields a transmitter whose errors are converted by the closure.
    #[async_test]
    async fn transmit_ext_transmit_map_err_is_transmit() -> Result<()> {
        struct DummyTransmitter {}

        #[async_trait]
        impl Transmit for DummyTransmitter {
            type Item = &'static str;
            type Error = String;

            async fn transmit(&mut self, _item: Self::Item) -> Result<(), Self::Error>
            where
                Self::Item: 'async_trait,
            {
                Err("Hello World".to_string())
            }
        }

        let mut t = assert_transmit(DummyTransmitter {});
        assert_eq!(
            "Hello World".to_string(),
            t.transmit("Hello").await.unwrap_err()
        );
        let mut t = t.transmit_map_err(|e| anyhow::anyhow!("!!!{}!!!", e));
        // Compare the `Display` output. anyhow's `Debug` (`{:?}`) output also
        // includes a backtrace when one was captured (e.g. `RUST_BACKTRACE=1`),
        // which would make an exact string comparison environment-dependent.
        assert_eq!(
            "!!!Hello World!!!",
            t.transmit("Hello").await.unwrap_err().to_string()
        );

        Ok(())
    }
}