Skip to main content

differential_dataflow/operators/arrange/
agent.rs

1//! Shared read access to a trace.
2
3use std::rc::{Rc, Weak};
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely::dataflow::Scope;
8use timely::dataflow::operators::generic::{OperatorInfo, source};
9use timely::progress::Timestamp;
10use timely::progress::{Antichain, frontier::AntichainRef};
11use timely::dataflow::operators::CapabilitySet;
12
13use crate::trace::{Trace, TraceReader, BatchReader};
14
15use timely::scheduling::Activator;
16
17use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
18use super::TraceReplayInstruction;
19
20use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
21
22
23/// A `TraceReader` wrapper which can be imported into other dataflows.
24///
25/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
26/// from the dataflow in which it was defined, and imported into other dataflows.
27pub struct TraceAgent<Tr: TraceReader> {
28    trace: Rc<RefCell<trace_box::TraceBox<Tr>>>,
29    queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
30    logical_compaction: Antichain<Tr::Time>,
31    physical_compaction: Antichain<Tr::Time>,
32    temp_antichain: Antichain<Tr::Time>,
33
34    operator: OperatorInfo,
35    logging: Option<crate::logging::Logger>,
36}
37
38use crate::trace::implementations::WithLayout;
39impl<Tr: TraceReader> WithLayout for TraceAgent<Tr> {
40    type Layout = Tr::Layout;
41}
42
43impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
44
45    type Batch = Tr::Batch;
46    type Storage = Tr::Storage;
47    type Cursor = Tr::Cursor;
48
49    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
50        // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
51        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
52        crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
53        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
54        ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
55        self.temp_antichain.clear();
56    }
57    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
58        self.logical_compaction.borrow()
59    }
60    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
61        // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
62        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
63        crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
64        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
65        ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
66        self.temp_antichain.clear();
67    }
68    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
69        self.physical_compaction.borrow()
70    }
71    fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
72        self.trace.borrow_mut().trace.cursor_through(frontier)
73    }
74    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
75}
76
77impl<Tr: TraceReader> TraceAgent<Tr> {
78    /// Creates a new agent from a trace reader.
79    pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
80    where
81        Tr: Trace,
82    {
83        let trace = Rc::new(RefCell::new(trace_box::TraceBox::new(trace)));
84        let queues = Rc::new(RefCell::new(Vec::new()));
85
86        if let Some(logging) = &logging {
87            logging.log(
88                crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
89            );
90        }
91
92        let reader = TraceAgent {
93            trace: Rc::clone(&trace),
94            queues: Rc::downgrade(&queues),
95            logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
96            physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
97            temp_antichain: Antichain::new(),
98            operator,
99            logging,
100        };
101
102        let writer = TraceWriter::new(
103            vec![Tr::Time::minimum()],
104            Rc::downgrade(&trace),
105            queues,
106        );
107
108        (reader, writer)
109    }
110
111    /// Attaches a new shared queue to the trace.
112    ///
113    /// The queue is first populated with existing batches from the trace,
114    /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
115    /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
116    pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr>
117    {
118        // create a new queue for progress and batch information.
119        let mut new_queue = VecDeque::new();
120
121        // add the existing batches from the trace
122        let mut upper = None;
123        self.trace
124            .borrow_mut()
125            .trace
126            .map_batches(|batch| {
127                new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(Tr::Time::minimum())));
128                upper = Some(batch.upper().clone());
129            });
130
131        if let Some(upper) = upper {
132            new_queue.push_back(TraceReplayInstruction::Frontier(upper));
133        }
134
135        let reference = Rc::new((activator, RefCell::new(new_queue)));
136
137        // wraps the queue in a ref-counted ref cell and enqueue/return it.
138        if let Some(queue) = self.queues.upgrade() {
139            queue.borrow_mut().push(Rc::downgrade(&reference));
140        }
141        reference.0.activate();
142        reference
143    }
144
145    /// The [OperatorInfo] of the underlying Timely operator
146    pub fn operator(&self) -> &OperatorInfo {
147        &self.operator
148    }
149
150    /// Obtain a reference to the inner [`trace_box::TraceBox`]. It is the caller's obligation to maintain
151    /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
152    /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
153    ///
154    /// This method is subject to changes and removal and should not be considered part of a stable
155    /// interface.
156    pub fn trace_box_unstable(&self) -> Rc<RefCell<trace_box::TraceBox<Tr>>> {
157        Rc::clone(&self.trace)
158    }
159}
160
161impl<Tr: TraceReader+'static> TraceAgent<Tr> {
162    /// Copies an existing collection into the supplied scope.
163    ///
164    /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
165    /// directly to the source collection brought into the local scope. The only caveat is that the initial state
166    /// of the collection is its current state, and updates occur from this point forward. The historical changes
167    /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
168    /// are no longer evident.
169    ///
170    /// The current behavior is that the introduced collection accumulates updates to some times less or equal
171    /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
172    /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
173    /// the historical collection may move through configurations that did not actually occur, even if eventually
174    /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
175    /// the intermediate computation could do something that the original computation did not, like diverge.
176    ///
177    /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
178    /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
179    /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
180    /// to advance times before using them).
181    ///
182    /// # Examples
183    ///
184    /// ```
185    /// use timely::Config;
186    /// use differential_dataflow::input::Input;
187    /// use differential_dataflow::trace::Trace;
188    /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
189    ///
190    /// ::timely::execute(Config::thread(), |worker| {
191    ///
192    ///     // create a first dataflow
193    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
194    ///         // create input handle and collection.
195    ///         scope.new_collection_from(0 .. 10).1
196    ///              .arrange_by_self()
197    ///              .trace
198    ///     });
199    ///
200    ///     // do some work.
201    ///     worker.step();
202    ///     worker.step();
203    ///
204    ///     // create a second dataflow
205    ///     worker.dataflow(move |scope| {
206    ///         trace.import(scope)
207    ///              .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>,_>(
208    ///                  "Reduce",
209    ///                  |_key, src, dst| dst.push((*src[0].0, 1)),
210    ///                  |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
211    ///              )
212    ///              .as_collection(|k,v| (k.clone(), v.clone()));
213    ///     });
214    ///
215    /// }).unwrap();
216    /// ```
217    pub fn import<'scope>(&mut self, scope: Scope<'scope, Tr::Time>) -> Arranged<'scope, TraceAgent<Tr>>
218    {
219        self.import_named(scope, "ArrangedSource")
220    }
221
222    /// Same as `import`, but allows to name the source.
223    pub fn import_named<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
224    {
225        // Drop ShutdownButton and return only the arrangement.
226        self.import_core(scope, name).0
227    }
228
229    /// Imports an arrangement into the supplied scope.
230    ///
231    /// # Examples
232    ///
233    /// ```
234    /// use timely::Config;
235    /// use timely::dataflow::ProbeHandle;
236    /// use timely::dataflow::operators::Probe;
237    /// use differential_dataflow::input::InputSession;
238    /// use differential_dataflow::trace::Trace;
239    ///
240    /// ::timely::execute(Config::thread(), |worker| {
241    ///
242    ///     let mut input = InputSession::<_,(),isize>::new();
243    ///     let mut probe = ProbeHandle::new();
244    ///
245    ///     // create a first dataflow
246    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
247    ///         // create input handle and collection.
248    ///         input.to_collection(scope)
249    ///              .arrange_by_self()
250    ///              .trace
251    ///     });
252    ///
253    ///     // do some work.
254    ///     worker.step();
255    ///     worker.step();
256    ///
257    ///     // create a second dataflow
258    ///     let mut shutdown = worker.dataflow(|scope| {
259    ///         let (arrange, button) = trace.import_core(scope, "Import");
260    ///         arrange.stream.probe_with(&mut probe);
261    ///         button
262    ///     });
263    ///
264    ///     worker.step();
265    ///     worker.step();
266    ///     assert!(!probe.done());
267    ///
268    ///     shutdown.press();
269    ///
270    ///     worker.step();
271    ///     worker.step();
272    ///     assert!(probe.done());
273    ///
274    /// }).unwrap();
275    /// ```
276    pub fn import_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
277    {
278        let trace = self.clone();
279
280        let mut shutdown_button = None;
281
282        let stream = {
283
284            let shutdown_button_ref = &mut shutdown_button;
285            source(scope, name, move |capability, info| {
286
287                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
288
289                let activator = scope.activator_for(Rc::clone(&info.address));
290                let queue = self.new_listener(activator);
291
292                let activator = scope.activator_for(info.address);
293                *shutdown_button_ref = Some(ShutdownButton::new(Rc::clone(&capabilities), activator));
294
295                capabilities.borrow_mut().as_mut().unwrap().insert(capability);
296
297                move |output| {
298
299                    let mut capabilities = capabilities.borrow_mut();
300                    if let Some(ref mut capabilities) = *capabilities {
301
302                        let mut borrow = queue.1.borrow_mut();
303                        for instruction in borrow.drain(..) {
304                            match instruction {
305                                TraceReplayInstruction::Frontier(frontier) => {
306                                    capabilities.downgrade(&frontier.borrow()[..]);
307                                },
308                                TraceReplayInstruction::Batch(batch, hint) => {
309                                    if let Some(time) = hint {
310                                        if !batch.is_empty() {
311                                            let delayed = capabilities.delayed(&time);
312                                            output.session(&delayed).give(batch);
313                                        }
314                                    }
315                                }
316                            }
317                        }
318                    }
319                }
320            })
321        };
322
323        (Arranged { stream, trace }, shutdown_button.unwrap())
324    }
325
326    /// Imports an arrangement into the supplied scope.
327    ///
328    /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
329    ///
330    /// # Examples
331    ///
332    /// ```
333    /// use timely::Config;
334    /// use timely::progress::frontier::AntichainRef;
335    /// use timely::dataflow::ProbeHandle;
336    /// use timely::dataflow::operators::Probe;
337    /// use timely::dataflow::operators::Inspect;
338    /// use differential_dataflow::input::InputSession;
339    /// use differential_dataflow::trace::Trace;
340    /// use differential_dataflow::trace::TraceReader;
341    /// use differential_dataflow::input::Input;
342    ///
343    /// ::timely::execute(Config::thread(), |worker| {
344    ///
345    ///     let mut probe = ProbeHandle::new();
346    ///
347    ///     // create a first dataflow
348    ///     let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
349    ///         // create input handle and collection.
350    ///         let (handle, stream) = scope.new_collection();
351    ///         let trace = stream.arrange_by_self().trace;
352    ///         (handle, trace)
353    ///     });
354    ///
355    ///     handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
356    ///     handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
357    ///     handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
358    ///     handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
359    ///     handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
360    ///
361    ///     trace.set_logical_compaction(AntichainRef::new(&[5]));
362    ///
363    ///     // create a second dataflow
364    ///     let mut shutdown = worker.dataflow(|scope| {
365    ///         let (arrange, button) = trace.import_frontier(scope, "Import");
366    ///         arrange
367    ///             .as_collection(|k,v| (*k,*v))
368    ///             .inner
369    ///             .inspect(|(d,t,r)| {
370    ///                 assert!(t >= &5);
371    ///             })
372    ///             .probe_with(&mut probe);
373    ///
374    ///         button
375    ///     });
376    ///
377    ///     worker.step();
378    ///     worker.step();
379    ///     assert!(!probe.done());
380    ///
381    ///     shutdown.press();
382    ///
383    ///     worker.step();
384    ///     worker.step();
385    ///     assert!(probe.done());
386    ///
387    /// }).unwrap();
388    /// ```
389    pub fn import_frontier<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
390    where
391        Tr: TraceReader,
392    {
393        // This frontier describes our only guarantee on the compaction frontier.
394        let since = self.get_logical_compaction().to_owned();
395        self.import_frontier_core(scope, name, since, Antichain::new())
396    }
397
398    /// Import a trace restricted to a specific time interval `[since, until)`.
399    ///
400    /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
401    /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
402    /// to `until` the operator capability will be dropped.
403    ///
404    /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
405    /// frontier indicates the end of times.
406    pub fn import_frontier_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<'scope, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
407    where
408        Tr: TraceReader,
409    {
410        let trace = self.clone();
411        let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());
412
413        let mut shutdown_button = None;
414
415        let stream = {
416
417            let shutdown_button_ref = &mut shutdown_button;
418            source(scope, name, move |capability, info| {
419
420                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
421
422                let activator = scope.activator_for(Rc::clone(&info.address));
423                let queue = self.new_listener(activator);
424
425                let activator = scope.activator_for(info.address);
426                *shutdown_button_ref = Some(ShutdownButton::new(Rc::clone(&capabilities), activator));
427
428                capabilities.borrow_mut().as_mut().unwrap().insert(capability);
429
430                move |output| {
431
432                    let mut capabilities = capabilities.borrow_mut();
433                    if let Some(ref mut capabilities) = *capabilities {
434                        let mut borrow = queue.1.borrow_mut();
435                        for instruction in borrow.drain(..) {
436                            // If we have dropped the capabilities due to `until`, attempt no further work.
437                            // Without the capabilities, we should soon be shut down (once this loop ends).
438                            if !capabilities.is_empty() {
439                                match instruction {
440                                    TraceReplayInstruction::Frontier(frontier) => {
441                                        if timely::PartialOrder::less_equal(&until, &frontier) {
442                                            // It might be nice to actively *drop* `capabilities`, but it seems
443                                            // complicated logically (i.e. we'd have to break out of the loop).
444                                            capabilities.downgrade(&[]);
445                                        } else {
446                                            capabilities.downgrade(&frontier.borrow()[..]);
447                                        }
448                                    },
449                                    TraceReplayInstruction::Batch(batch, hint) => {
450                                        if let Some(time) = hint {
451                                            if !batch.is_empty() {
452                                                let delayed = capabilities.delayed(&time);
453                                                output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
454                                            }
455                                        }
456                                    }
457                                }
458                            }
459                        }
460                    }
461                }
462            })
463        };
464
465        (Arranged { stream, trace }, shutdown_button.unwrap())
466    }
467}
468
469
470
471/// Wrapper than can drop shared references.
472pub struct ShutdownButton<T> {
473    reference: Rc<RefCell<Option<T>>>,
474    activator: Activator,
475}
476
477impl<T> ShutdownButton<T> {
478    /// Creates a new ShutdownButton.
479    pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
480        Self { reference, activator }
481    }
482    /// Push the shutdown button, dropping the shared objects.
483    pub fn press(&mut self) {
484        *self.reference.borrow_mut() = None;
485        self.activator.activate();
486    }
487}
488
489impl<Tr: TraceReader> Clone for TraceAgent<Tr> {
490    fn clone(&self) -> Self {
491
492        if let Some(logging) = &self.logging {
493            logging.log(
494                crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
495            );
496        }
497
498        // increase counts for wrapped `TraceBox`.
499        let empty_frontier = Antichain::new();
500        self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
501        self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
502
503        TraceAgent {
504            trace: Rc::clone(&self.trace),
505            queues: Weak::clone(&self.queues),
506            logical_compaction: self.logical_compaction.clone(),
507            physical_compaction: self.physical_compaction.clone(),
508            operator: self.operator.clone(),
509            logging: self.logging.clone(),
510            temp_antichain: Antichain::new(),
511        }
512    }
513}
514
515impl<Tr: TraceReader> Drop for TraceAgent<Tr> {
516    fn drop(&mut self) {
517
518        if let Some(logging) = &self.logging {
519            logging.log(
520                crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
521            );
522        }
523
524        // decrement borrow counts to remove all holds
525        let empty_frontier = Antichain::new();
526        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
527        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
528    }
529}
530
531/// A trace wrapper suitable for use through shared reference counted ownership.
532///
533/// The wrapper mainly accumulates the expressed compaction constraints from many,
534/// and presents their implications to the wrapped trace.
535pub mod trace_box {
536
537    use timely::progress::{frontier::{AntichainRef, MutableAntichain}};
538
539    use crate::trace::TraceReader;
540
541    /// A wrapper around a trace which tracks the frontiers of all referees.
542    ///
543    /// This is an internal type, unlikely to be useful to higher-level programs, but exposed just in case.
544    /// This type is equivalent to a `RefCell`, in that it wraps the mutable state that multiple referrers
545    /// may influence.
546    pub struct TraceBox<Tr: TraceReader> {
547        /// accumulated holds on times for advancement.
548        pub (crate) logical_compaction: MutableAntichain<Tr::Time>,
549        /// accumulated holds on times for distinction.
550        pub (crate) physical_compaction: MutableAntichain<Tr::Time>,
551        /// The wrapped trace.
552        pub (crate) trace: Tr,
553    }
554
555    impl<Tr: TraceReader> TraceBox<Tr> {
556        /// Moves an existing trace into a shareable trace wrapper.
557        ///
558        /// The trace may already exist and have non-initial advance and distinguish frontiers. The boxing
559        /// process will fish these out and make sure that they are used for the initial read capabilities.
560        pub fn new(mut trace: Tr) -> Self {
561
562            let mut logical_compaction = MutableAntichain::new();
563            logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
564            let mut physical_compaction = MutableAntichain::new();
565            physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));
566
567            TraceBox {
568                logical_compaction,
569                physical_compaction,
570                trace,
571            }
572        }
573        /// Borrowed access to the underlying trace.
574        ///
575        /// This is used to inspect batches for purposes of resource accounting in external systems.
576        pub fn trace(&self) -> &Tr { &self.trace }
577        /// Replaces elements of `lower` with those of `upper`.
578        #[inline]
579        pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
580            self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
581            self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
582            self.trace.set_logical_compaction(self.logical_compaction.frontier());
583        }
584        /// Replaces elements of `lower` with those of `upper`.
585        #[inline]
586        pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
587            self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
588            self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
589            self.trace.set_physical_compaction(self.physical_compaction.frontier());
590        }
591    }
592
593}