1use serde::Deserialize;
2use std::{
3 collections::HashSet,
4 marker::PhantomData,
5 sync::{Arc, Mutex},
6};
7
8use crate::{
9 dataflow::{
10 context::{OneInOneOutContext, ParallelOneInOneOutContext, SetupContext},
11 deadlines::{ConditionContext, DeadlineEvent, DeadlineId},
12 operator::{OneInOneOut, OperatorConfig, ParallelOneInOneOut},
13 stream::{StreamId, WriteStreamT},
14 AppendableState, Data, Message, ReadStream, State, Timestamp, WriteStream,
15 },
16 node::{
17 operator_event::{OperatorEvent, OperatorType},
18 operator_executors::OneInMessageProcessorT,
19 },
20 Uuid,
21};
22
/// Message processor that wraps a [`ParallelOneInOneOut`] operator and turns
/// incoming messages and watermarks into [`OperatorEvent`]s.
///
/// The operator and its state are shared behind plain `Arc`s (no lock), and
/// the generated events are tagged `OperatorType::Parallel`, so callbacks may
/// be executed concurrently by the event executor.
pub struct ParallelOneInOneOutMessageProcessor<O, S, T, U, V>
where
    O: 'static + ParallelOneInOneOut<S, T, U, V>,
    S: AppendableState<V>,
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
    V: 'static + Send + Sync,
{
    // Operator configuration (name, flow_watermarks flag, ...).
    config: OperatorConfig,
    // The operator instance; shared without a mutex since callbacks run in parallel.
    operator: Arc<O>,
    // Appendable state shared across concurrently executing callbacks.
    state: Arc<S>,
    // State identifier(s) that watermark events declare as dependencies,
    // sequencing watermark callbacks with respect to the shared state.
    state_ids: HashSet<Uuid>,
    // Stream on which the operator sends its output messages.
    write_stream: WriteStream<U>,
    // Markers tying the unused type parameters T (input data) and V
    // (appended state element) to the struct.
    phantom_t: PhantomData<T>,
    phantom_v: PhantomData<V>,
}
46
47impl<O, S, T, U, V> ParallelOneInOneOutMessageProcessor<O, S, T, U, V>
48where
49 O: 'static + ParallelOneInOneOut<S, T, U, V>,
50 S: AppendableState<V>,
51 T: Data + for<'a> Deserialize<'a>,
52 U: Data + for<'a> Deserialize<'a>,
53 V: 'static + Send + Sync,
54{
55 pub fn new(
56 config: OperatorConfig,
57 operator_fn: impl Fn() -> O + Send,
58 state_fn: impl Fn() -> S + Send,
59 write_stream: WriteStream<U>,
60 ) -> Self {
61 Self {
62 config,
63 operator: Arc::new(operator_fn()),
64 state: Arc::new(state_fn()),
65 state_ids: vec![Uuid::new_deterministic()].into_iter().collect(),
66 write_stream,
67 phantom_t: PhantomData,
68 phantom_v: PhantomData,
69 }
70 }
71}
72
impl<O, S, T, U, V> OneInMessageProcessorT<S, T>
    for ParallelOneInOneOutMessageProcessor<O, S, T, U, V>
