declarative_dataflow/sinks/
mod.rs1use std::fs::File;
4use std::io::{LineWriter, Write};
5use std::time::Instant;
6
7use timely::dataflow::channels::pact::ParallelizationContract;
8use timely::dataflow::operators::generic::{Operator, OutputHandle};
9use timely::dataflow::operators::probe::Probe;
10use timely::dataflow::{ProbeHandle, Scope, Stream};
11use timely::progress::Timestamp;
12
13use differential_dataflow::lattice::Lattice;
14
15use crate::{Error, Output, ResultDiff, Time};
16
17#[cfg(feature = "serde_json")]
23pub mod assoc_in;
24#[cfg(feature = "serde_json")]
25pub use self::assoc_in::AssocIn;
26
27pub struct SinkingContext {
29 pub name: String,
31 pub granularity: Option<Time>,
33}
34
35pub trait Sinkable<T>
37where
38 T: Timestamp + Lattice + std::convert::Into<Time>,
39{
40 fn sink<S, P>(
43 &self,
44 stream: &Stream<S, ResultDiff<T>>,
45 pact: P,
46 probe: &mut ProbeHandle<T>,
47 context: SinkingContext,
48 ) -> Result<Option<Stream<S, Output>>, Error>
49 where
50 S: Scope<Timestamp = T>,
51 P: ParallelizationContract<S::Timestamp, ResultDiff<T>>;
52}
53
54#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
56pub enum Sink {
57 TheVoid(Option<String>),
59 #[cfg(feature = "serde_json")]
64 AssocIn(AssocIn),
65}
66
67impl<T> Sinkable<T> for Sink
68where
69 T: Timestamp + Lattice + Default + std::convert::Into<Time>,
70{
71 fn sink<S, P>(
72 &self,
73 stream: &Stream<S, ResultDiff<T>>,
74 pact: P,
75 probe: &mut ProbeHandle<T>,
76 context: SinkingContext,
77 ) -> Result<Option<Stream<S, Output>>, Error>
78 where
79 S: Scope<Timestamp = T>,
80 P: ParallelizationContract<S::Timestamp, ResultDiff<T>>,
81 {
82 match *self {
83 Sink::TheVoid(ref filename) => {
84 let mut writer = match *filename {
85 None => None,
86 Some(ref filename) => {
87 let file = File::create(filename.to_owned()).unwrap();
88 Some(LineWriter::new(file))
89 }
90 };
91
92 let mut t0 = Instant::now();
93 let mut last: T = Default::default();
94 let mut buffer = Vec::new();
95
96 stream
97 .unary_frontier(pact, "TheVoid", move |_cap, _info| {
98 move |input, _output: &mut OutputHandle<_, ResultDiff<T>, _>| {
99 let mut received_input = false;
100 input.for_each(|_time, data| {
101 data.swap(&mut buffer);
102 received_input = !buffer.is_empty();
103 buffer.clear();
104 });
105
106 if input.frontier.is_empty() {
107 println!("[{:?}] inputs to void sink ceased", t0.elapsed());
108
109 if let Some(ref mut writer) = &mut writer {
110 writeln!(writer, "{},{:?}", t0.elapsed().as_millis(), last)
111 .unwrap();
112 }
113 } else if received_input && !input.frontier.frontier().less_equal(&last)
114 {
115 if let Some(ref mut writer) = &mut writer {
116 writeln!(writer, "{},{:?}", t0.elapsed().as_millis(), last)
117 .unwrap();
118 }
119
120 last = input.frontier.frontier()[0].clone();
121 t0 = Instant::now();
122 }
123 }
124 })
125 .probe_with(probe);
126
127 Ok(None)
128 }
129 #[cfg(feature = "serde_json")]
130 Sink::AssocIn(ref sink) => sink.sink(stream, pact, probe, context),
131 _ => unimplemented!(),
132 }
133 }
134}