Skip to main content

palimpsest_dataflow/operators/arrange/
agent.rs

1//! Shared read access to a trace.
2
3use std::cell::RefCell;
4use std::collections::VecDeque;
5use std::rc::{Rc, Weak};
6
7use timely::dataflow::operators::generic::{source, OperatorInfo};
8use timely::dataflow::operators::CapabilitySet;
9use timely::dataflow::Scope;
10use timely::progress::Timestamp;
11use timely::progress::{frontier::AntichainRef, Antichain};
12
13use crate::trace::wrappers::rc::TraceBox;
14use crate::trace::{BatchReader, Trace, TraceReader};
15
16use timely::scheduling::Activator;
17
18use super::TraceReplayInstruction;
19use super::{Arranged, TraceAgentQueueReader, TraceAgentQueueWriter, TraceWriter};
20
21use crate::trace::wrappers::frontier::{BatchFrontier, TraceFrontier};
22
23/// A `TraceReader` wrapper which can be imported into other dataflows.
24///
25/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
26/// from the dataflow in which it was defined, and imported into other dataflows.
27pub struct TraceAgent<Tr: TraceReader> {
28    trace: Rc<RefCell<TraceBox<Tr>>>,
29    queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
30    logical_compaction: Antichain<Tr::Time>,
31    physical_compaction: Antichain<Tr::Time>,
32    temp_antichain: Antichain<Tr::Time>,
33
34    operator: OperatorInfo,
35    logging: Option<crate::logging::Logger>,
36}
37
38use crate::trace::implementations::WithLayout;
39impl<Tr: TraceReader> WithLayout for TraceAgent<Tr> {
40    type Layout = Tr::Layout;
41}
42
43impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
44    type Batch = Tr::Batch;
45    type Storage = Tr::Storage;
46    type Cursor = Tr::Cursor;
47
48    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
49        // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
50        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
51        crate::lattice::antichain_join_into(
52            &self.logical_compaction.borrow()[..],
53            &frontier[..],
54            &mut self.temp_antichain,
55        );
56        self.trace.borrow_mut().adjust_logical_compaction(
57            self.logical_compaction.borrow(),
58            self.temp_antichain.borrow(),
59        );
60        ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
61        self.temp_antichain.clear();
62    }
63    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
64        self.logical_compaction.borrow()
65    }
66    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
67        // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
68        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
69        crate::lattice::antichain_join_into(
70            &self.physical_compaction.borrow()[..],
71            &frontier[..],
72            &mut self.temp_antichain,
73        );
74        self.trace.borrow_mut().adjust_physical_compaction(
75            self.physical_compaction.borrow(),
76            self.temp_antichain.borrow(),
77        );
78        ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
79        self.temp_antichain.clear();
80    }
81    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
82        self.physical_compaction.borrow()
83    }
84    fn cursor_through(
85        &mut self,
86        frontier: AntichainRef<'_, Tr::Time>,
87    ) -> Option<(Self::Cursor, Self::Storage)> {
88        self.trace.borrow_mut().trace.cursor_through(frontier)
89    }
90    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
91        self.trace.borrow().trace.map_batches(f)
92    }
93}
94
95impl<Tr: TraceReader> TraceAgent<Tr> {
96    /// Creates a new agent from a trace reader.
97    pub fn new(
98        trace: Tr,
99        operator: OperatorInfo,
100        logging: Option<crate::logging::Logger>,
101    ) -> (Self, TraceWriter<Tr>)
102    where
103        Tr: Trace,
104    {
105        let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
106        let queues = Rc::new(RefCell::new(Vec::new()));
107
108        if let Some(logging) = &logging {
109            logging.log(crate::logging::TraceShare {
110                operator: operator.global_id,
111                diff: 1,
112            });
113        }
114
115        let reader = TraceAgent {
116            trace: trace.clone(),
117            queues: Rc::downgrade(&queues),
118            logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
119            physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
120            temp_antichain: Antichain::new(),
121            operator,
122            logging,
123        };
124
125        let writer = TraceWriter::new(
126            vec![<Tr::Time as Timestamp>::minimum()],
127            Rc::downgrade(&trace),
128            queues,
129        );
130
131        (reader, writer)
132    }
133
134    /// Attaches a new shared queue to the trace.
135    ///
136    /// The queue is first populated with existing batches from the trace,
137    /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
138    /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
139    pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr> {
140        // create a new queue for progress and batch information.
141        let mut new_queue = VecDeque::new();
142
143        // add the existing batches from the trace
144        let mut upper = None;
145        self.trace.borrow_mut().trace.map_batches(|batch| {
146            new_queue.push_back(TraceReplayInstruction::Batch(
147                batch.clone(),
148                Some(<Tr::Time as Timestamp>::minimum()),
149            ));
150            upper = Some(batch.upper().clone());
151        });
152
153        if let Some(upper) = upper {
154            new_queue.push_back(TraceReplayInstruction::Frontier(upper));
155        }
156
157        let reference = Rc::new((activator, RefCell::new(new_queue)));
158
159        // wraps the queue in a ref-counted ref cell and enqueue/return it.
160        if let Some(queue) = self.queues.upgrade() {
161            queue.borrow_mut().push(Rc::downgrade(&reference));
162        }
163        reference.0.activate();
164        reference
165    }
166
167    /// The [OperatorInfo] of the underlying Timely operator
168    pub fn operator(&self) -> &OperatorInfo {
169        &self.operator
170    }
171
172    /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain
173    /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
174    /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
175    ///
176    /// This method is subject to changes and removal and should not be considered part of a stable
177    /// interface.
178    pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>> {
179        Rc::clone(&self.trace)
180    }
181}
182
183impl<Tr: TraceReader + 'static> TraceAgent<Tr> {
184    /// Copies an existing collection into the supplied scope.
185    ///
186    /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
187    /// directly to the source collection brought into the local scope. The only caveat is that the initial state
188    /// of the collection is its current state, and updates occur from this point forward. The historical changes
189    /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
190    /// are no longer evident.
191    ///
192    /// The current behavior is that the introduced collection accumulates updates to some times less or equal
193    /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
194    /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
195    /// the historical collection may move through configurations that did not actually occur, even if eventually
196    /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
197    /// the intermediate computation could do something that the original computation did not, like diverge.
198    ///
199    /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
200    /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
201    /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
202    /// to advance times before using them).
203    ///
204    /// # Examples
205    ///
206    /// ```
207    /// use timely::Config;
208    /// use palimpsest_dataflow::input::Input;
209    /// use palimpsest_dataflow::operators::arrange::ArrangeBySelf;
210    /// use palimpsest_dataflow::operators::reduce::Reduce;
211    /// use palimpsest_dataflow::trace::Trace;
212    ///
213    /// ::timely::execute(Config::thread(), |worker| {
214    ///
215    ///     // create a first dataflow
216    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
217    ///         // create input handle and collection.
218    ///         scope.new_collection_from(0 .. 10).1
219    ///              .arrange_by_self()
220    ///              .trace
221    ///     });
222    ///
223    ///     // do some work.
224    ///     worker.step();
225    ///     worker.step();
226    ///
227    ///     // create a second dataflow
228    ///     worker.dataflow(move |scope| {
229    ///         trace.import(scope)
230    ///              .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
231    ///     });
232    ///
233    /// }).unwrap();
234    /// ```
235    pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
236    where
237        G: Scope<Timestamp = Tr::Time>,
238    {
239        self.import_named(scope, "ArrangedSource")
240    }
241
242    /// Same as `import`, but allows to name the source.
243    pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
244    where
245        G: Scope<Timestamp = Tr::Time>,
246    {
247        // Drop ShutdownButton and return only the arrangement.
248        self.import_core(scope, name).0
249    }
250
251    /// Imports an arrangement into the supplied scope.
252    ///
253    /// # Examples
254    ///
255    /// ```
256    /// use timely::Config;
257    /// use timely::dataflow::ProbeHandle;
258    /// use timely::dataflow::operators::Probe;
259    /// use palimpsest_dataflow::input::InputSession;
260    /// use palimpsest_dataflow::operators::arrange::ArrangeBySelf;
261    /// use palimpsest_dataflow::operators::reduce::Reduce;
262    /// use palimpsest_dataflow::trace::Trace;
263    ///
264    /// ::timely::execute(Config::thread(), |worker| {
265    ///
266    ///     let mut input = InputSession::<_,(),isize>::new();
267    ///     let mut probe = ProbeHandle::new();
268    ///
269    ///     // create a first dataflow
270    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
271    ///         // create input handle and collection.
272    ///         input.to_collection(scope)
273    ///              .arrange_by_self()
274    ///              .trace
275    ///     });
276    ///
277    ///     // do some work.
278    ///     worker.step();
279    ///     worker.step();
280    ///
281    ///     // create a second dataflow
282    ///     let mut shutdown = worker.dataflow(|scope| {
283    ///         let (arrange, button) = trace.import_core(scope, "Import");
284    ///         arrange.stream.probe_with(&mut probe);
285    ///         button
286    ///     });
287    ///
288    ///     worker.step();
289    ///     worker.step();
290    ///     assert!(!probe.done());
291    ///
292    ///     shutdown.press();
293    ///
294    ///     worker.step();
295    ///     worker.step();
296    ///     assert!(probe.done());
297    ///
298    /// }).unwrap();
299    /// ```
300    pub fn import_core<G>(
301        &mut self,
302        scope: &G,
303        name: &str,
304    ) -> (
305        Arranged<G, TraceAgent<Tr>>,
306        ShutdownButton<CapabilitySet<Tr::Time>>,
307    )
308    where
309        G: Scope<Timestamp = Tr::Time>,
310    {
311        let trace = self.clone();
312
313        let mut shutdown_button = None;
314
315        let stream = {
316            let shutdown_button_ref = &mut shutdown_button;
317            source(scope, name, move |capability, info| {
318                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
319
320                let activator = scope.activator_for(Rc::clone(&info.address));
321                let queue = self.new_listener(activator);
322
323                let activator = scope.activator_for(info.address);
324                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
325
326                capabilities
327                    .borrow_mut()
328                    .as_mut()
329                    .unwrap()
330                    .insert(capability);
331
332                move |output| {
333                    let mut capabilities = capabilities.borrow_mut();
334                    if let Some(ref mut capabilities) = *capabilities {
335                        let mut borrow = queue.1.borrow_mut();
336                        for instruction in borrow.drain(..) {
337                            match instruction {
338                                TraceReplayInstruction::Frontier(frontier) => {
339                                    capabilities.downgrade(&frontier.borrow()[..]);
340                                }
341                                TraceReplayInstruction::Batch(batch, hint) => {
342                                    if let Some(time) = hint {
343                                        if !batch.is_empty() {
344                                            let delayed = capabilities.delayed(&time);
345                                            output.session(&delayed).give(batch);
346                                        }
347                                    }
348                                }
349                            }
350                        }
351                    }
352                }
353            })
354        };
355
356        (Arranged { stream, trace }, shutdown_button.unwrap())
357    }
358
359    /// Imports an arrangement into the supplied scope.
360    ///
361    /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
362    ///
363    /// # Examples
364    ///
365    /// ```
366    /// use timely::Config;
367    /// use timely::progress::frontier::AntichainRef;
368    /// use timely::dataflow::ProbeHandle;
369    /// use timely::dataflow::operators::Probe;
370    /// use timely::dataflow::operators::Inspect;
371    /// use palimpsest_dataflow::input::InputSession;
372    /// use palimpsest_dataflow::operators::arrange::ArrangeBySelf;
373    /// use palimpsest_dataflow::operators::reduce::Reduce;
374    /// use palimpsest_dataflow::trace::Trace;
375    /// use palimpsest_dataflow::trace::TraceReader;
376    /// use palimpsest_dataflow::input::Input;
377    ///
378    /// ::timely::execute(Config::thread(), |worker| {
379    ///
380    ///     let mut probe = ProbeHandle::new();
381    ///
382    ///     // create a first dataflow
383    ///     let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
384    ///         // create input handle and collection.
385    ///         let (handle, stream) = scope.new_collection();
386    ///         let trace = stream.arrange_by_self().trace;
387    ///         (handle, trace)
388    ///     });
389    ///
390    ///     handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
391    ///     handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
392    ///     handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
393    ///     handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
394    ///     handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
395    ///
396    ///     trace.set_logical_compaction(AntichainRef::new(&[5]));
397    ///
398    ///     // create a second dataflow
399    ///     let mut shutdown = worker.dataflow(|scope| {
400    ///         let (arrange, button) = trace.import_frontier(scope, "Import");
401    ///         arrange
402    ///             .as_collection(|k,v| (*k,*v))
403    ///             .inner
404    ///             .inspect(|(d,t,r)| {
405    ///                 assert!(t >= &5);
406    ///             })
407    ///             .probe_with(&mut probe);
408    ///
409    ///         button
410    ///     });
411    ///
412    ///     worker.step();
413    ///     worker.step();
414    ///     assert!(!probe.done());
415    ///
416    ///     shutdown.press();
417    ///
418    ///     worker.step();
419    ///     worker.step();
420    ///     assert!(probe.done());
421    ///
422    /// }).unwrap();
423    /// ```
424    pub fn import_frontier<G>(
425        &mut self,
426        scope: &G,
427        name: &str,
428    ) -> (
429        Arranged<G, TraceFrontier<TraceAgent<Tr>>>,
430        ShutdownButton<CapabilitySet<Tr::Time>>,
431    )
432    where
433        G: Scope<Timestamp = Tr::Time>,
434        Tr: TraceReader,
435    {
436        // This frontier describes our only guarantee on the compaction frontier.
437        let since = self.get_logical_compaction().to_owned();
438        self.import_frontier_core(scope, name, since, Antichain::new())
439    }
440
441    /// Import a trace restricted to a specific time interval `[since, until)`.
442    ///
443    /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
444    /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
445    /// to `until` the operator capability will be dropped.
446    ///
447    /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
448    /// frontier indicates the end of times.
449    pub fn import_frontier_core<G>(
450        &mut self,
451        scope: &G,
452        name: &str,
453        since: Antichain<Tr::Time>,
454        until: Antichain<Tr::Time>,
455    ) -> (
456        Arranged<G, TraceFrontier<TraceAgent<Tr>>>,
457        ShutdownButton<CapabilitySet<Tr::Time>>,
458    )
459    where
460        G: Scope<Timestamp = Tr::Time>,
461        Tr: TraceReader,
462    {
463        let trace = self.clone();
464        let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());
465
466        let mut shutdown_button = None;
467
468        let stream = {
469            let shutdown_button_ref = &mut shutdown_button;
470            source(scope, name, move |capability, info| {
471                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
472
473                let activator = scope.activator_for(Rc::clone(&info.address));
474                let queue = self.new_listener(activator);
475
476                let activator = scope.activator_for(info.address);
477                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
478
479                capabilities
480                    .borrow_mut()
481                    .as_mut()
482                    .unwrap()
483                    .insert(capability);
484
485                move |output| {
486                    let mut capabilities = capabilities.borrow_mut();
487                    if let Some(ref mut capabilities) = *capabilities {
488                        let mut borrow = queue.1.borrow_mut();
489                        for instruction in borrow.drain(..) {
490                            // If we have dropped the capabilities due to `until`, attempt no further work.
491                            // Without the capabilities, we should soon be shut down (once this loop ends).
492                            if !capabilities.is_empty() {
493                                match instruction {
494                                    TraceReplayInstruction::Frontier(frontier) => {
495                                        if timely::PartialOrder::less_equal(&until, &frontier) {
496                                            // It might be nice to actively *drop* `capabilities`, but it seems
497                                            // complicated logically (i.e. we'd have to break out of the loop).
498                                            capabilities.downgrade(&[]);
499                                        } else {
500                                            capabilities.downgrade(&frontier.borrow()[..]);
501                                        }
502                                    }
503                                    TraceReplayInstruction::Batch(batch, hint) => {
504                                        if let Some(time) = hint {
505                                            if !batch.is_empty() {
506                                                let delayed = capabilities.delayed(&time);
507                                                output.session(&delayed).give(
508                                                    BatchFrontier::make_from(
509                                                        batch,
510                                                        since.borrow(),
511                                                        until.borrow(),
512                                                    ),
513                                                );
514                                            }
515                                        }
516                                    }
517                                }
518                            }
519                        }
520                    }
521                }
522            })
523        };
524
525        (Arranged { stream, trace }, shutdown_button.unwrap())
526    }
527}
528
529/// Wrapper than can drop shared references.
530pub struct ShutdownButton<T> {
531    reference: Rc<RefCell<Option<T>>>,
532    activator: Activator,
533}
534
535impl<T> ShutdownButton<T> {
536    /// Creates a new ShutdownButton.
537    pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
538        Self {
539            reference,
540            activator,
541        }
542    }
543    /// Push the shutdown button, dropping the shared objects.
544    pub fn press(&mut self) {
545        *self.reference.borrow_mut() = None;
546        self.activator.activate();
547    }
548    /// Hotwires the button to one that is pressed if dropped.
549    pub fn press_on_drop(self) -> ShutdownDeadmans<T> {
550        ShutdownDeadmans { button: self }
551    }
552}
553
554/// A deadman's switch version of a shutdown button.
555///
556/// This type hosts a shutdown button and will press it when dropped.
557pub struct ShutdownDeadmans<T> {
558    button: ShutdownButton<T>,
559}
560
561impl<T> Drop for ShutdownDeadmans<T> {
562    fn drop(&mut self) {
563        self.button.press();
564    }
565}
566
567impl<Tr: TraceReader> Clone for TraceAgent<Tr> {
568    fn clone(&self) -> Self {
569        if let Some(logging) = &self.logging {
570            logging.log(crate::logging::TraceShare {
571                operator: self.operator.global_id,
572                diff: 1,
573            });
574        }
575
576        // increase counts for wrapped `TraceBox`.
577        let empty_frontier = Antichain::new();
578        self.trace
579            .borrow_mut()
580            .adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
581        self.trace
582            .borrow_mut()
583            .adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
584
585        TraceAgent {
586            trace: self.trace.clone(),
587            queues: self.queues.clone(),
588            logical_compaction: self.logical_compaction.clone(),
589            physical_compaction: self.physical_compaction.clone(),
590            operator: self.operator.clone(),
591            logging: self.logging.clone(),
592            temp_antichain: Antichain::new(),
593        }
594    }
595}
596
597impl<Tr: TraceReader> Drop for TraceAgent<Tr> {
598    fn drop(&mut self) {
599        if let Some(logging) = &self.logging {
600            logging.log(crate::logging::TraceShare {
601                operator: self.operator.global_id,
602                diff: -1,
603            });
604        }
605
606        // decrement borrow counts to remove all holds
607        let empty_frontier = Antichain::new();
608        self.trace
609            .borrow_mut()
610            .adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
611        self.trace
612            .borrow_mut()
613            .adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
614    }
615}