sciforge_hub/engine/pipeline/
flow.rs1use crate::domain::common::errors::{HubError, HubResult};
7
8pub struct Stage {
10 pub name: String,
12 pub transform: Box<dyn Fn(Vec<f64>) -> HubResult<Vec<f64>>>,
14}
15
16pub struct Pipeline {
18 stages: Vec<Stage>,
19}
20
21impl Pipeline {
22 pub fn new() -> Self {
24 Self { stages: Vec::new() }
25 }
26
27 pub fn add_stage(
29 mut self,
30 name: &str,
31 transform: Box<dyn Fn(Vec<f64>) -> HubResult<Vec<f64>>>,
32 ) -> Self {
33 self.stages.push(Stage {
34 name: name.to_string(),
35 transform,
36 });
37 self
38 }
39
40 pub fn execute(&self, input: Vec<f64>) -> HubResult<Vec<f64>> {
42 let mut data = input;
43 for stage in &self.stages {
44 data = (stage.transform)(data)
45 .map_err(|e| HubError::ComputationFailed(format!("stage '{}': {e}", stage.name)))?;
46 }
47 Ok(data)
48 }
49
50 pub fn stage_count(&self) -> usize {
52 self.stages.len()
53 }
54
55 pub fn stage_names(&self) -> Vec<&str> {
57 self.stages.iter().map(|s| s.name.as_str()).collect()
58 }
59}
60
61impl Default for Pipeline {
62 fn default() -> Self {
63 Self::new()
64 }
65}
66
67pub fn normalize_stage() -> Box<dyn Fn(Vec<f64>) -> HubResult<Vec<f64>>> {
69 Box::new(|data| {
70 if data.is_empty() {
71 return Err(HubError::EmptyData);
72 }
73 let min = data.iter().copied().fold(f64::INFINITY, f64::min);
74 let max = data.iter().copied().fold(f64::NEG_INFINITY, f64::max);
75 let range = max - min;
76 if range == 0.0 {
77 return Ok(vec![0.0; data.len()]);
78 }
79 Ok(data.iter().map(|&v| (v - min) / range).collect())
80 })
81}
82
83pub fn scale_stage(factor: f64) -> Box<dyn Fn(Vec<f64>) -> HubResult<Vec<f64>>> {
85 Box::new(move |data| Ok(data.iter().map(|&v| v * factor).collect()))
86}
87
88pub fn filter_positive_stage() -> Box<dyn Fn(Vec<f64>) -> HubResult<Vec<f64>>> {
90 Box::new(|data| {
91 let filtered: Vec<f64> = data.into_iter().filter(|&v| v > 0.0).collect();
92 if filtered.is_empty() {
93 return Err(HubError::EmptyData);
94 }
95 Ok(filtered)
96 })
97}