// salvo_utils/rt/tokio_io.rs
#![allow(dead_code)]
2use std::{
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use pin_project_lite::pin_project;
9
pin_project! {
    /// Adapter around an I/O value `T` that bridges hyper's and Tokio's I/O traits.
    ///
    /// When `T` implements `tokio::io::AsyncRead` / `tokio::io::AsyncWrite`, the
    /// wrapper implements `hyper::rt::Read` / `hyper::rt::Write`; when `T`
    /// implements hyper's traits, the wrapper implements Tokio's.
    #[derive(Debug)]
    pub struct TokioIo<T> {
        // Structurally pinned so the trait impls can obtain the pinned inner
        // value through `project()`.
        #[pin]
        inner: T,
    }
}
19
20impl<T> TokioIo<T> {
21 pub fn new(inner: T) -> Self {
23 Self { inner }
24 }
25
26 pub fn inner(&self) -> &T {
28 &self.inner
29 }
30
31 pub fn into_inner(self) -> T {
33 self.inner
34 }
35}
36
37impl<T> hyper::rt::Read for TokioIo<T>
38where
39 T: tokio::io::AsyncRead,
40{
41 fn poll_read(
42 self: Pin<&mut Self>,
43 cx: &mut Context<'_>,
44 mut buf: hyper::rt::ReadBufCursor<'_>,
45 ) -> Poll<Result<(), std::io::Error>> {
46 let n = unsafe {
47 let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
48 match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
49 Poll::Ready(Ok(())) => tbuf.filled().len(),
50 other => return other,
51 }
52 };
53
54 unsafe {
55 buf.advance(n);
56 }
57 Poll::Ready(Ok(()))
58 }
59}
60
61impl<T> hyper::rt::Write for TokioIo<T>
62where
63 T: tokio::io::AsyncWrite,
64{
65 fn poll_write(
66 self: Pin<&mut Self>,
67 cx: &mut Context<'_>,
68 buf: &[u8],
69 ) -> Poll<Result<usize, std::io::Error>> {
70 tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
71 }
72
73 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
74 tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
75 }
76
77 fn poll_shutdown(
78 self: Pin<&mut Self>,
79 cx: &mut Context<'_>,
80 ) -> Poll<Result<(), std::io::Error>> {
81 tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
82 }
83
84 fn is_write_vectored(&self) -> bool {
85 tokio::io::AsyncWrite::is_write_vectored(&self.inner)
86 }
87
88 fn poll_write_vectored(
89 self: Pin<&mut Self>,
90 cx: &mut Context<'_>,
91 bufs: &[std::io::IoSlice<'_>],
92 ) -> Poll<Result<usize, std::io::Error>> {
93 tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
94 }
95}
96
97impl<T> tokio::io::AsyncRead for TokioIo<T>
98where
99 T: hyper::rt::Read,
100{
101 fn poll_read(
102 self: Pin<&mut Self>,
103 cx: &mut Context<'_>,
104 tbuf: &mut tokio::io::ReadBuf<'_>,
105 ) -> Poll<Result<(), std::io::Error>> {
106 let filled = tbuf.filled().len();
108 let sub_filled = unsafe {
109 let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
110
111 match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
112 Poll::Ready(Ok(())) => buf.filled().len(),
113 other => return other,
114 }
115 };
116
117 let n_filled = filled + sub_filled;
118 let n_init = sub_filled;
120 unsafe {
121 tbuf.assume_init(n_init);
122 tbuf.set_filled(n_filled);
123 }
124
125 Poll::Ready(Ok(()))
126 }
127}
128
129impl<T> tokio::io::AsyncWrite for TokioIo<T>
130where
131 T: hyper::rt::Write,
132{
133 fn poll_write(
134 self: Pin<&mut Self>,
135 cx: &mut Context<'_>,
136 buf: &[u8],
137 ) -> Poll<Result<usize, std::io::Error>> {
138 hyper::rt::Write::poll_write(self.project().inner, cx, buf)
139 }
140
141 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
142 hyper::rt::Write::poll_flush(self.project().inner, cx)
143 }
144
145 fn poll_shutdown(
146 self: Pin<&mut Self>,
147 cx: &mut Context<'_>,
148 ) -> Poll<Result<(), std::io::Error>> {
149 hyper::rt::Write::poll_shutdown(self.project().inner, cx)
150 }
151
152 fn is_write_vectored(&self) -> bool {
153 hyper::rt::Write::is_write_vectored(&self.inner)
154 }
155
156 fn poll_write_vectored(
157 self: Pin<&mut Self>,
158 cx: &mut Context<'_>,
159 bufs: &[std::io::IoSlice<'_>],
160 ) -> Poll<Result<usize, std::io::Error>> {
161 hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
162 }
163}