use crate::core::error::{Error, Result};
use crate::dataframe::DataFrame;
use crate::ml::models::{ModelEvaluator, ModelMetrics, SupervisedModel, UnsupervisedModel};
use crate::ml::preprocessing::*;
use crate::series::Series;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
/// Base trait mirroring sklearn's `BaseEstimator`: uniform string-keyed
/// hyperparameter access plus feature-name propagation.
pub trait SklearnEstimator: fmt::Debug {
    /// Returns the estimator's hyperparameters as string key/value pairs.
    fn get_params(&self) -> HashMap<String, String>;
    /// Updates hyperparameters from string key/value pairs; errors on
    /// unknown keys or unparseable values.
    fn set_params(&mut self, params: HashMap<String, String>) -> Result<()>;
    /// Output feature names, given the input names when available;
    /// `None` when they cannot be determined.
    fn get_feature_names_out(&self, input_features: Option<&[String]>) -> Option<Vec<String>>;
}
/// sklearn-style transformer: `fit` learns statistics, `transform` applies them.
pub trait SklearnTransformer: SklearnEstimator {
    /// Learns transformation statistics from `x` (and optionally `y`).
    fn fit(&mut self, x: &DataFrame, y: Option<&DataFrame>) -> Result<()>;
    /// Applies the learned transformation to `x`, returning a new frame.
    fn transform(&self, x: &DataFrame) -> Result<DataFrame>;
    /// Convenience: `fit` followed by `transform` on the same data.
    fn fit_transform(&mut self, x: &DataFrame, y: Option<&DataFrame>) -> Result<DataFrame> {
        self.fit(x, y)?;
        self.transform(x)
    }
    /// Reverses the transformation; the default implementation reports
    /// the operation as unsupported.
    fn inverse_transform(&self, x: &DataFrame) -> Result<DataFrame> {
        Err(Error::NotImplemented(
            "inverse_transform not supported".into(),
        ))
    }
    /// Object-safe clone hook so boxed transformers can be duplicated
    /// (e.g. when cloning a `PipelineStep`).
    fn clone_transformer(&self) -> Box<dyn SklearnTransformer + Send + Sync>;
}
/// sklearn-style supervised estimator: `fit` on features plus targets,
/// then `predict` / `score` on new data.
pub trait SklearnPredictor: SklearnEstimator {
    /// Fits the model on features `x` and targets `y`.
    fn fit(&mut self, x: &DataFrame, y: &DataFrame) -> Result<()>;
    /// Predicts one value per row of `x`.
    fn predict(&self, x: &DataFrame) -> Result<Vec<f64>>;
    /// Per-class probabilities; the default implementation reports the
    /// operation as unsupported.
    fn predict_proba(&self, x: &DataFrame) -> Result<Vec<Vec<f64>>> {
        Err(Error::NotImplemented("predict_proba not supported".into()))
    }
    /// Quality metric of the model on `(x, y)`; the metric itself is
    /// defined by the implementor.
    fn score(&self, x: &DataFrame, y: &DataFrame) -> Result<f64>;
    /// Object-safe clone hook so boxed predictors can be duplicated
    /// (e.g. when cloning a `PipelineStep`).
    fn clone_predictor(&self) -> Box<dyn SklearnPredictor + Send + Sync>;
}
/// sklearn-compatible `StandardScaler`: standardizes features by removing the
/// mean and scaling to unit variance. Trailing-underscore fields mirror
/// sklearn's fitted-attribute naming and are `None` until `fit` succeeds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StandardScalerCompat {
    /// Center the data before scaling.
    pub with_mean: bool,
    /// Scale to unit variance.
    pub with_std: bool,
    /// Kept for sklearn API parity; transforms always produce a new frame.
    pub copy: bool,
    // Per-column mean learned by `fit`.
    mean_: Option<HashMap<String, f64>>,
    // Per-column divisor applied in `transform` (sqrt of variance, or 1.0).
    scale_: Option<HashMap<String, f64>>,
    // Per-column variance learned by `fit`.
    var_: Option<HashMap<String, f64>>,
    // Number of rows seen during `fit`.
    n_samples_seen_: Option<usize>,
    // Column names seen during `fit`.
    feature_names_in_: Option<Vec<String>>,
    // Number of columns seen during `fit`.
    n_features_in_: Option<usize>,
}
impl StandardScalerCompat {
    /// Creates a scaler with sklearn's defaults: centering and unit-variance
    /// scaling both enabled.
    pub fn new() -> Self {
        Self::with_params(true, true)
    }

