palimpsest_dataflow/operators/arrange/agent.rs
1//! Shared read access to a trace.
2
3use std::cell::RefCell;
4use std::collections::VecDeque;
5use std::rc::{Rc, Weak};
6
7use timely::dataflow::operators::generic::{source, OperatorInfo};
8use timely::dataflow::operators::CapabilitySet;
9use timely::dataflow::Scope;
10use timely::progress::Timestamp;
11use timely::progress::{frontier::AntichainRef, Antichain};
12
13use crate::trace::wrappers::rc::TraceBox;
14use crate::trace::{BatchReader, Trace, TraceReader};
15
16use timely::scheduling::Activator;
17
18use super::TraceReplayInstruction;
19use super::{Arranged, TraceAgentQueueReader, TraceAgentQueueWriter, TraceWriter};
20
21use crate::trace::wrappers::frontier::{BatchFrontier, TraceFrontier};
22
23/// A `TraceReader` wrapper which can be imported into other dataflows.
24///
25/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
26/// from the dataflow in which it was defined, and imported into other dataflows.
27pub struct TraceAgent<Tr: TraceReader> {
28 trace: Rc<RefCell<TraceBox<Tr>>>,
29 queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
30 logical_compaction: Antichain<Tr::Time>,
31 physical_compaction: Antichain<Tr::Time>,
32 temp_antichain: Antichain<Tr::Time>,
33
34 operator: OperatorInfo,
35 logging: Option<crate::logging::Logger>,
36}
37
38use crate::trace::implementations::WithLayout;
39impl<Tr: TraceReader> WithLayout for TraceAgent<Tr> {
40 type Layout = Tr::Layout;
41}
42
43impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
44 type Batch = Tr::Batch;
45 type Storage = Tr::Storage;
46 type Cursor = Tr::Cursor;
47
48 fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
49 // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
50 // Instead, it determines the joint consequences of both guarantees and moves forward with that.
51 crate::lattice::antichain_join_into(
52 &self.logical_compaction.borrow()[..],
53 &frontier[..],
54 &mut self.temp_antichain,
55 );
56 self.trace.borrow_mut().adjust_logical_compaction(
57 self.logical_compaction.borrow(),
58 self.temp_antichain.borrow(),
59 );
60 ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
61 self.temp_antichain.clear();
62 }
63 fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
64 self.logical_compaction.borrow()
65 }
66 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
67 // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
68 // Instead, it determines the joint consequences of both guarantees and moves forward with that.
69 crate::lattice::antichain_join_into(
70 &self.physical_compaction.borrow()[..],
71 &frontier[..],
72 &mut self.temp_antichain,
73 );
74 self.trace.borrow_mut().adjust_physical_compaction(
75 self.physical_compaction.borrow(),
76 self.temp_antichain.borrow(),
77 );
78 ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
79 self.temp_antichain.clear();
80 }
81 fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
82 self.physical_compaction.borrow()
83 }
84 fn cursor_through(
85 &mut self,
86 frontier: AntichainRef<'_, Tr::Time>,
87 ) -> Option<(Self::Cursor, Self::Storage)> {
88 self.trace.borrow_mut().trace.cursor_through(frontier)
89 }
90 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
91 self.trace.borrow().trace.map_batches(f)
92 }
93}
94
95impl<Tr: TraceReader> TraceAgent<Tr> {
96 /// Creates a new agent from a trace reader.
97 pub fn new(
98 trace: Tr,
99 operator: OperatorInfo,
100 logging: Option<crate::logging::Logger>,
101 ) -> (Self, TraceWriter<Tr>)
102 where
103 Tr: Trace,
104 {
105 let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
106 let queues = Rc::new(RefCell::new(Vec::new()));
107
108 if let Some(logging) = &logging {
109 logging.log(crate::logging::TraceShare {
110 operator: operator.global_id,
111 diff: 1,
112 });
113 }
114
115 let reader = TraceAgent {
116 trace: trace.clone(),
117 queues: Rc::downgrade(&queues),
118 logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
119 physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
120 temp_antichain: Antichain::new(),
121 operator,
122 logging,
123 };
124
125 let writer = TraceWriter::new(
126 vec![<Tr::Time as Timestamp>::minimum()],
127 Rc::downgrade(&trace),
128 queues,
129 );
130
131 (reader, writer)
132 }
133
134 /// Attaches a new shared queue to the trace.
135 ///
136 /// The queue is first populated with existing batches from the trace,
137 /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
138 /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
139 pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr> {
140 // create a new queue for progress and batch information.
141 let mut new_queue = VecDeque::new();
142
143 // add the existing batches from the trace
144 let mut upper = None;
145 self.trace.borrow_mut().trace.map_batches(|batch| {
146 new_queue.push_back(TraceReplayInstruction::Batch(
147 batch.clone(),
148 Some(<Tr::Time as Timestamp>::minimum()),
149 ));
150 upper = Some(batch.upper().clone());
151 });
152
153 if let Some(upper) = upper {
154 new_queue.push_back(TraceReplayInstruction::Frontier(upper));
155 }
156
157 let reference = Rc::new((activator, RefCell::new(new_queue)));
158
159 // wraps the queue in a ref-counted ref cell and enqueue/return it.
160 if let Some(queue) = self.queues.upgrade() {
161 queue.borrow_mut().push(Rc::downgrade(&reference));
162 }
163 reference.0.activate();
164 reference
165 }
166
167 /// The [OperatorInfo] of the underlying Timely operator
168 pub fn operator(&self) -> &OperatorInfo {
169 &self.operator
170 }
171
172 /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain
173 /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
174 /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
175 ///
176 /// This method is subject to changes and removal and should not be considered part of a stable
177 /// interface.
178 pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>> {
179 Rc::clone(&self.trace)
180 }
181}
182
183impl<Tr: TraceReader + 'static> TraceAgent<Tr> {
184 /// Copies an existing collection into the supplied scope.
185 ///
186 /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
187 /// directly to the source collection brought into the local scope. The only caveat is that the initial state
188 /// of the collection is its current state, and updates occur from this point forward. The historical changes
189 /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
190 /// are no longer evident.
191 ///
192 /// The current behavior is that the introduced collection accumulates updates to some times less or equal
193 /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
194 /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
195 /// the historical collection may move through configurations that did not actually occur, even if eventually
196 /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
197 /// the intermediate computation could do something that the original computation did not, like diverge.
198 ///
199 /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
200 /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
201 /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
202 /// to advance times before using them).
203 ///
204 /// # Examples
205 ///
206 /// ```
207 /// use timely::Config;
208 /// use palimpsest_dataflow::input::Input;
209 /// use palimpsest_dataflow::operators::arrange::ArrangeBySelf;
210 /// use palimpsest_dataflow::operators::reduce::Reduce;
211 /// use palimpsest_dataflow::trace::Trace;
212 ///
213 /// ::timely::execute(Config::thread(), |worker| {
214 ///
215 /// // create a first dataflow
216 /// let mut trace = worker.dataflow::<u32,_,_>(|scope| {
217 /// // create input handle and collection.
218 /// scope.new_collection_from(0 .. 10).1
219 /// .arrange_by_self()
220 /// .trace
221 /// });
222 ///
223 /// // do some work.
224 /// worker.step();
225 /// worker.step();
226 ///
227 /// // create a second dataflow
228 /// worker.dataflow(move |scope| {
229 /// trace.import(scope)
230 /// .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
231 /// });
232 ///
233 /// }).unwrap();
234 /// ```
235 pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
236 where
237 G: Scope<Timestamp = Tr::Time>,
238 {
239 self.import_named(scope, "ArrangedSource")
240 }
241
242 /// Same as `import`, but allows to name the source.
243 pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
244 where
245 G: Scope<Timestamp = Tr::Time>,
246 {
247 // Drop ShutdownButton and return only the arrangement.
248 self.import_core(scope, name).0
249 }
250
251 /// Imports an arrangement into the supplied scope.
252 ///
253 /// # Examples
254 ///
255 /// ```
256 /// use timely::Config;
257 /// use timely::dataflow::ProbeHandle;
258 /// use timely::dataflow::operators::Probe;
259 /// use palimpsest_dataflow::input::InputSession;
260 /// use palimpsest_dataflow::operators::arrange::ArrangeBySelf;
261 /// use palimpsest_dataflow::operators::reduce::Reduce;
262 /// use palimpsest_dataflow::trace::Trace;
263 ///
264 /// ::timely::execute(Config::thread(), |worker| {
265 ///
266 /// let mut input = InputSession::<_,(),isize>::new();
267 /// let mut probe = ProbeHandle::new();
268 ///
269 /// // create a first dataflow
270 /// let mut trace = worker.dataflow::<u32,_,_>(|scope| {
271 /// // create input handle and collection.
272 /// input.to_collection(scope)
273 /// .arrange_by_self()
274 /// .trace
275 /// });
276 ///
277 /// // do some work.
278 /// worker.step();
279 /// worker.step();
280 ///
281 /// // create a second dataflow
282 /// let mut shutdown = worker.dataflow(|scope| {
283 /// let (arrange, button) = trace.import_core(scope, "Import");
284 /// arrange.stream.probe_with(&mut probe);
285 /// button
286 /// });
287 ///
288 /// worker.step();
289 /// worker.step();
290 /// assert!(!probe.done());
291 ///
292 /// shutdown.press();
293 ///
294 /// worker.step();
295 /// worker.step();
296 /// assert!(probe.done());
297 ///
298 /// }).unwrap();
299 /// ```
300 pub fn import_core<G>(
301 &mut self,
302 scope: &G,
303 name: &str,
304 ) -> (
305 Arranged<G, TraceAgent<Tr>>,
306 ShutdownButton<CapabilitySet<Tr::Time>>,
307 )
308 where
309 G: Scope<Timestamp = Tr::Time>,
310 {
311 let trace = self.clone();
312
313 let mut shutdown_button = None;
314
315 let stream = {
316 let shutdown_button_ref = &mut shutdown_button;
317 source(scope, name, move |capability, info| {
318 let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
319
320 let activator = scope.activator_for(Rc::clone(&info.address));
321 let queue = self.new_listener(activator);
322
323 let activator = scope.activator_for(info.address);
324 *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
325
326 capabilities
327 .borrow_mut()
328 .as_mut()
329 .unwrap()
330 .insert(capability);
331
332 move |output| {
333 let mut capabilities = capabilities.borrow_mut();
334 if let Some(ref mut capabilities) = *capabilities {
335 let mut borrow = queue.1.borrow_mut();
336 for instruction in borrow.drain(..) {
337 match instruction {
338 TraceReplayInstruction::Frontier(frontier) => {
339 capabilities.downgrade(&frontier.borrow()[..]);
340 }
341 TraceReplayInstruction::Batch(batch, hint) => {
342 if let Some(time) = hint {
343 if !batch.is_empty() {
344 let delayed = capabilities.delayed(&time);
345 output.session(&delayed).give(batch);
346 }
347 }
348 }
349 }
350 }
351 }
352 }
353 })
354 };
355
356 (Arranged { stream, trace }, shutdown_button.unwrap())
357 }
358
359 /// Imports an arrangement into the supplied scope.
360 ///
361 /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
362 ///
363 /// # Examples
364 ///
365 /// ```
366 /// use timely::Config;
367 /// use timely::progress::frontier::AntichainRef;
368 /// use timely::dataflow::ProbeHandle;
369 /// use timely::dataflow::operators::Probe;
370 /// use timely::dataflow::operators::Inspect;
371 /// use palimpsest_dataflow::input::InputSession;
372 /// use palimpsest_dataflow::operators::arrange::ArrangeBySelf;
373 /// use palimpsest_dataflow::operators::reduce::Reduce;
374 /// use palimpsest_dataflow::trace::Trace;
375 /// use palimpsest_dataflow::trace::TraceReader;
376 /// use palimpsest_dataflow::input::Input;
377 ///
378 /// ::timely::execute(Config::thread(), |worker| {
379 ///
380 /// let mut probe = ProbeHandle::new();
381 ///
382 /// // create a first dataflow
383 /// let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
384 /// // create input handle and collection.
385 /// let (handle, stream) = scope.new_collection();
386 /// let trace = stream.arrange_by_self().trace;
387 /// (handle, trace)
388 /// });
389 ///
390 /// handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
391 /// handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
392 /// handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
393 /// handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
394 /// handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
395 ///
396 /// trace.set_logical_compaction(AntichainRef::new(&[5]));
397 ///
398 /// // create a second dataflow
399 /// let mut shutdown = worker.dataflow(|scope| {
400 /// let (arrange, button) = trace.import_frontier(scope, "Import");
401 /// arrange
402 /// .as_collection(|k,v| (*k,*v))
403 /// .inner
404 /// .inspect(|(d,t,r)| {
405 /// assert!(t >= &5);
406 /// })
407 /// .probe_with(&mut probe);
408 ///
409 /// button
410 /// });
411 ///
412 /// worker.step();
413 /// worker.step();
414 /// assert!(!probe.done());
415 ///
416 /// shutdown.press();
417 ///
418 /// worker.step();
419 /// worker.step();
420 /// assert!(probe.done());
421 ///
422 /// }).unwrap();
423 /// ```
424 pub fn import_frontier<G>(
425 &mut self,
426 scope: &G,
427 name: &str,
428 ) -> (
429 Arranged<G, TraceFrontier<TraceAgent<Tr>>>,
430 ShutdownButton<CapabilitySet<Tr::Time>>,
431 )
432 where
433 G: Scope<Timestamp = Tr::Time>,
434 Tr: TraceReader,
435 {
436 // This frontier describes our only guarantee on the compaction frontier.
437 let since = self.get_logical_compaction().to_owned();
438 self.import_frontier_core(scope, name, since, Antichain::new())
439 }
440
441 /// Import a trace restricted to a specific time interval `[since, until)`.
442 ///
443 /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
444 /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
445 /// to `until` the operator capability will be dropped.
446 ///
447 /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
448 /// frontier indicates the end of times.
449 pub fn import_frontier_core<G>(
450 &mut self,
451 scope: &G,
452 name: &str,
453 since: Antichain<Tr::Time>,
454 until: Antichain<Tr::Time>,
455 ) -> (
456 Arranged<G, TraceFrontier<TraceAgent<Tr>>>,
457 ShutdownButton<CapabilitySet<Tr::Time>>,
458 )
459 where
460 G: Scope<Timestamp = Tr::Time>,
461 Tr: TraceReader,
462 {
463 let trace = self.clone();
464 let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());
465
466 let mut shutdown_button = None;
467
468 let stream = {
469 let shutdown_button_ref = &mut shutdown_button;
470 source(scope, name, move |capability, info| {
471 let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
472
473 let activator = scope.activator_for(Rc::clone(&info.address));
474 let queue = self.new_listener(activator);
475
476 let activator = scope.activator_for(info.address);
477 *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
478
479 capabilities
480 .borrow_mut()
481 .as_mut()
482 .unwrap()
483 .insert(capability);
484
485 move |output| {
486 let mut capabilities = capabilities.borrow_mut();
487 if let Some(ref mut capabilities) = *capabilities {
488 let mut borrow = queue.1.borrow_mut();
489 for instruction in borrow.drain(..) {
490 // If we have dropped the capabilities due to `until`, attempt no further work.
491 // Without the capabilities, we should soon be shut down (once this loop ends).
492 if !capabilities.is_empty() {
493 match instruction {
494 TraceReplayInstruction::Frontier(frontier) => {
495 if timely::PartialOrder::less_equal(&until, &frontier) {
496 // It might be nice to actively *drop* `capabilities`, but it seems
497 // complicated logically (i.e. we'd have to break out of the loop).
498 capabilities.downgrade(&[]);
499 } else {
500 capabilities.downgrade(&frontier.borrow()[..]);
501 }
502 }
503 TraceReplayInstruction::Batch(batch, hint) => {
504 if let Some(time) = hint {
505 if !batch.is_empty() {
506 let delayed = capabilities.delayed(&time);
507 output.session(&delayed).give(
508 BatchFrontier::make_from(
509 batch,
510 since.borrow(),
511 until.borrow(),
512 ),
513 );
514 }
515 }
516 }
517 }
518 }
519 }
520 }
521 }
522 })
523 };
524
525 (Arranged { stream, trace }, shutdown_button.unwrap())
526 }
527}
528
529/// Wrapper than can drop shared references.
530pub struct ShutdownButton<T> {
531 reference: Rc<RefCell<Option<T>>>,
532 activator: Activator,
533}
534
535impl<T> ShutdownButton<T> {
536 /// Creates a new ShutdownButton.
537 pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
538 Self {
539 reference,
540 activator,
541 }
542 }
543 /// Push the shutdown button, dropping the shared objects.
544 pub fn press(&mut self) {
545 *self.reference.borrow_mut() = None;
546 self.activator.activate();
547 }
548 /// Hotwires the button to one that is pressed if dropped.
549 pub fn press_on_drop(self) -> ShutdownDeadmans<T> {
550 ShutdownDeadmans { button: self }
551 }
552}
553
554/// A deadman's switch version of a shutdown button.
555///
556/// This type hosts a shutdown button and will press it when dropped.
557pub struct ShutdownDeadmans<T> {
558 button: ShutdownButton<T>,
559}
560
561impl<T> Drop for ShutdownDeadmans<T> {
562 fn drop(&mut self) {
563 self.button.press();
564 }
565}
566
567impl<Tr: TraceReader> Clone for TraceAgent<Tr> {
568 fn clone(&self) -> Self {
569 if let Some(logging) = &self.logging {
570 logging.log(crate::logging::TraceShare {
571 operator: self.operator.global_id,
572 diff: 1,
573 });
574 }
575
576 // increase counts for wrapped `TraceBox`.
577 let empty_frontier = Antichain::new();
578 self.trace
579 .borrow_mut()
580 .adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
581 self.trace
582 .borrow_mut()
583 .adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
584
585 TraceAgent {
586 trace: self.trace.clone(),
587 queues: self.queues.clone(),
588 logical_compaction: self.logical_compaction.clone(),
589 physical_compaction: self.physical_compaction.clone(),
590 operator: self.operator.clone(),
591 logging: self.logging.clone(),
592 temp_antichain: Antichain::new(),
593 }
594 }
595}
596
597impl<Tr: TraceReader> Drop for TraceAgent<Tr> {
598 fn drop(&mut self) {
599 if let Some(logging) = &self.logging {
600 logging.log(crate::logging::TraceShare {
601 operator: self.operator.global_id,
602 diff: -1,
603 });
604 }
605
606 // decrement borrow counts to remove all holds
607 let empty_frontier = Antichain::new();
608 self.trace
609 .borrow_mut()
610 .adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
611 self.trace
612 .borrow_mut()
613 .adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
614 }
615}