timely/dataflow/operators/generic/
builder_raw.rs1use std::default::Default;
8use std::rc::Rc;
9use std::cell::RefCell;
10
11use crate::scheduling::{Schedule, Activations};
12
13use crate::progress::{Source, Target};
14use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
15
16use crate::Container;
17use crate::dataflow::{StreamCore, Scope};
18use crate::dataflow::channels::pushers::TeeCore;
19use crate::dataflow::channels::pact::ParallelizationContractCore;
20use crate::dataflow::operators::generic::operator_info::OperatorInfo;
21
22#[derive(Debug)]
24pub struct OperatorShape {
25 name: String, notify: bool, peers: usize, inputs: usize, outputs: usize, }
31
32impl OperatorShape {
34 fn new(name: String, peers: usize) -> Self {
35 OperatorShape {
36 name,
37 notify: true,
38 peers,
39 inputs: 0,
40 outputs: 0,
41 }
42 }
43
44 pub fn inputs(&self) -> usize {
46 self.inputs
47 }
48
49 pub fn outputs(&self) -> usize {
51 self.outputs
52 }
53}
54
55#[derive(Debug)]
57pub struct OperatorBuilder<G: Scope> {
58 scope: G,
59 index: usize,
60 global: usize,
61 address: Vec<usize>, shape: OperatorShape,
63 summary: Vec<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>,
64}
65
66impl<G: Scope> OperatorBuilder<G> {
67
68 pub fn new(name: String, mut scope: G) -> Self {
70
71 let global = scope.new_identifier();
72 let index = scope.allocate_operator_index();
73 let mut address = scope.addr();
74 address.push(index);
75 let peers = scope.peers();
76
77 OperatorBuilder {
78 scope,
79 index,
80 global,
81 address,
82 shape: OperatorShape::new(name, peers),
83 summary: vec![],
84 }
85 }
86
87 pub fn index(&self) -> usize {
89 self.index
90 }
91
92 pub fn global(&self) -> usize {
94 self.global
95 }
96
97 pub fn shape(&self) -> &OperatorShape {
99 &self.shape
100 }
101
102 pub fn set_notify(&mut self, notify: bool) {
104 self.shape.notify = notify;
105 }
106
107 pub fn new_input<D: Container, P>(&mut self, stream: &StreamCore<G, D>, pact: P) -> P::Puller
109 where
110 P: ParallelizationContractCore<G::Timestamp, D> {
111 let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs];
112 self.new_input_connection(stream, pact, connection)
113 }
114
115 pub fn new_input_connection<D: Container, P>(&mut self, stream: &StreamCore<G, D>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> P::Puller
117 where
118 P: ParallelizationContractCore<G::Timestamp, D> {
119
120 let channel_id = self.scope.new_identifier();
121 let logging = self.scope.logging();
122 let (sender, receiver) = pact.connect(&mut self.scope, channel_id, &self.address[..], logging);
123 let target = Target::new(self.index, self.shape.inputs);
124 stream.connect_to(target, sender, channel_id);
125
126 self.shape.inputs += 1;
127 assert_eq!(self.shape.outputs, connection.len());
128 self.summary.push(connection);
129
130 receiver
131 }
132
133 pub fn new_output<D: Container>(&mut self) -> (TeeCore<G::Timestamp, D>, StreamCore<G, D>) {
135
136 let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs];
137 self.new_output_connection(connection)
138 }
139
140 pub fn new_output_connection<D: Container>(&mut self, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> (TeeCore<G::Timestamp, D>, StreamCore<G, D>) {
142
143 let (targets, registrar) = TeeCore::<G::Timestamp,D>::new();
144 let source = Source::new(self.index, self.shape.outputs);
145 let stream = StreamCore::new(source, registrar, self.scope.clone());
146
147 self.shape.outputs += 1;
148 assert_eq!(self.shape.inputs, connection.len());
149 for (summary, entry) in self.summary.iter_mut().zip(connection.into_iter()) {
150 summary.push(entry);
151 }
152
153 (targets, stream)
154 }
155
156 pub fn build<L>(mut self, logic: L)
158 where
159 L: FnMut(&mut SharedProgress<G::Timestamp>)->bool+'static
160 {
161 let inputs = self.shape.inputs;
162 let outputs = self.shape.outputs;
163
164 let operator = OperatorCore {
165 shape: self.shape,
166 address: self.address,
167 activations: self.scope.activations(),
168 logic,
169 shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
170 summary: self.summary,
171 };
172
173 self.scope.add_operator_with_indices(Box::new(operator), self.index, self.global);
174 }
175
176 pub fn operator_info(&self) -> OperatorInfo {
178 OperatorInfo::new(self.index, self.global, &self.address[..])
179 }
180}
181
182struct OperatorCore<T, L>
183where
184 T: Timestamp,
185 L: FnMut(&mut SharedProgress<T>)->bool+'static,
186{
187 shape: OperatorShape,
188 address: Vec<usize>,
189 logic: L,
190 shared_progress: Rc<RefCell<SharedProgress<T>>>,
191 activations: Rc<RefCell<Activations>>,
192 summary: Vec<Vec<Antichain<T::Summary>>>,
193}
194
195impl<T, L> Schedule for OperatorCore<T, L>
196where
197 T: Timestamp,
198 L: FnMut(&mut SharedProgress<T>)->bool+'static,
199{
200 fn name(&self) -> &str { &self.shape.name }
201 fn path(&self) -> &[usize] { &self.address[..] }
202 fn schedule(&mut self) -> bool {
203 let shared_progress = &mut *self.shared_progress.borrow_mut();
204 (self.logic)(shared_progress)
205 }
206}
207
208impl<T, L> Operate<T> for OperatorCore<T, L>
209where
210 T: Timestamp,
211 L: FnMut(&mut SharedProgress<T>)->bool+'static,
212{
213 fn inputs(&self) -> usize { self.shape.inputs }
214 fn outputs(&self) -> usize { self.shape.outputs }
215
216 fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
218
219 self.activations.borrow_mut().activate(&self.address[..]);
221
222 self.shared_progress
224 .borrow_mut()
225 .internals
226 .iter_mut()
227 .for_each(|output| output.update(T::minimum(), self.shape.peers as i64));
228
229 (self.summary.clone(), self.shared_progress.clone())
230 }
231
232 fn set_external_summary(&mut self) {
234 self.schedule();
236 }
237
238 fn notify_me(&self) -> bool { self.shape.notify }
239}