    /// Creates a scaler with explicit centering / scaling switches; all
    /// fitted state starts out empty.
    pub fn with_params(with_mean: bool, with_std: bool) -> Self {
        Self {
            with_mean,
            with_std,
            copy: true,
            mean_: None,
            scale_: None,
            var_: None,
            n_samples_seen_: None,
            feature_names_in_: None,
            n_features_in_: None,
        }
    }
}
impl SklearnEstimator for StandardScalerCompat {
fn get_params(&self) -> HashMap<String, String> {
let mut params = HashMap::new();
params.insert("with_mean".to_string(), self.with_mean.to_string());
params.insert("with_std".to_string(), self.with_std.to_string());
params.insert("copy".to_string(), self.copy.to_string());
params
}
fn set_params(&mut self, params: HashMap<String, String>) -> Result<()> {
for (key, value) in params {
match key.as_str() {
"with_mean" => {
self.with_mean = value.parse().map_err(|_| {
Error::InvalidValue(format!(
"Invalid boolean value for with_mean: {}",
value
))
})?
}
"with_std" => {
self.with_std = value.parse().map_err(|_| {
Error::InvalidValue(format!(
"Invalid boolean value for with_std: {}",
value
))
})?
}
"copy" => {
self.copy = value.parse().map_err(|_| {
Error::InvalidValue(format!("Invalid boolean value for copy: {}", value))
})?
}
_ => return Err(Error::InvalidValue(format!("Unknown parameter: {}", key))),
}
}
Ok(())
}
fn get_feature_names_out(&self, input_features: Option<&[String]>) -> Option<Vec<String>> {
if let Some(features) = input_features {
Some(features.to_vec())
} else {
self.feature_names_in_.clone()
}
}
}
impl SklearnTransformer for StandardScalerCompat {
    /// Learns per-column statistics (mean, variance, scale) from `x`.
    ///
    /// # Errors
    /// Fails when `x` has no rows or when a column cannot be read as `f64`.
    fn fit(&mut self, x: &DataFrame, _y: Option<&DataFrame>) -> Result<()> {
        let feature_names: Vec<String> = x.column_names();
        let n_features = feature_names.len();
        let n_samples = x.nrows();
        if n_samples == 0 {
            return Err(Error::InvalidValue("Cannot fit on empty dataset".into()));
        }
        let mut means = HashMap::new();
        let mut vars = HashMap::new();
        let mut scales = HashMap::new();
        for feature_name in &feature_names {
            let col = x.get_column::<f64>(feature_name)?;
            let values = col.as_f64()?;
            if values.is_empty() {
                continue;
            }
            // FIX: always compute the true sample mean so the variance is
            // taken around the actual center. Previously `with_mean == false`
            // forced mean = 0.0, which made the stored "variance" E[x^2]
            // rather than Var[x]; sklearn computes `var_` (and hence
            // `scale_ = sqrt(var_)`) independently of `with_mean`. Centering
            // itself is still gated on `with_mean` in `transform`.
            let mean = values.iter().sum::<f64>() / values.len() as f64;
            let variance = if self.with_std {
                values.iter().map(|&v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64
            } else {
                1.0
            };
            // Constant columns (variance ~ 0) get scale 1.0 so `transform`
            // never divides by (nearly) zero.
            let scale = if self.with_std && variance > 1e-10 {
                variance.sqrt()
            } else {
                1.0
            };
            means.insert(feature_name.clone(), mean);
            vars.insert(feature_name.clone(), variance);
            scales.insert(feature_name.clone(), scale);
        }
        self.mean_ = Some(means);
        self.var_ = Some(vars);
        self.scale_ = Some(scales);
        self.n_samples_seen_ = Some(n_samples);
        self.feature_names_in_ = Some(feature_names);
        self.n_features_in_ = Some(n_features);
        Ok(())
    }

    /// Applies `(x - mean) / scale` per column, honoring the `with_mean` /
    /// `with_std` switches. Columns not seen during `fit` pass through with
    /// mean 0 and scale 1.
    ///
    /// # Errors
    /// Fails when called before `fit` or when a column cannot be read as `f64`.
    fn transform(&self, x: &DataFrame) -> Result<DataFrame> {
        let means = self.mean_.as_ref().ok_or_else(|| {
            Error::InvalidOperation("StandardScaler must be fitted before transform".into())
        })?;
        let scales = self
            .scale_
            .as_ref()
            .ok_or_else(|| Error::InvalidOperation("Model not fitted. Call fit() first.".into()))?;
        let mut result = DataFrame::new();
        for feature_name in x.column_names() {
            let col = x.get_column::<f64>(&feature_name)?;
            let values = col.as_f64()?;
            let mean = means.get(&feature_name).copied().unwrap_or(0.0);
            let scale = scales.get(&feature_name).copied().unwrap_or(1.0);
            let transformed_values: Vec<f64> = values
                .iter()
                .map(|&val| {
                    let centered = if self.with_mean { val - mean } else { val };
                    if self.with_std && scale > 1e-10 {
                        centered / scale
                    } else {
                        centered
                    }
                })
                .collect();
            result.add_column(
                feature_name.clone(),
                Series::new(transformed_values, Some(feature_name))?,
            )?;
        }
        Ok(result)
    }

    /// Reverses `transform`: multiplies by scale, then adds the mean back,
    /// each step gated on the corresponding switch.
    ///
    /// # Errors
    /// Fails when called before `fit` or when a column cannot be read as `f64`.
    fn inverse_transform(&self, x: &DataFrame) -> Result<DataFrame> {
        let means = self.mean_.as_ref().ok_or_else(|| {
            Error::InvalidOperation("StandardScaler must be fitted before inverse_transform".into())
        })?;
        let scales = self
            .scale_
            .as_ref()
            .ok_or_else(|| Error::InvalidOperation("Model not fitted. Call fit() first.".into()))?;
        let mut result = DataFrame::new();
        for feature_name in x.column_names() {
            let col = x.get_column::<f64>(&feature_name)?;
            let values = col.as_f64()?;
            let mean = means.get(&feature_name).copied().unwrap_or(0.0);
            let scale = scales.get(&feature_name).copied().unwrap_or(1.0);
            let inverse_transformed_values: Vec<f64> = values
                .iter()
                .map(|&val| {
                    let scaled = if self.with_std && scale > 1e-10 {
                        val * scale
                    } else {
                        val
                    };
                    if self.with_mean {
                        scaled + mean
                    } else {
                        scaled
                    }
                })
                .collect();
            result.add_column(
                feature_name.clone(),
                Series::new(inverse_transformed_values, Some(feature_name))?,
            )?;
        }
        Ok(result)
    }

    /// Deep-clones this scaler behind the object-safe trait interface.
    fn clone_transformer(&self) -> Box<dyn SklearnTransformer + Send + Sync> {
        Box::new(self.clone())
    }
}
/// sklearn-compatible `MinMaxScaler`: rescales each feature into
/// `feature_range` via an affine map learned from the column's min and max.
/// Trailing-underscore fields mirror sklearn's fitted attributes and are
/// `None` until `fit` succeeds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MinMaxScalerCompat {
    /// Target `(min, max)` output range.
    pub feature_range: (f64, f64),
    /// Kept for sklearn API parity; transforms always produce a new frame.
    pub copy: bool,
    /// Clamp transformed values into `feature_range`.
    pub clip: bool,
    // Per-column minimum observed during `fit`.
    data_min_: Option<HashMap<String, f64>>,
    // Per-column maximum observed during `fit`.
    data_max_: Option<HashMap<String, f64>>,
    // Per-column (max - min) observed during `fit`.
    data_range_: Option<HashMap<String, f64>>,
    // Multiplicative factor of the learned affine transform.
    scale_: Option<HashMap<String, f64>>,
    // Additive offset of the learned affine transform.
    min_: Option<HashMap<String, f64>>,
    // Number of rows seen during `fit`.
    n_samples_seen_: Option<usize>,
    // Column names seen during `fit`.
    feature_names_in_: Option<Vec<String>>,
    // Number of columns seen during `fit`.
    n_features_in_: Option<usize>,
}
impl MinMaxScalerCompat {
    /// Creates a scaler with sklearn's default target range `[0, 1]`.
    pub fn new() -> Self {
        Self::with_range(0.0, 1.0)
    }

