declarative_dataflow/sinks/
mod.rs

1//! Types and operators to feed outputs into external systems.
2
3use std::fs::File;
4use std::io::{LineWriter, Write};
5use std::time::Instant;
6
7use timely::dataflow::channels::pact::ParallelizationContract;
8use timely::dataflow::operators::generic::{Operator, OutputHandle};
9use timely::dataflow::operators::probe::Probe;
10use timely::dataflow::{ProbeHandle, Scope, Stream};
11use timely::progress::Timestamp;
12
13use differential_dataflow::lattice::Lattice;
14
15use crate::{Error, Output, ResultDiff, Time};
16
17// #[cfg(feature = "csv-source")]
18// pub mod csv_file;
19// #[cfg(feature = "csv-source")]
20// pub use self::csv_file::CsvFile;
21
22#[cfg(feature = "serde_json")]
23pub mod assoc_in;
24#[cfg(feature = "serde_json")]
25pub use self::assoc_in::AssocIn;
26
/// A struct encapsulating any state required to create sinks.
///
/// A context is handed by value to [`Sinkable::sink`] whenever a sink
/// is attached to a dataflow output.
pub struct SinkingContext {
    /// The name of the dataflow feeding this sink.
    pub name: String,
    /// Granularity at which to send results. None indicates no delay.
    pub granularity: Option<Time>,
}
34
35/// An external system that wants to receive result diffs.
36pub trait Sinkable<T>
37where
38    T: Timestamp + Lattice + std::convert::Into<Time>,
39{
40    /// Creates a timely operator feeding dataflow outputs to a
41    /// specialized data sink.
42    fn sink<S, P>(
43        &self,
44        stream: &Stream<S, ResultDiff<T>>,
45        pact: P,
46        probe: &mut ProbeHandle<T>,
47        context: SinkingContext,
48    ) -> Result<Option<Stream<S, Output>>, Error>
49    where
50        S: Scope<Timestamp = T>,
51        P: ParallelizationContract<S::Timestamp, ResultDiff<T>>;
52}
53
/// Supported external systems.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Sink {
    /// /dev/null, used for benchmarking. All inputs are discarded.
    ///
    /// The optional string is a file name. When present, the file is
    /// created when the sink is installed and receives lines of the
    /// form `<elapsed milliseconds>,<timestamp>` as the input frontier
    /// advances.
    TheVoid(Option<String>),
    // /// CSV files
    // #[cfg(feature = "csv-source")]
    // CsvFile(CsvFile),
    /// Nested Hash-Maps
    #[cfg(feature = "serde_json")]
    AssocIn(AssocIn),
}
66
67impl<T> Sinkable<T> for Sink
68where
69    T: Timestamp + Lattice + Default + std::convert::Into<Time>,
70{
71    fn sink<S, P>(
72        &self,
73        stream: &Stream<S, ResultDiff<T>>,
74        pact: P,
75        probe: &mut ProbeHandle<T>,
76        context: SinkingContext,
77    ) -> Result<Option<Stream<S, Output>>, Error>
78    where
79        S: Scope<Timestamp = T>,
80        P: ParallelizationContract<S::Timestamp, ResultDiff<T>>,
81    {
82        match *self {
83            Sink::TheVoid(ref filename) => {
84                let mut writer = match *filename {
85                    None => None,
86                    Some(ref filename) => {
87                        let file = File::create(filename.to_owned()).unwrap();
88                        Some(LineWriter::new(file))
89                    }
90                };
91
92                let mut t0 = Instant::now();
93                let mut last: T = Default::default();
94                let mut buffer = Vec::new();
95
96                stream
97                    .unary_frontier(pact, "TheVoid", move |_cap, _info| {
98                        move |input, _output: &mut OutputHandle<_, ResultDiff<T>, _>| {
99                            let mut received_input = false;
100                            input.for_each(|_time, data| {
101                                data.swap(&mut buffer);
102                                received_input = !buffer.is_empty();
103                                buffer.clear();
104                            });
105
106                            if input.frontier.is_empty() {
107                                println!("[{:?}] inputs to void sink ceased", t0.elapsed());
108
109                                if let Some(ref mut writer) = &mut writer {
110                                    writeln!(writer, "{},{:?}", t0.elapsed().as_millis(), last)
111                                        .unwrap();
112                                }
113                            } else if received_input && !input.frontier.frontier().less_equal(&last)
114                            {
115                                if let Some(ref mut writer) = &mut writer {
116                                    writeln!(writer, "{},{:?}", t0.elapsed().as_millis(), last)
117                                        .unwrap();
118                                }
119
120                                last = input.frontier.frontier()[0].clone();
121                                t0 = Instant::now();
122                            }
123                        }
124                    })
125                    .probe_with(probe);
126
127                Ok(None)
128            }
129            #[cfg(feature = "serde_json")]
130            Sink::AssocIn(ref sink) => sink.sink(stream, pact, probe, context),
131            _ => unimplemented!(),
132        }
133    }
134}