fluxus_runtime/
runtime.rs1use dashmap::DashMap;
2use fluxus_core::ParallelConfig;
3use fluxus_sinks::Sink;
4use fluxus_sources::Source;
5use fluxus_transformers::Operator;
6use fluxus_utils::models::{Record, StreamResult};
7use std::sync::Arc;
8use tokio::sync::{Mutex, mpsc};
9use tokio::task::JoinHandle;
10use uuid::Uuid;
11
12pub struct RuntimeContext {
14 parallel_config: ParallelConfig,
16 task_handles: Arc<DashMap<String, Vec<JoinHandle<()>>>>,
18}
19
20impl RuntimeContext {
21 pub fn new(parallel_config: ParallelConfig) -> Self {
22 Self {
23 parallel_config,
24 task_handles: Arc::new(DashMap::new()),
25 }
26 }
27
28 pub async fn execute_pipeline<T, S, K>(
30 &self,
31 source: S,
32 operators: Vec<Arc<Mutex<dyn Operator<T, T> + Send + Sync>>>,
33 sink: K,
34 ) -> StreamResult<()>
35 where
36 T: Clone + Send + Sync + 'static,
37 S: Source<T> + Send + Sync + 'static,
38 K: Sink<T> + Send + Sync + 'static,
39 {
40 let (tx, rx) = mpsc::channel(self.parallel_config.buffer_size);
41 let source = Arc::new(Mutex::new(source));
42 let sink = Arc::new(Mutex::new(sink));
43
44 let source_handle = self.spawn_source_task(source.clone(), tx.clone());
46
47 let mut curr_rx = rx;
49 let mut handles = vec![source_handle];
50
51 for operator in operators {
53 let (new_tx, new_rx) = mpsc::channel(self.parallel_config.buffer_size);
54 let operator_handles = self.spawn_operator_tasks(operator, curr_rx, new_tx);
55 handles.extend(operator_handles);
56 curr_rx = new_rx;
57 }
58
59 let sink_handle = self.spawn_sink_task(sink.clone(), curr_rx);
61 handles.push(sink_handle);
62
63 self.task_handles
65 .insert(Uuid::new_v4().to_string(), handles);
66
67 Ok(())
68 }
69
70 fn spawn_source_task<T, S>(
71 &self,
72 source: Arc<Mutex<S>>,
73 tx: mpsc::Sender<Record<T>>,
74 ) -> JoinHandle<()>
75 where
76 T: Clone + Send + 'static,
77 S: Source<T> + Send + 'static,
78 {
79 tokio::spawn(async move {
80 loop {
81 let mut source_guard = source.lock().await;
82 match source_guard.next().await {
83 Ok(Some(record)) => {
84 if tx.send(record).await.is_err() {
85 break;
86 }
87 }
88 _ => break,
89 }
90 }
91 let mut source_guard = source.lock().await;
92 let _ = source_guard.close().await;
93 })
94 }
95
96 fn spawn_operator_tasks<T>(
97 &self,
98 operator: Arc<Mutex<dyn Operator<T, T> + Send + Sync>>,
99 rx: mpsc::Receiver<Record<T>>,
100 tx: mpsc::Sender<Record<T>>,
101 ) -> Vec<JoinHandle<()>>
102 where
103 T: Clone + Send + 'static,
104 {
105 let mut handles = Vec::new();
106 let rx = Arc::new(Mutex::new(rx));
107
108 for _ in 0..self.parallel_config.parallelism {
109 let operator = Arc::clone(&operator);
110 let rx = Arc::clone(&rx);
111 let tx = tx.clone();
112
113 let handle = tokio::spawn(async move {
114 loop {
115 let record = {
116 let mut rx = rx.lock().await;
117 match rx.recv().await {
118 Some(r) => r,
119 None => break,
120 }
121 };
122
123 let mut op = operator.lock().await;
124 if let Ok(results) = op.process(record).await {
125 for result in results {
126 if tx.send(result).await.is_err() {
127 return;
128 }
129 }
130 }
131 }
132 });
133 handles.push(handle);
134 }
135
136 handles
137 }
138
139 fn spawn_sink_task<T, K>(
140 &self,
141 sink: Arc<Mutex<K>>,
142 mut rx: mpsc::Receiver<Record<T>>,
143 ) -> JoinHandle<()>
144 where
145 T: Clone + Send + 'static,
146 K: Sink<T> + Send + 'static,
147 {
148 tokio::spawn(async move {
149 while let Some(record) = rx.recv().await {
150 let mut sink_guard = sink.lock().await;
151 let _ = sink_guard.write(record).await;
152 }
153 let mut sink_guard = sink.lock().await;
154 let _ = sink_guard.flush().await;
155 let _ = sink_guard.close().await;
156 })
157 }
158}