use polars::prelude::*;
use polars::frame::DataFrame; use polars::series::Series; use polars::prelude::IntoLazy;
use anyhow::{Result, anyhow};
/// Computes per-column summary statistics (count, mean, std, min, max,
/// median, mode) for every column of `df`.
///
/// Numeric columns get real values; non-numeric or inaccessible columns are
/// filled with `NaN` for every measure. The result has a leading "Measure"
/// column naming each statistic, followed by one column per input column.
///
/// # Errors
/// Returns an error if min/max extraction fails or the output `DataFrame`
/// cannot be assembled.
pub fn get_descriptive_statistics(df: &DataFrame) -> Result<DataFrame> {
    let measures = vec![
        "count".to_string(),
        "mean".to_string(),
        "std".to_string(),
        "min".to_string(),
        "max".to_string(),
        "median".to_string(),
        "mode".to_string(),
    ];

    // Build the output columns directly in df's column order; the previous
    // HashMap round-trip was redundant (its miss branch was unreachable) and
    // the NaN-fill loop was duplicated in two branches.
    let mut output_series: Vec<Series> = Vec::with_capacity(df.width() + 1);
    output_series.push(Series::new("Measure".into(), measures.clone()));

    for column_name in df.get_column_names() {
        let series_option: Option<&Series> = df
            .column(column_name.as_ref())
            .map(|col| col.as_series())
            .ok()
            .flatten();

        let col_stats: Vec<f64> = match series_option {
            Some(series) if series.dtype().is_numeric() => vec![
                series.len() as f64,
                calculate_mean(series).unwrap_or(f64::NAN),
                calculate_std_dev(series).unwrap_or(f64::NAN),
                calculate_min(series)?.unwrap_or(f64::NAN),
                calculate_max(series)?.unwrap_or(f64::NAN),
                series.median().unwrap_or(f64::NAN),
                // Mode is not computed yet; NaN placeholder keeps the row
                // layout aligned with `measures`.
                f64::NAN,
            ],
            // Missing or non-numeric columns: one NaN per measure.
            _ => vec![f64::NAN; measures.len()],
        };

        output_series.push(Series::new(
            PlSmallStr::from(column_name.to_string()),
            col_stats,
        ));
    }

    DataFrame::new(output_series.into_iter().map(|s| s.into_column()).collect())
        .map_err(|e| anyhow!("Failed to create descriptive statistics DataFrame: {}", e))
}
/// Returns the distinct values of `column_name` together with how often each
/// occurs, sorted by frequency (descending), not normalized.
///
/// # Errors
/// Fails when the column does not exist or the value count cannot be
/// computed.
pub fn get_value_counts(df: &DataFrame, column_name: &str) -> Result<DataFrame> {
    let series = df.column(column_name)
        .map_err(|e| anyhow!("Column '{}' not found: {}", column_name, e))?;
    let s = series.as_series()
        .ok_or_else(|| anyhow!("Failed to get series for value counts"))?
        .clone();
    // Name the frequency column "count": reusing `column_name` here (as the
    // code previously did) produces two columns with the same name, which
    // polars rejects when building the result DataFrame.
    s.value_counts(true, false, "count".into(), false)
        .map_err(|e| anyhow!("Failed to get value counts for column '{}': {}", column_name, e))
}
/// Builds a two-column DataFrame ("Column", "Null Count") listing the number
/// of null entries in each column of `df`.
///
/// # Errors
/// Fails only if the output DataFrame cannot be assembled.
pub fn get_null_counts(df: &DataFrame) -> Result<DataFrame> {
    let mut names: Vec<String> = Vec::with_capacity(df.width());
    let mut counts: Vec<u32> = Vec::with_capacity(df.width());
    for column in df.get_columns() {
        names.push(column.name().to_string());
        counts.push(column.null_count() as u32);
    }
    let columns: Vec<Column> = vec![
        Series::new("Column".into(), names).into(),
        Series::new("Null Count".into(), counts).into(),
    ];
    DataFrame::new(columns)
        .map_err(|e| anyhow!("Failed to create null counts DataFrame: {}", e))
}
/// Bins the finite values of `series` into `bins` equal-width buckets and
/// returns a DataFrame with a "bin" label column ("start-end", 2 decimals)
/// and a "count" column.
///
/// Null and non-finite (NaN/inf) entries are ignored; values on the upper
/// edge are clamped into the last bucket.
///
/// # Errors
/// Fails when `bins` is zero, the cast to float fails, the series has no
/// finite values, or all values are identical (zero range).
pub fn calculate_histogram(series: &Series, bins: usize) -> Result<DataFrame> {
    // Guard bins == 0 up front: it previously caused a division by zero
    // followed by a `bins - 1` usize-underflow panic in the clamp below.
    if bins == 0 {
        return Err(anyhow!("Histogram requires at least one bin"));
    }
    let float_series = series.to_float()?;
    let series_f64_ca = float_series.f64()?;
    // Drop nulls and non-finite values: a NaN would previously panic inside
    // min_by/max_by (`partial_cmp` returns None) and otherwise land in bin 0.
    let data_f64: Vec<f64> = series_f64_ca
        .into_iter()
        .flatten()
        .filter(|v| v.is_finite())
        .collect();
    if data_f64.is_empty() {
        return Err(anyhow!("Cannot create histogram for a series with no finite values"));
    }
    let min_val = data_f64.iter().copied().fold(f64::INFINITY, f64::min);
    let max_val = data_f64.iter().copied().fold(f64::NEG_INFINITY, f64::max);
    if (max_val - min_val).abs() < f64::EPSILON {
        return Err(anyhow!("Cannot create histogram for a series with a single value"));
    }
    let bin_width = (max_val - min_val) / bins as f64;
    let mut counts = vec![0u32; bins];
    for val in data_f64 {
        // Clamp so the maximum value falls into the last (inclusive) bucket.
        let bin = (((val - min_val) / bin_width) as usize).min(bins - 1);
        counts[bin] += 1;
    }
    let bin_labels: Vec<String> = (0..bins)
        .map(|i| {
            let start = min_val + i as f64 * bin_width;
            let end = min_val + (i + 1) as f64 * bin_width;
            format!("{:.2}-{:.2}", start, end)
        })
        .collect();
    let bins_series = Series::new("bin".into(), bin_labels);
    let counts_series = Series::new("count".into(), counts);
    DataFrame::new(vec![bins_series.into(), counts_series.into()])
        .map_err(|e| anyhow!("Failed to create histogram DataFrame: {}", e))
}
/// Mean of `series` after casting to floats.
///
/// # Errors
/// Fails when the cast fails or polars yields no mean (e.g. an empty or
/// all-null series).
pub fn calculate_mean(series: &Series) -> Result<f64> {
    let as_float = series.to_float()?;
    match as_float.mean() {
        Some(mean) => Ok(mean),
        None => Err(anyhow!("Failed to calculate mean for series '{}'", series.name())),
    }
}
/// Minimum of `series` as `f64`; `None` when the series is empty or
/// all-null.
pub fn calculate_min(series: &Series) -> Result<Option<f64>> {
    // `?` converts the PolarsError into anyhow::Error directly.
    let minimum = series.min()?;
    Ok(minimum)
}
/// Maximum of `series` as `f64`; `None` when the series is empty or
/// all-null.
pub fn calculate_max(series: &Series) -> Result<Option<f64>> {
    // `?` converts the PolarsError into anyhow::Error directly.
    let maximum = series.max()?;
    Ok(maximum)
}
/// Sample standard deviation (ddof = 1) of `series` after casting to floats.
///
/// # Errors
/// Fails when the f64 view cannot be obtained or polars cannot produce a
/// value (e.g. an empty series).
pub fn calculate_std_dev(series: &Series) -> Result<f64> {
    let floats = series.to_float()?;
    let ca = floats
        .f64()
        .map_err(|e| anyhow!("Failed to get f64 series for std dev calculation: {}", e))?;
    ca.std(1).ok_or_else(|| {
        anyhow!("Failed to calculate standard deviation for series '{}'", series.name())
    })
}
/// Groups `df` by `group_by_column` and applies the requested aggregations,
/// given as `(column, aggregation)` pairs where the aggregation is one of
/// "sum", "mean", "min", "max", or "count".
///
/// # Errors
/// Fails on an unsupported aggregation name or if the lazy query fails.
pub fn group_and_aggregate(
    df: &DataFrame,
    group_by_column: &str,
    aggregations: &[(&str, &str)],
) -> Result<DataFrame> {
    // Translate each (column, aggregation) pair into an expression,
    // short-circuiting on the first unknown aggregation name.
    let aggs = aggregations
        .iter()
        .map(|(col_name, agg_type)| {
            let column = col(*col_name);
            match *agg_type {
                "sum" => Ok(column.sum()),
                "mean" => Ok(column.mean()),
                "min" => Ok(column.min()),
                "max" => Ok(column.max()),
                "count" => Ok(column.count()),
                other => Err(anyhow!("Unsupported aggregation type: {}", other)),
            }
        })
        .collect::<Result<Vec<Expr>>>()?;
    df.clone()
        .lazy()
        .group_by([col(group_by_column)])
        .agg(&aggs)
        .collect()
        .map_err(|e| anyhow!("Failed to perform group and aggregate: {}", e))
}
pub fn calculate_correlation_matrix(df: &DataFrame, column_names: &[&str]) -> Result<DataFrame> {
if column_names.len() < 2 {
return Err(anyhow!("At least two numerical columns are required to calculate a correlation matrix."));
}
let numeric_cols_names: Vec<String> = df
.get_column_names()
.into_iter()
.filter(|name| {
df.column(name.as_str()).map_or(false, |s| s.dtype().is_numeric())
})
.filter(|name| column_names.contains(&name.as_str())) .map(|name| name.to_string())
.collect();
if numeric_cols_names.len() < 2 {
return Err(anyhow!("Less than two numerical columns found among the selected columns."));
}
let mut correlation_data: Vec<Series> = Vec::new();
for (i, col_name1) in numeric_cols_names.iter().enumerate() {
let mut row_values = Vec::new();
for (j, col_name2) in numeric_cols_names.iter().enumerate() {
if i == j {
row_values.push(Some(1.0f64));
} else {
let s1 = df.column(col_name1)?.as_series().ok_or_else(|| anyhow!("Failed to get series for {}", col_name1))?.to_float()?;
let s2 = df.column(col_name2)?.as_series().ok_or_else(|| anyhow!("Failed to get series for {}", col_name2))?.to_float()?;
let correlation = calculate_pearson_correlation(&s1, &s2)?;
row_values.push(Some(correlation));
}
}
let series = Series::new(col_name1.as_str().into(), row_values);
correlation_data.push(series);
}
DataFrame::new(correlation_data.into_iter().map(|s| s.into()).collect::<Vec<Column>>())
.map_err(|e| anyhow!("Failed to create correlation matrix DataFrame: {}", e))
}
/// Pearson correlation coefficient between `s1` and `s2`.
///
/// Only rows where BOTH series have a value are used (pairwise-complete
/// observations), so nulls cannot misalign the two sequences. Returns `NaN`
/// when either series has zero variance over the shared rows.
///
/// # Errors
/// Fails when the series differ in length, are empty, have no shared
/// non-null rows, or cannot be cast to `f64`.
pub fn calculate_pearson_correlation(s1: &Series, s2: &Series) -> Result<f64> {
    if s1.len() != s2.len() {
        return Err(anyhow!("Series must have the same length to calculate correlation."));
    }
    if s1.is_empty() {
        return Err(anyhow!("Cannot calculate correlation for empty series."));
    }
    let s1_float = s1.to_float()?;
    let s2_float = s2.to_float()?;
    // Keep only pairwise-complete observations. The previous implementation
    // dropped nulls from each series independently (misaligning the pairs)
    // while using the null-inclusive length as `n`, giving wrong results
    // whenever nulls were present.
    let pairs: Vec<(f64, f64)> = s1_float
        .f64()?
        .into_iter()
        .zip(s2_float.f64()?.into_iter())
        .filter_map(|(a, b)| Some((a?, b?)))
        .collect();
    if pairs.is_empty() {
        return Err(anyhow!("Cannot calculate correlation for empty series."));
    }
    let n = pairs.len() as f64;
    let (mut sum_x, mut sum_y, mut sum_xy, mut sum_x2, mut sum_y2) =
        (0.0f64, 0.0f64, 0.0f64, 0.0f64, 0.0f64);
    for &(x, y) in &pairs {
        sum_x += x;
        sum_y += y;
        sum_xy += x * y;
        sum_x2 += x * x;
        sum_y2 += y * y;
    }
    let numerator = n * sum_xy - sum_x * sum_y;
    let denominator =
        ((n * sum_x2 - sum_x * sum_x) * (n * sum_y2 - sum_y * sum_y)).sqrt();
    if denominator == 0.0 {
        return Ok(f64::NAN);
    }
    Ok(numerator / denominator)
}