erdos/node/operator_executors/one_in_one_out_executor.rs

1use serde::Deserialize;
2use std::{
3    collections::HashSet,
4    marker::PhantomData,
5    sync::{Arc, Mutex},
6};
7
8use crate::{
9    dataflow::{
10        context::{OneInOneOutContext, ParallelOneInOneOutContext, SetupContext},
11        deadlines::{ConditionContext, DeadlineEvent, DeadlineId},
12        operator::{OneInOneOut, OperatorConfig, ParallelOneInOneOut},
13        stream::{StreamId, WriteStreamT},
14        AppendableState, Data, Message, ReadStream, State, Timestamp, WriteStream,
15    },
16    node::{
17        operator_event::{OperatorEvent, OperatorType},
18        operator_executors::OneInMessageProcessorT,
19    },
20    Uuid,
21};
22
23/// Message Processor that defines the generation and execution of events for a ParallelOneInOneOut
24/// operator, where
25/// O: An operator that implements the ParallelOneInOneOut trait,
26/// S: A state structure that implements the AppendableState trait,
27/// T: Type of messages received on the read stream,
28/// U: Type of messages sent on the write stream,
29/// V: Type of intermediate data appended to the state structure S.
30pub struct ParallelOneInOneOutMessageProcessor<O, S, T, U, V>
31where
32    O: 'static + ParallelOneInOneOut<S, T, U, V>,
33    S: AppendableState<V>,
34    T: Data + for<'a> Deserialize<'a>,
35    U: Data + for<'a> Deserialize<'a>,
36    V: 'static + Send + Sync,
37{
38    config: OperatorConfig,
39    operator: Arc<O>,
40    state: Arc<S>,
41    state_ids: HashSet<Uuid>,
42    write_stream: WriteStream<U>,
43    phantom_t: PhantomData<T>,
44    phantom_v: PhantomData<V>,
45}
46
47impl<O, S, T, U, V> ParallelOneInOneOutMessageProcessor<O, S, T, U, V>
48where
49    O: 'static + ParallelOneInOneOut<S, T, U, V>,
50    S: AppendableState<V>,
51    T: Data + for<'a> Deserialize<'a>,
52    U: Data + for<'a> Deserialize<'a>,
53    V: 'static + Send + Sync,
54{
55    pub fn new(
56        config: OperatorConfig,
57        operator_fn: impl Fn() -> O + Send,
58        state_fn: impl Fn() -> S + Send,
59        write_stream: WriteStream<U>,
60    ) -> Self {
61        Self {
62            config,
63            operator: Arc::new(operator_fn()),
64            state: Arc::new(state_fn()),
65            state_ids: vec![Uuid::new_deterministic()].into_iter().collect(),
66            write_stream,
67            phantom_t: PhantomData,
68            phantom_v: PhantomData,
69        }
70    }
71}
72
73impl<O, S, T, U, V> OneInMessageProcessorT<S, T>
74    for ParallelOneInOneOutMessageProcessor<O, S, T, U, V>
75where
76    O: 'static + ParallelOneInOneOut<S, T, U, V>,
77    S: AppendableState<V>,
78    T: Data + for<'a> Deserialize<'a>,
79    U: Data + for<'a> Deserialize<'a>,
80    V: 'static + Send + Sync,
81{
82    fn execute_setup(&mut self, read_stream: &mut ReadStream<T>) -> SetupContext<S> {
83        let mut setup_context =
84            SetupContext::new(vec![read_stream.id()], vec![self.write_stream.id()]);
85        Arc::get_mut(&mut self.operator)
86            .unwrap()
87            .setup(&mut setup_context);
88        setup_context
89    }
90
91    fn execute_run(&mut self, read_stream: &mut ReadStream<T>) {
92        Arc::get_mut(&mut self.operator).unwrap().run(
93            &self.config,
94            read_stream,
95            &mut self.write_stream,
96        );
97    }
98
99    fn execute_destroy(&mut self) {
100        Arc::get_mut(&mut self.operator).unwrap().destroy();
101    }
102
103    fn cleanup(&mut self) {
104        if !self.write_stream.is_closed() {
105            self.write_stream
106                .send(Message::new_watermark(Timestamp::Top))
107                .unwrap_or_else(|_| {
108                    panic!(
109                        "[ParallelOneInOneOut] Error sending Top watermark for operator {}",
110                        self.config.get_name()
111                    )
112                });
113        }
114    }
115
116    fn message_cb_event(&mut self, msg: Arc<Message<T>>) -> OperatorEvent {
117        // Clone the reference to the operator and the state.
118        let operator = Arc::clone(&self.operator);
119        let state = Arc::clone(&self.state);
120        let time = msg.timestamp().clone();
121        let config = self.config.clone();
122        let write_stream = self.write_stream.clone();
123
124        OperatorEvent::new(
125            time.clone(),
126            false,
127            0,
128            HashSet::new(),
129            HashSet::new(),
130            move || {
131                operator.on_data(
132                    &ParallelOneInOneOutContext::new(time, config, &state, write_stream),
133                    msg.data().unwrap(),
134                )
135            },
136            OperatorType::Parallel,
137        )
138    }
139
140    fn watermark_cb_event(&mut self, timestamp: &Timestamp) -> OperatorEvent {
141        // Clone the reference to the operator and the state.
142        let operator = Arc::clone(&self.operator);
143        let state = Arc::clone(&self.state);
144        let time = timestamp.clone();
145        let config = self.config.clone();
146        let write_stream = self.write_stream.clone();
147
148        if self.config.flow_watermarks {
149            let mut write_stream_copy = self.write_stream.clone();
150            let time_copy = time.clone();
151            OperatorEvent::new(
152                time.clone(),
153                true,
154                127,
155                HashSet::new(),
156                self.state_ids.clone(),
157                move || {
158                    // Invoke the watermark method.
159                    operator.on_watermark(&mut ParallelOneInOneOutContext::new(
160                        time,
161                        config,
162                        &state,
163                        write_stream,
164                    ));
165
166                    // Send a watermark.
167                    write_stream_copy
168                        .send(Message::new_watermark(time_copy.clone()))
169                        .ok();
170
171                    // Commit the state.
172                    state.commit(&time_copy);
173                },
174                OperatorType::Parallel,
175            )
176        } else {
177            OperatorEvent::new(
178                time.clone(),
179                true,
180                0,
181                HashSet::new(),
182                self.state_ids.clone(),
183                move || {
184                    // Invoke the watermark method.
185                    operator.on_watermark(&mut ParallelOneInOneOutContext::new(
186                        time.clone(),
187                        config,
188                        &state,
189                        write_stream,
190                    ));
191
192                    // Commit the state.
193                    state.commit(&time);
194                },
195                OperatorType::Parallel,
196            )
197        }
198    }
199
200    fn arm_deadlines(
201        &self,
202        setup_context: &mut SetupContext<S>,
203        read_stream_ids: Vec<StreamId>,
204        condition_context: &ConditionContext,
205        timestamp: Timestamp,
206    ) -> Vec<DeadlineEvent> {
207        let mut deadline_events = Vec::new();
208        let state = Arc::clone(&self.state);
209        for deadline in setup_context.deadlines() {
210            if deadline
211                .get_constrained_read_stream_ids()
212                .is_superset(&read_stream_ids.iter().cloned().collect())
213                && deadline.invoke_start_condition(&read_stream_ids, condition_context, &timestamp)
214            {
215                // Compute the deadline for the timestamp.
216                let deadline_duration = deadline.calculate_deadline(&state, &timestamp);
217                deadline_events.push(DeadlineEvent::new(
218                    deadline.get_constrained_read_stream_ids().clone(),
219                    deadline.get_constrained_write_stream_ids().clone(),
220                    timestamp.clone(),
221                    deadline_duration,
222                    deadline.get_end_condition_fn(),
223                    deadline.id(),
224                ));
225            }
226        }
227        deadline_events
228    }
229
230    fn disarm_deadline(&self, deadline_event: &DeadlineEvent) -> bool {
231        let write_stream_id = self.write_stream.id();
232        if deadline_event.write_stream_ids.contains(&write_stream_id) {
233            // Invoke the end condition function on the statistics from the WriteStream.
234            return (deadline_event.end_condition)(
235                &[write_stream_id],
236                &self.write_stream.get_condition_context(),
237                &deadline_event.timestamp,
238            );
239        }
240        false
241    }
242
243    fn invoke_handler(
244        &self,
245        setup_context: &mut SetupContext<S>,
246        deadline_id: DeadlineId,
247        timestamp: Timestamp,
248    ) {
249        setup_context.invoke_handler(deadline_id, &(*self.state), &timestamp);
250    }
251}
252
253/// Message Processor that defines the generation and execution of events for a OneInOneOut
254/// operator, where
255/// O: An operator that implements the OneInOneOut trait,
256/// S: A state structure that implements the State trait,
257/// T: Type of messages received on the read stream,
258/// U: Type of messages sent on the write stream,
259pub struct OneInOneOutMessageProcessor<O, S, T, U>
260where
261    O: 'static + OneInOneOut<S, T, U>,
262    S: State,
263    T: Data + for<'a> Deserialize<'a>,
264    U: Data + for<'a> Deserialize<'a>,
265{
266    config: OperatorConfig,
267    operator: Arc<Mutex<O>>,
268    state: Arc<Mutex<S>>,
269    state_ids: HashSet<Uuid>,
270    write_stream: WriteStream<U>,
271    phantom_t: PhantomData<T>,
272}
273
274impl<O, S, T, U> OneInOneOutMessageProcessor<O, S, T, U>
275where
276    O: 'static + OneInOneOut<S, T, U>,
277    S: State,
278    T: Data + for<'a> Deserialize<'a>,
279    U: Data + for<'a> Deserialize<'a>,
280{
281    pub fn new(
282        config: OperatorConfig,
283        operator_fn: impl Fn() -> O + Send,
284        state_fn: impl Fn() -> S + Send,
285        write_stream: WriteStream<U>,
286    ) -> Self {
287        Self {
288            config,
289            operator: Arc::new(Mutex::new(operator_fn())),
290            state: Arc::new(Mutex::new(state_fn())),
291            state_ids: vec![Uuid::new_deterministic()].into_iter().collect(),
292            write_stream,
293            phantom_t: PhantomData,
294        }
295    }
296}
297
298impl<O, S, T, U> OneInMessageProcessorT<S, T> for OneInOneOutMessageProcessor<O, S, T, U>
299where
300    O: 'static + OneInOneOut<S, T, U>,
301    S: State,
302    T: Data + for<'a> Deserialize<'a>,
303    U: Data + for<'a> Deserialize<'a>,
304{
305    fn execute_setup(&mut self, read_stream: &mut ReadStream<T>) -> SetupContext<S> {
306        let mut setup_context =
307            SetupContext::new(vec![read_stream.id()], vec![self.write_stream.id()]);
308        self.operator.lock().unwrap().setup(&mut setup_context);
309        setup_context
310    }
311
312    fn execute_run(&mut self, read_stream: &mut ReadStream<T>) {
313        self.operator
314            .lock()
315            .unwrap()
316            .run(&self.config, read_stream, &mut self.write_stream);
317    }
318
319    fn execute_destroy(&mut self) {
320        self.operator.lock().unwrap().destroy();
321    }
322
323    fn cleanup(&mut self) {
324        if !self.write_stream.is_closed() {
325            self.write_stream
326                .send(Message::new_watermark(Timestamp::Top))
327                .unwrap_or_else(|_| {
328                    panic!(
329                        "[OneInOneOut] Error sending Top watermark for operator {}",
330                        self.config.get_name()
331                    )
332                });
333        }
334    }
335
336    fn message_cb_event(&mut self, msg: Arc<Message<T>>) -> OperatorEvent {
337        // Clone the reference to the operator and the state.
338        let operator = Arc::clone(&self.operator);
339        let state = Arc::clone(&self.state);
340        let time = msg.timestamp().clone();
341        let config = self.config.clone();
342        let write_stream = self.write_stream.clone();
343
344        OperatorEvent::new(
345            time.clone(),
346            false,
347            0,
348            HashSet::new(),
349            HashSet::new(),
350            move || {
351                // Note: to avoid deadlock, always lock the operator before the state.
352                let mut mutable_operator = operator.lock().unwrap();
353                let mut mutable_state = state.lock().unwrap();
354
355                mutable_operator.on_data(
356                    &mut OneInOneOutContext::new(time, config, &mut mutable_state, write_stream),
357                    msg.data().unwrap(),
358                )
359            },
360            OperatorType::Sequential,
361        )
362    }
363
364    fn watermark_cb_event(&mut self, timestamp: &Timestamp) -> OperatorEvent {
365        // Clone the reference to the operator and the state.
366        let operator = Arc::clone(&self.operator);
367        let state = Arc::clone(&self.state);
368        let config = self.config.clone();
369        let time = timestamp.clone();
370        let write_stream = self.write_stream.clone();
371
372        if self.config.flow_watermarks {
373            let mut write_stream_copy = self.write_stream.clone();
374            let time_copy = time.clone();
375            OperatorEvent::new(
376                time.clone(),
377                true,
378                127,
379                HashSet::new(),
380                self.state_ids.clone(),
381                move || {
382                    // Note: to avoid deadlock, always lock the operator before the state.
383                    let mut mutable_operator = operator.lock().unwrap();
384                    let mut mutable_state = state.lock().unwrap();
385
386                    mutable_operator.on_watermark(&mut OneInOneOutContext::new(
387                        time,
388                        config,
389                        &mut mutable_state,
390                        write_stream,
391                    ));
392
393                    // Send a watermark.
394                    write_stream_copy
395                        .send(Message::new_watermark(time_copy.clone()))
396                        .ok();
397
398                    // Commit the state.
399                    mutable_state.commit(&time_copy);
400                },
401                OperatorType::Sequential,
402            )
403        } else {
404            OperatorEvent::new(
405                time.clone(),
406                true,
407                0,
408                HashSet::new(),
409                self.state_ids.clone(),
410                move || {
411                    // Note: to avoid deadlock, always lock the operator before the state.
412                    let mut mutable_operator = operator.lock().unwrap();
413                    let mut mutable_state = state.lock().unwrap();
414
415                    mutable_operator.on_watermark(&mut OneInOneOutContext::new(
416                        time.clone(),
417                        config,
418                        &mut mutable_state,
419                        write_stream,
420                    ));
421
422                    // Commit the state.
423                    mutable_state.commit(&time);
424                },
425                OperatorType::Sequential,
426            )
427        }
428    }
429
430    fn arm_deadlines(
431        &self,
432        setup_context: &mut SetupContext<S>,
433        read_stream_ids: Vec<StreamId>,
434        condition_context: &ConditionContext,
435        timestamp: Timestamp,
436    ) -> Vec<DeadlineEvent> {
437        let mut deadline_events = Vec::new();
438        let state = Arc::clone(&self.state);
439        for deadline in setup_context.deadlines() {
440            if deadline
441                .get_constrained_read_stream_ids()
442                .is_superset(&read_stream_ids.iter().cloned().collect())
443                && deadline.invoke_start_condition(&read_stream_ids, condition_context, &timestamp)
444            {
445                // Compute the deadline for the timestamp.
446                let deadline_duration =
447                    deadline.calculate_deadline(&(*state.lock().unwrap()), &timestamp);
448                deadline_events.push(DeadlineEvent::new(
449                    deadline.get_constrained_read_stream_ids().clone(),
450                    deadline.get_constrained_write_stream_ids().clone(),
451                    timestamp.clone(),
452                    deadline_duration,
453                    deadline.get_end_condition_fn(),
454                    deadline.id(),
455                ));
456            }
457        }
458        deadline_events
459    }
460
461    fn disarm_deadline(&self, deadline_event: &DeadlineEvent) -> bool {
462        let write_stream_id = self.write_stream.id();
463        if deadline_event.write_stream_ids.contains(&write_stream_id) {
464            // Invoke the end condition function on the statistics from the WriteStream.
465            return (deadline_event.end_condition)(
466                &[write_stream_id],
467                &self.write_stream.get_condition_context(),
468                &deadline_event.timestamp,
469            );
470        }
471        false
472    }
473
474    fn invoke_handler(
475        &self,
476        setup_context: &mut SetupContext<S>,
477        deadline_id: DeadlineId,
478        timestamp: Timestamp,
479    ) {
480        setup_context.invoke_handler(deadline_id, &(*self.state.lock().unwrap()), &timestamp);
481    }
482}