use std::collections::HashMap;
use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Inspect, Probe};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Exchange;
/// Builds and runs a timely dataflow that prints each input value the first
/// time it is seen at a given timestamp.
///
/// Every worker constructs the same pipeline:
/// input -> exchange keyed on the value -> "Distinct" operator -> inspect -> probe.
/// Workers 0 and 1 then each feed a fixed batch of numbers, advance the input
/// to the next timestamp, and keep stepping until the probe reports that the
/// dataflow has caught up with the input.
///
/// # Panics
///
/// Panics if `timely::execute_from_args` returns an error (the result is
/// unwrapped).
fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let worker_index = worker.index();
        let mut input = InputHandle::new();
        let probe = ProbeHandle::new();

        // Assemble the dataflow graph for this worker.
        worker.dataflow::<usize, _, _>(|scope| {
            // timestamp -> (value -> number of times it was observed at that timestamp).
            // NOTE: entries are never removed from this map inside this block.
            let mut seen_by_time = HashMap::new();

            scope
                .input_from(&mut input)
                .unary(
                    // The value itself is used as the exchange key.
                    Exchange::new(|value| *value),
                    "Distinct",
                    move |_capability, _info| {
                        move |incoming, outgoing| {
                            incoming.for_each_time(|time, batches| {
                                let seen = seen_by_time
                                    .entry(*time.time())
                                    .or_insert_with(HashMap::new);
                                let mut session = outgoing.session(&time);
                                for batch in batches {
                                    for &value in batch.iter() {
                                        let occurrences = seen.entry(value).or_insert(0);
                                        *occurrences += 1;
                                        // Forward a value only on its first
                                        // occurrence for this timestamp.
                                        if *occurrences == 1 {
                                            session.give(value);
                                        }
                                    }
                                }
                            })
                        }
                    },
                )
                .container::<Vec<_>>()
                .inspect(move |x| println!("worker {}:\tvalue {}", worker_index, x))
                .probe_with(&probe);
        });

        // Feed the data (a single round) and drive the computation.
        for round in 0..1 {
            // Only the first two workers contribute input; any others just step.
            match worker_index {
                0 => {
                    for &value in &[0, 1, 2, 2, 2, 3, 3, 4] {
                        input.send(value);
                    }
                }
                1 => {
                    for &value in &[0, 0, 3, 4, 4, 5, 7, 7] {
                        input.send(value);
                    }
                }
                _ => {}
            }

            input.advance_to(round + 1);

            // Keep scheduling work while the probe is still behind the input's
            // current time.
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    })
    .unwrap();
}