Skip to main content

sciforge_hub/engine/pipeline/
flow.rs

//! Pipeline builder and built-in stages.
//!
//! [`Pipeline`] chains [`Stage`]s that each transform a `Vec<f64>`.
//! Built-in stages include filtering, normalization, and scaling.

6use crate::domain::common::errors::{HubError, HubResult};
7
8/// Named transformation stage.
9pub struct Stage {
10    /// Stage identifier.
11    pub name: String,
12    /// Transformation closure.
13    pub transform: Box<dyn Fn(Vec<f64>) -> HubResult<Vec<f64>>>,
14}
15
16/// Sequential data transformation pipeline.
17pub struct Pipeline {
18    stages: Vec<Stage>,
19}
20
21impl Pipeline {
22    /// Creates an empty pipeline.
23    pub fn new() -> Self {
24        Self { stages: Vec::new() }
25    }
26
27    /// Appends a named stage.
28    pub fn add_stage(
29        mut self,
30        name: &str,
31        transform: Box<dyn Fn(Vec<f64>) -> HubResult<Vec<f64>>>,
32    ) -> Self {
33        self.stages.push(Stage {
34            name: name.to_string(),
35            transform,
36        });
37        self
38    }
39
40    /// Runs all stages in order on the input vector.
41    pub fn execute(&self, input: Vec<f64>) -> HubResult<Vec<f64>> {
42        let mut data = input;
43        for stage in &self.stages {
44            data = (stage.transform)(data)
45                .map_err(|e| HubError::ComputationFailed(format!("stage '{}': {e}", stage.name)))?;
46        }
47        Ok(data)
48    }
49
50    /// Number of stages.
51    pub fn stage_count(&self) -> usize {
52        self.stages.len()
53    }
54
55    /// Returns the names of all stages in order.
56    pub fn stage_names(&self) -> Vec<&str> {
57        self.stages.iter().map(|s| s.name.as_str()).collect()
58    }
59}
60
61impl Default for Pipeline {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
67/// Stage that normalizes values to [0, 1].
68pub fn normalize_stage() -> Box<dyn Fn(Vec<f64>) -> HubResult<Vec<f64>>> {
69    Box::new(|data| {
70        if data.is_empty() {
71            return Err(HubError::EmptyData);
72        }
73        let min = data.iter().copied().fold(f64::INFINITY, f64::min);
74        let max = data.iter().copied().fold(f64::NEG_INFINITY, f64::max);
75        let range = max - min;
76        if range == 0.0 {
77            return Ok(vec![0.0; data.len()]);
78        }
79        Ok(data.iter().map(|&v| (v - min) / range).collect())
80    })
81}
82
83/// Stage that scales all values by a constant factor.
84pub fn scale_stage(factor: f64) -> Box<dyn Fn(Vec<f64>) -> HubResult<Vec<f64>>> {
85    Box::new(move |data| Ok(data.iter().map(|&v| v * factor).collect()))
86}
87
88/// Stage that keeps only positive values.
89pub fn filter_positive_stage() -> Box<dyn Fn(Vec<f64>) -> HubResult<Vec<f64>>> {
90    Box::new(|data| {
91        let filtered: Vec<f64> = data.into_iter().filter(|&v| v > 0.0).collect();
92        if filtered.is_empty() {
93            return Err(HubError::EmptyData);
94        }
95        Ok(filtered)
96    })
97}