fluxus_runtime/
runtime.rs

1use dashmap::DashMap;
2use fluxus_core::ParallelConfig;
3use fluxus_sinks::Sink;
4use fluxus_sources::Source;
5use fluxus_transformers::Operator;
6use fluxus_utils::models::{Record, StreamResult};
7use std::sync::Arc;
8use tokio::sync::{Mutex, mpsc};
9use tokio::task::JoinHandle;
10use uuid::Uuid;
11
12/// Runtime context for managing stream processing execution
13pub struct RuntimeContext {
14    /// Task parallelism configuration
15    parallel_config: ParallelConfig,
16    /// Active task handles
17    task_handles: Arc<DashMap<String, Vec<JoinHandle<()>>>>,
18}
19
20impl RuntimeContext {
21    pub fn new(parallel_config: ParallelConfig) -> Self {
22        Self {
23            parallel_config,
24            task_handles: Arc::new(DashMap::new()),
25        }
26    }
27
28    /// Execute a source-to-sink pipeline with operators
29    pub async fn execute_pipeline<T, S, K>(
30        &self,
31        source: S,
32        operators: Vec<Arc<Mutex<dyn Operator<T, T> + Send + Sync>>>,
33        sink: K,
34    ) -> StreamResult<()>
35    where
36        T: Clone + Send + Sync + 'static,
37        S: Source<T> + Send + Sync + 'static,
38        K: Sink<T> + Send + Sync + 'static,
39    {
40        let (tx, rx) = mpsc::channel(self.parallel_config.buffer_size);
41        let source = Arc::new(Mutex::new(source));
42        let sink = Arc::new(Mutex::new(sink));
43
44        // Spawn source task
45        let source_handle = self.spawn_source_task(source.clone(), tx.clone());
46
47        // Create channels for operator pipeline
48        let mut curr_rx = rx;
49        let mut handles = vec![source_handle];
50
51        // Spawn operator tasks
52        for operator in operators {
53            let (new_tx, new_rx) = mpsc::channel(self.parallel_config.buffer_size);
54            let operator_handles = self.spawn_operator_tasks(operator, curr_rx, new_tx);
55            handles.extend(operator_handles);
56            curr_rx = new_rx;
57        }
58
59        // Spawn sink task
60        let sink_handle = self.spawn_sink_task(sink.clone(), curr_rx);
61        handles.push(sink_handle);
62
63        // Store handles
64        self.task_handles
65            .insert(Uuid::new_v4().to_string(), handles);
66
67        Ok(())
68    }
69
70    fn spawn_source_task<T, S>(
71        &self,
72        source: Arc<Mutex<S>>,
73        tx: mpsc::Sender<Record<T>>,
74    ) -> JoinHandle<()>
75    where
76        T: Clone + Send + 'static,
77        S: Source<T> + Send + 'static,
78    {
79        tokio::spawn(async move {
80            loop {
81                let mut source_guard = source.lock().await;
82                match source_guard.next().await {
83                    Ok(Some(record)) => {
84                        if tx.send(record).await.is_err() {
85                            break;
86                        }
87                    }
88                    _ => break,
89                }
90            }
91            let mut source_guard = source.lock().await;
92            let _ = source_guard.close().await;
93        })
94    }
95
96    fn spawn_operator_tasks<T>(
97        &self,
98        operator: Arc<Mutex<dyn Operator<T, T> + Send + Sync>>,
99        rx: mpsc::Receiver<Record<T>>,
100        tx: mpsc::Sender<Record<T>>,
101    ) -> Vec<JoinHandle<()>>
102    where
103        T: Clone + Send + 'static,
104    {
105        let mut handles = Vec::new();
106        let rx = Arc::new(Mutex::new(rx));
107
108        for _ in 0..self.parallel_config.parallelism {
109            let operator = Arc::clone(&operator);
110            let rx = Arc::clone(&rx);
111            let tx = tx.clone();
112
113            let handle = tokio::spawn(async move {
114                loop {
115                    let record = {
116                        let mut rx = rx.lock().await;
117                        match rx.recv().await {
118                            Some(r) => r,
119                            None => break,
120                        }
121                    };
122
123                    let mut op = operator.lock().await;
124                    if let Ok(results) = op.process(record).await {
125                        for result in results {
126                            if tx.send(result).await.is_err() {
127                                return;
128                            }
129                        }
130                    }
131                }
132            });
133            handles.push(handle);
134        }
135
136        handles
137    }
138
139    fn spawn_sink_task<T, K>(
140        &self,
141        sink: Arc<Mutex<K>>,
142        mut rx: mpsc::Receiver<Record<T>>,
143    ) -> JoinHandle<()>
144    where
145        T: Clone + Send + 'static,
146        K: Sink<T> + Send + 'static,
147    {
148        tokio::spawn(async move {
149            while let Some(record) = rx.recv().await {
150                let mut sink_guard = sink.lock().await;
151                let _ = sink_guard.write(record).await;
152            }
153            let mut sink_guard = sink.lock().await;
154            let _ = sink_guard.flush().await;
155            let _ = sink_guard.close().await;
156        })
157    }
158}