1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use std::{
future::Future,
io::{Error, ErrorKind, Result},
str::from_utf8,
};
use memchr::memchr;
use crate::io::AsyncBufRead;
/// Length-restoring guard over a byte buffer.
///
/// Pairs a mutable borrow of a `Vec<u8>` with the length that vector is reset to when the
/// guard is dropped (see the `Drop` implementation for `Guard`).
struct Guard<'v> {
    /// Length the vector is reset to on drop.
    len: usize,
    /// The vector being guarded.
    buf: &'v mut Vec<u8>,
}
impl Drop for Guard<'_> {
    /// Resets the guarded vector's length to the value stored in `len`.
    fn drop(&mut self) {
        let restored = self.len;
        // SAFETY: sound only while `len` never exceeds the vector's initialized length;
        // code that constructs or updates the guard is responsible for upholding that.
        unsafe { self.buf.set_len(restored) }
    }
}
/// Appends bytes from `r` to `buf` up to and including the first `delim`, or until
/// `fill_buf` yields an empty slice (end of stream).
///
/// Returns the total number of bytes consumed from `r`.
///
/// # Errors
/// `ErrorKind::Interrupted` errors from `fill_buf` are retried. Any other error is returned
/// as-is; bytes appended to `buf` by earlier iterations stay in place.
async fn read_until<A>(r: &mut A, delim: u8, buf: &mut Vec<u8>) -> Result<usize>
where
    A: AsyncBufRead + ?Sized,
{
    let mut total = 0;
    loop {
        let chunk = match r.fill_buf().await {
            Ok(bytes) => bytes,
            Err(err) if err.kind() == ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };

        // Take everything through the delimiter when present, otherwise the whole chunk.
        let hit = memchr(delim, chunk);
        let take = hit.map_or(chunk.len(), |idx| idx + 1);
        buf.extend_from_slice(&chunk[..take]);

        r.consume(take);
        total += take;

        // Stop once the delimiter was copied or the reader had nothing left to give.
        if hit.is_some() || take == 0 {
            return Ok(total);
        }
    }
}
/// Extension methods for buffered asynchronous readers, adding delimiter- and line-oriented
/// reads on top of `AsyncBufRead`.
///
/// A blanket implementation is provided for every type that implements `AsyncBufRead`.
pub trait AsyncBufReadExt {
/// Reads bytes from the underlying stream until the delimiter `byte` or EOF is found.
///
/// All bytes up to, and including, the delimiter (if found) are appended to `buf`; existing
/// contents of `buf` are kept.
///
/// If successful, this function will return the total number of bytes read.
///
/// # Errors
/// This function will ignore all instances of `ErrorKind::Interrupted` and will otherwise
/// return any errors returned by `fill_buf`.
fn read_until<'a>(
&'a mut self,
byte: u8,
buf: &'a mut Vec<u8>,
) -> impl Future<Output = Result<usize>>;
/// Reads bytes from the underlying stream until the newline delimiter (the `0xA` byte) or EOF
/// is found.
///
/// All bytes up to, and including, the delimiter (if found) are appended to `buf`; existing
/// contents of `buf` are kept.
///
/// If successful, this function will return the total number of bytes read.
///
/// If this function returns `Ok(0)`, the stream has reached EOF.
///
/// # Errors
/// This function has the same error semantics as `read_until` and will also return an error
/// of kind `ErrorKind::InvalidData` if the read bytes are not valid UTF-8. If an I/O error is
/// encountered then `buf` may contain some bytes already read in the event that all data read
/// so far was valid UTF-8.
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> impl Future<Output = Result<usize>>;
}
/// Blanket implementation of `AsyncBufReadExt` for every `AsyncBufRead` type.
impl<A> AsyncBufReadExt for A
where
    A: AsyncBufRead + ?Sized,
{
    /// Delegates to the free `read_until` helper: appends bytes to `buf` up to and including
    /// `byte` (or until EOF) and resolves to the number of bytes read.
    fn read_until<'a>(
        &'a mut self,
        byte: u8,
        buf: &'a mut Vec<u8>,
    ) -> impl Future<Output = Result<usize>> {
        read_until(self, byte, buf)
    }

    /// Reads one line (through `b'\n'` or EOF) and appends it to `buf`.
    ///
    /// Resolves to the number of bytes read; `Ok(0)` means the stream is at EOF.
    ///
    /// # Errors
    /// Propagates any error from `read_until`. If the read succeeded but the bytes are not
    /// valid UTF-8, returns `ErrorKind::InvalidData` and leaves `buf` untouched. If an I/O
    /// error occurred and the bytes read before it are valid UTF-8, they are still appended
    /// to `buf` before the error is returned.
    async fn read_line<'a>(&'a mut self, buf: &'a mut String) -> Result<usize> {
        // Read into a buffer owned by this future rather than into the `String`'s backing
        // vector. The caller's `String` then never holds unvalidated bytes, even if this
        // future is dropped or leaked (`mem::forget`) while suspended at the `.await`.
        // NOTE(review): after this change the file-private `Guard` helper is no longer used
        // by this method and can be removed if nothing else needs it.
        let mut line = Vec::new();
        let ret = read_until(self, b'\n', &mut line).await;

        match from_utf8(&line) {
            // Valid text is committed even when `ret` is an I/O error, matching the
            // documented "buf may contain some bytes already read" behaviour.
            Ok(text) => {
                buf.push_str(text);
                ret
            }
            // Invalid UTF-8: keep an existing I/O error, otherwise report `InvalidData`.
            Err(_) => ret.and_then(|_| {
                Err(Error::new(
                    ErrorKind::InvalidData,
                    "stream did not contain valid UTF-8",
                ))
            }),
        }
    }
}