lk_inside/data_core/
analyzer.rs1use polars::prelude::{DataFrame, Series, SortMultipleOptions, NamedFrom};
2use anyhow::{Result, anyhow};
3use crate::analysis::statistics;
use rand::seq::SliceRandom;

/// Analyzer wrapping an owned Polars `DataFrame`, exposing statistical
/// summaries, ranking, z-score anomaly detection and naive k-means
/// clustering over it.
#[derive(Debug)]
pub struct DataFrameAnalyzer {
    // The frame every analysis method reads; set once in `new`.
    df: DataFrame,
}
10
11impl DataFrameAnalyzer {
12 pub fn new(df: DataFrame) -> Self {
13 DataFrameAnalyzer { df }
14 }
15
16 pub fn get_dataframe(&self) -> &DataFrame {
17 &self.df
18 }
19
20 pub fn get_descriptive_statistics(&self) -> Result<DataFrame> {
21 statistics::get_descriptive_statistics(&self.df)
22 }
23
24 pub fn get_value_counts(&self, column_name: &str) -> Result<DataFrame> {
25 statistics::get_value_counts(&self.df, column_name)
26 }
27
28 pub fn get_null_counts(&self) -> Result<DataFrame> {
29 statistics::get_null_counts(&self.df)
30 }
31
32 pub fn get_histogram(&self, column_name: &str, bins: usize) -> Result<DataFrame> {
33 let series = self.df.column(column_name)
34 .map_err(|e| anyhow!("Column '{}' not found: {}", column_name, e))?;
35 let series_ref = series.as_series()
36 .ok_or_else(|| anyhow!("Failed to get series from column '{}'", column_name))?;
37 statistics::calculate_histogram(series_ref, bins)
38 }
39
40 pub fn rank_by_column(&self, column_name: &str) -> Result<DataFrame> {
41 let df = &self.df;
42 let owned_column_names = df.get_column_names(); let df_column_names: Vec<&str> = owned_column_names.iter().map(|s| s.as_ref()).collect();
44 if !df_column_names.contains(&column_name) { return Err(anyhow!("Column '{}' not found for ranking.", column_name));
46 }
47 df.sort(
48 [column_name],
49 SortMultipleOptions { descending: vec![true], nulls_last: vec![false], ..Default::default() },
50 )
51 .map_err(|e| anyhow!("Failed to rank by column '{}': {}", column_name, e))
52 }
53
54 pub fn detect_anomalies(&self, column_name: &str, threshold: f64) -> Result<(DataFrame, usize)> {
55 let series_col = self.df.column(column_name)
56 .map_err(|e| anyhow!("Column '{}' not found: {}", column_name, e))?;
57 let series = series_col.as_series()
58 .ok_or_else(|| anyhow!("Failed to get series from column '{}'", column_name))?;
59
60 let mean = statistics::calculate_mean(series)?;
61 let std_dev = statistics::calculate_std_dev(series)?;
62
63 if std_dev == 0.0 {
64 return Err(anyhow!("Cannot detect anomalies: standard deviation of column '{}' is zero.", column_name));
65 }
66
67 let z_scores: Vec<f64> = series.to_float()?.f64()?.into_iter().map(|opt_val| {
68 if let Some(val) = opt_val {
69 (val - mean) / std_dev
70 } else {
71 f64::NAN }
73 }).collect();
74
75 let is_anomaly: Vec<bool> = z_scores.iter().map(|&z| z.abs() > threshold).collect();
76
77 let z_score_series = Series::new(format!("{}_z_score", column_name).into(), z_scores);
78 let is_anomaly_series = Series::new("is_anomaly".into(), is_anomaly);
79
80 let mut anomaly_df = self.df.clone();
82 anomaly_df.with_column(z_score_series)?;
83 anomaly_df.with_column(is_anomaly_series.clone())?; let anomaly_count = is_anomaly_series.bool()?.into_iter().filter(|&x| x == Some(true)).count();
86
87 Ok((anomaly_df, anomaly_count))
88 }
89
90 pub fn perform_kmeans_clustering(&self, column_names: &[&str], k: usize, max_iterations: usize) -> Result<DataFrame> {
91 if column_names.is_empty() {
92 return Err(anyhow!("No columns provided for clustering."));
93 }
94 if k == 0 {
95 return Err(anyhow!("Number of clusters (k) must be greater than 0."));
96 }
97
98 let mut data: Vec<Vec<f64>> = Vec::new();
100 for i in 0..self.df.height() {
101 let mut row_vec = Vec::with_capacity(column_names.len());
102 for col_name in column_names {
103 let series_float = self.df.column(col_name)
104 .map_err(|e| anyhow!("Column '{}' not found: {}", col_name, e))?
105 .as_series() .ok_or_else(|| anyhow!("Failed to get series from column '{}'", col_name))?
107 .to_float()?; let any_value = series_float.get(i)?;
110 let val = any_value.try_extract::<f64>().map_err(|e| anyhow!("Failed to extract f64 from value at index {} in column '{}': {}", i, col_name, e))?;
111 row_vec.push(val);
112 }
113 data.push(row_vec);
114 }
115
116 if data.is_empty() {
117 return Err(anyhow!("No data available for clustering."));
118 }
119 if data.len() < k {
120 return Err(anyhow!("Number of data points ({}) is less than the number of clusters ({}).", data.len(), k));
121 }
122
123 let mut rng = rand::thread_rng();
125 let mut centroids: Vec<Vec<f64>> = data.choose_multiple(&mut rng, k).cloned().collect();
126
127 let mut assignments: Vec<usize> = vec![0; data.len()];
128 let mut prev_assignments: Vec<usize>;
129
130 for _iter in 0..max_iterations {
131 prev_assignments = assignments.clone();
132
133 assignments = assign_to_clusters(&data, ¢roids);
135
136 centroids = update_centroids(&data, &assignments, k);
138
139 if assignments == prev_assignments {
141 break;
142 }
143 }
144
145 let cluster_series = Series::new("cluster_id".into(), assignments.iter().map(|&id| id as u32).collect::<Vec<u32>>());
147
148 let mut clustered_df = self.df.clone();
149 clustered_df.with_column(cluster_series)?;
150
151 Ok(clustered_df)
152 }
153
154 pub fn group_and_aggregate(&self, group_by_column: &str, aggregations: &[(&str, &str)]) -> Result<DataFrame> {
168 statistics::group_and_aggregate(&self.df, group_by_column, aggregations)
169 }
170
171 pub fn get_correlation_matrix(&self, column_names: &[&str]) -> Result<DataFrame> {
172 statistics::calculate_correlation_matrix(&self.df, column_names)
173 }
174}
175
/// L2 (straight-line) distance between two points of equal dimension.
/// If the slices differ in length the extra coordinates are ignored,
/// because `zip` stops at the shorter input.
fn euclidean_distance(point1: &[f64], point2: &[f64]) -> f64 {
    let mut squared_sum = 0.0;
    for (a, b) in point1.iter().zip(point2.iter()) {
        let delta = a - b;
        squared_sum += delta * delta;
    }
    squared_sum.sqrt()
}
184
/// Maps every data point to the index of its nearest centroid (Euclidean
/// distance). Ties resolve to the earliest centroid; NaN distances compare
/// as equal; an empty centroid list sends every point to cluster 0.
fn assign_to_clusters(data: &[Vec<f64>], centroids: &[Vec<f64>]) -> Vec<usize> {
    // Euclidean distance, inlined so this function is self-contained.
    let distance = |p: &[f64], c: &[f64]| -> f64 {
        p.iter().zip(c.iter()).map(|(x, y)| (x - y).powi(2)).sum::<f64>().sqrt()
    };
    let mut labels = Vec::with_capacity(data.len());
    for point in data {
        let nearest = centroids
            .iter()
            .enumerate()
            .min_by(|(_, a), (_, b)| {
                distance(point.as_slice(), a.as_slice())
                    .partial_cmp(&distance(point.as_slice(), b.as_slice()))
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .map(|(idx, _)| idx)
            .unwrap_or(0);
        labels.push(nearest);
    }
    labels
}
199
200fn update_centroids(data: &[Vec<f64>], assignments: &[usize], k: usize) -> Vec<Vec<f64>> {
201 let dimensions = data.first().map_or(0, |row| row.len());
202 let mut new_centroids = vec![vec![0.0; dimensions]; k];
203 let mut counts = vec![0usize; k];
204
205 for (i, point) in data.iter().enumerate() {
206 let cluster_id = assignments[i];
207 if cluster_id < k {
208 for d in 0..dimensions {
209 new_centroids[cluster_id][d] += point[d];
210 }
211 counts[cluster_id] += 1;
212 }
213 }
214
215 for cluster_id in 0..k {
216 if counts[cluster_id] > 0 {
217 for d in 0..dimensions {
218 new_centroids[cluster_id][d] /= counts[cluster_id] as f64;
219 }
220 } else {
221 let mut rng = rand::thread_rng();
224 new_centroids[cluster_id] = data.choose(&mut rng).cloned().unwrap_or_else(|| vec![0.0; dimensions]);
225 }
226 }
227 new_centroids
228}