1mod async_sys;
24mod underlying;
25
26pub use async_sys::*;
27pub use underlying::*;
28
29use pin_project_lite::pin_project;
30use std::{
31 fmt::{Debug, Formatter, Result as FmtResult},
32 future::Future,
33 pin::Pin,
34 string::FromUtf8Error,
35 task::{Context, Poll},
36};
37use tokio::io::{AsyncBufRead, Error as IoError, ErrorKind as IoErrorKind, Result as IoResult};
38
/// Unwraps a `Poll::Ready` value, or returns `Poll::Pending` from the
/// enclosing function when the polled expression is not ready yet.
///
/// A single trailing comma after the expression is accepted.
macro_rules! ready {
    ($poll:expr $(,)?) => {
        match $poll {
            ::std::task::Poll::Pending => return ::std::task::Poll::Pending,
            ::std::task::Poll::Ready(value) => value,
        }
    };
}
47
pin_project! {
    #[derive(Debug)]
    /// Line reader over an [`AsyncBufRead`] source that treats a carriage
    /// return (`\r`) as the line terminator.
    ///
    /// Lines are produced by `poll_next_line` (or the `next_line` async
    /// wrapper when the reader is `Unpin`); the terminating `\r` is stripped
    /// from each returned line.
    #[must_use = "streams do nothing unless polled"]
    pub struct SerialLines<ReaderTy> {
        // The wrapped reader; structurally pinned so it can be polled in place.
        #[pin]
        reader: ReaderTy,
        // Decoded text of the line that was just completed; handed out (and
        // left empty) by `poll_next_line`.
        buf: String,
        // Raw bytes of the line currently being read, kept across `Pending`
        // polls until the terminator or end of input is reached.
        bytes: Vec<u8>,
        // Number of bytes consumed from the reader for the in-progress line;
        // reset to zero when a read completes successfully.
        read: usize,
    }
}
67
68impl<ReaderTy> SerialLines<ReaderTy>
69where
70 ReaderTy: AsyncBufRead + Unpin,
71{
72 pub async fn next_line(&mut self) -> IoResult<Option<String>> {
79 poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
80 }
81}
82
83impl<ReaderTy> SerialLines<ReaderTy>
84where
85 ReaderTy: AsyncBufRead,
86{
87 pub fn new(reader: ReaderTy) -> Self {
89 SerialLines {
90 reader,
91 buf: String::new(),
92 bytes: Vec::new(),
93 read: 0,
94 }
95 }
96
97 #[must_use]
99 pub const fn get_ref(&self) -> &ReaderTy {
100 &self.reader
101 }
102
103 #[must_use]
105 pub const fn get_mut(&mut self) -> &mut ReaderTy {
106 &mut self.reader
107 }
108
109 pub fn poll_next_line(
124 self: Pin<&mut Self>,
125 cx: &mut Context<'_>,
126 ) -> Poll<IoResult<Option<String>>> {
127 let me = self.project();
128
129 let n = ready!(read_line_internal(me.reader, cx, me.buf, me.bytes, me.read))?;
130 debug_assert_eq!(*me.read, 0);
131
132 if n == 0 && me.buf.is_empty() {
133 return Poll::Ready(Ok(None));
134 }
135
136 if me.buf.ends_with('\r') {
137 me.buf.pop();
138 }
139
140 Poll::Ready(Ok(Some(std::mem::take(me.buf))))
141 }
142}
143
144fn read_line_internal<ReaderTy: AsyncBufRead + ?Sized>(
145 reader: Pin<&mut ReaderTy>,
146 cx: &mut Context<'_>,
147 output: &mut String,
148 buf: &mut Vec<u8>,
149 read: &mut usize,
150) -> Poll<IoResult<usize>> {
151 let io_res = ready!(read_until_internal(reader, cx, b'\r', buf, read));
152 let utf8_res = String::from_utf8(std::mem::take(buf));
153 debug_assert!(buf.is_empty());
155 debug_assert!(output.is_empty());
156 finish_string_read(io_res, utf8_res, *read, output, false)
157}
158
159fn read_until_internal<ReaderTy: AsyncBufRead + ?Sized>(
160 mut reader: Pin<&mut ReaderTy>,
161 cx: &mut Context<'_>,
162 delimiter: u8,
163 buf: &mut Vec<u8>,
164 read: &mut usize,
165) -> Poll<IoResult<usize>> {
166 loop {
167 let (done, used) = {
168 let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
169 if let Some(i) = memchr(delimiter, available) {
170 buf.extend_from_slice(&available[..=i]);
171 (true, i + 1)
172 } else {
173 buf.extend_from_slice(available);
174 (false, available.len())
175 }
176 };
177 reader.as_mut().consume(used);
178 *read += used;
179 if done || used == 0 {
180 return Poll::Ready(Ok(std::mem::replace(read, 0)));
181 }
182 }
183}
184
/// Returns the index of the first byte in `haystack` equal to `needle`, or
/// `None` if there is no such byte (portable fallback for non-unix targets).
#[cfg(not(unix))]
fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
    haystack
        .iter()
        .enumerate()
        .find_map(|(index, &byte)| (byte == needle).then_some(index))
}
189
190#[cfg(unix)]
191#[allow(clippy::cast_lossless)]
192fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
193 let start = haystack.as_ptr();
194
195 let ptr = unsafe { libc::memchr(start.cast(), needle as _, haystack.len()) };
197
198 if ptr.is_null() {
199 None
200 } else {
201 Some(ptr as usize - start as usize)
202 }
203}
204
/// Future returned by [`poll_fn`]: every poll is forwarded to the wrapped
/// closure.
struct PollFn<F> {
    f: F,
}

