use super::{Pipeline, Scope, Stream, Timestamp};
use crate::util::either::Either;
use std::fmt::Debug;
use std::fs::OpenOptions;
use std::io::prelude::*;
use timely::dataflow::operators::{
Accumulate, Concat, Exchange, Filter, Inspect, Map, Operator,
};
/// Folds every record seen by this operator instance into one aggregate and
/// emits a single value derived from it once the input is exhausted.
///
/// The operator is built with `unary_frontier` over the `Pipeline` pact.
///
/// * `name` - operator name handed to `unary_frontier`.
/// * `in_stream` - the stream to aggregate.
/// * `init` - called once, when the operator is constructed, to create the
///   aggregate state.
/// * `fold` - called for every incoming batch with the state, the batch's
///   timestamp and the batch's records (moved out of the message buffer).
/// * `emit` - called once, when the input frontier becomes empty, to turn the
///   state into the output record.
///
/// The output record is sent under the capability held at that moment: the
/// operator's initial capability, or the capability retained from the most
/// recently received batch whose time is strictly greater than the initial
/// capability's time.
///
/// NOTE(review): the comparison baseline is the *initial* time and is never
/// updated, so if batches arrive out of timestamp order the held capability
/// is the last qualifying one, not necessarily the greatest - confirm this is
/// intended.
pub fn window_all_parallel<D1, D2, D3, I, F, E, T, G>(
    name: &str,
    in_stream: &Stream<G, D1>,
    init: I,
    fold: F,
    emit: E,
) -> Stream<G, D3>
where
    D1: timely::Data + Debug,
    D2: timely::Data + Debug,
    D3: timely::Data + Debug,
    I: FnOnce() -> D2 + 'static,
    F: Fn(&mut D2, &T, Vec<D1>) + 'static,
    E: Fn(&D2) -> D3 + 'static,
    T: Timestamp + Copy,
    G: Scope<Timestamp = T>,
{
    in_stream.unary_frontier(Pipeline, name, |initial_cap, _info| {
        let mut state = init();
        let initial_time = *initial_cap.time();
        // Holding a capability is what allows the final record to be sent;
        // it is released right after the emission.
        let mut held_cap = Some(initial_cap);
        move |input, output| {
            while let Some((batch_cap, batch)) = input.next() {
                // Move the records out of the message buffer so `fold`
                // receives an owned vector.
                let mut records = Vec::new();
                batch.swap(&mut records);
                fold(&mut state, batch_cap.time(), records);
                if *batch_cap.time() > initial_time {
                    held_cap = Some(batch_cap.retain());
                }
            }
            // Nothing to emit while more input may still arrive.
            if !input.frontier().is_empty() {
                return;
            }
            // `take` leaves `None` behind, so the aggregate is emitted at
            // most once.
            if let Some(cap) = held_cap.take() {
                output.session(&cap).give(emit(&state));
            }
        }
    })
}
/// Aggregates the whole input stream into a single output record.
///
/// Every record is first exchanged with the constant routing key `0`
/// (presumably so that one worker receives all of the data - confirm against
/// timely's `Exchange` semantics), and the result is aggregated with
/// `window_all_parallel`.
///
/// The aggregate is paired with a flag that records whether `fold` was ever
/// invoked. Operator instances whose flag is still `false` at the end produce
/// `None`, which is filtered out, so only instances that actually received
/// input contribute an `emit(..)` value to the returned stream.
///
/// * `name` - operator name forwarded to `window_all_parallel`.
/// * `in_stream` - the stream to aggregate.
/// * `init` - creates the initial aggregate state.
/// * `fold` - folds one batch of records (with its timestamp) into the state.
/// * `emit` - converts the final state into the output record.
pub fn window_all<D1, D2, D3, I, F, E, T, G>(
    name: &str,
    in_stream: &Stream<G, D1>,
    init: I,
    fold: F,
    emit: E,
) -> Stream<G, D3>
where
    D1: timely::Data + Debug + timely::ExchangeData,
    D2: timely::Data + Debug,
    D3: timely::Data + Debug,
    I: FnOnce() -> D2 + 'static,
    F: Fn(&mut D2, &T, Vec<D1>) + 'static,
    E: Fn(&D2) -> D3 + 'static,
    T: Timestamp + Copy,
    G: Scope<Timestamp = T>,
{
    let gathered = in_stream.exchange(|_x| 0);
    let maybe_results = window_all_parallel(
        name,
        &gathered,
        // State is (user aggregate, "fold was called at least once").
        || (init(), false),
        move |(acc, saw_input), time, batch| {
            fold(acc, time, batch);
            *saw_input = true;
        },
        move |(acc, saw_input)| {
            if !*saw_input {
                return None;
            }
            Some(emit(acc))
        },
    );
    // Drop the `None` placeholders and unwrap the remaining values.
    maybe_results.filter(Option::is_some).map(Option::unwrap)
}
/// Applies `op` to the one record carried by `in_stream`.
///
/// Built on `window_all`: the state is an `Option` slot that is filled by the
/// incoming record, and `op` is applied to a clone of the slot's content when
/// the aggregate is emitted.
///
/// * `name` - operator name forwarded to `window_all`.
/// * `in_stream` - stream expected to carry exactly one record.
/// * `op` - function applied to that record.
///
/// # Panics
///
/// Panics if a second record arrives while the slot is already filled, or if
/// the slot is still empty when the result is emitted.
pub fn single_op_unary<D1, D2, F, T, G>(
    name: &str,
    in_stream: &Stream<G, D1>,
    op: F,
) -> Stream<G, D2>
where
    D1: timely::Data + Debug + timely::ExchangeData,
    D2: timely::Data + Debug,
    F: Fn(D1) -> D2 + 'static,
    T: Timestamp + Copy,
    G: Scope<Timestamp = T>,
{
    window_all(
        name,
        in_stream,
        || None,
        |slot, _time, batch| {
            batch.into_iter().for_each(|record| {
                // More than one record violates the single-input contract.
                assert!(slot.is_none());
                *slot = Some(record);
            });
        },
        move |slot| {
            let value = slot.clone().unwrap();
            op(value)
        },
    )
}
/// Applies `op` to the one record of `in_stream1` and the one record of
/// `in_stream2`.
///
/// The two streams are tagged with `Either::Left` / `Either::Right`,
/// concatenated, and aggregated with `window_all` into a pair of `Option`
/// slots, one per side. When the aggregate is emitted, `op` is called with
/// clones of both slot contents.
///
/// * `name` - operator name forwarded to `window_all`.
/// * `in_stream1` - stream expected to carry exactly one record (left input).
/// * `in_stream2` - stream expected to carry exactly one record (right input).
/// * `op` - function applied to the two records.
///
/// # Panics
///
/// Panics if either side delivers a second record while its slot is already
/// filled, or if either slot is still empty when the result is emitted.
pub fn single_op_binary<D1, D2, D3, F, T, G>(
    name: &str,
    in_stream1: &Stream<G, D1>,
    in_stream2: &Stream<G, D2>,
    op: F,
) -> Stream<G, D3>
where
    D1: timely::Data + Debug + timely::ExchangeData,
    D2: timely::Data + Debug + timely::ExchangeData,
    D3: timely::Data + Debug,
    F: Fn(D1, D2) -> D3 + 'static,
    T: Timestamp + Copy,
    G: Scope<Timestamp = T>,
{
    // Tag each side so both inputs can travel through one stream.
    let tagged_left = in_stream1.map(Either::Left);
    let tagged_right = in_stream2.map(Either::Right);
    let merged = tagged_left.concat(&tagged_right);
    window_all(
        name,
        &merged,
        || (None, None),
        |(left_slot, right_slot), _time, batch| {
            batch.into_iter().for_each(|tagged| match tagged {
                Either::Left(left) => {
                    assert!(left_slot.is_none());
                    *left_slot = Some(left);
                }
                Either::Right(right) => {
                    assert!(right_slot.is_none());
                    *right_slot = Some(right);
                }
            });
        },
        move |(left_slot, right_slot)| {
            let left = left_slot.clone().unwrap();
            let right = right_slot.clone().unwrap();
            op(left, right)
        },
    )
}
/// Appends one formatted line per record of `in_stream` to a file and passes
/// the stream through.
///
/// The file is opened once, when this function is called, in create + append
/// mode; existing content is therefore kept. Each record seen by the
/// `inspect` operator is rendered with `format` and written followed by a
/// newline.
///
/// * `in_stream` - the stream to log.
/// * `filename` - path of the file to append to.
/// * `format` - renders a record as the text of one line.
///
/// Returns the stream produced by `inspect`.
///
/// # Panics
///
/// Panics if the file cannot be opened or if a write fails. The signature
/// leaves no room for returning an error, so the panic message names the file
/// and the underlying I/O error to make the failure diagnosable.
pub fn save_to_file<D, F, T, G>(
    in_stream: &Stream<G, D>,
    filename: &str,
    format: F,
) -> Stream<G, D>
where
    D: timely::Data,
    F: Fn(&D) -> std::string::String + 'static,
    T: Timestamp,
    G: Scope<Timestamp = T>,
{
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(filename)
        .unwrap_or_else(|err| {
            panic!(
                "save_to_file: cannot open '{}' for appending: {}",
                filename, err
            )
        });
    // The inspect closure must be 'static, so it needs its own copy of the
    // path for error reporting.
    let path = filename.to_owned();
    in_stream.inspect(move |record| {
        writeln!(file, "{}", format(record)).unwrap_or_else(|err| {
            panic!("save_to_file: write to '{}' failed: {}", path, err)
        });
    })
}
/// Extension trait that adds a `sum` method producing a stream of `usize`
/// totals in scope `G`.
pub trait Sum<G: Scope> {
    /// Returns a stream of `usize` totals computed from `self`.
    fn sum(&self) -> Stream<G, usize>;
}
/// Sums a stream of `usize` values using timely's `accumulate` operator.
impl<G: Scope> Sum<G> for Stream<G, usize> {
    /// Starts from `0` and adds every record of every batch handed over by
    /// `accumulate` to the running total.
    ///
    /// NOTE(review): how totals are grouped (presumably one total per
    /// timestamp) is decided by `accumulate` - confirm against the timely
    /// version in use.
    fn sum(&self) -> Stream<G, usize> {
        self.accumulate(0, |total, batch| {
            *total += batch.iter().sum::<usize>();
        })
    }
}
/// Pairs every record of `in_stream1` with every record of `in_stream2`
/// among the records that `accumulate` groups together.
///
/// Both inputs are tagged with `Either::Left` / `Either::Right`,
/// concatenated, and collected by `accumulate` into two vectors (left
/// records and right records). Each collected pair of vectors is then
/// expanded into its full cross product, iterating the left records in the
/// outer loop and the right records in the inner loop.
///
/// NOTE(review): as the name suggests, the grouping is presumably per
/// timestamp; that is determined by timely's `accumulate` - confirm against
/// the timely version in use.
///
/// * `in_stream1` - source of the first tuple component.
/// * `in_stream2` - source of the second tuple component.
#[rustfmt::skip]
pub fn join_by_timestamp<D1, D2, T, G>(
    in_stream1: &Stream<G, D1>,
    in_stream2: &Stream<G, D2>,
) -> Stream<G, (D1, D2)>
where
    D1: timely::Data,
    D2: timely::Data,
    G: Scope<Timestamp = T>,
{
    // Tag each side so both inputs can share one stream.
    let tagged_left = in_stream1.map(Either::Left);
    let tagged_right = in_stream2.map(Either::Right);
    let tagged = tagged_left.concat(&tagged_right);

    // Split the tagged records back into one buffer per side.
    let buffered = tagged.accumulate(
        (Vec::new(), Vec::new()),
        |(lefts, rights), batch| {
            batch.iter().for_each(|entry| match entry {
                Either::Left(value) => lefts.push(value.clone()),
                Either::Right(value) => rights.push(value.clone()),
            });
        },
    );

    // Cross product: left-major order, one clone of each side per pair.
    buffered.flat_map(|(lefts, rights)| {
        lefts
            .iter()
            .flat_map(|left| {
                rights.iter().map(move |right| (left.clone(), right.clone()))
            })
            .collect::<Vec<_>>()
    })
}