use polars::prelude::{DataFrame, Series, SortMultipleOptions, NamedFrom};
use anyhow::{Result, anyhow};
use crate::analysis::statistics;
use rand::seq::SliceRandom;
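/// Wraps a Polars [`DataFrame`] and exposes descriptive statistics,
/// ranking, z-score anomaly detection, and k-means clustering over it.
///
/// Illustrative usage (the column name and values here are made up, and
/// the example is not compiled as a doctest):
///
/// ```ignore
/// use polars::df;
/// let frame = df!("score" => &[1.0, 2.0, 3.0, 100.0])?;
/// let analyzer = DataFrameAnalyzer::new(frame);
/// let (flagged, count) = analyzer.detect_anomalies("score", 3.0)?;
/// ```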
#[derive(Debug)]
pub struct DataFrameAnalyzer {
    df: DataFrame,
}
impl DataFrameAnalyzer {
    pub fn new(df: DataFrame) -> Self {
        DataFrameAnalyzer { df }
    }

    pub fn get_dataframe(&self) -> &DataFrame {
        &self.df
    }

    pub fn get_descriptive_statistics(&self) -> Result<DataFrame> {
        statistics::get_descriptive_statistics(&self.df)
    }

    pub fn get_value_counts(&self, column_name: &str) -> Result<DataFrame> {
        statistics::get_value_counts(&self.df, column_name)
    }

    pub fn get_null_counts(&self) -> Result<DataFrame> {
        statistics::get_null_counts(&self.df)
    }
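
    /// Builds a histogram of the named column with the requested number of
    /// bins; the bucketing itself is delegated to `statistics::calculate_histogram`.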
    pub fn get_histogram(&self, column_name: &str, bins: usize) -> Result<DataFrame> {
        let series = self.df.column(column_name)
            .map_err(|e| anyhow!("Column '{}' not found: {}", column_name, e))?;
        let series_ref = series.as_series()
            .ok_or_else(|| anyhow!("Failed to get series from column '{}'", column_name))?;
        statistics::calculate_histogram(series_ref, bins)
    }
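
    /// "Ranks" rows by sorting the frame on the given column in descending
    /// order, with nulls sorted first (`nulls_last: false`).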
    pub fn rank_by_column(&self, column_name: &str) -> Result<DataFrame> {
        let df = &self.df;
        let owned_column_names = df.get_column_names();
        let df_column_names: Vec<&str> = owned_column_names.iter().map(|s| s.as_ref()).collect();
        if !df_column_names.contains(&column_name) {
            return Err(anyhow!("Column '{}' not found for ranking.", column_name));
        }
        df.sort(
            [column_name],
            SortMultipleOptions {
                descending: vec![true],
                nulls_last: vec![false],
                ..Default::default()
            },
        )
        .map_err(|e| anyhow!("Failed to rank by column '{}': {}", column_name, e))
    }
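
    /// Flags rows whose z-score `(x - mean) / std_dev` exceeds `threshold`
    /// in absolute value. Returns the frame extended with
    /// `<column>_z_score` and `is_anomaly` columns, plus the number of
    /// flagged rows. Nulls get a NaN z-score and are never flagged, since
    /// `NaN.abs() > threshold` is false.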
    pub fn detect_anomalies(&self, column_name: &str, threshold: f64) -> Result<(DataFrame, usize)> {
        let series_col = self.df.column(column_name)
            .map_err(|e| anyhow!("Column '{}' not found: {}", column_name, e))?;
        let series = series_col.as_series()
            .ok_or_else(|| anyhow!("Failed to get series from column '{}'", column_name))?;
        let mean = statistics::calculate_mean(series)?;
        let std_dev = statistics::calculate_std_dev(series)?;
        if std_dev == 0.0 {
            return Err(anyhow!("Cannot detect anomalies: standard deviation of column '{}' is zero.", column_name));
        }
        let z_scores: Vec<f64> = series.to_float()?.f64()?.into_iter()
            .map(|opt_val| opt_val.map_or(f64::NAN, |val| (val - mean) / std_dev))
            .collect();
        let is_anomaly: Vec<bool> = z_scores.iter().map(|&z| z.abs() > threshold).collect();
        // Count the flags before the Vec is moved into the Series, which
        // avoids cloning the series just to re-read it.
        let anomaly_count = is_anomaly.iter().filter(|&&flagged| flagged).count();
        let z_score_series = Series::new(format!("{}_z_score", column_name).into(), z_scores);
        let is_anomaly_series = Series::new("is_anomaly".into(), is_anomaly);
        let mut anomaly_df = self.df.clone();
        anomaly_df.with_column(z_score_series)?;
        anomaly_df.with_column(is_anomaly_series)?;
        Ok((anomaly_df, anomaly_count))
    }
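
    /// Clusters rows over the selected numeric columns with plain k-means
    /// (Lloyd's algorithm): pick `k` distinct rows as initial centroids,
    /// then alternate between assigning each point to its nearest centroid
    /// and recomputing each centroid as the mean of its members, stopping
    /// once assignments stabilize or `max_iterations` is reached. The
    /// result is appended as a `cluster_id` column.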
    pub fn perform_kmeans_clustering(&self, column_names: &[&str], k: usize, max_iterations: usize) -> Result<DataFrame> {
        if column_names.is_empty() {
            return Err(anyhow!("No columns provided for clustering."));
        }
        if k == 0 {
            return Err(anyhow!("Number of clusters (k) must be greater than 0."));
        }
        // Convert each requested column to floats once, up front, instead of
        // re-converting inside the per-row loop.
        let mut float_columns = Vec::with_capacity(column_names.len());
        for col_name in column_names {
            let series_float = self.df.column(col_name)
                .map_err(|e| anyhow!("Column '{}' not found: {}", col_name, e))?
                .as_series()
                .ok_or_else(|| anyhow!("Failed to get series from column '{}'", col_name))?
                .to_float()?;
            float_columns.push(series_float);
        }
        let mut data: Vec<Vec<f64>> = Vec::with_capacity(self.df.height());
        for i in 0..self.df.height() {
            let mut row_vec = Vec::with_capacity(column_names.len());
            for (col_name, series_float) in column_names.iter().zip(&float_columns) {
                let any_value = series_float.get(i)?;
                let val = any_value.try_extract::<f64>().map_err(|e| {
                    anyhow!("Failed to extract f64 from value at index {} in column '{}': {}", i, col_name, e)
                })?;
                row_vec.push(val);
            }
            data.push(row_vec);
        }
        if data.is_empty() {
            return Err(anyhow!("No data available for clustering."));
        }
        if data.len() < k {
            return Err(anyhow!("Number of data points ({}) is less than the number of clusters ({}).", data.len(), k));
        }
        let mut rng = rand::thread_rng();
        let mut centroids: Vec<Vec<f64>> = data.choose_multiple(&mut rng, k).cloned().collect();
        let mut assignments: Vec<usize> = vec![0; data.len()];
        let mut prev_assignments: Vec<usize>;
        for _iter in 0..max_iterations {
            prev_assignments = assignments.clone();
            assignments = assign_to_clusters(&data, &centroids);
            centroids = update_centroids(&data, &assignments, k);
            // Converged: no point changed cluster on this iteration.
            if assignments == prev_assignments {
                break;
            }
        }
        let cluster_series = Series::new(
            "cluster_id".into(),
            assignments.iter().map(|&id| id as u32).collect::<Vec<u32>>(),
        );
        let mut clustered_df = self.df.clone();
        clustered_df.with_column(cluster_series)?;
        Ok(clustered_df)
    }
    pub fn group_and_aggregate(&self, group_by_column: &str, aggregations: &[(&str, &str)]) -> Result<DataFrame> {
        statistics::group_and_aggregate(&self.df, group_by_column, aggregations)
    }

    pub fn get_correlation_matrix(&self, column_names: &[&str]) -> Result<DataFrame> {
        statistics::calculate_correlation_matrix(&self.df, column_names)
    }
}
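
/// Straight-line (L2) distance between two points of equal dimension.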
fn euclidean_distance(point1: &[f64], point2: &[f64]) -> f64 {
    point1.iter()
        .zip(point2.iter())
        .map(|(a, b)| (a - b).powi(2))
        .sum::<f64>()
        .sqrt()
}
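
/// Maps each data point to the index of its nearest centroid. Incomparable
/// distances (NaN) fall back to `Ordering::Equal`, so ties resolve to the
/// earliest centroid encountered; an empty centroid list yields cluster 0.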
fn assign_to_clusters(data: &[Vec<f64>], centroids: &[Vec<f64>]) -> Vec<usize> {
    data.iter()
        .map(|point| {
            centroids.iter()
                .enumerate()
                .min_by(|(_, c1), (_, c2)| {
                    euclidean_distance(point, c1).partial_cmp(&euclidean_distance(point, c2))
                        .unwrap_or(std::cmp::Ordering::Equal)
                })
                .map(|(idx, _)| idx)
                .unwrap_or(0)
        })
        .collect()
}
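
/// Recomputes each centroid as the component-wise mean of its assigned
/// points; a cluster left with no members is re-seeded from a randomly
/// chosen data point.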
fn update_centroids(data: &[Vec<f64>], assignments: &[usize], k: usize) -> Vec<Vec<f64>> {
    let dimensions = data.first().map_or(0, |row| row.len());
    let mut new_centroids = vec![vec![0.0; dimensions]; k];
    let mut counts = vec![0usize; k];
    for (i, point) in data.iter().enumerate() {
        let cluster_id = assignments[i];
        if cluster_id < k {
            for d in 0..dimensions {
                new_centroids[cluster_id][d] += point[d];
            }
            counts[cluster_id] += 1;
        }
    }
    for cluster_id in 0..k {
        if counts[cluster_id] > 0 {
            for d in 0..dimensions {
                new_centroids[cluster_id][d] /= counts[cluster_id] as f64;
            }
        } else {
            let mut rng = rand::thread_rng();
            new_centroids[cluster_id] = data.choose(&mut rng).cloned().unwrap_or_else(|| vec![0.0; dimensions]);
        }
    }
    new_centroids
}