1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::rc::Rc;
use std::cell::RefCell;
use progress::{Timestamp, Operate};
use progress::nested::{Source, Target};
use progress::count_map::CountMap;
use Data;
use dataflow::channels::pact::{ParallelizationContract, Pipeline};
use dataflow::channels::pushers::tee::Tee;
use dataflow::channels::pushers::counter::Counter as PushCounter;
use dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use dataflow::channels::pullers::Counter as PullCounter;
use dataflow::{Stream, Scope};
/// Splits one stream into `parts` streams using a routing function.
///
/// `route` maps each input record to a pair `(part, record)`; the record is
/// delivered to the output stream whose index is `part`.
pub trait Partition<G, D, D2, F>
where
    G: Scope,
    D: Data,
    D2: Data,
    F: Fn(D) -> (u64, D2),
{
    /// Builds the partitioning operator and returns one output stream per
    /// partition, ordered by partition index.
    fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>>;
}
/// `Partition` for streams: installs a single `Operator` with one input
/// (fed through a `Pipeline` channel) and `parts` outputs.
impl<G, D, D2, F> Partition<G, D, D2, F> for Stream<G, D>
where
    G: Scope,
    D: Data,
    D2: Data,
    F: Fn(D) -> (u64, D2) + 'static,
{
    fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> {
        let mut scope = self.scope();
        let channel_id = scope.new_identifier();
        let (pusher, puller) = Pipeline.connect(&mut scope, channel_id);

        // One tee per partition; its registrar lets downstream operators
        // attach to that output later.
        let (tees, registrars): (Vec<_>, Vec<_>) = (0..parts)
            .map(|_| Tee::<G::Timestamp, D2>::new())
            .unzip();

        let index = scope.add_operator(Operator::new(PullCounter::new(puller), tees, route));
        self.connect_to(Target { index: index, port: 0 }, pusher, channel_id);

        // Output port `i` of the operator carries partition `i`.
        registrars
            .into_iter()
            .enumerate()
            .map(|(port, registrar)| {
                Stream::new(Source { index: index, port: port }, registrar, scope.clone())
            })
            .collect()
    }
}
/// State of the partition operator: one counted input and one buffered,
/// counted output per partition, plus the routing function.
struct Operator<T, D, D2, F>
where
    T: Timestamp,
    D: Data,
    D2: Data,
    F: Fn(D) -> (u64, D2),
{
    /// Input puller; its counts are reported as consumed progress.
    input: PullCounter<T, D>,
    /// Outputs indexed by partition number; their counts are reported as
    /// produced progress.
    outputs: Vec<PushBuffer<T, D2, PushCounter<T, D2, Tee<T, D2>>>>,
    /// Maps each input record to `(partition index, output record)`.
    route: F,
}
impl<T, D, D2, F> Operator<T, D, D2, F>
where
    T: Timestamp,
    D: Data,
    D2: Data,
    F: Fn(D) -> (u64, D2),
{
    /// Wraps each output tee in a message counter (with its own fresh
    /// `CountMap`) and a push buffer, preserving the tee order.
    fn new(input: PullCounter<T, D>, outputs: Vec<Tee<T, D2>>, route: F) -> Operator<T, D, D2, F> {
        let mut wrapped = Vec::with_capacity(outputs.len());
        for tee in outputs {
            let counts = Rc::new(RefCell::new(CountMap::new()));
            wrapped.push(PushBuffer::new(PushCounter::new(tee, counts)));
        }
        Operator {
            input: input,
            outputs: wrapped,
            route: route,
        }
    }
}
/// Scheduling and progress reporting for the partition operator.
///
/// The operator has exactly one input and one output per partition, never
/// asks for notifications, and does not write any internal progress.
impl<T:Timestamp, D: Data, D2: Data, F: Fn(D)->(u64, D2)> Operate<T> for Operator<T, D, D2, F> {
    fn name(&self) -> String { "Partition".to_owned() }
    fn inputs(&self) -> usize { 1 }
    fn outputs(&self) -> usize { self.outputs.len() }

    /// Drains every available input batch and sends each record to the output
    /// chosen by `route`. It then flushes all outputs and reports consumed
    /// (input 0) and produced (per output) message counts. Always returns
    /// `false`.
    ///
    /// # Panics
    ///
    /// Panics if `route` returns a partition index that is not less than the
    /// number of outputs.
    fn pull_internal_progress(&mut self, consumed: &mut [CountMap<T>],
                                         _internal: &mut [CountMap<T>],
                                         produced: &mut [CountMap<T>]) -> bool {
        // Range-check in u64 *before* casting: `u64 as usize` truncates on
        // 32-bit targets, which would silently misroute an out-of-range part
        // to some valid output instead of failing.
        let parts = self.outputs.len() as u64;
        while let Some((time, data)) = self.input.next() {
            // One session per output, all stamped with this batch's time.
            let mut sessions: Vec<_> = self.outputs.iter_mut().map(|x| x.session(&time)).collect();
            for (part, datum) in data.drain(..).map(&self.route) {
                assert!(part < parts,
                        "Partition: route returned part {} but only {} parts exist", part, parts);
                // Lossless: `part < parts`, and `parts` came from a usize.
                sessions[part as usize].give(datum);
            }
        }
        self.input.pull_progress(&mut consumed[0]);
        // Flush buffered records before reading the produced counts so they
        // include everything sent in this call.
        for (index, output) in self.outputs.iter_mut().enumerate() {
            output.cease();
            output.inner().pull_progress(&mut produced[index]);
        }
        false
    }

    fn notify_me(&self) -> bool { false }
}