1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use futures_util::StreamExt;
5use tokio::sync::broadcast;
6use async_stream::stream;
7use crate::RS2Stream;
8
/// Errors reported while validating or running a [`Pipeline`].
#[derive(Debug)]
pub enum PipelineError {
    /// The pipeline contains no source node.
    NoSource,
    /// The pipeline contains neither a sink nor a branch node.
    NoSink,
    /// The pipeline's structure is invalid; the message explains why.
    InvalidPipeline(String),
    /// An underlying error raised while the pipeline was running
    /// (for example a spawned sink task that failed to join).
    RuntimeError(Box<dyn std::error::Error + Send + Sync>),
}

impl std::fmt::Display for PipelineError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PipelineError::NoSource => write!(f, "Pipeline has no source"),
            PipelineError::NoSink => write!(f, "Pipeline has no sink"),
            PipelineError::InvalidPipeline(msg) => write!(f, "Invalid pipeline: {}", msg),
            PipelineError::RuntimeError(e) => write!(f, "Runtime error: {}", e),
        }
    }
}

impl std::error::Error for PipelineError {
    /// Returns the wrapped cause for [`PipelineError::RuntimeError`] so that
    /// error-chain walkers and `downcast_ref` can reach it; the structural
    /// variants have no underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            // `&**cause` peels `&Box<dyn Error + Send + Sync>` down to the trait
            // object, which then coerces to the plain `dyn Error` required here.
            PipelineError::RuntimeError(cause) => Some(&**cause),
            PipelineError::NoSource
            | PipelineError::NoSink
            | PipelineError::InvalidPipeline(_) => None,
        }
    }
}

/// Convenience alias for results whose error type is [`PipelineError`].
pub type PipelineResult<T> = Result<T, PipelineError>;
31
32pub enum PipelineNode<T> {
33 Source {
34 name: String,
35 func: Box<dyn Fn() -> RS2Stream<T> + Send + Sync>,
36 },
37 Transform {
38 name: String,
39 func: Box<dyn Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync>,
40 },
41 Sink {
42 name: String,
43 func: Box<dyn Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
44 },
45 Branch {
46 name: String,
47 sinks: Vec<Box<dyn Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>>,
48 },
49}
50
/// Settings that control how a pipeline runs.
#[derive(Debug)]
pub struct PipelineConfig {
    /// Label included in the progress messages printed while running.
    pub name: String,
    /// Capacity requested for the broadcast channel that feeds branch sinks.
    pub buffer_size: usize,
    /// When `true`, a progress line is printed to stdout for each node.
    pub enable_metrics: bool,
}

