Skip to main content

airio_core/utils/
rw_stream_sink.rs

1use futures::{AsyncRead, AsyncWrite, Sink, TryStream, ready};
2use std::{
3    io::{self, Read},
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8#[pin_project::pin_project]
9pub struct RwStreamSink<S: TryStream> {
10    #[pin]
11    inner: S,
12    current_item: Option<std::io::Cursor<<S as TryStream>::Ok>>,
13}
14
15impl<S: TryStream> RwStreamSink<S> {
16    /// Wraps around `inner`.
17    pub fn new(inner: S) -> Self {
18        RwStreamSink {
19            inner,
20            current_item: None,
21        }
22    }
23}
24
25impl<S> AsyncRead for RwStreamSink<S>
26where
27    S: TryStream<Error = io::Error>,
28    <S as TryStream>::Ok: AsRef<[u8]>,
29{
30    fn poll_read(
31        self: Pin<&mut Self>,
32        cx: &mut Context,
33        buf: &mut [u8],
34    ) -> Poll<io::Result<usize>> {
35        let mut this = self.project();
36
37        // Grab the item to copy from.
38        let item_to_copy = loop {
39            if let Some(i) = this.current_item {
40                if i.position() < i.get_ref().as_ref().len() as u64 {
41                    break i;
42                }
43            }
44            *this.current_item = Some(match ready!(this.inner.as_mut().try_poll_next(cx)) {
45                Some(Ok(i)) => std::io::Cursor::new(i),
46                Some(Err(e)) => return Poll::Ready(Err(e)),
47                None => return Poll::Ready(Ok(0)), // EOF
48            });
49        };
50
51        // Copy it!
52        Poll::Ready(Ok(item_to_copy.read(buf)?))
53    }
54}
55
56impl<S> AsyncWrite for RwStreamSink<S>
57where
58    S: TryStream + Sink<<S as TryStream>::Ok, Error = io::Error>,
59    <S as TryStream>::Ok: for<'r> From<&'r [u8]>,
60{
61    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
62        let mut this = self.project();
63        ready!(this.inner.as_mut().poll_ready(cx)?);
64        let n = buf.len();
65        if let Err(e) = this.inner.start_send(buf.into()) {
66            return Poll::Ready(Err(e));
67        }
68        Poll::Ready(Ok(n))
69    }
70
71    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
72        let this = self.project();
73        this.inner.poll_flush(cx)
74    }
75
76    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
77        let this = self.project();
78        this.inner.poll_close(cx)
79    }
80}