declarative_dataflow/sources/
mod.rs

1//! Types and operators to work with external data sources.
2
3use std::cell::RefCell;
4use std::rc::{Rc, Weak};
5use std::time::{Duration, Instant};
6
7use timely::dataflow::operators::capture::event::link::EventLink;
8use timely::dataflow::{ProbeHandle, Scope, Stream};
9use timely::logging::TimelyEvent;
10use timely::progress::Timestamp;
11
12use differential_dataflow::lattice::Lattice;
13use differential_dataflow::logging::DifferentialEvent;
14
15use crate::server::scheduler::Scheduler;
16use crate::AttributeConfig;
17use crate::{Aid, Value};
18
19#[cfg(feature = "csv-source")]
20pub mod csv_file;
21// pub mod declarative_logging;
22pub mod differential_logging;
23// pub mod json_file;
24pub mod timely_logging;
25
26#[cfg(feature = "csv-source")]
27pub use self::csv_file::CsvFile;
28// pub use self::json_file::JsonFile;
29
/// A struct encapsulating any state required to create sources.
///
/// An instance is handed by value to [`Sourceable::source`] when a
/// source is instantiated.
pub struct SourcingContext<T: Timestamp> {
    /// The logical start of the computation, used by sources to
    /// compute their relative progress.
    pub t0: Instant,
    /// A handle to the timely probe of the domain this source is created in.
    pub domain_probe: ProbeHandle<T>,
    /// A weak handle to a scheduler, used by sources to defer their
    /// next activation when polling. Being a `Weak`, it has to be
    /// upgraded before use and does not keep the scheduler alive.
    pub scheduler: Weak<RefCell<Scheduler>>,
    /// A shared, reference-counted (`Rc`, i.e. strong) handle to the
    /// Timely event link.
    pub timely_events: Rc<EventLink<Duration, (Duration, usize, TimelyEvent)>>,
    /// A shared, reference-counted (`Rc`, i.e. strong) handle to the
    /// Differential event link.
    pub differential_events: Rc<EventLink<Duration, (Duration, usize, DifferentialEvent)>>,
}
45
46/// An external data source that can provide Datoms.
47pub trait Sourceable<S>
48where
49    S: Scope,
50    S::Timestamp: Timestamp + Lattice,
51{
52    /// Conjures from thin air (or from wherever the source lives) one
53    /// or more timely streams feeding directly into attributes.
54    fn source(
55        &self,
56        scope: &mut S,
57        context: SourcingContext<S::Timestamp>,
58    ) -> Vec<(
59        Aid,
60        AttributeConfig,
61        Stream<S, ((Value, Value), S::Timestamp, isize)>,
62    )>;
63}
64
/// Supported external data sources.
///
/// Each variant wraps the type describing one kind of source. The
/// `CsvFile` variant only exists when the `csv-source` feature is
/// enabled.
///
/// Note that the derived `PartialOrd`/`Ord` implementations order
/// variants by their declaration order, so reordering variants is an
/// observable change.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Source {
    /// Timely logging streams
    TimelyLogging(timely_logging::TimelyLogging),
    /// Differential logging streams
    DifferentialLogging(differential_logging::DifferentialLogging),
    // Currently disabled (its module is commented out as well):
    // /// Declarative logging streams
    // DeclarativeLogging(declarative_logging::DeclarativeLogging),
    /// CSV files
    #[cfg(feature = "csv-source")]
    CsvFile(CsvFile),
    // Currently disabled (its module is commented out as well):
    // /// Files containing json objects
    // JsonFile(JsonFile),
}
80
/// Real-time implementation: forwards to the [`Sourceable`]
/// implementation of whichever concrete source is wrapped.
///
/// Only compiled with the `real-time` feature, and only for scopes
/// whose timestamp type is `Duration`.
#[cfg(feature = "real-time")]
impl<S: Scope<Timestamp = Duration>> Sourceable<S> for Source {
    /// Creates the streams of the wrapped source inside `scope`,
    /// passing `context` through unchanged.
    fn source(
        &self,
        scope: &mut S,
        context: SourcingContext<S::Timestamp>,
    ) -> Vec<(
        Aid,
        AttributeConfig,
        Stream<S, ((Value, Value), Duration, isize)>,
    )> {
        // Deliberately no `_` arm: all variants are listed explicitly
        // (the `CsvFile` arm is gated by the same feature as the
        // variant itself), so adding a variant to `Source` becomes a
        // compile error here rather than a runtime panic.
        match self {
            Source::TimelyLogging(source) => source.source(scope, context),
            Source::DifferentialLogging(source) => source.source(scope, context),
            // Source::DeclarativeLogging(source) => source.source(scope, context),
            #[cfg(feature = "csv-source")]
            Source::CsvFile(source) => source.source(scope, context),
        }
    }
}
102
103#[cfg(not(feature = "real-time"))]
104impl<S: Scope> Sourceable<S> for Source
105where
106    S::Timestamp: Timestamp + Lattice,
107{
108    fn source(
109        &self,
110        _scope: &mut S,
111        _context: SourcingContext<S::Timestamp>,
112    ) -> Vec<(
113        Aid,
114        AttributeConfig,
115        Stream<S, ((Value, Value), S::Timestamp, isize)>,
116    )> {
117        unimplemented!();
118    }
119}