fluxus_runtime/
runtime.rs1use dashmap::DashMap;
2use fluxus_core::{Operator, ParallelConfig, Record, Sink, Source, StreamResult};
3use std::sync::Arc;
4use tokio::sync::{Mutex, mpsc};
5use tokio::task::JoinHandle;
6use uuid::Uuid;
7
8pub struct RuntimeContext {
10 parallel_config: ParallelConfig,
12 task_handles: Arc<DashMap<String, Vec<JoinHandle<()>>>>,
14}
15
16impl RuntimeContext {
17 pub fn new(parallel_config: ParallelConfig) -> Self {
18 Self {
19 parallel_config,
20 task_handles: Arc::new(DashMap::new()),
21 }
22 }
23
24 pub async fn execute_pipeline<T, S, K>(
26 &self,
27 source: S,
28 operators: Vec<Arc<Mutex<dyn Operator<T, T> + Send + Sync>>>,
29 sink: K,
30 ) -> StreamResult<()>
31 where
32 T: Clone + Send + Sync + 'static,
33 S: Source<T> + Send + Sync + 'static,
34 K: Sink<T> + Send + Sync + 'static,
35 {
36 let (tx, rx) = mpsc::channel(self.parallel_config.buffer_size);
37 let source = Arc::new(Mutex::new(source));
38 let sink = Arc::new(Mutex::new(sink));
39
40 let source_handle = self.spawn_source_task(source.clone(), tx.clone());
42
43 let mut curr_rx = rx;
45 let mut handles = vec![source_handle];
46
47 for operator in operators {
49 let (new_tx, new_rx) = mpsc::channel(self.parallel_config.buffer_size);
50 let operator_handles = self.spawn_operator_tasks(operator, curr_rx, new_tx);
51 handles.extend(operator_handles);
52 curr_rx = new_rx;
53 }
54
55 let sink_handle = self.spawn_sink_task(sink.clone(), curr_rx);
57 handles.push(sink_handle);
58
59 self.task_handles
61 .insert(Uuid::new_v4().to_string(), handles);
62
63 Ok(())
64 }
65
66 fn spawn_source_task<T, S>(
67 &self,
68 source: Arc<Mutex<S>>,
69 tx: mpsc::Sender<Record<T>>,
70 ) -> JoinHandle<()>
71 where
72 T: Clone + Send + 'static,
73 S: Source<T> + Send + 'static,
74 {
75 tokio::spawn(async move {
76 loop {
77 let mut source_guard = source.lock().await;
78 match source_guard.next().await {
79 Ok(Some(record)) => {
80 if tx.send(record).await.is_err() {
81 break;
82 }
83 }
84 _ => break,
85 }
86 }
87 let mut source_guard = source.lock().await;
88 let _ = source_guard.close().await;
89 })
90 }
91
92 fn spawn_operator_tasks<T>(
93 &self,
94 operator: Arc<Mutex<dyn Operator<T, T> + Send + Sync>>,
95 rx: mpsc::Receiver<Record<T>>,
96 tx: mpsc::Sender<Record<T>>,
97 ) -> Vec<JoinHandle<()>>
98 where
99 T: Clone + Send + 'static,
100 {
101 let mut handles = Vec::new();
102 let rx = Arc::new(Mutex::new(rx));
103
104 for _ in 0..self.parallel_config.parallelism {
105 let operator = Arc::clone(&operator);
106 let rx = Arc::clone(&rx);
107 let tx = tx.clone();
108
109 let handle = tokio::spawn(async move {
110 loop {
111 let record = {
112 let mut rx = rx.lock().await;
113 match rx.recv().await {
114 Some(r) => r,
115 None => break,
116 }
117 };
118
119 let mut op = operator.lock().await;
120 if let Ok(results) = op.process(record).await {
121 for result in results {
122 if tx.send(result).await.is_err() {
123 return;
124 }
125 }
126 }
127 }
128 });
129 handles.push(handle);
130 }
131
132 handles
133 }
134
135 fn spawn_sink_task<T, K>(
136 &self,
137 sink: Arc<Mutex<K>>,
138 mut rx: mpsc::Receiver<Record<T>>,
139 ) -> JoinHandle<()>
140 where
141 T: Clone + Send + 'static,
142 K: Sink<T> + Send + 'static,
143 {
144 tokio::spawn(async move {
145 while let Some(record) = rx.recv().await {
146 let mut sink_guard = sink.lock().await;
147 let _ = sink_guard.write(record).await;
148 }
149 let mut sink_guard = sink.lock().await;
150 let _ = sink_guard.flush().await;
151 let _ = sink_guard.close().await;
152 })
153 }
154}