1mod async_sys;
24mod underlying;
25
26pub use async_sys::*;
27pub use underlying::*;
28
29use pin_project_lite::pin_project;
30use std::{
31 fmt::{Debug, Formatter, Result as FmtResult},
32 future::Future,
33 pin::Pin,
34 string::FromUtf8Error,
35 task::{Context, Poll},
36};
37use tokio::io::{AsyncBufRead, Error as IoError, ErrorKind as IoErrorKind, Result as IoResult};
38
39macro_rules! ready {
40 ($e:expr $(,)?) => {
41 match $e {
42 std::task::Poll::Ready(t) => t,
43 std::task::Poll::Pending => return std::task::Poll::Pending,
44 }
45 };
46}
47
48pin_project! {
49 #[derive(Debug)]
58 #[must_use = "streams do nothing unless polled"]
59 pub struct SerialLines<ReaderTy> {
60 #[pin]
61 reader: ReaderTy,
62 buf: String,
63 bytes: Vec<u8>,
64 read: usize,
65 }
66}
67
68impl<ReaderTy> SerialLines<ReaderTy>
69where
70 ReaderTy: AsyncBufRead + Unpin,
71{
72 pub async fn next_line(&mut self) -> IoResult<Option<String>> {
79 poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
80 }
81}
82
83impl<ReaderTy> SerialLines<ReaderTy>
84where
85 ReaderTy: AsyncBufRead,
86{
87 pub fn new(reader: ReaderTy) -> Self {
89 SerialLines {
90 reader,
91 buf: String::new(),
92 bytes: Vec::new(),
93 read: 0,
94 }
95 }
96
97 pub fn poll_next_line(
112 self: Pin<&mut Self>,
113 cx: &mut Context<'_>,
114 ) -> Poll<IoResult<Option<String>>> {
115 let me = self.project();
116
117 let n = ready!(read_line_internal(me.reader, cx, me.buf, me.bytes, me.read))?;
118 debug_assert_eq!(*me.read, 0);
119
120 if n == 0 && me.buf.is_empty() {
121 return Poll::Ready(Ok(None));
122 }
123
124 if me.buf.ends_with('\r') {
125 me.buf.pop();
126 }
127
128 Poll::Ready(Ok(Some(std::mem::take(me.buf))))
129 }
130}
131
132fn read_line_internal<ReaderTy: AsyncBufRead + ?Sized>(
133 reader: Pin<&mut ReaderTy>,
134 cx: &mut Context<'_>,
135 output: &mut String,
136 buf: &mut Vec<u8>,
137 read: &mut usize,
138) -> Poll<IoResult<usize>> {
139 let io_res = ready!(read_until_internal(reader, cx, b'\r', buf, read));
140 let utf8_res = String::from_utf8(std::mem::take(buf));
141 debug_assert!(buf.is_empty());
143 debug_assert!(output.is_empty());
144 finish_string_read(io_res, utf8_res, *read, output, false)
145}
146
147fn read_until_internal<ReaderTy: AsyncBufRead + ?Sized>(
148 mut reader: Pin<&mut ReaderTy>,
149 cx: &mut Context<'_>,
150 delimiter: u8,
151 buf: &mut Vec<u8>,
152 read: &mut usize,
153) -> Poll<IoResult<usize>> {
154 loop {
155 let (done, used) = {
156 let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
157 if let Some(i) = memchr(delimiter, available) {
158 buf.extend_from_slice(&available[..=i]);
159 (true, i + 1)
160 } else {
161 buf.extend_from_slice(available);
162 (false, available.len())
163 }
164 };
165 reader.as_mut().consume(used);
166 *read += used;
167 if done || used == 0 {
168 return Poll::Ready(Ok(std::mem::replace(read, 0)));
169 }
170 }
171}
172
173#[cfg(not(unix))]
174fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
175 haystack.iter().position(|val| needle == *val)
176}
177
178#[cfg(unix)]
179#[allow(clippy::cast_lossless)]
180fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
181 let start = haystack.as_ptr();
182
183 let ptr = unsafe { libc::memchr(start.cast(), needle as _, haystack.len()) };
185
186 if ptr.is_null() {
187 None
188 } else {
189 Some(ptr as usize - start as usize)
190 }
191}
192
193struct PollFn<F> {
204 f: F,
205}
206
207fn poll_fn<T, F>(f: F) -> PollFn<F>
209where
210 F: FnMut(&mut Context<'_>) -> Poll<T>,
211{
212 PollFn { f }
213}
214
215impl<F> Debug for PollFn<F> {
216 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
217 f.debug_struct("PollFn").finish()
218 }
219}
220
221impl<T, F> Future for PollFn<F>
222where
223 F: FnMut(&mut Context<'_>) -> Poll<T>,
224{
225 type Output = T;
226
227 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
228 let me = unsafe { Pin::into_inner_unchecked(self) };
238 (me.f)(cx)
239 }
240}
241
242fn put_back_original_data(output: &mut String, mut vector: Vec<u8>, num_bytes_read: usize) {
243 let original_len = vector.len() - num_bytes_read;
244 vector.truncate(original_len);
245 *output = String::from_utf8(vector).expect("The original data must be valid utf-8.");
246}
247
248fn finish_string_read(
253 io_res: IoResult<usize>,
254 utf8_res: Result<String, FromUtf8Error>,
255 read: usize,
256 output: &mut String,
257 truncate_on_io_error: bool,
258) -> Poll<IoResult<usize>> {
259 match (io_res, utf8_res) {
260 (Ok(num_bytes), Ok(string)) => {
261 debug_assert_eq!(read, 0);
262 *output = string;
263 Poll::Ready(Ok(num_bytes))
264 }
265 (Err(io_err), Ok(string)) => {
266 *output = string;
267 if truncate_on_io_error {
268 let original_len = output.len() - read;
269 output.truncate(original_len);
270 }
271 Poll::Ready(Err(io_err))
272 }
273 (Ok(num_bytes), Err(utf8_err)) => {
274 debug_assert_eq!(read, 0);
275 put_back_original_data(output, utf8_err.into_bytes(), num_bytes);
276 Poll::Ready(Err(IoError::new(
277 IoErrorKind::InvalidData,
278 "stream did not contain valid UTF-8",
279 )))
280 }
281 (Err(io_err), Err(utf8_err)) => {
282 put_back_original_data(output, utf8_err.into_bytes(), read);
283 Poll::Ready(Err(io_err))
284 }
285 }
286}
287
288#[cfg(test)]
289mod unit_tests {
290 use super::*;
291
292 #[test]
293 fn memchr_test() {
294 let haystack = b"123abc456\0\xffabc\n";
295 assert_eq!(memchr(b'1', haystack), Some(0));
296 assert_eq!(memchr(b'2', haystack), Some(1));
297 assert_eq!(memchr(b'3', haystack), Some(2));
298 assert_eq!(memchr(b'4', haystack), Some(6));
299 assert_eq!(memchr(b'5', haystack), Some(7));
300 assert_eq!(memchr(b'6', haystack), Some(8));
301 assert_eq!(memchr(b'7', haystack), None);
302 assert_eq!(memchr(b'a', haystack), Some(3));
303 assert_eq!(memchr(b'b', haystack), Some(4));
304 assert_eq!(memchr(b'c', haystack), Some(5));
305 assert_eq!(memchr(b'd', haystack), None);
306 assert_eq!(memchr(b'A', haystack), None);
307 assert_eq!(memchr(0, haystack), Some(9));
308 assert_eq!(memchr(0xff, haystack), Some(10));
309 assert_eq!(memchr(0xfe, haystack), None);
310 assert_eq!(memchr(1, haystack), None);
311 assert_eq!(memchr(b'\n', haystack), Some(14));
312 assert_eq!(memchr(b'\r', haystack), None);
313 }
314
315 #[test]
316 fn memchr_all() {
317 let mut arr = Vec::new();
318 for b in 0..=255 {
319 arr.push(b);
320 }
321 for b in 0..=255 {
322 assert_eq!(memchr(b, &arr), Some(b as usize));
323 }
324 arr.reverse();
325 for b in 0..=255 {
326 assert_eq!(memchr(b, &arr), Some(255 - b as usize));
327 }
328 }
329
330 #[test]
331 fn memchr_empty() {
332 for b in 0..=255 {
333 assert_eq!(memchr(b, b""), None);
334 }
335 }
336}