1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use super::*;
use crate::triple::streaming_mode::StreamedTriple;
use std::collections::VecDeque;
pub struct MapSource<S, F> {
pub source: S,
pub map: F,
}
impl<S, F, T> QuadSource for MapSource<S, F>
where
S: QuadSource,
F: FnMut(StreamedQuad<S::Quad>) -> T,
T: Quad,
{
type Error = S::Error;
type Quad = ByValue<T>;
fn try_for_some_quad<G, E>(&mut self, f: &mut G) -> StreamResult<bool, Self::Error, E>
where
G: FnMut(StreamedQuad<Self::Quad>) -> Result<(), E>,
E: Error,
{
let map = &mut self.map;
self.source
.try_for_some_quad(&mut |t| f(StreamedQuad::by_value((map)(t))))
}
fn size_hint_quads(&self) -> (usize, Option<usize>) {
self.source.size_hint_quads()
}
}
impl<S, F, T> crate::triple::stream::TripleSource for MapSource<S, F>
where
S: QuadSource,
F: FnMut(StreamedQuad<S::Quad>) -> T,
T: crate::triple::Triple,
{
type Error = S::Error;
type Triple = crate::triple::streaming_mode::ByValue<T>;
fn try_for_some_triple<G, E>(&mut self, f: &mut G) -> StreamResult<bool, Self::Error, E>
where
G: FnMut(StreamedTriple<Self::Triple>) -> Result<(), E>,
E: Error,
{
let map = &mut self.map;
self.source
.try_for_some_quad(&mut |q| f(StreamedTriple::by_value((map)(q))))
}
fn size_hint_triples(&self) -> (usize, Option<usize>) {
self.source.size_hint_quads()
}
}
impl<S, F, T> IntoIterator for MapSource<S, F>
where
S: QuadSource,
F: FnMut(StreamedQuad<S::Quad>) -> T,
T: 'static,
{
type Item = Result<T, S::Error>;
type IntoIter = MapSourceIterator<S, F, T, S::Error>;
fn into_iter(self) -> Self::IntoIter {
MapSourceIterator {
source: self.source,
map: self.map,
buffer: VecDeque::new(),
}
}
}
pub struct MapSourceIterator<S, F, T, E> {
pub source: S,
pub map: F,
pub buffer: VecDeque<Result<T, E>>,
}
impl<S, F, T, E> Iterator for MapSourceIterator<S, F, T, E>
where
S: QuadSource<Error = E>,
F: FnMut(StreamedQuad<S::Quad>) -> T,
T: 'static,
E: 'static + std::error::Error,
{
type Item = Result<T, S::Error>;
fn next(&mut self) -> Option<Result<T, S::Error>> {
let mut remaining = true;
let mut buffer = VecDeque::new();
std::mem::swap(&mut self.buffer, &mut buffer);
let map = &mut self.map;
while buffer.is_empty() && remaining {
match self.source.for_some_quad(&mut |q| {
buffer.push_back(Ok((map)(q)));
}) {
Ok(b) => {
remaining = b;
}
Err(err) => {
buffer.push_back(Err(err));
}
}
}
std::mem::swap(&mut self.buffer, &mut buffer);
self.buffer.pop_front()
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.source.size_hint_quads()
}
}