1use super::{Pipeline, Scope, Stream, Timestamp};
9use crate::util::either::Either;
10
11use std::fmt::Debug;
12use std::fs::OpenOptions;
13use std::io::prelude::*;
14use timely::dataflow::operators::{
15 Accumulate, Concat, Exchange, Filter, Inspect, Map, Operator,
16};
17
18pub fn window_all_parallel<D1, D2, D3, I, F, E, T, G>(
37 name: &str,
38 in_stream: &Stream<G, D1>,
39 init: I,
40 fold: F,
41 emit: E,
42) -> Stream<G, D3>
43where
44 D1: timely::Data + Debug, D2: timely::Data + Debug, D3: timely::Data + Debug, I: FnOnce() -> D2 + 'static,
48 F: Fn(&mut D2, &T, Vec<D1>) + 'static,
49 E: Fn(&D2) -> D3 + 'static,
50 T: Timestamp + Copy,
51 G: Scope<Timestamp = T>,
52{
53 in_stream.unary_frontier(Pipeline, name, |capability1, _info| {
54 let mut agg = init();
55 let cap_time = *capability1.time();
56 let mut maybe_cap = Some(capability1);
57
58 move |input, output| {
59 while let Some((capability2, data)) = input.next() {
60 let mut data_vec = Vec::new();
61 data.swap(&mut data_vec);
62 fold(&mut agg, capability2.time(), data_vec);
63 if *capability2.time() > cap_time {
64 maybe_cap = Some(capability2.retain());
65 }
66 }
67 if input.frontier().is_empty() {
69 if let Some(cap) = maybe_cap.as_ref() {
70 output.session(&cap).give(emit(&agg));
71 maybe_cap = None;
72 }
73 }
74 }
75 })
76}
77
78pub fn window_all<D1, D2, D3, I, F, E, T, G>(
86 name: &str,
87 in_stream: &Stream<G, D1>,
88 init: I,
89 fold: F,
90 emit: E,
91) -> Stream<G, D3>
92where
93 D1: timely::Data + Debug + timely::ExchangeData, D2: timely::Data + Debug, D3: timely::Data + Debug, I: FnOnce() -> D2 + 'static,
97 F: Fn(&mut D2, &T, Vec<D1>) + 'static,
98 E: Fn(&D2) -> D3 + 'static,
99 T: Timestamp + Copy,
100 G: Scope<Timestamp = T>,
101{
102 let in_stream_single = in_stream.exchange(|_x| 0);
103 window_all_parallel(
104 name,
105 &in_stream_single,
106 || (init(), false),
107 move |(x, nonempty), time, data| {
108 fold(x, time, data);
109 *nonempty = true;
110 },
111 move |(x, nonempty)| {
112 if *nonempty {
113 Some(emit(x))
114 } else {
115 None
116 }
117 },
118 )
119 .filter(|x| x.is_some())
120 .map(|x| x.unwrap()) }
122
123pub fn single_op_unary<D1, D2, F, T, G>(
136 name: &str,
137 in_stream: &Stream<G, D1>,
138 op: F,
139) -> Stream<G, D2>
140where
141 D1: timely::Data + Debug + timely::ExchangeData, D2: timely::Data + Debug, F: Fn(D1) -> D2 + 'static,
144 T: Timestamp + Copy,
145 G: Scope<Timestamp = T>,
146{
147 window_all(
148 name,
149 in_stream,
150 || None,
151 |seen, _time, data| {
152 for d in data {
153 assert!(seen.is_none());
154 *seen = Some(d);
155 }
156 },
157 move |seen| op(seen.clone().unwrap()),
158 )
159}
160
161pub fn single_op_binary<D1, D2, D3, F, T, G>(
166 name: &str,
167 in_stream1: &Stream<G, D1>,
168 in_stream2: &Stream<G, D2>,
169 op: F,
170) -> Stream<G, D3>
171where
172 D1: timely::Data + Debug + timely::ExchangeData, D2: timely::Data + Debug + timely::ExchangeData, D3: timely::Data + Debug, F: Fn(D1, D2) -> D3 + 'static,
176 T: Timestamp + Copy,
177 G: Scope<Timestamp = T>,
178{
179 let stream1 = in_stream1.map(Either::Left);
180 let stream2 = in_stream2.map(Either::Right);
181 let stream = stream1.concat(&stream2);
182
183 window_all(
184 name,
185 &stream,
186 || (None, None),
187 |(seen1, seen2), _time, data| {
188 for d in data {
189 match d {
190 Either::Left(d1) => {
191 assert!(seen1.is_none());
192 *seen1 = Some(d1);
193 }
194 Either::Right(d2) => {
195 assert!(seen2.is_none());
196 *seen2 = Some(d2);
197 }
198 }
199 }
200 },
201 move |(seen1, seen2)| {
202 op(seen1.clone().unwrap(), seen2.clone().unwrap())
203 },
204 )
205}
206
207pub fn save_to_file<D, F, T, G>(
215 in_stream: &Stream<G, D>,
216 filename: &str,
217 format: F,
218) -> Stream<G, D>
219where
220 D: timely::Data, F: Fn(&D) -> std::string::String + 'static,
222 T: Timestamp,
223 G: Scope<Timestamp = T>,
224{
225 let mut file =
226 OpenOptions::new().create(true).append(true).open(filename).unwrap();
227 in_stream.inspect(move |d| {
228 writeln!(file, "{}", format(d)).unwrap();
229 })
230}
231
232pub trait Sum<G: Scope> {
237 fn sum(&self) -> Stream<G, usize>;
238}
239impl<G: Scope> Sum<G> for Stream<G, usize> {
240 fn sum(&self) -> Stream<G, usize> {
241 self.accumulate(0, |sum, data| {
242 for &x in data.iter() {
243 *sum += x;
244 }
245 })
246 }
247}
248
249#[rustfmt::skip]
262pub fn join_by_timestamp<D1, D2, T, G>(
263 in_stream1: &Stream<G, D1>,
264 in_stream2: &Stream<G, D2>,
265) -> Stream<G, (D1, D2)>
266where
267 D1: timely::Data, D2: timely::Data, G: Scope<Timestamp = T>,
270{
271 let stream1 = in_stream1.map(Either::Left);
272 let stream2 = in_stream2.map(Either::Right);
273 let combined = stream1.concat(&stream2);
274
275 let collected = combined.accumulate(
277 (Vec::new(), Vec::new()),
278 |(vec1, vec2), items| {
279 for item in items.iter() {
280 match item {
281 Either::Left(x) => { vec1.push(x.clone()); }
282 Either::Right(x) => { vec2.push(x.clone()); }
283 }
284 };
285 },
286 );
287
288 collected.flat_map(|(vec1, vec2)| {
289 let mut result = Vec::new();
290 for item1 in vec1.iter() {
291 for item2 in vec2.iter() {
292 result.push((item1.clone(), item2.clone()));
293 }
294 }
295 result
296 })
297}