1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// this module is transparently re-exported by its parent `stream`

use super::*;

use crate::triple::streaming_mode::StreamedTriple;

use std::collections::VecDeque;

/// The result of [`QuadSource::map_quads`]
///
/// Pairs an underlying quad source with a closure; the trait implementations on this type
/// pass every quad streamed by `source` through `map` before handing it to the consumer.
pub struct MapSource<S, F> {
    /// The wrapped source whose quads are transformed.
    pub source: S,
    /// The closure applied to each streamed quad.
    pub map: F,
}

// A mapped source is itself a quad source whenever the closure produces quads.
impl<S, F, T> QuadSource for MapSource<S, F>
where
    S: QuadSource,
    F: FnMut(StreamedQuad<S::Quad>) -> T,
    T: Quad,
{
    type Error = S::Error;
    type Quad = ByValue<T>;

    /// Drives the wrapped source, converting each quad with `map` and handing the
    /// converted value (wrapped by value) to the consumer `f`.
    ///
    /// The outcome of the wrapped source's `try_for_some_quad` is returned unchanged.
    fn try_for_some_quad<G, E>(&mut self, f: &mut G) -> StreamResult<bool, Self::Error, E>
    where
        G: FnMut(StreamedQuad<Self::Quad>) -> Result<(), E>,
        E: Error,
    {
        let Self { source, map } = self;
        // The closure is written inline so that its signature is inferred from the
        // bound expected by `try_for_some_quad`.
        source.try_for_some_quad(&mut |quad| {
            let mapped = map(quad);
            f(StreamedQuad::by_value(mapped))
        })
    }

    /// Forwards the wrapped source's hint: every incoming quad yields exactly one
    /// mapped quad, so the bounds carry over as they are.
    fn size_hint_quads(&self) -> (usize, Option<usize>) {
        let Self { source, .. } = self;
        source.size_hint_quads()
    }
}

// A mapped quad source acts as a triple source whenever the closure produces triples.
impl<S, F, T> crate::triple::stream::TripleSource for MapSource<S, F>
where
    S: QuadSource,
    F: FnMut(StreamedQuad<S::Quad>) -> T,
    T: crate::triple::Triple,
{
    type Error = S::Error;
    type Triple = crate::triple::streaming_mode::ByValue<T>;

    /// Drives the wrapped quad source, converting each quad into a triple with `map`
    /// and handing that triple (wrapped by value) to the consumer `f`.
    ///
    /// The outcome of the wrapped source's `try_for_some_quad` is returned unchanged.
    fn try_for_some_triple<G, E>(&mut self, f: &mut G) -> StreamResult<bool, Self::Error, E>
    where
        G: FnMut(StreamedTriple<Self::Triple>) -> Result<(), E>,
        E: Error,
    {
        let Self { source, map } = self;
        // The closure is written inline so that its signature is inferred from the
        // bound expected by `try_for_some_quad`.
        source.try_for_some_quad(&mut |quad| {
            let triple = map(quad);
            f(StreamedTriple::by_value(triple))
        })
    }

    /// Forwards the wrapped source's quad hint: each quad becomes exactly one triple.
    fn size_hint_triples(&self) -> (usize, Option<usize>) {
        let Self { source, .. } = self;
        source.size_hint_quads()
    }
}

impl<S, F, T> IntoIterator for MapSource<S, F>
where
    S: QuadSource,
    F: FnMut(StreamedQuad<S::Quad>) -> T,
    T: 'static,
{
    type Item = Result<T, S::Error>;
    type IntoIter = MapSourceIterator<S, F, T, S::Error>;
    fn into_iter(self) -> Self::IntoIter {
        MapSourceIterator {
            source: self.source,
            map: self.map,
            buffer: VecDeque::new(),
        }
    }
}

/// An iterator over the result of [`QuadSource::map_quads`]
///
/// Items are pulled from `source`, converted with `map`, and queued in `buffer` until
/// the iterator hands them out.
pub struct MapSourceIterator<S, F, T, E> {
    /// The wrapped source the quads are pulled from.
    pub source: S,
    /// The closure applied to each streamed quad.
    pub map: F,
    /// Mapped items (or a source error) that have been produced but not yet yielded.
    pub buffer: VecDeque<Result<T, E>>,
}

impl<S, F, T, E> Iterator for MapSourceIterator<S, F, T, E>
where
    S: QuadSource<Error = E>,
    F: FnMut(StreamedQuad<S::Quad>) -> T,
    T: 'static,
    E: 'static + std::error::Error,
{
    type Item = Result<T, S::Error>;
    fn next(&mut self) -> Option<Result<T, S::Error>> {
        let mut remaining = true;
        let mut buffer = VecDeque::new();
        std::mem::swap(&mut self.buffer, &mut buffer);
        let map = &mut self.map;
        while buffer.is_empty() && remaining {
            match self.source.for_some_quad(&mut |q| {
                buffer.push_back(Ok((map)(q)));
            }) {
                Ok(b) => {
                    remaining = b;
                }
                Err(err) => {
                    buffer.push_back(Err(err));
                }
            }
        }
        std::mem::swap(&mut self.buffer, &mut buffer);
        self.buffer.pop_front()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.source.size_hint_quads()
    }
}