1use std::collections::{HashMap, HashSet};
7
8use crate::{
9 dataset::{ArrowDataset, Dataset},
10 error::Result,
11};
12
/// A single data-quality problem detected while analyzing a dataset.
///
/// Per-column variants carry the offending column's name; dataset-wide
/// variants (duplicate rows, empty schema/dataset) do not. Severity ranking
/// is defined by [`QualityIssue::severity`].
#[derive(Debug, Clone, PartialEq)]
pub enum QualityIssue {
    /// A column's null ratio exceeded the configured threshold.
    HighNullRatio {
        /// Name of the offending column.
        column: String,
        /// Observed fraction of null values (0.0..=1.0).
        null_ratio: f64,
        /// Configured limit that was exceeded.
        threshold: f64,
    },
    /// A column's duplicate-value ratio exceeded the configured threshold.
    HighDuplicateRatio {
        /// Name of the offending column.
        column: String,
        /// Fraction of non-null values that repeat an earlier value.
        duplicate_ratio: f64,
        /// Configured limit that was exceeded.
        threshold: f64,
    },
    /// A column has fewer distinct non-null values than `min_cardinality`.
    LowCardinality {
        /// Name of the offending column.
        column: String,
        /// Number of distinct non-null values observed.
        unique_count: usize,
        /// Total number of rows scanned.
        total_count: usize,
    },
    /// IQR-based (Tukey fence) outliers were found in a numeric column.
    OutliersDetected {
        /// Name of the offending column.
        column: String,
        /// Number of values outside the 1.5*IQR fences.
        outlier_count: usize,
        /// `outlier_count / row_count`.
        outlier_ratio: f64,
    },
    /// Whole rows are exact duplicates of earlier rows.
    DuplicateRows {
        /// Number of rows that repeat an earlier row.
        duplicate_count: usize,
        /// `duplicate_count / row_count`.
        duplicate_ratio: f64,
    },
    /// A column holds at most one distinct non-null value.
    ConstantColumn {
        /// Name of the offending column.
        column: String,
        /// First non-null value (empty string if the column is all null).
        value: String,
    },
    /// The dataset's schema declares no fields at all.
    EmptySchema,
    /// The schema has fields but the dataset contains zero rows.
    EmptyDataset,
}
75
76impl QualityIssue {
77 pub fn severity(&self) -> u8 {
79 match self {
80 Self::EmptySchema | Self::EmptyDataset => 5,
81 Self::ConstantColumn { .. } => 4,
82 Self::HighNullRatio { null_ratio, .. } if *null_ratio > 0.5 => 4,
83 Self::HighNullRatio { .. } => 3,
84 Self::OutliersDetected { outlier_ratio, .. } if *outlier_ratio > 0.1 => 3,
85 Self::OutliersDetected { .. }
86 | Self::HighDuplicateRatio { .. }
87 | Self::DuplicateRows { .. } => 2,
88 Self::LowCardinality { .. } => 1,
89 }
90 }
91
92 pub fn column(&self) -> Option<&str> {
94 match self {
95 Self::HighNullRatio { column, .. }
96 | Self::HighDuplicateRatio { column, .. }
97 | Self::LowCardinality { column, .. }
98 | Self::OutliersDetected { column, .. }
99 | Self::ConstantColumn { column, .. } => Some(column),
100 _ => None,
101 }
102 }
103}
104
/// Per-column quality metrics computed during a check.
#[derive(Debug, Clone)]
pub struct ColumnQuality {
    /// Column name.
    pub name: String,
    /// Total number of rows scanned (nulls included).
    pub total_count: usize,
    /// Number of null values.
    pub null_count: usize,
    /// `null_count / total_count`; 0.0 when the column has no rows.
    pub null_ratio: f64,
    /// Number of distinct non-null values.
    pub unique_count: usize,
    /// `unique_count / non-null count`; 0.0 when all values are null.
    pub unique_ratio: f64,
    /// Number of non-null values that repeat an earlier value.
    pub duplicate_count: usize,
    /// `duplicate_count / non-null count`; 0.0 when all values are null.
    pub duplicate_ratio: f64,
    /// IQR-based outlier count; `None` when outlier checking is disabled or
    /// fewer than four values parse as finite numbers.
    pub outlier_count: Option<usize>,
    /// Numeric summary; populated under the same conditions as
    /// `outlier_count`.
    pub numeric_stats: Option<NumericStats>,
}
129
130impl ColumnQuality {
131 pub fn is_constant(&self) -> bool {
133 self.unique_count <= 1 && self.total_count > 0
134 }
135
136 pub fn is_mostly_null(&self, threshold: f64) -> bool {
138 self.null_ratio >= threshold
139 }
140}
141
/// Descriptive statistics for a column whose values parse as numbers.
#[derive(Debug, Clone)]
pub struct NumericStats {
    /// Smallest value.
    pub min: f64,
    /// Largest value.
    pub max: f64,
    /// Arithmetic mean.
    pub mean: f64,
    /// Population standard deviation (variance divides by n, not n - 1).
    pub std_dev: f64,
    /// First quartile (25th percentile, nearest-rank).
    pub q1: f64,
    /// Median (50th percentile, nearest-rank).
    pub median: f64,
    /// Third quartile (75th percentile, nearest-rank).
    pub q3: f64,
}
160
161impl NumericStats {
162 pub fn iqr(&self) -> f64 {
164 self.q3 - self.q1
165 }
166
167 pub fn outlier_lower_bound(&self) -> f64 {
169 self.q1 - 1.5 * self.iqr()
170 }
171
172 pub fn outlier_upper_bound(&self) -> f64 {
174 self.q3 + 1.5 * self.iqr()
175 }
176}
177
/// Aggregate result of a quality check over a whole dataset.
#[derive(Debug, Clone)]
pub struct QualityReport {
    /// Number of rows scanned.
    pub row_count: usize,
    /// Number of columns in the schema.
    pub column_count: usize,
    /// Per-column metrics, keyed by column name.
    pub columns: HashMap<String, ColumnQuality>,
    /// Every issue detected, per-column and dataset-wide.
    pub issues: Vec<QualityIssue>,
    /// Overall quality score in `0.0..=100.0`; higher is better.
    pub score: f64,
    /// Number of rows that exactly duplicate an earlier row.
    pub duplicate_row_count: usize,
}
194
195impl QualityReport {
196 pub fn has_issues(&self) -> bool {
198 !self.issues.is_empty()
199 }
200
201 pub fn column_issues(&self, column: &str) -> Vec<&QualityIssue> {
203 self.issues
204 .iter()
205 .filter(|i| i.column() == Some(column))
206 .collect()
207 }
208
209 pub fn max_severity(&self) -> u8 {
211 self.issues.iter().map(|i| i.severity()).max().unwrap_or(0)
212 }
213
214 pub fn problematic_columns(&self) -> Vec<&str> {
216 self.issues
217 .iter()
218 .filter_map(|i| i.column())
219 .collect::<HashSet<_>>()
220 .into_iter()
221 .collect()
222 }
223}
224
/// Tunable limits that decide when a measured metric becomes a
/// [`QualityIssue`].
#[derive(Debug, Clone)]
pub struct QualityThresholds {
    /// Maximum tolerated per-column null ratio (default 0.1).
    pub max_null_ratio: f64,
    /// Maximum tolerated per-column duplicate-value ratio (default 0.5).
    pub max_duplicate_ratio: f64,
    /// Minimum number of distinct values expected per column (default 2).
    pub min_cardinality: usize,
    /// Maximum tolerated per-column outlier ratio (default 0.05).
    pub max_outlier_ratio: f64,
    /// Maximum tolerated ratio of fully duplicated rows (default 0.01).
    pub max_duplicate_row_ratio: f64,
}
239
240impl Default for QualityThresholds {
241 fn default() -> Self {
242 Self {
243 max_null_ratio: 0.1,
244 max_duplicate_ratio: 0.5,
245 min_cardinality: 2,
246 max_outlier_ratio: 0.05,
247 max_duplicate_row_ratio: 0.01,
248 }
249 }
250}
251
/// Builder-style analyzer that scans a dataset and produces a
/// [`QualityReport`].
pub struct QualityChecker {
    /// Limits used when turning measured metrics into issues.
    pub(crate) thresholds: QualityThresholds,
    /// When false, numeric parsing and IQR outlier detection are skipped.
    pub(crate) check_outliers: bool,
    /// When false, whole-row duplicate counting is skipped.
    pub(crate) check_duplicates: bool,
}
258
259impl Default for QualityChecker {
260 fn default() -> Self {
261 Self::new()
262 }
263}
264
265impl QualityChecker {
266 pub fn new() -> Self {
268 Self {
269 thresholds: QualityThresholds::default(),
270 check_outliers: true,
271 check_duplicates: true,
272 }
273 }
274
275 #[must_use]
277 pub fn max_null_ratio(mut self, ratio: f64) -> Self {
278 self.thresholds.max_null_ratio = ratio;
279 self
280 }
281
282 #[must_use]
284 pub fn max_duplicate_ratio(mut self, ratio: f64) -> Self {
285 self.thresholds.max_duplicate_ratio = ratio;
286 self
287 }
288
289 #[must_use]
291 pub fn min_cardinality(mut self, min: usize) -> Self {
292 self.thresholds.min_cardinality = min;
293 self
294 }
295
296 #[must_use]
298 pub fn max_outlier_ratio(mut self, ratio: f64) -> Self {
299 self.thresholds.max_outlier_ratio = ratio;
300 self
301 }
302
303 #[must_use]
305 pub fn with_outlier_check(mut self, enabled: bool) -> Self {
306 self.check_outliers = enabled;
307 self
308 }
309
310 #[must_use]
312 pub fn with_duplicate_check(mut self, enabled: bool) -> Self {
313 self.check_duplicates = enabled;
314 self
315 }
316
    /// Runs every configured quality check against `dataset` and returns a
    /// [`QualityReport`].
    ///
    /// Per column: null ratio, duplicate-value ratio, cardinality, constant
    /// values, and (when enabled) IQR outliers. Dataset-wide: empty schema,
    /// empty dataset, and (when enabled) whole-row duplicates.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok`; the `Result` is kept for interface
    /// stability.
    pub fn check(&self, dataset: &ArrowDataset) -> Result<QualityReport> {
        let schema = dataset.schema();
        let mut issues = Vec::new();

        // A schema with no fields cannot be analyzed: report and bail early.
        if schema.fields().is_empty() {
            issues.push(QualityIssue::EmptySchema);
            return Ok(QualityReport {
                row_count: 0,
                column_count: 0,
                columns: HashMap::new(),
                issues,
                score: 0.0,
                duplicate_row_count: 0,
            });
        }

        // Materialize every column as stringified cells for uniform analysis.
        let (column_data, row_count) = self.collect_data(dataset);

        // Fields exist but no rows: nothing further to measure.
        if row_count == 0 {
            issues.push(QualityIssue::EmptyDataset);
            return Ok(QualityReport {
                row_count: 0,
                column_count: schema.fields().len(),
                columns: HashMap::new(),
                issues,
                score: 0.0,
                duplicate_row_count: 0,
            });
        }

        let mut columns = HashMap::new();
        for (col_name, values) in &column_data {
            let quality = self.analyze_column(col_name, values, row_count);

            if quality.null_ratio > self.thresholds.max_null_ratio {
                issues.push(QualityIssue::HighNullRatio {
                    column: col_name.clone(),
                    null_ratio: quality.null_ratio,
                    threshold: self.thresholds.max_null_ratio,
                });
            }

            if quality.duplicate_ratio > self.thresholds.max_duplicate_ratio {
                issues.push(QualityIssue::HighDuplicateRatio {
                    column: col_name.clone(),
                    duplicate_ratio: quality.duplicate_ratio,
                    threshold: self.thresholds.max_duplicate_ratio,
                });
            }

            // Single-row datasets are exempt from the cardinality check.
            if quality.unique_count < self.thresholds.min_cardinality && row_count > 1 {
                issues.push(QualityIssue::LowCardinality {
                    column: col_name.clone(),
                    unique_count: quality.unique_count,
                    total_count: row_count,
                });
            }

            if quality.is_constant() {
                // First non-null value, or empty string when the column is
                // entirely null (is_constant also matches that case).
                let value = values
                    .iter()
                    .find(|v| v.is_some())
                    .map(|v| v.clone().unwrap_or_default())
                    .unwrap_or_default();
                issues.push(QualityIssue::ConstantColumn {
                    column: col_name.clone(),
                    value,
                });
            }

            // outlier_count is Some only when outlier checking is enabled and
            // the column had enough parseable numeric values.
            if let Some(outlier_count) = quality.outlier_count {
                let outlier_ratio = outlier_count as f64 / row_count as f64;
                if outlier_ratio > self.thresholds.max_outlier_ratio {
                    issues.push(QualityIssue::OutliersDetected {
                        column: col_name.clone(),
                        outlier_count,
                        outlier_ratio,
                    });
                }
            }

            columns.insert(col_name.clone(), quality);
        }

        let duplicate_row_count = if self.check_duplicates {
            self.count_duplicate_rows(&column_data, row_count)
        } else {
            0
        };

        // row_count > 0 is guaranteed by the early return above, so this
        // division is safe.
        let duplicate_row_ratio = duplicate_row_count as f64 / row_count as f64;
        if duplicate_row_ratio > self.thresholds.max_duplicate_row_ratio {
            issues.push(QualityIssue::DuplicateRows {
                duplicate_count: duplicate_row_count,
                duplicate_ratio: duplicate_row_ratio,
            });
        }

        let score = self.calculate_score(&columns, &issues, row_count);

        Ok(QualityReport {
            row_count,
            column_count: schema.fields().len(),
            columns,
            issues,
            score,
            duplicate_row_count,
        })
    }
434
    /// Reads every batch of `dataset` and stringifies each cell, returning a
    /// column-name → values map plus the total number of rows read.
    ///
    /// Nulls become `None`. Strings, 32/64-bit ints and floats, and booleans
    /// are rendered with `to_string`; lists and structs go through the
    /// serialize helpers; any other Arrow type degrades to `"?"`.
    pub(crate) fn collect_data(
        &self,
        dataset: &ArrowDataset,
    ) -> (HashMap<String, Vec<Option<String>>>, usize) {
        use arrow::array::{
            Array, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, ListArray,
            StringArray, StructArray,
        };

        let schema = dataset.schema();
        let mut data: HashMap<String, Vec<Option<String>>> = HashMap::new();
        let mut row_count = 0;

        // Pre-create one entry per schema field so even an empty dataset maps
        // every column.
        for field in schema.fields() {
            data.insert(field.name().clone(), Vec::new());
        }

        for batch in dataset.iter() {
            row_count += batch.num_rows();

            for (col_idx, field) in schema.fields().iter().enumerate() {
                if let Some(col_data) = data.get_mut(field.name()) {
                    let array = batch.column(col_idx);

                    // Downcast once per batch element; the first matching
                    // concrete array type wins.
                    for i in 0..array.len() {
                        if array.is_null(i) {
                            col_data.push(None);
                        } else if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<Float32Array>() {
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<BooleanArray>() {
                            col_data.push(Some(arr.value(i).to_string()));
                        } else if let Some(arr) = array.as_any().downcast_ref::<ListArray>() {
                            col_data.push(Some(Self::serialize_list_value(arr, i)));
                        } else if let Some(arr) = array.as_any().downcast_ref::<StructArray>() {
                            col_data.push(Some(Self::serialize_struct_value(arr, i)));
                        } else {
                            // Unknown array type: keep row alignment with a
                            // placeholder instead of dropping the cell.
                            col_data.push(Some("?".to_string()));
                        }
                    }
                }
            }
        }

        (data, row_count)
    }
493
494 pub(crate) fn serialize_list_value(arr: &arrow::array::ListArray, idx: usize) -> String {
496 use arrow::array::Array;
497
498 let values = arr.value(idx);
499 let mut parts = Vec::new();
500
501 for i in 0..values.len() {
502 if values.is_null(i) {
503 parts.push("null".to_string());
504 } else {
505 parts.push(Self::serialize_array_value(&values, i));
507 }
508 }
509
510 format!("[{}]", parts.join(","))
511 }
512
513 pub(crate) fn serialize_struct_value(arr: &arrow::array::StructArray, idx: usize) -> String {
515 use arrow::array::Array;
516
517 let mut parts = Vec::new();
518
519 for (field_idx, field) in arr.fields().iter().enumerate() {
520 let col = arr.column(field_idx);
521 let value = if col.is_null(idx) {
522 "null".to_string()
523 } else {
524 Self::serialize_array_value(col, idx)
525 };
526 parts.push(format!("{}:{}", field.name(), value));
527 }
528
529 format!("{{{}}}", parts.join(","))
530 }
531
    /// Renders a single element of an arbitrary Arrow array as a compact
    /// string.
    ///
    /// Strings are double-quoted; ints, floats, and booleans use their
    /// `Display` form; lists and structs recurse through the dedicated
    /// serializers; nulls render as `null` and unsupported types as `"?"`.
    pub(crate) fn serialize_array_value(arr: &dyn arrow::array::Array, idx: usize) -> String {
        use arrow::array::{
            BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, ListArray,
            StringArray, StructArray,
        };

        if arr.is_null(idx) {
            return "null".to_string();
        }

        if let Some(a) = arr.as_any().downcast_ref::<StringArray>() {
            format!("\"{}\"", a.value(idx))
        } else if let Some(a) = arr.as_any().downcast_ref::<Int32Array>() {
            a.value(idx).to_string()
        } else if let Some(a) = arr.as_any().downcast_ref::<Int64Array>() {
            a.value(idx).to_string()
        } else if let Some(a) = arr.as_any().downcast_ref::<Float64Array>() {
            a.value(idx).to_string()
        } else if let Some(a) = arr.as_any().downcast_ref::<Float32Array>() {
            a.value(idx).to_string()
        } else if let Some(a) = arr.as_any().downcast_ref::<BooleanArray>() {
            a.value(idx).to_string()
        } else if let Some(a) = arr.as_any().downcast_ref::<ListArray>() {
            Self::serialize_list_value(a, idx)
        } else if let Some(a) = arr.as_any().downcast_ref::<StructArray>() {
            Self::serialize_struct_value(a, idx)
        } else {
            "?".to_string()
        }
    }
563
564 pub(crate) fn analyze_column(
566 &self,
567 name: &str,
568 values: &[Option<String>],
569 total_count: usize,
570 ) -> ColumnQuality {
571 let null_count = values.iter().filter(|v| v.is_none()).count();
572 let null_ratio = if total_count > 0 {
573 null_count as f64 / total_count as f64
574 } else {
575 0.0
576 };
577
578 let non_null_values: Vec<&str> = values.iter().filter_map(|v| v.as_deref()).collect();
580 let unique_set: HashSet<&str> = non_null_values.iter().copied().collect();
581 let unique_count = unique_set.len();
582 let unique_ratio = if !non_null_values.is_empty() {
583 unique_count as f64 / non_null_values.len() as f64
584 } else {
585 0.0
586 };
587
588 let duplicate_count = non_null_values.len().saturating_sub(unique_count);
590 let duplicate_ratio = if !non_null_values.is_empty() {
591 duplicate_count as f64 / non_null_values.len() as f64
592 } else {
593 0.0
594 };
595
596 let (outlier_count, numeric_stats) = if self.check_outliers {
598 self.analyze_numeric(&non_null_values)
599 } else {
600 (None, None)
601 };
602
603 ColumnQuality {
604 name: name.to_string(),
605 total_count,
606 null_count,
607 null_ratio,
608 unique_count,
609 unique_ratio,
610 duplicate_count,
611 duplicate_ratio,
612 outlier_count,
613 numeric_stats,
614 }
615 }
616
617 pub(crate) fn analyze_numeric(&self, values: &[&str]) -> (Option<usize>, Option<NumericStats>) {
619 let numeric_values: Vec<f64> = values
620 .iter()
621 .filter_map(|v| v.parse::<f64>().ok())
622 .filter(|v| v.is_finite())
623 .collect();
624
625 if numeric_values.len() < 4 {
626 return (None, None);
627 }
628
629 let mut sorted = numeric_values.clone();
630 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
631
632 let n = sorted.len();
633 let min = sorted[0];
634 let max = sorted[n - 1];
635 let mean = numeric_values.iter().sum::<f64>() / n as f64;
636
637 let variance = numeric_values
638 .iter()
639 .map(|v| (v - mean).powi(2))
640 .sum::<f64>()
641 / n as f64;
642 let std_dev = variance.sqrt();
643
644 let q1 = sorted[n / 4];
645 let median = sorted[n / 2];
646 let q3 = sorted[3 * n / 4];
647
648 let stats = NumericStats {
649 min,
650 max,
651 mean,
652 std_dev,
653 q1,
654 median,
655 q3,
656 };
657
658 let lower = stats.outlier_lower_bound();
660 let upper = stats.outlier_upper_bound();
661 let outlier_count = numeric_values
662 .iter()
663 .filter(|&&v| v < lower || v > upper)
664 .count();
665
666 (Some(outlier_count), Some(stats))
667 }
668
669 pub(crate) fn count_duplicate_rows(
671 &self,
672 data: &HashMap<String, Vec<Option<String>>>,
673 row_count: usize,
674 ) -> usize {
675 if data.is_empty() || row_count == 0 {
676 return 0;
677 }
678
679 let mut row_set: HashSet<String> = HashSet::new();
681 let mut duplicates = 0;
682
683 let columns: Vec<&String> = data.keys().collect();
684
685 for i in 0..row_count {
686 let row_key: String = columns
687 .iter()
688 .map(|col| {
689 data.get(*col)
690 .and_then(|v| v.get(i))
691 .map(|v| v.clone().unwrap_or_else(|| "NULL".to_string()))
692 .unwrap_or_else(|| "NULL".to_string())
693 })
694 .collect::<Vec<_>>()
695 .join("|");
696
697 if !row_set.insert(row_key) {
698 duplicates += 1;
699 }
700 }
701
702 duplicates
703 }
704
705 pub(crate) fn calculate_score(
707 &self,
708 columns: &HashMap<String, ColumnQuality>,
709 issues: &[QualityIssue],
710 row_count: usize,
711 ) -> f64 {
712 if row_count == 0 || columns.is_empty() {
713 return 0.0;
714 }
715
716 let mut score = 100.0;
717
718 let avg_null_ratio: f64 =
720 columns.values().map(|c| c.null_ratio).sum::<f64>() / columns.len() as f64;
721 score -= avg_null_ratio * 30.0;
722
723 for issue in issues {
725 score -= match issue.severity() {
726 5 => 25.0,
727 4 => 15.0,
728 3 => 10.0,
729 2 => 5.0,
730 1 => 2.0,
731 _ => 0.0,
732 };
733 }
734
735 score.clamp(0.0, 100.0)
736 }
737}
738
/// Length statistics for a string column; lengths are measured in bytes, not
/// characters.
#[derive(Debug, Clone)]
pub struct TextColumnStats {
    /// Shortest value length in bytes.
    pub min_len: usize,
    /// Longest value length in bytes.
    pub max_len: usize,
    /// Mean value length in bytes.
    pub mean_len: f64,
    /// Median (50th percentile) length.
    pub p50_len: usize,
    /// 95th percentile length.
    pub p95_len: usize,
    /// 99th percentile length.
    pub p99_len: usize,
    /// Number of values that are empty or whitespace-only.
    pub empty_count: usize,
    /// Number of values starting with the configured preamble prefix.
    pub preamble_count: usize,
    /// Number of non-null values measured.
    pub total: usize,
}
761
impl TextColumnStats {
    /// Computes byte-length statistics over every non-null value of the
    /// string column `column`.
    ///
    /// Null entries are skipped entirely. When `preamble_prefix` is given,
    /// values starting with that prefix are tallied in `preamble_count`.
    /// Returns an all-zero summary when the column has no non-null values.
    ///
    /// # Errors
    ///
    /// Returns an error when the column does not exist in the schema or is
    /// not an Arrow `StringArray`.
    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    pub fn from_dataset(
        dataset: &crate::ArrowDataset,
        column: &str,
        preamble_prefix: Option<&str>,
    ) -> crate::Result<Self> {
        use arrow::array::{Array, StringArray};

        use crate::Dataset;

        let schema = dataset.schema();
        let col_idx = schema
            .fields()
            .iter()
            .position(|f| f.name() == column)
            .ok_or_else(|| crate::Error::invalid_config(format!("Column '{column}' not found")))?;

        let mut lengths: Vec<usize> = Vec::with_capacity(dataset.len());
        let mut empty_count = 0usize;
        let mut preamble_count = 0usize;

        for batch in dataset.iter() {
            let array = batch.column(col_idx);
            let str_arr = array
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(|| {
                    crate::Error::invalid_config(format!("Column '{column}' is not a string type"))
                })?;

            for i in 0..str_arr.len() {
                if str_arr.is_null(i) {
                    continue;
                }
                let val = str_arr.value(i);
                // Byte length, not character count.
                let len = val.len();
                lengths.push(len);

                // Whitespace-only values also count as empty.
                if val.trim().is_empty() {
                    empty_count += 1;
                }
                if let Some(prefix) = preamble_prefix {
                    if val.starts_with(prefix) {
                        preamble_count += 1;
                    }
                }
            }
        }

        // All-null column or zero rows: return an all-zero summary.
        if lengths.is_empty() {
            return Ok(Self {
                min_len: 0,
                max_len: 0,
                mean_len: 0.0,
                p50_len: 0,
                p95_len: 0,
                p99_len: 0,
                empty_count: 0,
                preamble_count: 0,
                total: 0,
            });
        }

        lengths.sort_unstable();
        let total = lengths.len();
        let min_len = lengths[0];
        let max_len = lengths[total - 1];
        let mean_len = lengths.iter().sum::<usize>() as f64 / total as f64;
        // Nearest-rank percentiles via truncating float index; for total >= 1
        // the computed index stays in bounds since k * total < total for
        // k < 1.
        let p50_len = lengths[total / 2];
        let p95_len = lengths[(total as f64 * 0.95) as usize];
        let p99_len = lengths[(total as f64 * 0.99).min((total - 1) as f64) as usize];

        Ok(Self {
            min_len,
            max_len,
            mean_len,
            p50_len,
            p95_len,
            p99_len,
            empty_count,
            preamble_count,
            total,
        })
    }
}