erdos/dataflow/
connect.rs

1use std::sync::{Arc, Mutex};
2
3use serde::Deserialize;
4
5use crate::{
6    dataflow::{graph::default_graph, operator::*, AppendableState, Data, State, Stream},
7    node::operator_executors::{
8        OneInExecutor, OneInOneOutMessageProcessor, OneInTwoOutMessageProcessor, OperatorExecutorT,
9        ParallelOneInOneOutMessageProcessor, ParallelOneInTwoOutMessageProcessor,
10        ParallelSinkMessageProcessor, ParallelTwoInOneOutMessageProcessor, SinkMessageProcessor,
11        SourceExecutor, TwoInExecutor, TwoInOneOutMessageProcessor,
12    },
13    scheduler::channel_manager::ChannelManager,
14    OperatorId,
15};
16
17use super::stream::OperatorStream;
18
19/// Adds a [`Source`] operator, which has no read streams, but introduces data into the dataflow
20/// graph by interacting with external data sources (e.g., other systems, sensor data).
21pub fn connect_source<O, T>(
22    operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
23    mut config: OperatorConfig,
24) -> OperatorStream<T>
25where
26    O: 'static + Source<T>,
27    T: Data + for<'a> Deserialize<'a>,
28{
29    config.id = OperatorId::new_deterministic();
30    let write_stream = OperatorStream::new();
31
32    let config_copy = config.clone();
33    let write_stream_id = write_stream.id();
34    let op_runner =
35        move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
36            let mut channel_manager = channel_manager.lock().unwrap();
37
38            let write_stream = channel_manager.write_stream(write_stream_id).unwrap();
39
40            let executor =
41                SourceExecutor::new(config_copy.clone(), operator_fn.clone(), write_stream);
42
43            Box::new(executor)
44        };
45
46    default_graph::add_operator::<_, (), (), T, ()>(
47        config,
48        op_runner,
49        None,
50        None,
51        Some(&write_stream),
52        None,
53    );
54
55    write_stream
56}
57
58/// Adds a [`ParallelSink`] operator, which receives data on input read streams and directly
59/// interacts with external systems.
60pub fn connect_parallel_sink<O, S, T, U>(
61    operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
62    // Add state as an explicit argument to support future features such as state sharing.
63    state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
64    mut config: OperatorConfig,
65    read_stream: &dyn Stream<T>,
66) where
67    O: 'static + ParallelSink<S, T, U>,
68    S: AppendableState<U>,
69    T: Data + for<'a> Deserialize<'a>,
70    U: 'static + Send + Sync,
71{
72    config.id = OperatorId::new_deterministic();
73
74    let config_copy = config.clone();
75    let read_stream_id = read_stream.id();
76    let op_runner =
77        move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
78            let mut channel_manager = channel_manager.lock().unwrap();
79
80            let read_stream = channel_manager
81                .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
82                .unwrap();
83
84            Box::new(OneInExecutor::new(
85                config_copy.clone(),
86                Box::new(ParallelSinkMessageProcessor::new(
87                    config_copy.clone(),
88                    operator_fn.clone(),
89                    state_fn.clone(),
90                )),
91                read_stream,
92            ))
93        };
94
95    default_graph::add_operator::<_, T, (), (), ()>(
96        config,
97        op_runner,
98        Some(read_stream),
99        None,
100        None,
101        None,
102    );
103}
104
105/// Adds a [`Sink`] operator, which receives data on input read streams and directly interacts
106/// with external systems.
107pub fn connect_sink<O, S, T>(
108    operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
109    // Add state as an explicit argument to support future features such as state sharing.
110    state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
111    mut config: OperatorConfig,
112    read_stream: &dyn Stream<T>,
113) where
114    O: 'static + Sink<S, T>,
115    S: State,
116    T: Data + for<'a> Deserialize<'a>,
117{
118    config.id = OperatorId::new_deterministic();
119
120    let config_copy = config.clone();
121    let read_stream_id = read_stream.id();
122    let op_runner =
123        move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
124            let mut channel_manager = channel_manager.lock().unwrap();
125
126            let read_stream = channel_manager
127                .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
128                .unwrap();
129
130            Box::new(OneInExecutor::new(
131                config_copy.clone(),
132                Box::new(SinkMessageProcessor::new(
133                    config_copy.clone(),
134                    operator_fn.clone(),
135                    state_fn.clone(),
136                )),
137                read_stream,
138            ))
139        };
140
141    default_graph::add_operator::<_, T, (), (), ()>(
142        config,
143        op_runner,
144        Some(read_stream),
145        None,
146        None,
147        None,
148    );
149}
150
151/// Adds a [`ParallelOneInOneOut`] operator that has one input read stream and one output
152/// write stream.
153pub fn connect_parallel_one_in_one_out<O, S, T, U, V>(
154    operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
155    // Add state as an explicit argument to support future features such as state sharing.
156    state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
157    mut config: OperatorConfig,
158    read_stream: &dyn Stream<T>,
159) -> OperatorStream<U>
160where
161    O: 'static + ParallelOneInOneOut<S, T, U, V>,
162    S: AppendableState<V>,
163    T: Data + for<'a> Deserialize<'a>,
164    U: Data + for<'a> Deserialize<'a>,
165    V: 'static + Send + Sync,
166{
167    config.id = OperatorId::new_deterministic();
168    let write_stream = OperatorStream::new();
169
170    let config_copy = config.clone();
171    let read_stream_id = read_stream.id();
172    let write_stream_id = write_stream.id();
173    let op_runner =
174        move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
175            let mut channel_manager = channel_manager.lock().unwrap();
176
177            let read_stream = channel_manager
178                .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
179                .unwrap();
180            let write_stream = channel_manager.write_stream(write_stream_id).unwrap();
181
182            Box::new(OneInExecutor::new(
183                config_copy.clone(),
184                Box::new(ParallelOneInOneOutMessageProcessor::new(
185                    config_copy.clone(),
186                    operator_fn.clone(),
187                    state_fn.clone(),
188                    write_stream,
189                )),
190                read_stream,
191            ))
192        };
193
194    default_graph::add_operator::<_, T, (), U, ()>(
195        config,
196        op_runner,
197        Some(read_stream),
198        None,
199        Some(&write_stream),
200        None,
201    );
202
203    write_stream
204}
205
206/// Adds a [`OneInOneOut`] operator that has one input read stream and one output write stream.
207pub fn connect_one_in_one_out<O, S, T, U>(
208    operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
209    // Add state as an explicit argument to support future features such as state sharing.
210    state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
211    mut config: OperatorConfig,
212    read_stream: &dyn Stream<T>,
213) -> OperatorStream<U>
214where
215    O: 'static + OneInOneOut<S, T, U>,
216    S: State,
217    T: Data + for<'a> Deserialize<'a>,
218    U: Data + for<'a> Deserialize<'a>,
219{
220    config.id = OperatorId::new_deterministic();
221    let write_stream = OperatorStream::new();
222
223    let config_copy = config.clone();
224    let read_stream_id = read_stream.id();
225    let write_stream_id = write_stream.id();
226    let op_runner =
227        move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
228            let mut channel_manager = channel_manager.lock().unwrap();
229
230            let read_stream = channel_manager
231                .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
232                .unwrap();
233            let write_stream = channel_manager.write_stream(write_stream_id).unwrap();
234
235            Box::new(OneInExecutor::new(
236                config_copy.clone(),
237                Box::new(OneInOneOutMessageProcessor::new(
238                    config_copy.clone(),
239                    operator_fn.clone(),
240                    state_fn.clone(),
241                    write_stream,
242                )),
243                read_stream,
244            ))
245        };
246
247    default_graph::add_operator::<_, T, (), U, ()>(
248        config,
249        op_runner,
250        Some(read_stream),
251        None,
252        Some(&write_stream),
253        None,
254    );
255
256    write_stream
257}
258
259/// Adds a [`ParallelTwoInOneOut`] operator that has two input read streams and one output
260/// write stream.
261pub fn connect_parallel_two_in_one_out<O, S, T, U, V, W>(
262    operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
263    // Add state as an explicit argument to support future features such as state sharing.
264    state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
265    mut config: OperatorConfig,
266    left_read_stream: &dyn Stream<T>,
267    right_read_stream: &dyn Stream<U>,
268) -> OperatorStream<V>
269where
270    O: 'static + ParallelTwoInOneOut<S, T, U, V, W>,
271    S: AppendableState<W>,
272    T: Data + for<'a> Deserialize<'a>,
273    U: Data + for<'a> Deserialize<'a>,
274    V: Data + for<'a> Deserialize<'a>,
275    W: 'static + Send + Sync,
276{
277    config.id = OperatorId::new_deterministic();
278    let write_stream = OperatorStream::new();
279
280    let config_copy = config.clone();
281    let left_read_stream_id = left_read_stream.id();
282    let right_read_stream_id = right_read_stream.id();
283    let write_stream_id = write_stream.id();
284    let op_runner =
285        move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
286            let mut channel_manager = channel_manager.lock().unwrap();
287
288            let left_read_stream = channel_manager
289                .take_read_stream(default_graph::resolve_stream_id(&left_read_stream_id).unwrap())
290                .unwrap();
291            let right_read_stream = channel_manager
292                .take_read_stream(default_graph::resolve_stream_id(&right_read_stream_id).unwrap())
293                .unwrap();
294            let write_stream = channel_manager.write_stream(write_stream_id).unwrap();
295
296            Box::new(TwoInExecutor::new(
297                config_copy.clone(),
298                Box::new(ParallelTwoInOneOutMessageProcessor::new(
299                    config_copy.clone(),
300                    operator_fn.clone(),
301                    state_fn.clone(),
302                    write_stream,
303                )),
304                left_read_stream,
305                right_read_stream,
306            ))
307        };
308
309    default_graph::add_operator::<_, T, U, V, ()>(
310        config,
311        op_runner,
312        Some(left_read_stream),
313        Some(right_read_stream),
314        Some(&write_stream),
315        None,
316    );
317
318    write_stream
319}
320
321/// Adds a [`TwoInOneOut`] operator that has two input read streams and one output write stream.
322pub fn connect_two_in_one_out<O, S, T, U, V>(
323    operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
324    // Add state as an explicit argument to support future features such as state sharing.
325    state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
326    mut config: OperatorConfig,
327    left_read_stream: &dyn Stream<T>,
328    right_read_stream: &dyn Stream<U>,
329) -> OperatorStream<V>
330where
331    O: 'static + TwoInOneOut<S, T, U, V>,
332    S: State,
333    T: Data + for<'a> Deserialize<'a>,
334    U: Data + for<'a> Deserialize<'a>,
335    V: Data + for<'a> Deserialize<'a>,
336{
337    config.id = OperatorId::new_deterministic();
338    let write_stream = OperatorStream::new();
339
340    let config_copy = config.clone();
341    let left_read_stream_id = left_read_stream.id();
342    let right_read_stream_id = right_read_stream.id();
343    let write_stream_id = write_stream.id();
344    let op_runner =
345        move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
346            let mut channel_manager = channel_manager.lock().unwrap();
347
348            let left_read_stream = channel_manager
349                .take_read_stream(default_graph::resolve_stream_id(&left_read_stream_id).unwrap())
350                .unwrap();
351            let right_read_stream = channel_manager
352                .take_read_stream(default_graph::resolve_stream_id(&right_read_stream_id).unwrap())
353                .unwrap();
354            let write_stream = channel_manager.write_stream(write_stream_id).unwrap();
355
356            Box::new(TwoInExecutor::new(
357                config_copy.clone(),
358                Box::new(TwoInOneOutMessageProcessor::new(
359                    config_copy.clone(),
360                    operator_fn.clone(),
361                    state_fn.clone(),
362                    write_stream,
363                )),
364                left_read_stream,
365                right_read_stream,
366            ))
367        };
368
369    default_graph::add_operator::<_, T, U, V, ()>(
370        config,
371        op_runner,
372        Some(left_read_stream),
373        Some(right_read_stream),
374        Some(&write_stream),
375        None,
376    );
377
378    write_stream
379}
380
381/// Adds a [`ParallelOneInTwoOut`] operator that has one input read stream and two output
382/// write streams.
383pub fn connect_parallel_one_in_two_out<O, S, T, U, V, W>(
384    operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
385    // Add state as an explicit argument to support future features such as state sharing.
386    state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
387    mut config: OperatorConfig,
388    read_stream: &dyn Stream<T>,
389) -> (OperatorStream<U>, OperatorStream<V>)
390where
391    O: 'static + ParallelOneInTwoOut<S, T, U, V, W>,
392    S: AppendableState<W>,
393    T: Data + for<'a> Deserialize<'a>,
394    U: Data + for<'a> Deserialize<'a>,
395    V: Data + for<'a> Deserialize<'a>,
396    W: 'static + Send + Sync,
397{
398    config.id = OperatorId::new_deterministic();
399    let left_write_stream = OperatorStream::new();
400    let right_write_stream = OperatorStream::new();
401
402    let config_copy = config.clone();
403    let read_stream_id = read_stream.id();
404    let left_write_stream_id = left_write_stream.id();
405    let right_write_stream_id = right_write_stream.id();
406    let op_runner =
407        move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
408            let mut channel_manager = channel_manager.lock().unwrap();
409
410            let read_stream = channel_manager
411                .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
412                .unwrap();
413            let left_write_stream = channel_manager.write_stream(left_write_stream_id).unwrap();
414            let right_write_stream = channel_manager.write_stream(right_write_stream_id).unwrap();
415
416            Box::new(OneInExecutor::new(
417                config_copy.clone(),
418                Box::new(ParallelOneInTwoOutMessageProcessor::new(
419                    config_copy.clone(),
420                    operator_fn.clone(),
421                    state_fn.clone(),
422                    left_write_stream,
423                    right_write_stream,
424                )),
425                read_stream,
426            ))
427        };
428
429    default_graph::add_operator::<_, T, (), U, V>(
430        config,
431        op_runner,
432        Some(read_stream),
433        None,
434        Some(&left_write_stream),
435        Some(&right_write_stream),
436    );
437
438    (left_write_stream, right_write_stream)
439}
440
441/// Adds a [`OneInTwoOut`] operator that has one input read stream and two output write streams.
442pub fn connect_one_in_two_out<O, S, T, U, V>(
443    operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
444    // Add state as an explicit argument to support future features such as state sharing.
445    state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
446    mut config: OperatorConfig,
447    read_stream: &dyn Stream<T>,
448) -> (OperatorStream<U>, OperatorStream<V>)
449where
450    O: 'static + OneInTwoOut<S, T, U, V>,
451    S: State,
452    T: Data + for<'a> Deserialize<'a>,
453    U: Data + for<'a> Deserialize<'a>,
454    V: Data + for<'a> Deserialize<'a>,
455{
456    config.id = OperatorId::new_deterministic();
457    let left_write_stream = OperatorStream::new();
458    let right_write_stream = OperatorStream::new();
459
460    let config_copy = config.clone();
461    let read_stream_id = read_stream.id();
462    let left_write_stream_id = left_write_stream.id();
463    let right_write_stream_id = right_write_stream.id();
464    let op_runner =
465        move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
466            let mut channel_manager = channel_manager.lock().unwrap();
467
468            let read_stream = channel_manager
469                .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
470                .unwrap();
471            let left_write_stream = channel_manager.write_stream(left_write_stream_id).unwrap();
472            let right_write_stream = channel_manager.write_stream(right_write_stream_id).unwrap();
473
474            Box::new(OneInExecutor::new(
475                config_copy.clone(),
476                Box::new(OneInTwoOutMessageProcessor::new(
477                    config_copy.clone(),
478                    operator_fn.clone(),
479                    state_fn.clone(),
480                    left_write_stream,
481                    right_write_stream,
482                )),
483                read_stream,
484            ))
485        };
486
487    default_graph::add_operator::<_, T, (), U, V>(
488        config,
489        op_runner,
490        Some(read_stream),
491        None,
492        Some(&left_write_stream),
493        Some(&right_write_stream),
494    );
495
496    (left_write_stream, right_write_stream)
497}