1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
//! A quad source produces quads, and may also fail in the process.
//!
//! The trait [`QuadSource`]
//! provides an API similar to (a subset of) the [`Iterator`] API,
//! with methods such as [`for_each_quad`] and [`try_for_each_quad`].
//!
//! # Rationale (or Why not simply use `Iterator`?)
//!
//! See the documentation of the [`triple::stream`] module.
//!
//! [`for_each_quad`]: QuadSource::for_each_quad
//! [`try_for_each_quad`]: QuadSource::try_for_each_quad
//! [`triple::stream`]: crate::triple::stream

use std::error::Error;

use crate::dataset::*;
use crate::quad::streaming_mode::*;
use crate::quad::*;
pub use crate::triple::stream::{SinkError, SourceError, StreamError, StreamResult};

mod _filter;
pub use _filter::*;
mod _filter_map;
pub use _filter_map::*;
mod _iterator;
pub use _iterator::*;
mod _map;
pub use _map::*;

/// Type alias for referencing the `TermData` used in a `QuadSource`.
pub type QsTerm<S> =
    <<<S as QuadSource>::Quad as QuadStreamingMode>::UnsafeQuad as UnsafeQuad>::Term;

#[allow(clippy::upper_case_acronyms)]
#[deprecated(
    since = "0.6.3",
    note = "Was renamed to QsTerm, according to naming conventions"
)]
pub type QSTerm<S> = QsTerm<S>;

/// A quad source produces [`Quad`]s, and may also fail in the process.
///
/// Any iterator yielding [`Quad`]s wrapped in `Result`
/// implements the `QuadSource` trait.
pub trait QuadSource {
    /// The type of errors produced by this source.
    type Error: 'static + Error;

    /// Determine the type of [`Quad`]s
    /// that this quad source yields.
    /// (see [`streaming_mode`](super::streaming_mode)
    type Quad: QuadStreamingMode;

    /// Call f for at least one quad from this quad source, if any.
    ///
    /// Return false if there are no more quads in this source.
    fn try_for_some_quad<F, E>(&mut self, f: &mut F) -> StreamResult<bool, Self::Error, E>
    where
        F: FnMut(StreamedQuad<Self::Quad>) -> Result<(), E>,
        E: Error;

    /// Call f for all quads from this quad source.
    #[inline]
    fn try_for_each_quad<F, E>(&mut self, f: F) -> StreamResult<(), Self::Error, E>
    where
        F: FnMut(StreamedQuad<Self::Quad>) -> Result<(), E>,
        E: Error,
    {
        let mut f = f;
        while self.try_for_some_quad(&mut f)? {}
        Ok(())
    }
    /// Call f for at least one quad from this quad source, if any.
    ///
    /// Return false if there are no more quads in this source.
    #[inline]
    fn for_some_quad<F>(&mut self, f: &mut F) -> Result<bool, Self::Error>
    where
        F: FnMut(StreamedQuad<Self::Quad>),
    {
        self.try_for_some_quad(&mut |t| -> Result<(), Self::Error> {
            f(t);
            Ok(())
        })
        .map_err(StreamError::inner_into)
    }
    /// Call f for all quads from this quad source.
    #[inline]
    fn for_each_quad<F>(&mut self, f: F) -> Result<(), Self::Error>
    where
        F: FnMut(StreamedQuad<Self::Quad>),
    {
        let mut f = f;
        while self.for_some_quad(&mut f)? {}
        Ok(())
    }
    /// Creates a quad source which uses a closure to determine if a quad should be yielded.
    #[inline]
    fn filter_quads<F>(self, filter: F) -> FilterSource<Self, F>
    where
        Self: Sized,
        F: FnMut(&StreamedQuad<Self::Quad>) -> bool,
    {
        FilterSource {
            source: self,
            filter,
        }
    }
    /// Creates a quad source that both filters and maps.
    #[inline]
    fn filter_map_quads<F, T>(self, filter_map: F) -> FilterMapSource<Self, F>
    where
        Self: Sized,
        F: FnMut(StreamedQuad<Self::Quad>) -> Option<T>,
    {
        FilterMapSource {
            source: self,
            filter_map,
        }
    }
    /// Takes a closure and creates quad source which yield the result of that closure for each quad.
    #[inline]
    fn map_quads<F, T>(self, map: F) -> MapSource<Self, F>
    where
        Self: Sized,
        F: FnMut(StreamedQuad<Self::Quad>) -> T,
    {
        MapSource { source: self, map }
    }
    /// Returns the bounds on the remaining length of the quad source.
    ///
    /// This method has the same contract as [`Iterator::size_hint`].
    ///
    /// [`Iterator::size_hint`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.size_hint
    fn size_hint_quads(&self) -> (usize, Option<usize>) {
        (0, None)
    }
    /// Collect these quads into a new dataset.
    fn collect_quads<D>(self) -> StreamResult<D, Self::Error, <D as Dataset>::Error>
    where
        Self: Sized,
        D: CollectibleDataset,
    {
        D::from_quad_source(self)
    }
    /// Insert all quads from this source into the given [MutableDataset].
    ///
    /// Stop on the first error (in the source or in the dataset).
    #[inline]
    fn add_to_dataset<D: MutableDataset>(
        self,
        dataset: &mut D,
    ) -> StreamResult<usize, Self::Error, <D as MutableDataset>::MutationError>
    where
        Self: Sized,
    {
        dataset.insert_all(self)
    }
}

#[cfg(test)]
mod test;