differential_dataflow/operators/arrange/
agent.rs

1//! Shared read access to a trace.
2
3use std::rc::{Rc, Weak};
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely::dataflow::Scope;
8use timely::dataflow::operators::generic::{OperatorInfo, source};
9use timely::progress::Timestamp;
10use timely::progress::{Antichain, frontier::AntichainRef};
11use timely::dataflow::operators::CapabilitySet;
12
13use crate::lattice::Lattice;
14use crate::trace::{Trace, TraceReader, Batch, BatchReader};
15use crate::trace::wrappers::rc::TraceBox;
16
17use timely::scheduling::Activator;
18
19use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
20use super::TraceReplayInstruction;
21
22use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
23
24
25/// A `TraceReader` wrapper which can be imported into other dataflows.
26///
27/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
28/// from the dataflow in which it was defined, and imported into other dataflows.
29pub struct TraceAgent<Tr>
30where
31    Tr: TraceReader,
32    Tr::Time: Lattice+Ord+Clone+'static,
33{
34    trace: Rc<RefCell<TraceBox<Tr>>>,
35    queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
36    logical_compaction: Antichain<Tr::Time>,
37    physical_compaction: Antichain<Tr::Time>,
38    temp_antichain: Antichain<Tr::Time>,
39
40    operator: OperatorInfo,
41    logging: Option<crate::logging::Logger>,
42}
43
44impl<Tr> TraceReader for TraceAgent<Tr>
45where
46    Tr: TraceReader,
47    Tr::Time: Lattice+Ord+Clone+'static,
48{
49    type Key<'a> = Tr::Key<'a>;
50    type KeyOwned = Tr::KeyOwned;
51    type Val<'a> = Tr::Val<'a>;
52    type ValOwned = Tr::ValOwned;
53    type Time = Tr::Time;
54    type Diff = Tr::Diff;
55
56    type Batch = Tr::Batch;
57    type Storage = Tr::Storage;
58    type Cursor = Tr::Cursor;
59
60    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
61        // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
62        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
63        crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
64        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
65        ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
66        self.temp_antichain.clear();
67    }
68    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
69        self.logical_compaction.borrow()
70    }
71    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
72        // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
73        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
74        crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
75        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
76        ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
77        self.temp_antichain.clear();
78    }
79    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
80        self.physical_compaction.borrow()
81    }
82    fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
83        self.trace.borrow_mut().trace.cursor_through(frontier)
84    }
85    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
86}
87
88impl<Tr> TraceAgent<Tr>
89where
90    Tr: TraceReader,
91    Tr::Time: Timestamp+Lattice,
92{
93    /// Creates a new agent from a trace reader.
94    pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
95    where
96        Tr: Trace,
97        Tr::Batch: Batch,
98    {
99        let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
100        let queues = Rc::new(RefCell::new(Vec::new()));
101
102        if let Some(logging) = &logging {
103            logging.log(
104                crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
105            );
106        }
107
108        let reader = TraceAgent {
109            trace: trace.clone(),
110            queues: Rc::downgrade(&queues),
111            logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
112            physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
113            temp_antichain: Antichain::new(),
114            operator,
115            logging,
116        };
117
118        let writer = TraceWriter::new(
119            vec![<Tr::Time as Timestamp>::minimum()],
120            Rc::downgrade(&trace),
121            queues,
122        );
123
124        (reader, writer)
125    }
126
127    /// Attaches a new shared queue to the trace.
128    ///
129    /// The queue is first populated with existing batches from the trace,
130    /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
131    /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
132    pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr>
133    {
134        // create a new queue for progress and batch information.
135        let mut new_queue = VecDeque::new();
136
137        // add the existing batches from the trace
138        let mut upper = None;
139        self.trace
140            .borrow_mut()
141            .trace
142            .map_batches(|batch| {
143                new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(<Tr::Time as Timestamp>::minimum())));
144                upper = Some(batch.upper().clone());
145            });
146
147        if let Some(upper) = upper {
148            new_queue.push_back(TraceReplayInstruction::Frontier(upper));
149        }
150
151        let reference = Rc::new((activator, RefCell::new(new_queue)));
152
153        // wraps the queue in a ref-counted ref cell and enqueue/return it.
154        if let Some(queue) = self.queues.upgrade() {
155            queue.borrow_mut().push(Rc::downgrade(&reference));
156        }
157        reference.0.activate();
158        reference
159    }
160
161    /// The [OperatorInfo] of the underlying Timely operator
162    pub fn operator(&self) -> &OperatorInfo {
163        &self.operator
164    }
165
166    /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain
167    /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
168    /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
169    ///
170    /// This method is subject to changes and removal and should not be considered part of a stable
171    /// interface.
172    pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>> {
173        Rc::clone(&self.trace)
174    }
175}
176
177impl<Tr> TraceAgent<Tr>
178where
179    Tr: TraceReader+'static,
180    Tr::Time: Lattice+Ord+Clone+'static,
181{
182    /// Copies an existing collection into the supplied scope.
183    ///
184    /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
185    /// directly to the source collection brought into the local scope. The only caveat is that the initial state
186    /// of the collection is its current state, and updates occur from this point forward. The historical changes
187    /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
188    /// are no longer evident.
189    ///
190    /// The current behavior is that the introduced collection accumulates updates to some times less or equal
191    /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
192    /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
193    /// the historical collection may move through configurations that did not actually occur, even if eventually
194    /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
195    /// the intermediate computation could do something that the original computation did not, like diverge.
196    ///
197    /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
198    /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
199    /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
200    /// to advance times before using them).
201    ///
202    /// # Examples
203    ///
204    /// ```
205    /// use timely::Config;
206    /// use differential_dataflow::input::Input;
207    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
208    /// use differential_dataflow::operators::reduce::Reduce;
209    /// use differential_dataflow::trace::Trace;
210    ///
211    /// ::timely::execute(Config::thread(), |worker| {
212    ///
213    ///     // create a first dataflow
214    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
215    ///         // create input handle and collection.
216    ///         scope.new_collection_from(0 .. 10).1
217    ///              .arrange_by_self()
218    ///              .trace
219    ///     });
220    ///
221    ///     // do some work.
222    ///     worker.step();
223    ///     worker.step();
224    ///
225    ///     // create a second dataflow
226    ///     worker.dataflow(move |scope| {
227    ///         trace.import(scope)
228    ///              .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
229    ///     });
230    ///
231    /// }).unwrap();
232    /// ```
233    pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
234    where
235        G: Scope<Timestamp=Tr::Time>,
236        Tr::Time: Timestamp,
237    {
238        self.import_named(scope, "ArrangedSource")
239    }
240
241    /// Same as `import`, but allows to name the source.
242    pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
243    where
244        G: Scope<Timestamp=Tr::Time>,
245        Tr::Time: Timestamp,
246    {
247        // Drop ShutdownButton and return only the arrangement.
248        self.import_core(scope, name).0
249    }
250
251    /// Imports an arrangement into the supplied scope.
252    ///
253    /// # Examples
254    ///
255    /// ```
256    /// use timely::Config;
257    /// use timely::dataflow::ProbeHandle;
258    /// use timely::dataflow::operators::Probe;
259    /// use differential_dataflow::input::InputSession;
260    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
261    /// use differential_dataflow::operators::reduce::Reduce;
262    /// use differential_dataflow::trace::Trace;
263    ///
264    /// ::timely::execute(Config::thread(), |worker| {
265    ///
266    ///     let mut input = InputSession::<_,(),isize>::new();
267    ///     let mut probe = ProbeHandle::new();
268    ///
269    ///     // create a first dataflow
270    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
271    ///         // create input handle and collection.
272    ///         input.to_collection(scope)
273    ///              .arrange_by_self()
274    ///              .trace
275    ///     });
276    ///
277    ///     // do some work.
278    ///     worker.step();
279    ///     worker.step();
280    ///
281    ///     // create a second dataflow
282    ///     let mut shutdown = worker.dataflow(|scope| {
283    ///         let (arrange, button) = trace.import_core(scope, "Import");
284    ///         arrange.stream.probe_with(&mut probe);
285    ///         button
286    ///     });
287    ///
288    ///     worker.step();
289    ///     worker.step();
290    ///     assert!(!probe.done());
291    ///
292    ///     shutdown.press();
293    ///
294    ///     worker.step();
295    ///     worker.step();
296    ///     assert!(probe.done());
297    ///
298    /// }).unwrap();
299    /// ```
300    pub fn import_core<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
301    where
302        G: Scope<Timestamp=Tr::Time>,
303        Tr::Time: Timestamp,
304    {
305        let trace = self.clone();
306
307        let mut shutdown_button = None;
308
309        let stream = {
310
311            let shutdown_button_ref = &mut shutdown_button;
312            source(scope, name, move |capability, info| {
313
314                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
315
316                let activator = scope.activator_for(&info.address[..]);
317                let queue = self.new_listener(activator);
318
319                let activator = scope.activator_for(&info.address[..]);
320                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
321
322                capabilities.borrow_mut().as_mut().unwrap().insert(capability);
323
324                move |output| {
325
326                    let mut capabilities = capabilities.borrow_mut();
327                    if let Some(ref mut capabilities) = *capabilities {
328
329                        let mut borrow = queue.1.borrow_mut();
330                        for instruction in borrow.drain(..) {
331                            match instruction {
332                                TraceReplayInstruction::Frontier(frontier) => {
333                                    capabilities.downgrade(&frontier.borrow()[..]);
334                                },
335                                TraceReplayInstruction::Batch(batch, hint) => {
336                                    if let Some(time) = hint {
337                                        if !batch.is_empty() {
338                                            let delayed = capabilities.delayed(&time);
339                                            output.session(&delayed).give(batch);
340                                        }
341                                    }
342                                }
343                            }
344                        }
345                    }
346                }
347            })
348        };
349
350        (Arranged { stream, trace }, shutdown_button.unwrap())
351    }
352
353    /// Imports an arrangement into the supplied scope.
354    ///
355    /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
356    ///
357    /// # Examples
358    ///
359    /// ```
360    /// use timely::Config;
361    /// use timely::progress::frontier::AntichainRef;
362    /// use timely::dataflow::ProbeHandle;
363    /// use timely::dataflow::operators::Probe;
364    /// use timely::dataflow::operators::Inspect;
365    /// use differential_dataflow::input::InputSession;
366    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
367    /// use differential_dataflow::operators::reduce::Reduce;
368    /// use differential_dataflow::trace::Trace;
369    /// use differential_dataflow::trace::TraceReader;
370    /// use differential_dataflow::input::Input;
371    ///
372    /// ::timely::execute(Config::thread(), |worker| {
373    ///
374    ///     let mut probe = ProbeHandle::new();
375    ///
376    ///     // create a first dataflow
377    ///     let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
378    ///         // create input handle and collection.
379    ///         let (handle, stream) = scope.new_collection();
380    ///         let trace = stream.arrange_by_self().trace;
381    ///         (handle, trace)
382    ///     });
383    ///
384    ///     handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
385    ///     handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
386    ///     handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
387    ///     handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
388    ///     handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
389    ///
390    ///     trace.set_logical_compaction(AntichainRef::new(&[5]));
391    ///
392    ///     // create a second dataflow
393    ///     let mut shutdown = worker.dataflow(|scope| {
394    ///         let (arrange, button) = trace.import_frontier(scope, "Import");
395    ///         arrange
396    ///             .as_collection(|k,v| (*k,*v))
397    ///             .inner
398    ///             .inspect(|(d,t,r)| {
399    ///                 assert!(t >= &5);
400    ///             })
401    ///             .probe_with(&mut probe);
402    ///
403    ///         button
404    ///     });
405    ///
406    ///     worker.step();
407    ///     worker.step();
408    ///     assert!(!probe.done());
409    ///
410    ///     shutdown.press();
411    ///
412    ///     worker.step();
413    ///     worker.step();
414    ///     assert!(probe.done());
415    ///
416    /// }).unwrap();
417    /// ```
418    pub fn import_frontier<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
419    where
420        G: Scope<Timestamp=Tr::Time>,
421        Tr::Time: Timestamp+ Lattice+Ord+Clone+'static,
422        Tr: TraceReader,
423    {
424        // This frontier describes our only guarantee on the compaction frontier.
425        let since = self.get_logical_compaction().to_owned();
426        self.import_frontier_core(scope, name, since, Antichain::new())
427    }
428
429    /// Import a trace restricted to a specific time interval `[since, until)`.
430    ///
431    /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
432    /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
433    /// to `until` the operator capability will be dropped.
434    ///
435    /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
436    /// frontier indicates the end of times.
437    pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
438    where
439        G: Scope<Timestamp=Tr::Time>,
440        Tr::Time: Timestamp+ Lattice+Ord+Clone+'static,
441        Tr: TraceReader,
442    {
443        let trace = self.clone();
444        let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());
445
446        let mut shutdown_button = None;
447
448        let stream = {
449
450            let shutdown_button_ref = &mut shutdown_button;
451            source(scope, name, move |capability, info| {
452
453                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
454
455                let activator = scope.activator_for(&info.address[..]);
456                let queue = self.new_listener(activator);
457
458                let activator = scope.activator_for(&info.address[..]);
459                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
460
461                capabilities.borrow_mut().as_mut().unwrap().insert(capability);
462
463                move |output| {
464
465                    let mut capabilities = capabilities.borrow_mut();
466                    if let Some(ref mut capabilities) = *capabilities {
467                        let mut borrow = queue.1.borrow_mut();
468                        for instruction in borrow.drain(..) {
469                            // If we have dropped the capabilities due to `until`, attempt no further work.
470                            // Without the capabilities, we should soon be shut down (once this loop ends).
471                            if !capabilities.is_empty() {
472                                match instruction {
473                                    TraceReplayInstruction::Frontier(frontier) => {
474                                        if timely::PartialOrder::less_equal(&until, &frontier) {
475                                            // It might be nice to actively *drop* `capabilities`, but it seems
476                                            // complicated logically (i.e. we'd have to break out of the loop).
477                                            capabilities.downgrade(&[]);
478                                        } else {
479                                            capabilities.downgrade(&frontier.borrow()[..]);
480                                        }
481                                    },
482                                    TraceReplayInstruction::Batch(batch, hint) => {
483                                        if let Some(time) = hint {
484                                            if !batch.is_empty() {
485                                                let delayed = capabilities.delayed(&time);
486                                                output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
487                                            }
488                                        }
489                                    }
490                                }
491                            }
492                        }
493                    }
494                }
495            })
496        };
497
498        (Arranged { stream, trace }, shutdown_button.unwrap())
499    }
500}
501
502
503
504/// Wrapper than can drop shared references.
505pub struct ShutdownButton<T> {
506    reference: Rc<RefCell<Option<T>>>,
507    activator: Activator,
508}
509
510impl<T> ShutdownButton<T> {
511    /// Creates a new ShutdownButton.
512    pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
513        Self { reference, activator }
514    }
515    /// Push the shutdown button, dropping the shared objects.
516    pub fn press(&mut self) {
517        *self.reference.borrow_mut() = None;
518        self.activator.activate();
519    }
520    /// Hotwires the button to one that is pressed if dropped.
521    pub fn press_on_drop(self) -> ShutdownDeadmans<T> {
522        ShutdownDeadmans {
523            button: self
524        }
525    }
526}
527
528/// A deadman's switch version of a shutdown button.
529///
530/// This type hosts a shutdown button and will press it when dropped.
531pub struct ShutdownDeadmans<T> {
532    button: ShutdownButton<T>,
533}
534
535impl<T> Drop for ShutdownDeadmans<T> {
536    fn drop(&mut self) {
537        self.button.press();
538    }
539}
540
541impl<Tr> Clone for TraceAgent<Tr>
542where
543    Tr: TraceReader,
544    Tr::Time: Lattice+Ord+Clone+'static,
545{
546    fn clone(&self) -> Self {
547
548        if let Some(logging) = &self.logging {
549            logging.log(
550                crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
551            );
552        }
553
554        // increase counts for wrapped `TraceBox`.
555        let empty_frontier = Antichain::new();
556        self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
557        self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
558
559        TraceAgent {
560            trace: self.trace.clone(),
561            queues: self.queues.clone(),
562            logical_compaction: self.logical_compaction.clone(),
563            physical_compaction: self.physical_compaction.clone(),
564            operator: self.operator.clone(),
565            logging: self.logging.clone(),
566            temp_antichain: Antichain::new(),
567        }
568    }
569}
570
571impl<Tr> Drop for TraceAgent<Tr>
572where
573    Tr: TraceReader,
574    Tr::Time: Lattice+Ord+Clone+'static,
575{
576    fn drop(&mut self) {
577
578        if let Some(logging) = &self.logging {
579            logging.log(
580                crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
581            );
582        }
583
584        // decrement borrow counts to remove all holds
585        let empty_frontier = Antichain::new();
586        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
587        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
588    }
589}