airio_core/utils/
rw_stream_sink.rs1use futures::{AsyncRead, AsyncWrite, Sink, TryStream, ready};
2use std::{
3 io::{self, Read},
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8#[pin_project::pin_project]
9pub struct RwStreamSink<S: TryStream> {
10 #[pin]
11 inner: S,
12 current_item: Option<std::io::Cursor<<S as TryStream>::Ok>>,
13}
14
15impl<S: TryStream> RwStreamSink<S> {
16 pub fn new(inner: S) -> Self {
18 RwStreamSink {
19 inner,
20 current_item: None,
21 }
22 }
23}
24
25impl<S> AsyncRead for RwStreamSink<S>
26where
27 S: TryStream<Error = io::Error>,
28 <S as TryStream>::Ok: AsRef<[u8]>,
29{
30 fn poll_read(
31 self: Pin<&mut Self>,
32 cx: &mut Context,
33 buf: &mut [u8],
34 ) -> Poll<io::Result<usize>> {
35 let mut this = self.project();
36
37 let item_to_copy = loop {
39 if let Some(i) = this.current_item {
40 if i.position() < i.get_ref().as_ref().len() as u64 {
41 break i;
42 }
43 }
44 *this.current_item = Some(match ready!(this.inner.as_mut().try_poll_next(cx)) {
45 Some(Ok(i)) => std::io::Cursor::new(i),
46 Some(Err(e)) => return Poll::Ready(Err(e)),
47 None => return Poll::Ready(Ok(0)), });
49 };
50
51 Poll::Ready(Ok(item_to_copy.read(buf)?))
53 }
54}
55
56impl<S> AsyncWrite for RwStreamSink<S>
57where
58 S: TryStream + Sink<<S as TryStream>::Ok, Error = io::Error>,
59 <S as TryStream>::Ok: for<'r> From<&'r [u8]>,
60{
61 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
62 let mut this = self.project();
63 ready!(this.inner.as_mut().poll_ready(cx)?);
64 let n = buf.len();
65 if let Err(e) = this.inner.start_send(buf.into()) {
66 return Poll::Ready(Err(e));
67 }
68 Poll::Ready(Ok(n))
69 }
70
71 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
72 let this = self.project();
73 this.inner.poll_flush(cx)
74 }
75
76 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
77 let this = self.project();
78 this.inner.poll_close(cx)
79 }
80}