1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
//! Broadcast records to all workers.

use communication::Pull;

use ::ExchangeData;
use progress::{Source, Target};
use dataflow::{Stream, Scope};
use progress::ChangeBatch;
use progress::{Timestamp, Operate, Antichain};
use dataflow::channels::{Message, Bundle};
use dataflow::channels::pushers::Counter as PushCounter;
use dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use dataflow::channels::pushers::Tee;
use dataflow::channels::pullers::Counter as PullCounter;
use dataflow::channels::pact::{LogPusher, LogPuller};

/// Extension trait for making the records of a stream available to all workers.
pub trait Broadcast<D>
where
    D: ExchangeData,
{
    /// Returns a stream of the same type carrying the records of `self`,
    /// broadcast to all workers.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Broadcast, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .broadcast()
    ///            .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn broadcast(&self) -> Self;
}

impl<G, D> Broadcast<D> for Stream<G, D>
where
    G: Scope,
    D: ExchangeData,
{
    /// Builds a `BroadcastOperator` in this stream's scope, attaches the stream
    /// to each of the operator's input ports, and returns the operator's output
    /// as a new stream.
    fn broadcast(&self) -> Stream<G, D> {
        let mut scope = self.scope();

        // Identity of this worker and the number of workers, as reported by the scope.
        let worker_index = scope.index();
        let worker_count = scope.peers();

        // A single channel is allocated; it yields a list of pushers and one puller.
        let channel_id = scope.new_identifier();
        let (pushers, puller) = scope.allocate::<Message<G::Timestamp, D>>(channel_id);
        let (targets, registrar) = Tee::<G::Timestamp, D>::new();

        // The operator exposes `worker_count` input ports, so the allocation
        // must have produced exactly that many pushers.
        assert_eq!(pushers.len(), worker_count);

        let operator = BroadcastOperator {
            index: worker_index,
            peers: worker_count,
            input: PullCounter::new(LogPuller::new(puller, worker_index, channel_id, scope.logging())),
            output: PushBuffer::new(PushCounter::new(targets)),
        };
        let operator_index = scope.add_operator(Box::new(operator));

        // Attach this stream to every input port: port `port` is fed through
        // the pusher found at position `port` in the allocation.
        for (port, pusher) in pushers.into_iter().enumerate() {
            let target = Target { index: operator_index, port: port };
            let sender = LogPusher::new(pusher, worker_index, port, channel_id, scope.logging());
            self.connect_to(target, sender, channel_id);
        }

        let source = Source { index: operator_index, port: 0 };
        Stream::new(source, registrar, scope)
    }
}

/// Operator state backing `Broadcast::broadcast`: pulls bundles from a single
/// puller and re-emits their contents on its one output.
struct BroadcastOperator<T, D>
where
    T: Timestamp,
    D: ExchangeData,
{
    /// Value of `scope.index()` at construction time; selects which `consumed`
    /// entry is updated when progress is reported.
    index: usize,
    /// Value of `scope.peers()` at construction time; reported as the number of inputs.
    peers: usize,
    /// Counting, logging wrapper around the channel's puller.
    input: PullCounter<T, D, LogPuller<T, D, Box<Pull<Bundle<T, D>>>>>,
    /// Buffered, counting pusher that forwards records into the output `Tee`.
    output: PushBuffer<T, D, PushCounter<T, D, Tee<T, D>>>,
}

impl<T, D> Operate<T> for BroadcastOperator<T, D>
where
    T: Timestamp,
    D: ExchangeData,
{
    /// Name under which this operator identifies itself.
    fn name(&self) -> String {
        String::from("Broadcast")
    }

    /// One input port per peer.
    fn inputs(&self) -> usize {
        self.peers
    }

    /// A single output port.
    fn outputs(&self) -> usize {
        1
    }

    /// Reports a default path summary from every input to the single output,
    /// together with one empty change batch for that output.
    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Vec<ChangeBatch<T>>) {
        // TODO: (optimization) some of these internal paths do not actually exist
        let mut summary = Vec::with_capacity(self.peers);
        for _ in 0..self.peers {
            summary.push(vec![Antichain::from_elem(Default::default())]);
        }
        (summary, vec![ChangeBatch::new()])
    }

    /// Drains all pending input bundles to the output, then publishes the
    /// consumed and produced message counts. Always returns `false`.
    fn pull_internal_progress(&mut self, consumed: &mut [ChangeBatch<T>],
                                         _internal: &mut [ChangeBatch<T>],
                                         produced: &mut [ChangeBatch<T>]) -> bool {

        use communication::message::RefOrMut;

        // Scratch buffer that receives the contents of bundles we only hold by reference.
        let mut buffer = Vec::new();

        loop {
            let bundle = match self.input.next() {
                Some(bundle) => bundle,
                None => break,
            };

            match bundle.as_ref_or_mut() {
                RefOrMut::Ref(shared) => {
                    // Shared bundle: move a copy of the records into the scratch buffer first.
                    RefOrMut::Ref(&shared.data).swap(&mut buffer);
                    self.output.session(&shared.time).give_vec(&mut buffer);
                },
                RefOrMut::Mut(owned) => {
                    // Mutable bundle: hand its record vector over directly.
                    self.output.session(&owned.time).give_vec(&mut owned.data);
                },
            }
        }

        self.output.cease();

        // Everything pulled here is accounted against the input port numbered `self.index`.
        self.input.consumed().borrow_mut().drain_into(&mut consumed[self.index]);
        self.output.inner().produced().borrow_mut().drain_into(&mut produced[0]);

        false
    }

    /// This operator does not ask to be notified.
    fn notify_me(&self) -> bool {
        false
    }
}