    /// Creates a scaler that maps each feature onto `[min, max]`; all fitted
    /// state starts out empty.
    pub fn with_range(min: f64, max: f64) -> Self {
        Self {
            feature_range: (min, max),
            copy: true,
            clip: false,
            data_min_: None,
            data_max_: None,
            data_range_: None,
            scale_: None,
            min_: None,
            n_samples_seen_: None,
            feature_names_in_: None,
            n_features_in_: None,
        }
    }
}
impl SklearnEstimator for MinMaxScalerCompat {
fn get_params(&self) -> HashMap<String, String> {
let mut params = HashMap::new();
params.insert(
"feature_range_min".to_string(),
self.feature_range.0.to_string(),
);
params.insert(
"feature_range_max".to_string(),
self.feature_range.1.to_string(),
);
params.insert("copy".to_string(), self.copy.to_string());
params.insert("clip".to_string(), self.clip.to_string());
params
}
fn set_params(&mut self, params: HashMap<String, String>) -> Result<()> {
for (key, value) in params {
match key.as_str() {
"feature_range_min" => {
let min_val: f64 = value.parse().map_err(|_| {
Error::InvalidValue(format!(
"Invalid float value for feature_range_min: {}",
value
))
})?;
self.feature_range.0 = min_val;
}
"feature_range_max" => {
let max_val: f64 = value.parse().map_err(|_| {
Error::InvalidValue(format!(
"Invalid float value for feature_range_max: {}",
value
))
})?;
self.feature_range.1 = max_val;
}
"copy" => {
self.copy = value.parse().map_err(|_| {
Error::InvalidValue(format!("Invalid boolean value for copy: {}", value))
})?
}
"clip" => {
self.clip = value.parse().map_err(|_| {
Error::InvalidValue(format!("Invalid boolean value for clip: {}", value))
})?
}
_ => return Err(Error::InvalidValue(format!("Unknown parameter: {}", key))),
}
}
Ok(())
}
fn get_feature_names_out(&self, input_features: Option<&[String]>) -> Option<Vec<String>> {
if let Some(features) = input_features {
Some(features.to_vec())
} else {
self.feature_names_in_.clone()
}
}
}
impl SklearnTransformer for MinMaxScalerCompat {
    /// Learns per-column min/max and derives the affine map
    /// `x -> x * scale_ + min_` that sends `[data_min, data_max]` onto
    /// `feature_range`.
    ///
    /// # Errors
    /// Fails when `x` has no rows or when a column cannot be read as `f64`.
    fn fit(&mut self, x: &DataFrame, _y: Option<&DataFrame>) -> Result<()> {
        let feature_names: Vec<String> = x.column_names();
        let n_features = feature_names.len();
        let n_samples = x.nrows();
        if n_samples == 0 {
            return Err(Error::InvalidValue("Cannot fit on empty dataset".into()));
        }
        let mut data_mins = HashMap::new();
        let mut data_maxs = HashMap::new();
        let mut data_ranges = HashMap::new();
        let mut scales = HashMap::new();
        let mut mins = HashMap::new();
        let (feature_min, feature_max) = self.feature_range;
        let feature_range = feature_max - feature_min;
        for feature_name in &feature_names {
            let col = x.get_column::<f64>(feature_name)?;
            let values = col.as_f64()?;
            if values.is_empty() {
                continue;
            }
            // Folding from +/- infinity yields the min/max of any non-empty
            // slice without unwraps.
            let data_min = values.iter().copied().fold(f64::INFINITY, f64::min);
            let data_max = values.iter().copied().fold(f64::NEG_INFINITY, f64::max);
            let data_range = data_max - data_min;
            // Near-constant columns get scale 1.0 to avoid dividing by ~0;
            // the offset below then maps them to `feature_min`.
            let scale = if data_range > 1e-10 {
                feature_range / data_range
            } else {
                1.0
            };
            // Offset chosen so data_min maps exactly to feature_min.
            let min = feature_min - data_min * scale;
            data_mins.insert(feature_name.clone(), data_min);
            data_maxs.insert(feature_name.clone(), data_max);
            data_ranges.insert(feature_name.clone(), data_range);
            scales.insert(feature_name.clone(), scale);
            mins.insert(feature_name.clone(), min);
        }
        self.data_min_ = Some(data_mins);
        self.data_max_ = Some(data_maxs);
        self.data_range_ = Some(data_ranges);
        self.scale_ = Some(scales);
        self.min_ = Some(mins);
        self.n_samples_seen_ = Some(n_samples);
        self.feature_names_in_ = Some(feature_names);
        self.n_features_in_ = Some(n_features);
        Ok(())
    }

