1use super::AsyncBufRead;
4use crate::stream::Stream;
5use std::io;
6use std::mem;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10#[derive(Debug)]
12pub struct Lines<R> {
13 reader: R,
14 buf: Vec<u8>,
15 completed: bool,
16}
17
18impl<R> Lines<R> {
19 pub fn new(reader: R) -> Self {
21 Self {
22 reader,
23 buf: Vec::new(),
24 completed: false,
25 }
26 }
27}
28
29impl<R: AsyncBufRead + Unpin> Stream for Lines<R> {
30 type Item = io::Result<String>;
31
32 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
33 let this = self.get_mut();
34 if this.completed {
35 return Poll::Ready(None);
36 }
37 let mut steps = 0;
38
39 loop {
40 if steps > 32 {
41 cx.waker().wake_by_ref();
42 return Poll::Pending;
43 }
44 steps += 1;
45
46 if this.buf.last() == Some(&b'\n') {
49 this.buf.pop();
51
52 if this.buf.last() == Some(&b'\r') {
54 this.buf.pop();
55 }
56
57 let s = String::from_utf8(mem::take(&mut this.buf))
58 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e));
59
60 if s.is_err() {
61 this.completed = true;
62 }
63
64 return Poll::Ready(Some(s));
65 }
66
67 let available = match Pin::new(&mut this.reader).poll_fill_buf(cx) {
69 Poll::Pending => return Poll::Pending,
70 Poll::Ready(Err(e)) => {
71 this.completed = true;
72 return Poll::Ready(Some(Err(e)));
73 }
74 Poll::Ready(Ok(buf)) => buf,
75 };
76
77 if available.is_empty() {
79 if this.buf.is_empty() {
80 this.completed = true;
81 return Poll::Ready(None);
82 }
83 let s = String::from_utf8(mem::take(&mut this.buf))
84 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e));
85 this.completed = true;
86 return Poll::Ready(Some(s));
87 }
88
89 if let Some(pos) = available.iter().position(|&b| b == b'\n') {
91 this.buf.extend_from_slice(&available[..=pos]);
92 Pin::new(&mut this.reader).consume(pos + 1);
93 } else {
95 this.buf.extend_from_slice(available);
96 let len = available.len();
97 Pin::new(&mut this.reader).consume(len);
98 }
99 }
100 }
101}
102
103#[cfg(test)]
104mod tests {
105 use super::*;
106 use crate::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf};
107 use std::sync::Arc;
108 use std::sync::atomic::{AtomicUsize, Ordering};
109 use std::task::Waker;
110
111 struct CountWaker {
112 wakes: AtomicUsize,
113 }
114
115 use std::task::Wake;
116 impl Wake for CountWaker {
117 fn wake(self: Arc<Self>) {
118 self.wakes.fetch_add(1, Ordering::SeqCst);
119 }
120 }
121
122 fn noop_waker() -> Waker {
123 std::task::Waker::noop().clone()
124 }
125
126 fn poll_next<S: Stream + Unpin>(stream: &mut S) -> Poll<Option<S::Item>> {
127 let waker = noop_waker();
128 let mut cx = Context::from_waker(&waker);
129 Pin::new(stream).poll_next(&mut cx)
130 }
131
132 fn init_test(name: &str) {
133 crate::test_utils::init_test_logging();
134 crate::test_phase!(name);
135 }
136
137 struct SplitReader {
138 chunks: Vec<Vec<u8>>,
139 }
140
141 impl AsyncRead for SplitReader {
142 fn poll_read(
143 self: Pin<&mut Self>,
144 _cx: &mut Context<'_>,
145 _buf: &mut ReadBuf<'_>,
146 ) -> Poll<io::Result<()>> {
147 unreachable!("lines should use poll_fill_buf for this test")
148 }
149 }
150
151 impl AsyncBufRead for SplitReader {
152 fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
153 let this = self.get_mut();
154 if this.chunks.is_empty() {
155 Poll::Ready(Ok(&[]))
156 } else {
157 Poll::Ready(Ok(&this.chunks[0]))
158 }
159 }
160
161 fn consume(self: Pin<&mut Self>, amt: usize) {
162 let this = self.get_mut();
163 if this.chunks.is_empty() {
164 return;
165 }
166 if amt >= this.chunks[0].len() {
167 this.chunks.remove(0);
168 } else {
169 this.chunks[0] = this.chunks[0][amt..].to_vec();
170 }
171 }
172 }
173
174 struct PendingBetweenChunksReader {
175 chunks: Vec<Vec<u8>>,
176 pending_once: bool,
177 }
178
179 impl AsyncRead for PendingBetweenChunksReader {
180 fn poll_read(
181 self: Pin<&mut Self>,
182 _cx: &mut Context<'_>,
183 _buf: &mut ReadBuf<'_>,
184 ) -> Poll<io::Result<()>> {
185 unreachable!("lines should use poll_fill_buf for this test")
186 }
187 }
188
189 impl AsyncBufRead for PendingBetweenChunksReader {
190 fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
191 let this = self.get_mut();
192 if this.pending_once {
193 this.pending_once = false;
194 return Poll::Pending;
195 }
196
197 if this.chunks.is_empty() {
198 Poll::Ready(Ok(&[]))
199 } else {
200 Poll::Ready(Ok(&this.chunks[0]))
201 }
202 }
203
204 fn consume(self: Pin<&mut Self>, amt: usize) {
205 let this = self.get_mut();
206 if this.chunks.is_empty() {
207 return;
208 }
209
210 if amt >= this.chunks[0].len() {
211 this.chunks.remove(0);
212 this.pending_once = !this.chunks.is_empty();
213 } else {
214 this.chunks[0] = this.chunks[0][amt..].to_vec();
215 }
216 }
217 }
218
219 #[test]
220 fn lines_basic() {
221 init_test("lines_basic");
222 let data: &[u8] = b"line 1\nline 2\nline 3";
223 let reader = BufReader::new(data);
224 let mut lines = Lines::new(reader);
225
226 let first = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 1");
227 crate::assert_with_log!(first, "line 1", true, first);
228 let second = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 2");
229 crate::assert_with_log!(second, "line 2", true, second);
230 let third = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 3");
231 crate::assert_with_log!(third, "line 3", true, third);
232 let done = matches!(poll_next(&mut lines), Poll::Ready(None));
234 crate::assert_with_log!(done, "done", true, done);
235 crate::test_complete!("lines_basic");
236 }
237
238 #[test]
239 fn lines_crlf() {
240 init_test("lines_crlf");
241 let data: &[u8] = b"line 1\r\nline 2\r\n";
242 let reader = BufReader::new(data);
243 let mut lines = Lines::new(reader);
244
245 let first = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 1");
246 crate::assert_with_log!(first, "line 1", true, first);
247 let second = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 2");
248 crate::assert_with_log!(second, "line 2", true, second);
249 let done = matches!(poll_next(&mut lines), Poll::Ready(None));
250 crate::assert_with_log!(done, "done", true, done);
251 crate::test_complete!("lines_crlf");
252 }
253
254 #[test]
255 fn lines_empty() {
256 init_test("lines_empty");
257 let data: &[u8] = b"";
258 let reader = BufReader::new(data);
259 let mut lines = Lines::new(reader);
260 let done = matches!(poll_next(&mut lines), Poll::Ready(None));
261 crate::assert_with_log!(done, "done", true, done);
262 crate::test_complete!("lines_empty");
263 }
264
265 #[test]
266 fn lines_incomplete_last() {
267 init_test("lines_incomplete_last");
268 let data: &[u8] = b"foo\nbar";
269 let reader = BufReader::new(data);
270 let mut lines = Lines::new(reader);
271
272 let first = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "foo");
273 crate::assert_with_log!(first, "foo", true, first);
274 let second = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "bar");
275 crate::assert_with_log!(second, "bar", true, second);
276 let done = matches!(poll_next(&mut lines), Poll::Ready(None));
277 crate::assert_with_log!(done, "done", true, done);
278 crate::test_complete!("lines_incomplete_last");
279 }
280
281 #[test]
282 fn lines_repoll_after_empty_completion_returns_none() {
283 let data: &[u8] = b"";
284 let reader = BufReader::new(data);
285 let mut lines = Lines::new(reader);
286
287 assert!(matches!(poll_next(&mut lines), Poll::Ready(None)));
288
289 assert!(matches!(poll_next(&mut lines), Poll::Ready(None)));
291 assert!(matches!(poll_next(&mut lines), Poll::Ready(None)));
293 }
294
295 #[test]
296 fn lines_repoll_after_exhausting_non_empty_input_returns_none() {
297 let data: &[u8] = b"line 1\nline 2";
298 let reader = BufReader::new(data);
299 let mut lines = Lines::new(reader);
300
301 assert!(matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 1"));
302 assert!(matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 2"));
303 assert!(matches!(poll_next(&mut lines), Poll::Ready(None)));
304
305 assert!(matches!(poll_next(&mut lines), Poll::Ready(None)));
307 }
308
309 #[test]
310 fn lines_split_utf8_across_chunks() {
311 init_test("lines_split_utf8_across_chunks");
312 let reader = SplitReader {
313 chunks: vec![vec![0xF0, 0x9F], vec![0x94, 0xA5, b'\n']],
314 };
315 let mut lines = Lines::new(reader);
316
317 let first = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "🔥");
318 crate::assert_with_log!(first, "split utf8 line", true, first);
319 let done = matches!(poll_next(&mut lines), Poll::Ready(None));
320 crate::assert_with_log!(done, "done", true, done);
321 crate::test_complete!("lines_split_utf8_across_chunks");
322 }
323
324 #[test]
325 fn lines_crlf_after_pending_between_chunks() {
326 init_test("lines_crlf_after_pending_between_chunks");
327 let reader = PendingBetweenChunksReader {
328 chunks: vec![b"hello\r".to_vec(), b"\n".to_vec()],
329 pending_once: false,
330 };
331 let mut lines = Lines::new(reader);
332 let waker = noop_waker();
333 let mut cx = Context::from_waker(&waker);
334
335 let first_pending = matches!(Pin::new(&mut lines).poll_next(&mut cx), Poll::Pending);
336 crate::assert_with_log!(first_pending, "first poll pending", true, first_pending);
337
338 let second = matches!(Pin::new(&mut lines).poll_next(&mut cx), Poll::Ready(Some(Ok(s))) if s == "hello");
339 crate::assert_with_log!(second, "normalized line", true, second);
340 let done = matches!(Pin::new(&mut lines).poll_next(&mut cx), Poll::Ready(None));
341 crate::assert_with_log!(done, "done", true, done);
342 crate::test_complete!("lines_crlf_after_pending_between_chunks");
343 }
344
345 #[test]
346 fn lines_bounded_self_wake_after_many_immediately_ready_chunks() {
347 init_test("lines_bounded_self_wake_after_many_immediately_ready_chunks");
348 let mut chunks = vec![vec![b'a']; 40];
349 chunks.push(vec![b'\n']);
350 let reader = SplitReader { chunks };
351 let mut lines = Lines::new(reader);
352 let wake_counter = Arc::new(CountWaker {
353 wakes: AtomicUsize::new(0),
354 });
355 let waker = Waker::from(wake_counter.clone());
356 let mut cx = Context::from_waker(&waker);
357
358 let first_pending = matches!(Pin::new(&mut lines).poll_next(&mut cx), Poll::Pending);
359 crate::assert_with_log!(
360 first_pending,
361 "bounded self-wake pending",
362 true,
363 first_pending
364 );
365
366 let woke_self = wake_counter.wakes.load(Ordering::SeqCst) > 0;
367 crate::assert_with_log!(woke_self, "self wake recorded", true, woke_self);
368
369 let expected = "a".repeat(40);
370 let second = matches!(
371 Pin::new(&mut lines).poll_next(&mut cx),
372 Poll::Ready(Some(Ok(ref s))) if s == &expected
373 );
374 crate::assert_with_log!(second, "line after rewake", true, second);
375
376 let done = matches!(Pin::new(&mut lines).poll_next(&mut cx), Poll::Ready(None));
377 crate::assert_with_log!(done, "done", true, done);
378 crate::test_complete!("lines_bounded_self_wake_after_many_immediately_ready_chunks");
379 }
380
381 #[test]
382 fn lines_long_line_remains_unbounded() {
383 init_test("lines_long_line_remains_unbounded");
384 let long = "a".repeat(16 * 1024);
385 let payload = format!("{long}\n");
386 let reader = BufReader::new(payload.as_bytes());
387 let mut lines = Lines::new(reader);
388
389 let first = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(ref s))) if s == &long);
390 crate::assert_with_log!(first, "long line", true, first);
391 let done = matches!(poll_next(&mut lines), Poll::Ready(None));
392 crate::assert_with_log!(done, "done", true, done);
393 crate::test_complete!("lines_long_line_remains_unbounded");
394 }
395
396 #[test]
397 fn lines_invalid_utf8_repoll_after_error_returns_none() {
398 init_test("lines_invalid_utf8_repoll_after_error_returns_none");
399 let reader = SplitReader {
400 chunks: vec![vec![0xF0, 0x9F], vec![b'\n']],
401 };
402 let mut lines = Lines::new(reader);
403
404 let invalid_data = matches!(poll_next(&mut lines), Poll::Ready(Some(Err(err))) if err.kind() == io::ErrorKind::InvalidData);
405 crate::assert_with_log!(invalid_data, "invalid-data line error", true, invalid_data);
406
407 let done = matches!(poll_next(&mut lines), Poll::Ready(None));
408 crate::assert_with_log!(done, "done", true, done);
409 crate::test_complete!("lines_invalid_utf8_repoll_after_error_returns_none");
410 }
411}