differential_dataflow/operators/arrange/agent.rs
1//! Shared read access to a trace.
2
3use std::rc::{Rc, Weak};
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely::dataflow::Scope;
8use timely::dataflow::operators::generic::{OperatorInfo, source};
9use timely::progress::Timestamp;
10use timely::progress::{Antichain, frontier::AntichainRef};
11use timely::dataflow::operators::CapabilitySet;
12
13use crate::trace::{Trace, TraceReader, BatchReader};
14
15use timely::scheduling::Activator;
16
17use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
18use super::TraceReplayInstruction;
19
20use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
21
22
/// A `TraceReader` wrapper which can be imported into other dataflows.
///
/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
/// from the dataflow in which it was defined, and imported into other dataflows.
///
/// All agents cloned from one another share a single [`trace_box::TraceBox`]. Each agent contributes
/// its own `logical_compaction` and `physical_compaction` frontiers as holds on that shared box:
/// cloning an agent registers its frontiers as additional holds, and dropping an agent releases them.
pub struct TraceAgent<Tr: TraceReader> {
    /// The shared trace, together with the accumulated compaction holds of all agents.
    trace: Rc<RefCell<trace_box::TraceBox<Tr>>>,
    /// Weak handle to the listener queues. `TraceAgent::new` hands the only strong handle to the
    /// `TraceWriter`, so upgrading this fails once that handle has been dropped.
    queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
    /// This agent's hold on the logical compaction frontier of the shared trace.
    logical_compaction: Antichain<Tr::Time>,
    /// This agent's hold on the physical compaction frontier of the shared trace.
    physical_compaction: Antichain<Tr::Time>,
    /// Scratch antichain reused by the `set_*_compaction` methods to compute joined frontiers.
    temp_antichain: Antichain<Tr::Time>,