    /// Applies `x * scale_ + min_` per column, optionally clamping into
    /// `feature_range` when `clip` is set. Columns not seen during `fit`
    /// pass through with scale 1 and offset 0.
    ///
    /// # Errors
    /// Fails when called before `fit` or when a column cannot be read as `f64`.
    fn transform(&self, x: &DataFrame) -> Result<DataFrame> {
        let scales = self.scale_.as_ref().ok_or_else(|| {
            Error::InvalidOperation("MinMaxScaler must be fitted before transform".into())
        })?;
        let mins = self
            .min_
            .as_ref()
            .ok_or_else(|| Error::InvalidOperation("Model not fitted. Call fit() first.".into()))?;
        let mut result = DataFrame::new();
        for feature_name in x.column_names() {
            let col = x.get_column::<f64>(&feature_name)?;
            let values = col.as_f64()?;
            let scale = scales.get(&feature_name).copied().unwrap_or(1.0);
            let min = mins.get(&feature_name).copied().unwrap_or(0.0);
            let transformed_values: Vec<f64> = values
                .iter()
                .map(|&val| {
                    let transformed = val * scale + min;
                    if self.clip {
                        // max-then-min clamps into [feature_range.0, feature_range.1].
                        transformed
                            .max(self.feature_range.0)
                            .min(self.feature_range.1)
                    } else {
                        transformed
                    }
                })
                .collect();
            result.add_column(
                feature_name.clone(),
                Series::new(transformed_values, Some(feature_name))?,
            )?;
        }
        Ok(result)
    }

