use pandrs::error::Result;
use pandrs::ml::{AdvancedPipeline, BinningStrategy, FeatureEngineeringStage, WindowOperation};
use pandrs::optimized::OptimizedDataFrame;
/// Entry point for the extended ML pipeline example.
///
/// Builds a small in-memory market dataset (price, volume, market cap for
/// 10 observations) and runs four demonstrations of increasing complexity.
///
/// # Errors
/// Propagates any `pandrs` error raised while constructing the dataset or
/// executing one of the example pipelines.
// The crate's `Result` carries a large error type; silencing the lint here
// keeps the example signatures simple.
#[allow(clippy::result_large_err)]
fn main() -> Result<()> {
    println!("=== Extended ML Pipeline Example ===\n");

    // Assemble the shared example dataset: 10 rows of market observations.
    let mut df = OptimizedDataFrame::new();
    df.add_float_column(
        "price",
        vec![
            100.0, 102.0, 98.0, 105.0, 103.0, 107.0, 101.0, 110.0, 108.0, 112.0,
        ],
    )?;
    df.add_float_column(
        "volume",
        vec![
            1000.0, 1200.0, 800.0, 1500.0, 1100.0, 1300.0, 900.0, 1600.0, 1400.0, 1700.0,
        ],
    )?;
    df.add_float_column(
        "market_cap",
        vec![
            50000.0, 52000.0, 49000.0, 55000.0, 53000.0, 57000.0, 51000.0, 60000.0, 58000.0,
            62000.0,
        ],
    )?;

    println!("Original Dataset:");
    println!("{df:?}");
    println!();

    // Each example borrows the dataset and builds its own pipeline on a clone.
    println!("=== Example 1: Basic Feature Engineering ===");
    basic_feature_engineering_example(&df)?;

    println!("\n=== Example 2: Advanced Pipeline with Multiple Operations ===");
    advanced_pipeline_example(&df)?;

    println!("\n=== Example 3: Custom Transformation Pipeline ===");
    custom_transformation_example(&df)?;

    println!("\n=== Example 4: Financial Analysis Pipeline ===");
    financial_analysis_pipeline(&df)?;

    Ok(())
}
/// Example 1: minimal feature-engineering pipeline.
///
/// Adds degree-2 polynomial features of `price` and a `price × volume`
/// interaction feature, runs a single-stage monitored pipeline, and prints
/// the result plus the pipeline's execution summary.
///
/// # Errors
/// Returns any error produced while executing the pipeline.
#[allow(clippy::result_large_err)]
fn basic_feature_engineering_example(df: &OptimizedDataFrame) -> Result<()> {
    let feature_stage = FeatureEngineeringStage::new()
        .with_polynomial_features(vec!["price".to_string()], 2)
        .with_interaction_features(vec![("price".to_string(), "volume".to_string())]);

    // Monitoring enabled so the summary below has timing/memory data.
    let mut pipeline = AdvancedPipeline::new()
        .add_stage(Box::new(feature_stage))
        .with_monitoring(true);

    let result = pipeline.execute(df.clone())?;
    println!("Result with polynomial and interaction features:");
    println!("{result:?}");

    let summary = pipeline.execution_summary();
    println!("Execution Summary:");
    println!("- Total stages: {}", summary.total_stages);
    println!("- Total duration: {:?}", summary.total_duration);
    println!("- Peak memory usage: {} bytes", summary.peak_memory_usage);
    Ok(())
}
/// Example 2: a single stage combining several feature-engineering operations.
///
/// Generates degree-3 polynomial features, two interaction features,
/// equal-width binning of `price`, and rolling-window statistics (mean of
/// `price`, std of `volume`), then prints per-stage execution metrics.
///
/// # Errors
/// Returns any error produced while executing the pipeline.
#[allow(clippy::result_large_err)]
fn advanced_pipeline_example(df: &OptimizedDataFrame) -> Result<()> {
    let feature_stage = FeatureEngineeringStage::new()
        .with_polynomial_features(vec!["price".to_string()], 3)
        .with_interaction_features(vec![
            ("price".to_string(), "volume".to_string()),
            ("price".to_string(), "market_cap".to_string()),
        ])
        .with_binning("price".to_string(), 5, BinningStrategy::EqualWidth)
        .with_rolling_window("price".to_string(), 3, WindowOperation::Mean)
        .with_rolling_window("volume".to_string(), 3, WindowOperation::Std);

    let mut pipeline = AdvancedPipeline::new()
        .add_stage(Box::new(feature_stage))
        .with_monitoring(true);

    let result = pipeline.execute(df.clone())?;
    println!("Result with comprehensive feature engineering:");
    println!("Columns: {:?}", result.column_names());
    println!(
        "Shape: {} rows × {} columns",
        result.row_count(),
        result.column_count()
    );

    // With monitoring on, each stage records duration, row counts, and memory.
    let summary = pipeline.execution_summary();
    println!("\nDetailed Execution Metrics:");
    for stage_exec in &summary.stage_details {
        println!("Stage: {}", stage_exec.stage_name);
        println!("  Duration: {:?}", stage_exec.duration);
        println!(
            "  Input rows: {}, Output rows: {}",
            stage_exec.input_rows, stage_exec.output_rows
        );
        println!("  Memory usage: {} bytes", stage_exec.memory_usage);
    }
    Ok(())
}
/// Example 3: user-supplied closure as a pipeline transformation.
///
/// Alongside degree-2 polynomial features, registers a custom transform that
/// derives a `price_volatility` column: the absolute percentage change
/// between consecutive prices, with a leading 0.0 so the column length
/// matches the row count.
///
/// # Errors
/// Returns any error produced while executing the pipeline.
#[allow(clippy::result_large_err)]
fn custom_transformation_example(df: &OptimizedDataFrame) -> Result<()> {
    let feature_stage = FeatureEngineeringStage::new()
        .with_polynomial_features(vec!["price".to_string()], 2)
        .with_custom_transform(
            "price_volatility_indicator".to_string(),
            |df: &OptimizedDataFrame| -> Result<OptimizedDataFrame> {
                let mut result = df.clone();
                // Best-effort: only add the column if `price` is present.
                if let Ok(price_values) = df.get_float_column("price") {
                    // |Δprice / previous price| × 100 for each consecutive pair.
                    let volatility: Vec<f64> = price_values
                        .windows(2)
                        .map(|window| ((window[1] - window[0]) / window[0] * 100.0).abs())
                        .collect();
                    // Pad the first row (no prior price to compare against).
                    let mut full_volatility = vec![0.0];
                    full_volatility.extend(volatility);
                    result.add_float_column("price_volatility", full_volatility)?;
                }
                Ok(result)
            },
        );

    let mut pipeline = AdvancedPipeline::new().add_stage(Box::new(feature_stage));
    let result = pipeline.execute(df.clone())?;
    println!("Result with custom volatility transformation:");
    println!("{result:?}");
    Ok(())
}
/// Example 4: two-stage financial analysis pipeline.
///
/// Stage 1 ("technical") adds rolling-window statistics and a 3-period
/// `momentum` column (price[t] − price[t−2], front-padded with two zeros).
/// Stage 2 ("risk") bins price/volume and adds a `risk_score` column: the
/// population standard deviation over each 3-price window, front-padded with
/// two zeros. Afterwards, prints feature counts, risk statistics, and
/// per-stage performance data.
///
/// # Errors
/// Returns any error produced while executing the pipeline.
#[allow(clippy::result_large_err)]
fn financial_analysis_pipeline(df: &OptimizedDataFrame) -> Result<()> {
    let technical_stage = FeatureEngineeringStage::new()
        .with_rolling_window("price".to_string(), 3, WindowOperation::Mean)
        .with_rolling_window("price".to_string(), 3, WindowOperation::Std)
        .with_rolling_window("volume".to_string(), 3, WindowOperation::Sum)
        .with_custom_transform(
            "momentum_indicator".to_string(),
            |df: &OptimizedDataFrame| -> Result<OptimizedDataFrame> {
                let mut result = df.clone();
                if let Ok(price_values) = df.get_float_column("price") {
                    // `windows(3)` only yields slices of length exactly 3, so
                    // no length check is needed inside the closure.
                    let momentum: Vec<f64> = price_values
                        .windows(3)
                        .map(|window| window[2] - window[0])
                        .collect();
                    // Front-pad: the first two rows have no full 3-row window.
                    let mut full_momentum = vec![0.0, 0.0];
                    full_momentum.extend(momentum);
                    result.add_float_column("momentum", full_momentum)?;
                }
                Ok(result)
            },
        );

    let risk_stage = FeatureEngineeringStage::new()
        .with_binning("price".to_string(), 4, BinningStrategy::EqualFrequency)
        .with_binning("volume".to_string(), 3, BinningStrategy::EqualWidth)
        .with_custom_transform(
            "risk_score".to_string(),
            |df: &OptimizedDataFrame| -> Result<OptimizedDataFrame> {
                let mut result = df.clone();
                if let Ok(prices) = df.get_float_column("price") {
                    // Population std-dev of each 3-price window; `windows(3)`
                    // guarantees exactly 3 elements per slice.
                    let risk_scores: Vec<f64> = prices
                        .windows(3)
                        .map(|window| {
                            let mean = window.iter().sum::<f64>() / window.len() as f64;
                            let variance = window
                                .iter()
                                .map(|&x| (x - mean).powi(2))
                                .sum::<f64>()
                                / window.len() as f64;
                            variance.sqrt()
                        })
                        .collect();
                    // Front-pad to keep the column aligned with the row count.
                    let mut full_risk_scores = vec![0.0, 0.0];
                    full_risk_scores.extend(risk_scores);
                    result.add_float_column("risk_score", full_risk_scores)?;
                }
                Ok(result)
            },
        );

    let mut pipeline = AdvancedPipeline::new()
        .add_stage(Box::new(technical_stage))
        .add_stage(Box::new(risk_stage))
        .with_monitoring(true);

    let result = pipeline.execute(df.clone())?;
    println!("Financial Analysis Pipeline Results:");
    println!(
        "Generated {} features from {} original features",
        result.column_count(),
        df.column_count()
    );
    println!("\nFinal columns: {:?}", result.column_names());

    // Summarize the derived risk scores (mean / max / min).
    if let Ok(risk_scores) = result.get_float_column("risk_score") {
        let avg_risk: f64 = risk_scores.iter().sum::<f64>() / risk_scores.len() as f64;
        let max_risk = risk_scores.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
        let min_risk = risk_scores.iter().fold(f64::INFINITY, |a, &b| a.min(b));
        println!("\nRisk Analysis:");
        println!("- Average risk score: {avg_risk:.3}");
        println!("- Maximum risk score: {max_risk:.3}");
        println!("- Minimum risk score: {min_risk:.3}");
    }

    let summary = pipeline.execution_summary();
    println!("\nPipeline Performance:");
    println!("- Total execution time: {:?}", summary.total_duration);
    println!("- Stages executed: {}", summary.total_stages);
    println!("- Peak memory usage: {} bytes", summary.peak_memory_usage);
    println!("\nPer-stage breakdown:");
    for (i, stage_exec) in summary.stage_details.iter().enumerate() {
        println!(
            "Stage {}: {} ({:?})",
            i + 1,
            stage_exec.stage_name,
            stage_exec.duration
        );
    }
    Ok(())
}