async_transmit/transmit/
from_sink.rs

1use async_trait::async_trait;
2use futures_sink::Sink;
3use futures_util::sink::SinkExt;
4use std::marker::PhantomData;
5
6use crate::transmit::Transmit;
7
/// Adapter that wraps a sink of type `S` accepting items of type `I`.
///
/// The item type is carried only as a zero-sized marker; no value of type
/// `I` is ever stored inside the adapter.
#[derive(Debug)]
pub struct FromSink<S, I> {
    /// The wrapped sink that items are handed to.
    sink: S,
    /// Marker pinning the item type `I` for this adapter.
    phantom: PhantomData<I>,
}

impl<S, I> FromSink<S, I> {
    /// Wraps `sink` in a new adapter.
    ///
    /// Private on purpose: outside this module the adapter is built through
    /// the `From<S>` conversion.
    fn new(sink: S) -> Self {
        let phantom = PhantomData;
        FromSink { sink, phantom }
    }

    /// Consumes the adapter and hands back the wrapped sink.
    pub fn into_inner(self) -> S {
        let FromSink { sink, .. } = self;
        sink
    }

    /// Borrows the wrapped sink that this transmit sends items into.
    pub fn get_ref(&self) -> &S {
        let FromSink { sink, .. } = self;
        sink
    }

    /// Mutably borrows the wrapped sink that this transmit sends items into.
    pub fn get_mut(&mut self) -> &mut S {
        let FromSink { sink, .. } = self;
        sink
    }
}
39
40#[async_trait]
41impl<S, I> Transmit for FromSink<S, I>
42where
43    I: Send,
44    S: Sink<I> + Unpin + Send,
45    S::Error: Send,
46{
47    type Item = I;
48    type Error = S::Error;
49
50    async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error> {
51        SinkExt::send(&mut self.sink, item).await?;
52        Ok(())
53    }
54}
55
56impl<S, I> From<S> for FromSink<S, I>
57where
58    S: Sink<I>,
59{
60    fn from(sink: S) -> Self {
61        Self::new(sink)
62    }
63}
64
#[cfg(test)]
mod tests {
    use super::super::assert_transmit;
    use super::*;

    use anyhow::Result;
    use futures::channel::mpsc;
    use futures::prelude::*;
    use futures_await_test::async_test;

    /// Items transmitted through a `FromSink` wrapping an unbounded channel
    /// sender arrive at the receiver in order, and the channel closes once
    /// the adapter is dropped.
    #[async_test]
    async fn from_sink_is_transmit() -> Result<()> {
        let (sender, receiver) = mpsc::unbounded::<&'static str>();

        let mut adapter = assert_transmit(FromSink::from(sender));
        adapter.transmit("Hello").await?;
        adapter.transmit("World").await?;
        // Dropping the adapter drops the only sender, which ends the stream
        // so that `collect` below can complete.
        drop(adapter);

        let received: Vec<&'static str> = receiver.collect().await;
        assert_eq!(received, vec!["Hello", "World"]);

        Ok(())
    }
}