1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use ::Data;
use progress::nested::subgraph::{Source, Target};
use dataflow::{Stream, Scope};
use progress::count_map::CountMap;
use progress::{Timestamp, Operate, Antichain};
use dataflow::channels::{Message};
use dataflow::channels::pushers::Counter as PushCounter;
use dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use dataflow::channels::pushers::Tee;
use dataflow::channels::pullers::Counter as PullCounter;
use dataflow::channels::pact::{Pusher, Puller};
use timely_communication::{Allocate};
use std::rc::Rc;
use std::cell::RefCell;
/// Extension trait exposing a `broadcast` method on a stream-like type.
///
/// `D` is the record type and must implement `Data`. The only implementation
/// visible in this file is for `Stream<G, D>`.
pub trait Broadcast<D: Data> {
/// Produces a new value of the same type as `self`.
///
/// For `Stream<G, D>` the implementation in this file connects the stream to
/// one channel pusher per peer and returns the output stream of the operator
/// that receives from that channel.
// NOTE(review): presumably every record ends up at every worker; that depends
// on how `connect_to` fans records out, which is not visible here — confirm.
fn broadcast(&self) -> Self;
}
impl<G: Scope, D: Data> Broadcast<D> for Stream<G, D> {
    /// Builds a `BroadcastOperator` in this stream's scope and returns its output stream.
    ///
    /// A channel carrying `Message<G::Timestamp, D>` is allocated, yielding a collection
    /// of pushers and a single puller. The puller becomes the operator's input; this
    /// stream is then connected to every pusher, with pusher `i` attached to input
    /// port `i` of the new operator. The returned stream reads from the operator's
    /// output port 0.
    ///
    /// # Panics
    ///
    /// Panics if the number of allocated pushers differs from `scope.peers()`.
    fn broadcast(&self) -> Stream<G, D> {
        let mut scope = self.scope();

        // The order of these scope calls is kept exactly as before: channel endpoints
        // first, then the output tee, then the channel identifier.
        let (pushers, inbound) = scope.allocate::<Message<G::Timestamp, D>>();
        let (tee, registrar) = Tee::<G::Timestamp, D>::new();
        let channel = scope.new_identifier();

        // The expression is left untouched so the panic message stays the same.
        assert!(pushers.len() == scope.peers());

        // NOTE(review): `index()` / `peers()` are treated as plain accessors whose
        // values do not change while the operator is being built — confirm.
        let worker = scope.index();
        let worker_count = scope.peers();

        // Receiving side: wrap the puller so pulled messages are counted.
        let receiver = Puller::new(inbound, worker, channel);
        let input_counter = PullCounter::new(Box::new(receiver));

        // Sending side: buffer -> counter -> tee.
        let produced_counts = Rc::new(RefCell::new(CountMap::new()));
        let output_buffer = PushBuffer::new(PushCounter::new(tee, produced_counts));

        let operator_index = scope.add_operator(BroadcastOperator {
            index: worker,
            peers: worker_count,
            input: input_counter,
            output: output_buffer,
        });

        // Attach this stream to every pusher; pusher `peer` feeds input port `peer`.
        for (peer, pusher) in pushers.into_iter().enumerate() {
            let sender = Pusher::new(pusher, worker, peer, channel);
            let target = Target { index: operator_index, port: peer };
            self.connect_to(target, sender, channel);
        }

        let source = Source { index: operator_index, port: 0 };
        Stream::new(source, registrar, scope)
    }
}
/// Operator state created by `Stream::broadcast`.
///
/// `T` is the timestamp type and `D` the record type of the messages it relays.
struct BroadcastOperator<T: Timestamp, D: Data> {
/// Value of `scope.index()` when the operator was built; selects which
/// `consumed` slot receives the input counts in `pull_internal_progress`.
index: usize,
/// Value of `scope.peers()` when the operator was built; reported as the
/// operator's number of inputs.
peers: usize,
/// Counting wrapper around the channel puller that messages are read from.
input: PullCounter<T, D>,
/// Output path: a buffer over a counting pusher over the `Tee` whose
/// registrar backs the stream returned by `broadcast`.
output: PushBuffer<T, D, PushCounter<T, D, Tee<T, D>>>,
}
impl<T: Timestamp, D: Data> Operate<T> for BroadcastOperator<T, D> {
    /// Name reported for this operator.
    fn name(&self) -> String {
        String::from("Broadcast")
    }

    /// One input per peer.
    fn inputs(&self) -> usize {
        self.peers
    }

    /// A single output.
    fn outputs(&self) -> usize {
        1
    }

    /// Returns the per-input path summaries and the initial per-output counts.
    ///
    /// Every one of the `peers` inputs gets a single-entry list (one output) holding
    /// an antichain built from the default summary; the single output starts with an
    /// empty `CountMap`.
    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Vec<CountMap<T>>) {
        let mut per_input = Vec::with_capacity(self.peers);
        for _ in 0..self.peers {
            per_input.push(vec![Antichain::from_elem(Default::default())]);
        }
        (per_input, vec![CountMap::new()])
    }

    /// Forwards every pending input message to the output and reports progress.
    ///
    /// Input counts are written to `consumed[self.index]`, output counts to
    /// `produced[0]`; `_internal` is not touched. Always returns `false`.
    fn pull_internal_progress(
        &mut self,
        consumed: &mut [CountMap<T>],
        _internal: &mut [CountMap<T>],
        produced: &mut [CountMap<T>],
    ) -> bool {
        // Drain the input, handing each batch to an output session at the same time.
        while let Some((stamp, payload)) = self.input.next() {
            self.output.session(stamp).give_content(payload);
        }

        // NOTE(review): `cease` presumably flushes whatever is still buffered — confirm.
        self.output.cease();

        // Move the counts gathered by the counting wrappers into the caller's maps.
        self.input.pull_progress(&mut consumed[self.index]);
        self.output.inner().pull_progress(&mut produced[0]);

        false
    }

    /// This operator answers `false` to `notify_me`.
    fn notify_me(&self) -> bool {
        false
    }
}