salvo_utils/rt/
tokio_io.rs

1#![allow(dead_code)]
2//! Tokio IO integration for hyper
3use std::{
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use pin_project_lite::pin_project;
9
10pin_project! {
11    /// A wrapping implementing hyper IO traits for a type that
12    /// implements Tokio's IO traits.
13    #[derive(Debug)]
14    pub struct TokioIo<T> {
15        #[pin]
16        inner: T,
17    }
18}
19
20impl<T> TokioIo<T> {
21    /// Wrap a type implementing Tokio's IO traits.
22    pub fn new(inner: T) -> Self {
23        Self { inner }
24    }
25
26    /// Borrow the inner type.
27    pub fn inner(&self) -> &T {
28        &self.inner
29    }
30
31    /// Consume this wrapper and get the inner type.
32    pub fn into_inner(self) -> T {
33        self.inner
34    }
35}
36
37impl<T> hyper::rt::Read for TokioIo<T>
38where
39    T: tokio::io::AsyncRead,
40{
41    fn poll_read(
42        self: Pin<&mut Self>,
43        cx: &mut Context<'_>,
44        mut buf: hyper::rt::ReadBufCursor<'_>,
45    ) -> Poll<Result<(), std::io::Error>> {
46        let n = unsafe {
47            let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
48            match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
49                Poll::Ready(Ok(())) => tbuf.filled().len(),
50                other => return other,
51            }
52        };
53
54        unsafe {
55            buf.advance(n);
56        }
57        Poll::Ready(Ok(()))
58    }
59}
60
61impl<T> hyper::rt::Write for TokioIo<T>
62where
63    T: tokio::io::AsyncWrite,
64{
65    fn poll_write(
66        self: Pin<&mut Self>,
67        cx: &mut Context<'_>,
68        buf: &[u8],
69    ) -> Poll<Result<usize, std::io::Error>> {
70        tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
71    }
72
73    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
74        tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
75    }
76
77    fn poll_shutdown(
78        self: Pin<&mut Self>,
79        cx: &mut Context<'_>,
80    ) -> Poll<Result<(), std::io::Error>> {
81        tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
82    }
83
84    fn is_write_vectored(&self) -> bool {
85        tokio::io::AsyncWrite::is_write_vectored(&self.inner)
86    }
87
88    fn poll_write_vectored(
89        self: Pin<&mut Self>,
90        cx: &mut Context<'_>,
91        bufs: &[std::io::IoSlice<'_>],
92    ) -> Poll<Result<usize, std::io::Error>> {
93        tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
94    }
95}
96
97impl<T> tokio::io::AsyncRead for TokioIo<T>
98where
99    T: hyper::rt::Read,
100{
101    fn poll_read(
102        self: Pin<&mut Self>,
103        cx: &mut Context<'_>,
104        tbuf: &mut tokio::io::ReadBuf<'_>,
105    ) -> Poll<Result<(), std::io::Error>> {
106        //let init = tbuf.initialized().len();
107        let filled = tbuf.filled().len();
108        let sub_filled = unsafe {
109            let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
110
111            match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
112                Poll::Ready(Ok(())) => buf.filled().len(),
113                other => return other,
114            }
115        };
116
117        let n_filled = filled + sub_filled;
118        // At least sub_filled bytes had to have been initialized.
119        let n_init = sub_filled;
120        unsafe {
121            tbuf.assume_init(n_init);
122            tbuf.set_filled(n_filled);
123        }
124
125        Poll::Ready(Ok(()))
126    }
127}
128
129impl<T> tokio::io::AsyncWrite for TokioIo<T>
130where
131    T: hyper::rt::Write,
132{
133    fn poll_write(
134        self: Pin<&mut Self>,
135        cx: &mut Context<'_>,
136        buf: &[u8],
137    ) -> Poll<Result<usize, std::io::Error>> {
138        hyper::rt::Write::poll_write(self.project().inner, cx, buf)
139    }
140
141    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
142        hyper::rt::Write::poll_flush(self.project().inner, cx)
143    }
144
145    fn poll_shutdown(
146        self: Pin<&mut Self>,
147        cx: &mut Context<'_>,
148    ) -> Poll<Result<(), std::io::Error>> {
149        hyper::rt::Write::poll_shutdown(self.project().inner, cx)
150    }
151
152    fn is_write_vectored(&self) -> bool {
153        hyper::rt::Write::is_write_vectored(&self.inner)
154    }
155
156    fn poll_write_vectored(
157        self: Pin<&mut Self>,
158        cx: &mut Context<'_>,
159        bufs: &[std::io::IoSlice<'_>],
160    ) -> Poll<Result<usize, std::io::Error>> {
161        hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
162    }
163}