Skip to main content

rdf_reader_turtle/oxrdf/
reader.rs

1// This is free and unencumbered software released into the public domain.
2
3use super::{TurtleReaderResult, TurtleTriple};
4use futures::Stream;
5use oxttl::turtle::{TokioAsyncReaderTurtleParser, TurtleParser};
6use rdf_reader::StreamIter;
7use tokio::{io::AsyncRead, runtime::Handle};
8
9/// A reader for the Turtle text format.
10pub struct TurtleReader<T: AsyncRead + Unpin + Send + 'static> {
11    pub(crate) parser: TokioAsyncReaderTurtleParser<T>,
12    pub(crate) handle: Handle,
13}
14
15impl<T: AsyncRead + Unpin + Send + 'static> From<T> for TurtleReader<T> {
16    /// Creates a Turtle reader for an `AsyncRead` source.
17    fn from(input: T) -> Self {
18        Self {
19            parser: TurtleParser::new().for_tokio_async_reader(input),
20            handle: Handle::current(),
21        }
22    }
23}
24
25impl<T: AsyncRead + Unpin + Send + 'static> TurtleReader<T> {
26    pub fn into_stream(mut self) -> impl Stream<Item = TurtleReaderResult<TurtleTriple>> {
27        async_stream::stream! {
28            while let Some(input) = self.parser.next().await {
29                yield match input {
30                    Ok(triple) => Ok(triple.into()),
31                    Err(err) => Err(err),
32                }
33            }
34        }
35    }
36}
37
38impl<T: AsyncRead + Unpin + Send + 'static> IntoIterator for TurtleReader<T> {
39    type Item = TurtleReaderResult<TurtleTriple>;
40    type IntoIter = StreamIter<Self::Item>;
41
42    fn into_iter(self) -> Self::IntoIter {
43        let handle = self.handle.clone();
44        StreamIter::new(self.into_stream(), handle)
45    }
46}