floe_core/checks/
unique.rs1use polars::prelude::{is_duplicated, is_first_distinct, AnyValue, DataFrame};
2use std::collections::{HashMap, HashSet};
3
4use super::{ColumnIndex, RowError};
5use crate::errors::RunError;
6use crate::{config, FloeResult};
7
8pub fn unique_errors(
9 df: &DataFrame,
10 columns: &[config::ColumnConfig],
11 indices: &ColumnIndex,
12) -> FloeResult<Vec<Vec<RowError>>> {
13 let mut errors_per_row = vec![Vec::new(); df.height()];
14 let unique_columns: Vec<&config::ColumnConfig> = columns
15 .iter()
16 .filter(|col| col.unique == Some(true))
17 .collect();
18 if unique_columns.is_empty() {
19 return Ok(errors_per_row);
20 }
21
22 for column in unique_columns {
23 let index = indices.get(&column.name).ok_or_else(|| {
24 Box::new(RunError(format!("unique column {} not found", column.name)))
25 })?;
26 let series = df.select_at_idx(*index).ok_or_else(|| {
27 Box::new(RunError(format!("unique column {} not found", column.name)))
28 })?;
29 let series = series.as_materialized_series();
30 let non_null = series.len().saturating_sub(series.null_count());
31 if non_null == 0 {
32 continue;
33 }
34 let mut duplicate_mask = is_duplicated(series).map_err(|err| {
35 Box::new(RunError(format!(
36 "unique column {} read failed: {err}",
37 column.name
38 )))
39 })?;
40 let not_null = series.is_not_null();
41 duplicate_mask = &duplicate_mask & ¬_null;
42 let mut first_mask = is_first_distinct(series).map_err(|err| {
43 Box::new(RunError(format!(
44 "unique column {} read failed: {err}",
45 column.name
46 )))
47 })?;
48 first_mask = &first_mask & ¬_null;
49 let mask = duplicate_mask & !first_mask;
50 for (row_idx, is_dup) in mask.into_iter().enumerate() {
51 if is_dup == Some(true) {
52 errors_per_row[row_idx].push(RowError::new(
53 "unique",
54 &column.name,
55 "duplicate value",
56 ));
57 }
58 }
59 }
60
61 Ok(errors_per_row)
62}
63
64#[derive(Debug, Default)]
65pub struct UniqueTracker {
66 seen: HashMap<String, HashSet<String>>,
67}
68
69impl UniqueTracker {
70 pub fn new(columns: &[config::ColumnConfig]) -> Self {
71 let mut seen = HashMap::new();
72 for column in columns.iter().filter(|col| col.unique == Some(true)) {
73 seen.insert(column.name.clone(), HashSet::new());
74 }
75 Self { seen }
76 }
77
78 pub fn is_empty(&self) -> bool {
79 self.seen.is_empty()
80 }
81
82 pub fn apply(
83 &mut self,
84 df: &DataFrame,
85 columns: &[config::ColumnConfig],
86 ) -> FloeResult<Vec<Vec<RowError>>> {
87 let mut errors_per_row = vec![Vec::new(); df.height()];
88 if df.height() == 0 {
89 return Ok(errors_per_row);
90 }
91 let unique_columns: Vec<&config::ColumnConfig> = columns
92 .iter()
93 .filter(|col| col.unique == Some(true))
94 .collect();
95 if unique_columns.is_empty() {
96 return Ok(errors_per_row);
97 }
98
99 for column in unique_columns {
100 let series = df.column(&column.name).map_err(|err| {
101 Box::new(RunError(format!(
102 "unique column {} not found: {err}",
103 column.name
104 )))
105 })?;
106 let series = series.as_materialized_series().rechunk();
107 let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
108 Box::new(RunError(format!(
109 "unique column {} not tracked",
110 column.name
111 )))
112 })?;
113 for (row_idx, value) in series.iter().enumerate() {
114 if matches!(value, AnyValue::Null) {
115 continue;
116 }
117 let key = value.to_string();
118 if seen.contains(&key) {
119 errors_per_row[row_idx].push(RowError::new(
120 "unique",
121 &column.name,
122 "duplicate value",
123 ));
124 } else {
125 seen.insert(key);
126 }
127 }
128 }
129
130 Ok(errors_per_row)
131 }
132}
133
134pub fn unique_counts(
135 df: &DataFrame,
136 columns: &[config::ColumnConfig],
137) -> FloeResult<Vec<(String, u64)>> {
138 if df.height() == 0 {
139 return Ok(Vec::new());
140 }
141
142 let unique_columns: Vec<&config::ColumnConfig> = columns
143 .iter()
144 .filter(|col| col.unique == Some(true))
145 .collect();
146 if unique_columns.is_empty() {
147 return Ok(Vec::new());
148 }
149
150 let mut counts = Vec::new();
151 for column in unique_columns {
152 let series = df.column(&column.name).map_err(|err| {
153 Box::new(RunError(format!(
154 "unique column {} not found: {err}",
155 column.name
156 )))
157 })?;
158 let non_null = series.len().saturating_sub(series.null_count());
159 if non_null == 0 {
160 continue;
161 }
162 let unique = series.drop_nulls().n_unique().map_err(|err| {
163 Box::new(RunError(format!(
164 "unique column {} read failed: {err}",
165 column.name
166 )))
167 })?;
168 let violations = non_null.saturating_sub(unique) as u64;
169 if violations > 0 {
170 counts.push((column.name.clone(), violations));
171 }
172 }
173
174 Ok(counts)
175}