timely/progress/
operate.rs

//! Methods which describe an operator's topology, and the progress it makes.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::scheduling::Schedule;
7use crate::progress::{Timestamp, ChangeBatch, Antichain};
8
9/// Methods for describing an operators topology, and the progress it makes.
10pub trait Operate<T: Timestamp> : Schedule {
11
12    /// Indicates if the operator is strictly local to this worker.
13    ///
14    /// A parent scope must understand whether the progress information returned by the worker
15    /// reflects only this worker's progress, so that it knows whether to send and receive the
16    /// corresponding progress messages to its peers. If the operator is strictly local, it must
17    /// exchange this information, whereas if the operator is itself implemented by the same set
18    /// of workers, the parent scope understands that progress information already reflects the
19    /// aggregate information among the workers.
20    ///
21    /// This is a coarse approximation to refined worker sets. In a future better world, operators
22    /// would explain how their implementations are partitioned, so that a parent scope knows what
23    /// progress information to exchange with which peers. Right now the two choices are either
24    /// "all" or "none", but it could be more detailed. In the more detailed case, this method
25    /// should / could return a pair (index, peers), indicating the group id of the worker out of
26    /// how many groups. This becomes complicated, as a full all-to-all exchange would result in
27    /// multiple copies of the same progress messages (but aggregated variously) arriving at
28    /// arbitrary times.
29    fn local(&self) -> bool { true }
30
31    /// The number of inputs.
32    fn inputs(&self) -> usize;
33    /// The number of outputs.
34    fn outputs(&self) -> usize;
35
36    /// Fetches summary information about internal structure of the operator.
37    ///
38    /// Each operator must summarize its internal structure by a map from pairs `(input, output)`
39    /// to an antichain of timestamp summaries, indicating how a timestamp on any of its inputs may
40    /// be transformed to timestamps on any of its outputs.
41    ///
42    /// Each operator must also indicate whether it initially holds any capabilities on any of its
43    /// outputs, so that the parent operator can properly initialize its progress information.
44    ///
45    /// The default behavior is to indicate that timestamps on any input can emerge unchanged on
46    /// any output, and no initial capabilities are held.
47    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>);
48
49    /// Signals that external frontiers have been set.
50    ///
51    /// By default this method does nothing, and leaves all changes in the `frontiers` element
52    /// of the shared progress state. An operator should be able to consult `frontiers` at any
53    /// point and read out the current frontier information, or the changes from the last time
54    /// that `frontiers` was drained.
55    fn set_external_summary(&mut self) { }
56
57    /// Indicates of whether the operator requires `push_external_progress` information or not.
58    fn notify_me(&self) -> bool { true }
59}
60
61/// Progress information shared between parent and child.
62#[derive(Debug)]
63pub struct SharedProgress<T: Timestamp> {
64    /// Frontier capability changes reported by the parent scope.
65    pub frontiers: Vec<ChangeBatch<T>>,
66    /// Consumed message changes reported by the child operator.
67    pub consumeds: Vec<ChangeBatch<T>>,
68    /// Internal capability changes reported by the child operator.
69    pub internals: Vec<ChangeBatch<T>>,
70    /// Produced message changes reported by the child operator.
71    pub produceds: Vec<ChangeBatch<T>>,
72}
73
74impl<T: Timestamp> SharedProgress<T> {
75    /// Allocates a new shared progress structure.
76    pub fn new(inputs: usize, outputs: usize) -> Self {
77        SharedProgress {
78            frontiers: vec![ChangeBatch::new(); inputs],
79            consumeds: vec![ChangeBatch::new(); inputs],
80            internals: vec![ChangeBatch::new(); outputs],
81            produceds: vec![ChangeBatch::new(); outputs],
82        }
83    }
84}