differential_dataflow/operators/arrange/agent.rs
1//! Shared read access to a trace.
2
3use std::rc::{Rc, Weak};
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely::dataflow::Scope;
8use timely::dataflow::operators::generic::{OperatorInfo, source};
9use timely::progress::Timestamp;
10use timely::progress::{Antichain, frontier::AntichainRef};
11use timely::dataflow::operators::CapabilitySet;
12
13use crate::lattice::Lattice;
14use crate::trace::{Trace, TraceReader, Batch, BatchReader};
15use crate::trace::wrappers::rc::TraceBox;
16
17use timely::scheduling::Activator;
18
19use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
20use super::TraceReplayInstruction;
21
22use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
23
24
25/// A `TraceReader` wrapper which can be imported into other dataflows.
26///
27/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
28/// from the dataflow in which it was defined, and imported into other dataflows.
29pub struct TraceAgent<Tr>
30where
31 Tr: TraceReader,
32 Tr::Time: Lattice+Ord+Clone+'static,
33{
34 trace: Rc<RefCell<TraceBox<Tr>>>,
35 queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
36 logical_compaction: Antichain<Tr::Time>,
37 physical_compaction: Antichain<Tr::Time>,
38 temp_antichain: Antichain<Tr::Time>,
39
40 operator: OperatorInfo,
41 logging: Option<crate::logging::Logger>,
42}
43
44impl<Tr> TraceReader for TraceAgent<Tr>
45where
46 Tr: TraceReader,
47 Tr::Time: Lattice+Ord+Clone+'static,
48{
49 type Key<'a> = Tr::Key<'a>;
50 type KeyOwned = Tr::KeyOwned;
51 type Val<'a> = Tr::Val<'a>;
52 type ValOwned = Tr::ValOwned;
53 type Time = Tr::Time;
54 type Diff = Tr::Diff;
55
56 type Batch = Tr::Batch;
57 type Storage = Tr::Storage;
58 type Cursor = Tr::Cursor;
59
60 fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
61 // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
62 // Instead, it determines the joint consequences of both guarantees and moves forward with that.
63 crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
64 self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
65 ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
66 self.temp_antichain.clear();
67 }
68 fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
69 self.logical_compaction.borrow()
70 }
71 fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
72 // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
73 // Instead, it determines the joint consequences of both guarantees and moves forward with that.
74 crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
75 self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
76 ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
77 self.temp_antichain.clear();
78 }
79 fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
80 self.physical_compaction.borrow()
81 }
82 fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
83 self.trace.borrow_mut().trace.cursor_through(frontier)
84 }
85 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
86}
87
88impl<Tr> TraceAgent<Tr>
89where
90 Tr: TraceReader,
91 Tr::Time: Timestamp+Lattice,
92{
93 /// Creates a new agent from a trace reader.
94 pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
95 where
96 Tr: Trace,
97 Tr::Batch: Batch,
98 {
99 let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
100 let queues = Rc::new(RefCell::new(Vec::new()));
101
102 if let Some(logging) = &logging {
103 logging.log(
104 crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
105 );
106 }
107
108 let reader = TraceAgent {
109 trace: trace.clone(),
110 queues: Rc::downgrade(&queues),
111 logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
112 physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
113 temp_antichain: Antichain::new(),
114 operator,
115 logging,
116 };
117
118 let writer = TraceWriter::new(
119 vec![<Tr::Time as Timestamp>::minimum()],
120 Rc::downgrade(&trace),
121 queues,
122 );
123
124 (reader, writer)
125 }
126
127 /// Attaches a new shared queue to the trace.
128 ///
129 /// The queue is first populated with existing batches from the trace,
130 /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
131 /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
132 pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr>
133 {
134 // create a new queue for progress and batch information.
135 let mut new_queue = VecDeque::new();
136
137 // add the existing batches from the trace
138 let mut upper = None;
139 self.trace
140 .borrow_mut()
141 .trace
142 .map_batches(|batch| {
143 new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(<Tr::Time as Timestamp>::minimum())));
144 upper = Some(batch.upper().clone());
145 });
146
147 if let Some(upper) = upper {
148 new_queue.push_back(TraceReplayInstruction::Frontier(upper));
149 }
150
151 let reference = Rc::new((activator, RefCell::new(new_queue)));
152
153 // wraps the queue in a ref-counted ref cell and enqueue/return it.
154 if let Some(queue) = self.queues.upgrade() {
155 queue.borrow_mut().push(Rc::downgrade(&reference));
156 }
157 reference.0.activate();
158 reference
159 }
160
161 /// The [OperatorInfo] of the underlying Timely operator
162 pub fn operator(&self) -> &OperatorInfo {
163 &self.operator
164 }
165
166 /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain
167 /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
168 /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
169 ///
170 /// This method is subject to changes and removal and should not be considered part of a stable
171 /// interface.
172 pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>> {
173 Rc::clone(&self.trace)
174 }
175}
176
177impl<Tr> TraceAgent<Tr>
178where
179 Tr: TraceReader+'static,
180 Tr::Time: Lattice+Ord+Clone+'static,
181{
182 /// Copies an existing collection into the supplied scope.
183 ///
184 /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
185 /// directly to the source collection brought into the local scope. The only caveat is that the initial state
186 /// of the collection is its current state, and updates occur from this point forward. The historical changes
187 /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
188 /// are no longer evident.
189 ///
190 /// The current behavior is that the introduced collection accumulates updates to some times less or equal
191 /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
192 /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
193 /// the historical collection may move through configurations that did not actually occur, even if eventually
194 /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
195 /// the intermediate computation could do something that the original computation did not, like diverge.
196 ///
197 /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
198 /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
199 /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
200 /// to advance times before using them).
201 ///
202 /// # Examples
203 ///
204 /// ```
205 /// use timely::Config;
206 /// use differential_dataflow::input::Input;
207 /// use differential_dataflow::operators::arrange::ArrangeBySelf;
208 /// use differential_dataflow::operators::reduce::Reduce;
209 /// use differential_dataflow::trace::Trace;
210 ///
211 /// ::timely::execute(Config::thread(), |worker| {
212 ///
213 /// // create a first dataflow
214 /// let mut trace = worker.dataflow::<u32,_,_>(|scope| {
215 /// // create input handle and collection.
216 /// scope.new_collection_from(0 .. 10).1
217 /// .arrange_by_self()
218 /// .trace
219 /// });
220 ///
221 /// // do some work.
222 /// worker.step();
223 /// worker.step();
224 ///
225 /// // create a second dataflow
226 /// worker.dataflow(move |scope| {
227 /// trace.import(scope)
228 /// .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
229 /// });
230 ///
231 /// }).unwrap();
232 /// ```
233 pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
234 where
235 G: Scope<Timestamp=Tr::Time>,
236 Tr::Time: Timestamp,
237 {
238 self.import_named(scope, "ArrangedSource")
239 }
240
241 /// Same as `import`, but allows to name the source.
242 pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
243 where
244 G: Scope<Timestamp=Tr::Time>,
245 Tr::Time: Timestamp,
246 {
247 // Drop ShutdownButton and return only the arrangement.
248 self.import_core(scope, name).0
249 }
250
251 /// Imports an arrangement into the supplied scope.
252 ///
253 /// # Examples
254 ///
255 /// ```
256 /// use timely::Config;
257 /// use timely::dataflow::ProbeHandle;
258 /// use timely::dataflow::operators::Probe;
259 /// use differential_dataflow::input::InputSession;
260 /// use differential_dataflow::operators::arrange::ArrangeBySelf;
261 /// use differential_dataflow::operators::reduce::Reduce;
262 /// use differential_dataflow::trace::Trace;
263 ///
264 /// ::timely::execute(Config::thread(), |worker| {
265 ///
266 /// let mut input = InputSession::<_,(),isize>::new();
267 /// let mut probe = ProbeHandle::new();
268 ///
269 /// // create a first dataflow
270 /// let mut trace = worker.dataflow::<u32,_,_>(|scope| {
271 /// // create input handle and collection.
272 /// input.to_collection(scope)
273 /// .arrange_by_self()
274 /// .trace
275 /// });
276 ///
277 /// // do some work.
278 /// worker.step();
279 /// worker.step();
280 ///
281 /// // create a second dataflow
282 /// let mut shutdown = worker.dataflow(|scope| {
283 /// let (arrange, button) = trace.import_core(scope, "Import");
284 /// arrange.stream.probe_with(&mut probe);
285 /// button
286 /// });
287 ///
288 /// worker.step();
289 /// worker.step();
290 /// assert!(!probe.done());
291 ///
292 /// shutdown.press();
293 ///
294 /// worker.step();
295 /// worker.step();
296 /// assert!(probe.done());
297 ///
298 /// }).unwrap();
299 /// ```
300 pub fn import_core<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
301 where
302 G: Scope<Timestamp=Tr::Time>,
303 Tr::Time: Timestamp,
304 {
305 let trace = self.clone();
306
307 let mut shutdown_button = None;
308
309 let stream = {
310
311 let shutdown_button_ref = &mut shutdown_button;
312 source(scope, name, move |capability, info| {
313
314 let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
315
316 let activator = scope.activator_for(&info.address[..]);
317 let queue = self.new_listener(activator);
318
319 let activator = scope.activator_for(&info.address[..]);
320 *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
321
322 capabilities.borrow_mut().as_mut().unwrap().insert(capability);
323
324 move |output| {
325
326 let mut capabilities = capabilities.borrow_mut();
327 if let Some(ref mut capabilities) = *capabilities {
328
329 let mut borrow = queue.1.borrow_mut();
330 for instruction in borrow.drain(..) {
331 match instruction {
332 TraceReplayInstruction::Frontier(frontier) => {
333 capabilities.downgrade(&frontier.borrow()[..]);
334 },
335 TraceReplayInstruction::Batch(batch, hint) => {
336 if let Some(time) = hint {
337 if !batch.is_empty() {
338 let delayed = capabilities.delayed(&time);
339 output.session(&delayed).give(batch);
340 }
341 }
342 }
343 }
344 }
345 }
346 }
347 })
348 };
349
350 (Arranged { stream, trace }, shutdown_button.unwrap())
351 }
352
353 /// Imports an arrangement into the supplied scope.
354 ///
355 /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
356 ///
357 /// # Examples
358 ///
359 /// ```
360 /// use timely::Config;
361 /// use timely::progress::frontier::AntichainRef;
362 /// use timely::dataflow::ProbeHandle;
363 /// use timely::dataflow::operators::Probe;
364 /// use timely::dataflow::operators::Inspect;
365 /// use differential_dataflow::input::InputSession;
366 /// use differential_dataflow::operators::arrange::ArrangeBySelf;
367 /// use differential_dataflow::operators::reduce::Reduce;
368 /// use differential_dataflow::trace::Trace;
369 /// use differential_dataflow::trace::TraceReader;
370 /// use differential_dataflow::input::Input;
371 ///
372 /// ::timely::execute(Config::thread(), |worker| {
373 ///
374 /// let mut probe = ProbeHandle::new();
375 ///
376 /// // create a first dataflow
377 /// let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
378 /// // create input handle and collection.
379 /// let (handle, stream) = scope.new_collection();
380 /// let trace = stream.arrange_by_self().trace;
381 /// (handle, trace)
382 /// });
383 ///
384 /// handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
385 /// handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
386 /// handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
387 /// handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
388 /// handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
389 ///
390 /// trace.set_logical_compaction(AntichainRef::new(&[5]));
391 ///
392 /// // create a second dataflow
393 /// let mut shutdown = worker.dataflow(|scope| {
394 /// let (arrange, button) = trace.import_frontier(scope, "Import");
395 /// arrange
396 /// .as_collection(|k,v| (*k,*v))
397 /// .inner
398 /// .inspect(|(d,t,r)| {
399 /// assert!(t >= &5);
400 /// })
401 /// .probe_with(&mut probe);
402 ///
403 /// button
404 /// });
405 ///
406 /// worker.step();
407 /// worker.step();
408 /// assert!(!probe.done());
409 ///
410 /// shutdown.press();
411 ///
412 /// worker.step();
413 /// worker.step();
414 /// assert!(probe.done());
415 ///
416 /// }).unwrap();
417 /// ```
418 pub fn import_frontier<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
419 where
420 G: Scope<Timestamp=Tr::Time>,
421 Tr::Time: Timestamp+ Lattice+Ord+Clone+'static,
422 Tr: TraceReader,
423 {
424 // This frontier describes our only guarantee on the compaction frontier.
425 let since = self.get_logical_compaction().to_owned();
426 self.import_frontier_core(scope, name, since, Antichain::new())
427 }
428
429 /// Import a trace restricted to a specific time interval `[since, until)`.
430 ///
431 /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
432 /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
433 /// to `until` the operator capability will be dropped.
434 ///
435 /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
436 /// frontier indicates the end of times.
437 pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
438 where
439 G: Scope<Timestamp=Tr::Time>,
440 Tr::Time: Timestamp+ Lattice+Ord+Clone+'static,
441 Tr: TraceReader,
442 {
443 let trace = self.clone();
444 let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());
445
446 let mut shutdown_button = None;
447
448 let stream = {
449
450 let shutdown_button_ref = &mut shutdown_button;
451 source(scope, name, move |capability, info| {
452
453 let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
454
455 let activator = scope.activator_for(&info.address[..]);
456 let queue = self.new_listener(activator);
457
458 let activator = scope.activator_for(&info.address[..]);
459 *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
460
461 capabilities.borrow_mut().as_mut().unwrap().insert(capability);
462
463 move |output| {
464
465 let mut capabilities = capabilities.borrow_mut();
466 if let Some(ref mut capabilities) = *capabilities {
467 let mut borrow = queue.1.borrow_mut();
468 for instruction in borrow.drain(..) {
469 // If we have dropped the capabilities due to `until`, attempt no further work.
470 // Without the capabilities, we should soon be shut down (once this loop ends).
471 if !capabilities.is_empty() {
472 match instruction {
473 TraceReplayInstruction::Frontier(frontier) => {
474 if timely::PartialOrder::less_equal(&until, &frontier) {
475 // It might be nice to actively *drop* `capabilities`, but it seems
476 // complicated logically (i.e. we'd have to break out of the loop).
477 capabilities.downgrade(&[]);
478 } else {
479 capabilities.downgrade(&frontier.borrow()[..]);
480 }
481 },
482 TraceReplayInstruction::Batch(batch, hint) => {
483 if let Some(time) = hint {
484 if !batch.is_empty() {
485 let delayed = capabilities.delayed(&time);
486 output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
487 }
488 }
489 }
490 }
491 }
492 }
493 }
494 }
495 })
496 };
497
498 (Arranged { stream, trace }, shutdown_button.unwrap())
499 }
500}
501
502
503
504/// Wrapper than can drop shared references.
505pub struct ShutdownButton<T> {
506 reference: Rc<RefCell<Option<T>>>,
507 activator: Activator,
508}
509
510impl<T> ShutdownButton<T> {
511 /// Creates a new ShutdownButton.
512 pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
513 Self { reference, activator }
514 }
515 /// Push the shutdown button, dropping the shared objects.
516 pub fn press(&mut self) {
517 *self.reference.borrow_mut() = None;
518 self.activator.activate();
519 }
520 /// Hotwires the button to one that is pressed if dropped.
521 pub fn press_on_drop(self) -> ShutdownDeadmans<T> {
522 ShutdownDeadmans {
523 button: self
524 }
525 }
526}
527
528/// A deadman's switch version of a shutdown button.
529///
530/// This type hosts a shutdown button and will press it when dropped.
531pub struct ShutdownDeadmans<T> {
532 button: ShutdownButton<T>,
533}
534
535impl<T> Drop for ShutdownDeadmans<T> {
536 fn drop(&mut self) {
537 self.button.press();
538 }
539}
540
541impl<Tr> Clone for TraceAgent<Tr>
542where
543 Tr: TraceReader,
544 Tr::Time: Lattice+Ord+Clone+'static,
545{
546 fn clone(&self) -> Self {
547
548 if let Some(logging) = &self.logging {
549 logging.log(
550 crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
551 );
552 }
553
554 // increase counts for wrapped `TraceBox`.
555 let empty_frontier = Antichain::new();
556 self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
557 self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
558
559 TraceAgent {
560 trace: self.trace.clone(),
561 queues: self.queues.clone(),
562 logical_compaction: self.logical_compaction.clone(),
563 physical_compaction: self.physical_compaction.clone(),
564 operator: self.operator.clone(),
565 logging: self.logging.clone(),
566 temp_antichain: Antichain::new(),
567 }
568 }
569}
570
571impl<Tr> Drop for TraceAgent<Tr>
572where
573 Tr: TraceReader,
574 Tr::Time: Lattice+Ord+Clone+'static,
575{
576 fn drop(&mut self) {
577
578 if let Some(logging) = &self.logging {
579 logging.log(
580 crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
581 );
582 }
583
584 // decrement borrow counts to remove all holds
585 let empty_frontier = Antichain::new();
586 self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
587 self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
588 }
589}