    /// Reverses `transform` by solving `x = (y - min_) / scale_` per column.
    ///
    /// NOTE(review): the `scale > 1e-10` guard assumes a positive scale; a
    /// reversed `feature_range` (max < min) yields a negative scale and would
    /// silently skip inversion here — confirm whether reversed ranges are
    /// ever expected.
    ///
    /// # Errors
    /// Fails when called before `fit` or when a column cannot be read as `f64`.
    fn inverse_transform(&self, x: &DataFrame) -> Result<DataFrame> {
        let scales = self.scale_.as_ref().ok_or_else(|| {
            Error::InvalidOperation("MinMaxScaler must be fitted before inverse_transform".into())
        })?;
        let mins = self
            .min_
            .as_ref()
            .ok_or_else(|| Error::InvalidOperation("Model not fitted. Call fit() first.".into()))?;
        let mut result = DataFrame::new();
        for feature_name in x.column_names() {
            let col = x.get_column::<f64>(&feature_name)?;
            let values = col.as_f64()?;
            let scale = scales.get(&feature_name).copied().unwrap_or(1.0);
            let min = mins.get(&feature_name).copied().unwrap_or(0.0);
            let inverse_transformed_values: Vec<f64> = values
                .iter()
                .map(|&val| {
                    if scale > 1e-10 {
                        (val - min) / scale
                    } else {
                        val
                    }
                })
                .collect();
            result.add_column(
                feature_name.clone(),
                Series::new(inverse_transformed_values, Some(feature_name))?,
            )?;
        }
        Ok(result)
    }

