1use std::error::Error;
10
11use crate::model::Trusted;
12use sophia_api::source::{
13 StreamError,
14 StreamError::{SinkError, SourceError},
15 StreamResult,
16};
17
18pub struct StrictRioTripleSource<T>(pub T);
21
22impl<T> sophia_api::source::Source for StrictRioTripleSource<T>
23where
24 T: rio_api::parser::TriplesParser,
25 T::Error: Error + Send + Sync + 'static,
26{
27 type Item<'x> = Trusted<rio_api::model::Triple<'x>>;
28
29 type Error = T::Error;
30
31 fn try_for_some_item<EF, F>(&mut self, mut f: F) -> StreamResult<bool, T::Error, EF>
32 where
33 EF: Error + Send + Sync + 'static,
34 F: FnMut(Self::Item<'_>) -> Result<(), EF>,
35 {
36 let parser = &mut self.0;
37 if parser.is_end() {
38 return Ok(false);
39 }
40 parser
41 .parse_step(&mut |t| -> Result<(), RioStreamError<T::Error, EF>> {
42 f(Trusted(t)).map_err(RioStreamError::Sink)
43 })
46 .map_err(StreamError::from)
47 .and(Ok(true))
48 }
49}
50
51pub struct StrictRioQuadSource<T>(pub T);
54
55impl<T> sophia_api::source::Source for StrictRioQuadSource<T>
56where
57 T: rio_api::parser::QuadsParser,
58 T::Error: Error + Send + Sync + 'static,
59{
60 type Item<'x> = Trusted<rio_api::model::Quad<'x>>;
61
62 type Error = T::Error;
63
64 fn try_for_some_item<EF, F>(&mut self, mut f: F) -> StreamResult<bool, T::Error, EF>
65 where
66 EF: Error + Send + Sync + 'static,
67 F: FnMut(Self::Item<'_>) -> Result<(), EF>,
68 {
69 let parser = &mut self.0;
70 if parser.is_end() {
71 return Ok(false);
72 }
73 parser
74 .parse_step(&mut |q| -> Result<(), RioStreamError<T::Error, EF>> {
75 f(Trusted(q)).map_err(RioStreamError::Sink)
76 })
79 .map_err(StreamError::from)
80 .and(Ok(true))
81 }
82}
83
84pub struct GeneralizedRioSource<T>(pub T);
87
88impl<T> sophia_api::source::Source for GeneralizedRioSource<T>
89where
90 T: rio_api::parser::GeneralizedQuadsParser,
91 T::Error: Error + Send + Sync + 'static,
92{
93 type Item<'x> = Trusted<rio_api::model::GeneralizedQuad<'x>>;
94
95 type Error = T::Error;
96
97 fn try_for_some_item<EF, F>(&mut self, mut f: F) -> StreamResult<bool, T::Error, EF>
98 where
99 EF: Error + Send + Sync + 'static,
100 F: FnMut(Self::Item<'_>) -> Result<(), EF>,
101 {
102 let parser = &mut self.0;
103 if parser.is_end() {
104 return Ok(false);
105 }
106 parser
107 .parse_step(&mut |q| -> Result<(), RioStreamError<T::Error, EF>> {
108 f(Trusted(q)).map_err(RioStreamError::Sink)
109 })
112 .map_err(StreamError::from)
113 .and(Ok(true))
114 }
115}
116
117enum RioStreamError<E1, E2> {
126 Source(E1),
128 Sink(E2),
130}
131impl<E1, E2> From<E1> for RioStreamError<E1, E2>
132where
133 E1: Error + Send + Sync + 'static,
134 E2: Error + Send + Sync + 'static,
135{
136 fn from(other: E1) -> Self {
137 Self::Source(other)
138 }
139}
140impl<E1, E2> From<RioStreamError<E1, E2>> for StreamError<E1, E2>
141where
142 E1: Error + Send + Sync + 'static,
143 E2: Error + Send + Sync + 'static,
144{
145 fn from(other: RioStreamError<E1, E2>) -> Self {
146 match other {
147 RioStreamError::Source(err) => SourceError(err),
148 RioStreamError::Sink(err) => SinkError(err),
149 }
150 }
151}