1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// this module is transparently re-exported by its parent `stream`

use super::*;

use crate::quad::streaming_mode::StreamedQuad;

use std::collections::VecDeque;

/// The result of [`TripleSource::filter_map_triples`]
///
/// Wraps an inner source together with a closure; the trait implementations
/// on this type feed every item of `source` through `filter_map` and only
/// forward the values for which the closure returns `Some`.
pub struct FilterMapSource<S, F> {
    /// The wrapped source whose streamed triples are fed to `filter_map`.
    pub source: S,
    /// The closure applied to each streamed triple; returning `None` drops
    /// the triple, returning `Some(x)` forwards `x` downstream.
    pub filter_map: F,
}

/// Exposes the filtered-and-mapped stream as a [`TripleSource`] whenever the
/// closure produces values implementing [`Triple`].
impl<S, F, T> TripleSource for FilterMapSource<S, F>
where
    S: TripleSource,
    F: FnMut(StreamedTriple<S::Triple>) -> Option<T>,
    T: Triple,
{
    type Error = S::Error;
    type Triple = ByValue<T>;

    /// Delegates to the wrapped source, handing `f` only the triples for which
    /// the closure yields `Some`; the others are silently accepted.
    fn try_for_some_triple<G, E>(&mut self, f: &mut G) -> StreamResult<bool, Self::Error, E>
    where
        G: FnMut(StreamedTriple<Self::Triple>) -> Result<(), E>,
        E: Error,
    {
        // Borrow the closure separately so that `self.source` can be borrowed
        // mutably at the same time.
        let mapper = &mut self.filter_map;
        let mut forward = |triple| match (mapper)(triple) {
            Some(mapped) => f(StreamedTriple::by_value(mapped)),
            // A rejected triple is not an error for the consumer.
            None => Ok(()),
        };
        self.source.try_for_some_triple(&mut forward)
    }

    /// Filtering may drop any number of triples, so only the upper bound of
    /// the wrapped source is kept; the lower bound is always 0.
    fn size_hint_triples(&self) -> (usize, Option<usize>) {
        let (_, upper) = self.source.size_hint_triples();
        (0, upper)
    }
}

/// Exposes the filtered-and-mapped stream as a `QuadSource` whenever the
/// closure turns the triples of the wrapped source into quads.
impl<S, F, T> crate::quad::stream::QuadSource for FilterMapSource<S, F>
where
    S: TripleSource,
    F: FnMut(StreamedTriple<S::Triple>) -> Option<T>,
    T: crate::quad::Quad,
{
    type Error = S::Error;
    type Quad = crate::quad::streaming_mode::ByValue<T>;

    /// Delegates to the wrapped triple source, handing `f` only the quads for
    /// which the closure yields `Some`; the other triples are silently accepted.
    fn try_for_some_quad<G, E>(&mut self, f: &mut G) -> StreamResult<bool, Self::Error, E>
    where
        G: FnMut(StreamedQuad<Self::Quad>) -> Result<(), E>,
        E: Error,
    {
        // Borrow the closure separately so that `self.source` can be borrowed
        // mutably at the same time.
        let mapper = &mut self.filter_map;
        let mut forward = |triple| match (mapper)(triple) {
            Some(quad) => f(StreamedQuad::by_value(quad)),
            // A rejected triple is not an error for the consumer.
            None => Ok(()),
        };
        self.source.try_for_some_triple(&mut forward)
    }

    /// Filtering may drop any number of items, so only the upper bound of
    /// the wrapped triple source is kept; the lower bound is always 0.
    fn size_hint_quads(&self) -> (usize, Option<usize>) {
        let (_, upper) = self.source.size_hint_triples();
        (0, upper)
    }
}

impl<S, F, T> IntoIterator for FilterMapSource<S, F>
where
    S: TripleSource,
    F: FnMut(StreamedTriple<S::Triple>) -> Option<T>,
    T: 'static,
{
    type Item = Result<T, S::Error>;
    type IntoIter = FilterMapSourceIterator<S, F, T, S::Error>;
    fn into_iter(self) -> Self::IntoIter {
        FilterMapSourceIterator {
            source: self.source,
            filter_map: self.filter_map,
            buffer: VecDeque::new(),
        }
    }
}

/// An iterator over the result of [`TripleSource::filter_map_triples`]
///
/// Items produced by the closure (and errors reported by the source) are
/// queued in `buffer` and handed out one at a time by `Iterator::next`.
pub struct FilterMapSourceIterator<S, F, T, E> {
    /// The wrapped source that is polled for more triples.
    pub source: S,
    /// The closure applied to each streamed triple; only `Some` results are
    /// queued.
    pub filter_map: F,
    /// Queue of results not yet returned to the caller, oldest first.
    pub buffer: VecDeque<Result<T, E>>,
}

impl<S, F, T, E> Iterator for FilterMapSourceIterator<S, F, T, E>
where
    S: TripleSource<Error = E>,
    F: FnMut(StreamedTriple<S::Triple>) -> Option<T>,
    T: 'static,
    E: 'static + std::error::Error,
{
    type Item = Result<T, S::Error>;
    fn next(&mut self) -> Option<Result<T, S::Error>> {
        let mut remaining = true;
        let mut buffer = VecDeque::new();
        std::mem::swap(&mut self.buffer, &mut buffer);
        let filter_map = &mut self.filter_map;
        while self.buffer.is_empty() && remaining {
            match self.source.for_some_triple(&mut |t| {
                if let Some(v) = (filter_map)(t) {
                    buffer.push_back(Ok(v));
                }
            }) {
                Ok(b) => {
                    remaining = b;
                }
                Err(err) => {
                    buffer.push_back(Err(err));
                }
            };
        }
        std::mem::swap(&mut self.buffer, &mut buffer);
        self.buffer.pop_front()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, self.source.size_hint_triples().1)
    }
}