declarative_dataflow/sources/
mod.rs1use std::cell::RefCell;
4use std::rc::{Rc, Weak};
5use std::time::{Duration, Instant};
6
7use timely::dataflow::operators::capture::event::link::EventLink;
8use timely::dataflow::{ProbeHandle, Scope, Stream};
9use timely::logging::TimelyEvent;
10use timely::progress::Timestamp;
11
12use differential_dataflow::lattice::Lattice;
13use differential_dataflow::logging::DifferentialEvent;
14
15use crate::server::scheduler::Scheduler;
16use crate::AttributeConfig;
17use crate::{Aid, Value};
18
19#[cfg(feature = "csv-source")]
20pub mod csv_file;
21pub mod differential_logging;
23pub mod timely_logging;
25
26#[cfg(feature = "csv-source")]
27pub use self::csv_file::CsvFile;
28pub struct SourcingContext<T: Timestamp> {
32 pub t0: Instant,
35 pub domain_probe: ProbeHandle<T>,
37 pub scheduler: Weak<RefCell<Scheduler>>,
40 pub timely_events: Rc<EventLink<Duration, (Duration, usize, TimelyEvent)>>,
42 pub differential_events: Rc<EventLink<Duration, (Duration, usize, DifferentialEvent)>>,
44}
45
46pub trait Sourceable<S>
48where
49 S: Scope,
50 S::Timestamp: Timestamp + Lattice,
51{
52 fn source(
55 &self,
56 scope: &mut S,
57 context: SourcingContext<S::Timestamp>,
58 ) -> Vec<(
59 Aid,
60 AttributeConfig,
61 Stream<S, ((Value, Value), S::Timestamp, isize)>,
62 )>;
63}
64
65#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
67pub enum Source {
68 TimelyLogging(timely_logging::TimelyLogging),
70 DifferentialLogging(differential_logging::DifferentialLogging),
72 #[cfg(feature = "csv-source")]
76 CsvFile(CsvFile),
77 }
80
/// Forwards sourcing to the concrete source wrapped by each [`Source`]
/// variant.
///
/// Only compiled with the `real-time` feature, and only for scopes whose
/// timestamp type is [`Duration`].
#[cfg(feature = "real-time")]
impl<S: Scope<Timestamp = Duration>> Sourceable<S> for Source {
    /// Delegates to the wrapped source's own `source` implementation,
    /// handing over `scope` and `context` unchanged.
    fn source(
        &self,
        scope: &mut S,
        context: SourcingContext<S::Timestamp>,
    ) -> Vec<(
        Aid,
        AttributeConfig,
        Stream<S, ((Value, Value), Duration, isize)>,
    )> {
        // Every variant is listed explicitly, with no `_` fallback. The
        // previous wildcard arm was unreachable for all feature
        // combinations, and it would have turned a forgotten arm for a
        // newly added variant into a runtime panic instead of a compile
        // error.
        match self {
            Source::TimelyLogging(source) => source.source(scope, context),
            Source::DifferentialLogging(source) => source.source(scope, context),
            // This arm is feature-gated in lockstep with the variant itself.
            #[cfg(feature = "csv-source")]
            Source::CsvFile(source) => source.source(scope, context),
        }
    }
}
102
103#[cfg(not(feature = "real-time"))]
104impl<S: Scope> Sourceable<S> for Source
105where
106 S::Timestamp: Timestamp + Lattice,
107{
108 fn source(
109 &self,
110 _scope: &mut S,
111 _context: SourcingContext<S::Timestamp>,
112 ) -> Vec<(
113 Aid,
114 AttributeConfig,
115 Stream<S, ((Value, Value), S::Timestamp, isize)>,
116 )> {
117 unimplemented!();
118 }
119}