1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
//! I/O (i.e., `Read` and `Write` traits) related module.
use crate::io::{BufferedIo, ReadBuf, StreamState, WriteBuf};
use crate::{Error, Result};
use core::pin::Pin;
use core::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
impl<B: AsRef<[u8]> + AsMut<[u8]>> ReadBuf<B> {
/// Fills the read buffer by reading bytes from the given reader.
///
/// The fill process continues until one of the following condition is satisfied:
/// - The read buffer became full
/// - A read operation returned a `WouldBlock` error
/// - The input stream has reached EOS
pub fn poll_fill<R: AsyncRead>(
&mut self,
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
) -> Poll<Result<()>> {
while !self.is_full() {
let mut buffer = tokio::io::ReadBuf::new(&mut self.inner.as_mut()[self.tail..]);
match reader.as_mut().poll_read(cx, &mut buffer) {
Poll::Pending => {
self.stream_state = StreamState::WouldBlock;
return Poll::Pending;
}
Poll::Ready(Ok(())) => {
let size = buffer.filled().len();
if size == 0 {
self.stream_state = StreamState::Eos;
return Poll::Ready(Ok(()));
}
self.stream_state = StreamState::Normal;
self.tail += size;
}
Poll::Ready(Err(e)) => {
self.stream_state = StreamState::Error;
return Poll::Ready(Err(track!(Error::from(e))));
}
}
}
Poll::Ready(Ok(()))
}
}
impl<B: AsRef<[u8]> + AsMut<[u8]>> WriteBuf<B> {
/// Writes the encoded bytes contained in this buffer to the given writer.
///
/// The written bytes will be removed from the buffer.
///
/// The flush process continues until one of the following condition is satisfied:
/// - The write buffer became empty
/// - A write operation returned a `WouldBlock` error
/// - The output stream has reached EOS
pub fn poll_flush<W: AsyncWrite>(
&mut self,
mut writer: Pin<&mut W>,
cx: &mut Context<'_>,
) -> Poll<Result<()>> {
while !self.is_empty() {
match writer
.as_mut()
.poll_write(cx, &self.inner.as_ref()[self.head..self.tail])
{
Poll::Ready(Err(e)) => {
self.stream_state = StreamState::Error;
return Poll::Ready(Err(track!(Error::from(e))));
}
Poll::Ready(Ok(0)) => {
self.stream_state = StreamState::Eos;
// stream is closed. No need to wake up this future :)
return Poll::Ready(Ok(()));
}
Poll::Ready(Ok(size)) => {
self.stream_state = StreamState::Normal;
self.head += size;
if self.head == self.tail {
self.head = 0;
self.tail = 0;
}
}
Poll::Pending => {
self.stream_state = StreamState::WouldBlock;
return Poll::Pending;
}
}
}
// Now the buffer is empty. Because the returned value is not Poll::Pending,
// it is *the caller*'s responsibility to ensure this future is woken up.
Poll::Ready(Ok(()))
}
}
impl<T: AsyncRead + AsyncWrite> BufferedIo<T> {
/// Executes an I/O operation on the inner stream.
///
/// "I/O operation" means "filling the read buffer" and "flushing the write buffer".
/// This function returns Poll::Pending when both rbuf and wbuf are not ready for I/O operations.
pub fn execute_io_poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let mut this = self.project();
let rresult = this.rbuf.poll_fill(this.stream.as_mut(), cx);
let wresult = this.wbuf.poll_flush(this.stream.as_mut(), cx);
if let (&Poll::Pending, &Poll::Pending) = (&rresult, &wresult) {
// This future will be polled again when either rbuf or wbuf is ready.
return Poll::Pending;
}
if let Poll::Ready(rresult) = rresult {
track!(rresult)?;
}
if let Poll::Ready(wresult) = wresult {
track!(wresult)?;
}
// If at least one of rbuf or wbuf returns Poll::Ready,
// there's no guarantee that the waker is signaled at some point.
// Poll::Ready here means it's the caller's responsibility to ensure the waker is signaled later.
Poll::Ready(Ok(()))
}
}