//! Module exposing APIs based around `AsyncBufRead` from Tokio.
use futures::stream::{self, Stream};
use tokio::io::{AsyncBufRead, AsyncBufReadExt};
use std::io::Error;
/// Asynchronous reader that yields its input one line at a time, as bytes.
///
/// Lines are pulled through `next`, which returns a `&[u8]` borrowed from
/// this structure; the internal buffer is cleared at the start of every
/// call, so a line has to be used (or copied) before the next one is read.
///
/// ```rust ignore
/// use bytelines::*;
/// use tokio::fs::File;
/// use tokio::io::BufReader;
///
/// // construct our reader from our file input
/// let file = File::open("./res/numbers.txt").await?;
/// let reader = BufReader::new(file);
/// let mut lines = AsyncByteLines::new(reader);
///
/// // walk our lines using `while` syntax
/// while let Some(line) = lines.next().await? {
///     // do something with the line, which is &[u8]
/// }
/// ```
///
/// This differs from the `stdlib` version of the API as it fits
/// more closely with the Tokio API for types.
///
/// For those who prefer the `Stream` API, this structure can be
/// converted using `into_stream`. This comes at the cost of an
/// allocation of a `Vec` for each line in the `Stream`. This is
/// negligible in many cases, so often it comes down to which
/// syntax is preferred:
///
/// ```rust ignore
/// use bytelines::*;
/// use futures::StreamExt;
/// use tokio::fs::File;
/// use tokio::io::BufReader;
///
/// // construct our reader from our file input
/// let file = File::open("./res/numbers.txt").await?;
/// let reader = BufReader::new(file);
/// let lines = AsyncByteLines::new(reader);
///
/// // walk our lines using `Stream` syntax
/// lines
///     .into_stream()
///     .for_each(|line| async move {
///         // do something with the line, which is Result<Vec<u8>, Error>
///     })
///     .await;
/// ```
pub struct AsyncByteLines<B: AsyncBufRead + Unpin> {
    /// Scratch space holding the bytes of the line read most recently.
    buffer: Vec<u8>,
    /// Underlying source the lines are read from.
    reader: B,
}
impl<B> AsyncByteLines<B>
where
    B: AsyncBufRead + Unpin,
{
    /// Wraps an input `AsyncBufRead` in a new `AsyncByteLines`, starting
    /// with an empty line buffer.
    pub fn new(buf: B) -> Self {
        let buffer = Vec::new();
        Self {
            buffer,
            reader: buf,
        }
    }

    /// Reads the next line of bytes from the reader (if any).
    ///
    /// The internal buffer is emptied and then filled by a `read_until`
    /// call that stops at `b'\n'`. The outcome of that read, together with
    /// the buffer, is passed to `crate::util::handle_line`, and its result
    /// is transposed into the `Result<Option<_>>` returned here.
    ///
    /// The returned slice borrows from `self`, so it must be dropped before
    /// `next` can be called again.
    ///
    /// NOTE(review): end-of-input detection, line-terminator handling and
    /// error mapping all live in `handle_line`, which is defined outside
    /// this module - confirm the details there.
    pub async fn next(&mut self) -> Result<Option<&[u8]>, Error> {
        self.buffer.clear();

        // Finish the read first so the mutable borrow of the buffer held by
        // the read future is released before the buffer is handed on.
        let read = self.reader.read_until(b'\n', &mut self.buffer).await;

        crate::util::handle_line(read, &mut self.buffer).transpose()
    }

    /// Consumes this wrapper and exposes its lines through the `Stream` API.
    ///
    /// Every line is copied out of the internal buffer into its own
    /// `Vec<u8>`, which is what allows items to outlive the following read.
    /// A failure from `next` is propagated into the stream as an `Err`.
    pub fn into_stream(self) -> impl Stream<Item = Result<Vec<u8>, Error>> {
        stream::try_unfold(self, |mut lines| async move {
            // Copy the borrowed line first; once it is owned, `lines` is no
            // longer borrowed and can be moved into the next unfold state.
            let owned = lines.next().await?.map(<[u8]>::to_vec);

            Ok(owned.map(|bytes| (bytes, lines)))
        })
    }
}
#[cfg(test)]
mod tests {
    use tokio::fs::File;
    use tokio::io::BufReader;

    /// Fixture file read by every test in this module.
    const NUMBERS: &str = "./res/numbers.txt";

    /// Asserts that the first nine collected lines are the decimal strings
    /// `"0"` through `"8"`, in order.
    ///
    /// Slicing panics (and so fails the test) when fewer than nine lines
    /// were collected.
    fn assert_leading_digits(lines: &[String]) {
        let expected: Vec<String> = (0..9).map(|n| n.to_string()).collect();
        assert_eq!(&lines[..9], expected.as_slice());
    }

    /// Walks the fixture with the `while let` / `next` API.
    #[tokio::test]
    async fn test_basic_loop() {
        let file = File::open(NUMBERS).await.unwrap();
        let mut reader = crate::from_tokio(BufReader::new(file));

        let mut lines = Vec::new();
        while let Some(bytes) = reader.next().await.unwrap() {
            // Each line is only borrowed, so copy it before the next read.
            lines.push(String::from_utf8(bytes.to_vec()).unwrap());
        }

        assert_leading_digits(&lines);
    }

    /// Walks the fixture with the `Stream` API.
    #[tokio::test]
    async fn test_basic_stream() {
        use futures::StreamExt;

        let file = File::open(NUMBERS).await.unwrap();
        let lines = crate::from_tokio(BufReader::new(file))
            .into_stream()
            .map(|line| line.unwrap())
            .map(|bytes| String::from_utf8(bytes).unwrap())
            .collect::<Vec<_>>()
            .await;

        assert_leading_digits(&lines);
    }
}