rs2_stream/pipeline/
builder.rs1use crate::RS2Stream;
2use async_stream::stream;
3use futures_util::StreamExt;
4use std::future::Future;
5use std::pin::Pin;
6use tokio::sync::broadcast;
7
/// Errors reported while validating or running a [`Pipeline`].
#[derive(Debug)]
pub enum PipelineError {
    /// The pipeline contains no source node.
    NoSource,
    /// The pipeline contains neither a sink nor a branch node.
    NoSink,
    /// The pipeline is structurally invalid; the message explains why.
    InvalidPipeline(String),
    /// A failure raised while the pipeline was executing (for example a
    /// spawned sink task that could not be joined).
    RuntimeError(Box<dyn std::error::Error + Send + Sync>),
}

impl std::fmt::Display for PipelineError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PipelineError::NoSource => write!(f, "Pipeline has no source"),
            PipelineError::NoSink => write!(f, "Pipeline has no sink"),
            PipelineError::InvalidPipeline(msg) => write!(f, "Invalid pipeline: {}", msg),
            PipelineError::RuntimeError(e) => write!(f, "Runtime error: {}", e),
        }
    }
}

impl std::error::Error for PipelineError {
    /// Exposes the error wrapped by [`PipelineError::RuntimeError`] so callers
    /// can walk the error chain or downcast to the concrete cause.
    ///
    /// All other variants carry no underlying error and return `None`.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            // `&**e` unwraps the reference and the box; the `Send + Sync`
            // auto-trait bounds are dropped by the unsizing coercion.
            PipelineError::RuntimeError(e) => Some(&**e),
            PipelineError::NoSource
            | PipelineError::NoSink
            | PipelineError::InvalidPipeline(_) => None,
        }
    }
}

/// Convenience alias for results whose error type is [`PipelineError`].
pub type PipelineResult<T> = Result<T, PipelineError>;
30
31pub enum PipelineNode<T> {
32 Source {
33 name: String,
34 func: Box<dyn Fn() -> RS2Stream<T> + Send + Sync>,
35 },
36 Transform {
37 name: String,
38 func: Box<dyn Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync>,
39 },
40 Sink {
41 name: String,
42 func: Box<dyn Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
43 },
44 Branch {
45 name: String,
46 sinks: Vec<
47 Box<dyn Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
48 >,
49 },
50}
51
/// User-tunable settings for a pipeline.
///
/// The type is plain data, so it derives `Clone` and equality in addition to
/// `Debug`; this lets one configuration be reused for several pipelines and
/// compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineConfig {
    /// Label for the pipeline.
    pub name: String,
    /// Capacity of the broadcast channel created when a branch node runs.
    pub buffer_size: usize,
    /// Metrics toggle. NOTE(review): not read by the code in this file —
    /// confirm where (or whether) it is consumed.
    pub enable_metrics: bool,
}

