forge_core_utils/
stream_lines.rs1use bytes::Bytes;
2use futures::{Stream, StreamExt, TryStreamExt};
3use tokio_util::{
4 codec::{FramedRead, LinesCodec},
5 io::StreamReader,
6};
7
8pub trait LinesStreamExt: Stream<Item = Result<String, std::io::Error>> + Sized {
10 fn lines(self) -> futures::stream::BoxStream<'static, std::io::Result<String>>
12 where
13 Self: Send + 'static,
14 {
15 let reader = StreamReader::new(self.map(|result| result.map(Bytes::from)));
16 FramedRead::new(reader, LinesCodec::new())
17 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
18 .boxed()
19 }
20}
21
22impl<S> LinesStreamExt for S where S: Stream<Item = Result<String, std::io::Error>> {}