rs2_stream/pipeline/
builder.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use futures_util::StreamExt;
5use tokio::sync::broadcast;
6use async_stream::stream;
7use crate::RS2Stream;
8
/// Errors produced while validating or running a pipeline.
#[derive(Debug)]
pub enum PipelineError {
    /// The pipeline contains no source node.
    NoSource,
    /// The pipeline contains neither a sink nor a branch node.
    NoSink,
    /// The pipeline is structurally unusable; the message explains why.
    InvalidPipeline(String),
    /// A failure raised while the pipeline was executing, wrapping the
    /// underlying error.
    RuntimeError(Box<dyn std::error::Error + Send + Sync>),
}

impl std::fmt::Display for PipelineError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PipelineError::NoSource => f.write_str("Pipeline has no source"),
            PipelineError::NoSink => f.write_str("Pipeline has no sink"),
            PipelineError::InvalidPipeline(reason) => write!(f, "Invalid pipeline: {}", reason),
            PipelineError::RuntimeError(cause) => write!(f, "Runtime error: {}", cause),
        }
    }
}

impl std::error::Error for PipelineError {
    /// Returns the wrapped error for [`PipelineError::RuntimeError`] so that
    /// error-chain reporters can walk to the root cause; every other variant
    /// has no underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            PipelineError::RuntimeError(inner) => {
                // Explicitly typed binding: drops the `Send + Sync` bounds from
                // the boxed trait object via an unsizing coercion.
                let cause: &(dyn std::error::Error + 'static) = &**inner;
                Some(cause)
            }
            PipelineError::NoSource
            | PipelineError::NoSink
            | PipelineError::InvalidPipeline(_) => None,
        }
    }
}

/// Convenience alias for results whose error type is [`PipelineError`].
pub type PipelineResult<T> = Result<T, PipelineError>;
31
32pub enum PipelineNode<T> {
33    Source {
34        name: String,
35        func: Box<dyn Fn() -> RS2Stream<T> + Send + Sync>,
36    },
37    Transform {
38        name: String,
39        func: Box<dyn Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync>,
40    },
41    Sink {
42        name: String,
43        func: Box<dyn Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
44    },
45    Branch {
46        name: String,
47        sinks: Vec<Box<dyn Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>>,
48    },
49}
50
/// Settings that control how a pipeline is executed.
#[derive(Debug)]
pub struct PipelineConfig {
    /// Name used in the pipeline's diagnostic output.
    pub name: String,
    /// Capacity of the channel created when a branch fans out to its sinks.
    pub buffer_size: usize,
    /// When `true`, the pipeline prints a line to stdout as each stage starts.
    pub enable_metrics: bool,
}

