rdf_dynsyn/util/
stream.rs

1use std::io;
2
3use bytes::Bytes;
4use futures::{AsyncRead, Stream, StreamExt, TryStream, TryStreamExt};
5use tokio::runtime::Handle;
6
7pub fn bytes_stream_to_async_reader<S>(data: S) -> impl AsyncRead
8where
9    S: TryStream<Ok = Bytes> + Send + 'static + Unpin,
10    S::Error: 'static + Into<Box<dyn std::error::Error + Send + Sync>>,
11{
12    data.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
13        .into_async_read()
14}
15
16/// A blocking iterator backed by non-blocking stream.
17pub struct BlockingStreamIterator<S: Stream + Unpin> {
18    /// Inner stream.
19    stream: S,
20
21    /// Tokio runtime.
22    rt: Handle,
23}
24
25impl<S: Stream + Unpin> BlockingStreamIterator<S> {
26    /// Get new [`BlockingStreamIterator`] backed by given stream.
27    ///
28    /// # Panics
29    ///
30    /// This will panic if called outside the context of a Tokio runtime.
31    // TODO must make run-time agnostic / interoperable.
32    #[inline]
33    #[track_caller]
34    pub fn new(stream: S) -> io::Result<Self> {
35        Self::new_with_handle(stream, tokio::runtime::Handle::current())
36    }
37
38    /// Get new [`BlockingStreamIterator`] backed by given stream, and given runtime handle.
39    #[inline]
40    pub fn new_with_handle(stream: S, rt: Handle) -> io::Result<Self> {
41        Ok(Self { stream, rt })
42    }
43}
44
45impl<S: Stream + Unpin> Iterator for BlockingStreamIterator<S> {
46    type Item = S::Item;
47
48    #[inline]
49    fn next(&mut self) -> Option<Self::Item> {
50        self.rt.block_on(self.stream.next())
51    }
52}