timely/progress/operate.rs
//! Methods which describe an operator's topology, and the progress it makes.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::scheduling::Schedule;
7use crate::progress::{Timestamp, ChangeBatch, Antichain};
8
9/// Methods for describing an operators topology, and the progress it makes.
10pub trait Operate<T: Timestamp> : Schedule {
11
12 /// Indicates if the operator is strictly local to this worker.
13 ///
14 /// A parent scope must understand whether the progress information returned by the worker
15 /// reflects only this worker's progress, so that it knows whether to send and receive the
16 /// corresponding progress messages to its peers. If the operator is strictly local, it must
17 /// exchange this information, whereas if the operator is itself implemented by the same set
18 /// of workers, the parent scope understands that progress information already reflects the
19 /// aggregate information among the workers.
20 ///
21 /// This is a coarse approximation to refined worker sets. In a future better world, operators
22 /// would explain how their implementations are partitioned, so that a parent scope knows what
23 /// progress information to exchange with which peers. Right now the two choices are either
24 /// "all" or "none", but it could be more detailed. In the more detailed case, this method
25 /// should / could return a pair (index, peers), indicating the group id of the worker out of
26 /// how many groups. This becomes complicated, as a full all-to-all exchange would result in
27 /// multiple copies of the same progress messages (but aggregated variously) arriving at
28 /// arbitrary times.
29 fn local(&self) -> bool { true }
30
31 /// The number of inputs.
32 fn inputs(&self) -> usize;
33 /// The number of outputs.
34 fn outputs(&self) -> usize;
35
36 /// Fetches summary information about internal structure of the operator.
37 ///
38 /// Each operator must summarize its internal structure by a map from pairs `(input, output)`
39 /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
40 /// be transformed to timestamps on any of its outputs.
41 ///
42 /// Each operator must also indicate whether it initially holds any capabilities on any of its
43 /// outputs, so that the parent operator can properly initialize its progress information.
44 ///
45 /// The default behavior is to indicate that timestamps on any input can emerge unchanged on
46 /// any output, and no initial capabilities are held.
47 fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>);
48
49 /// Signals that external frontiers have been set.
50 ///
51 /// By default this method does nothing, and leaves all changes in the `frontiers` element
52 /// of the shared progress state. An operator should be able to consult `frontiers` at any
53 /// point and read out the current frontier information, or the changes from the last time
54 /// that `frontiers` was drained.
55 fn set_external_summary(&mut self) { }
56
57 /// Indicates of whether the operator requires `push_external_progress` information or not.
58 fn notify_me(&self) -> bool { true }
59}
60
61/// Progress information shared between parent and child.
62#[derive(Debug)]
63pub struct SharedProgress<T: Timestamp> {
64 /// Frontier capability changes reported by the parent scope.
65 pub frontiers: Vec<ChangeBatch<T>>,
66 /// Consumed message changes reported by the child operator.
67 pub consumeds: Vec<ChangeBatch<T>>,
68 /// Internal capability changes reported by the child operator.
69 pub internals: Vec<ChangeBatch<T>>,
70 /// Produced message changes reported by the child operator.
71 pub produceds: Vec<ChangeBatch<T>>,
72}
73
74impl<T: Timestamp> SharedProgress<T> {
75 /// Allocates a new shared progress structure.
76 pub fn new(inputs: usize, outputs: usize) -> Self {
77 SharedProgress {
78 frontiers: vec![ChangeBatch::new(); inputs],
79 consumeds: vec![ChangeBatch::new(); inputs],
80 internals: vec![ChangeBatch::new(); outputs],
81 produceds: vec![ChangeBatch::new(); outputs],
82 }
83 }
84}