impl Default for PipelineConfig {
    /// Returns a configuration named `"unnamed-pipeline"` with a buffer of
    /// 1000 items and metrics output disabled.
    fn default() -> Self {
        let name = String::from("unnamed-pipeline");
        PipelineConfig {
            name,
            buffer_size: 1_000,
            enable_metrics: false,
        }
    }
}
67
68pub struct Pipeline<T> {
69    config: PipelineConfig,
70    nodes: Vec<PipelineNode<T>>,
71}
72
73impl<T: Send + Clone + 'static> Pipeline<T> {
74    pub fn new() -> Self {
75        Self {
76            config: PipelineConfig::default(),
77            nodes: vec![],
78        }
79    }
80
81    pub fn with_config(mut self, config: PipelineConfig) -> Self {
82        self.config = config;
83        self
84    }
85
86    pub fn named_source<F>(mut self, name: &str, f: F) -> Self
87    where
88        F: Fn() -> RS2Stream<T> + Send + Sync + 'static,
89    {
90        self.nodes.push(PipelineNode::Source {
91            name: name.to_string(),
92            func: Box::new(f),
93        });
94        self
95    }
96
97    pub fn source<F>(self, f: F) -> Self
98    where
99        F: Fn() -> RS2Stream<T> + Send + Sync + 'static,
100    {
101        self.named_source("source", f)
102    }
103
104    pub fn named_transform<F>(mut self, name: &str, f: F) -> Self
105    where
106        F: Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync + 'static,
107    {
108        self.nodes.push(PipelineNode::Transform {
109            name: name.to_string(),
110            func: Box::new(f),
111        });
112        self
113    }
114
115    pub fn transform<F>(self, f: F) -> Self
116    where
117        F: Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync + 'static,
118    {
119        self.named_transform("transform", f)
120    }
121
122    pub fn named_sink<F>(mut self, name: &str, f: F) -> Self
123    where
124        F: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
125    {
126        self.nodes.push(PipelineNode::Sink {
127            name: name.to_string(),
128            func: Box::new(f),
129        });
130        self
131    }
132
133    pub fn sink<F>(self, f: F) -> Self
134    where
135        F: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
136    {
137        self.named_sink("sink", f)
138    }
139
140    pub fn branch<F1, F2>(mut self, name: &str, f1: F1, f2: F2) -> Self
141    where
142        F1: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
143        F2: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
144    {
145        self.nodes.push(PipelineNode::Branch {
146            name: name.to_string(),
147            sinks: vec![Box::new(f1), Box::new(f2)],
148        });
149        self
150    }
151
152    pub fn validate(&self) -> PipelineResult<()> {
153        if self.nodes.is_empty() {
154            return Err(PipelineError::InvalidPipeline("Empty pipeline".to_string()));
155        }
156
157        let mut has_source = false;
158        let mut has_sink_or_branch = false;
159
160        for node in &self.nodes {
161            match node {
162                PipelineNode::Source { .. } => has_source = true,
163                PipelineNode::Sink { .. } | PipelineNode::Branch { .. } => has_sink_or_branch = true,
164                _ => {}
165            }
166        }
167
168        if !has_source {
169            return Err(PipelineError::NoSource);
170        }
171
172        if !has_sink_or_branch {
173            return Err(PipelineError::NoSink);
174        }
175
176        Ok(())
177    }
178
179    pub async fn run(self) -> PipelineResult<()> {
180        self.validate()?;
181
182        let mut stream = None;
183
184        for node in self.nodes {
185            match node {
186                PipelineNode::Source { name, func } => {
187                    if self.config.enable_metrics {
188                        println!("Pipeline '{}': Starting source '{}'", self.config.name, name);
189                    }
190                    stream = Some(func());
191                }
192                PipelineNode::Transform { name, func } => {
193                    if let Some(s) = stream.take() {
194                        if self.config.enable_metrics {
195                            println!("Pipeline '{}': Applying transform '{}'", self.config.name, name);
196                        }
197                        stream = Some(func(s));
198                    }
199                }
200                PipelineNode::Sink { name, func } => {
201                    if let Some(s) = stream.take() {
202                        if self.config.enable_metrics {
203                            println!("Pipeline '{}': Running sink '{}'", self.config.name, name);
204                        }
205                        func(s).await;
206                    }
207                }
208                PipelineNode::Branch { name, sinks } => {
209                    if let Some(s) = stream.take() {
210                        if self.config.enable_metrics {
211                            println!("Pipeline '{}': Branching to {} sinks at '{}'",
212                                     self.config.name, sinks.len(), name);
213                        }
214
215                        // Use broadcast to fan out to multiple sinks
216                        let (tx, _) = broadcast::channel(self.config.buffer_size);
217
218                        // Spawn task to feed the broadcast channel
219                        let tx_clone = tx.clone();
220                        tokio::spawn(async move {
221                            let mut stream = s;
222                            while let Some(item) = stream.next().await {
223                                if tx_clone.send(item).is_err() {
224                                    break; // All receivers dropped
225                                }
226                            }
227                        });
228
229                        // Run all sinks concurrently
230                        let mut handles = Vec::new();
231                        for sink_func in sinks {
232                            let mut rx = tx.subscribe();
233
234                            // Create a stream from the broadcast receiver
235                            let sink_stream = stream! {
236                                while let Ok(item) = rx.recv().await {
237                                    yield item;
238                                }
239                            }.boxed();
240
241                            handles.push(tokio::spawn(async move {
242                                sink_func(sink_stream).await;
243                            }));
244                        }
245
246                        // Wait for all sinks to complete
247                        for handle in handles {
248                            if let Err(e) = handle.await {
249                                return Err(PipelineError::RuntimeError(Box::new(e)));
250                            }
251                        }
252                    }
253                }
254            }
255        }
256
257        Ok(())
258    }
259}
260
261// Fix the Default implementation to match the trait bounds
262impl<T: Send + Clone + 'static> Default for Pipeline<T> {
263    fn default() -> Self {
264        Self::new()
265    }
266}