timely/dataflow/operators/generic/
builder_rc.rs

1//! Types to build operators with general shapes.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::default::Default;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::progress::operate::SharedProgress;
9use crate::progress::frontier::{Antichain, MutableAntichain};
10
11use crate::Container;
12use crate::dataflow::{Scope, StreamCore};
13use crate::dataflow::channels::pushers::TeeCore;
14use crate::dataflow::channels::pushers::CounterCore as PushCounter;
15use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer;
16use crate::dataflow::channels::pact::ParallelizationContractCore;
17use crate::dataflow::channels::pullers::Counter as PullCounter;
18use crate::dataflow::operators::capability::Capability;
19use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
20use crate::dataflow::operators::generic::operator_info::OperatorInfo;
21use crate::dataflow::operators::generic::builder_raw::OperatorShape;
22
23use crate::logging::TimelyLogger as Logger;
24
25use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
26
27/// Builds operators with generic shape.
28#[derive(Debug)]
29pub struct OperatorBuilder<G: Scope> {
30    builder: OperatorBuilderRaw<G>,
31    frontier: Vec<MutableAntichain<G::Timestamp>>,
32    consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
33    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
34    /// For each input, a shared list of summaries to each output.
35    summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
36    produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
37    logging: Option<Logger>,
38}
39
40impl<G: Scope> OperatorBuilder<G> {
41
42    /// Allocates a new generic operator builder from its containing scope.
43    pub fn new(name: String, scope: G) -> Self {
44        let logging = scope.logging();
45        OperatorBuilder {
46            builder: OperatorBuilderRaw::new(name, scope),
47            frontier: Vec::new(),
48            consumed: Vec::new(),
49            internal: Rc::new(RefCell::new(Vec::new())),
50            summaries: Vec::new(),
51            produced: Vec::new(),
52            logging,
53        }
54    }
55
56    /// Indicates whether the operator requires frontier information.
57    pub fn set_notify(&mut self, notify: bool) {
58        self.builder.set_notify(notify);
59    }
60
61    /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
62    pub fn new_input<D: Container, P>(&mut self, stream: &StreamCore<G, D>, pact: P) -> InputHandleCore<G::Timestamp, D, P::Puller>
63    where
64        P: ParallelizationContractCore<G::Timestamp, D> {
65
66        let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().outputs()];
67        self.new_input_connection(stream, pact, connection)
68    }
69
70    /// Adds a new input with connection information to a generic operator builder, returning the `Pull` implementor to use.
71    ///
72    /// The `connection` parameter contains promises made by the operator for each of the existing *outputs*, that any timestamp
73    /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
74    /// greater or equal to some element of the corresponding antichain in `connection`.
75    ///
76    /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
77    /// antichain indicating that there is no connection from the input to the output.
78    pub fn new_input_connection<D: Container, P>(&mut self, stream: &StreamCore<G, D>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandleCore<G::Timestamp, D, P::Puller>
79        where
80            P: ParallelizationContractCore<G::Timestamp, D> {
81
82        let puller = self.builder.new_input_connection(stream, pact, connection.clone());
83
84        let input = PullCounter::new(puller);
85        self.frontier.push(MutableAntichain::new());
86        self.consumed.push(input.consumed().clone());
87
88        let shared_summary = Rc::new(RefCell::new(connection));
89        self.summaries.push(shared_summary.clone());
90
91        new_input_handle(input, self.internal.clone(), shared_summary, self.logging.clone())
92    }
93
94    /// Adds a new output to a generic operator builder, returning the `Push` implementor to use.
95    pub fn new_output<D: Container>(&mut self) -> (OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>, StreamCore<G, D>) {
96        let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()];
97        self.new_output_connection(connection)
98    }
99
100    /// Adds a new output with connection information to a generic operator builder, returning the `Push` implementor to use.
101    ///
102    /// The `connection` parameter contains promises made by the operator for each of the existing *inputs*, that any timestamp
103    /// appearing at the input, any output timestamp will be greater than or equal to the input timestamp subjected to a `Summary`
104    /// greater or equal to some element of the corresponding antichain in `connection`.
105    ///
106    /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty
107    /// antichain indicating that there is no connection from the input to the output.
108    pub fn new_output_connection<D: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>, StreamCore<G, D>) {
109
110        let (tee, stream) = self.builder.new_output_connection(connection.clone());
111
112        let internal = Rc::new(RefCell::new(ChangeBatch::new()));
113        self.internal.borrow_mut().push(internal.clone());
114
115        let mut buffer = PushBuffer::new(PushCounter::new(tee));
116        self.produced.push(buffer.inner().produced().clone());
117
118        for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
119            summary.borrow_mut().push(connection.clone());
120        }
121
122        (OutputWrapper::new(buffer, internal), stream)
123    }
124
125    /// Creates an operator implementation from supplied logic constructor.
126    pub fn build<B, L>(self, constructor: B)
127    where
128        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
129        L: FnMut(&[MutableAntichain<G::Timestamp>])+'static
130    {
131        self.build_reschedule(|caps| {
132            let mut logic = constructor(caps);
133            move |frontier| { logic(frontier); false }
134        })
135    }
136
137    /// Creates an operator implementation from supplied logic constructor.
138    ///
139    /// Unlike `build`, the supplied closure can indicate if the operator
140    /// should be considered incomplete. The `build` method indicates that
141    /// the operator is never incomplete and can be shut down at the system's
142    /// discretion.
143    pub fn build_reschedule<B, L>(self, constructor: B)
144    where
145        B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
146        L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static
147    {
148        // create capabilities, discard references to their creation.
149        let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
150        for batch in self.internal.borrow().iter() {
151            capabilities.push(Capability::new(G::Timestamp::minimum(), batch.clone()));
152            // Discard evidence of creation, as we are assumed to start with one.
153            batch.borrow_mut().clear();
154        }
155
156        let mut logic = constructor(capabilities);
157
158        let mut self_frontier = self.frontier;
159        let self_consumed = self.consumed;
160        let self_internal = self.internal;
161        let self_produced = self.produced;
162
163        let raw_logic =
164        move |progress: &mut SharedProgress<G::Timestamp>| {
165
166            // drain frontier changes
167            for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) {
168                frontier.update_iter(progress.drain());
169            }
170
171            // invoke supplied logic
172            let result = logic(&self_frontier[..]);
173
174            // move batches of consumed changes.
175            for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) {
176                consumed.borrow_mut().drain_into(progress);
177            }
178
179            // move batches of internal changes.
180            let self_internal_borrow = self_internal.borrow_mut();
181            for index in 0 .. self_internal_borrow.len() {
182                let mut borrow = self_internal_borrow[index].borrow_mut();
183                progress.internals[index].extend(borrow.drain());
184            }
185
186            // move batches of produced changes.
187            for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) {
188                produced.borrow_mut().drain_into(progress);
189            }
190
191            result
192        };
193
194        self.builder.build(raw_logic);
195    }
196
197    /// Get the identifier assigned to the operator being constructed
198    pub fn index(&self) -> usize {
199        self.builder.index()
200    }
201
202    /// The operator's worker-unique identifier.
203    pub fn global(&self) -> usize {
204        self.builder.global()
205    }
206
207    /// Return a reference to the operator's shape
208    pub fn shape(&self) -> &OperatorShape {
209        self.builder.shape()
210    }
211
212    /// Creates operator info for the operator.
213    pub fn operator_info(&self) -> OperatorInfo {
214        self.builder.operator_info()
215    }
216}
217
218
#[cfg(test)]
mod tests {

