kizzasi_io/
pipeline.rs

1//! Stream processing pipeline for composable transformations
2//!
3//! This module provides a framework for building complex stream processing
4//! pipelines with reusable, composable components.
5
6use crate::error::{IoError, IoResult};
7use async_trait::async_trait;
8use scirs2_core::ndarray::Array1;
9
10/// Trait for stream transformations that can be chained
11#[async_trait]
12pub trait StreamTransform: Send + Sync {
13    /// Transform input data to output data
14    async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>>;
15
16    /// Optional processing after all data
17    async fn finalize(&mut self) -> IoResult<()> {
18        Ok(())
19    }
20
21    /// Get transform name for debugging
22    fn name(&self) -> &str {
23        "StreamTransform"
24    }
25}
26
27/// A pipeline of stream transformations
28pub struct Pipeline {
29    transforms: Vec<Box<dyn StreamTransform>>,
30}
31
32impl Pipeline {
33    /// Create a new empty pipeline
34    pub fn new() -> Self {
35        Self {
36            transforms: Vec::new(),
37        }
38    }
39
40    /// Add a transform to the pipeline
41    pub fn add_transform<T: StreamTransform + 'static>(mut self, transform: T) -> Self {
42        self.transforms.push(Box::new(transform));
43        self
44    }
45
46    /// Process data through all transforms in sequence
47    pub async fn process(&mut self, mut data: Array1<f32>) -> IoResult<Array1<f32>> {
48        for transform in &mut self.transforms {
49            data = transform.transform(data).await?;
50        }
51        Ok(data)
52    }
53
54    /// Process batch of data
55    pub async fn process_batch(&mut self, batch: Vec<Array1<f32>>) -> IoResult<Vec<Array1<f32>>> {
56        let mut results = Vec::with_capacity(batch.len());
57        for data in batch {
58            results.push(self.process(data).await?);
59        }
60        Ok(results)
61    }
62
63    /// Finalize all transforms
64    pub async fn finalize(&mut self) -> IoResult<()> {
65        for transform in &mut self.transforms {
66            transform.finalize().await?;
67        }
68        Ok(())
69    }
70
71    /// Get number of transforms
72    pub fn len(&self) -> usize {
73        self.transforms.len()
74    }
75
76    /// Check if pipeline is empty
77    pub fn is_empty(&self) -> bool {
78        self.transforms.is_empty()
79    }
80}
81
82impl Default for Pipeline {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88/// Scaling transform (multiply by constant)
89pub struct ScaleTransform {
90    scale: f32,
91}
92
93impl ScaleTransform {
94    pub fn new(scale: f32) -> Self {
95        Self { scale }
96    }
97}
98
99#[async_trait]
100impl StreamTransform for ScaleTransform {
101    async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
102        Ok(input * self.scale)
103    }
104
105    fn name(&self) -> &str {
106        "ScaleTransform"
107    }
108}
109
110/// Offset transform (add constant)
111pub struct OffsetTransform {
112    offset: f32,
113}
114
115impl OffsetTransform {
116    pub fn new(offset: f32) -> Self {
117        Self { offset }
118    }
119}
120
121#[async_trait]
122impl StreamTransform for OffsetTransform {
123    async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
124        Ok(input + self.offset)
125    }
126
127    fn name(&self) -> &str {
128        "OffsetTransform"
129    }
130}
131
132/// Clipping transform (limit values to range)
133pub struct ClipTransform {
134    min: f32,
135    max: f32,
136}
137
138impl ClipTransform {
139    pub fn new(min: f32, max: f32) -> Self {
140        Self { min, max }
141    }
142}
143
144#[async_trait]
145impl StreamTransform for ClipTransform {
146    async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
147        Ok(input.mapv(|x| x.clamp(self.min, self.max)))
148    }
149
150    fn name(&self) -> &str {
151        "ClipTransform"
152    }
153}
154
155/// Normalization transform (scale to [0, 1] or [-1, 1])
156pub struct NormalizeTransform {
157    min_val: Option<f32>,
158    max_val: Option<f32>,
159    centered: bool,
160}
161
162impl NormalizeTransform {
163    /// Create normalizer with auto-detection of min/max
164    pub fn new(centered: bool) -> Self {
165        Self {
166            min_val: None,
167            max_val: None,
168            centered,
169        }
170    }
171
172    /// Create normalizer with fixed min/max
173    pub fn with_range(min: f32, max: f32, centered: bool) -> Self {
174        Self {
175            min_val: Some(min),
176            max_val: Some(max),
177            centered,
178        }
179    }
180}
181
182#[async_trait]
183impl StreamTransform for NormalizeTransform {
184    async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
185        let min = self
186            .min_val
187            .unwrap_or_else(|| input.iter().copied().fold(f32::INFINITY, f32::min));
188        let max = self
189            .max_val
190            .unwrap_or_else(|| input.iter().copied().fold(f32::NEG_INFINITY, f32::max));
191
192        let range = max - min;
193        if range.abs() < 1e-10 {
194            return Ok(input);
195        }
196
197        let normalized = if self.centered {
198            // Scale to [-1, 1]
199            (input - min) / range * 2.0 - 1.0
200        } else {
201            // Scale to [0, 1]
202            (input - min) / range
203        };
204
205        Ok(normalized)
206    }
207
208    fn name(&self) -> &str {
209        "NormalizeTransform"
210    }
211}
212
213/// Decimation transform (downsample by factor)
214pub struct DecimateTransform {
215    factor: usize,
216}
217
218impl DecimateTransform {
219    pub fn new(factor: usize) -> Self {
220        assert!(factor > 0, "Decimation factor must be > 0");
221        Self { factor }
222    }
223}
224
225#[async_trait]
226impl StreamTransform for DecimateTransform {
227    async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
228        let output: Vec<f32> = input.iter().step_by(self.factor).copied().collect();
229        Ok(Array1::from_vec(output))
230    }
231
232    fn name(&self) -> &str {
233        "DecimateTransform"
234    }
235}
236
237/// Moving average transform (smooth signal)
238pub struct MovingAverageTransform {
239    window_size: usize,
240    buffer: Vec<f32>,
241}
242
243impl MovingAverageTransform {
244    pub fn new(window_size: usize) -> Self {
245        assert!(window_size > 0, "Window size must be > 0");
246        Self {
247            window_size,
248            buffer: Vec::with_capacity(window_size),
249        }
250    }
251}
252
253#[async_trait]
254impl StreamTransform for MovingAverageTransform {
255    async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
256        let mut output = Vec::with_capacity(input.len());
257
258        for &val in input.iter() {
259            self.buffer.push(val);
260            if self.buffer.len() > self.window_size {
261                self.buffer.remove(0);
262            }
263
264            let avg: f32 = self.buffer.iter().sum::<f32>() / self.buffer.len() as f32;
265            output.push(avg);
266        }
267
268        Ok(Array1::from_vec(output))
269    }
270
271    fn name(&self) -> &str {
272        "MovingAverageTransform"
273    }
274}
275
276/// Derivative transform (compute differences)
277pub struct DerivativeTransform {
278    last_value: Option<f32>,
279}
280
281impl DerivativeTransform {
282    pub fn new() -> Self {
283        Self { last_value: None }
284    }
285}
286
287impl Default for DerivativeTransform {
288    fn default() -> Self {
289        Self::new()
290    }
291}
292
293#[async_trait]
294impl StreamTransform for DerivativeTransform {
295    async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
296        let mut output = Vec::with_capacity(input.len());
297
298        for &val in input.iter() {
299            if let Some(last) = self.last_value {
300                output.push(val - last);
301            } else {
302                output.push(0.0);
303            }
304            self.last_value = Some(val);
305        }
306
307        Ok(Array1::from_vec(output))
308    }
309
310    fn name(&self) -> &str {
311        "DerivativeTransform"
312    }
313}
314
315/// Parallel pipeline - process multiple pipelines in parallel and combine results
316pub struct ParallelPipeline {
317    pipelines: Vec<Pipeline>,
318    combiner: CombineStrategy,
319}
320
/// How the outputs of several pipelines are merged, element by element, into
/// a single output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CombineStrategy {
    /// Average all outputs
    Average,
    /// Take maximum value across outputs
    Maximum,
    /// Take minimum value across outputs
    Minimum,
    /// Sum all outputs
    Sum,
}
332
333impl ParallelPipeline {
334    /// Create parallel pipeline with combiner strategy
335    pub fn new(combiner: CombineStrategy) -> Self {
336        Self {
337            pipelines: Vec::new(),
338            combiner,
339        }
340    }
341
342    /// Add a pipeline to run in parallel
343    pub fn add_pipeline(mut self, pipeline: Pipeline) -> Self {
344        self.pipelines.push(pipeline);
345        self
346    }
347
348    /// Process data through all pipelines in parallel
349    pub async fn process(&mut self, data: Array1<f32>) -> IoResult<Array1<f32>> {
350        if self.pipelines.is_empty() {
351            return Ok(data);
352        }
353
354        let mut results = Vec::with_capacity(self.pipelines.len());
355
356        // Process through each pipeline
357        for pipeline in &mut self.pipelines {
358            let result = pipeline.process(data.clone()).await?;
359            results.push(result);
360        }
361
362        // Combine results based on strategy
363        self.combine_results(results)
364    }
365
366    fn combine_results(&self, results: Vec<Array1<f32>>) -> IoResult<Array1<f32>> {
367        if results.is_empty() {
368            return Err(IoError::InvalidConfig("No results to combine".to_string()));
369        }
370
371        let len = results[0].len();
372        let mut combined = Array1::zeros(len);
373
374        match self.combiner {
375            CombineStrategy::Average => {
376                for result in &results {
377                    combined += result;
378                }
379                combined /= results.len() as f32;
380            }
381            CombineStrategy::Maximum => {
382                for i in 0..len {
383                    let max = results
384                        .iter()
385                        .map(|r| r[i])
386                        .fold(f32::NEG_INFINITY, f32::max);
387                    combined[i] = max;
388                }
389            }
390            CombineStrategy::Minimum => {
391                for i in 0..len {
392                    let min = results.iter().map(|r| r[i]).fold(f32::INFINITY, f32::min);
393                    combined[i] = min;
394                }
395            }
396            CombineStrategy::Sum => {
397                for result in &results {
398                    combined += result;
399                }
400            }
401        }
402
403        Ok(combined)
404    }
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    /// Scaling by 2 doubles every sample.
    #[tokio::test]
    async fn test_scale_transform() {
        let samples = Array1::from_vec(vec![1.0, 2.0, 3.0]);
        let mut scaler = ScaleTransform::new(2.0);
        let scaled = scaler.transform(samples).await.unwrap();
        assert_eq!(scaled.as_slice().unwrap(), &[2.0, 4.0, 6.0]);
    }

    /// Stages run in insertion order: scale first, then offset.
    #[tokio::test]
    async fn test_pipeline() {
        let samples = Array1::from_vec(vec![1.0, 2.0, 3.0]);
        let mut chain = Pipeline::new()
            .add_transform(ScaleTransform::new(2.0))
            .add_transform(OffsetTransform::new(1.0));

        let result = chain.process(samples).await.unwrap();
        assert_eq!(result.as_slice().unwrap(), &[3.0, 5.0, 7.0]);
    }

    /// Values outside `[-1, 1]` are pulled to the nearest bound.
    #[tokio::test]
    async fn test_clip_transform() {
        let samples = Array1::from_vec(vec![-2.0, 0.5, 2.0]);
        let mut clipper = ClipTransform::new(-1.0, 1.0);
        let clipped = clipper.transform(samples).await.unwrap();
        assert_eq!(clipped.as_slice().unwrap(), &[-1.0, 0.5, 1.0]);
    }

    /// A fixed `[0, 10]` range maps onto `[0, 1]` when not centered.
    #[tokio::test]
    async fn test_normalize_transform() {
        let samples = Array1::from_vec(vec![0.0, 5.0, 10.0]);
        let mut normalizer = NormalizeTransform::with_range(0.0, 10.0, false);
        let normalized = normalizer.transform(samples).await.unwrap();
        assert_eq!(normalized.as_slice().unwrap(), &[0.0, 0.5, 1.0]);
    }
}
445}