    /// Information about the operator that produced the trace (its `global_id` is used in logging).
    operator: OperatorInfo,
    /// Optional logger; records `TraceShare` events as agents are created, cloned, and dropped.
    logging: Option<crate::logging::Logger>,
}
37
38use crate::trace::implementations::WithLayout;
// An agent exposes exactly the layout of the trace it wraps.
impl<Tr: TraceReader> WithLayout for TraceAgent<Tr> {
    type Layout = Tr::Layout;
}
42
43impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
44
45 type Batch = Tr::Batch;
46 type Storage = Tr::Storage;
47 type Cursor = Tr::Cursor;
48
49 fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
50 // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
51 // Instead, it determines the joint consequences of both guarantees and moves forward with that.
52 crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
53 self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
54 ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
55 self.temp_antichain.clear();
56 }
57 fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
58 self.logical_compaction.borrow()
59 }
60 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
61 // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
62 // Instead, it determines the joint consequences of both guarantees and moves forward with that.
63 crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
64 self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
65 ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
66 self.temp_antichain.clear();
67 }
68 fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
69 self.physical_compaction.borrow()
70 }
71 fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
72 self.trace.borrow_mut().trace.cursor_through(frontier)
73 }
74 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
75}
76
77impl<Tr: TraceReader> TraceAgent<Tr> {
78 /// Creates a new agent from a trace reader.
79 pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
80 where
81 Tr: Trace,
82 {
83 let trace = Rc::new(RefCell::new(trace_box::TraceBox::new(trace)));
84 let queues = Rc::new(RefCell::new(Vec::new()));
85
86 if let Some(logging) = &logging {
87 logging.log(
88 crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
89 );
90 }
91
92 let reader = TraceAgent {
93 trace: Rc::clone(&trace),
94 queues: Rc::downgrade(&queues),
95 logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
96 physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
97 temp_antichain: Antichain::new(),
98 operator,
99 logging,
100 };
101
102 let writer = TraceWriter::new(
103 vec![Tr::Time::minimum()],
104 Rc::downgrade(&trace),
105 queues,
106 );
107
108 (reader, writer)
109 }
110
111 /// Attaches a new shared queue to the trace.
112 ///
113 /// The queue is first populated with existing batches from the trace,
114 /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
115 /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
116 pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr>
117 {
118 // create a new queue for progress and batch information.
119 let mut new_queue = VecDeque::new();
120
121 // add the existing batches from the trace
122 let mut upper = None;
123 self.trace
124 .borrow_mut()
125 .trace
126 .map_batches(|batch| {
127 new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(Tr::Time::minimum())));
128 upper = Some(batch.upper().clone());
129 });
130
131 if let Some(upper) = upper {
132 new_queue.push_back(TraceReplayInstruction::Frontier(upper));
133 }
134
135 let reference = Rc::new((activator, RefCell::new(new_queue)));
136
137 // wraps the queue in a ref-counted ref cell and enqueue/return it.
138 if let Some(queue) = self.queues.upgrade() {
139 queue.borrow_mut().push(Rc::downgrade(&reference));
140 }
141 reference.0.activate();
142 reference
143 }
144
145 /// The [OperatorInfo] of the underlying Timely operator
146 pub fn operator(&self) -> &OperatorInfo {
147 &self.operator
148 }
149
150 /// Obtain a reference to the inner [`trace_box::TraceBox`]. It is the caller's obligation to maintain
151 /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
152 /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
153 ///
154 /// This method is subject to changes and removal and should not be considered part of a stable
155 /// interface.
156 pub fn trace_box_unstable(&self) -> Rc<RefCell<trace_box::TraceBox<Tr>>> {
157 Rc::clone(&self.trace)
158 }
159}
160
impl<Tr: TraceReader+'static> TraceAgent<Tr> {
    /// Copies an existing collection into the supplied scope.
    ///
    /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
    /// directly to the source collection brought into the local scope. The only caveat is that the initial state
    /// of the collection is its current state, and updates occur from this point forward. The historical changes
    /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
    /// are no longer evident.
    ///
    /// The current behavior is that the introduced collection accumulates updates to some times less or equal
    /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
    /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
    /// the historical collection may move through configurations that did not actually occur, even if eventually
    /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
    /// the intermediate computation could do something that the original computation did not, like diverge.
    ///
    /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
    /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
    /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
    /// to advance times before using them).
    ///
    /// The source operator is named `"ArrangedSource"`; see `import_named` to choose another name.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::trace::Trace;
    /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     // create a first dataflow
    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
    ///         // create input handle and collection.
    ///         scope.new_collection_from(0 .. 10).1
    ///              .arrange_by_self()
    ///              .trace
    ///     });
    ///
    ///     // do some work.
    ///     worker.step();
    ///     worker.step();
    ///
    ///     // create a second dataflow
    ///     worker.dataflow(move |scope| {
    ///         trace.import(scope)
    ///              .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>,_>(
    ///                  "Reduce",
    ///                  |_key, src, dst| dst.push((*src[0].0, 1)),
    ///                  |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
    ///              )
    ///              .as_collection(|k,v| (k.clone(), v.clone()));
    ///     });
    ///
    /// }).unwrap();
    /// ```
    pub fn import<'scope>(&mut self, scope: Scope<'scope, Tr::Time>) -> Arranged<'scope, TraceAgent<Tr>>
    {
        self.import_named(scope, "ArrangedSource")
    }

    /// Same as `import`, but allows naming the source operator.
    ///
    /// The `ShutdownButton` produced by `import_core` is discarded, so the caller cannot shut the
    /// imported source down early.
    pub fn import_named<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
    {
        // Drop ShutdownButton and return only the arrangement.
        self.import_core(scope, name).0
    }

    /// Imports an arrangement into the supplied scope.
    ///
    /// Returns the arrangement together with a `ShutdownButton`. Pressing the button drops the
    /// source's capabilities, after which the source stops replaying batches and progress.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use timely::dataflow::ProbeHandle;
    /// use timely::dataflow::operators::Probe;
    /// use differential_dataflow::input::InputSession;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let mut input = InputSession::<_,(),isize>::new();
    ///     let mut probe = ProbeHandle::new();
    ///
    ///     // create a first dataflow
    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
    ///         // create input handle and collection.
    ///         input.to_collection(scope)
    ///              .arrange_by_self()
    ///              .trace
    ///     });
    ///
    ///     // do some work.
    ///     worker.step();
    ///     worker.step();
    ///
    ///     // create a second dataflow
    ///     let mut shutdown = worker.dataflow(|scope| {
    ///         let (arrange, button) = trace.import_core(scope, "Import");
    ///         arrange.stream.probe_with(&mut probe);
    ///         button
    ///     });
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(!probe.done());
    ///
    ///     shutdown.press();
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(probe.done());
    ///
    /// }).unwrap();
    /// ```
    pub fn import_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
    {
        // The returned arrangement holds its own agent (and thus its own compaction holds).
        let trace = self.clone();

        // Filled in by the operator constructor passed to `source`; the `unwrap` at the end relies
        // on that constructor having run by the time `source` returns.
        let mut shutdown_button = None;

        let stream = {

            let shutdown_button_ref = &mut shutdown_button;
            source(scope, name, move |capability, info| {

                // Shared with the `ShutdownButton`, which may replace the set with `None`.
                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

                // The listener queue's activator targets this operator; `new_listener` also
                // activates it once so that the historical batches get drained.
                let activator = scope.activator_for(Rc::clone(&info.address));
                let queue = self.new_listener(activator);

                // A second activator for this operator, so pressing the button schedules it.
                let activator = scope.activator_for(info.address);
                *shutdown_button_ref = Some(ShutdownButton::new(Rc::clone(&capabilities), activator));

                capabilities.borrow_mut().as_mut().unwrap().insert(capability);

                move |output| {

                    let mut capabilities = capabilities.borrow_mut();
                    // `None` means the shutdown button was pressed: do no further work.
                    if let Some(ref mut capabilities) = *capabilities {

                        let mut borrow = queue.1.borrow_mut();
                        for instruction in borrow.drain(..) {
                            match instruction {
                                // Progress: downgrade the held capabilities to `frontier`.
                                TraceReplayInstruction::Frontier(frontier) => {
                                    capabilities.downgrade(&frontier.borrow()[..]);
                                },
                                // Data: emit non-empty batches at their hinted time; batches
                                // without a hint are not emitted.
                                TraceReplayInstruction::Batch(batch, hint) => {
                                    if let Some(time) = hint {
                                        if !batch.is_empty() {
                                            let delayed = capabilities.delayed(&time);
                                            output.session(&delayed).give(batch);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            })
        };

        (Arranged { stream, trace }, shutdown_button.unwrap())
    }

    /// Imports an arrangement into the supplied scope.
    ///
    /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
    /// It is `import_frontier_core` with `since` set to the current logical compaction frontier and
    /// no `until` bound.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use timely::progress::frontier::AntichainRef;
    /// use timely::dataflow::ProbeHandle;
    /// use timely::dataflow::operators::Probe;
    /// use timely::dataflow::operators::Inspect;
    /// use differential_dataflow::input::InputSession;
    /// use differential_dataflow::trace::Trace;
    /// use differential_dataflow::trace::TraceReader;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let mut probe = ProbeHandle::new();
    ///
    ///     // create a first dataflow
    ///     let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
    ///         // create input handle and collection.
    ///         let (handle, stream) = scope.new_collection();
    ///         let trace = stream.arrange_by_self().trace;
    ///         (handle, trace)
    ///     });
    ///
    ///     handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
    ///     handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
    ///     handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
    ///     handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
    ///     handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
    ///
    ///     trace.set_logical_compaction(AntichainRef::new(&[5]));
    ///
    ///     // create a second dataflow
    ///     let mut shutdown = worker.dataflow(|scope| {
    ///         let (arrange, button) = trace.import_frontier(scope, "Import");
    ///         arrange
    ///             .as_collection(|k,v| (*k,*v))
    ///             .inner
    ///             .inspect(|(d,t,r)| {
    ///                 assert!(t >= &5);
    ///             })
    ///             .probe_with(&mut probe);
    ///
    ///         button
    ///     });
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(!probe.done());
    ///
    ///     shutdown.press();
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(probe.done());
    ///
    /// }).unwrap();
    /// ```
    pub fn import_frontier<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str) -> (Arranged<'scope, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
    where
        Tr: TraceReader,
    {
        // This frontier describes our only guarantee on the compaction frontier.
        let since = self.get_logical_compaction().to_owned();
        self.import_frontier_core(scope, name, since, Antichain::new())
    }

    /// Import a trace restricted to a specific time interval `[since, until)`.
    ///
    /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
    /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
    /// to `until` the operator capability will be dropped.
    ///
    /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
    /// frontier indicates the end of times.
    ///
    /// As with `import_core`, the returned `ShutdownButton` can be pressed to drop the source's
    /// capabilities early.
    pub fn import_frontier_core<'scope>(&mut self, scope: Scope<'scope, Tr::Time>, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<'scope, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
    where
        Tr: TraceReader,
    {
        // The returned arrangement reads through its own agent, restricted to `[since, until)`.
        let trace = self.clone();
        let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());

        // Filled in by the operator constructor passed to `source`; the `unwrap` at the end relies
        // on that constructor having run by the time `source` returns.
        let mut shutdown_button = None;

        let stream = {

            let shutdown_button_ref = &mut shutdown_button;
            source(scope, name, move |capability, info| {

                // Shared with the `ShutdownButton`, which may replace the set with `None`.
                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

                // The listener queue's activator targets this operator; `new_listener` also
                // activates it once so that the historical batches get drained.
                let activator = scope.activator_for(Rc::clone(&info.address));
                let queue = self.new_listener(activator);

                // A second activator for this operator, so pressing the button schedules it.
                let activator = scope.activator_for(info.address);
                *shutdown_button_ref = Some(ShutdownButton::new(Rc::clone(&capabilities), activator));

                capabilities.borrow_mut().as_mut().unwrap().insert(capability);

                move |output| {

                    let mut capabilities = capabilities.borrow_mut();
                    // `None` means the shutdown button was pressed: do no further work.
                    if let Some(ref mut capabilities) = *capabilities {
                        let mut borrow = queue.1.borrow_mut();
                        for instruction in borrow.drain(..) {
                            // If we have dropped the capabilities due to `until`, attempt no further work.
                            // Without the capabilities, we should soon be shut down (once this loop ends).
                            if !capabilities.is_empty() {
                                match instruction {
                                    TraceReplayInstruction::Frontier(frontier) => {
                                        // Once `frontier` has reached `until`, nothing further can be
                                        // emitted, so all capabilities are released.
                                        if timely::PartialOrder::less_equal(&until, &frontier) {
                                            // It might be nice to actively *drop* `capabilities`, but it seems
                                            // complicated logically (i.e. we'd have to break out of the loop).
                                            capabilities.downgrade(&[]);
                                        } else {
                                            capabilities.downgrade(&frontier.borrow()[..]);
                                        }
                                    },
                                    // Data: emit non-empty batches, wrapped so that their contents are
                                    // viewed through `[since, until)`; unhinted batches are skipped.
                                    TraceReplayInstruction::Batch(batch, hint) => {
                                        if let Some(time) = hint {
                                            if !batch.is_empty() {
                                                let delayed = capabilities.delayed(&time);
                                                output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            })
        };

        (Arranged { stream, trace }, shutdown_button.unwrap())
    }
}
468
469
470
471/// Wrapper than can drop shared references.
472pub struct ShutdownButton<T> {
473 reference: Rc<RefCell<Option<T>>>,
474 activator: Activator,
475}
476
477impl<T> ShutdownButton<T> {
478 /// Creates a new ShutdownButton.
479 pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
480 Self { reference, activator }
481 }
482 /// Push the shutdown button, dropping the shared objects.
483 pub fn press(&mut self) {
484 *self.reference.borrow_mut() = None;
485 self.activator.activate();
486 }
487}
488
489impl<Tr: TraceReader> Clone for TraceAgent<Tr> {
490 fn clone(&self) -> Self {
491
492 if let Some(logging) = &self.logging {
493 logging.log(
494 crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
495 );
496 }
497
498 // increase counts for wrapped `TraceBox`.
499 let empty_frontier = Antichain::new();
500 self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
501 self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
502
503 TraceAgent {
504 trace: Rc::clone(&self.trace),
505 queues: Weak::clone(&self.queues),
506 logical_compaction: self.logical_compaction.clone(),
507 physical_compaction: self.physical_compaction.clone(),
508 operator: self.operator.clone(),
509 logging: self.logging.clone(),
510 temp_antichain: Antichain::new(),
511 }
512 }
513}
514
515impl<Tr: TraceReader> Drop for TraceAgent<Tr> {
516 fn drop(&mut self) {
517
518 if let Some(logging) = &self.logging {
519 logging.log(
520 crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
521 );
522 }
523
524 // decrement borrow counts to remove all holds
525 let empty_frontier = Antichain::new();
526 self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
527 self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
528 }
529}
530
531/// A trace wrapper suitable for use through shared reference counted ownership.
532///
533/// The wrapper mainly accumulates the expressed compaction constraints from many,
534/// and presents their implications to the wrapped trace.
535pub mod trace_box {
536
537 use timely::progress::{frontier::{AntichainRef, MutableAntichain}};
538
539 use crate::trace::TraceReader;
540
541 /// A wrapper around a trace which tracks the frontiers of all referees.
542 ///
543 /// This is an internal type, unlikely to be useful to higher-level programs, but exposed just in case.
544 /// This type is equivalent to a `RefCell`, in that it wraps the mutable state that multiple referrers
545 /// may influence.
546 pub struct TraceBox<Tr: TraceReader> {
547 /// accumulated holds on times for advancement.
548 pub (crate) logical_compaction: MutableAntichain<Tr::Time>,
549 /// accumulated holds on times for distinction.
550 pub (crate) physical_compaction: MutableAntichain<Tr::Time>,
551 /// The wrapped trace.
552 pub (crate) trace: Tr,
553 }
554
555 impl<Tr: TraceReader> TraceBox<Tr> {
556 /// Moves an existing trace into a shareable trace wrapper.
557 ///
558 /// The trace may already exist and have non-initial advance and distinguish frontiers. The boxing
559 /// process will fish these out and make sure that they are used for the initial read capabilities.
560 pub fn new(mut trace: Tr) -> Self {
561
562 let mut logical_compaction = MutableAntichain::new();
563 logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
564 let mut physical_compaction = MutableAntichain::new();
565 physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));
566
567 TraceBox {
568 logical_compaction,
569 physical_compaction,
570 trace,
571 }
572 }
573 /// Borrowed access to the underlying trace.
574 ///
575 /// This is used to inspect batches for purposes of resource accounting in external systems.
576 pub fn trace(&self) -> &Tr { &self.trace }
577 /// Replaces elements of `lower` with those of `upper`.
578 #[inline]
579 pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
580 self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
581 self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
582 self.trace.set_logical_compaction(self.logical_compaction.frontier());
583 }
584 /// Replaces elements of `lower` with those of `upper`.
585 #[inline]
586 pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
587 self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
588 self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
589 self.trace.set_physical_compaction(self.physical_compaction.frontier());
590 }
591 }
592
593}