fluxus_runtime/
runtime.rs

1use dashmap::DashMap;
2use fluxus_core::{Operator, ParallelConfig, Record, Sink, Source, StreamResult};
3use std::sync::Arc;
4use tokio::sync::{Mutex, mpsc};
5use tokio::task::JoinHandle;
6use uuid::Uuid;
7
8/// Runtime context for managing stream processing execution
9pub struct RuntimeContext {
10    /// Task parallelism configuration
11    parallel_config: ParallelConfig,
12    /// Active task handles
13    task_handles: Arc<DashMap<String, Vec<JoinHandle<()>>>>,
14}
15
16impl RuntimeContext {
17    pub fn new(parallel_config: ParallelConfig) -> Self {
18        Self {
19            parallel_config,
20            task_handles: Arc::new(DashMap::new()),
21        }
22    }
23
24    /// Execute a source-to-sink pipeline with operators
25    pub async fn execute_pipeline<T, S, K>(
26        &self,
27        source: S,
28        operators: Vec<Arc<Mutex<dyn Operator<T, T> + Send + Sync>>>,
29        sink: K,
30    ) -> StreamResult<()>
31    where
32        T: Clone + Send + Sync + 'static,
33        S: Source<T> + Send + Sync + 'static,
34        K: Sink<T> + Send + Sync + 'static,
35    {
36        let (tx, rx) = mpsc::channel(self.parallel_config.buffer_size);
37        let source = Arc::new(Mutex::new(source));
38        let sink = Arc::new(Mutex::new(sink));
39
40        // Spawn source task
41        let source_handle = self.spawn_source_task(source.clone(), tx.clone());
42
43        // Create channels for operator pipeline
44        let mut curr_rx = rx;
45        let mut handles = vec![source_handle];
46
47        // Spawn operator tasks
48        for operator in operators {
49            let (new_tx, new_rx) = mpsc::channel(self.parallel_config.buffer_size);
50            let operator_handles = self.spawn_operator_tasks(operator, curr_rx, new_tx);
51            handles.extend(operator_handles);
52            curr_rx = new_rx;
53        }
54
55        // Spawn sink task
56        let sink_handle = self.spawn_sink_task(sink.clone(), curr_rx);
57        handles.push(sink_handle);
58
59        // Store handles
60        self.task_handles
61            .insert(Uuid::new_v4().to_string(), handles);
62
63        Ok(())
64    }
65
66    fn spawn_source_task<T, S>(
67        &self,
68        source: Arc<Mutex<S>>,
69        tx: mpsc::Sender<Record<T>>,
70    ) -> JoinHandle<()>
71    where
72        T: Clone + Send + 'static,
73        S: Source<T> + Send + 'static,
74    {
75        tokio::spawn(async move {
76            loop {
77                let mut source_guard = source.lock().await;
78                match source_guard.next().await {
79                    Ok(Some(record)) => {
80                        if tx.send(record).await.is_err() {
81                            break;
82                        }
83                    }
84                    _ => break,
85                }
86            }
87            let mut source_guard = source.lock().await;
88            let _ = source_guard.close().await;
89        })
90    }
91
92    fn spawn_operator_tasks<T>(
93        &self,
94        operator: Arc<Mutex<dyn Operator<T, T> + Send + Sync>>,
95        rx: mpsc::Receiver<Record<T>>,
96        tx: mpsc::Sender<Record<T>>,
97    ) -> Vec<JoinHandle<()>>
98    where
99        T: Clone + Send + 'static,
100    {
101        let mut handles = Vec::new();
102        let rx = Arc::new(Mutex::new(rx));
103
104        for _ in 0..self.parallel_config.parallelism {
105            let operator = Arc::clone(&operator);
106            let rx = Arc::clone(&rx);
107            let tx = tx.clone();
108
109            let handle = tokio::spawn(async move {
110                loop {
111                    let record = {
112                        let mut rx = rx.lock().await;
113                        match rx.recv().await {
114                            Some(r) => r,
115                            None => break,
116                        }
117                    };
118
119                    let mut op = operator.lock().await;
120                    if let Ok(results) = op.process(record).await {
121                        for result in results {
122                            if tx.send(result).await.is_err() {
123                                return;
124                            }
125                        }
126                    }
127                }
128            });
129            handles.push(handle);
130        }
131
132        handles
133    }
134
135    fn spawn_sink_task<T, K>(
136        &self,
137        sink: Arc<Mutex<K>>,
138        mut rx: mpsc::Receiver<Record<T>>,
139    ) -> JoinHandle<()>
140    where
141        T: Clone + Send + 'static,
142        K: Sink<T> + Send + 'static,
143    {
144        tokio::spawn(async move {
145            while let Some(record) = rx.recv().await {
146                let mut sink_guard = sink.lock().await;
147                let _ = sink_guard.write(record).await;
148            }
149            let mut sink_guard = sink.lock().await;
150            let _ = sink_guard.flush().await;
151            let _ = sink_guard.close().await;
152        })
153    }
154}