//! Methods that describe an operator's topology and the progress it makes.

use std::default::Default;
use progress::{Timestamp, CountMap, Antichain};

/// Methods for describing an operator's topology and the progress it makes.
pub trait Operate<T: Timestamp> {
    /// Indicates whether the operator is strictly local to this worker.
    ///
    /// A parent scope must know whether the progress information reported by the operator
    /// reflects only this worker's progress, so that it knows whether to send and receive the
    /// corresponding progress messages to and from its peers. If the operator is strictly local,
    /// the parent must exchange this information. If the operator is itself implemented by the
    /// same set of workers, the parent understands that the progress information already
    /// reflects the aggregate across those workers.
    ///
    /// This is a coarse approximation to refined worker sets. Ideally, operators would explain
    /// how their implementations are partitioned, so that a parent scope knows what progress
    /// information to exchange with which peers. Currently the only two choices are "all" or
    /// "none", but a finer description is possible: this method could return a pair
    /// `(index, peers)`, giving the worker's group id and the total number of groups. This
    /// becomes complicated, as a full all-to-all exchange would result in multiple copies of the
    /// same progress messages (aggregated in different ways) arriving at arbitrary times.
    fn local(&self) -> bool { true }

    /// The number of inputs.
    fn inputs(&self) -> usize;

    /// The number of outputs.
    fn outputs(&self) -> usize;

    /// Fetches summary information about the internal structure of the operator.
    ///
    /// Each operator must summarize its internal structure by a map from pairs `(input, output)`
    /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
    /// be transformed into timestamps on any of its outputs. The returned nested vector is
    /// indexed first by input and then by output.
    ///
    /// Each operator must also indicate whether it initially holds any capabilities on any of its
    /// outputs, so that the parent scope can properly initialize its progress information.
    ///
    /// The default behavior is to indicate that timestamps on any input can emerge unchanged on
    /// any output, and that no initial capabilities are held.
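    ///
    /// A minimal, self-contained sketch of the *shape* of the default return value. It assumes
    /// hypothetical stand-ins rather than this crate's types: an antichain of summaries is a
    /// `Vec<u64>` in which `0` means "timestamp unchanged", and a count map is a
    /// `HashMap<u64, i64>`. The helper `default_summary` is likewise hypothetical.
    ///
    /// ```rust
    /// use std::collections::HashMap;
    ///
    /// // Hypothetical stand-ins for `Antichain<T::Summary>` and `CountMap<T>`.
    /// type Summaries = Vec<u64>;
    /// type Counts = HashMap<u64, i64>;
    ///
    /// // Mirrors the default: the identity summary (`0`) from every input to every
    /// // output, indexed `[input][output]`, and no initial capabilities on any output.
    /// fn default_summary(inputs: usize, outputs: usize) -> (Vec<Vec<Summaries>>, Vec<Counts>) {
    ///     (vec![vec![vec![0]; outputs]; inputs], vec![HashMap::new(); outputs])
    /// }
    /// ```
    ///
    /// For example, `default_summary(2, 3)` yields two rows (one per input) of three entries
    /// (one per output), each holding only the identity summary.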
    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Vec<CountMap<T>>) {
        (vec![vec![Antichain::from_elem(Default::default()); self.outputs()]; self.inputs()],
         vec![CountMap::new(); self.outputs()])
    }

    /// Presents summary information about the external structure around the operator.
    ///
    /// Each operator exists in the context of a parent scope, and the edges and other operators
    /// the scope hosts represent paths that messages may take from this operator's outputs back
    /// to its inputs. For an operator to correctly understand the implications of local progress
    /// statements, it must understand how messages it produces may eventually return to its
    /// inputs.
    ///
    /// The parent scope must also provide initial capabilities for each of the inputs, reflecting
    /// work elsewhere in the timely computation. Note: it is not yet settled whether the parent
    /// should exclude capabilities expressed by the operator itself. Excluding them seems
    /// possible, if it would help the operator, but the operator should not yet rely on any
    /// specific behavior.
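    ///
    /// A minimal, self-contained sketch of how an operator might use one entry of these
    /// summaries. It assumes hypothetical `u64` timestamps and a hypothetical summary type
    /// `Delay` that advances a timestamp by a fixed amount (as a feedback edge in a loop might
    /// advance a round counter). Neither `Delay` nor `earliest_return` is part of this crate.
    ///
    /// ```rust
    /// // Hypothetical path summary: a message at time `t` may return at time `t + delay`.
    /// #[derive(Clone, Copy, Debug, PartialEq)]
    /// struct Delay(u64);
    ///
    /// impl Delay {
    ///     fn apply(&self, t: u64) -> u64 {
    ///         t + self.0
    ///     }
    /// }
    ///
    /// // Earliest time at which a message produced at time `t` on an output could reappear
    /// // on an input, given the antichain of summaries for that output/input pair. An empty
    /// // antichain means no path exists, so the message can never return.
    /// fn earliest_return(t: u64, summaries: &[Delay]) -> Option<u64> {
    ///     summaries.iter().map(|s| s.apply(t)).min()
    /// }
    /// ```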
    fn set_external_summary(
        &mut self,
        _summaries: Vec<Vec<Antichain<T::Summary>>>,
        _frontier: &mut [CountMap<T>],
    ) { }

    /// Reports a summary of progress statements external to the operator and its peer group.
    ///
    /// This report summarizes *all* of the external world, including updates issued by the
    /// operator itself. This matters because it imposes an ordering constraint that the operator
    /// must enforce: before reporting any summarized progress to its parent, a child must install
    /// the non-summarized parts (internal messages, capabilities) locally. Otherwise, the child
    /// may learn about external "progress" corresponding to its own actions without having
    /// recorded the consequences of those actions, and will be in a bit of a pickle.
    ///
    /// Note: the callee is expected to consume the contents of `external` to indicate
    /// acknowledgement.
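    ///
    /// A minimal, self-contained sketch of an operator that acknowledges external progress by
    /// folding each input's updates into its own state and leaving the caller's buffers empty.
    /// It assumes hypothetical stand-ins rather than this crate's types: timestamps are `u64`,
    /// a count map is a `HashMap<u64, i64>` of net count changes, and `Frontiers` is an
    /// illustrative operator, not part of this crate.
    ///
    /// ```rust
    /// use std::collections::HashMap;
    ///
    /// // Hypothetical stand-in for `CountMap<u64>`: net change in count per timestamp.
    /// type Counts = HashMap<u64, i64>;
    ///
    /// // Hypothetical operator state: accumulated external counts, one map per input.
    /// struct Frontiers {
    ///     per_input: Vec<Counts>,
    /// }
    ///
    /// impl Frontiers {
    ///     // Absorbs each input's updates into local state. Draining the caller's
    ///     // buffers is what signals that the updates were consumed.
    ///     fn push_external_progress(&mut self, external: &mut [Counts]) {
    ///         for (state, updates) in self.per_input.iter_mut().zip(external.iter_mut()) {
    ///             for (time, delta) in updates.drain() {
    ///                 let count = state.entry(time).or_insert(0);
    ///                 *count += delta;
    ///                 // Drop timestamps whose net count has returned to zero.
    ///                 if *count == 0 {
    ///                     state.remove(&time);
    ///                 }
    ///             }
    ///         }
    ///     }
    /// }
    /// ```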
    fn push_external_progress(&mut self, external: &mut [CountMap<T>]) {
        // The default implementation just drains the external updates.
        for updates in external.iter_mut() {
            updates.clear();
        }
    }

    /// Retrieves a summary of progress statements internal to the operator.
    ///
    /// Populates `consumed`, `internal`, and `produced` with the operator's progress updates, and
    /// returns a bool indicating whether any unreported work remains (e.g. work that does not
    /// project onto an output).
    ///
    /// Note: this is not "internal to the operator and its peer group". The operator instance
    /// should report only the progress performed by its own instance. The parent scope will
    /// decide what to do with this information (most likely exchange it with its peers). There
    /// may be an opportunity to optimize this, but it could complicate the parent's job of
    /// knowing which of its children report partial information and which report complete
    /// information.
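    ///
    /// A minimal, self-contained sketch of what a one-input, one-output operator that forwards
    /// messages unchanged might report. It assumes, as the parameter names suggest, that
    /// `consumed` is indexed by input and `produced` by output. It uses hypothetical stand-ins
    /// rather than this crate's types: timestamps are `u64`, a count map is a
    /// `HashMap<u64, i64>`, and `Forward` is an illustrative operator, not part of this crate.
    ///
    /// ```rust
    /// use std::collections::HashMap;
    ///
    /// // Hypothetical stand-in for `CountMap<u64>`.
    /// type Counts = HashMap<u64, i64>;
    ///
    /// // Hypothetical operator that forwards messages from input 0 to output 0 and
    /// // records how many it forwarded at each timestamp since the last report.
    /// struct Forward {
    ///     forwarded: Counts,
    /// }
    ///
    /// impl Forward {
    ///     // Every forwarded message was consumed on input 0 and produced on output 0.
    ///     // The operator holds no capabilities, so `internal` is left untouched, and
    ///     // it keeps no unreported work, so it returns `false`.
    ///     fn pull_internal_progress(
    ///         &mut self,
    ///         consumed: &mut [Counts],
    ///         _internal: &mut [Counts],
    ///         produced: &mut [Counts],
    ///     ) -> bool {
    ///         for (time, count) in self.forwarded.drain() {
    ///             *consumed[0].entry(time).or_insert(0) += count;
    ///             *produced[0].entry(time).or_insert(0) += count;
    ///         }
    ///         false
    ///     }
    /// }
    /// ```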
    fn pull_internal_progress(&mut self, consumed: &mut [CountMap<T>], // to populate
                                         internal: &mut [CountMap<T>], // to populate
                                         produced: &mut [CountMap<T>]) -> bool; // to populate

    /// A descriptive name for the operator.
    fn name(&self) -> String;

    /// Indicates whether the operator requires `push_external_progress` information.
    fn notify_me(&self) -> bool { true }
}