timely/dataflow/operators/generic/
builder_raw.rs

1//! Types to build operators with general shapes.
2//!
3//! These types expose some raw timely interfaces, and while public so that others can build on them,
4//! they require some sophistication to use correctly. I recommend checking out `builder_rc.rs` for
5//! an interface that is intentionally harder to mis-use.
6
7use std::default::Default;
8use std::rc::Rc;
9use std::cell::RefCell;
10
11use crate::scheduling::{Schedule, Activations};
12
13use crate::progress::{Source, Target};
14use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15
16use crate::Container;
17use crate::dataflow::{StreamCore, Scope};
18use crate::dataflow::channels::pushers::TeeCore;
19use crate::dataflow::channels::pact::ParallelizationContractCore;
20use crate::dataflow::operators::generic::operator_info::OperatorInfo;
21
/// Type-free description of an operator's properties.
///
/// This is the core data for the structure of an operator, minus its scope
/// and its logic: a name, whether it wants progress notifications, the number
/// of workers, and how many input and output ports it has.
#[derive(Debug)]
pub struct OperatorShape {
    /// A meaningful name for the operator.
    name: String,
    /// Does the operator require progress notifications.
    notify: bool,
    /// The total number of workers in the computation.
    peers: usize,
    /// The number of input ports.
    inputs: usize,
    /// The number of output ports.
    outputs: usize,
}

impl OperatorShape {
    /// Creates the shape of an operator called `name` in a computation with
    /// `peers` workers. The shape starts without any ports and with progress
    /// notifications requested.
    fn new(name: String, peers: usize) -> Self {
        Self {
            name,
            peers,
            notify: true,
            inputs: 0,
            outputs: 0,
        }
    }

    /// The number of inputs of this operator.
    pub fn inputs(&self) -> usize {
        let OperatorShape { inputs, .. } = self;
        *inputs
    }

