timely/progress/
broadcast.rs1use crate::progress::{ChangeBatch, Timestamp};
4use crate::progress::{Location, Port};
5use crate::communication::{Message, Push, Pull};
6use crate::logging::TimelyLogger as Logger;
7use crate::logging::TimelyProgressLogger as ProgressLogger;
8
9pub type ProgressVec<T> = Vec<((Location, T), i64)>;
11pub type ProgressMsg<T> = Message<(usize, usize, ProgressVec<T>)>;
14
15pub struct Progcaster<T:Timestamp> {
17 to_push: Option<ProgressMsg<T>>,
18 pushers: Vec<Box<dyn Push<ProgressMsg<T>>>>,
19 puller: Box<dyn Pull<ProgressMsg<T>>>,
20 source: usize,
22 counter: usize,
24 addr: Vec<usize>,
26 channel_identifier: usize,
28
29 progress_logging: Option<ProgressLogger>,
30}
31
32impl<T:Timestamp+Send> Progcaster<T> {
33 pub fn new<A: crate::worker::AsWorker>(worker: &mut A, path: &Vec<usize>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {
35
36 let channel_identifier = worker.new_identifier();
37 let (pushers, puller) = worker.allocate(channel_identifier, &path[..]);
38 logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent {
39 identifier: channel_identifier,
40 kind: crate::logging::CommChannelKind::Progress,
41 }));
42 let worker_index = worker.index();
43 let addr = path.clone();
44 Progcaster {
45 to_push: None,
46 pushers,
47 puller,
48 source: worker_index,
49 counter: 0,
50 addr,
51 channel_identifier,
52 progress_logging,
53 }
54 }
55
56 pub fn send(&mut self, changes: &mut ChangeBatch<(Location, T)>) {
58
59 changes.compact();
60 if !changes.is_empty() {
61
62 self.progress_logging.as_ref().map(|l| {
63
64 let mut messages = Box::new(Vec::with_capacity(changes.len()));
68 let mut internal = Box::new(Vec::with_capacity(changes.len()));
69
70 for ((location, time), diff) in changes.iter() {
71 match location.port {
72 Port::Target(port) => {
73 messages.push((location.node, port, time.clone(), *diff))
74 },
75 Port::Source(port) => {
76 internal.push((location.node, port, time.clone(), *diff))
77 }
78 }
79 }
80
81 l.log(crate::logging::TimelyProgressEvent {
82 is_send: true,
83 source: self.source,
84 channel: self.channel_identifier,
85 seq_no: self.counter,
86 addr: self.addr.clone(),
87 messages,
88 internal,
89 });
90 });
91
92 for pusher in self.pushers.iter_mut() {
93
94 if let Some(tuple) = &mut self.to_push {
96 let tuple = tuple.as_mut();
97 tuple.0 = self.source;
98 tuple.1 = self.counter;
99 tuple.2.clear(); tuple.2.extend(changes.iter().cloned());
100 }
101 if self.to_push.is_none() {
103 self.to_push = Some(Message::from_typed((
104 self.source,
105 self.counter,
106 changes.clone().into_inner(),
107 )));
108 }
109
110 pusher.push(&mut self.to_push);
112 pusher.done();
113 }
114
115 self.counter += 1;
116 changes.clear();
117 }
118 }
119
120 pub fn recv(&mut self, changes: &mut ChangeBatch<(Location, T)>) {
122
123 while let Some(message) = self.puller.pull() {
124
125 let source = message.0;
126 let counter = message.1;
127 let recv_changes = &message.2;
128
129 let addr = &mut self.addr;
130 let channel = self.channel_identifier;
131
132 self.progress_logging.as_ref().map(|l| {
135
136 let mut messages = Box::new(Vec::with_capacity(changes.len()));
137 let mut internal = Box::new(Vec::with_capacity(changes.len()));
138
139 for ((location, time), diff) in recv_changes.iter() {
140
141 match location.port {
142 Port::Target(port) => {
143 messages.push((location.node, port, time.clone(), *diff))
144 },
145 Port::Source(port) => {
146 internal.push((location.node, port, time.clone(), *diff))
147 }
148 }
149 }
150
151 l.log(crate::logging::TimelyProgressEvent {
152 is_send: false,
153 source: source,
154 seq_no: counter,
155 channel,
156 addr: addr.clone(),
157 messages,
158 internal,
159 });
160 });
161
162 for &(ref update, delta) in recv_changes.iter() {
164 changes.update(update.clone(), delta);
165 }
166 }
167
168 }
169}