forge_core_utils/
stream_lines.rs

1use bytes::Bytes;
2use futures::{Stream, StreamExt, TryStreamExt};
3use tokio_util::{
4    codec::{FramedRead, LinesCodec},
5    io::StreamReader,
6};
7
8/// Extension trait for converting chunked string streams to line streams.
9pub trait LinesStreamExt: Stream<Item = Result<String, std::io::Error>> + Sized {
10    /// Convert a chunked string stream to a line stream.
11    fn lines(self) -> futures::stream::BoxStream<'static, std::io::Result<String>>
12    where
13        Self: Send + 'static,
14    {
15        let reader = StreamReader::new(self.map(|result| result.map(Bytes::from)));
16        FramedRead::new(reader, LinesCodec::new())
17            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
18            .boxed()
19    }
20}
21
22impl<S> LinesStreamExt for S where S: Stream<Item = Result<String, std::io::Error>> {}