1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
//! Broadcast records to all workers.

use timely_communication::Pull;

use ::ExchangeData;
use progress::nested::subgraph::{Source, Target};
use dataflow::{Stream, Scope};
use progress::ChangeBatch;
use progress::{Timestamp, Operate, Antichain};
use dataflow::channels::{Message};
use dataflow::channels::pushers::Counter as PushCounter;
use dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use dataflow::channels::pushers::Tee;
use dataflow::channels::pullers::Counter as PullCounter;
use dataflow::channels::pact::{Pusher, Puller};

/// Broadcast records to all workers.
pub trait Broadcast<D: ExchangeData> {
    /// Broadcast records to all workers.
    ///
    /// Every record of the input stream is sent to every worker, including the
    /// worker that produced it. Each record keeps the timestamp it had in the
    /// input stream. The result is a stream of the same type as the input.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Broadcast, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .broadcast()
    ///            .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn broadcast(&self) -> Self;
}

impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D> {
    /// Builds a `BroadcastOperator` with one input port per worker and wires
    /// this stream to each port through the pusher addressed to that worker.
    fn broadcast(&self) -> Stream<G, D> {
        let mut scope = self.scope();

        // A channel with one pusher per worker plus the local puller, and a
        // `Tee` that fans the operator's output out to downstream consumers.
        let (pushers, puller) = scope.allocate::<Message<G::Timestamp, D>>();
        let (targets, registrar) = Tee::<G::Timestamp, D>::new();
        let channel_id = scope.new_identifier();

        let worker = scope.index();
        let peer_count = scope.peers();
        assert_eq!(pushers.len(), peer_count);

        let operator_index = scope.add_operator(BroadcastOperator {
            index: worker,
            peers: peer_count,
            input: PullCounter::new(Puller::new(puller, worker, channel_id)),
            output: PushBuffer::new(PushCounter::new(targets)),
        });

        // Pusher number `dest` feeds input port `dest` of the operator.
        for (dest, pusher) in pushers.into_iter().enumerate() {
            let sender = Pusher::new(pusher, worker, dest, channel_id);
            let port = Target { index: operator_index, port: dest };
            self.connect_to(port, sender, channel_id);
        }

        Stream::new(Source { index: operator_index, port: 0 }, registrar, scope)
    }
}

/// Operator state behind `Broadcast::broadcast`: one input port per worker
/// (`peers` of them) and a single output port.
struct BroadcastOperator<T: Timestamp, D: ExchangeData> {
    /// Index of the local worker; records consumed here are reported on this input port.
    index: usize,
    /// Number of workers, which is also the number of input ports.
    peers: usize,
    /// Counting puller over the channel allocated in `broadcast`
    /// (presumably yields records every worker pushed to this one — confirm).
    input: PullCounter<T, D, Puller<T, D, Box<Pull<Message<T, D>>>>>,
    /// Buffered output that counts produced records before handing them to the `Tee`.
    output: PushBuffer<T, D, PushCounter<T, D, Tee<T, D>>>,
}

/// Progress-tracking implementation: received records are forwarded to the
/// single output unchanged, and consumed/produced counts are reported upward.
impl<T: Timestamp, D: ExchangeData> Operate<T> for BroadcastOperator<T, D> {
    fn name(&self) -> String { "Broadcast".to_owned() }
    /// One input port per worker.
    fn inputs(&self) -> usize { self.peers }
    fn outputs(&self) -> usize { 1 }

    /// Connects every input port to the single output with the default summary,
    /// and reports an empty change batch for the output
    /// (presumably: no initial output capabilities).
    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Vec<ChangeBatch<T>>) {
        // TODO: (optimization) some of these internal paths do not actually exist
        let summary = (0..self.peers).map(|_| vec![Antichain::from_elem(Default::default())]).collect::<Vec<_>>();
        (summary, vec![ChangeBatch::new()])
    }

    /// Moves all pending input to the output and reports the resulting
    /// consumed and produced counts.
    fn pull_internal_progress(&mut self, consumed: &mut [ChangeBatch<T>],
                                         _internal: &mut [ChangeBatch<T>],
                                         produced: &mut [ChangeBatch<T>]) -> bool {

        // Forward each received batch at the time it arrived with.
        while let Some((time, data)) = self.input.next() {
            self.output.session(time).give_content(data);
        }
        // Flush the buffer before harvesting `produced`: the counter sits behind
        // the buffer, so records still buffered would not be counted yet.
        self.output.cease();
        // Everything this worker receives is accounted to its own input port.
        self.input.consumed().borrow_mut().drain_into(&mut consumed[self.index]);
        self.output.inner().produced().borrow_mut().drain_into(&mut produced[0]);
        // NOTE(review): presumably signals that no internal work remains outstanding.
        false
    }

    /// This operator does not ask for notifications.
    fn notify_me(&self) -> bool { false }
}