// lk_inside/data_core/analyzer.rs

use anyhow::{anyhow, Result};
use polars::prelude::{DataFrame, NamedFrom, Series, SortMultipleOptions};
use rand::seq::SliceRandom; // for choose_multiple and choose

use crate::analysis::statistics;

6#[derive(Debug)] // Add Debug derive
7pub struct DataFrameAnalyzer {
8    df: DataFrame,
9}
10
11impl DataFrameAnalyzer {
12    pub fn new(df: DataFrame) -> Self {
13        DataFrameAnalyzer { df }
14    }
15
16    pub fn get_dataframe(&self) -> &DataFrame {
17        &self.df
18    }
19
20    pub fn get_descriptive_statistics(&self) -> Result<DataFrame> {
21        statistics::get_descriptive_statistics(&self.df)
22    }
23
24    pub fn get_value_counts(&self, column_name: &str) -> Result<DataFrame> {
25        statistics::get_value_counts(&self.df, column_name)
26    }
27
28    pub fn get_null_counts(&self) -> Result<DataFrame> {
29        statistics::get_null_counts(&self.df)
30    }
31
32    pub fn get_histogram(&self, column_name: &str, bins: usize) -> Result<DataFrame> {
33        let series = self.df.column(column_name)
34            .map_err(|e| anyhow!("Column '{}' not found: {}", column_name, e))?;
35        let series_ref = series.as_series()
36            .ok_or_else(|| anyhow!("Failed to get series from column '{}'", column_name))?;
37        statistics::calculate_histogram(series_ref, bins)
38    }
39
40    pub fn rank_by_column(&self, column_name: &str) -> Result<DataFrame> {
41        let df = &self.df;
42        let owned_column_names = df.get_column_names(); // Store the Vec<String>
43        let df_column_names: Vec<&str> = owned_column_names.iter().map(|s| s.as_ref()).collect();
44        if !df_column_names.contains(&column_name) { // Fix: pass &column_name
45            return Err(anyhow!("Column '{}' not found for ranking.", column_name));
46        }
47        df.sort(
48            [column_name],
49            SortMultipleOptions { descending: vec![true], nulls_last: vec![false], ..Default::default() },
50        )
51            .map_err(|e| anyhow!("Failed to rank by column '{}': {}", column_name, e))
52    }
53
54    pub fn detect_anomalies(&self, column_name: &str, threshold: f64) -> Result<(DataFrame, usize)> {
55        let series_col = self.df.column(column_name)
56            .map_err(|e| anyhow!("Column '{}' not found: {}", column_name, e))?;
57        let series = series_col.as_series()
58            .ok_or_else(|| anyhow!("Failed to get series from column '{}'", column_name))?;
59
60        let mean = statistics::calculate_mean(series)?;
61        let std_dev = statistics::calculate_std_dev(series)?;
62
63        if std_dev == 0.0 {
64            return Err(anyhow!("Cannot detect anomalies: standard deviation of column '{}' is zero.", column_name));
65        }
66
67        let z_scores: Vec<f64> = series.to_float()?.f64()?.into_iter().map(|opt_val| {
68            if let Some(val) = opt_val {
69                (val - mean) / std_dev
70            } else {
71                f64::NAN // Handle nulls gracefully
72            }
73        }).collect();
74
75        let is_anomaly: Vec<bool> = z_scores.iter().map(|&z| z.abs() > threshold).collect();
76
77        let z_score_series = Series::new(format!("{}_z_score", column_name).into(), z_scores);
78        let is_anomaly_series = Series::new("is_anomaly".into(), is_anomaly);
79
80        // Add z_scores and anomaly flags to the original DataFrame
81        let mut anomaly_df = self.df.clone();
82        anomaly_df.with_column(z_score_series)?;
83        anomaly_df.with_column(is_anomaly_series.clone())?; // Clone is_anomaly_series again for use below
84
85        let anomaly_count = is_anomaly_series.bool()?.into_iter().filter(|&x| x == Some(true)).count();
86
87        Ok((anomaly_df, anomaly_count))
88    }
89
90    pub fn perform_kmeans_clustering(&self, column_names: &[&str], k: usize, max_iterations: usize) -> Result<DataFrame> {
91        if column_names.is_empty() {
92            return Err(anyhow!("No columns provided for clustering."));
93        }
94        if k == 0 {
95            return Err(anyhow!("Number of clusters (k) must be greater than 0."));
96        }
97
98        // 1. Extract and prepare data
99        let mut data: Vec<Vec<f64>> = Vec::new();
100        for i in 0..self.df.height() {
101            let mut row_vec = Vec::with_capacity(column_names.len());
102            for col_name in column_names {
103                let series_float = self.df.column(col_name)
104                    .map_err(|e| anyhow!("Column '{}' not found: {}", col_name, e))?
105                    .as_series() // Convert &Column to &Series
106                    .ok_or_else(|| anyhow!("Failed to get series from column '{}'", col_name))?
107                    .to_float()?; // Ensure numerical type
108
109                let any_value = series_float.get(i)?;
110                let val = any_value.try_extract::<f64>().map_err(|e| anyhow!("Failed to extract f64 from value at index {} in column '{}': {}", i, col_name, e))?;
111                row_vec.push(val);
112            }
113            data.push(row_vec);
114        }
115
116        if data.is_empty() {
117            return Err(anyhow!("No data available for clustering."));
118        }
119        if data.len() < k {
120            return Err(anyhow!("Number of data points ({}) is less than the number of clusters ({}).", data.len(), k));
121        }
122
123        // 2. Initialize centroids (randomly select k data points)
124        let mut rng = rand::thread_rng();
125        let mut centroids: Vec<Vec<f64>> = data.choose_multiple(&mut rng, k).cloned().collect();
126
127        let mut assignments: Vec<usize> = vec![0; data.len()];
128        let mut prev_assignments: Vec<usize>;
129
130        for _iter in 0..max_iterations {
131            prev_assignments = assignments.clone();
132
133            // 3. Assignment step
134            assignments = assign_to_clusters(&data, &centroids);
135
136            // 4. Update step
137            centroids = update_centroids(&data, &assignments, k);
138
139            // 5. Check for convergence
140            if assignments == prev_assignments {
141                break;
142            }
143        }
144
145        // Add cluster assignments to a new DataFrame
146        let cluster_series = Series::new("cluster_id".into(), assignments.iter().map(|&id| id as u32).collect::<Vec<u32>>());
147        
148        let mut clustered_df = self.df.clone();
149        clustered_df.with_column(cluster_series)?;
150
151        Ok(clustered_df)
152    }
153
154    /// Performs a group-by operation followed by aggregation on specified columns.
155    ///
156    /// # Arguments
157    /// * `group_by_column` - The name of the column to group the DataFrame by.
158    /// * `aggregations` - A slice of tuples, where each tuple contains:
159    ///     * The name of the column to aggregate.
160    ///     * The type of aggregation to perform ("sum", "mean", "min", "max", "count").
161    ///
162    /// # Returns
163    /// A `Result` containing the aggregated `DataFrame` or an `anyhow::Error` if the operation fails.
164    ///
165    /// # Errors
166    /// Returns an error if the specified columns are not found or if an unsupported aggregation type is provided.
167    pub fn group_and_aggregate(&self, group_by_column: &str, aggregations: &[(&str, &str)]) -> Result<DataFrame> {
168        statistics::group_and_aggregate(&self.df, group_by_column, aggregations)
169    }
170
171    pub fn get_correlation_matrix(&self, column_names: &[&str]) -> Result<DataFrame> {
172        statistics::calculate_correlation_matrix(&self.df, column_names)
173    }
174}
175
// Helper functions for K-Means clustering
177fn euclidean_distance(point1: &[f64], point2: &[f64]) -> f64 {
178    point1.iter()
179        .zip(point2.iter())
180        .map(|(a, b)| (a - b).powi(2))
181        .sum::<f64>()
182        .sqrt()
183}
184
185fn assign_to_clusters(data: &[Vec<f64>], centroids: &[Vec<f64>]) -> Vec<usize> {
186    data.iter()
187        .map(|point| {
188            centroids.iter()
189                .enumerate()
190                .min_by(|(_, c1), (_, c2)| {
191                    euclidean_distance(point, c1).partial_cmp(&euclidean_distance(point, c2))
192                        .unwrap_or(std::cmp::Ordering::Equal)
193                })
194                .map(|(idx, _)| idx)
195                .unwrap_or(0) // Should not happen with non-empty centroids
196        })
197        .collect()
198}
199
200fn update_centroids(data: &[Vec<f64>], assignments: &[usize], k: usize) -> Vec<Vec<f64>> {
201    let dimensions = data.first().map_or(0, |row| row.len());
202    let mut new_centroids = vec![vec![0.0; dimensions]; k];
203    let mut counts = vec![0usize; k];
204
205    for (i, point) in data.iter().enumerate() {
206        let cluster_id = assignments[i];
207        if cluster_id < k {
208            for d in 0..dimensions {
209                new_centroids[cluster_id][d] += point[d];
210            }
211            counts[cluster_id] += 1;
212        }
213    }
214
215    for cluster_id in 0..k {
216        if counts[cluster_id] > 0 {
217            for d in 0..dimensions {
218                new_centroids[cluster_id][d] /= counts[cluster_id] as f64;
219            }
220        } else {
221            // Handle empty cluster: re-initialize with a random point or keep old centroid
222            // For simplicity, here we'll re-initialize with a random data point
223            let mut rng = rand::thread_rng();
224            new_centroids[cluster_id] = data.choose(&mut rng).cloned().unwrap_or_else(|| vec![0.0; dimensions]);
225        }
226    }
227    new_centroids
228}