differential_dataflow/operators/arrange/
agent.rs

1//! Shared read access to a trace.
2
3use std::rc::{Rc, Weak};
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely::dataflow::Scope;
8use timely::dataflow::operators::generic::{OperatorInfo, source};
9use timely::progress::Timestamp;
10use timely::progress::{Antichain, frontier::AntichainRef};
11use timely::dataflow::operators::CapabilitySet;
12
13use crate::trace::{Trace, TraceReader, Batch, BatchReader};
14use crate::trace::wrappers::rc::TraceBox;
15
16use timely::scheduling::Activator;
17
18use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
19use super::TraceReplayInstruction;
20
21use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
22
23
/// A `TraceReader` wrapper which can be imported into other dataflows.
///
/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
/// from the dataflow in which it was defined, and imported into other dataflows.
pub struct TraceAgent<Tr>
where
    Tr: TraceReader,
{
    /// Shared, reference-counted handle to the boxed trace; clones of an agent share the same box.
    trace: Rc<RefCell<TraceBox<Tr>>>,
    /// Weak handle to the list of listener queues; `new_listener` registers its queue here
    /// whenever the list can still be upgraded.
    queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
    /// The logical compaction frontier this agent has registered with the trace box.
    logical_compaction: Antichain<Tr::Time>,
    /// The physical compaction frontier this agent has registered with the trace box.
    physical_compaction: Antichain<Tr::Time>,
    /// Scratch space in which the `set_*_compaction` methods assemble the joined frontier.
    temp_antichain: Antichain<Tr::Time>,

    /// Information about the operator, supplied at construction and exposed by `operator()`.
    operator: OperatorInfo,
    /// Optional logger; when present, `TraceShare` events are logged as agents are created,
    /// cloned, and dropped.
    logging: Option<crate::logging::Logger>,
}
41
42impl<Tr> TraceReader for TraceAgent<Tr>
43where
44    Tr: TraceReader,
45{
46    type Key<'a> = Tr::Key<'a>;
47    type Val<'a> = Tr::Val<'a>;
48    type Time = Tr::Time;
49    type TimeGat<'a> = Tr::TimeGat<'a>;
50    type Diff = Tr::Diff;
51    type DiffGat<'a> = Tr::DiffGat<'a>;
52
53    type Batch = Tr::Batch;
54    type Storage = Tr::Storage;
55    type Cursor = Tr::Cursor;
56
57    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
58        // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
59        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
60        crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
61        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
62        ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
63        self.temp_antichain.clear();
64    }
65    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
66        self.logical_compaction.borrow()
67    }
68    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
69        // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
70        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
71        crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
72        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
73        ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
74        self.temp_antichain.clear();
75    }
76    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
77        self.physical_compaction.borrow()
78    }
79    fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
80        self.trace.borrow_mut().trace.cursor_through(frontier)
81    }
82    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
83}
84
85impl<Tr: TraceReader> TraceAgent<Tr> {
86    /// Creates a new agent from a trace reader.
87    pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
88    where
89        Tr: Trace,
90        Tr::Batch: Batch,
91    {
92        let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
93        let queues = Rc::new(RefCell::new(Vec::new()));
94
95        if let Some(logging) = &logging {
96            logging.log(
97                crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
98            );
99        }
100
101        let reader = TraceAgent {
102            trace: trace.clone(),
103            queues: Rc::downgrade(&queues),
104            logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
105            physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
106            temp_antichain: Antichain::new(),
107            operator,
108            logging,
109        };
110
111        let writer = TraceWriter::new(
112            vec![<Tr::Time as Timestamp>::minimum()],
113            Rc::downgrade(&trace),
114            queues,
115        );
116
117        (reader, writer)
118    }
119
120    /// Attaches a new shared queue to the trace.
121    ///
122    /// The queue is first populated with existing batches from the trace,
123    /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
124    /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
125    pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr>
126    {
127        // create a new queue for progress and batch information.
128        let mut new_queue = VecDeque::new();
129
130        // add the existing batches from the trace
131        let mut upper = None;
132        self.trace
133            .borrow_mut()
134            .trace
135            .map_batches(|batch| {
136                new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(<Tr::Time as Timestamp>::minimum())));
137                upper = Some(batch.upper().clone());
138            });
139
140        if let Some(upper) = upper {
141            new_queue.push_back(TraceReplayInstruction::Frontier(upper));
142        }
143
144        let reference = Rc::new((activator, RefCell::new(new_queue)));
145
146        // wraps the queue in a ref-counted ref cell and enqueue/return it.
147        if let Some(queue) = self.queues.upgrade() {
148            queue.borrow_mut().push(Rc::downgrade(&reference));
149        }
150        reference.0.activate();
151        reference
152    }
153
154    /// The [OperatorInfo] of the underlying Timely operator
155    pub fn operator(&self) -> &OperatorInfo {
156        &self.operator
157    }
158
159    /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain
160    /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
161    /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
162    ///
163    /// This method is subject to changes and removal and should not be considered part of a stable
164    /// interface.
165    pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>> {
166        Rc::clone(&self.trace)
167    }
168}
169
170impl<Tr> TraceAgent<Tr>
171where
172    Tr: TraceReader+'static,
173{
174    /// Copies an existing collection into the supplied scope.
175    ///
176    /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
177    /// directly to the source collection brought into the local scope. The only caveat is that the initial state
178    /// of the collection is its current state, and updates occur from this point forward. The historical changes
179    /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
180    /// are no longer evident.
181    ///
182    /// The current behavior is that the introduced collection accumulates updates to some times less or equal
183    /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
184    /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
185    /// the historical collection may move through configurations that did not actually occur, even if eventually
186    /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
187    /// the intermediate computation could do something that the original computation did not, like diverge.
188    ///
189    /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
190    /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
191    /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
192    /// to advance times before using them).
193    ///
194    /// # Examples
195    ///
196    /// ```
197    /// use timely::Config;
198    /// use differential_dataflow::input::Input;
199    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
200    /// use differential_dataflow::operators::reduce::Reduce;
201    /// use differential_dataflow::trace::Trace;
202    ///
203    /// ::timely::execute(Config::thread(), |worker| {
204    ///
205    ///     // create a first dataflow
206    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
207    ///         // create input handle and collection.
208    ///         scope.new_collection_from(0 .. 10).1
209    ///              .arrange_by_self()
210    ///              .trace
211    ///     });
212    ///
213    ///     // do some work.
214    ///     worker.step();
215    ///     worker.step();
216    ///
217    ///     // create a second dataflow
218    ///     worker.dataflow(move |scope| {
219    ///         trace.import(scope)
220    ///              .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
221    ///     });
222    ///
223    /// }).unwrap();
224    /// ```
225    pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
226    where
227        G: Scope<Timestamp=Tr::Time>,
228    {
229        self.import_named(scope, "ArrangedSource")
230    }
231
232    /// Same as `import`, but allows to name the source.
233    pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
234    where
235        G: Scope<Timestamp=Tr::Time>,
236    {
237        // Drop ShutdownButton and return only the arrangement.
238        self.import_core(scope, name).0
239    }
240
241    /// Imports an arrangement into the supplied scope.
242    ///
243    /// # Examples
244    ///
245    /// ```
246    /// use timely::Config;
247    /// use timely::dataflow::ProbeHandle;
248    /// use timely::dataflow::operators::Probe;
249    /// use differential_dataflow::input::InputSession;
250    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
251    /// use differential_dataflow::operators::reduce::Reduce;
252    /// use differential_dataflow::trace::Trace;
253    ///
254    /// ::timely::execute(Config::thread(), |worker| {
255    ///
256    ///     let mut input = InputSession::<_,(),isize>::new();
257    ///     let mut probe = ProbeHandle::new();
258    ///
259    ///     // create a first dataflow
260    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
261    ///         // create input handle and collection.
262    ///         input.to_collection(scope)
263    ///              .arrange_by_self()
264    ///              .trace
265    ///     });
266    ///
267    ///     // do some work.
268    ///     worker.step();
269    ///     worker.step();
270    ///
271    ///     // create a second dataflow
272    ///     let mut shutdown = worker.dataflow(|scope| {
273    ///         let (arrange, button) = trace.import_core(scope, "Import");
274    ///         arrange.stream.probe_with(&mut probe);
275    ///         button
276    ///     });
277    ///
278    ///     worker.step();
279    ///     worker.step();
280    ///     assert!(!probe.done());
281    ///
282    ///     shutdown.press();
283    ///
284    ///     worker.step();
285    ///     worker.step();
286    ///     assert!(probe.done());
287    ///
288    /// }).unwrap();
289    /// ```
290    pub fn import_core<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
291    where
292        G: Scope<Timestamp=Tr::Time>,
293    {
294        let trace = self.clone();
295
296        let mut shutdown_button = None;
297
298        let stream = {
299
300            let shutdown_button_ref = &mut shutdown_button;
301            source(scope, name, move |capability, info| {
302
303                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
304
305                let activator = scope.activator_for(Rc::clone(&info.address));
306                let queue = self.new_listener(activator);
307
308                let activator = scope.activator_for(info.address);
309                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
310
311                capabilities.borrow_mut().as_mut().unwrap().insert(capability);
312
313                move |output| {
314
315                    let mut capabilities = capabilities.borrow_mut();
316                    if let Some(ref mut capabilities) = *capabilities {
317
318                        let mut borrow = queue.1.borrow_mut();
319                        for instruction in borrow.drain(..) {
320                            match instruction {
321                                TraceReplayInstruction::Frontier(frontier) => {
322                                    capabilities.downgrade(&frontier.borrow()[..]);
323                                },
324                                TraceReplayInstruction::Batch(batch, hint) => {
325                                    if let Some(time) = hint {
326                                        if !batch.is_empty() {
327                                            let delayed = capabilities.delayed(&time);
328                                            output.session(&delayed).give(batch);
329                                        }
330                                    }
331                                }
332                            }
333                        }
334                    }
335                }
336            })
337        };
338
339        (Arranged { stream, trace }, shutdown_button.unwrap())
340    }
341
342    /// Imports an arrangement into the supplied scope.
343    ///
344    /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
345    ///
346    /// # Examples
347    ///
348    /// ```
349    /// use timely::Config;
350    /// use timely::progress::frontier::AntichainRef;
351    /// use timely::dataflow::ProbeHandle;
352    /// use timely::dataflow::operators::Probe;
353    /// use timely::dataflow::operators::Inspect;
354    /// use differential_dataflow::input::InputSession;
355    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
356    /// use differential_dataflow::operators::reduce::Reduce;
357    /// use differential_dataflow::trace::Trace;
358    /// use differential_dataflow::trace::TraceReader;
359    /// use differential_dataflow::input::Input;
360    ///
361    /// ::timely::execute(Config::thread(), |worker| {
362    ///
363    ///     let mut probe = ProbeHandle::new();
364    ///
365    ///     // create a first dataflow
366    ///     let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
367    ///         // create input handle and collection.
368    ///         let (handle, stream) = scope.new_collection();
369    ///         let trace = stream.arrange_by_self().trace;
370    ///         (handle, trace)
371    ///     });
372    ///
373    ///     handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
374    ///     handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
375    ///     handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
376    ///     handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
377    ///     handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
378    ///
379    ///     trace.set_logical_compaction(AntichainRef::new(&[5]));
380    ///
381    ///     // create a second dataflow
382    ///     let mut shutdown = worker.dataflow(|scope| {
383    ///         let (arrange, button) = trace.import_frontier(scope, "Import");
384    ///         arrange
385    ///             .as_collection(|k,v| (*k,*v))
386    ///             .inner
387    ///             .inspect(|(d,t,r)| {
388    ///                 assert!(t >= &5);
389    ///             })
390    ///             .probe_with(&mut probe);
391    ///
392    ///         button
393    ///     });
394    ///
395    ///     worker.step();
396    ///     worker.step();
397    ///     assert!(!probe.done());
398    ///
399    ///     shutdown.press();
400    ///
401    ///     worker.step();
402    ///     worker.step();
403    ///     assert!(probe.done());
404    ///
405    /// }).unwrap();
406    /// ```
407    pub fn import_frontier<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
408    where
409        G: Scope<Timestamp=Tr::Time>,
410        Tr: TraceReader,
411    {
412        // This frontier describes our only guarantee on the compaction frontier.
413        let since = self.get_logical_compaction().to_owned();
414        self.import_frontier_core(scope, name, since, Antichain::new())
415    }
416
417    /// Import a trace restricted to a specific time interval `[since, until)`.
418    ///
419    /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
420    /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
421    /// to `until` the operator capability will be dropped.
422    ///
423    /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
424    /// frontier indicates the end of times.
425    pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
426    where
427        G: Scope<Timestamp=Tr::Time>,
428        Tr: TraceReader,
429    {
430        let trace = self.clone();
431        let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());
432
433        let mut shutdown_button = None;
434
435        let stream = {
436
437            let shutdown_button_ref = &mut shutdown_button;
438            source(scope, name, move |capability, info| {
439
440                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
441
442                let activator = scope.activator_for(Rc::clone(&info.address));
443                let queue = self.new_listener(activator);
444
445                let activator = scope.activator_for(info.address);
446                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
447
448                capabilities.borrow_mut().as_mut().unwrap().insert(capability);
449
450                move |output| {
451
452                    let mut capabilities = capabilities.borrow_mut();
453                    if let Some(ref mut capabilities) = *capabilities {
454                        let mut borrow = queue.1.borrow_mut();
455                        for instruction in borrow.drain(..) {
456                            // If we have dropped the capabilities due to `until`, attempt no further work.
457                            // Without the capabilities, we should soon be shut down (once this loop ends).
458                            if !capabilities.is_empty() {
459                                match instruction {
460                                    TraceReplayInstruction::Frontier(frontier) => {
461                                        if timely::PartialOrder::less_equal(&until, &frontier) {
462                                            // It might be nice to actively *drop* `capabilities`, but it seems
463                                            // complicated logically (i.e. we'd have to break out of the loop).
464                                            capabilities.downgrade(&[]);
465                                        } else {
466                                            capabilities.downgrade(&frontier.borrow()[..]);
467                                        }
468                                    },
469                                    TraceReplayInstruction::Batch(batch, hint) => {
470                                        if let Some(time) = hint {
471                                            if !batch.is_empty() {
472                                                let delayed = capabilities.delayed(&time);
473                                                output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
474                                            }
475                                        }
476                                    }
477                                }
478                            }
479                        }
480                    }
481                }
482            })
483        };
484
485        (Arranged { stream, trace }, shutdown_button.unwrap())
486    }
487}
488
489
490
/// Wrapper that can drop shared references.
pub struct ShutdownButton<T> {
    /// Shared slot holding the object to be dropped; `press` sets it to `None`.
    reference: Rc<RefCell<Option<T>>>,
    /// Activated by `press` once the slot has been cleared.
    activator: Activator,
}
496
497impl<T> ShutdownButton<T> {
498    /// Creates a new ShutdownButton.
499    pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
500        Self { reference, activator }
501    }
502    /// Push the shutdown button, dropping the shared objects.
503    pub fn press(&mut self) {
504        *self.reference.borrow_mut() = None;
505        self.activator.activate();
506    }
507    /// Hotwires the button to one that is pressed if dropped.
508    pub fn press_on_drop(self) -> ShutdownDeadmans<T> {
509        ShutdownDeadmans {
510            button: self
511        }
512    }
513}
514
/// A deadman's switch version of a shutdown button.
///
/// This type hosts a shutdown button and will press it when dropped.
pub struct ShutdownDeadmans<T> {
    /// The hosted button, pressed by this type's `Drop` implementation.
    button: ShutdownButton<T>,
}
521
522impl<T> Drop for ShutdownDeadmans<T> {
523    fn drop(&mut self) {
524        self.button.press();
525    }
526}
527
528impl<Tr> Clone for TraceAgent<Tr>
529where
530    Tr: TraceReader,
531{
532    fn clone(&self) -> Self {
533
534        if let Some(logging) = &self.logging {
535            logging.log(
536                crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
537            );
538        }
539
540        // increase counts for wrapped `TraceBox`.
541        let empty_frontier = Antichain::new();
542        self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
543        self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
544
545        TraceAgent {
546            trace: self.trace.clone(),
547            queues: self.queues.clone(),
548            logical_compaction: self.logical_compaction.clone(),
549            physical_compaction: self.physical_compaction.clone(),
550            operator: self.operator.clone(),
551            logging: self.logging.clone(),
552            temp_antichain: Antichain::new(),
553        }
554    }
555}
556
557impl<Tr> Drop for TraceAgent<Tr>
558where
559    Tr: TraceReader,
560{
561    fn drop(&mut self) {
562
563        if let Some(logging) = &self.logging {
564            logging.log(
565                crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
566            );
567        }
568
569        // decrement borrow counts to remove all holds
570        let empty_frontier = Antichain::new();
571        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
572        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
573    }
574}