rdf_dynsyn/util/
stream.rs1use std::io;
2
3use bytes::Bytes;
4use futures::{AsyncRead, Stream, StreamExt, TryStream, TryStreamExt};
5use tokio::runtime::Handle;
6
7pub fn bytes_stream_to_async_reader<S>(data: S) -> impl AsyncRead
8where
9 S: TryStream<Ok = Bytes> + Send + 'static + Unpin,
10 S::Error: 'static + Into<Box<dyn std::error::Error + Send + Sync>>,
11{
12 data.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
13 .into_async_read()
14}
15
16pub struct BlockingStreamIterator<S: Stream + Unpin> {
18 stream: S,
20
21 rt: Handle,
23}
24
25impl<S: Stream + Unpin> BlockingStreamIterator<S> {
26 #[inline]
33 #[track_caller]
34 pub fn new(stream: S) -> io::Result<Self> {
35 Self::new_with_handle(stream, tokio::runtime::Handle::current())
36 }
37
38 #[inline]
40 pub fn new_with_handle(stream: S, rt: Handle) -> io::Result<Self> {
41 Ok(Self { stream, rt })
42 }
43}
44
45impl<S: Stream + Unpin> Iterator for BlockingStreamIterator<S> {
46 type Item = S::Item;
47
48 #[inline]
49 fn next(&mut self) -> Option<Self::Item> {
50 self.rt.block_on(self.stream.next())
51 }
52}