/// Creates a future that resolves with whatever the closure `f` returns once
/// it yields `Poll::Ready`; the closure is invoked on every poll.
fn poll_fn<T, F>(f: F) -> PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<T>,
{
    PollFn { f }
}

impl<F> Debug for PollFn<F> {
    /// Formats as the bare type name; the closure itself is not printable.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.debug_struct("PollFn").finish()
    }
}

// The closure is only ever called through `&mut F` and is never handed out
// pinned, so moving a `PollFn` is harmless regardless of what `F` captures.
// Declaring this lets `poll` reach the closure without any `unsafe`.
impl<F> Unpin for PollFn<F> {}

impl<T, F> Future for PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<T>,
{
    type Output = T;

    /// Calls the wrapped closure with `cx` and returns its result unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let me = self.get_mut();
        (me.f)(cx)
    }
}
253
/// Restores `output` to the text that preceded a failed read.
///
/// `vector` holds the bytes recovered from a failed UTF-8 conversion. Its
/// last `num_bytes_read` bytes are the newly read (rejected) data; they are
/// dropped and the remaining prefix is stored in `output`.
///
/// If `num_bytes_read` is larger than `vector.len()` — which can happen when
/// the byte counter carried over from an earlier failed read while the byte
/// buffer did not — the whole vector is discarded and `output` becomes empty,
/// instead of underflowing the length computation.
///
/// # Panics
///
/// Panics if the retained prefix is not valid UTF-8.
fn put_back_original_data(output: &mut String, mut vector: Vec<u8>, num_bytes_read: usize) {
    // Saturate so an oversized count clears the buffer rather than wrapping.
    let original_len = vector.len().saturating_sub(num_bytes_read);
    vector.truncate(original_len);
    *output = String::from_utf8(vector).expect("The original data must be valid utf-8.");
}
259
260fn finish_string_read(
265 io_res: IoResult<usize>,
266 utf8_res: Result<String, FromUtf8Error>,
267 read: usize,
268 output: &mut String,
269 truncate_on_io_error: bool,
270) -> Poll<IoResult<usize>> {
271 match (io_res, utf8_res) {
272 (Ok(num_bytes), Ok(string)) => {
273 debug_assert_eq!(read, 0);
274 *output = string;
275 Poll::Ready(Ok(num_bytes))
276 }
277 (Err(io_err), Ok(string)) => {
278 *output = string;
279 if truncate_on_io_error {
280 let original_len = output.len() - read;
281 output.truncate(original_len);
282 }
283 Poll::Ready(Err(io_err))
284 }
285 (Ok(num_bytes), Err(utf8_err)) => {
286 debug_assert_eq!(read, 0);
287 put_back_original_data(output, utf8_err.into_bytes(), num_bytes);
288 Poll::Ready(Err(IoError::new(
289 IoErrorKind::InvalidData,
290 "stream did not contain valid UTF-8",
291 )))
292 }
293 (Err(io_err), Err(utf8_err)) => {
294 put_back_original_data(output, utf8_err.into_bytes(), read);
295 Poll::Ready(Err(io_err))
296 }
297 }
298}
299
#[cfg(test)]
mod unit_tests {
    use super::*;

    /// Spot-checks present and absent needles in a mixed ASCII/binary buffer.
    #[test]
    fn memchr_test() {
        let haystack = b"123abc456\0\xffabc\n";
        let expectations: [(u8, Option<usize>); 18] = [
            (b'1', Some(0)),
            (b'2', Some(1)),
            (b'3', Some(2)),
            (b'4', Some(6)),
            (b'5', Some(7)),
            (b'6', Some(8)),
            (b'7', None),
            (b'a', Some(3)),
            (b'b', Some(4)),
            (b'c', Some(5)),
            (b'd', None),
            (b'A', None),
            (0, Some(9)),
            (0xff, Some(10)),
            (0xfe, None),
            (1, None),
            (b'\n', Some(14)),
            (b'\r', None),
        ];

        for &(needle, expected) in expectations.iter() {
            assert_eq!(memchr(needle, haystack), expected);
        }
    }

    /// Every byte value is found at its position, in ascending and in
    /// descending order.
    #[test]
    fn memchr_all() {
        let ascending: Vec<u8> = (0..=255).collect();
        for (index, &byte) in ascending.iter().enumerate() {
            assert_eq!(memchr(byte, &ascending), Some(index));
        }

        let descending: Vec<u8> = ascending.iter().rev().copied().collect();
        for (index, &byte) in descending.iter().enumerate() {
            assert_eq!(memchr(byte, &descending), Some(index));
        }
    }

    /// No byte value is ever found in an empty haystack.
    #[test]
    fn memchr_empty() {
        let empty: &[u8] = b"";
        for needle in 0..=255u8 {
            assert_eq!(memchr(needle, empty), None);
        }
    }
}