Skip to main content

differential_dataflow/operators/arrange/
agent.rs

1//! Shared read access to a trace.
2
3use std::rc::{Rc, Weak};
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely::dataflow::Scope;
8use timely::dataflow::operators::generic::{OperatorInfo, source};
9use timely::progress::Timestamp;
10use timely::progress::{Antichain, frontier::AntichainRef};
11use timely::dataflow::operators::CapabilitySet;
12
13use crate::trace::{Trace, TraceReader, BatchReader};
14use crate::trace::wrappers::rc::TraceBox;
15
16use timely::scheduling::Activator;
17
18use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
19use super::TraceReplayInstruction;
20
21use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
22
23
/// A `TraceReader` wrapper which can be imported into other dataflows.
///
/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
/// from the dataflow in which it was defined, and imported into other dataflows.
pub struct TraceAgent<Tr: TraceReader> {
    // Shared handle to the boxed trace; clones of the agent share the same box.
    trace: Rc<RefCell<TraceBox<Tr>>>,
    // Weak handle to the list of listener queues. `new` hands the strong handle to the
    // `TraceWriter`; `new_listener` registers a queue only if the upgrade succeeds.
    queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
    // Logical compaction frontier this agent has registered with the trace box.
    logical_compaction: Antichain<Tr::Time>,
    // Physical compaction frontier this agent has registered with the trace box.
    physical_compaction: Antichain<Tr::Time>,
    // Scratch antichain used by `set_logical_compaction` / `set_physical_compaction`;
    // cleared again after each use.
    temp_antichain: Antichain<Tr::Time>,

    // Operator information, exposed through `operator()` and used for logging.
    operator: OperatorInfo,
    // Optional logger to which `TraceShare` events are reported on creation, clone and drop.
    logging: Option<crate::logging::Logger>,
}
38
39use crate::trace::implementations::WithLayout;
// The agent adds no layout of its own: it reuses the layout of the wrapped trace.
impl<Tr: TraceReader> WithLayout for TraceAgent<Tr> {
    type Layout = Tr::Layout;
}
43
44impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
45
46    type Batch = Tr::Batch;
47    type Storage = Tr::Storage;
48    type Cursor = Tr::Cursor;
49
50    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
51        // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
52        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
53        crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
54        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
55        ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
56        self.temp_antichain.clear();
57    }
58    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
59        self.logical_compaction.borrow()
60    }
61    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
62        // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
63        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
64        crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
65        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
66        ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
67        self.temp_antichain.clear();
68    }
69    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
70        self.physical_compaction.borrow()
71    }
72    fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
73        self.trace.borrow_mut().trace.cursor_through(frontier)
74    }
75    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
76}
77
78impl<Tr: TraceReader> TraceAgent<Tr> {
79    /// Creates a new agent from a trace reader.
80    pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
81    where
82        Tr: Trace,
83    {
84        let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
85        let queues = Rc::new(RefCell::new(Vec::new()));
86
87        if let Some(logging) = &logging {
88            logging.log(
89                crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
90            );
91        }
92
93        let reader = TraceAgent {
94            trace: trace.clone(),
95            queues: Rc::downgrade(&queues),
96            logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
97            physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
98            temp_antichain: Antichain::new(),
99            operator,
100            logging,
101        };
102
103        let writer = TraceWriter::new(
104            vec![Tr::Time::minimum()],
105            Rc::downgrade(&trace),
106            queues,
107        );
108
109        (reader, writer)
110    }
111
112    /// Attaches a new shared queue to the trace.
113    ///
114    /// The queue is first populated with existing batches from the trace,
115    /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
116    /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
117    pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr>
118    {
119        // create a new queue for progress and batch information.
120        let mut new_queue = VecDeque::new();
121
122        // add the existing batches from the trace
123        let mut upper = None;
124        self.trace
125            .borrow_mut()
126            .trace
127            .map_batches(|batch| {
128                new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(Tr::Time::minimum())));
129                upper = Some(batch.upper().clone());
130            });
131
132        if let Some(upper) = upper {
133            new_queue.push_back(TraceReplayInstruction::Frontier(upper));
134        }
135
136        let reference = Rc::new((activator, RefCell::new(new_queue)));
137
138        // wraps the queue in a ref-counted ref cell and enqueue/return it.
139        if let Some(queue) = self.queues.upgrade() {
140            queue.borrow_mut().push(Rc::downgrade(&reference));
141        }
142        reference.0.activate();
143        reference
144    }
145
    /// The [OperatorInfo] of the underlying Timely operator.
    ///
    /// This is the value supplied to [`TraceAgent::new`]; clones of the agent carry a copy.
    pub fn operator(&self) -> &OperatorInfo {
        &self.operator
    }
