rs2_stream/pipeline/
builder.rs

1use crate::RS2Stream;
2use async_stream::stream;
3use futures_util::StreamExt;
4use std::future::Future;
5use std::pin::Pin;
6use tokio::sync::broadcast;
7
8#[derive(Debug)]
9pub enum PipelineError {
10    NoSource,
11    NoSink,
12    InvalidPipeline(String),
13    RuntimeError(Box<dyn std::error::Error + Send + Sync>),
14}
15
16impl std::fmt::Display for PipelineError {
17    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18        match self {
19            PipelineError::NoSource => write!(f, "Pipeline has no source"),
20            PipelineError::NoSink => write!(f, "Pipeline has no sink"),
21            PipelineError::InvalidPipeline(msg) => write!(f, "Invalid pipeline: {}", msg),
22            PipelineError::RuntimeError(e) => write!(f, "Runtime error: {}", e),
23        }
24    }
25}
26
27impl std::error::Error for PipelineError {}
28
29pub type PipelineResult<T> = Result<T, PipelineError>;
30
31pub enum PipelineNode<T> {
32    Source {
33        name: String,
34        func: Box<dyn Fn() -> RS2Stream<T> + Send + Sync>,
35    },
36    Transform {
37        name: String,
38        func: Box<dyn Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync>,
39    },
40    Sink {
41        name: String,
42        func: Box<dyn Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
43    },
44    Branch {
45        name: String,
46        sinks: Vec<
47            Box<dyn Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
48        >,
49    },
50}
51
52#[derive(Debug)]
53pub struct PipelineConfig {
54    pub name: String,
55    pub buffer_size: usize,
56    pub enable_metrics: bool,
57}
58
59impl Default for PipelineConfig {
60    fn default() -> Self {
61        Self {
62            name: "unnamed-pipeline".to_string(),
63            buffer_size: 1000,
64            enable_metrics: false,
65        }
66    }
67}
68
69pub struct Pipeline<T> {
70    config: PipelineConfig,
71    nodes: Vec<PipelineNode<T>>,
72}
73
74impl<T: Send + Clone + 'static> Pipeline<T> {
75    pub fn new() -> Self {
76        Self {
77            config: PipelineConfig::default(),
78            nodes: vec![],
79        }
80    }
81
82    pub fn with_config(mut self, config: PipelineConfig) -> Self {
83        self.config = config;
84        self
85    }
86
87    pub fn named_source<F>(mut self, name: &str, f: F) -> Self
88    where
89        F: Fn() -> RS2Stream<T> + Send + Sync + 'static,
90    {
91        self.nodes.push(PipelineNode::Source {
92            name: name.to_string(),
93            func: Box::new(f),
94        });
95        self
96    }
97
98    pub fn source<F>(self, f: F) -> Self
99    where
100        F: Fn() -> RS2Stream<T> + Send + Sync + 'static,
101    {
102        self.named_source("source", f)
103    }
104
105    pub fn named_transform<F>(mut self, name: &str, f: F) -> Self
106    where
107        F: Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync + 'static,
108    {
109        self.nodes.push(PipelineNode::Transform {
110            name: name.to_string(),
111            func: Box::new(f),
112        });
113        self
114    }
115
116    pub fn transform<F>(self, f: F) -> Self
117    where
118        F: Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync + 'static,
119    {
120        self.named_transform("transform", f)
121    }
122
123    pub fn named_sink<F>(mut self, name: &str, f: F) -> Self
124    where
125        F: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
126    {
127        self.nodes.push(PipelineNode::Sink {
128            name: name.to_string(),
129            func: Box::new(f),
130        });
131        self
132    }
133
134    pub fn sink<F>(self, f: F) -> Self
135    where
136        F: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
137    {
138        self.named_sink("sink", f)
139    }
140
141    pub fn branch<F1, F2>(mut self, name: &str, f1: F1, f2: F2) -> Self
142    where
143        F1: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
144        F2: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
145    {
146        self.nodes.push(PipelineNode::Branch {
147            name: name.to_string(),
148            sinks: vec![Box::new(f1), Box::new(f2)],
149        });
150        self
151    }
152
153    pub fn validate(&self) -> PipelineResult<()> {
154        if self.nodes.is_empty() {
155            return Err(PipelineError::InvalidPipeline("Empty pipeline".to_string()));
156        }
157
158        let mut has_source = false;
159        let mut has_sink_or_branch = false;
160
161        for node in &self.nodes {
162            match node {
163                PipelineNode::Source { .. } => has_source = true,
164                PipelineNode::Sink { .. } | PipelineNode::Branch { .. } => {
165                    has_sink_or_branch = true
166                }
167                _ => {}
168            }
169        }
170
171        if !has_source {
172            return Err(PipelineError::NoSource);
173        }
174
175        if !has_sink_or_branch {
176            return Err(PipelineError::NoSink);
177        }
178
179        Ok(())
180    }
181
182    pub async fn run(self) -> PipelineResult<()> {
183        self.validate()?;
184
185        let mut stream = None;
186
187        for node in self.nodes {
188            match node {
189                PipelineNode::Source { name: _name, func } => {
190                    stream = Some(func());
191                }
192                PipelineNode::Transform { name: _name, func } => {
193                    if let Some(s) = stream.take() {
194                        stream = Some(func(s));
195                    }
196                }
197                PipelineNode::Sink { name: _name, func } => {
198                    if let Some(s) = stream.take() {
199                        func(s).await;
200                    }
201                }
202                PipelineNode::Branch { name: _name, sinks } => {
203                    if let Some(s) = stream.take() {
204                        // Use broadcast to fan out to multiple sinks
205                        let (tx, _) = broadcast::channel(self.config.buffer_size);
206
207                        // Spawn task to feed the broadcast channel
208                        let tx_clone = tx.clone();
209                        tokio::spawn(async move {
210                            let mut stream = s;
211                            while let Some(item) = stream.next().await {
212                                if tx_clone.send(item).is_err() {
213                                    break; // All receivers dropped
214                                }
215                            }
216                        });
217
218                        // Run all sinks concurrently
219                        let mut handles = Vec::new();
220                        for sink_func in sinks {
221                            let mut rx = tx.subscribe();
222
223                            // Create a stream from the broadcast receiver
224                            let sink_stream = stream! {
225                                while let Ok(item) = rx.recv().await {
226                                    yield item;
227                                }
228                            }
229                            .boxed();
230
231                            handles.push(tokio::spawn(async move {
232                                sink_func(sink_stream).await;
233                            }));
234                        }
235
236                        // Wait for all sinks to complete
237                        for handle in handles {
238                            if let Err(e) = handle.await {
239                                return Err(PipelineError::RuntimeError(Box::new(e)));
240                            }
241                        }
242                    }
243                }
244            }
245        }
246
247        Ok(())
248    }
249}
250
251// Fix the Default implementation to match the trait bounds
252impl<T: Send + Clone + 'static> Default for Pipeline<T> {
253    fn default() -> Self {
254        Self::new()
255    }
256}