timely_util/
operators.rs

/*
    Some useful custom operators and aggregators
    that aren't provided out-of-the-box in Timely,
    particularly for experiments where we just
    want to aggregate the entire stream.
*/
7
8use super::{Pipeline, Scope, Stream, Timestamp};
9use crate::util::either::Either;
10
11use std::fmt::Debug;
12use std::fs::OpenOptions;
13use std::io::prelude::*;
14use timely::dataflow::operators::{
15    Accumulate, Concat, Exchange, Filter, Inspect, Map, Operator,
16};
17
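/*
    Either (a simple enum with two variants, Left and Right) is imported
    from crate::util::either and used internally by some operators below
    to tag two input streams so they can be merged into one.
*/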
21
22/*
23    Window over the entire input stream, producing a single
24    output at the end.
25
26    This version is parallel: it preserves the partition on the
27    input stream and thus produces one output per worker.
28
29    Possible Improvements:
30    - It would be nice if 'emit' was an FnOnce. Right now I'm not sure
31      how to accomplish that.
32    - It would also be nice if the window does not persist at all after emit
33      is called; perhaps this can be accomplished by using Option magic
34      to set the state to None at the end.
35*/
36pub fn window_all_parallel<D1, D2, D3, I, F, E, T, G>(
37    name: &str,
38    in_stream: &Stream<G, D1>,
39    init: I,
40    fold: F,
41    emit: E,
42) -> Stream<G, D3>
43where
44    D1: timely::Data + Debug, // input data
45    D2: timely::Data + Debug, // accumulator
46    D3: timely::Data + Debug, // output data
47    I: FnOnce() -> D2 + 'static,
48    F: Fn(&mut D2, &T, Vec<D1>) + 'static,
49    E: Fn(&D2) -> D3 + 'static,
50    T: Timestamp + Copy,
51    G: Scope<Timestamp = T>,
52{
53    in_stream.unary_frontier(Pipeline, name, |capability1, _info| {
54        let mut agg = init();
55        let cap_time = *capability1.time();
56        let mut maybe_cap = Some(capability1);
57
58        move |input, output| {
59            while let Some((capability2, data)) = input.next() {
60                let mut data_vec = Vec::new();
61                data.swap(&mut data_vec);
62                fold(&mut agg, capability2.time(), data_vec);
63                if *capability2.time() > cap_time {
64                    maybe_cap = Some(capability2.retain());
65                }
66            }
67            // Check if entire input is done
68            if input.frontier().is_empty() {
69                if let Some(cap) = maybe_cap.as_ref() {
70                    output.session(&cap).give(emit(&agg));
71                    maybe_cap = None;
72                }
73            }
74        }
75    })
76}
77
78/*
79    Window over the entire input stream, producing a single
80    output at the end.
81
82    This version forwards all inputs to a single worker,
83    and produces only one output item (for that worker).
84*/
85pub fn window_all<D1, D2, D3, I, F, E, T, G>(
86    name: &str,
87    in_stream: &Stream<G, D1>,
88    init: I,
89    fold: F,
90    emit: E,
91) -> Stream<G, D3>
92where
93    D1: timely::Data + Debug + timely::ExchangeData, // input data
94    D2: timely::Data + Debug,                        // accumulator
95    D3: timely::Data + Debug,                        // output data
96    I: FnOnce() -> D2 + 'static,
97    F: Fn(&mut D2, &T, Vec<D1>) + 'static,
98    E: Fn(&D2) -> D3 + 'static,
99    T: Timestamp + Copy,
100    G: Scope<Timestamp = T>,
101{
102    let in_stream_single = in_stream.exchange(|_x| 0);
103    window_all_parallel(
104        name,
105        &in_stream_single,
106        || (init(), false),
107        move |(x, nonempty), time, data| {
108            fold(x, time, data);
109            *nonempty = true;
110        },
111        move |(x, nonempty)| {
112            if *nonempty {
113                Some(emit(x))
114            } else {
115                None
116            }
117        },
118    )
119    .filter(|x| x.is_some())
120    .map(|x| x.unwrap()) // guaranteed not to panic
121}
122
123/*
124    Unary operation on a "singleton" stream, i.e.
125    one which has only one element.
126
127    Notes:
128    - Panics if called on an input stream which receives more than 2 elements.
129    - Waits for an input stream to finish before emitting output, so will hang
130      on an input stream which isn't closed even if it only ever gets 1 element.
131    - Clones the input once. (This shouldn't be necessary, it's just due to some
132      difficulties with ownernship, probably because window_all isn't quite
133      implemented in the best way yet.)
134*/
135pub fn single_op_unary<D1, D2, F, T, G>(
136    name: &str,
137    in_stream: &Stream<G, D1>,
138    op: F,
139) -> Stream<G, D2>
140where
141    D1: timely::Data + Debug + timely::ExchangeData, // input data
142    D2: timely::Data + Debug,                        // output data
143    F: Fn(D1) -> D2 + 'static,
144    T: Timestamp + Copy,
145    G: Scope<Timestamp = T>,
146{
147    window_all(
148        name,
149        in_stream,
150        || None,
151        |seen, _time, data| {
152            for d in data {
153                assert!(seen.is_none());
154                *seen = Some(d);
155            }
156        },
157        move |seen| op(seen.clone().unwrap()),
158    )
159}
160
161/*
162    Binary operation on two "singleton" streams, i.e.
163    streams which have only one element each.
164*/
165pub fn single_op_binary<D1, D2, D3, F, T, G>(
166    name: &str,
167    in_stream1: &Stream<G, D1>,
168    in_stream2: &Stream<G, D2>,
169    op: F,
170) -> Stream<G, D3>
171where
172    D1: timely::Data + Debug + timely::ExchangeData, // input data 1
173    D2: timely::Data + Debug + timely::ExchangeData, // input data 2
174    D3: timely::Data + Debug,                        // output data
175    F: Fn(D1, D2) -> D3 + 'static,
176    T: Timestamp + Copy,
177    G: Scope<Timestamp = T>,
178{
179    let stream1 = in_stream1.map(Either::Left);
180    let stream2 = in_stream2.map(Either::Right);
181    let stream = stream1.concat(&stream2);
182
183    window_all(
184        name,
185        &stream,
186        || (None, None),
187        |(seen1, seen2), _time, data| {
188            for d in data {
189                match d {
190                    Either::Left(d1) => {
191                        assert!(seen1.is_none());
192                        *seen1 = Some(d1);
193                    }
194                    Either::Right(d2) => {
195                        assert!(seen2.is_none());
196                        *seen2 = Some(d2);
197                    }
198                }
199            }
200        },
201        move |(seen1, seen2)| {
202            op(seen1.clone().unwrap(), seen2.clone().unwrap())
203        },
204    )
205}
206
207/*
208    Save a stream to a file in append mode.
209
210    Requires as input a formatting function for what to print.
211    Returns the input stream (unchanged as output).
212    Panics if file handling fails.
213*/
214pub fn save_to_file<D, F, T, G>(
215    in_stream: &Stream<G, D>,
216    filename: &str,
217    format: F,
218) -> Stream<G, D>
219where
220    D: timely::Data, // input data
221    F: Fn(&D) -> std::string::String + 'static,
222    T: Timestamp,
223    G: Scope<Timestamp = T>,
224{
225    let mut file =
226        OpenOptions::new().create(true).append(true).open(filename).unwrap();
227    in_stream.inspect(move |d| {
228        writeln!(file, "{}", format(d)).unwrap();
229    })
230}
231
232/*
233    Sum the values in each timestamp.
234    (Like count, produce a separate value for each worker)
235*/
236pub trait Sum<G: Scope> {
237    fn sum(&self) -> Stream<G, usize>;
238}
239impl<G: Scope> Sum<G> for Stream<G, usize> {
240    fn sum(&self) -> Stream<G, usize> {
241        self.accumulate(0, |sum, data| {
242            for &x in data.iter() {
243                *sum += x;
244            }
245        })
246    }
247}
248
249/*
250    A simple implementation of join-by-timestamp.
251    This is data-parallel (does not re-partition input streams).
252    Note: we could potentially use Differential Dataflow for this,
253    and there are other existing implementations of join out there,
254    but we only need this very simple case which can be done
255    in plain Timely with concat, accumulate, and flat_map.
256
257    Note: would be nice to avoid the first instance of .clone().
258    The second instance where we clone for the cross product
259    is more unavoidable.
260*/
261#[rustfmt::skip]
262pub fn join_by_timestamp<D1, D2, T, G>(
263    in_stream1: &Stream<G, D1>,
264    in_stream2: &Stream<G, D2>,
265) -> Stream<G, (D1, D2)>
266where
267    D1: timely::Data, // input data 1
268    D2: timely::Data, // input data 2
269    G: Scope<Timestamp = T>,
270{
271    let stream1 = in_stream1.map(Either::Left);
272    let stream2 = in_stream2.map(Either::Right);
273    let combined = stream1.concat(&stream2);
274
275    // Map items at each timestamp to a pair of vectors
276    let collected = combined.accumulate(
277        (Vec::new(), Vec::new()),
278        |(vec1, vec2), items| {
279            for item in items.iter() {
280                match item {
281                    Either::Left(x) => { vec1.push(x.clone()); }
282                    Either::Right(x) => { vec2.push(x.clone()); }
283                }
284            };
285        },
286    );
287
288    collected.flat_map(|(vec1, vec2)| {
289        let mut result = Vec::new();
290        for item1 in vec1.iter() {
291            for item2 in vec2.iter() {
292                result.push((item1.clone(), item2.clone()));
293            }
294        }
295        result
296    })
297}