    use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

    /// Opening a session on an output with a capability that belongs to a
    /// different output is expected to panic at run time.
    #[test]
    #[should_panic]
    fn incorrect_capabilities() {

        crate::example(|scope| {

            let mut op = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            // Two outputs and no inputs: one capability is supplied per output.
            let (mut first_output, _first_stream) = op.new_output::<Vec<()>>();
            let (mut second_output, _second_stream) = op.new_output::<Vec<()>>();

            op.build(move |caps| {
                move |_frontiers| {

                    let mut first_handle = first_output.activate();
                    let mut second_handle = second_output.activate();

                    // Deliberately crossed: each output gets the other's capability.
                    second_handle.session(&caps[0]);
                    first_handle.session(&caps[1]);
                }
            });
        })
    }

    /// Opening a session on each output with its own capability is expected
    /// to complete without panicking.
    #[test]
    fn correct_capabilities() {

        crate::example(|scope| {

            let mut op = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            // Two outputs and no inputs: one capability is supplied per output.
            let (mut first_output, _first_stream) = op.new_output::<Vec<()>>();
            let (mut second_output, _second_stream) = op.new_output::<Vec<()>>();

            op.build(move |mut caps| {
                move |_frontiers| {

                    let mut first_handle = first_output.activate();
                    let mut second_handle = second_output.activate();

                    // Only the first invocation still holds capabilities; they
                    // are cleared afterwards so later invocations do nothing.
                    if !caps.is_empty() {

                        // Each output is paired with the capability at its own index.
                        first_handle.session(&caps[0]);
                        second_handle.session(&caps[1]);

                        caps.clear();
                    }
                }
            });

            "Hello".to_owned()
        });
    }
}