    /// Deep-clones this scaler behind the object-safe trait interface.
    fn clone_transformer(&self) -> Box<dyn SklearnTransformer + Send + Sync> {
        Box::new(self.clone())
    }
}
/// sklearn-style `Pipeline`: an ordered list of named steps where each
/// transformer feeds the next step and an optional final predictor consumes
/// the fully transformed features.
#[derive(Debug)]
pub struct Pipeline {
    /// Ordered `(name, step)` pairs; a predictor, if present, must be last.
    pub steps: Vec<(String, PipelineStep)>,
    /// Caching location kept for sklearn API parity; stored and echoed via
    /// `get_params` but otherwise unused here.
    pub memory: Option<String>,
    /// Print per-step progress during fitting/prediction.
    pub verbose: bool,
}
/// A single pipeline stage: either an intermediate transformer or a final
/// predictor, boxed so heterogeneous steps can share one `Vec`.
#[derive(Debug)]
pub enum PipelineStep {
    Transformer(Box<dyn SklearnTransformer + Send + Sync>),
    Predictor(Box<dyn SklearnPredictor + Send + Sync>),
}
impl Clone for PipelineStep {
fn clone(&self) -> Self {
match self {
PipelineStep::Transformer(transformer) => {
PipelineStep::Transformer(transformer.clone_transformer())
}
PipelineStep::Predictor(predictor) => {
PipelineStep::Predictor(predictor.clone_predictor())
}
}
}
}
impl Pipeline {
pub fn new(steps: Vec<(String, PipelineStep)>) -> Self {
Self {
steps,
memory: None,
verbose: false,
}
}
pub fn add_step(&mut self, name: String, step: PipelineStep) {
self.steps.push((name, step));
}
pub fn get_step(&self, name: &str) -> Option<&PipelineStep> {
self.steps
.iter()
.find(|(step_name, _)| step_name == name)
.map(|(_, step)| step)
}
pub fn get_step_names(&self) -> Vec<&String> {
self.steps.iter().map(|(name, _)| name).collect()
}
pub fn set_params(&mut self, params: HashMap<String, String>) -> Result<()> {
for (key, value) in params {
if key == "verbose" {
self.verbose = value.parse().map_err(|_| {
Error::InvalidValue(format!("Invalid boolean value for verbose: {}", value))
})?;
} else if key.starts_with("memory") {
self.memory = Some(value);
} else if let Some(param_sep) = key.find("__") {
let step_name = &key[..param_sep];
let param_name = &key[param_sep + 2..];
for (name, step) in &mut self.steps {
if name == step_name {
let mut step_params = HashMap::new();
step_params.insert(param_name.to_string(), value);
match step {
PipelineStep::Transformer(transformer) => {
transformer.set_params(step_params)?;
}
PipelineStep::Predictor(predictor) => {
predictor.set_params(step_params)?;
}
}
break;
}
}
}
}
Ok(())
}
}
impl SklearnEstimator for Pipeline {
    /// Flattens pipeline-level and per-step parameters into one map; step
    /// parameters are namespaced as `step_name__param` (sklearn convention).
    fn get_params(&self) -> HashMap<String, String> {
        let mut params = HashMap::new();
        params.insert("verbose".to_string(), self.verbose.to_string());
        if let Some(memory) = &self.memory {
            params.insert("memory".to_string(), memory.clone());
        }
        for (step_name, step) in &self.steps {
            let step_params = match step {
                PipelineStep::Transformer(transformer) => transformer.get_params(),
                PipelineStep::Predictor(predictor) => predictor.get_params(),
            };
            for (param_name, param_value) in step_params {
                params.insert(format!("{}__{}", step_name, param_name), param_value);
            }
        }
        params
    }
    /// Delegates to the inherent `Pipeline::set_params`. Inherent methods take
    /// precedence over trait methods in Rust method resolution, so this call
    /// is NOT recursive.
    fn set_params(&mut self, params: HashMap<String, String>) -> Result<()> {
        self.set_params(params)
    }
    /// Threads feature names through every step in order, letting each step
    /// rewrite them; stops reporting names once a step returns `None`.
    fn get_feature_names_out(&self, input_features: Option<&[String]>) -> Option<Vec<String>> {
        let mut current_features = input_features.map(|f| f.to_vec());
        for (_, step) in &self.steps {
            match step {
                PipelineStep::Transformer(transformer) => {
                    current_features = transformer
                        .get_feature_names_out(current_features.as_ref().map(|f| f.as_slice()));
                }
                PipelineStep::Predictor(predictor) => {
                    current_features = predictor
                        .get_feature_names_out(current_features.as_ref().map(|f| f.as_slice()));
                }
            }
        }
        current_features
    }
}
impl SklearnPredictor for Pipeline {
fn fit(&mut self, x: &DataFrame, y: &DataFrame) -> Result<()> {
let mut current_x = x.clone();
let steps_len = self.steps.len();
for (i, (step_name, step)) in self.steps.iter_mut().enumerate() {
if self.verbose {
println!("Fitting step {}: {}", i, step_name);
}
match step {
PipelineStep::Transformer(transformer) => {
transformer.fit(¤t_x, Some(y))?;
current_x = transformer.transform(¤t_x)?;
}
PipelineStep::Predictor(predictor) => {
if i != steps_len - 1 {
return Err(Error::InvalidOperation(
"Predictor must be the last step in pipeline".into(),
));
}
predictor.fit(¤t_x, y)?;
}
}
}
Ok(())
}
fn predict(&self, x: &DataFrame) -> Result<Vec<f64>> {
let mut current_x = x.clone();
for (i, (step_name, step)) in self.steps.iter().enumerate() {
if self.verbose {
println!("Transforming step {}: {}", i, step_name);
}
match step {
PipelineStep::Transformer(transformer) => {
current_x = transformer.transform(¤t_x)?;
}
PipelineStep::Predictor(predictor) => {
return predictor.predict(¤t_x);
}
}
}
Err(Error::InvalidOperation(
"Pipeline has no predictor step".into(),
))
}
fn predict_proba(&self, x: &DataFrame) -> Result<Vec<Vec<f64>>> {
let mut current_x = x.clone();
for (step_name, step) in &self.steps {
match step {
PipelineStep::Transformer(transformer) => {
current_x = transformer.transform(¤t_x)?;
}
PipelineStep::Predictor(predictor) => {
return predictor.predict_proba(¤t_x);
}
}
}
Err(Error::InvalidOperation(
"Pipeline has no predictor step".into(),
))
}
fn score(&self, x: &DataFrame, y: &DataFrame) -> Result<f64> {
let mut current_x = x.clone();
for (step_name, step) in &self.steps {
match step {
PipelineStep::Transformer(transformer) => {
current_x = transformer.transform(¤t_x)?;
}
PipelineStep::Predictor(predictor) => {
return predictor.score(¤t_x, y);
}
}
}
Err(Error::InvalidOperation(
"Pipeline has no predictor step".into(),
))
}
fn clone_predictor(&self) -> Box<dyn SklearnPredictor + Send + Sync> {
let cloned_steps = self
.steps
.iter()
.map(|(name, step)| (name.clone(), step.clone()))
.collect();
Box::new(Pipeline {
steps: cloned_steps,
memory: self.memory.clone(),
verbose: self.verbose,
})
}
}
/// Ready-made single-step preprocessing pipelines.
pub mod pipeline_builders {
    use super::*;

