1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use crate::io::{BufferedIo, ReadBuf, StreamState, WriteBuf};
use crate::{Error, Result};
use core::pin::Pin;
use core::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
impl<B: AsRef<[u8]> + AsMut<[u8]>> ReadBuf<B> {
pub fn poll_fill<R: AsyncRead>(
&mut self,
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
) -> Poll<Result<()>> {
while !self.is_full() {
let mut buffer = tokio::io::ReadBuf::new(&mut self.inner.as_mut()[self.tail..]);
match reader.as_mut().poll_read(cx, &mut buffer) {
Poll::Pending => {
self.stream_state = StreamState::WouldBlock;
return Poll::Pending;
}
Poll::Ready(Ok(())) => {
let size = buffer.filled().len();
if size == 0 {
self.stream_state = StreamState::Eos;
return Poll::Ready(Ok(()));
}
self.stream_state = StreamState::Normal;
self.tail += size;
}
Poll::Ready(Err(e)) => {
self.stream_state = StreamState::Error;
return Poll::Ready(Err(track!(Error::from(e))));
}
}
}
Poll::Ready(Ok(()))
}
}
impl<B: AsRef<[u8]> + AsMut<[u8]>> WriteBuf<B> {
pub fn poll_flush<W: AsyncWrite>(
&mut self,
mut writer: Pin<&mut W>,
cx: &mut Context<'_>,
) -> Poll<Result<()>> {
while !self.is_empty() {
match writer
.as_mut()
.poll_write(cx, &self.inner.as_ref()[self.head..self.tail])
{
Poll::Ready(Err(e)) => {
self.stream_state = StreamState::Error;
return Poll::Ready(Err(track!(Error::from(e))));
}
Poll::Ready(Ok(0)) => {
self.stream_state = StreamState::Eos;
return Poll::Ready(Ok(()));
}
Poll::Ready(Ok(size)) => {
self.stream_state = StreamState::Normal;
self.head += size;
if self.head == self.tail {
self.head = 0;
self.tail = 0;
}
}
Poll::Pending => {
self.stream_state = StreamState::WouldBlock;
return Poll::Pending;
}
}
}
Poll::Ready(Ok(()))
}
}
impl<T: AsyncRead + AsyncWrite> BufferedIo<T> {
pub fn execute_io_poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let mut this = self.project();
let rresult = this.rbuf.poll_fill(this.stream.as_mut(), cx);
let wresult = this.wbuf.poll_flush(this.stream.as_mut(), cx);
if let (&Poll::Pending, &Poll::Pending) = (&rresult, &wresult) {
return Poll::Pending;
}
if let Poll::Ready(rresult) = rresult {
track!(rresult)?;
}
if let Poll::Ready(wresult) = wresult {
track!(wresult)?;
}
Poll::Ready(Ok(()))
}
}