1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//! Module exposing APIs based around `AsyncBufRead` from Tokio.
use futures::stream::{self, Stream};
use tokio::io::{AsyncBufRead, AsyncBufReadExt};

use std::io::Error;

/// Provides async iteration over bytes of input, split by line.
///
/// ```rust ignore
/// use bytelines::*;
/// use std::fs::File;
/// use std::io::BufReader;
///
/// // construct our iterator from our file input
/// let file = File::open("./res/numbers.txt").await?;
/// let reader = BufReader::new(file);
/// let mut lines = AsyncByteLines::new(reader);
///
/// // walk our lines using `while` syntax
/// while let Some(line) = lines.next().await? {
///     // do something with the line, which is &[u8]
/// }
///
/// This differs from the `stdlib` version of the API as it fits
/// more closely with the Tokio API for types.
///
/// For those who prefer the `Stream` API, this structure can be
/// converted using `into_stream`. This comes at the cost of an
/// allocation of a `Vec` for each line in the `Stream`. This is
/// negligible in many cases, so often it comes down to which
/// syntax is preferred:
///
/// ```rust ignore
/// use bytelines::*;
/// use std::fs::File;
/// use std::io::BufReader;
///
/// // construct our iterator from our file input
/// let file = File::open("./res/numbers.txt").await?;
/// let reader = BufReader::new(file);
/// let mut lines = AsyncByteLines::new(reader);
///
/// // walk our lines using `Stream` syntax
/// lines.into_stream().for_each(|line| {
///
/// });
/// ```
pub struct AsyncByteLines<B>
where
    B: AsyncBufRead + Unpin,
{
    buffer: Vec<u8>,
    reader: B,
}

impl<B> AsyncByteLines<B>
where
    B: AsyncBufRead + Unpin,
{
    /// Constructs a new `ByteLines` from an input `AsyncBufRead`.
    pub fn new(buf: B) -> Self {
        Self {
            buffer: Vec::new(),
            reader: buf,
        }
    }

    /// Retrieves a reference to the next line of bytes in the reader (if any).
    pub async fn next(&mut self) -> Result<Option<&[u8]>, Error> {
        self.buffer.clear();
        let handled = crate::util::handle_line(
            self.reader.read_until(b'\n', &mut self.buffer).await,
            &mut self.buffer,
        );
        handled.transpose()
    }

    /// Converts this wrapper to provide a `Stream` API.
    pub fn into_stream(self) -> impl Stream<Item = Result<Vec<u8>, Error>> {
        stream::try_unfold(self, |mut lines| async {
            Ok(lines
                .next()
                .await?
                .map(|line| line.to_vec())
                .map(|line| (line, lines)))
        })
    }
}

#[cfg(test)]
#[allow(clippy::needless_range_loop)]
mod tests {
    use tokio::fs::File;
    use tokio::io::BufReader;

    #[tokio::test]
    async fn test_basic_loop() {
        let file = File::open("./res/numbers.txt").await.unwrap();
        let brdr = BufReader::new(file);
        let mut brdr = crate::from_tokio(brdr);
        let mut lines = Vec::new();

        while let Some(line) = brdr.next().await.unwrap() {
            let line = line.to_vec();
            let line = String::from_utf8(line).unwrap();

            lines.push(line);
        }

        for i in 0..9 {
            assert_eq!(lines[i], format!("{}", i));
        }
    }

    #[tokio::test]
    async fn test_basic_stream() {
        use futures::StreamExt;

        let file = File::open("./res/numbers.txt").await.unwrap();
        let brdr = BufReader::new(file);

        let lines = crate::from_tokio(brdr)
            .into_stream()
            .map(|line| String::from_utf8(line.unwrap()).unwrap())
            .collect::<Vec<_>>()
            .await;

        for i in 0..9 {
            assert_eq!(lines[i], format!("{}", i));
        }
    }
}