1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//! Types and operators to feed outputs into external systems.

use std::fs::File;
use std::io::{LineWriter, Write};
use std::time::{Duration, Instant};

use timely::dataflow::channels::pact::ParallelizationContract;
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::Timestamp;

use differential_dataflow::lattice::Lattice;

use crate::{Error, ResultDiff};

#[cfg(feature = "csv-source")]
pub mod csv_file;

#[cfg(feature = "csv-source")]
pub use self::csv_file::CsvFile;

/// An external system that wants to receive result diffs.
///
/// The trait is parameterized over the timestamp type `T` of the
/// dataflow the sink is attached to.
pub trait Sinkable<T>
where
    T: Timestamp + Lattice + TotalOrder,
{
    /// Creates a timely operator that consumes the result diffs on
    /// `stream` and feeds them into the external system.
    ///
    /// `pact` is the parallelization contract used for the operator's
    /// input. On success a stream of result diffs is returned; what (if
    /// anything) is forwarded on it is up to the implementation.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if the sink could not be set up.
    fn sink<S, P>(
        &self,
        stream: &Stream<S, ResultDiff<T>>,
        pact: P,
    ) -> Result<Stream<S, ResultDiff<T>>, Error>
    where
        S: Scope<Timestamp = T>,
        P: ParallelizationContract<S::Timestamp, ResultDiff<T>>;
}

/// Supported external systems.
///
/// Each variant carries the configuration needed to attach the
/// corresponding sink to a dataflow via [`Sinkable::sink`].
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Sink {
    /// /dev/null, used for benchmarking
    ///
    /// The optional string is a file name. When it is present, the
    /// `Duration`-timestamped implementation creates that file and
    /// writes timing lines to it.
    TheVoid(Option<String>),
    /// CSV files
    ///
    /// Only available when the `csv-source` feature is enabled.
    #[cfg(feature = "csv-source")]
    CsvFile(CsvFile),
}

impl Sinkable<u64> for Sink {
    fn sink<S, P>(
        &self,
        stream: &Stream<S, ResultDiff<u64>>,
        pact: P,
    ) -> Result<Stream<S, ResultDiff<u64>>, Error>
    where
        S: Scope<Timestamp = u64>,
        P: ParallelizationContract<S::Timestamp, ResultDiff<u64>>,
    {
        match *self {
            #[cfg(feature = "csv-source")]
            Sink::CsvFile(ref sink) => sink.sink(stream, pact),
            _ => unimplemented!(),
        }
    }
}

impl Sinkable<Duration> for Sink {
    /// Attaches this sink to a `Duration`-timestamped `stream`.
    ///
    /// Only [`Sink::TheVoid`] is supported here. The resulting operator
    /// drops every record it receives and never writes to its output, so
    /// the returned stream carries no data.
    ///
    /// If a file name was configured, the file is created up front. Each
    /// time the operator is scheduled, has received at least one record,
    /// and no element of the input frontier is less than or equal to the
    /// previously recorded frontier time, one line
    /// `<elapsed millis>,<previous frontier time>` is appended. The
    /// elapsed time is measured since the previous such line (or since
    /// the operator was created).
    ///
    /// # Panics
    ///
    /// Panics if the log file cannot be created or written to, and via
    /// `unimplemented!` for any other sink variant.
    fn sink<S, P>(
        &self,
        stream: &Stream<S, ResultDiff<Duration>>,
        pact: P,
    ) -> Result<Stream<S, ResultDiff<Duration>>, Error>
    where
        S: Scope<Timestamp = Duration>,
        P: ParallelizationContract<S::Timestamp, ResultDiff<Duration>>,
    {
        match *self {
            Sink::TheVoid(ref filename) => {
                // NOTE(review): creation failures still panic, as before.
                // Propagating them through the `Result` would need a
                // conversion from `std::io::Error` into `Error`, which is
                // not visible from this file.
                let mut writer = filename.as_ref().map(|path| {
                    let file = File::create(path).unwrap_or_else(|err| {
                        panic!("failed to create void sink log file {}: {}", path, err)
                    });
                    LineWriter::new(file)
                });

                // Start of the current measurement interval.
                let mut t0 = Instant::now();
                // Frontier time recorded at the end of the previous interval.
                let mut last = Duration::from_millis(0);
                // Scratch space reused for every incoming batch.
                let mut buffer = Vec::new();

                let sunk = stream.unary_frontier(pact, "TheVoid", move |_cap, _info| {
                    move |input, _output: &mut OutputHandle<_, ResultDiff<Duration>, _>| {
                        let mut received_input = false;
                        input.for_each(|_time, data| {
                            data.swap(&mut buffer);
                            // Accumulate across batches: a trailing empty
                            // batch must not hide earlier non-empty ones.
                            received_input |= !buffer.is_empty();
                            buffer.clear();
                        });

                        if input.frontier.is_empty() {
                            println!("[{:?}] inputs to void sink ceased", t0.elapsed());
                        } else if received_input && !input.frontier.frontier().less_equal(&last) {
                            if let Some(out) = writer.as_mut() {
                                writeln!(out, "{},{:?}", t0.elapsed().as_millis(), last)
                                    .expect("failed to write to void sink log file");
                            }

                            // The frontier is non-empty in this branch, so
                            // indexing its first element cannot fail.
                            last = input.frontier.frontier()[0].clone();
                            t0 = Instant::now();
                        }
                    }
                });

                Ok(sunk)
            }
            // The CSV sink has no implementation for `Duration` timestamps.
            #[cfg(feature = "csv-source")]
            Sink::CsvFile(_) => unimplemented!(),
        }
    }
}