rdf_reader_turtle/oxrdf/
reader.rs1use super::{TurtleReaderResult, TurtleTriple};
4use futures::Stream;
5use oxttl::turtle::{TokioAsyncReaderTurtleParser, TurtleParser};
6use rdf_reader::StreamIter;
7use tokio::{io::AsyncRead, runtime::Handle};
8
9pub struct TurtleReader<T: AsyncRead + Unpin + Send + 'static> {
11 pub(crate) parser: TokioAsyncReaderTurtleParser<T>,
12 pub(crate) handle: Handle,
13}
14
15impl<T: AsyncRead + Unpin + Send + 'static> From<T> for TurtleReader<T> {
16 fn from(input: T) -> Self {
18 Self {
19 parser: TurtleParser::new().for_tokio_async_reader(input),
20 handle: Handle::current(),
21 }
22 }
23}
24
25impl<T: AsyncRead + Unpin + Send + 'static> TurtleReader<T> {
26 pub fn into_stream(mut self) -> impl Stream<Item = TurtleReaderResult<TurtleTriple>> {
27 async_stream::stream! {
28 while let Some(input) = self.parser.next().await {
29 yield match input {
30 Ok(triple) => Ok(triple.into()),
31 Err(err) => Err(err),
32 }
33 }
34 }
35 }
36}
37
38impl<T: AsyncRead + Unpin + Send + 'static> IntoIterator for TurtleReader<T> {
39 type Item = TurtleReaderResult<TurtleTriple>;
40 type IntoIter = StreamIter<Self::Item>;
41
42 fn into_iter(self) -> Self::IntoIter {
43 let handle = self.handle.clone();
44 StreamIter::new(self.into_stream(), handle)
45 }
46}