150
151    /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain
152    /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
153    /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
154    ///
155    /// This method is subject to changes and removal and should not be considered part of a stable
156    /// interface.
157    pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>> {
158        Rc::clone(&self.trace)
159    }
160}
161
162impl<Tr: TraceReader+'static> TraceAgent<Tr> {
    /// Copies an existing collection into the supplied scope.
    ///
    /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
    /// directly to the source collection brought into the local scope. The only caveat is that the initial state
    /// of the collection is its current state, and updates occur from this point forward. The historical changes
    /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
    /// are no longer evident.
    ///
    /// The current behavior is that the introduced collection accumulates updates to some times less or equal
    /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
    /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
    /// the historical collection may move through configurations that did not actually occur, even if eventually
    /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
    /// the intermediate computation could do something that the original computation did not, like diverge.
    ///
    /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
    /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
    /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
    /// to advance times before using them).
    ///
    /// This is a convenience wrapper around `import_named` that names the source operator
    /// `"ArrangedSource"`.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::trace::Trace;
    /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     // create a first dataflow
    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
    ///         // create input handle and collection.
    ///         scope.new_collection_from(0 .. 10).1
    ///              .arrange_by_self()
    ///              .trace
    ///     });
    ///
    ///     // do some work.
    ///     worker.step();
    ///     worker.step();
    ///
    ///     // create a second dataflow
    ///     worker.dataflow(move |scope| {
    ///         trace.import(scope)
    ///              .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>,_>(
    ///                  "Reduce",
    ///                  |_key, src, dst| dst.push((*src[0].0, 1)),
    ///                  |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
    ///              )
    ///              .as_collection(|k,v| (k.clone(), v.clone()));
    ///     });
    ///
    /// }).unwrap();
    /// ```
    pub fn import<'scope>(&mut self, scope: Scope<'scope, Tr::Time>) -> Arranged<'scope, TraceAgent<Tr>>
    {
        self.import_named(scope, "ArrangedSource")
    }
222
    /// Same as `import`, but allows naming the source operator.
    ///
    /// Delegates to `import_core` and discards the returned `ShutdownButton`, so the caller
    /// receives only the arrangement.
    pub fn import_named<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
    {
        // Drop ShutdownButton and return only the arrangement.
        self.import_core(scope, name).0
    }
