differential_dataflow/operators/arrange/agent.rs
//! Shared read access to a trace.

use std::rc::{Rc, Weak};
use std::cell::RefCell;
use std::collections::VecDeque;

use timely::dataflow::Scope;
use timely::dataflow::operators::generic::{OperatorInfo, source};
use timely::progress::Timestamp;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::dataflow::operators::CapabilitySet;

use crate::trace::{Trace, TraceReader, Batch, BatchReader};
use crate::trace::wrappers::rc::TraceBox;

use timely::scheduling::Activator;

use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
use super::TraceReplayInstruction;

use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};


/// A `TraceReader` wrapper which can be imported into other dataflows.
///
/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
/// from the dataflow in which it was defined, and imported into other dataflows.
pub struct TraceAgent<Tr>
where
    Tr: TraceReader,
{
    trace: Rc<RefCell<TraceBox<Tr>>>,
    queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
    logical_compaction: Antichain<Tr::Time>,
    physical_compaction: Antichain<Tr::Time>,
    temp_antichain: Antichain<Tr::Time>,

    operator: OperatorInfo,
    logging: Option<crate::logging::Logger>,
}

impl<Tr> TraceReader for TraceAgent<Tr>
where
    Tr: TraceReader,
{
    type Key<'a> = Tr::Key<'a>;
    type Val<'a> = Tr::Val<'a>;
    type Time = Tr::Time;
    type TimeGat<'a> = Tr::TimeGat<'a>;
    type Diff = Tr::Diff;
    type DiffGat<'a> = Tr::DiffGat<'a>;

    type Batch = Tr::Batch;
    type Storage = Tr::Storage;
    type Cursor = Tr::Cursor;

    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
        // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
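        // Illustration: with totally ordered times, joining a current guarantee of `{3}` with a
        // requested frontier of `{2}` yields `{3}`; the compaction frontier never moves backwards.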
        crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
        ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
        self.temp_antichain.clear();
    }
    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
        self.logical_compaction.borrow()
    }
    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
        // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
        crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
        ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
        self.temp_antichain.clear();
    }
    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
        self.physical_compaction.borrow()
    }
    fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
        self.trace.borrow_mut().trace.cursor_through(frontier)
    }
    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
}

impl<Tr: TraceReader> TraceAgent<Tr> {
    /// Creates a new agent from a trace reader.
    pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
    where
        Tr: Trace,
        Tr::Batch: Batch,
    {
        let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
        let queues = Rc::new(RefCell::new(Vec::new()));

        if let Some(logging) = &logging {
            logging.log(
                crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
            );
        }

        let reader = TraceAgent {
            trace: trace.clone(),
            queues: Rc::downgrade(&queues),
            logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
            physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
            temp_antichain: Antichain::new(),
            operator,
            logging,
        };

        let writer = TraceWriter::new(
            vec![<Tr::Time as Timestamp>::minimum()],
            Rc::downgrade(&trace),
            queues,
        );

        (reader, writer)
    }

    /// Attaches a new shared queue to the trace.
    ///
    /// The queue is immediately populated with the existing historical batches from the trace, and until
    /// the reference is dropped the queue will receive new batches as they are produced by the source
    /// `arrange` operator.
    pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr>
    {
        // create a new queue for progress and batch information.
        let mut new_queue = VecDeque::new();

        // add the existing batches from the trace
        let mut upper = None;
        self.trace
            .borrow_mut()
            .trace
            .map_batches(|batch| {
                new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(<Tr::Time as Timestamp>::minimum())));
                upper = Some(batch.upper().clone());
            });

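        // Announce the upper frontier of the replayed batches, so the listener can downgrade its
        // capabilities once these historical batches have been delivered.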
        if let Some(upper) = upper {
            new_queue.push_back(TraceReplayInstruction::Frontier(upper));
        }

        let reference = Rc::new((activator, RefCell::new(new_queue)));

        // Share the new queue with the trace's writer, if it still exists, and return it.
        if let Some(queue) = self.queues.upgrade() {
            queue.borrow_mut().push(Rc::downgrade(&reference));
        }
        reference.0.activate();
        reference
    }

    /// The [OperatorInfo] of the underlying Timely operator
    pub fn operator(&self) -> &OperatorInfo {
        &self.operator
    }

    /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain
    /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
    /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
    ///
    /// This method is subject to change or removal and should not be considered part of a stable
    /// interface.
    pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>> {
        Rc::clone(&self.trace)
    }
}

impl<Tr> TraceAgent<Tr>
where
    Tr: TraceReader+'static,
{
    /// Copies an existing collection into the supplied scope.
    ///
    /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
    /// directly to the source collection brought into the local scope. The only caveat is that the initial state
    /// of the collection is its current state, and updates occur from this point forward. The historical changes
    /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
    /// are no longer evident.
    ///
    /// The current behavior is that the introduced collection accumulates updates to some times less or equal
    /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
    /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
    /// the historical collection may move through configurations that did not actually occur, even if eventually
    /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
    /// the intermediate computation could do something that the original computation did not, like diverge.
    ///
    /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
    /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
    /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
    /// to advance times before using them).
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
    /// use differential_dataflow::operators::reduce::Reduce;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     // create a first dataflow
    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
    ///         // create input handle and collection.
    ///         scope.new_collection_from(0 .. 10).1
    ///             .arrange_by_self()
    ///             .trace
    ///     });
    ///
    ///     // do some work.
    ///     worker.step();
    ///     worker.step();
    ///
    ///     // create a second dataflow
    ///     worker.dataflow(move |scope| {
    ///         trace.import(scope)
    ///             .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
    ///     });
    ///
    /// }).unwrap();
    /// ```
    pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
    where
        G: Scope<Timestamp=Tr::Time>,
    {
        self.import_named(scope, "ArrangedSource")
    }

    /// Same as `import`, but allows the source operator to be named.
    pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        G: Scope<Timestamp=Tr::Time>,
    {
        // Drop ShutdownButton and return only the arrangement.
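        // Dropping the button without pressing it simply forgoes the ability to shut the import down early.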
        self.import_core(scope, name).0
    }

    /// Imports an arrangement into the supplied scope.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use timely::dataflow::ProbeHandle;
    /// use timely::dataflow::operators::Probe;
    /// use differential_dataflow::input::InputSession;
    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
    /// use differential_dataflow::operators::reduce::Reduce;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let mut input = InputSession::<_,(),isize>::new();
    ///     let mut probe = ProbeHandle::new();
    ///
    ///     // create a first dataflow
    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
    ///         // create input handle and collection.
    ///         input.to_collection(scope)
    ///             .arrange_by_self()
    ///             .trace
    ///     });
    ///
    ///     // do some work.
    ///     worker.step();
    ///     worker.step();
    ///
    ///     // create a second dataflow
    ///     let mut shutdown = worker.dataflow(|scope| {
    ///         let (arrange, button) = trace.import_core(scope, "Import");
    ///         arrange.stream.probe_with(&mut probe);
    ///         button
    ///     });
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(!probe.done());
    ///
    ///     shutdown.press();
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(probe.done());
    ///
    /// }).unwrap();
    /// ```
    pub fn import_core<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
    where
        G: Scope<Timestamp=Tr::Time>,
    {
        let trace = self.clone();

        let mut shutdown_button = None;

        let stream = {

            let shutdown_button_ref = &mut shutdown_button;
            source(scope, name, move |capability, info| {

                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

                let activator = scope.activator_for(Rc::clone(&info.address));
                let queue = self.new_listener(activator);

                let activator = scope.activator_for(info.address);
                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

                capabilities.borrow_mut().as_mut().unwrap().insert(capability);

                move |output| {

                    let mut capabilities = capabilities.borrow_mut();
                    if let Some(ref mut capabilities) = *capabilities {

                        let mut borrow = queue.1.borrow_mut();
                        for instruction in borrow.drain(..) {
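                            // Replay the queued instructions: `Frontier` downgrades the held
                            // capabilities, and `Batch` forwards non-empty batches at their hinted times.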
                            match instruction {
                                TraceReplayInstruction::Frontier(frontier) => {
                                    capabilities.downgrade(&frontier.borrow()[..]);
                                },
                                TraceReplayInstruction::Batch(batch, hint) => {
                                    if let Some(time) = hint {
                                        if !batch.is_empty() {
                                            let delayed = capabilities.delayed(&time);
                                            output.session(&delayed).give(batch);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            })
        };

        (Arranged { stream, trace }, shutdown_button.unwrap())
    }

    /// Imports an arrangement into the supplied scope.
    ///
    /// This variant of import uses `get_logical_compaction` to forcibly advance timestamps in updates.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use timely::progress::frontier::AntichainRef;
    /// use timely::dataflow::ProbeHandle;
    /// use timely::dataflow::operators::Probe;
    /// use timely::dataflow::operators::Inspect;
    /// use differential_dataflow::input::InputSession;
    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
    /// use differential_dataflow::operators::reduce::Reduce;
    /// use differential_dataflow::trace::Trace;
    /// use differential_dataflow::trace::TraceReader;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let mut probe = ProbeHandle::new();
    ///
    ///     // create a first dataflow
    ///     let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
    ///         // create input handle and collection.
    ///         let (handle, stream) = scope.new_collection();
    ///         let trace = stream.arrange_by_self().trace;
    ///         (handle, trace)
    ///     });
    ///
    ///     handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
    ///     handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
    ///     handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
    ///     handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
    ///     handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
    ///
    ///     trace.set_logical_compaction(AntichainRef::new(&[5]));
    ///
    ///     // create a second dataflow
    ///     let mut shutdown = worker.dataflow(|scope| {
    ///         let (arrange, button) = trace.import_frontier(scope, "Import");
    ///         arrange
    ///             .as_collection(|k,v| (*k,*v))
    ///             .inner
    ///             .inspect(|(d,t,r)| {
    ///                 assert!(t >= &5);
    ///             })
    ///             .probe_with(&mut probe);
    ///
    ///         button
    ///     });
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(!probe.done());
    ///
    ///     shutdown.press();
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(probe.done());
    ///
    /// }).unwrap();
    /// ```
    pub fn import_frontier<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
    where
        G: Scope<Timestamp=Tr::Time>,
        Tr: TraceReader,
    {
        // This frontier describes our only guarantee on the compaction frontier.
        let since = self.get_logical_compaction().to_owned();
        self.import_frontier_core(scope, name, since, Antichain::new())
    }

    /// Import a trace restricted to a specific time interval `[since, until)`.
    ///
    /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
    /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
    /// to `until` the operator capability will be dropped.
    ///
    /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
    /// frontier indicates the end of times.
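    ///
    /// # Examples
    ///
    /// A sketch (not an example from the original documentation) that mirrors the `import_frontier`
    /// example above, with an explicit `since` of `[5]` and an empty `until`; these particular
    /// frontiers are illustrative choices rather than requirements of the method.
    ///
    /// ```
    /// use timely::Config;
    /// use timely::progress::Antichain;
    /// use timely::progress::frontier::AntichainRef;
    /// use timely::dataflow::ProbeHandle;
    /// use timely::dataflow::operators::Probe;
    /// use timely::dataflow::operators::Inspect;
    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
    /// use differential_dataflow::trace::TraceReader;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let mut probe = ProbeHandle::new();
    ///
    ///     // create a first dataflow
    ///     let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
    ///         let (handle, stream) = scope.new_collection();
    ///         (handle, stream.arrange_by_self().trace)
    ///     });
    ///
    ///     handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
    ///     handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
    ///     handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
    ///     handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
    ///     handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
    ///
    ///     trace.set_logical_compaction(AntichainRef::new(&[5]));
    ///
    ///     // create a second dataflow, importing the interval `[{5}, {})`.
    ///     let mut shutdown = worker.dataflow(|scope| {
    ///         let (arrange, button) = trace.import_frontier_core(scope, "Import", Antichain::from_elem(5), Antichain::new());
    ///         arrange
    ///             .as_collection(|k,v| (*k,*v))
    ///             .inner
    ///             .inspect(|(_d,t,_r)| {
    ///                 assert!(t >= &5);
    ///             })
    ///             .probe_with(&mut probe);
    ///
    ///         button
    ///     });
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(!probe.done());
    ///
    ///     shutdown.press();
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(probe.done());
    ///
    /// }).unwrap();
    /// ```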
    pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
    where
        G: Scope<Timestamp=Tr::Time>,
        Tr: TraceReader,
    {
        let trace = self.clone();
        let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());

        let mut shutdown_button = None;

        let stream = {

            let shutdown_button_ref = &mut shutdown_button;
            source(scope, name, move |capability, info| {

                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

                let activator = scope.activator_for(Rc::clone(&info.address));
                let queue = self.new_listener(activator);

                let activator = scope.activator_for(info.address);
                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

                capabilities.borrow_mut().as_mut().unwrap().insert(capability);

                move |output| {

                    let mut capabilities = capabilities.borrow_mut();
                    if let Some(ref mut capabilities) = *capabilities {
                        let mut borrow = queue.1.borrow_mut();
                        for instruction in borrow.drain(..) {
                            // If we have dropped the capabilities due to `until`, attempt no further work.
                            // Without the capabilities, we should soon be shut down (once this loop ends).
                            if !capabilities.is_empty() {
                                match instruction {
                                    TraceReplayInstruction::Frontier(frontier) => {
                                        if timely::PartialOrder::less_equal(&until, &frontier) {
                                            // It might be nice to actively *drop* `capabilities`, but it seems
                                            // complicated logically (i.e. we'd have to break out of the loop).
                                            capabilities.downgrade(&[]);
                                        } else {
                                            capabilities.downgrade(&frontier.borrow()[..]);
                                        }
                                    },
                                    TraceReplayInstruction::Batch(batch, hint) => {
                                        if let Some(time) = hint {
                                            if !batch.is_empty() {
                                                let delayed = capabilities.delayed(&time);
                                                output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            })
        };

        (Arranged { stream, trace }, shutdown_button.unwrap())
    }
}


/// Wrapper that can drop shared references.
pub struct ShutdownButton<T> {
    reference: Rc<RefCell<Option<T>>>,
    activator: Activator,
}

impl<T> ShutdownButton<T> {
    /// Creates a new ShutdownButton.
    pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
        Self { reference, activator }
    }
    /// Presses the shutdown button, dropping the shared objects.
    pub fn press(&mut self) {
        *self.reference.borrow_mut() = None;
        self.activator.activate();
    }
    /// Hotwires the button to one that is pressed if dropped.
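    ///
    /// A minimal sketch of the intended use, assuming `trace` and `scope` bindings as in the
    /// `import_core` example above:
    ///
    /// ```ignore
    /// let (arranged, button) = trace.import_core(scope, "Import");
    /// // Holding `keepalive` keeps the import running; dropping it presses the button.
    /// let keepalive = button.press_on_drop();
    /// drop(keepalive); // the import relinquishes its capabilities here
    /// ```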
    pub fn press_on_drop(self) -> ShutdownDeadmans<T> {
        ShutdownDeadmans {
            button: self
        }
    }
}

/// A deadman's switch version of a shutdown button.
///
/// This type hosts a shutdown button and will press it when dropped.
pub struct ShutdownDeadmans<T> {
    button: ShutdownButton<T>,
}

impl<T> Drop for ShutdownDeadmans<T> {
    fn drop(&mut self) {
        self.button.press();
    }
}

impl<Tr> Clone for TraceAgent<Tr>
where
    Tr: TraceReader,
{
    fn clone(&self) -> Self {

        if let Some(logging) = &self.logging {
            logging.log(
                crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
            );
        }

        // increase counts for wrapped `TraceBox`.
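        // With an empty "old" frontier, these adjustments purely increment the reader counts at this
        // agent's logical and physical compaction frontiers, registering the clone as an extra hold.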
        let empty_frontier = Antichain::new();
        self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
        self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());

        TraceAgent {
            trace: self.trace.clone(),
            queues: self.queues.clone(),
            logical_compaction: self.logical_compaction.clone(),
            physical_compaction: self.physical_compaction.clone(),
            operator: self.operator.clone(),
            logging: self.logging.clone(),
            temp_antichain: Antichain::new(),
        }
    }
}

impl<Tr> Drop for TraceAgent<Tr>
where
    Tr: TraceReader,
{
    fn drop(&mut self) {

        if let Some(logging) = &self.logging {
            logging.log(
                crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
            );
        }

        // decrement borrow counts to remove all holds
        let empty_frontier = Antichain::new();
        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
    }
}