1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
//! A triple source produces triples, and may also fail in the process.
//!
//! The trait [`TripleSource`]
//! provides an API similar to (a subset of) the [`Iterator`] API,
//! with methods such as [`for_each_triple`] and [`try_for_each_triple`].
//!
//! # Rationale (or Why not simply use `Iterator`?)
//!
//! The [`Iterator`] trait is designed in such a way that items must live at least as long as the iterator itself.
//! This assumption may be too strong in some situations.
//!
//! For example,
//! consider a parser using 3 buffers to store the subject, predicate,
//! and object of the triples it parses.
//! Each time it extracts a triple from the data,
//! it yields it (as 3 references to its internal buffers)
//! to the closure passed to `for_each_triple`.
//! Then, it **reuses** the buffers to store the data for the next triple,
//! and yields the new triple, as 3 references to the *same* buffers.
//!
//! Such a parser cannot implement [`Iterator`],
//! because, once yielded by the iterator's `next` method,
//! an item is free to live during further iterations.
//! In particular, it can be stored in a vector,
//! and still be alive when the `next` method is called again
//! (consider for example the [`Iterator::collect`] method).
//!
//! Because many parsers (as well as other triple sources)
//! will be implemented in a manner similar to that described above,
//! we have to provide a trait with *weaker assumptions*
//! on the lifetime of the yielded triples.
//!
//! The alternative would be to wrap such parsers with a layer that would *copy*
//! the data from internal buffers to fresh buffers for each triple,
//! but we do not want to impose that cost on all implementations
//! — especially when many consumers will be happy with short-lived references.
//!
//! [`for_each_triple`]: TripleSource::for_each_triple
//! [`try_for_each_triple`]: TripleSource::try_for_each_triple

use std::error::Error;

use crate::graph::*;
use crate::triple::streaming_mode::*;
use crate::triple::*;

mod _error;
pub use self::_error::*;
mod _filter;
pub use self::_filter::*;
mod _filter_map;
pub use self::_filter_map::*;
mod _iterator;
pub use self::_iterator::*;
mod _map;
pub use self::_map::*;

/// Type alias for referencing the `Term` used in a `TripleSource`.
///
/// It is resolved by following the associated types of `S`:
/// first `S::Triple` (viewed as a `TripleStreamingMode`),
/// then that mode's `UnsafeTriple`,
/// and finally the `Term` of that `UnsafeTriple`.
pub type TsTerm<S> =
    <<<S as TripleSource>::Triple as TripleStreamingMode>::UnsafeTriple as UnsafeTriple>::Term;

/// Former name of [`TsTerm`]; it is a plain alias of it.
///
/// Deprecated since 0.6.3: use [`TsTerm`] instead.
// The all-caps spelling is the legacy name itself, so the acronym lint is silenced here.
#[allow(clippy::upper_case_acronyms)]
#[deprecated(
    since = "0.6.3",
    note = "Was renamed to TsTerm, according to naming conventions"
)]
pub type TSTerm<S> = TsTerm<S>;

/// A source of [`Triple`]s, whose production may fail along the way.
///
/// The `TripleSource` trait is implemented by
/// any iterator yielding [`Triple`]s wrapped in `Result`.
pub trait TripleSource {
    /// The kind of error this source may raise.
    type Error: 'static + Error;

    /// The streaming mode that determines which type of [`Triple`]s
    /// this source yields
    /// (see [`streaming_mode`]).
    type Triple: TripleStreamingMode;

    /// Pass at least one triple of this source (if any is left) to `f`.
    ///
    /// The returned boolean is `false` when this source has no more triples.
    fn try_for_some_triple<F, E>(&mut self, f: &mut F) -> StreamResult<bool, Self::Error, E>
    where
        F: FnMut(StreamedTriple<Self::Triple>) -> Result<(), E>,
        E: Error;

