// floe_core/checks/unique.rs
use polars::prelude::{is_duplicated, is_first_distinct, DataFrame};

use super::{ColumnIndex, RowError};
use crate::errors::RunError;
use crate::{config, FloeResult};

7pub fn unique_errors(
8 df: &DataFrame,
9 columns: &[config::ColumnConfig],
10 indices: &ColumnIndex,
11) -> FloeResult<Vec<Vec<RowError>>> {
12 let mut errors_per_row = vec![Vec::new(); df.height()];
13 let unique_columns: Vec<&config::ColumnConfig> = columns
14 .iter()
15 .filter(|col| col.unique == Some(true))
16 .collect();
17 if unique_columns.is_empty() {
18 return Ok(errors_per_row);
19 }
20
21 for column in unique_columns {
22 let index = indices.get(&column.name).ok_or_else(|| {
23 Box::new(RunError(format!("unique column {} not found", column.name)))
24 })?;
25 let series = df.select_at_idx(*index).ok_or_else(|| {
26 Box::new(RunError(format!("unique column {} not found", column.name)))
27 })?;
28 let series = series.as_materialized_series();
29 let non_null = series.len().saturating_sub(series.null_count());
30 if non_null == 0 {
31 continue;
32 }
33 let mut duplicate_mask = is_duplicated(series).map_err(|err| {
34 Box::new(RunError(format!(
35 "unique column {} read failed: {err}",
36 column.name
37 )))
38 })?;
39 let not_null = series.is_not_null();
40 duplicate_mask = &duplicate_mask & ¬_null;
41 let mut first_mask = is_first_distinct(series).map_err(|err| {
42 Box::new(RunError(format!(
43 "unique column {} read failed: {err}",
44 column.name
45 )))
46 })?;
47 first_mask = &first_mask & ¬_null;
48 let mask = duplicate_mask & !first_mask;
49 for (row_idx, is_dup) in mask.into_iter().enumerate() {
50 if is_dup == Some(true) {
51 errors_per_row[row_idx].push(RowError::new(
52 "unique",
53 &column.name,
54 "duplicate value",
55 ));
56 }
57 }
58 }
59
60 Ok(errors_per_row)
61}
62
63pub fn unique_counts(
64 df: &DataFrame,
65 columns: &[config::ColumnConfig],
66) -> FloeResult<Vec<(String, u64)>> {
67 if df.height() == 0 {
68 return Ok(Vec::new());
69 }
70
71 let unique_columns: Vec<&config::ColumnConfig> = columns
72 .iter()
73 .filter(|col| col.unique == Some(true))
74 .collect();
75 if unique_columns.is_empty() {
76 return Ok(Vec::new());
77 }
78
79 let mut counts = Vec::new();
80 for column in unique_columns {
81 let series = df.column(&column.name).map_err(|err| {
82 Box::new(RunError(format!(
83 "unique column {} not found: {err}",
84 column.name
85 )))
86 })?;
87 let non_null = series.len().saturating_sub(series.null_count());
88 if non_null == 0 {
89 continue;
90 }
91 let unique = series.drop_nulls().n_unique().map_err(|err| {
92 Box::new(RunError(format!(
93 "unique column {} read failed: {err}",
94 column.name
95 )))
96 })?;
97 let violations = non_null.saturating_sub(unique) as u64;
98 if violations > 0 {
99 counts.push((column.name.clone(), violations));
100 }
101 }
102
103 Ok(counts)
104}