1use std::sync::{Arc, Mutex};
2
3use serde::Deserialize;
4
5use crate::{
6 dataflow::{graph::default_graph, operator::*, AppendableState, Data, State, Stream},
7 node::operator_executors::{
8 OneInExecutor, OneInOneOutMessageProcessor, OneInTwoOutMessageProcessor, OperatorExecutorT,
9 ParallelOneInOneOutMessageProcessor, ParallelOneInTwoOutMessageProcessor,
10 ParallelSinkMessageProcessor, ParallelTwoInOneOutMessageProcessor, SinkMessageProcessor,
11 SourceExecutor, TwoInExecutor, TwoInOneOutMessageProcessor,
12 },
13 scheduler::channel_manager::ChannelManager,
14 OperatorId,
15};
16
17use super::stream::OperatorStream;
18
19pub fn connect_source<O, T>(
22 operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
23 mut config: OperatorConfig,
24) -> OperatorStream<T>
25where
26 O: 'static + Source<T>,
27 T: Data + for<'a> Deserialize<'a>,
28{
29 config.id = OperatorId::new_deterministic();
30 let write_stream = OperatorStream::new();
31
32 let config_copy = config.clone();
33 let write_stream_id = write_stream.id();
34 let op_runner =
35 move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
36 let mut channel_manager = channel_manager.lock().unwrap();
37
38 let write_stream = channel_manager.write_stream(write_stream_id).unwrap();
39
40 let executor =
41 SourceExecutor::new(config_copy.clone(), operator_fn.clone(), write_stream);
42
43 Box::new(executor)
44 };
45
46 default_graph::add_operator::<_, (), (), T, ()>(
47 config,
48 op_runner,
49 None,
50 None,
51 Some(&write_stream),
52 None,
53 );
54
55 write_stream
56}
57
58pub fn connect_parallel_sink<O, S, T, U>(
61 operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
62 state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
64 mut config: OperatorConfig,
65 read_stream: &dyn Stream<T>,
66) where
67 O: 'static + ParallelSink<S, T, U>,
68 S: AppendableState<U>,
69 T: Data + for<'a> Deserialize<'a>,
70 U: 'static + Send + Sync,
71{
72 config.id = OperatorId::new_deterministic();
73
74 let config_copy = config.clone();
75 let read_stream_id = read_stream.id();
76 let op_runner =
77 move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
78 let mut channel_manager = channel_manager.lock().unwrap();
79
80 let read_stream = channel_manager
81 .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
82 .unwrap();
83
84 Box::new(OneInExecutor::new(
85 config_copy.clone(),
86 Box::new(ParallelSinkMessageProcessor::new(
87 config_copy.clone(),
88 operator_fn.clone(),
89 state_fn.clone(),
90 )),
91 read_stream,
92 ))
93 };
94
95 default_graph::add_operator::<_, T, (), (), ()>(
96 config,
97 op_runner,
98 Some(read_stream),
99 None,
100 None,
101 None,
102 );
103}
104
105pub fn connect_sink<O, S, T>(
108 operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
109 state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
111 mut config: OperatorConfig,
112 read_stream: &dyn Stream<T>,
113) where
114 O: 'static + Sink<S, T>,
115 S: State,
116 T: Data + for<'a> Deserialize<'a>,
117{
118 config.id = OperatorId::new_deterministic();
119
120 let config_copy = config.clone();
121 let read_stream_id = read_stream.id();
122 let op_runner =
123 move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
124 let mut channel_manager = channel_manager.lock().unwrap();
125
126 let read_stream = channel_manager
127 .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
128 .unwrap();
129
130 Box::new(OneInExecutor::new(
131 config_copy.clone(),
132 Box::new(SinkMessageProcessor::new(
133 config_copy.clone(),
134 operator_fn.clone(),
135 state_fn.clone(),
136 )),
137 read_stream,
138 ))
139 };
140
141 default_graph::add_operator::<_, T, (), (), ()>(
142 config,
143 op_runner,
144 Some(read_stream),
145 None,
146 None,
147 None,
148 );
149}
150
151pub fn connect_parallel_one_in_one_out<O, S, T, U, V>(
154 operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
155 state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
157 mut config: OperatorConfig,
158 read_stream: &dyn Stream<T>,
159) -> OperatorStream<U>
160where
161 O: 'static + ParallelOneInOneOut<S, T, U, V>,
162 S: AppendableState<V>,
163 T: Data + for<'a> Deserialize<'a>,
164 U: Data + for<'a> Deserialize<'a>,
165 V: 'static + Send + Sync,
166{
167 config.id = OperatorId::new_deterministic();
168 let write_stream = OperatorStream::new();
169
170 let config_copy = config.clone();
171 let read_stream_id = read_stream.id();
172 let write_stream_id = write_stream.id();
173 let op_runner =
174 move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
175 let mut channel_manager = channel_manager.lock().unwrap();
176
177 let read_stream = channel_manager
178 .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
179 .unwrap();
180 let write_stream = channel_manager.write_stream(write_stream_id).unwrap();
181
182 Box::new(OneInExecutor::new(
183 config_copy.clone(),
184 Box::new(ParallelOneInOneOutMessageProcessor::new(
185 config_copy.clone(),
186 operator_fn.clone(),
187 state_fn.clone(),
188 write_stream,
189 )),
190 read_stream,
191 ))
192 };
193
194 default_graph::add_operator::<_, T, (), U, ()>(
195 config,
196 op_runner,
197 Some(read_stream),
198 None,
199 Some(&write_stream),
200 None,
201 );
202
203 write_stream
204}
205
206pub fn connect_one_in_one_out<O, S, T, U>(
208 operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
209 state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
211 mut config: OperatorConfig,
212 read_stream: &dyn Stream<T>,
213) -> OperatorStream<U>
214where
215 O: 'static + OneInOneOut<S, T, U>,
216 S: State,
217 T: Data + for<'a> Deserialize<'a>,
218 U: Data + for<'a> Deserialize<'a>,
219{
220 config.id = OperatorId::new_deterministic();
221 let write_stream = OperatorStream::new();
222
223 let config_copy = config.clone();
224 let read_stream_id = read_stream.id();
225 let write_stream_id = write_stream.id();
226 let op_runner =
227 move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
228 let mut channel_manager = channel_manager.lock().unwrap();
229
230 let read_stream = channel_manager
231 .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
232 .unwrap();
233 let write_stream = channel_manager.write_stream(write_stream_id).unwrap();
234
235 Box::new(OneInExecutor::new(
236 config_copy.clone(),
237 Box::new(OneInOneOutMessageProcessor::new(
238 config_copy.clone(),
239 operator_fn.clone(),
240 state_fn.clone(),
241 write_stream,
242 )),
243 read_stream,
244 ))
245 };
246
247 default_graph::add_operator::<_, T, (), U, ()>(
248 config,
249 op_runner,
250 Some(read_stream),
251 None,
252 Some(&write_stream),
253 None,
254 );
255
256 write_stream
257}
258
259pub fn connect_parallel_two_in_one_out<O, S, T, U, V, W>(
262 operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
263 state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
265 mut config: OperatorConfig,
266 left_read_stream: &dyn Stream<T>,
267 right_read_stream: &dyn Stream<U>,
268) -> OperatorStream<V>
269where
270 O: 'static + ParallelTwoInOneOut<S, T, U, V, W>,
271 S: AppendableState<W>,
272 T: Data + for<'a> Deserialize<'a>,
273 U: Data + for<'a> Deserialize<'a>,
274 V: Data + for<'a> Deserialize<'a>,
275 W: 'static + Send + Sync,
276{
277 config.id = OperatorId::new_deterministic();
278 let write_stream = OperatorStream::new();
279
280 let config_copy = config.clone();
281 let left_read_stream_id = left_read_stream.id();
282 let right_read_stream_id = right_read_stream.id();
283 let write_stream_id = write_stream.id();
284 let op_runner =
285 move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
286 let mut channel_manager = channel_manager.lock().unwrap();
287
288 let left_read_stream = channel_manager
289 .take_read_stream(default_graph::resolve_stream_id(&left_read_stream_id).unwrap())
290 .unwrap();
291 let right_read_stream = channel_manager
292 .take_read_stream(default_graph::resolve_stream_id(&right_read_stream_id).unwrap())
293 .unwrap();
294 let write_stream = channel_manager.write_stream(write_stream_id).unwrap();
295
296 Box::new(TwoInExecutor::new(
297 config_copy.clone(),
298 Box::new(ParallelTwoInOneOutMessageProcessor::new(
299 config_copy.clone(),
300 operator_fn.clone(),
301 state_fn.clone(),
302 write_stream,
303 )),
304 left_read_stream,
305 right_read_stream,
306 ))
307 };
308
309 default_graph::add_operator::<_, T, U, V, ()>(
310 config,
311 op_runner,
312 Some(left_read_stream),
313 Some(right_read_stream),
314 Some(&write_stream),
315 None,
316 );
317
318 write_stream
319}
320
321pub fn connect_two_in_one_out<O, S, T, U, V>(
323 operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
324 state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
326 mut config: OperatorConfig,
327 left_read_stream: &dyn Stream<T>,
328 right_read_stream: &dyn Stream<U>,
329) -> OperatorStream<V>
330where
331 O: 'static + TwoInOneOut<S, T, U, V>,
332 S: State,
333 T: Data + for<'a> Deserialize<'a>,
334 U: Data + for<'a> Deserialize<'a>,
335 V: Data + for<'a> Deserialize<'a>,
336{
337 config.id = OperatorId::new_deterministic();
338 let write_stream = OperatorStream::new();
339
340 let config_copy = config.clone();
341 let left_read_stream_id = left_read_stream.id();
342 let right_read_stream_id = right_read_stream.id();
343 let write_stream_id = write_stream.id();
344 let op_runner =
345 move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
346 let mut channel_manager = channel_manager.lock().unwrap();
347
348 let left_read_stream = channel_manager
349 .take_read_stream(default_graph::resolve_stream_id(&left_read_stream_id).unwrap())
350 .unwrap();
351 let right_read_stream = channel_manager
352 .take_read_stream(default_graph::resolve_stream_id(&right_read_stream_id).unwrap())
353 .unwrap();
354 let write_stream = channel_manager.write_stream(write_stream_id).unwrap();
355
356 Box::new(TwoInExecutor::new(
357 config_copy.clone(),
358 Box::new(TwoInOneOutMessageProcessor::new(
359 config_copy.clone(),
360 operator_fn.clone(),
361 state_fn.clone(),
362 write_stream,
363 )),
364 left_read_stream,
365 right_read_stream,
366 ))
367 };
368
369 default_graph::add_operator::<_, T, U, V, ()>(
370 config,
371 op_runner,
372 Some(left_read_stream),
373 Some(right_read_stream),
374 Some(&write_stream),
375 None,
376 );
377
378 write_stream
379}
380
381pub fn connect_parallel_one_in_two_out<O, S, T, U, V, W>(
384 operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
385 state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
387 mut config: OperatorConfig,
388 read_stream: &dyn Stream<T>,
389) -> (OperatorStream<U>, OperatorStream<V>)
390where
391 O: 'static + ParallelOneInTwoOut<S, T, U, V, W>,
392 S: AppendableState<W>,
393 T: Data + for<'a> Deserialize<'a>,
394 U: Data + for<'a> Deserialize<'a>,
395 V: Data + for<'a> Deserialize<'a>,
396 W: 'static + Send + Sync,
397{
398 config.id = OperatorId::new_deterministic();
399 let left_write_stream = OperatorStream::new();
400 let right_write_stream = OperatorStream::new();
401
402 let config_copy = config.clone();
403 let read_stream_id = read_stream.id();
404 let left_write_stream_id = left_write_stream.id();
405 let right_write_stream_id = right_write_stream.id();
406 let op_runner =
407 move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
408 let mut channel_manager = channel_manager.lock().unwrap();
409
410 let read_stream = channel_manager
411 .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
412 .unwrap();
413 let left_write_stream = channel_manager.write_stream(left_write_stream_id).unwrap();
414 let right_write_stream = channel_manager.write_stream(right_write_stream_id).unwrap();
415
416 Box::new(OneInExecutor::new(
417 config_copy.clone(),
418 Box::new(ParallelOneInTwoOutMessageProcessor::new(
419 config_copy.clone(),
420 operator_fn.clone(),
421 state_fn.clone(),
422 left_write_stream,
423 right_write_stream,
424 )),
425 read_stream,
426 ))
427 };
428
429 default_graph::add_operator::<_, T, (), U, V>(
430 config,
431 op_runner,
432 Some(read_stream),
433 None,
434 Some(&left_write_stream),
435 Some(&right_write_stream),
436 );
437
438 (left_write_stream, right_write_stream)
439}
440
441pub fn connect_one_in_two_out<O, S, T, U, V>(
443 operator_fn: impl Fn() -> O + Clone + Send + Sync + 'static,
444 state_fn: impl Fn() -> S + Clone + Send + Sync + 'static,
446 mut config: OperatorConfig,
447 read_stream: &dyn Stream<T>,
448) -> (OperatorStream<U>, OperatorStream<V>)
449where
450 O: 'static + OneInTwoOut<S, T, U, V>,
451 S: State,
452 T: Data + for<'a> Deserialize<'a>,
453 U: Data + for<'a> Deserialize<'a>,
454 V: Data + for<'a> Deserialize<'a>,
455{
456 config.id = OperatorId::new_deterministic();
457 let left_write_stream = OperatorStream::new();
458 let right_write_stream = OperatorStream::new();
459
460 let config_copy = config.clone();
461 let read_stream_id = read_stream.id();
462 let left_write_stream_id = left_write_stream.id();
463 let right_write_stream_id = right_write_stream.id();
464 let op_runner =
465 move |channel_manager: Arc<Mutex<ChannelManager>>| -> Box<dyn OperatorExecutorT> {
466 let mut channel_manager = channel_manager.lock().unwrap();
467
468 let read_stream = channel_manager
469 .take_read_stream(default_graph::resolve_stream_id(&read_stream_id).unwrap())
470 .unwrap();
471 let left_write_stream = channel_manager.write_stream(left_write_stream_id).unwrap();
472 let right_write_stream = channel_manager.write_stream(right_write_stream_id).unwrap();
473
474 Box::new(OneInExecutor::new(
475 config_copy.clone(),
476 Box::new(OneInTwoOutMessageProcessor::new(
477 config_copy.clone(),
478 operator_fn.clone(),
479 state_fn.clone(),
480 left_write_stream,
481 right_write_stream,
482 )),
483 read_stream,
484 ))
485 };
486
487 default_graph::add_operator::<_, T, (), U, V>(
488 config,
489 op_runner,
490 Some(read_stream),
491 None,
492 Some(&left_write_stream),
493 Some(&right_write_stream),
494 );
495
496 (left_write_stream, right_write_stream)
497}