    /// Pipeline with one step, `"scaler"`, that z-score standardizes every
    /// column (default `StandardScalerCompat`).
    pub fn standard_preprocessing_pipeline() -> Pipeline {
        Pipeline::new(vec![(
            "scaler".to_string(),
            PipelineStep::Transformer(Box::new(StandardScalerCompat::new())),
        )])
    }

    /// Pipeline with one step, `"scaler"`, that rescales every column to
    /// `[0, 1]` (default `MinMaxScalerCompat`).
    pub fn minmax_preprocessing_pipeline() -> Pipeline {
        Pipeline::new(vec![(
            "scaler".to_string(),
            PipelineStep::Transformer(Box::new(MinMaxScalerCompat::new())),
        )])
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::series::Series;

    /// Builds a DataFrame with the given named f64 columns.
    fn frame(columns: &[(&str, Vec<f64>)]) -> DataFrame {
        let mut df = DataFrame::new();
        for (name, values) in columns {
            df.add_column(
                (*name).to_string(),
                Series::new(values.clone(), Some((*name).to_string()))
                    .expect("operation should succeed"),
            )
            .expect("operation should succeed");
        }
        df
    }

    /// Reads a column back out of a frame as a plain `Vec<f64>`.
    fn column_values(df: &DataFrame, name: &str) -> Vec<f64> {
        df.get_column::<f64>(name)
            .expect("operation should succeed")
            .as_f64()
            .expect("operation should succeed")
    }

    #[test]
    fn test_standard_scaler_compat() {
        let df = frame(&[
            ("feature1", vec![1.0, 2.0, 3.0, 4.0, 5.0]),
            ("feature2", vec![10.0, 20.0, 30.0, 40.0, 50.0]),
        ]);
        let mut scaler = StandardScalerCompat::new();
        scaler.fit(&df, None).expect("operation should succeed");

        // After standardization each column should be centered at zero.
        let transformed = scaler.transform(&df).expect("operation should succeed");
        let scaled = column_values(&transformed, "feature1");
        let scaled_mean = scaled.iter().sum::<f64>() / scaled.len() as f64;
        assert!(scaled_mean.abs() < 1e-10, "Mean should be approximately zero");

        // Round-tripping through inverse_transform must recover the input.
        let restored_df = scaler
            .inverse_transform(&transformed)
            .expect("operation should succeed");
        let original = column_values(&df, "feature1");
        let restored = column_values(&restored_df, "feature1");
        for (before, after) in original.iter().zip(restored.iter()) {
            assert!(
                (before - after).abs() < 1e-10,
                "Inverse transform should restore original values"
            );
        }
    }

    #[test]
    fn test_minmax_scaler_compat() {
        let df = frame(&[("feature1", vec![1.0, 2.0, 3.0, 4.0, 5.0])]);
        let mut scaler = MinMaxScalerCompat::new();
        scaler.fit(&df, None).expect("operation should succeed");

        let transformed = scaler.transform(&df).expect("operation should succeed");
        let scaled = column_values(&transformed, "feature1");
        // Default range is [0, 1]; every value must land inside it...
        for &value in &scaled {
            assert!(
                value >= 0.0 && value <= 1.0,
                "Values should be in range [0, 1]"
            );
        }
        // ...and the extremes must hit the bounds exactly.
        let lo = scaled.iter().copied().fold(f64::INFINITY, f64::min);
        let hi = scaled.iter().copied().fold(f64::NEG_INFINITY, f64::max);
        assert!(lo.abs() < 1e-10, "Minimum should be 0");
        assert!((hi - 1.0).abs() < 1e-10, "Maximum should be 1");
    }

    #[test]
    fn test_pipeline_parameters() {
        // Step parameters are flattened as `step_name__param`.
        let params = pipeline_builders::standard_preprocessing_pipeline().get_params();
        for key in ["verbose", "scaler__with_mean", "scaler__with_std"] {
            assert!(params.contains_key(key));
        }
    }
}