impl Default for PipelineConfig {
    /// Defaults: name `"unnamed-pipeline"`, a buffer of 1000 items, metrics off.
    fn default() -> Self {
        PipelineConfig {
            name: String::from("unnamed-pipeline"),
            buffer_size: 1_000,
            enable_metrics: false,
        }
    }
}
67
68pub struct Pipeline<T> {
69 config: PipelineConfig,
70 nodes: Vec<PipelineNode<T>>,
71}
72
73impl<T: Send + Clone + 'static> Pipeline<T> {
74 pub fn new() -> Self {
75 Self {
76 config: PipelineConfig::default(),
77 nodes: vec![],
78 }
79 }
80
81 pub fn with_config(mut self, config: PipelineConfig) -> Self {
82 self.config = config;
83 self
84 }
85
86 pub fn named_source<F>(mut self, name: &str, f: F) -> Self
87 where
88 F: Fn() -> RS2Stream<T> + Send + Sync + 'static,
89 {
90 self.nodes.push(PipelineNode::Source {
91 name: name.to_string(),
92 func: Box::new(f),
93 });
94 self
95 }
96
97 pub fn source<F>(self, f: F) -> Self
98 where
99 F: Fn() -> RS2Stream<T> + Send + Sync + 'static,
100 {
101 self.named_source("source", f)
102 }
103
104 pub fn named_transform<F>(mut self, name: &str, f: F) -> Self
105 where
106 F: Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync + 'static,
107 {
108 self.nodes.push(PipelineNode::Transform {
109 name: name.to_string(),
110 func: Box::new(f),
111 });
112 self
113 }
114
115 pub fn transform<F>(self, f: F) -> Self
116 where
117 F: Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync + 'static,
118 {
119 self.named_transform("transform", f)
120 }
121
122 pub fn named_sink<F>(mut self, name: &str, f: F) -> Self
123 where
124 F: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
125 {
126 self.nodes.push(PipelineNode::Sink {
127 name: name.to_string(),
128 func: Box::new(f),
129 });
130 self
131 }
132
133 pub fn sink<F>(self, f: F) -> Self
134 where
135 F: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
136 {
137 self.named_sink("sink", f)
138 }
139
140 pub fn branch<F1, F2>(mut self, name: &str, f1: F1, f2: F2) -> Self
141 where
142 F1: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
143 F2: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
144 {
145 self.nodes.push(PipelineNode::Branch {
146 name: name.to_string(),
147 sinks: vec![Box::new(f1), Box::new(f2)],
148 });
149 self
150 }
151
152 pub fn validate(&self) -> PipelineResult<()> {
153 if self.nodes.is_empty() {
154 return Err(PipelineError::InvalidPipeline("Empty pipeline".to_string()));
155 }
156
157 let mut has_source = false;
158 let mut has_sink_or_branch = false;
159
160 for node in &self.nodes {
161 match node {
162 PipelineNode::Source { .. } => has_source = true,
163 PipelineNode::Sink { .. } | PipelineNode::Branch { .. } => has_sink_or_branch = true,
164 _ => {}
165 }
166 }
167
168 if !has_source {
169 return Err(PipelineError::NoSource);
170 }
171
172 if !has_sink_or_branch {
173 return Err(PipelineError::NoSink);
174 }
175
176 Ok(())
177 }
178
179 pub async fn run(self) -> PipelineResult<()> {
180 self.validate()?;
181
182 let mut stream = None;
183
184 for node in self.nodes {
185 match node {
186 PipelineNode::Source { name, func } => {
187 if self.config.enable_metrics {
188 println!("Pipeline '{}': Starting source '{}'", self.config.name, name);
189 }
190 stream = Some(func());
191 }
192 PipelineNode::Transform { name, func } => {
193 if let Some(s) = stream.take() {
194 if self.config.enable_metrics {
195 println!("Pipeline '{}': Applying transform '{}'", self.config.name, name);
196 }
197 stream = Some(func(s));
198 }
199 }
200 PipelineNode::Sink { name, func } => {
201 if let Some(s) = stream.take() {
202 if self.config.enable_metrics {
203 println!("Pipeline '{}': Running sink '{}'", self.config.name, name);
204 }
205 func(s).await;
206 }
207 }
208 PipelineNode::Branch { name, sinks } => {
209 if let Some(s) = stream.take() {
210 if self.config.enable_metrics {
211 println!("Pipeline '{}': Branching to {} sinks at '{}'",
212 self.config.name, sinks.len(), name);
213 }
214
215 let (tx, _) = broadcast::channel(self.config.buffer_size);
217
218 let tx_clone = tx.clone();
220 tokio::spawn(async move {
221 let mut stream = s;
222 while let Some(item) = stream.next().await {
223 if tx_clone.send(item).is_err() {
224 break; }
226 }
227 });
228
229 let mut handles = Vec::new();
231 for sink_func in sinks {
232 let mut rx = tx.subscribe();
233
234 let sink_stream = stream! {
236 while let Ok(item) = rx.recv().await {
237 yield item;
238 }
239 }.boxed();
240
241 handles.push(tokio::spawn(async move {
242 sink_func(sink_stream).await;
243 }));
244 }
245
246 for handle in handles {
248 if let Err(e) = handle.await {
249 return Err(PipelineError::RuntimeError(Box::new(e)));
250 }
251 }
252 }
253 }
254 }
255 }
256
257 Ok(())
258 }
259}
260
261impl<T: Send + Clone + 'static> Default for Pipeline<T> {
263 fn default() -> Self {
264 Self::new()
265 }
266}