1use std::collections::HashMap;
2
3use timely::dataflow::{InputHandle, ProbeHandle};
4use timely::dataflow::operators::{Operator, Inspect, Probe};
5use timely::dataflow::operators::vec::Map;
6use timely::dataflow::channels::pact::Exchange;
7
8fn main() {
9 timely::execute_from_args(std::env::args(), |worker| {
11
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
17
18 worker.dataflow::<usize,_,_>(|scope| {
20 input.to_stream(scope)
21 .flat_map(|(text, diff): (String, i64)|
22 text.split_whitespace()
23 .map(move |word| (word.to_owned(), diff))
24 .collect::<Vec<_>>()
25 )
26 .unary_frontier(exchange, "WordCount", |_capability, _info| {
27
28 let mut queues = HashMap::new();
29 let mut counts = HashMap::new();
30
31 move |(input, frontier), output| {
32 input.for_each_time(|time, data| {
33 queues.entry(time.retain(output.output_index()))
34 .or_insert(Vec::new())
35 .extend(data.map(std::mem::take));
36 });
37
38 for (key, val) in queues.iter_mut() {
39 if !frontier.less_equal(key.time()) {
40 let mut session = output.session(key);
41 for mut batch in val.drain(..) {
42 for (word, diff) in batch.drain(..) {
43 let entry = counts.entry(word.clone()).or_insert(0i64);
44 *entry += diff;
45 session.give((word, *entry));
46 }
47 }
48 }
49 }
50
51 queues.retain(|_key, val| !val.is_empty());
52 }})
53 .container::<Vec<_>>()
54 .inspect(|x| println!("seen: {:?}", x))
55 .probe_with(&probe);
56 });
57
58 for round in 0..10 {
60 input.send(("round".to_owned(), 1));
61 input.advance_to(round + 1);
62 while probe.less_than(input.time()) {
63 worker.step();
64 }
65 }
66 }).unwrap();
67}