    /// Pass every triple of this source to the fallible callback `f`.
    ///
    /// This repeatedly calls [`try_for_some_triple`](TripleSource::try_for_some_triple)
    /// until it reports that no triple is left;
    /// any error it returns is propagated immediately.
    #[inline]
    fn try_for_each_triple<F, E>(&mut self, f: F) -> StreamResult<(), Self::Error, E>
    where
        F: FnMut(StreamedTriple<Self::Triple>) -> Result<(), E>,
        E: Error,
    {
        let mut callback = f;
        loop {
            let more_to_come = self.try_for_some_triple(&mut callback)?;
            if !more_to_come {
                return Ok(());
            }
        }
    }

    /// Pass at least one triple of this source (if any is left) to `f`.
    ///
    /// The returned boolean is `false` when this source has no more triples.
    #[inline]
    fn for_some_triple<F>(&mut self, f: &mut F) -> Result<bool, Self::Error>
    where
        F: FnMut(StreamedTriple<Self::Triple>),
    {
        // Wrap the infallible callback into one that always succeeds,
        // so that the fallible variant can do the actual work.
        let outcome = self.try_for_some_triple(&mut |triple| -> Result<(), Self::Error> {
            f(triple);
            Ok(())
        });
        outcome.map_err(StreamError::inner_into)
    }

    /// Pass every triple of this source to the callback `f`.
    ///
    /// This repeatedly calls [`for_some_triple`](TripleSource::for_some_triple)
    /// until it reports that no triple is left;
    /// any error it returns is propagated immediately.
    #[inline]
    fn for_each_triple<F>(&mut self, f: F) -> Result<(), Self::Error>
    where
        F: FnMut(StreamedTriple<Self::Triple>),
    {
        let mut callback = f;
        loop {
            let more_to_come = self.for_some_triple(&mut callback)?;
            if !more_to_come {
                return Ok(());
            }
        }
    }

    /// Wraps this source into one that uses the closure `filter`
    /// to decide whether a triple should be yielded.
    #[inline]
    fn filter_triples<F>(self, filter: F) -> FilterSource<Self, F>
    where
        Self: Sized,
        F: FnMut(&StreamedTriple<Self::Triple>) -> bool,
    {
        let source = self;
        FilterSource { source, filter }
    }

    /// Wraps this source into one that both filters and maps,
    /// using the closure `filter_map`.
    #[inline]
    fn filter_map_triples<F, T>(self, filter_map: F) -> FilterMapSource<Self, F>
    where
        Self: Sized,
        F: FnMut(StreamedTriple<Self::Triple>) -> Option<T>,
    {
        let source = self;
        FilterMapSource { source, filter_map }
    }

    /// Wraps this source into one that yields,
    /// for each triple, the result of applying the closure `map` to it.
    #[inline]
    fn map_triples<F, T>(self, map: F) -> MapSource<Self, F>
    where
        Self: Sized,
        F: FnMut(StreamedTriple<Self::Triple>) -> T,
    {
        let source = self;
        MapSource { source, map }
    }

    /// Gives the bounds on the number of triples remaining in this source.
    ///
    /// The contract of this method is the same as that of [`Iterator::size_hint`].
    /// The default implementation returns `(0, None)`.
    ///
    /// [`Iterator::size_hint`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.size_hint
    fn size_hint_triples(&self) -> (usize, Option<usize>) {
        let upper_bound: Option<usize> = None;
        (0, upper_bound)
    }

    /// Build a new graph of type `G` out of the triples of this source.
    fn collect_triples<G>(self) -> StreamResult<G, Self::Error, <G as Graph>::Error>
    where
        Self: Sized,
        G: CollectibleGraph,
    {
        <G as CollectibleGraph>::from_triple_source(self)
    }

    /// Add every triple of this source to the given [MutableGraph].
    ///
    /// The first error (raised by the source or by the graph) stops the process.
    #[inline]
    fn add_to_graph<G: MutableGraph>(
        self,
        graph: &mut G,
    ) -> StreamResult<usize, Self::Error, <G as MutableGraph>::MutationError>
    where
        Self: Sized,
    {
        G::insert_all(graph, self)
    }
}

#[cfg(test)]
mod test;