async_transmit/
transmit.rs1use async_trait::async_trait;
2
3#[must_use = "transmit do nothing unless polled"]
11#[async_trait]
12pub trait Transmit {
13 type Item;
14 type Error;
15
16 async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error>
18 where
19 Self::Item: 'async_trait;
20}
21
22#[async_trait]
23impl<T> Transmit for &mut T
24where
25 T: Transmit + Send + ?Sized,
26 T::Item: Send,
27{
28 type Item = T::Item;
29 type Error = T::Error;
30
31 async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error>
32 where
33 Self::Item: 'async_trait,
34 {
35 let this = &mut **self;
36 this.transmit(item).await
37 }
38}
39
40#[allow(dead_code)]
42pub(crate) fn assert_transmit<I, E, T>(t: T) -> T
43where
44 T: Transmit<Item = I, Error = E>,
45{
46 t
47}
48
49#[cfg(feature = "with-async-channel")]
50mod async_channel;
51#[cfg(feature = "with-async-channel")]
52pub use self::async_channel::*;
53
54#[cfg(feature = "with-tokio")]
55mod tokio;
56#[cfg(feature = "with-tokio")]
57pub use self::tokio::*;
58
59#[cfg(feature = "with-sink")]
60mod from_sink;
61#[cfg(feature = "with-sink")]
62pub use from_sink::FromSink;
63
64mod with;
65pub use with::With;
66
67mod transmit_map_err;
68pub use transmit_map_err::TransmitMapErr;
69
70impl<T: ?Sized> TransmitExt for T where T: Transmit {}
71
72#[cfg(feature = "with-sink")]
75pub fn from_sink<S, I>(sink: S) -> FromSink<S, I>
76where
77 S: futures_sink::Sink<I> + Unpin + Send,
78 I: Send,
79 S::Error: Send,
80{
81 assert_transmit::<I, S::Error, _>(from_sink::FromSink::from(sink))
82}
83
84pub trait TransmitExt: Transmit {
87 fn with<F, U>(self, f: F) -> with::With<Self, F, Self::Item, U, Self::Error>
88 where
89 Self: Sized + Send,
90 Self::Item: Send,
91 Self::Error: Send,
92 F: FnMut(U) -> Self::Item + Send,
93 U: Send,
94 {
95 assert_transmit::<U, Self::Error, _>(with::With::new(self, f))
96 }
97
98 fn transmit_map_err<E, F>(self, f: F) -> TransmitMapErr<Self, F>
99 where
100 Self: Sized + Send,
101 Self::Item: Send,
102 F: FnOnce(Self::Error) -> E + Send,
103 {
104 assert_transmit::<Self::Item, E, _>(TransmitMapErr::new(self, f))
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use super::assert_transmit;
111 use super::*;
112
113 use anyhow::Result;
114 use futures::channel::mpsc;
115 use futures::prelude::*;
116 use futures_await_test::async_test;
117
118 #[test]
119 fn transmit_mut_ref_ok() -> Result<()> {
120 struct DummyTransmitter {}
121
122 #[async_trait]
123 impl Transmit for DummyTransmitter {
124 type Item = String;
125 type Error = anyhow::Error;
126
127 async fn transmit(&mut self, _item: Self::Item) -> Result<(), Self::Error>
128 where
129 Self::Item: 'async_trait,
130 {
131 unimplemented!();
132 }
133 }
134
135 let mut t = DummyTransmitter {};
136 let mut_t = &mut t;
137
138 assert_transmit(mut_t);
139
140 Ok(())
141 }
142
143 #[cfg(feature = "with-sink")]
144 #[async_test]
145 async fn transmit_ext_from_sink_is_transmit() -> Result<()> {
146 let (s, mut r) = mpsc::unbounded::<&'static str>();
147
148 let mut t = assert_transmit(from_sink(s));
149 t.transmit("Hello").await?;
150 t.transmit("World").await?;
151 drop(t);
152 assert_eq!(r.next().await, Some("Hello"));
153 assert_eq!(r.next().await, Some("World"));
154 assert_eq!(r.next().await, None);
155
156 Ok(())
157 }
158
159 #[async_test]
160 async fn transmit_ext_with_is_transmit() -> Result<()> {
161 let (s, mut r) = mpsc::unbounded::<String>();
162
163 struct DummyTransmitter<T> {
164 sender: mpsc::UnboundedSender<T>,
165 }
166
167 #[async_trait]
168 impl<T> Transmit for DummyTransmitter<T>
169 where
170 T: Send,
171 {
172 type Item = T;
173 type Error = anyhow::Error;
174
175 async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error>
176 where
177 Self::Item: 'async_trait,
178 {
179 self.sender.send(item).await.map_err(Into::into)
180 }
181 }
182
183 let t = assert_transmit(DummyTransmitter { sender: s });
184 let mut t = t.with(|s| format!("!!!{}!!!", s));
185 t.transmit("Hello").await?;
186 t.transmit("World").await?;
187 drop(t);
188 assert_eq!(r.next().await, Some("!!!Hello!!!".to_string()));
189 assert_eq!(r.next().await, Some("!!!World!!!".to_string()));
190 assert_eq!(r.next().await, None);
191
192 Ok(())
193 }
194
195 #[async_test]
196 async fn transmit_ext_transmit_map_err_is_transmit() -> Result<()> {
197 struct DummyTransmitter {}
198
199 #[async_trait]
200 impl Transmit for DummyTransmitter {
201 type Item = &'static str;
202 type Error = String;
203
204 async fn transmit(&mut self, _item: Self::Item) -> Result<(), Self::Error>
205 where
206 Self::Item: 'async_trait,
207 {
208 Err("Hello World".to_string())
209 }
210 }
211
212 let mut t = assert_transmit(DummyTransmitter {});
213 assert_eq!(
214 "Hello World".to_string(),
215 t.transmit("Hello").await.err().unwrap()
216 );
217 let mut t = t.transmit_map_err(|e| anyhow::anyhow!("!!!{}!!!", e));
218 assert_eq!(
219 "!!!Hello World!!!",
220 format!("{:?}", t.transmit("Hello").await.err().unwrap())
221 );
222
223 Ok(())
224 }
225}