Skip to main content

asupersync/io/
lines.rs

1//! Async line iterator.
2
3use super::AsyncBufRead;
4use crate::stream::Stream;
5use std::io;
6use std::mem;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
/// Stream over the lines of an [`AsyncBufRead`].
///
/// Each item is an `io::Result<String>` holding one line with its trailing
/// `\n` (or `\r\n`) removed. A final line that is not newline-terminated is
/// still yielded. Bytes that are not valid UTF-8 produce an
/// [`io::ErrorKind::InvalidData`] error.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Lines<R> {
    /// Underlying buffered reader the lines are pulled from.
    reader: R,
    /// Bytes of the line currently being assembled, carried over between polls.
    buf: Vec<u8>,
    /// Set once the stream has finished (end of input or an error); every
    /// later poll then reports the end of the stream.
    completed: bool,
}

impl<R> Lines<R> {
    /// Creates a new `Lines` stream reading from `reader`.
    ///
    /// The stream starts with an empty line buffer and is not completed.
    pub fn new(reader: R) -> Self {
        Self {
            reader,
            buf: Vec::new(),
            completed: false,
        }
    }
}
28
29impl<R: AsyncBufRead + Unpin> Stream for Lines<R> {
30    type Item = io::Result<String>;
31
32    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
33        let this = self.get_mut();
34        if this.completed {
35            return Poll::Ready(None);
36        }
37        let mut steps = 0;
38
39        loop {
40            if steps > 32 {
41                cx.waker().wake_by_ref();
42                return Poll::Pending;
43            }
44            steps += 1;
45
46            // 1. Check if we already have a newline at the end of `this.buf`
47            // We know it can only be at the end because of step 4.
48            if this.buf.last() == Some(&b'\n') {
49                // Remove \n
50                this.buf.pop();
51
52                // Handle \r\n
53                if this.buf.last() == Some(&b'\r') {
54                    this.buf.pop();
55                }
56
57                let s = String::from_utf8(mem::take(&mut this.buf))
58                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e));
59
60                if s.is_err() {
61                    this.completed = true;
62                }
63
64                return Poll::Ready(Some(s));
65            }
66
67            // 2. Poll the reader
68            let available = match Pin::new(&mut this.reader).poll_fill_buf(cx) {
69                Poll::Pending => return Poll::Pending,
70                Poll::Ready(Err(e)) => {
71                    this.completed = true;
72                    return Poll::Ready(Some(Err(e)));
73                }
74                Poll::Ready(Ok(buf)) => buf,
75            };
76
77            // 3. EOF check
78            if available.is_empty() {
79                if this.buf.is_empty() {
80                    this.completed = true;
81                    return Poll::Ready(None);
82                }
83                let s = String::from_utf8(mem::take(&mut this.buf))
84                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e));
85                this.completed = true;
86                return Poll::Ready(Some(s));
87            }
88
89            // 4. Scan available for newline
90            if let Some(pos) = available.iter().position(|&b| b == b'\n') {
91                this.buf.extend_from_slice(&available[..=pos]);
92                Pin::new(&mut this.reader).consume(pos + 1);
93                // Loop will catch it in step 1
94            } else {
95                this.buf.extend_from_slice(available);
96                let len = available.len();
97                Pin::new(&mut this.reader).consume(len);
98            }
99        }
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::{AsyncBufRead, AsyncRead, BufReader, ReadBuf};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::task::{Wake, Waker};

    /// Waker that records how many times it has been woken.
    struct CountWaker {
        wakes: AtomicUsize,
    }

    impl Wake for CountWaker {
        fn wake(self: Arc<Self>) {
            self.wakes.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Returns a waker that does nothing when woken.
    fn noop_waker() -> Waker {
        Waker::noop().clone()
    }

    /// Polls `stream` once with a no-op waker.
    fn poll_next<S: Stream + Unpin>(stream: &mut S) -> Poll<Option<S::Item>> {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        Pin::new(stream).poll_next(&mut cx)
    }

    /// Common per-test setup: initialises logging and marks the test phase.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }

    /// Buffered reader that hands out its data one pre-split chunk at a time
    /// and is always immediately ready.
    struct SplitReader {
        chunks: Vec<Vec<u8>>,
    }

    impl AsyncRead for SplitReader {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            unreachable!("lines should use poll_fill_buf for this test")
        }
    }

    impl AsyncBufRead for SplitReader {
        fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
            let this = self.get_mut();
            if this.chunks.is_empty() {
                Poll::Ready(Ok(&[]))
            } else {
                Poll::Ready(Ok(&this.chunks[0]))
            }
        }

        fn consume(self: Pin<&mut Self>, amt: usize) {
            let this = self.get_mut();
            if this.chunks.is_empty() {
                return;
            }
            if amt >= this.chunks[0].len() {
                this.chunks.remove(0);
            } else {
                // Drop the consumed prefix in place instead of reallocating.
                this.chunks[0].drain(..amt);
            }
        }
    }

    /// Like [`SplitReader`], but reports `Pending` exactly once after each
    /// fully consumed chunk that is followed by another chunk.
    struct PendingBetweenChunksReader {
        chunks: Vec<Vec<u8>>,
        pending_once: bool,
    }

    impl AsyncRead for PendingBetweenChunksReader {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            unreachable!("lines should use poll_fill_buf for this test")
        }
    }

    impl AsyncBufRead for PendingBetweenChunksReader {
        fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
            let this = self.get_mut();
            if this.pending_once {
                this.pending_once = false;
                return Poll::Pending;
            }

            if this.chunks.is_empty() {
                Poll::Ready(Ok(&[]))
            } else {
                Poll::Ready(Ok(&this.chunks[0]))
            }
        }

        fn consume(self: Pin<&mut Self>, amt: usize) {
            let this = self.get_mut();
            if this.chunks.is_empty() {
                return;
            }

            if amt >= this.chunks[0].len() {
                this.chunks.remove(0);
                // Only stall when there is still another chunk to deliver.
                this.pending_once = !this.chunks.is_empty();
            } else {
                // Drop the consumed prefix in place instead of reallocating.
                this.chunks[0].drain(..amt);
            }
        }
    }

    #[test]
    fn lines_basic() {
        init_test("lines_basic");
        let data: &[u8] = b"line 1\nline 2\nline 3";
        let reader = BufReader::new(data);
        let mut lines = Lines::new(reader);

        let first = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 1");
        crate::assert_with_log!(first, "line 1", true, first);
        let second = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 2");
        crate::assert_with_log!(second, "line 2", true, second);
        let third = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 3");
        crate::assert_with_log!(third, "line 3", true, third);
        // The input has no trailing newline: "line 3" is yielded, then `None`.
        let done = matches!(poll_next(&mut lines), Poll::Ready(None));
        crate::assert_with_log!(done, "done", true, done);
        crate::test_complete!("lines_basic");
    }

    #[test]
    fn lines_crlf() {
        init_test("lines_crlf");
        let data: &[u8] = b"line 1\r\nline 2\r\n";
        let reader = BufReader::new(data);
        let mut lines = Lines::new(reader);

        let first = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 1");
        crate::assert_with_log!(first, "line 1", true, first);
        let second = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 2");
        crate::assert_with_log!(second, "line 2", true, second);
        let done = matches!(poll_next(&mut lines), Poll::Ready(None));
        crate::assert_with_log!(done, "done", true, done);
        crate::test_complete!("lines_crlf");
    }

    #[test]
    fn lines_empty() {
        init_test("lines_empty");
        let data: &[u8] = b"";
        let reader = BufReader::new(data);
        let mut lines = Lines::new(reader);
        let done = matches!(poll_next(&mut lines), Poll::Ready(None));
        crate::assert_with_log!(done, "done", true, done);
        crate::test_complete!("lines_empty");
    }

    #[test]
    fn lines_incomplete_last() {
        init_test("lines_incomplete_last");
        let data: &[u8] = b"foo\nbar";
        let reader = BufReader::new(data);
        let mut lines = Lines::new(reader);

        let first = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "foo");
        crate::assert_with_log!(first, "foo", true, first);
        let second = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "bar");
        crate::assert_with_log!(second, "bar", true, second);
        let done = matches!(poll_next(&mut lines), Poll::Ready(None));
        crate::assert_with_log!(done, "done", true, done);
        crate::test_complete!("lines_incomplete_last");
    }

    #[test]
    fn lines_repoll_after_empty_completion_returns_none() {
        init_test("lines_repoll_after_empty_completion_returns_none");
        let data: &[u8] = b"";
        let reader = BufReader::new(data);
        let mut lines = Lines::new(reader);

        assert!(matches!(poll_next(&mut lines), Poll::Ready(None)));

        // Fail-closed: repoll after completion returns None instead of panicking
        assert!(matches!(poll_next(&mut lines), Poll::Ready(None)));
        // Third poll also safe
        assert!(matches!(poll_next(&mut lines), Poll::Ready(None)));
        crate::test_complete!("lines_repoll_after_empty_completion_returns_none");
    }

    #[test]
    fn lines_repoll_after_exhausting_non_empty_input_returns_none() {
        init_test("lines_repoll_after_exhausting_non_empty_input_returns_none");
        let data: &[u8] = b"line 1\nline 2";
        let reader = BufReader::new(data);
        let mut lines = Lines::new(reader);

        assert!(matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 1"));
        assert!(matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "line 2"));
        assert!(matches!(poll_next(&mut lines), Poll::Ready(None)));

        // Fail-closed: repoll after completion returns None instead of panicking
        assert!(matches!(poll_next(&mut lines), Poll::Ready(None)));
        crate::test_complete!("lines_repoll_after_exhausting_non_empty_input_returns_none");
    }

    #[test]
    fn lines_split_utf8_across_chunks() {
        init_test("lines_split_utf8_across_chunks");
        // The four bytes of one UTF-8 scalar are split across two chunks.
        let reader = SplitReader {
            chunks: vec![vec![0xF0, 0x9F], vec![0x94, 0xA5, b'\n']],
        };
        let mut lines = Lines::new(reader);

        let first = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(s))) if s == "🔥");
        crate::assert_with_log!(first, "split utf8 line", true, first);
        let done = matches!(poll_next(&mut lines), Poll::Ready(None));
        crate::assert_with_log!(done, "done", true, done);
        crate::test_complete!("lines_split_utf8_across_chunks");
    }

    #[test]
    fn lines_crlf_after_pending_between_chunks() {
        init_test("lines_crlf_after_pending_between_chunks");
        // The `\r` and the `\n` arrive in different chunks with a `Pending`
        // in between.
        let reader = PendingBetweenChunksReader {
            chunks: vec![b"hello\r".to_vec(), b"\n".to_vec()],
            pending_once: false,
        };
        let mut lines = Lines::new(reader);
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let first_pending = matches!(Pin::new(&mut lines).poll_next(&mut cx), Poll::Pending);
        crate::assert_with_log!(first_pending, "first poll pending", true, first_pending);

        let second = matches!(
            Pin::new(&mut lines).poll_next(&mut cx),
            Poll::Ready(Some(Ok(s))) if s == "hello"
        );
        crate::assert_with_log!(second, "normalized line", true, second);
        let done = matches!(Pin::new(&mut lines).poll_next(&mut cx), Poll::Ready(None));
        crate::assert_with_log!(done, "done", true, done);
        crate::test_complete!("lines_crlf_after_pending_between_chunks");
    }

    #[test]
    fn lines_bounded_self_wake_after_many_immediately_ready_chunks() {
        init_test("lines_bounded_self_wake_after_many_immediately_ready_chunks");
        // 40 always-ready one-byte chunks: more than a single poll processes
        // before it yields.
        let mut chunks = vec![vec![b'a']; 40];
        chunks.push(vec![b'\n']);
        let reader = SplitReader { chunks };
        let mut lines = Lines::new(reader);
        let wake_counter = Arc::new(CountWaker {
            wakes: AtomicUsize::new(0),
        });
        let waker = Waker::from(wake_counter.clone());
        let mut cx = Context::from_waker(&waker);

        let first_pending = matches!(Pin::new(&mut lines).poll_next(&mut cx), Poll::Pending);
        crate::assert_with_log!(
            first_pending,
            "bounded self-wake pending",
            true,
            first_pending
        );

        let woke_self = wake_counter.wakes.load(Ordering::SeqCst) > 0;
        crate::assert_with_log!(woke_self, "self wake recorded", true, woke_self);

        let expected = "a".repeat(40);
        let second = matches!(
            Pin::new(&mut lines).poll_next(&mut cx),
            Poll::Ready(Some(Ok(ref s))) if s == &expected
        );
        crate::assert_with_log!(second, "line after rewake", true, second);

        let done = matches!(Pin::new(&mut lines).poll_next(&mut cx), Poll::Ready(None));
        crate::assert_with_log!(done, "done", true, done);
        crate::test_complete!("lines_bounded_self_wake_after_many_immediately_ready_chunks");
    }

    #[test]
    fn lines_long_line_remains_unbounded() {
        init_test("lines_long_line_remains_unbounded");
        let long = "a".repeat(16 * 1024);
        let payload = format!("{long}\n");
        let reader = BufReader::new(payload.as_bytes());
        let mut lines = Lines::new(reader);

        let first = matches!(poll_next(&mut lines), Poll::Ready(Some(Ok(ref s))) if s == &long);
        crate::assert_with_log!(first, "long line", true, first);
        let done = matches!(poll_next(&mut lines), Poll::Ready(None));
        crate::assert_with_log!(done, "done", true, done);
        crate::test_complete!("lines_long_line_remains_unbounded");
    }

    #[test]
    fn lines_invalid_utf8_repoll_after_error_returns_none() {
        init_test("lines_invalid_utf8_repoll_after_error_returns_none");
        // A truncated multi-byte sequence followed directly by a newline.
        let reader = SplitReader {
            chunks: vec![vec![0xF0, 0x9F], vec![b'\n']],
        };
        let mut lines = Lines::new(reader);

        let invalid_data = matches!(
            poll_next(&mut lines),
            Poll::Ready(Some(Err(err))) if err.kind() == io::ErrorKind::InvalidData
        );
        crate::assert_with_log!(invalid_data, "invalid-data line error", true, invalid_data);

        let done = matches!(poll_next(&mut lines), Poll::Ready(None));
        crate::assert_with_log!(done, "done", true, done);
        crate::test_complete!("lines_invalid_utf8_repoll_after_error_returns_none");
    }
}