229
230    /// Imports an arrangement into the supplied scope.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// use timely::Config;
236    /// use timely::dataflow::ProbeHandle;
237    /// use timely::dataflow::operators::Probe;
238    /// use differential_dataflow::input::InputSession;
239    /// use differential_dataflow::trace::Trace;
240    ///
241    /// ::timely::execute(Config::thread(), |worker| {
242    ///
243    ///     let mut input = InputSession::<_,(),isize>::new();
244    ///     let mut probe = ProbeHandle::new();
245    ///
246    ///     // create a first dataflow
247    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
248    ///         // create input handle and collection.
249    ///         input.to_collection(scope)
250    ///              .arrange_by_self()
251    ///              .trace
252    ///     });
253    ///
254    ///     // do some work.
255    ///     worker.step();
256    ///     worker.step();
257    ///
258    ///     // create a second dataflow
259    ///     let mut shutdown = worker.dataflow(|scope| {
260    ///         let (arrange, button) = trace.import_core(scope, "Import");
261    ///         arrange.stream.probe_with(&mut probe);
262    ///         button
263    ///     });
264    ///
265    ///     worker.step();
266    ///     worker.step();
267    ///     assert!(!probe.done());
268    ///
269    ///     shutdown.press();
270    ///
271    ///     worker.step();
272    ///     worker.step();
273    ///     assert!(probe.done());
274    ///
275    /// }).unwrap();
276    /// ```
277    pub fn import_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
278    {
279        let trace = self.clone();
280
281        let mut shutdown_button = None;
282
283        let stream = {
284
285            let shutdown_button_ref = &mut shutdown_button;
286            source(scope, name, move |capability, info| {
287
288                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
289
290                let activator = scope.activator_for(Rc::clone(&info.address));
291                let queue = self.new_listener(activator);
292
293                let activator = scope.activator_for(info.address);
294                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
295
296                capabilities.borrow_mut().as_mut().unwrap().insert(capability);
297
298                move |output| {
299
300                    let mut capabilities = capabilities.borrow_mut();
301                    if let Some(ref mut capabilities) = *capabilities {
302
303                        let mut borrow = queue.1.borrow_mut();
304                        for instruction in borrow.drain(..) {
305                            match instruction {
306                                TraceReplayInstruction::Frontier(frontier) => {
307                                    capabilities.downgrade(&frontier.borrow()[..]);
308                                },
309                                TraceReplayInstruction::Batch(batch, hint) => {
310                                    if let Some(time) = hint {
311                                        if !batch.is_empty() {
312                                            let delayed = capabilities.delayed(&time);
313                                            output.session(&delayed).give(batch);
314                                        }
315                                    }
316                                }
317                            }
318                        }
319                    }
320                }
321            })
322        };
323
324        (Arranged { stream, trace }, shutdown_button.unwrap())
325    }
326
327    /// Imports an arrangement into the supplied scope.
328    ///
329    /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
330    ///
331    /// # Examples
332    ///
333    /// ```
334    /// use timely::Config;
335    /// use timely::progress::frontier::AntichainRef;
336    /// use timely::dataflow::ProbeHandle;
337    /// use timely::dataflow::operators::Probe;
338    /// use timely::dataflow::operators::Inspect;
339    /// use differential_dataflow::input::InputSession;
340    /// use differential_dataflow::trace::Trace;
341    /// use differential_dataflow::trace::TraceReader;
342    /// use differential_dataflow::input::Input;
343    ///
344    /// ::timely::execute(Config::thread(), |worker| {
345    ///
346    ///     let mut probe = ProbeHandle::new();
347    ///
348    ///     // create a first dataflow
349    ///     let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
350    ///         // create input handle and collection.
351    ///         let (handle, stream) = scope.new_collection();
352    ///         let trace = stream.arrange_by_self().trace;
353    ///         (handle, trace)
354    ///     });
355    ///
356    ///     handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
357    ///     handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
358    ///     handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
359    ///     handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
360    ///     handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
361    ///
362    ///     trace.set_logical_compaction(AntichainRef::new(&[5]));
363    ///
364    ///     // create a second dataflow
365    ///     let mut shutdown = worker.dataflow(|scope| {
366    ///         let (arrange, button) = trace.import_frontier(scope, "Import");
367    ///         arrange
368    ///             .as_collection(|k,v| (*k,*v))
369    ///             .inner
370    ///             .inspect(|(d,t,r)| {
371    ///                 assert!(t >= &5);
372    ///             })
373    ///             .probe_with(&mut probe);
374    ///
375    ///         button
376    ///     });
377    ///
378    ///     worker.step();
379    ///     worker.step();
380    ///     assert!(!probe.done());
381    ///
382    ///     shutdown.press();
383    ///
384    ///     worker.step();
385    ///     worker.step();
386    ///     assert!(probe.done());
387    ///
388    /// }).unwrap();
389    /// ```
390    pub fn import_frontier<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
391    where
392        Tr: TraceReader,
393    {
394        // This frontier describes our only guarantee on the compaction frontier.
395        let since = self.get_logical_compaction().to_owned();
396        self.import_frontier_core(scope, name, since, Antichain::new())
397    }
398
399    /// Import a trace restricted to a specific time interval `[since, until)`.
400    ///
401    /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
402    /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
403    /// to `until` the operator capability will be dropped.
404    ///
405    /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
406    /// frontier indicates the end of times.
407    pub fn import_frontier_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<'scope, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
408    where
409        Tr: TraceReader,
410    {
411        let trace = self.clone();
412        let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());
413
414        let mut shutdown_button = None;
415
416        let stream = {
417
418            let shutdown_button_ref = &mut shutdown_button;
419            source(scope, name, move |capability, info| {
420
421                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
422
423                let activator = scope.activator_for(Rc::clone(&info.address));
424                let queue = self.new_listener(activator);
425
426                let activator = scope.activator_for(info.address);
427                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
428
429                capabilities.borrow_mut().as_mut().unwrap().insert(capability);
430
431                move |output| {
432
433                    let mut capabilities = capabilities.borrow_mut();
434                    if let Some(ref mut capabilities) = *capabilities {
435                        let mut borrow = queue.1.borrow_mut();
436                        for instruction in borrow.drain(..) {
437                            // If we have dropped the capabilities due to `until`, attempt no further work.
438                            // Without the capabilities, we should soon be shut down (once this loop ends).
439                            if !capabilities.is_empty() {
440                                match instruction {
441                                    TraceReplayInstruction::Frontier(frontier) => {
442                                        if timely::PartialOrder::less_equal(&until, &frontier) {
443                                            // It might be nice to actively *drop* `capabilities`, but it seems
444                                            // complicated logically (i.e. we'd have to break out of the loop).
445                                            capabilities.downgrade(&[]);
446                                        } else {
447                                            capabilities.downgrade(&frontier.borrow()[..]);
448                                        }
449                                    },
450                                    TraceReplayInstruction::Batch(batch, hint) => {
451                                        if let Some(time) = hint {
452                                            if !batch.is_empty() {
453                                                let delayed = capabilities.delayed(&time);
454                                                output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
455                                            }
456                                        }
457                                    }
458                                }
459                            }
460                        }
461                    }
462                }
463            })
464        };
465
466        (Arranged { stream, trace }, shutdown_button.unwrap())
467    }
468}
469
470
471
472/// Wrapper than can drop shared references.
473pub struct ShutdownButton<T> {
474    reference: Rc<RefCell<Option<T>>>,
475    activator: Activator,
476}
477
478impl<T> ShutdownButton<T> {
479    /// Creates a new ShutdownButton.
480    pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
481        Self { reference, activator }
482    }
483    /// Push the shutdown button, dropping the shared objects.
484    pub fn press(&mut self) {
485        *self.reference.borrow_mut() = None;
486        self.activator.activate();
487    }
488    /// Hotwires the button to one that is pressed if dropped.
489    pub fn press_on_drop(self) -> ShutdownDeadmans<T> {
490        ShutdownDeadmans {
491            button: self
492        }
493    }
494}
495
/// A deadman's switch version of a shutdown button.
///
/// This type hosts a shutdown button and will press it when dropped.
/// It is obtained from `ShutdownButton::press_on_drop`.
pub struct ShutdownDeadmans<T> {
    // The wrapped button, pressed from `Drop`.
    button: ShutdownButton<T>,
}

impl<T> Drop for ShutdownDeadmans<T> {
    // Presses the wrapped button as the switch goes out of scope.
    fn drop(&mut self) {
        self.button.press();
    }
}
508
509impl<Tr: TraceReader> Clone for TraceAgent<Tr> {
510    fn clone(&self) -> Self {
511
512        if let Some(logging) = &self.logging {
513            logging.log(
514                crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
515            );
516        }
517
518        // increase counts for wrapped `TraceBox`.
519        let empty_frontier = Antichain::new();
520        self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
521        self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
522
523        TraceAgent {
524            trace: self.trace.clone(),
525            queues: self.queues.clone(),
526            logical_compaction: self.logical_compaction.clone(),
527            physical_compaction: self.physical_compaction.clone(),
528            operator: self.operator.clone(),
529            logging: self.logging.clone(),
530            temp_antichain: Antichain::new(),
531        }
532    }
533}
534
535impl<Tr: TraceReader> Drop for TraceAgent<Tr> {
536    fn drop(&mut self) {
537
538        if let Some(logging) = &self.logging {
539            logging.log(
540                crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
541            );
542        }
543
544        // decrement borrow counts to remove all holds
545        let empty_frontier = Antichain::new();
546        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
547        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
548    }
549}