    /// The number of outputs of this operator.
    pub fn outputs(&self) -> usize {
        let OperatorShape { outputs, .. } = self;
        *outputs
    }
}
54
55/// Builds operators with generic shape.
56#[derive(Debug)]
57pub struct OperatorBuilder<G: Scope> {
58    scope: G,
59    index: usize,
60    global: usize,
61    address: Vec<usize>,    // path to the operator (ending with index).
62    shape: OperatorShape,
63    summary: Vec<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>,
64}
65
66impl<G: Scope> OperatorBuilder<G> {
67
68    /// Allocates a new generic operator builder from its containing scope.
69    pub fn new(name: String, mut scope: G) -> Self {
70
71        let global = scope.new_identifier();
72        let index = scope.allocate_operator_index();
73        let mut address = scope.addr();
74        address.push(index);
75        let peers = scope.peers();
76
77        OperatorBuilder {
78            scope,
79            index,
80            global,
81            address,
82            shape: OperatorShape::new(name, peers),
83            summary: vec![],
84        }
85    }
86
87    /// The operator's scope-local index.
88    pub fn index(&self) -> usize {
89        self.index
90    }
91
92    /// The operator's worker-unique identifier.
93    pub fn global(&self) -> usize {
94        self.global
95    }
96
97    /// Return a reference to the operator's shape
98    pub fn shape(&self) -> &OperatorShape {
99        &self.shape
100    }
101
102    /// Indicates whether the operator requires frontier information.
103    pub fn set_notify(&mut self, notify: bool) {
104        self.shape.notify = notify;
105    }
106
107    /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
108    pub fn new_input<D: Container, P>(&mut self, stream: &StreamCore<G, D>, pact: P) -> P::Puller
109        where
110            P: ParallelizationContractCore<G::Timestamp, D> {
111        let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs];
112        self.new_input_connection(stream, pact, connection)
113    }
114
115    /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
116    pub fn new_input_connection<D: Container, P>(&mut self, stream: &StreamCore<G, D>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> P::Puller
117    where
118        P: ParallelizationContractCore<G::Timestamp, D> {
119
120        let channel_id = self.scope.new_identifier();
121        let logging = self.scope.logging();
122        let (sender, receiver) = pact.connect(&mut self.scope, channel_id, &self.address[..], logging);
123        let target = Target::new(self.index, self.shape.inputs);
124        stream.connect_to(target, sender, channel_id);
125
126        self.shape.inputs += 1;
127        assert_eq!(self.shape.outputs, connection.len());
128        self.summary.push(connection);
129
130        receiver
131    }
132
133    /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
134    pub fn new_output<D: Container>(&mut self) -> (TeeCore<G::Timestamp, D>, StreamCore<G, D>) {
135
136        let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs];
137        self.new_output_connection(connection)
138    }
139
140    /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
141    pub fn new_output_connection<D: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (TeeCore<G::Timestamp, D>, StreamCore<G, D>) {
142
143        let (targets, registrar) = TeeCore::<G::Timestamp,D>::new();
144        let source = Source::new(self.index, self.shape.outputs);
145        let stream = StreamCore::new(source, registrar, self.scope.clone());
146
147        self.shape.outputs += 1;
148        assert_eq!(self.shape.inputs, connection.len());
149        for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) {
150            summary.push(entry);
151        }
152
153        (targets, stream)
154    }
155
156    /// Creates an operator implementation from supplied logic constructor.
157    pub fn build<L>(mut self, logic: L)
158    where
159        L: FnMut(&mut SharedProgress<G::Timestamp>)->bool+'static
160    {
161        let inputs = self.shape.inputs;
162        let outputs = self.shape.outputs;
163
164        let operator = OperatorCore {
165            shape: self.shape,
166            address: self.address,
167            activations: self.scope.activations(),
168            logic,
169            shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
170            summary: self.summary,
171        };
172
173        self.scope.add_operator_with_indices(Box::new(operator), self.index, self.global);
174    }
175
176    /// Information describing the operator.
177    pub fn operator_info(&self) -> OperatorInfo {
178        OperatorInfo::new(self.index, self.global, &self.address[..])
179    }
180}
181
182struct OperatorCore<T, L>
183where
184    T: Timestamp,
185    L: FnMut(&mut SharedProgress<T>)->bool+'static,
186{
187    shape: OperatorShape,
188    address: Vec<usize>,
189    logic: L,
190    shared_progress: Rc<RefCell<SharedProgress<T>>>,
191    activations: Rc<RefCell<Activations>>,
192    summary: Vec<Vec<Antichain<T::Summary>>>,
193}
194
195impl<T, L> Schedule for OperatorCore<T, L>
196where
197    T: Timestamp,
198    L: FnMut(&mut SharedProgress<T>)->bool+'static,
199{
200    fn name(&self) -> &str { &self.shape.name }
201    fn path(&self) -> &[usize] { &self.address[..] }
202    fn schedule(&mut self) -> bool {
203        let shared_progress = &mut *self.shared_progress.borrow_mut();
204        (self.logic)(shared_progress)
205    }
206}
207
208impl<T, L> Operate<T> for OperatorCore<T, L>
209where
210    T: Timestamp,
211    L: FnMut(&mut SharedProgress<T>)->bool+'static,
212{
213    fn inputs(&self) -> usize { self.shape.inputs }
214    fn outputs(&self) -> usize { self.shape.outputs }
215
216    // announce internal topology as fully connected, and hold all default capabilities.
217    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
218
219        // Request the operator to be scheduled at least once.
220        self.activations.borrow_mut().activate(&self.address[..]);
221
222        // by default, we reserve a capability for each output port at `Default::default()`.
223        self.shared_progress
224            .borrow_mut()
225            .internals
226            .iter_mut()
227            .for_each(|output| output.update(T::minimum(), self.shape.peers as i64));
228
229        (self.summary.clone(), self.shared_progress.clone())
230    }
231
232    // initialize self.frontier antichains as indicated by hosting scope.
233    fn set_external_summary(&mut self) {
234        // should we schedule the operator here, or just await the first invocation?
235        self.schedule();
236    }
237
238    fn notify_me(&self) -> bool { self.shape.notify }
239}