Skip to main content

wordcount/
wordcount.rs

1use std::collections::HashMap;
2
3use timely::dataflow::{InputHandle, ProbeHandle};
4use timely::dataflow::operators::{Operator, Inspect, Probe};
5use timely::dataflow::operators::vec::Map;
6use timely::dataflow::channels::pact::Exchange;
7
8fn main() {
9    // initializes and runs a timely dataflow.
10    timely::execute_from_args(std::env::args(), |worker| {
11
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        // define a distribution function for strings.
16        let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
17
18        // create a new input, exchange data, and inspect its output
19        worker.dataflow::<usize,_,_>(|scope| {
20            input.to_stream(scope)
21                 .flat_map(|(text, diff): (String, i64)|
22                    text.split_whitespace()
23                        .map(move |word| (word.to_owned(), diff))
24                        .collect::<Vec<_>>()
25                 )
26                 .unary_frontier(exchange, "WordCount", |_capability, _info| {
27
28                    let mut queues = HashMap::new();
29                    let mut counts = HashMap::new();
30
31                    move |(input, frontier), output| {
32                        input.for_each_time(|time, data| {
33                            queues.entry(time.retain(output.output_index()))
34                                  .or_insert(Vec::new())
35                                  .extend(data.map(std::mem::take));
36                        });
37
38                        for (key, val) in queues.iter_mut() {
39                            if !frontier.less_equal(key.time()) {
40                                let mut session = output.session(key);
41                                for mut batch in val.drain(..) {
42                                    for (word, diff) in batch.drain(..) {
43                                        let entry = counts.entry(word.clone()).or_insert(0i64);
44                                        *entry += diff;
45                                        session.give((word, *entry));
46                                    }
47                                }
48                            }
49                        }
50
51                        queues.retain(|_key, val| !val.is_empty());
52                    }})
53                 .container::<Vec<_>>()
54                 .inspect(|x| println!("seen: {:?}", x))
55                 .probe_with(&probe);
56        });
57
58        // introduce data and watch!
59        for round in 0..10 {
60            input.send(("round".to_owned(), 1));
61            input.advance_to(round + 1);
62            while probe.less_than(input.time()) {
63                worker.step();
64            }
65        }
66    }).unwrap();
67}