1use std::collections::HashMap;
2
3use timely::dataflow::{InputHandle, ProbeHandle};
4use timely::dataflow::operators::{Input, Inspect, Probe};
5use timely::dataflow::operators::generic::operator::Operator;
6use timely::dataflow::channels::pact::Exchange;
7
8fn main() {
9 timely::execute_from_args(std::env::args(), |worker| {
11 let index = worker.index();
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 worker.dataflow::<usize,_,_>(|scope| {
17 let mut counts_by_time = HashMap::new();
18 scope.input_from(&mut input)
19 .unary(Exchange::new(|x| *x), "Distinct", move |_, _|
20 move |input, output| {
21 input.for_each_time(|time, data| {
22 let counts =
23 counts_by_time
24 .entry(*time.time())
25 .or_insert(HashMap::new());
26 let mut session = output.session(&time);
27 for data in data {
28 for &datum in data.iter() {
29 let count = counts.entry(datum).or_insert(0);
30 if *count == 0 {
31 session.give(datum);
32 }
33 *count += 1;
34 }
35 }
36 })
37 })
38 .container::<Vec<_>>()
39 .inspect(move |x| println!("worker {}:\tvalue {}", index, x))
40 .probe_with(&probe);
41 });
42
43 for round in 0..1 {
45 if index == 0 {
46 [0, 1, 2, 2, 2, 3, 3, 4].iter().for_each(|x| input.send(*x));
47 } else if index == 1 {
48 [0, 0, 3, 4, 4, 5, 7, 7].iter().for_each(|x| input.send(*x));
49 }
50 input.advance_to(round + 1);
51 while probe.less_than(input.time()) {
52 worker.step();
53 }
54 }
55 }).unwrap();
56}