timely/progress/
broadcast.rs

1//! Broadcasts progress information among workers.
2
3use crate::progress::{ChangeBatch, Timestamp};
4use crate::progress::{Location, Port};
5use crate::communication::{Message, Push, Pull};
6use crate::logging::TimelyLogger as Logger;
7use crate::logging::TimelyProgressLogger as ProgressLogger;
8
9/// A list of progress updates corresponding to `((child_scope, [in/out]_port, timestamp), delta)`
10pub type ProgressVec<T> = Vec<((Location, T), i64)>;
11/// A progress update message consisting of source worker id, sequence number and lists of
12/// message and internal updates
13pub type ProgressMsg<T> = Message<(usize, usize, ProgressVec<T>)>;
14
15/// Manages broadcasting of progress updates to and receiving updates from workers.
16pub struct Progcaster<T:Timestamp> {
17    to_push: Option<ProgressMsg<T>>,
18    pushers: Vec<Box<dyn Push<ProgressMsg<T>>>>,
19    puller: Box<dyn Pull<ProgressMsg<T>>>,
20    /// Source worker index
21    source: usize,
22    /// Sequence number counter
23    counter: usize,
24    /// Sequence of nested scope identifiers indicating the path from the root to this subgraph
25    addr: Vec<usize>,
26    /// Communication channel identifier
27    channel_identifier: usize,
28
29    progress_logging: Option<ProgressLogger>,
30}
31
32impl<T:Timestamp+Send> Progcaster<T> {
33    /// Creates a new `Progcaster` using a channel from the supplied worker.
34    pub fn new<A: crate::worker::AsWorker>(worker: &mut A, path: &Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {
35
36        let channel_identifier = worker.new_identifier();
37        let (pushers, puller) = worker.allocate(channel_identifier, &path[..]);
38        logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent {
39            identifier: channel_identifier,
40            kind: crate::logging::CommChannelKind::Progress,
41        }));
42        let worker_index = worker.index();
43        let addr = path.clone();
44        Progcaster {
45            to_push: None,
46            pushers,
47            puller,
48            source: worker_index,
49            counter: 0,
50            addr,
51            channel_identifier,
52            progress_logging,
53        }
54    }
55
56    /// Sends pointstamp changes to all workers.
57    pub fn send(&mut self, changes: &mut ChangeBatch<(Location, T)>) {
58
59        changes.compact();
60        if !changes.is_empty() {
61
62            self.progress_logging.as_ref().map(|l| {
63
64                // Pre-allocate enough space; we transfer ownership, so there is not
65                // an apportunity to re-use allocations (w/o changing the logging
66                // interface to accept references).
67                let mut messages = Box::new(Vec::with_capacity(changes.len()));
68                let mut internal = Box::new(Vec::with_capacity(changes.len()));
69
70                for ((location, time), diff) in changes.iter() {
71                    match location.port {
72                        Port::Target(port) => {
73                            messages.push((location.node, port, time.clone(), *diff))
74                        },
75                        Port::Source(port) => {
76                            internal.push((location.node, port, time.clone(), *diff))
77                        }
78                    }
79                }
80
81                l.log(crate::logging::TimelyProgressEvent {
82                    is_send: true,
83                    source: self.source,
84                    channel: self.channel_identifier,
85                    seq_no: self.counter,
86                    addr: self.addr.clone(),
87                    messages,
88                    internal,
89                });
90            });
91
92            for pusher in self.pushers.iter_mut() {
93
94                // Attempt to reuse allocations, if possible.
95                if let Some(tuple) = &mut self.to_push {
96                    let tuple = tuple.as_mut();
97                    tuple.0 = self.source;
98                    tuple.1 = self.counter;
99                    tuple.2.clear(); tuple.2.extend(changes.iter().cloned());
100                }
101                // If we don't have an allocation ...
102                if self.to_push.is_none() {
103                    self.to_push = Some(Message::from_typed((
104                        self.source,
105                        self.counter,
106                        changes.clone().into_inner(),
107                    )));
108                }
109
110                // TODO: This should probably use a broadcast channel.
111                pusher.push(&mut self.to_push);
112                pusher.done();
113            }
114
115            self.counter += 1;
116            changes.clear();
117        }
118    }
119
120    /// Receives pointstamp changes from all workers.
121    pub fn recv(&mut self, changes: &mut ChangeBatch<(Location, T)>) {
122
123        while let Some(message) = self.puller.pull() {
124
125            let source = message.0;
126            let counter = message.1;
127            let recv_changes = &message.2;
128
129            let addr = &mut self.addr;
130            let channel = self.channel_identifier;
131
132            // See comments above about the relatively high cost of this logging, and our
133            // options for improving it if performance limits users who want other logging.
134            self.progress_logging.as_ref().map(|l| {
135
136                let mut messages = Box::new(Vec::with_capacity(changes.len()));
137                let mut internal = Box::new(Vec::with_capacity(changes.len()));
138
139                for ((location, time), diff) in recv_changes.iter() {
140
141                    match location.port {
142                        Port::Target(port) => {
143                            messages.push((location.node, port, time.clone(), *diff))
144                        },
145                        Port::Source(port) => {
146                            internal.push((location.node, port, time.clone(), *diff))
147                        }
148                    }
149                }
150
151                l.log(crate::logging::TimelyProgressEvent {
152                    is_send: false,
153                    source: source,
154                    seq_no: counter,
155                    channel,
156                    addr: addr.clone(),
157                    messages,
158                    internal,
159                });
160            });
161
162            // We clone rather than drain to avoid deserialization.
163            for &(ref update, delta) in recv_changes.iter() {
164                changes.update(update.clone(), delta);
165            }
166        }
167
168    }
169}