// async_transmit/transmit/from_sink.rs
use async_trait::async_trait;
2use futures_sink::Sink;
3use futures_util::sink::SinkExt;
4use std::marker::PhantomData;
5
6use crate::transmit::Transmit;
7
/// Adapter around a sink `S` that accepts items of type `I`.
///
/// The item type is recorded only at the type level through `PhantomData`;
/// no value of `I` is ever stored inside the adapter.
pub struct FromSink<S, I> {
    /// The wrapped sink.
    sink: S,
    /// Ties the adapter to the item type `I` without holding an `I`.
    phantom: PhantomData<I>,
}

// `Debug` is implemented by hand instead of derived: a derive would demand
// `I: Debug` although the only mention of `I` is inside `PhantomData`, which
// is `Debug` for every `I`. Only the sink needs to be printable.
impl<S, I> std::fmt::Debug for FromSink<S, I>
where
    S: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FromSink")
            .field("sink", &self.sink)
            .field("phantom", &self.phantom)
            .finish()
    }
}
13
14impl<S, I> FromSink<S, I> {
15 fn new(sink: S) -> Self {
16 Self {
17 sink,
18 phantom: PhantomData,
19 }
20 }
21
22 pub fn into_inner(self) -> S {
24 self.sink
25 }
26
27 pub fn get_ref(&self) -> &S {
30 &self.sink
31 }
32
33 pub fn get_mut(&mut self) -> &mut S {
36 &mut self.sink
37 }
38}
39
40#[async_trait]
41impl<S, I> Transmit for FromSink<S, I>
42where
43 I: Send,
44 S: Sink<I> + Unpin + Send,
45 S::Error: Send,
46{
47 type Item = I;
48 type Error = S::Error;
49
50 async fn transmit(&mut self, item: Self::Item) -> Result<(), Self::Error> {
51 SinkExt::send(&mut self.sink, item).await?;
52 Ok(())
53 }
54}
55
56impl<S, I> From<S> for FromSink<S, I>
57where
58 S: Sink<I>,
59{
60 fn from(sink: S) -> Self {
61 Self::new(sink)
62 }
63}
64
#[cfg(test)]
mod tests {
    use super::super::assert_transmit;
    use super::*;

    use anyhow::Result;
    use futures::channel::mpsc;
    use futures::prelude::*;
    use futures_await_test::async_test;

    /// Items transmitted through a `FromSink` built from an unbounded channel
    /// sender arrive on the receiver in order, and the stream ends once the
    /// adapter has been dropped.
    #[async_test]
    async fn from_sink_is_transmit() -> Result<()> {
        let (tx, mut rx) = mpsc::unbounded::<&'static str>();

        let mut transmitter = assert_transmit(FromSink::from(tx));
        transmitter.transmit("Hello").await?;
        transmitter.transmit("World").await?;
        // The sender was moved into the adapter, so dropping the adapter lets
        // the receiver observe the end of the stream below.
        drop(transmitter);

        assert_eq!(rx.next().await, Some("Hello"));
        assert_eq!(rx.next().await, Some("World"));
        assert_eq!(rx.next().await, None);

        Ok(())
    }
}