timely/synchronization/sequence.rs
1//! A shared ordered log.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::{Instant, Duration};
6use std::collections::VecDeque;
7
8use crate::{communication::Allocate, ExchangeData, PartialOrder};
9use crate::scheduling::Scheduler;
10use crate::worker::Worker;
11use crate::dataflow::channels::pact::Exchange;
12use crate::dataflow::operators::generic::operator::source;
13use crate::dataflow::operators::generic::operator::Operator;
14use crate::scheduling::activate::Activator;
15
16// A Sequencer needs all operators firing with high frequency, because
17// it uses the timer to gauge progress. If other workers cease
18// advancing their own capabilities, although they might receive a
19// record they may not actually tick forward their own source clocks,
20// and no one will actually form the sequence.
21//
22// A CatchupActivator is an activator with an optional timestamp
23// attached. This allows us to represent a special state, where after
24// receiving an action from another worker, each of the other workers
25// will keep scheduling its source operator, until its capability
26// timestamp exceeds the greatest timestamp that the sink has
27// received.
28//
29// This allows operators to go quiet again until a new request shows
30// up. The operators lose the ability to confirm that nothing is
31// scheduled for a particular time (they could request this with a
32// no-op event bearing a timestamp), but everyone still sees the same
33// sequence.
34struct CatchupActivator {
35 pub catchup_until: Option<Duration>,
36 activator: Activator,
37}
38
39impl CatchupActivator {
40 pub fn activate(&self) {
41 self.activator.activate();
42 }
43}
44
45/// Orders elements inserted across all workers.
46///
47/// A Sequencer allows each worker to insert into a consistent ordered
48/// sequence that is seen by all workers in the same order.
49pub struct Sequencer<T> {
50 activator: Rc<RefCell<Option<CatchupActivator>>>,
51 send: Rc<RefCell<VecDeque<T>>>, // proposed items.
52 recv: Rc<RefCell<VecDeque<T>>>, // sequenced items.
53}
54
55impl<T: ExchangeData> Sequencer<T> {
56
57 /// Creates a new Sequencer.
58 ///
59 /// The `timer` instant is used to synchronize the workers, who use this
60 /// elapsed time as their timestamp. Elements are ordered by this time,
61 /// and cannot be made visible until all workers have reached the time.
62 ///
63 /// # Examples
64 ///
65 /// ```rust
66 /// use std::time::{Instant, Duration};
67 ///
68 /// use timely::Config;
69 /// use timely::synchronization::Sequencer;
70 ///
71 /// timely::execute(Config::process(4), |worker| {
72 /// let timer = Instant::now();
73 /// let mut sequencer = Sequencer::new(worker, timer);
74 ///
75 /// for round in 0 .. 10 {
76 ///
77 /// // Sleep, and then send an announcement on wake-up.
78 /// std::thread::sleep(Duration::from_millis(1 + worker.index() as u64));
79 /// sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round));
80 ///
81 /// // Ensures the pushed string is sent.
82 /// worker.step();
83 ///
84 /// // Read out received announcements.
85 /// while let Some(element) = sequencer.next() {
86 /// println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element);
87 /// }
88 /// }
89 /// }).expect("Timely computation did not complete correctly.");
90 /// ```
91 pub fn new<A: Allocate>(worker: &mut Worker<A>, timer: Instant) -> Self {
92 Sequencer::preloaded(worker, timer, VecDeque::new())
93 }
94
95 /// Creates a new Sequencer preloaded with a queue of
96 /// elements.
97 pub fn preloaded<A: Allocate>(worker: &mut Worker<A>, timer: Instant, preload: VecDeque<T>) -> Self {
98
99 let send: Rc<RefCell<VecDeque<T>>> = Rc::new(RefCell::new(VecDeque::new()));
100 let recv = Rc::new(RefCell::new(preload));
101 let send_weak = Rc::downgrade(&send);
102 let recv_weak = Rc::downgrade(&recv);
103
104 // The SequenceInput activator will be held by the sequencer,
105 // by the operator itself, and by the sink operator. We can
106 // only initialize the activator once we obtain the operator
107 // address.
108 let activator = Rc::new(RefCell::new(None));
109 let activator_source = activator.clone();
110 let activator_sink = activator.clone();
111
112 // build a dataflow used to serialize and circulate commands
113 worker.dataflow::<Duration,_,_>(move |dataflow| {
114
115 let scope = dataflow.clone();
116 let peers = dataflow.peers();
117
118 let mut recvd = Vec::new();
119 let mut vector = Vec::new();
120
121 // monotonic counter to maintain per-worker total order.
122 let mut counter = 0;
123
124 // a source that attempts to pull from `recv` and produce commands for everyone
125 source(dataflow, "SequenceInput", move |capability, info| {
126
127 // intialize activator, now that we have the address
128 activator_source
129 .borrow_mut()
130 .replace(CatchupActivator {
131 activator: scope.activator_for(&info.address[..]),
132 catchup_until: None,
133 });
134
135 // so we can drop, if input queue vanishes.
136 let mut capability = Some(capability);
137
138 // closure broadcasts any commands it grabs.
139 move |output| {
140
141 if let Some(send_queue) = send_weak.upgrade() {
142
143 // capability *should* still be non-None.
144 let capability = capability.as_mut().expect("Capability unavailable");
145
146 // downgrade capability to current time.
147 capability.downgrade(&timer.elapsed());
148
149 // drain and broadcast `send`.
150 let mut session = output.session(&capability);
151 let mut borrow = send_queue.borrow_mut();
152 for element in borrow.drain(..) {
153 for worker_index in 0 .. peers {
154 session.give((worker_index, counter, element.clone()));
155 }
156 counter += 1;
157 }
158
159 let mut activator_borrow = activator_source.borrow_mut();
160 let mut activator = activator_borrow.as_mut().unwrap();
161
162 if let Some(t) = activator.catchup_until {
163 if capability.time().less_than(&t) {
164 activator.activate();
165 } else {
166 activator.catchup_until = None;
167 }
168 }
169 } else {
170 capability = None;
171 }
172 }
173 })
174 .sink(
175 Exchange::new(|x: &(usize, usize, T)| x.0 as u64),
176 "SequenceOutput",
177 move |input| {
178
179 // grab each command and queue it up
180 input.for_each(|time, data| {
181 data.swap(&mut vector);
182
183 recvd.reserve(vector.len());
184 for (worker, counter, element) in vector.drain(..) {
185 recvd.push(((time.time().clone(), worker, counter), element));
186 }
187 });
188
189 recvd.sort_by(|x,y| x.0.cmp(&y.0));
190
191 if let Some(last) = recvd.last() {
192 let mut activator_borrow = activator_sink.borrow_mut();
193 let mut activator = activator_borrow.as_mut().unwrap();
194
195 activator.catchup_until = Some((last.0).0);
196 activator.activate();
197 }
198
199 // determine how many (which) elements to read from `recvd`.
200 let count = recvd.iter().filter(|&((ref time, _, _), _)| !input.frontier().less_equal(time)).count();
201 let iter = recvd.drain(..count);
202
203 if let Some(recv_queue) = recv_weak.upgrade() {
204 recv_queue.borrow_mut().extend(iter.map(|(_,elem)| elem));
205 }
206 }
207 );
208 });
209
210 Sequencer { activator, send, recv, }
211 }
212
213 /// Adds an element to the shared log.
214 pub fn push(&mut self, element: T) {
215 self.send.borrow_mut().push_back(element);
216 self.activator.borrow_mut().as_mut().unwrap().activate();
217 }
218}
219
220impl<T> Iterator for Sequencer<T> {
221 type Item = T;
222 fn next(&mut self) -> Option<T> {
223 self.recv.borrow_mut().pop_front()
224 }
225}
226
227// We should activate on drop, as this will cause the source to drop its capability.
228impl<T> Drop for Sequencer<T> {
229 fn drop(&mut self) {
230 self.activator
231 .borrow()
232 .as_ref()
233 .expect("Sequencer.activator unavailable")
234 .activate()
235 }
236}