where
    O: 'static + ParallelOneInOneOut<S, T, U, V>,
    S: AppendableState<V>,
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
    V: 'static + Send + Sync,
{
    /// Runs the operator's `setup` method with a [`SetupContext`] initialized
    /// from the IDs of the operator's read and write streams, and returns the
    /// context (which may now carry registered deadlines).
    fn execute_setup(&mut self, read_stream: &mut ReadStream<T>) -> SetupContext<S> {
        let mut setup_context =
            SetupContext::new(vec![read_stream.id()], vec![self.write_stream.id()]);
        // `Arc::get_mut(...).unwrap()` requires that no other clone of the
        // operator `Arc` exists yet — i.e., setup runs before any callback
        // events (which clone the `Arc`) have been generated.
        Arc::get_mut(&mut self.operator)
            .unwrap()
            .setup(&mut setup_context);
        setup_context
    }

    /// Invokes the operator's `run` method with the read and write streams.
    fn execute_run(&mut self, read_stream: &mut ReadStream<T>) {
        Arc::get_mut(&mut self.operator).unwrap().run(
            &self.config,
            read_stream,
            &mut self.write_stream,
        );
    }

    /// Invokes the operator's `destroy` method.
    fn execute_destroy(&mut self) {
        Arc::get_mut(&mut self.operator).unwrap().destroy();
    }

    /// Sends a `Top` watermark on the write stream (unless already closed) to
    /// signal that no further messages will be produced. Panics if the send
    /// fails.
    fn cleanup(&mut self) {
        if !self.write_stream.is_closed() {
            self.write_stream
                .send(Message::new_watermark(Timestamp::Top))
                .unwrap_or_else(|_| {
                    panic!(
                        "[ParallelOneInOneOut] Error sending Top watermark for operator {}",
                        self.config.get_name()
                    )
                });
        }
    }

    /// Builds an [`OperatorEvent`] that invokes the operator's `on_data`
    /// callback for the given message.
    ///
    /// The event is a non-watermark event with priority 0 and no declared
    /// read/write state dependencies (both sets empty), so the executor may
    /// run it concurrently with other events (`OperatorType::Parallel`).
    fn message_cb_event(&mut self, msg: Arc<Message<T>>) -> OperatorEvent {
        // Clone the shared handles so the closure is 'static and can be
        // executed on another thread.
        let operator = Arc::clone(&self.operator);
        let state = Arc::clone(&self.state);
        let time = msg.timestamp().clone();
        let config = self.config.clone();
        let write_stream = self.write_stream.clone();

        OperatorEvent::new(
            time.clone(),
            false,
            0,
            HashSet::new(),
            HashSet::new(),
            move || {
                operator.on_data(
                    &ParallelOneInOneOutContext::new(time, config, &state, write_stream),
                    // NOTE(review): panics if a data callback is generated for
                    // a message without a payload — presumably watermarks are
                    // routed to `watermark_cb_event` instead; confirm upstream.
                    msg.data().unwrap(),
                )
            },
            OperatorType::Parallel,
        )
    }

    /// Builds an [`OperatorEvent`] that invokes the operator's `on_watermark`
    /// callback for the given timestamp.
    ///
    /// When `flow_watermarks` is enabled, the event runs at high priority
    /// (127) and, after the callback returns, forwards the watermark on the
    /// write stream and then commits the state. Otherwise the event runs at
    /// priority 0 and only commits the state. Both variants declare
    /// `state_ids` as write dependencies so watermark callbacks are ordered
    /// with respect to the shared state.
    fn watermark_cb_event(&mut self, timestamp: &Timestamp) -> OperatorEvent {
        let operator = Arc::clone(&self.operator);
        let state = Arc::clone(&self.state);
        let time = timestamp.clone();
        let config = self.config.clone();
        let write_stream = self.write_stream.clone();

        if self.config.flow_watermarks {
            // Extra copies used after `time`/`write_stream` move into the
            // context passed to the callback.
            let mut write_stream_copy = self.write_stream.clone();
            let time_copy = time.clone();
            OperatorEvent::new(
                time.clone(),
                true,
                127,
                HashSet::new(),
                self.state_ids.clone(),
                move || {
                    operator.on_watermark(&mut ParallelOneInOneOutContext::new(
                        time,
                        config,
                        &state,
                        write_stream,
                    ));

                    // Forward the watermark downstream after the callback.
                    // NOTE(review): a send failure is silently ignored here,
                    // whereas `cleanup` panics on failure — confirm this
                    // asymmetry is intentional.
                    write_stream_copy
                        .send(Message::new_watermark(time_copy.clone()))
                        .ok();

                    // Commit the state only after the watermark has flowed.
                    state.commit(&time_copy);
                },
                OperatorType::Parallel,
            )
        } else {
            OperatorEvent::new(
                time.clone(),
                true,
                0,
                HashSet::new(),
                self.state_ids.clone(),
                move || {
                    operator.on_watermark(&mut ParallelOneInOneOutContext::new(
                        time.clone(),
                        config,
                        &state,
                        write_stream,
                    ));

                    state.commit(&time);
                },
                OperatorType::Parallel,
            )
        }
    }

    /// Collects a [`DeadlineEvent`] for every deadline registered in the
    /// setup context whose constrained read streams cover `read_stream_ids`
    /// and whose start condition fires for the given timestamp.
    fn arm_deadlines(
        &self,
        setup_context: &mut SetupContext<S>,
        read_stream_ids: Vec<StreamId>,
        condition_context: &ConditionContext,
        timestamp: Timestamp,
    ) -> Vec<DeadlineEvent> {
        let mut deadline_events = Vec::new();
        let state = Arc::clone(&self.state);
        for deadline in setup_context.deadlines() {
            if deadline
                .get_constrained_read_stream_ids()
                .is_superset(&read_stream_ids.iter().cloned().collect())
                && deadline.invoke_start_condition(&read_stream_ids, condition_context, &timestamp)
            {
                // The deadline duration may depend on the current state.
                let deadline_duration = deadline.calculate_deadline(&state, &timestamp);
                deadline_events.push(DeadlineEvent::new(
                    deadline.get_constrained_read_stream_ids().clone(),
                    deadline.get_constrained_write_stream_ids().clone(),
                    timestamp.clone(),
                    deadline_duration,
                    deadline.get_end_condition_fn(),
                    deadline.id(),
                ));
            }
        }
        deadline_events
    }

    /// Returns `true` (disarming the deadline) when the deadline constrains
    /// this operator's write stream and its end condition is satisfied;
    /// `false` otherwise.
    fn disarm_deadline(&self, deadline_event: &DeadlineEvent) -> bool {
        let write_stream_id = self.write_stream.id();
        if deadline_event.write_stream_ids.contains(&write_stream_id) {
            return (deadline_event.end_condition)(
                &[write_stream_id],
                &self.write_stream.get_condition_context(),
                &deadline_event.timestamp,
            );
        }
        false
    }

    /// Invokes the handler registered for the deadline that missed, passing
    /// it the current state and the timestamp of the miss.
    fn invoke_handler(
        &self,
        setup_context: &mut SetupContext<S>,
        deadline_id: DeadlineId,
        timestamp: Timestamp,
    ) {
        setup_context.invoke_handler(deadline_id, &(*self.state), &timestamp);
    }
}
252
/// Message processor that wraps a [`OneInOneOut`] operator and turns incoming
/// messages and watermarks into [`OperatorEvent`]s.
///
/// Unlike the parallel variant, the operator and state live behind
/// `Arc<Mutex<...>>` and the generated events are tagged
/// `OperatorType::Sequential`, so each callback locks before running.
pub struct OneInOneOutMessageProcessor<O, S, T, U>
where
    O: 'static + OneInOneOut<S, T, U>,
    S: State,
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
{
    // Operator configuration (name, flow_watermarks flag, ...).
    config: OperatorConfig,
    // The operator instance, locked for exclusive access by each callback.
    operator: Arc<Mutex<O>>,
    // The operator's state, locked alongside the operator in callbacks.
    state: Arc<Mutex<S>>,
    // State identifier(s) that watermark events declare as dependencies,
    // sequencing watermark callbacks with respect to the state.
    state_ids: HashSet<Uuid>,
    // Stream on which the operator sends its output messages.
    write_stream: WriteStream<U>,
    // Marker tying the unused input data type parameter T to the struct.
    phantom_t: PhantomData<T>,
}
273
274impl<O, S, T, U> OneInOneOutMessageProcessor<O, S, T, U>
275where
276 O: 'static + OneInOneOut<S, T, U>,
277 S: State,
278 T: Data + for<'a> Deserialize<'a>,
279 U: Data + for<'a> Deserialize<'a>,
280{
281 pub fn new(
282 config: OperatorConfig,
283 operator_fn: impl Fn() -> O + Send,
284 state_fn: impl Fn() -> S + Send,
285 write_stream: WriteStream<U>,
286 ) -> Self {
287 Self {
288 config,
289 operator: Arc::new(Mutex::new(operator_fn())),
290 state: Arc::new(Mutex::new(state_fn())),
291 state_ids: vec![Uuid::new_deterministic()].into_iter().collect(),
292 write_stream,
293 phantom_t: PhantomData,
294 }
295 }
296}
297
impl<O, S, T, U> OneInMessageProcessorT<S, T> for OneInOneOutMessageProcessor<O, S, T, U>
where
    O: 'static + OneInOneOut<S, T, U>,
    S: State,
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
{
    /// Runs the operator's `setup` method with a [`SetupContext`] initialized
    /// from the IDs of the operator's read and write streams, and returns the
    /// context (which may now carry registered deadlines).
    fn execute_setup(&mut self, read_stream: &mut ReadStream<T>) -> SetupContext<S> {
        let mut setup_context =
            SetupContext::new(vec![read_stream.id()], vec![self.write_stream.id()]);
        self.operator.lock().unwrap().setup(&mut setup_context);
        setup_context
    }

    /// Invokes the operator's `run` method with the read and write streams.
    fn execute_run(&mut self, read_stream: &mut ReadStream<T>) {
        self.operator
            .lock()
            .unwrap()
            .run(&self.config, read_stream, &mut self.write_stream);
    }

    /// Invokes the operator's `destroy` method.
    fn execute_destroy(&mut self) {
        self.operator.lock().unwrap().destroy();
    }

    /// Sends a `Top` watermark on the write stream (unless already closed) to
    /// signal that no further messages will be produced. Panics if the send
    /// fails.
    fn cleanup(&mut self) {
        if !self.write_stream.is_closed() {
            self.write_stream
                .send(Message::new_watermark(Timestamp::Top))
                .unwrap_or_else(|_| {
                    panic!(
                        "[OneInOneOut] Error sending Top watermark for operator {}",
                        self.config.get_name()
                    )
                });
        }
    }

    /// Builds an [`OperatorEvent`] that invokes the operator's `on_data`
    /// callback for the given message.
    ///
    /// The event is a non-watermark event with priority 0 and no declared
    /// state dependencies; the closure acquires the operator lock and then
    /// the state lock before running (`OperatorType::Sequential`).
    fn message_cb_event(&mut self, msg: Arc<Message<T>>) -> OperatorEvent {
        // Clone the shared handles so the closure is 'static and can be
        // executed on another thread.
        let operator = Arc::clone(&self.operator);
        let state = Arc::clone(&self.state);
        let time = msg.timestamp().clone();
        let config = self.config.clone();
        let write_stream = self.write_stream.clone();

        OperatorEvent::new(
            time.clone(),
            false,
            0,
            HashSet::new(),
            HashSet::new(),
            move || {
                // Lock order: operator first, then state — kept consistent
                // across all callbacks in this impl.
                let mut mutable_operator = operator.lock().unwrap();
                let mut mutable_state = state.lock().unwrap();

                mutable_operator.on_data(
                    &mut OneInOneOutContext::new(time, config, &mut mutable_state, write_stream),
                    // NOTE(review): panics if a data callback is generated for
                    // a message without a payload — presumably watermarks are
                    // routed to `watermark_cb_event` instead; confirm upstream.
                    msg.data().unwrap(),
                )
            },
            OperatorType::Sequential,
        )
    }

    /// Builds an [`OperatorEvent`] that invokes the operator's `on_watermark`
    /// callback for the given timestamp.
    ///
    /// When `flow_watermarks` is enabled, the event runs at high priority
    /// (127) and, after the callback returns, forwards the watermark on the
    /// write stream and then commits the state. Otherwise the event runs at
    /// priority 0 and only commits the state. Both variants declare
    /// `state_ids` as write dependencies so watermark callbacks are ordered
    /// with respect to the state.
    fn watermark_cb_event(&mut self, timestamp: &Timestamp) -> OperatorEvent {
        let operator = Arc::clone(&self.operator);
        let state = Arc::clone(&self.state);
        let config = self.config.clone();
        let time = timestamp.clone();
        let write_stream = self.write_stream.clone();

        if self.config.flow_watermarks {
            // Extra copies used after `time`/`write_stream` move into the
            // context passed to the callback.
            let mut write_stream_copy = self.write_stream.clone();
            let time_copy = time.clone();
            OperatorEvent::new(
                time.clone(),
                true,
                127,
                HashSet::new(),
                self.state_ids.clone(),
                move || {
                    // Lock order: operator first, then state.
                    let mut mutable_operator = operator.lock().unwrap();
                    let mut mutable_state = state.lock().unwrap();

                    mutable_operator.on_watermark(&mut OneInOneOutContext::new(
                        time,
                        config,
                        &mut mutable_state,
                        write_stream,
                    ));

                    // Forward the watermark downstream after the callback.
                    // NOTE(review): a send failure is silently ignored here,
                    // whereas `cleanup` panics on failure — confirm this
                    // asymmetry is intentional.
                    write_stream_copy
                        .send(Message::new_watermark(time_copy.clone()))
                        .ok();

                    // Commit the state only after the watermark has flowed.
                    mutable_state.commit(&time_copy);
                },
                OperatorType::Sequential,
            )
        } else {
            OperatorEvent::new(
                time.clone(),
                true,
                0,
                HashSet::new(),
                self.state_ids.clone(),
                move || {
                    // Lock order: operator first, then state.
                    let mut mutable_operator = operator.lock().unwrap();
                    let mut mutable_state = state.lock().unwrap();

                    mutable_operator.on_watermark(&mut OneInOneOutContext::new(
                        time.clone(),
                        config,
                        &mut mutable_state,
                        write_stream,
                    ));

                    mutable_state.commit(&time);
                },
                OperatorType::Sequential,
            )
        }
    }

    /// Collects a [`DeadlineEvent`] for every deadline registered in the
    /// setup context whose constrained read streams cover `read_stream_ids`
    /// and whose start condition fires for the given timestamp.
    fn arm_deadlines(
        &self,
        setup_context: &mut SetupContext<S>,
        read_stream_ids: Vec<StreamId>,
        condition_context: &ConditionContext,
        timestamp: Timestamp,
    ) -> Vec<DeadlineEvent> {
        let mut deadline_events = Vec::new();
        let state = Arc::clone(&self.state);
        for deadline in setup_context.deadlines() {
            if deadline
                .get_constrained_read_stream_ids()
                .is_superset(&read_stream_ids.iter().cloned().collect())
                && deadline.invoke_start_condition(&read_stream_ids, condition_context, &timestamp)
            {
                // The deadline duration may depend on the current state;
                // the state lock is held only for this computation.
                let deadline_duration =
                    deadline.calculate_deadline(&(*state.lock().unwrap()), &timestamp);
                deadline_events.push(DeadlineEvent::new(
                    deadline.get_constrained_read_stream_ids().clone(),
                    deadline.get_constrained_write_stream_ids().clone(),
                    timestamp.clone(),
                    deadline_duration,
                    deadline.get_end_condition_fn(),
                    deadline.id(),
                ));
            }
        }
        deadline_events
    }

    /// Returns `true` (disarming the deadline) when the deadline constrains
    /// this operator's write stream and its end condition is satisfied;
    /// `false` otherwise.
    fn disarm_deadline(&self, deadline_event: &DeadlineEvent) -> bool {
        let write_stream_id = self.write_stream.id();
        if deadline_event.write_stream_ids.contains(&write_stream_id) {
            return (deadline_event.end_condition)(
                &[write_stream_id],
                &self.write_stream.get_condition_context(),
                &deadline_event.timestamp,
            );
        }
        false
    }

    /// Invokes the handler registered for the deadline that missed, passing
    /// it the current state (locked for the duration of the call) and the
    /// timestamp of the miss.
    fn invoke_handler(
        &self,
        setup_context: &mut SetupContext<S>,
        deadline_id: DeadlineId,
        timestamp: Timestamp,
    ) {
        setup_context.invoke_handler(deadline_id, &(*self.state.lock().unwrap()), &timestamp);
    }
}