timely/dataflow/operators/generic/
builder_rc.rs1use std::rc::Rc;
4use std::cell::RefCell;
5use std::default::Default;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::progress::operate::SharedProgress;
9use crate::progress::frontier::{Antichain, MutableAntichain};
10
11use crate::Container;
12use crate::dataflow::{Scope, StreamCore};
13use crate::dataflow::channels::pushers::TeeCore;
14use crate::dataflow::channels::pushers::CounterCore as PushCounter;
15use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer;
16use crate::dataflow::channels::pact::ParallelizationContractCore;
17use crate::dataflow::channels::pullers::Counter as PullCounter;
18use crate::dataflow::operators::capability::Capability;
19use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
20use crate::dataflow::operators::generic::operator_info::OperatorInfo;
21use crate::dataflow::operators::generic::builder_raw::OperatorShape;
22
23use crate::logging::TimelyLogger as Logger;
24
25use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
26
27#[derive(Debug)]
29pub struct OperatorBuilder<G: Scope> {
30 builder: OperatorBuilderRaw<G>,
31 frontier: Vec<MutableAntichain<G::Timestamp>>,
32 consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
33 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
34 summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
36 produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
37 logging: Option<Logger>,
38}
39
40impl<G: Scope> OperatorBuilder<G> {
41
42 pub fn new(name: String, scope: G) -> Self {
44 let logging = scope.logging();
45 OperatorBuilder {
46 builder: OperatorBuilderRaw::new(name, scope),
47 frontier: Vec::new(),
48 consumed: Vec::new(),
49 internal: Rc::new(RefCell::new(Vec::new())),
50 summaries: Vec::new(),
51 produced: Vec::new(),
52 logging,
53 }
54 }
55
56 pub fn set_notify(&mut self, notify: bool) {
58 self.builder.set_notify(notify);
59 }
60
61 pub fn new_input<D: Container, P>(&mut self, stream: &StreamCore<G, D>, pact: P) -> InputHandleCore<G::Timestamp, D, P::Puller>
63 where
64 P: ParallelizationContractCore<G::Timestamp, D> {
65
66 let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().outputs()];
67 self.new_input_connection(stream, pact, connection)
68 }
69
70 pub fn new_input_connection<D: Container, P>(&mut self, stream: &StreamCore<G, D>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandleCore<G::Timestamp, D, P::Puller>
79 where
80 P: ParallelizationContractCore<G::Timestamp, D> {
81
82 let puller = self.builder.new_input_connection(stream, pact, connection.clone());
83
84 let input = PullCounter::new(puller);
85 self.frontier.push(MutableAntichain::new());
86 self.consumed.push(input.consumed().clone());
87
88 let shared_summary = Rc::new(RefCell::new(connection));
89 self.summaries.push(shared_summary.clone());
90
91 new_input_handle(input, self.internal.clone(), shared_summary, self.logging.clone())
92 }
93
94 pub fn new_output<D: Container>(&mut self) -> (OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>, StreamCore<G, D>) {
96 let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()];
97 self.new_output_connection(connection)
98 }
99
100 pub fn new_output_connection<D: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (OutputWrapper<G::Timestamp, D, TeeCore<G::Timestamp, D>>, StreamCore<G, D>) {
109
110 let (tee, stream) = self.builder.new_output_connection(connection.clone());
111
112 let internal = Rc::new(RefCell::new(ChangeBatch::new()));
113 self.internal.borrow_mut().push(internal.clone());
114
115 let mut buffer = PushBuffer::new(PushCounter::new(tee));
116 self.produced.push(buffer.inner().produced().clone());
117
118 for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
119 summary.borrow_mut().push(connection.clone());
120 }
121
122 (OutputWrapper::new(buffer, internal), stream)
123 }
124
125 pub fn build<B, L>(self, constructor: B)
127 where
128 B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
129 L: FnMut(&[MutableAntichain<G::Timestamp>])+'static
130 {
131 self.build_reschedule(|caps| {
132 let mut logic = constructor(caps);
133 move |frontier| { logic(frontier); false }
134 })
135 }
136
137 pub fn build_reschedule<B, L>(self, constructor: B)
144 where
145 B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
146 L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static
147 {
148 let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
150 for batch in self.internal.borrow().iter() {
151 capabilities.push(Capability::new(G::Timestamp::minimum(), batch.clone()));
152 batch.borrow_mut().clear();
154 }
155
156 let mut logic = constructor(capabilities);
157
158 let mut self_frontier = self.frontier;
159 let self_consumed = self.consumed;
160 let self_internal = self.internal;
161 let self_produced = self.produced;
162
163 let raw_logic =
164 move |progress: &mut SharedProgress<G::Timestamp>| {
165
166 for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) {
168 frontier.update_iter(progress.drain());
169 }
170
171 let result = logic(&self_frontier[..]);
173
174 for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) {
176 consumed.borrow_mut().drain_into(progress);
177 }
178
179 let self_internal_borrow = self_internal.borrow_mut();
181 for index in 0 .. self_internal_borrow.len() {
182 let mut borrow = self_internal_borrow[index].borrow_mut();
183 progress.internals[index].extend(borrow.drain());
184 }
185
186 for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) {
188 produced.borrow_mut().drain_into(progress);
189 }
190
191 result
192 };
193
194 self.builder.build(raw_logic);
195 }
196
197 pub fn index(&self) -> usize {
199 self.builder.index()
200 }
201
202 pub fn global(&self) -> usize {
204 self.builder.global()
205 }
206
207 pub fn shape(&self) -> &OperatorShape {
209 self.builder.shape()
210 }
211
212 pub fn operator_info(&self) -> OperatorInfo {
214 self.builder.operator_info()
215 }
216}
217
218
219#[cfg(test)]
220mod tests {
221
222 #[test]
223 #[should_panic]
224 fn incorrect_capabilities() {
225
226 use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
230
231 crate::example(|scope| {
232
233 let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());
234
235 let (mut output1, _stream1) = builder.new_output::<Vec<()>>();
237 let (mut output2, _stream2) = builder.new_output::<Vec<()>>();
238
239 builder.build(move |capabilities| {
240 move |_frontiers| {
241
242 let mut output_handle1 = output1.activate();
243 let mut output_handle2 = output2.activate();
244
245 output_handle2.session(&capabilities[0]);
247 output_handle1.session(&capabilities[1]);
248 }
249 });
250 })
251 }
252
253 #[test]
254 fn correct_capabilities() {
255
256 use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
260
261 crate::example(|scope| {
262
263 let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());
264
265 let (mut output1, _stream1) = builder.new_output::<Vec<()>>();
267 let (mut output2, _stream2) = builder.new_output::<Vec<()>>();
268
269 builder.build(move |mut capabilities| {
270 move |_frontiers| {
271
272 let mut output_handle1 = output1.activate();
273 let mut output_handle2 = output2.activate();
274
275 if !capabilities.is_empty() {
277
278 output_handle1.session(&capabilities[0]);
280 output_handle2.session(&capabilities[1]);
281
282 capabilities.clear();
283 }
284 }
285 });
286
287 "Hello".to_owned()
288 });
289 }
290}