mod cleaning;
mod stats;
mod tui;
mod types;

use cleaning::{check_consistency, detect_duplicates, detect_outliers, detect_redundancy};
use rayon::prelude::*;
use stats::{
    ColumnStats, compute_cardinality, compute_dependency_scores, compute_distribution,
    detect_drift, detect_temporal_patterns, estimate_noise, suggest_transformations,
};
use thiserror::Error;
pub use tui::render_tui;
use types::TypeInference;

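/// Errors produced while loading or profiling a dataset.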
#[derive(Debug, Error)]
pub enum PrestoError {
    #[error("Empty dataset provided")]
    EmptyDataset,
    #[error("Invalid numeric data: {0}")]
    InvalidNumeric(String),
}

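/// An in-memory tabular dataset: a header row plus string-valued data rows.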
#[derive(Debug, Clone)]
pub struct Dataset {
    headers: Vec<String>,
    rows: Vec<Vec<String>>,
}

impl Dataset {
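    /// Builds a dataset from pre-parsed headers and rows. Each row is expected
    /// to have one value per header; this is not validated here.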
    pub fn new(headers: Vec<String>, rows: Vec<Vec<String>>) -> Self {
        Dataset { headers, rows }
    }

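    /// Reads a dataset from a CSV file with a header row.
    ///
    /// CSV read errors are currently surfaced through
    /// [`PrestoError::InvalidNumeric`], as there is no dedicated I/O variant.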
    pub fn from_csv(path: &str) -> Result<Self, PrestoError> {
        let mut rdr =
            csv::Reader::from_path(path).map_err(|e| PrestoError::InvalidNumeric(e.to_string()))?;
        let headers = rdr
            .headers()
            .map_err(|e| PrestoError::InvalidNumeric(e.to_string()))?
            .iter()
            .map(String::from)
            .collect();
        let rows: Vec<Vec<String>> = rdr
            .records()
            .map(|r| {
                let record = r.map_err(|e| PrestoError::InvalidNumeric(e.to_string()))?;
                Ok(record.iter().map(String::from).collect::<Vec<String>>())
            })
            .collect::<Result<Vec<_>, _>>()?;
        Ok(Dataset { headers, rows })
    }
}

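/// The full profiling report produced by [`describe`]: per-column statistics,
/// data-quality findings, and cross-column relationships.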
#[derive(Debug, serde::Serialize)]
pub struct Description {
    stats: Vec<ColumnStats>,
    missing: Vec<usize>,
    duplicates: usize,
    outliers: Vec<Vec<usize>>,
    types: Vec<TypeInference>,
    dependency_scores: Vec<f64>,
    drift_scores: Vec<f64>,
    cardinality: Vec<usize>,
    distributions: Vec<Vec<(f64, usize)>>,
    consistency_issues: Vec<usize>,
    temporal_patterns: Vec<String>,
    transform_suggestions: Vec<String>,
    noise_scores: Vec<f64>,
    redundancy_pairs: Vec<(usize, usize, f64)>,
    total_rows: usize,
    missing_pct: f64,
    unique_pct: f64,
    top_values: Vec<(String, Vec<(String, usize)>)>,
    correlations: Vec<Vec<f64>>,
    feature_importance: Vec<(usize, f64)>,
    anomalies: Vec<(usize, f64, usize)>,
}

impl Description {
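    /// Assembles a report from precomputed results; normally constructed via [`describe`].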
    pub fn new(
        stats: Vec<ColumnStats>,
        missing: Vec<usize>,
        duplicates: usize,
        outliers: Vec<Vec<usize>>,
        types: Vec<TypeInference>,
        dependency_scores: Vec<f64>,
        drift_scores: Vec<f64>,
        cardinality: Vec<usize>,
        distributions: Vec<Vec<(f64, usize)>>,
        consistency_issues: Vec<usize>,
        temporal_patterns: Vec<String>,
        transform_suggestions: Vec<String>,
        noise_scores: Vec<f64>,
        redundancy_pairs: Vec<(usize, usize, f64)>,
        total_rows: usize,
        missing_pct: f64,
        unique_pct: f64,
        top_values: Vec<(String, Vec<(String, usize)>)>,
        correlations: Vec<Vec<f64>>,
        feature_importance: Vec<(usize, f64)>,
        anomalies: Vec<(usize, f64, usize)>,
    ) -> Self {
        Description {
            stats,
            missing,
            duplicates,
            outliers,
            types,
            dependency_scores,
            drift_scores,
            cardinality,
            distributions,
            consistency_issues,
            temporal_patterns,
            transform_suggestions,
            noise_scores,
            redundancy_pairs,
            total_rows,
            missing_pct,
            unique_pct,
            top_values,
            correlations,
            feature_importance,
            anomalies,
        }
    }
}

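/// Profiles a dataset and renders the interactive TUI report.
///
/// Column-level work (summary statistics, missing counts, outliers, type
/// inference, frequent values, correlations) is parallelized across columns
/// with rayon. Feature importance is a heuristic: the absolute Pearson
/// correlation of each numeric column against a column whose header contains
/// "target", falling back to the first column.
///
/// Returns [`PrestoError::EmptyDataset`] if the dataset has no rows or no columns.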
pub fn describe(dataset: &Dataset) -> Result<Description, PrestoError> {
    // A dataset with no rows or no columns cannot be profiled.
    if dataset.rows.is_empty() || dataset.headers.is_empty() {
        return Err(PrestoError::EmptyDataset);
    }

    let num_cols = dataset.headers.len();

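    // Per-column summary statistics, computed in parallel across columns.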
    let stats: Vec<ColumnStats> = (0..num_cols)
        .into_par_iter()
        .map(|col_idx| stats::compute_stats(&dataset.rows, col_idx))
        .collect::<Result<_, _>>()?;

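    // Missing-value counts per column; empty cells and the literal "NA" count as missing.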
    let missing: Vec<usize> = (0..num_cols)
        .into_par_iter()
        .map(|col_idx| {
            dataset
                .rows
                .iter()
                .filter(|row| row[col_idx].is_empty() || row[col_idx] == "NA")
                .count()
        })
        .collect();

    let duplicates = detect_duplicates(&dataset.rows);

    let outliers: Vec<Vec<usize>> = (0..num_cols)
        .into_par_iter()
        .map(|col_idx| detect_outliers(&dataset.rows, col_idx, &stats[col_idx]))
        .collect();

    let types: Vec<TypeInference> = (0..num_cols)
        .into_par_iter()
        .map(|col_idx| types::infer_type(&dataset.rows, col_idx))
        .collect();

    let dependency_scores = compute_dependency_scores(dataset, &stats)?;
    let drift_scores = detect_drift(dataset, &stats)?;
    let cardinality = compute_cardinality(dataset)?;
    let distributions = compute_distribution(dataset, &stats)?;
    let consistency_issues = check_consistency(dataset)?;
    let temporal_patterns = detect_temporal_patterns(dataset)?;
    let transform_suggestions = suggest_transformations(&stats)?;
    let noise_scores = estimate_noise(dataset, &stats)?;
    let redundancy_pairs = detect_redundancy(dataset)?;

    let total_rows = dataset.rows.len();
    let total_cells = total_rows * num_cols;
    let missing_pct = missing.iter().sum::<usize>() as f64 / total_cells as f64 * 100.0;
    let unique_rows: std::collections::HashSet<&Vec<String>> = dataset.rows.iter().collect();
    let unique_pct = unique_rows.len() as f64 / total_rows as f64 * 100.0;

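    // Up to five most frequent non-missing values per column, keyed by column name.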
    let top_values: Vec<(String, Vec<(String, usize)>)> = (0..num_cols)
        .into_par_iter()
        .map(|col_idx| {
            let mut counts: std::collections::HashMap<String, usize> =
                std::collections::HashMap::new();
            for row in &dataset.rows {
                let val = &row[col_idx];
                if !val.is_empty() && val != "NA" {
                    *counts.entry(val.clone()).or_insert(0) += 1;
                }
            }
            let mut sorted: Vec<(String, usize)> = counts.into_iter().collect();
            sorted.sort_by(|a, b| b.1.cmp(&a.1));
            (
                dataset.headers[col_idx].clone(),
                sorted.into_iter().take(5).collect(),
            )
        })
        .collect();

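    // Pairwise Pearson correlations over the cells that parse as f64, reusing the
    // precomputed column means and standard deviations. Non-numeric, constant, or
    // length-mismatched column pairs fall back to 0.0.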
    let correlations: Vec<Vec<f64>> = (0..num_cols)
        .into_par_iter()
        .map(|i| {
            (0..num_cols)
                .map(|j| {
                    if i == j {
                        return 1.0;
                    }
                    let col_i: Vec<f64> = dataset
                        .rows
                        .iter()
                        .filter_map(|row| row[i].parse::<f64>().ok())
                        .collect();
                    let col_j: Vec<f64> = dataset
                        .rows
                        .iter()
                        .filter_map(|row| row[j].parse::<f64>().ok())
                        .collect();
                    if col_i.len() != col_j.len() || col_i.is_empty() {
                        return 0.0;
                    }
                    match (stats[i].mean, stats[i].std_dev, stats[j].mean, stats[j].std_dev) {
                        (Some(mean_i), Some(std_i), Some(mean_j), Some(std_j))
                            if std_i > 0.0 && std_j > 0.0 =>
                        {
                            let cov = col_i
                                .iter()
                                .zip(col_j.iter())
                                .map(|(&x, &y)| (x - mean_i) * (y - mean_j))
                                .sum::<f64>()
                                / col_i.len() as f64;
                            cov / (std_i * std_j)
                        }
                        // Guard against constant columns so a zero standard deviation
                        // never produces NaN in the matrix.
                        _ => 0.0,
                    }
                })
                .collect()
        })
        .collect();

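    // Heuristic feature importance: take the first column whose header contains
    // "target" (falling back to column 0) and rank every other numeric column by
    // the absolute value of its Pearson correlation with that column.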
    let target_idx = dataset
        .headers
        .iter()
        .position(|h| h.to_lowercase().contains("target"))
        .unwrap_or(0);
    let target_values: Vec<f64> = dataset
        .rows
        .iter()
        .filter_map(|row| row[target_idx].parse::<f64>().ok())
        .collect();
    let mut feature_importance: Vec<(usize, f64)> = (0..num_cols)
        .into_par_iter()
        .filter_map(|col_idx| {
            if col_idx == target_idx {
                return None;
            }
            let col_values: Vec<f64> = dataset
                .rows
                .iter()
                .filter_map(|row| row[col_idx].parse::<f64>().ok())
                .collect();
            if col_values.is_empty() || col_values.len() != target_values.len() {
                return None;
            }
            let corr = match (
                stats[col_idx].mean,
                stats[col_idx].std_dev,
                stats[target_idx].mean,
                stats[target_idx].std_dev,
            ) {
                (Some(mean_i), Some(std_i), Some(mean_t), Some(std_t))
                    if std_i > 0.0 && std_t > 0.0 =>
                {
                    let cov = col_values
                        .iter()
                        .zip(target_values.iter())
                        .map(|(&x, &y)| (x - mean_i) * (y - mean_t))
                        .sum::<f64>()
                        / col_values.len() as f64;
                    cov / (std_i * std_t)
                }
                _ => 0.0,
            };
            Some((col_idx, corr.abs()))
        })
        .collect();
    feature_importance.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

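    // Cell-level anomalies: numeric values more than three standard deviations away
    // from their column mean, reported as (column index, value, row index).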
    let anomalies: Vec<(usize, f64, usize)> = (0..num_cols)
        .into_par_iter()
        .flat_map(|col_idx| {
            let col_values: Vec<(f64, usize)> = dataset
                .rows
                .iter()
                .enumerate()
                .filter_map(|(idx, row)| row[col_idx].parse::<f64>().ok().map(|v| (v, idx)))
                .collect();
            if let (Some(mean), Some(std_dev)) = (stats[col_idx].mean, stats[col_idx].std_dev) {
                col_values
                    .into_iter()
                    .filter(|&(val, _)| (val - mean).abs() / std_dev > 3.0)
                    .map(move |(val, idx)| (col_idx, val, idx))
                    .collect::<Vec<_>>()
            } else {
                Vec::new()
            }
        })
        .collect();

    let description = Description::new(
        stats,
        missing,
        duplicates,
        outliers,
        types,
        dependency_scores,
        drift_scores,
        cardinality,
        distributions,
        consistency_issues,
        temporal_patterns,
        transform_suggestions,
        noise_scores,
        redundancy_pairs,
        total_rows,
        missing_pct,
        unique_pct,
        top_values,
        correlations,
        feature_importance,
        anomalies,
    );

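    // Render the interactive report as a side effect before returning.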
    render_tui(dataset, &description)?;

    Ok(description)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_describe_empty() {
        let dataset = Dataset::new(vec![], vec![]);
        assert!(matches!(describe(&dataset), Err(PrestoError::EmptyDataset)));
    }
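
    // Additional sketches of the public constructors' behavior. The temp-file name
    // below is arbitrary and the assertions only cover shapes, not full contents.
    #[test]
    fn test_dataset_new_preserves_shape() {
        let dataset = Dataset::new(
            vec!["a".to_string(), "b".to_string()],
            vec![vec!["1".to_string(), "2".to_string()]],
        );
        assert_eq!(dataset.headers.len(), 2);
        assert_eq!(dataset.rows.len(), 1);
    }

    #[test]
    fn test_from_csv_roundtrip() {
        // Write a tiny CSV to the platform temp directory, read it back, and check
        // that the header row and data rows survive the round trip.
        let path = std::env::temp_dir().join("presto_from_csv_roundtrip.csv");
        std::fs::write(&path, "a,b\n1,2\n3,4\n").unwrap();
        let dataset = Dataset::from_csv(path.to_str().unwrap()).unwrap();
        assert_eq!(dataset.headers, vec!["a".to_string(), "b".to_string()]);
        assert_eq!(dataset.rows.len(), 2);
        let _ = std::fs::remove_file(&path);
    }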
}