pub mod builder;
pub mod pipeline_core;
pub mod scalers;
pub mod traits;
pub use pipeline_core::{
DataBatch, DataSample, DataType, ErrorStrategy, FeatureConstraint, FeatureSchema, FeatureValue,
MLPipeline, MLPipelineError, ModelType, MonitoringConfig, PipelineConfig, PipelineMetrics,
PipelineNode, TransformType,
};
pub use pipeline_core::FeatureTransformer as FeatureTransformerNode;
pub use pipeline_core::ModelPredictor as ModelPredictorNode;
pub use traits::{FeatureTransformer, ModelPredictor};
pub use scalers::{MinMaxScaler, NormType, NormalizerTransform, StandardScaler};
pub use builder::{Pipeline, PipelineError, PipelineStep};
use ndarray::{Array1, Array2};
use num_traits::{Float, FromPrimitive};
use std::fmt;
/// A generic, type-parameterized ML pipeline: an ordered chain of feature
/// transformers optionally terminated by a single model predictor.
///
/// `T` is the element type of the data matrices (e.g. `f32`, `f64`).
pub struct MLPipelineGeneric<T: Clone + fmt::Debug + Float + FromPrimitive + 'static> {
    // Transformers applied in insertion order during fit/transform.
    steps: Vec<Box<dyn FeatureTransformer<T>>>,
    // Optional final prediction stage; `predict` fails if this is `None`.
    predictor: Option<Box<dyn ModelPredictor<T>>>,
    // Set to true once `fit` has completed successfully.
    is_fitted: bool,
}
impl<T: Clone + fmt::Debug + Float + FromPrimitive + Send + Sync + 'static> MLPipelineGeneric<T> {
    /// Creates an empty, unfitted pipeline with no transformers and no predictor.
    pub fn new() -> Self {
        Self {
            steps: Vec::new(),
            predictor: None,
            is_fitted: false,
        }
    }

    /// Appends a feature transformer to the end of the transformation chain.
    pub fn add_transformer(&mut self, transformer: Box<dyn FeatureTransformer<T>>) {
        self.steps.push(transformer);
    }

    /// Sets (or replaces) the final predictor stage.
    pub fn set_predictor(&mut self, predictor: Box<dyn ModelPredictor<T>>) {
        self.predictor = Some(predictor);
    }

    /// Fits every transformer in order, feeding each step the output of the
    /// previous one, then marks the pipeline as fitted.
    ///
    /// An empty pipeline is trivially fitted and always succeeds.
    ///
    /// # Errors
    /// Propagates the first error returned by any step's `fit_transform`.
    pub fn fit(&mut self, data: &Array2<T>) -> Result<(), PipelineError> {
        if self.steps.is_empty() {
            self.is_fitted = true;
            return Ok(());
        }
        let mut current = data.clone();
        for step in &mut self.steps {
            // Fix: the original source contained the mojibake `¤t`
            // (a mangled `&current`), which did not compile.
            current = step.fit_transform(&current)?;
        }
        self.is_fitted = true;
        Ok(())
    }

    /// Applies every fitted transformer to `data` in order and returns the
    /// transformed matrix. An empty pipeline acts as the identity and does
    /// not require a prior `fit`.
    ///
    /// # Errors
    /// Returns `PipelineError::NotFitted` if the pipeline has steps but `fit`
    /// has not been called; otherwise propagates the first step error.
    pub fn transform(&self, data: &Array2<T>) -> Result<Array2<T>, PipelineError> {
        if self.steps.is_empty() {
            return Ok(data.clone());
        }
        if !self.is_fitted {
            return Err(PipelineError::NotFitted("MLPipelineGeneric".to_string()));
        }
        let mut current = data.clone();
        for step in &self.steps {
            // Fix: same mojibake (`¤t` -> `&current`) as in `fit`.
            current = step.transform(&current)?;
        }
        Ok(current)
    }

    /// Convenience: `fit` followed by `transform` on the same data.
    pub fn fit_transform(&mut self, data: &Array2<T>) -> Result<Array2<T>, PipelineError> {
        self.fit(data)?;
        self.transform(data)
    }

    /// Fits the pipeline on `data`, then predicts on the transformed result.
    ///
    /// # Errors
    /// Returns `PipelineError::NoPredictorSet` if no predictor is configured,
    /// or any error from fitting/transforming/predicting.
    pub fn fit_predict(&mut self, data: &Array2<T>) -> Result<Array1<T>, PipelineError> {
        let transformed = self.fit_transform(data)?;
        self.run_predictor(&transformed)
    }

    /// Transforms `data` through the fitted steps and runs the predictor.
    ///
    /// # Errors
    /// Returns `PipelineError::NoPredictorSet` if no predictor is configured,
    /// or any error from transforming/predicting.
    pub fn predict(&self, data: &Array2<T>) -> Result<Array1<T>, PipelineError> {
        let transformed = self.transform(data)?;
        self.run_predictor(&transformed)
    }

    /// Batch prediction; currently identical to `predict`, kept as a separate
    /// entry point for API symmetry with batch-oriented callers.
    pub fn predict_batch(&self, data: &Array2<T>) -> Result<Array1<T>, PipelineError> {
        self.predict(data)
    }

    /// Whether `fit` has completed successfully.
    pub fn is_fitted(&self) -> bool {
        self.is_fitted
    }

    /// Number of transformer steps in the chain.
    pub fn n_steps(&self) -> usize {
        self.steps.len()
    }

    /// Whether a predictor has been set.
    pub fn has_predictor(&self) -> bool {
        self.predictor.is_some()
    }

    // Dispatches to the configured predictor, or fails if none is set.
    // Shared by `predict` and `fit_predict` to avoid duplicating the match.
    fn run_predictor(&self, features: &Array2<T>) -> Result<Array1<T>, PipelineError> {
        self.predictor
            .as_ref()
            .ok_or(PipelineError::NoPredictorSet)?
            .predict(features)
    }
}
/// `Default` delegates to [`MLPipelineGeneric::new`]: an empty, unfitted
/// pipeline with no transformers and no predictor.
impl<T: Clone + fmt::Debug + Float + FromPrimitive + Send + Sync + 'static> Default
    for MLPipelineGeneric<T>
{
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use ndarray::Array2;

    /// Builds a 5x3 matrix of distinct ascending values used across tests.
    fn make_data() -> Array2<f64> {
        Array2::from_shape_vec(
            (5, 3),
            vec![
                1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0,
            ],
        )
        .expect("shape is valid")
    }

    /// After standardization each column mean should be ~0.
    #[test]
    fn test_standard_scaler_fit_transform() {
        let data = make_data();
        let mut scaler = StandardScaler::new();
        scaler.fit(&data).expect("fit should succeed");
        let result = scaler.transform(&data).expect("transform should succeed");
        assert_eq!(result.shape(), data.shape());
        for col in 0..result.ncols() {
            let col_mean: f64 = result.column(col).sum() / result.nrows() as f64;
            assert!(
                col_mean.abs() < 1e-10,
                "column {col} mean not ~0: {col_mean}"
            );
        }
    }

    /// Min-max scaling must map every value into [0, 1] (with fp tolerance).
    #[test]
    fn test_min_max_scaler_fit_transform() {
        let data = make_data();
        let mut scaler = MinMaxScaler::new();
        let result = scaler
            .fit_transform(&data)
            .expect("fit_transform should succeed");
        assert_eq!(result.shape(), data.shape());
        for &v in result.iter() {
            assert!(v >= 0.0 - 1e-10 && v <= 1.0 + 1e-10, "out of [0,1]: {v}");
        }
    }

    /// L2 normalization: the (3,4,0) row has norm 5, so it maps to (0.6, 0.8, 0).
    #[test]
    fn test_normalizer_transform_l2() {
        let data = Array2::from_shape_vec((2, 3), vec![3.0f64, 4.0, 0.0, 1.0, 0.0, 0.0])
            .expect("shape is valid");
        let mut norm = NormalizerTransform::l2();
        let result = norm
            .fit_transform(&data)
            .expect("fit_transform should succeed");
        assert!((result[[0, 0]] - 0.6).abs() < 1e-10);
        assert!((result[[0, 1]] - 0.8).abs() < 1e-10);
        assert!(result[[0, 2]].abs() < 1e-10);
        assert!((result[[1, 0]] - 1.0).abs() < 1e-10);
    }

    /// L1 normalization: (1,2,3) has L1 norm 6, so entries become i/6.
    #[test]
    fn test_normalizer_transform_l1() {
        let data = Array2::from_shape_vec((1, 3), vec![1.0f64, 2.0, 3.0]).expect("shape is valid");
        let mut norm = NormalizerTransform::l1();
        let result = norm
            .fit_transform(&data)
            .expect("fit_transform should succeed");
        assert!((result[[0, 0]] - 1.0 / 6.0).abs() < 1e-10);
        assert!((result[[0, 1]] - 2.0 / 6.0).abs() < 1e-10);
        assert!((result[[0, 2]] - 3.0 / 6.0).abs() < 1e-10);
    }

    /// Chained standardize -> min-max should still land in [0, 1].
    #[test]
    fn test_pipeline_builder_chain() {
        let data = make_data();
        let mut pipeline = Pipeline::<f64>::new()
            .add_transformer(Box::new(StandardScaler::new()))
            .add_transformer(Box::new(MinMaxScaler::new()));
        let result = pipeline
            .fit_transform(&data)
            .expect("fit_transform should succeed");
        assert_eq!(result.shape(), data.shape());
        for &v in result.iter() {
            assert!(v >= 0.0 - 1e-10 && v <= 1.0 + 1e-10, "out of [0,1]: {v}");
        }
    }

    /// A pipeline fitted on training data must transform unseen data
    /// (possibly far outside the training range) without error.
    #[test]
    fn test_pipeline_fit_then_transform_independent() {
        let train_data = make_data();
        let test_data =
            Array2::from_shape_vec((2, 3), vec![100.0, 200.0, 300.0, -10.0, -20.0, -30.0])
                .expect("shape is valid");
        let mut pipeline = Pipeline::<f64>::new().add_transformer(Box::new(StandardScaler::new()));
        pipeline.fit(&train_data).expect("fit should succeed");
        let result = pipeline
            .transform(&test_data)
            .expect("transform should succeed");
        assert_eq!(result.shape(), test_data.shape());
    }

    /// Calling transform before fit on a non-empty pipeline is an error.
    #[test]
    fn test_pipeline_transform_without_fit_fails() {
        let data = make_data();
        let pipeline = Pipeline::<f64>::new().add_transformer(Box::new(StandardScaler::new()));
        let result = pipeline.transform(&data);
        assert!(result.is_err(), "transform before fit should fail");
    }

    /// A pipeline with no steps passes data through unchanged.
    #[test]
    fn test_empty_pipeline_is_identity() {
        let data = make_data();
        let mut pipeline = Pipeline::<f64>::new();
        let result = pipeline
            .fit_transform(&data)
            .expect("empty pipeline should succeed");
        assert_eq!(result, data);
    }

    /// The generic pipeline mirrors the builder pipeline's chaining behavior.
    #[test]
    fn test_ml_pipeline_generic_fit_transform() {
        let data = make_data();
        let mut mlp = MLPipelineGeneric::<f64>::new();
        mlp.add_transformer(Box::new(StandardScaler::new()));
        mlp.add_transformer(Box::new(MinMaxScaler::new()));
        let result = mlp
            .fit_transform(&data)
            .expect("fit_transform should succeed");
        assert_eq!(result.shape(), data.shape());
    }

    /// predict without a configured predictor yields NoPredictorSet.
    #[test]
    fn test_ml_pipeline_predict_without_predictor_fails() {
        let data = make_data();
        let mut mlp = MLPipelineGeneric::<f64>::new();
        mlp.add_transformer(Box::new(StandardScaler::new()));
        mlp.fit(&data).expect("fit should succeed");
        let result = mlp.predict(&data);
        assert!(
            result.is_err(),
            "predict without predictor should return error"
        );
        assert!(matches!(result, Err(PipelineError::NoPredictorSet)));
    }

    /// Default produces an empty, unfitted pipeline with no predictor.
    #[test]
    fn test_ml_pipeline_generic_default() {
        let mlp = MLPipelineGeneric::<f64>::default();
        assert_eq!(mlp.n_steps(), 0);
        assert!(!mlp.is_fitted());
        assert!(!mlp.has_predictor());
    }

    /// A zero-variance column standardizes to all zeros (no division blowup).
    #[test]
    fn test_standard_scaler_zero_variance_column() {
        let data = Array2::from_shape_vec((3, 2), vec![5.0f64, 1.0, 5.0, 2.0, 5.0, 3.0])
            .expect("shape is valid");
        let mut scaler = StandardScaler::new();
        let result = scaler
            .fit_transform(&data)
            .expect("fit_transform should succeed");
        assert_eq!(result[[0, 0]], 0.0);
        assert_eq!(result[[1, 0]], 0.0);
        assert_eq!(result[[2, 0]], 0.0);
    }

    /// A constant column min-max scales to 0 (min == max edge case).
    #[test]
    fn test_min_max_scaler_uniform_column() {
        let data = Array2::from_shape_vec((3, 1), vec![7.0f64, 7.0, 7.0]).expect("shape is valid");
        let mut scaler = MinMaxScaler::new();
        let result = scaler
            .fit_transform(&data)
            .expect("fit_transform should succeed");
        for &v in result.iter() {
            assert_eq!(v, 0.0);
        }
    }

    /// Fitting a scaler on a matrix with zero rows must fail.
    #[test]
    fn test_transform_empty_matrix_fails() {
        let data: Array2<f64> = Array2::zeros((0, 3));
        let mut scaler = StandardScaler::new();
        let result = scaler.fit(&data);
        assert!(result.is_err(), "fitting empty data should fail");
    }

    /// Smoke test that the pre-existing pipeline_core API is still wired up.
    #[test]
    fn test_existing_pipeline_api_still_works() {
        use pipeline_core::{DataBatch, PipelineConfig};
        let pipeline = MLPipeline::new("test".to_string(), PipelineConfig::default());
        let batch = DataBatch::new();
        let result = pipeline.execute(batch);
        assert!(
            result.is_ok(),
            "empty pipeline on empty batch should succeed"
        );
    }
}