use futures::stream::{self, Stream};
use tokio::io::{AsyncBufRead, AsyncBufReadExt};
use std::io::Error;
pub struct AsyncByteLines<B>
where
B: AsyncBufRead + Unpin,
{
buffer: Vec<u8>,
reader: B,
}
impl<B> AsyncByteLines<B>
where
B: AsyncBufRead + Unpin,
{
pub fn new(buf: B) -> Self {
Self {
buffer: Vec::new(),
reader: buf,
}
}
pub async fn next(&mut self) -> Result<Option<&[u8]>, Error> {
self.buffer.clear();
let handled = crate::util::handle_line(
self.reader.read_until(b'\n', &mut self.buffer).await,
&mut self.buffer,
);
handled.transpose()
}
pub fn into_stream(self) -> impl Stream<Item = Result<Vec<u8>, Error>> {
stream::try_unfold(self, |mut lines| async {
Ok(lines
.next()
.await?
.map(|line| line.to_vec())
.map(|line| (line, lines)))
})
}
}
#[cfg(test)]
#[allow(clippy::needless_range_loop)]
mod tests {
use tokio::fs::File;
use tokio::io::BufReader;
#[tokio::test]
async fn test_basic_loop() {
let file = File::open("./res/numbers.txt").await.unwrap();
let brdr = BufReader::new(file);
let mut brdr = crate::from_tokio(brdr);
let mut lines = Vec::new();
while let Some(line) = brdr.next().await.unwrap() {
let line = line.to_vec();
let line = String::from_utf8(line).unwrap();
lines.push(line);
}
for i in 0..9 {
assert_eq!(lines[i], format!("{}", i));
}
}
#[tokio::test]
async fn test_basic_stream() {
use futures::StreamExt;
let file = File::open("./res/numbers.txt").await.unwrap();
let brdr = BufReader::new(file);
let lines = crate::from_tokio(brdr)
.into_stream()
.map(|line| String::from_utf8(line.unwrap()).unwrap())
.collect::<Vec<_>>()
.await;
for i in 0..9 {
assert_eq!(lines[i], format!("{}", i));
}
}
}