use std::collections::HashMap;
use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Operator, Inspect, Probe};
use timely::dataflow::operators::vec::Map;
use timely::dataflow::channels::pact::Exchange;
fn main() {
timely::execute_from_args(std::env::args(), |worker| {
let mut input = InputHandle::new();
let probe = ProbeHandle::new();
let exchange = Exchange::new(|x: &(String, i64)| (x.0).len() as u64);
worker.dataflow::<usize,_,_>(|scope| {
input.to_stream(scope)
.flat_map(|(text, diff): (String, i64)|
text.split_whitespace()
.map(move |word| (word.to_owned(), diff))
.collect::<Vec<_>>()
)
.unary_frontier(exchange, "WordCount", |_capability, _info| {
let mut queues = HashMap::new();
let mut counts = HashMap::new();
move |(input, frontier), output| {
input.for_each_time(|time, data| {
queues.entry(time.retain(output.output_index()))
.or_insert(Vec::new())
.extend(data.map(std::mem::take));
});
for (key, val) in queues.iter_mut() {
if !frontier.less_equal(key.time()) {
let mut session = output.session(key);
for mut batch in val.drain(..) {
for (word, diff) in batch.drain(..) {
let entry = counts.entry(word.clone()).or_insert(0i64);
*entry += diff;
session.give((word, *entry));
}
}
}
}
queues.retain(|_key, val| !val.is_empty());
}})
.container::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x))
.probe_with(&probe);
});
for round in 0..10 {
input.send(("round".to_owned(), 1));
input.advance_to(round + 1);
while probe.less_than(input.time()) {
worker.step();
}
}
}).unwrap();
}