impl Default for PipelineConfig {
    /// Returns the stock configuration: name `"unnamed-pipeline"`, a buffer of
    /// 1000 items, and metrics disabled.
    fn default() -> Self {
        Self {
            name: "unnamed-pipeline".to_string(),
            buffer_size: 1000,
            enable_metrics: false,
        }
    }
}
68
69pub struct Pipeline<T> {
70 config: PipelineConfig,
71 nodes: Vec<PipelineNode<T>>,
72}
73
74impl<T: Send + Clone + 'static> Pipeline<T> {
75 pub fn new() -> Self {
76 Self {
77 config: PipelineConfig::default(),
78 nodes: vec![],
79 }
80 }
81
82 pub fn with_config(mut self, config: PipelineConfig) -> Self {
83 self.config = config;
84 self
85 }
86
87 pub fn named_source<F>(mut self, name: &str, f: F) -> Self
88 where
89 F: Fn() -> RS2Stream<T> + Send + Sync + 'static,
90 {
91 self.nodes.push(PipelineNode::Source {
92 name: name.to_string(),
93 func: Box::new(f),
94 });
95 self
96 }
97
98 pub fn source<F>(self, f: F) -> Self
99 where
100 F: Fn() -> RS2Stream<T> + Send + Sync + 'static,
101 {
102 self.named_source("source", f)
103 }
104
105 pub fn named_transform<F>(mut self, name: &str, f: F) -> Self
106 where
107 F: Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync + 'static,
108 {
109 self.nodes.push(PipelineNode::Transform {
110 name: name.to_string(),
111 func: Box::new(f),
112 });
113 self
114 }
115
116 pub fn transform<F>(self, f: F) -> Self
117 where
118 F: Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync + 'static,
119 {
120 self.named_transform("transform", f)
121 }
122
123 pub fn named_sink<F>(mut self, name: &str, f: F) -> Self
124 where
125 F: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
126 {
127 self.nodes.push(PipelineNode::Sink {
128 name: name.to_string(),
129 func: Box::new(f),
130 });
131 self
132 }
133
134 pub fn sink<F>(self, f: F) -> Self
135 where
136 F: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
137 {
138 self.named_sink("sink", f)
139 }
140
141 pub fn branch<F1, F2>(mut self, name: &str, f1: F1, f2: F2) -> Self
142 where
143 F1: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
144 F2: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
145 {
146 self.nodes.push(PipelineNode::Branch {
147 name: name.to_string(),
148 sinks: vec![Box::new(f1), Box::new(f2)],
149 });
150 self
151 }
152
153 pub fn validate(&self) -> PipelineResult<()> {
154 if self.nodes.is_empty() {
155 return Err(PipelineError::InvalidPipeline("Empty pipeline".to_string()));
156 }
157
158 let mut has_source = false;
159 let mut has_sink_or_branch = false;
160
161 for node in &self.nodes {
162 match node {
163 PipelineNode::Source { .. } => has_source = true,
164 PipelineNode::Sink { .. } | PipelineNode::Branch { .. } => {
165 has_sink_or_branch = true
166 }
167 _ => {}
168 }
169 }
170
171 if !has_source {
172 return Err(PipelineError::NoSource);
173 }
174
175 if !has_sink_or_branch {
176 return Err(PipelineError::NoSink);
177 }
178
179 Ok(())
180 }
181
182 pub async fn run(self) -> PipelineResult<()> {
183 self.validate()?;
184
185 let mut stream = None;
186
187 for node in self.nodes {
188 match node {
189 PipelineNode::Source { name: _name, func } => {
190 stream = Some(func());
191 }
192 PipelineNode::Transform { name: _name, func } => {
193 if let Some(s) = stream.take() {
194 stream = Some(func(s));
195 }
196 }
197 PipelineNode::Sink { name: _name, func } => {
198 if let Some(s) = stream.take() {
199 func(s).await;
200 }
201 }
202 PipelineNode::Branch { name: _name, sinks } => {
203 if let Some(s) = stream.take() {
204 let (tx, _) = broadcast::channel(self.config.buffer_size);
206
207 let tx_clone = tx.clone();
209 tokio::spawn(async move {
210 let mut stream = s;
211 while let Some(item) = stream.next().await {
212 if tx_clone.send(item).is_err() {
213 break; }
215 }
216 });
217
218 let mut handles = Vec::new();
220 for sink_func in sinks {
221 let mut rx = tx.subscribe();
222
223 let sink_stream = stream! {
225 while let Ok(item) = rx.recv().await {
226 yield item;
227 }
228 }
229 .boxed();
230
231 handles.push(tokio::spawn(async move {
232 sink_func(sink_stream).await;
233 }));
234 }
235
236 for handle in handles {
238 if let Err(e) = handle.await {
239 return Err(PipelineError::RuntimeError(Box::new(e)));
240 }
241 }
242 }
243 }
244 }
245 }
246
247 Ok(())
248 }
249}
250
251impl<T: Send + Clone + 'static> Default for Pipeline<T> {
253 fn default() -> Self {
254 Self::new()
255 }
256}