1use polars::prelude::{is_duplicated, is_first_distinct, AnyValue, DataFrame};
2use std::collections::{HashMap, HashSet};
3
4use super::{ColumnIndex, RowError, SparseRowErrors};
5use crate::errors::RunError;
6use crate::{config, FloeResult};
7
/// Hashable key used to track distinct column values in a `HashSet`.
///
/// Floats are not `Hash`/`Eq`, so `F64` stores the IEEE-754 bit pattern
/// (`f64::to_bits`); note this makes `0.0` and `-0.0` distinct keys and
/// makes NaNs equal only when their bit patterns match. Variants without a
/// dedicated representation fall back to the value's string form.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UniqueKey {
    Bool(bool),
    // Signed integers up to 64 bits, widened to i64 (see `unique_key`).
    I64(i64),
    // Unsigned integers up to 64 bits, widened to u64.
    U64(u64),
    // Bit pattern of an f64 (f32 inputs are widened first).
    F64(u64),
    String(String),
    // String rendering of any other AnyValue variant (i128/u128, dates, …).
    Other(String),
}
17
18fn unique_key(value: AnyValue) -> Option<UniqueKey> {
19 match value {
20 AnyValue::Null => None,
21 AnyValue::Boolean(value) => Some(UniqueKey::Bool(value)),
22 AnyValue::Int8(value) => Some(UniqueKey::I64(value as i64)),
23 AnyValue::Int16(value) => Some(UniqueKey::I64(value as i64)),
24 AnyValue::Int32(value) => Some(UniqueKey::I64(value as i64)),
25 AnyValue::Int64(value) => Some(UniqueKey::I64(value)),
26 AnyValue::Int128(value) => Some(UniqueKey::Other(value.to_string())),
27 AnyValue::UInt8(value) => Some(UniqueKey::U64(value as u64)),
28 AnyValue::UInt16(value) => Some(UniqueKey::U64(value as u64)),
29 AnyValue::UInt32(value) => Some(UniqueKey::U64(value as u64)),
30 AnyValue::UInt64(value) => Some(UniqueKey::U64(value)),
31 AnyValue::UInt128(value) => Some(UniqueKey::Other(value.to_string())),
32 AnyValue::Float32(value) => Some(UniqueKey::F64((value as f64).to_bits())),
33 AnyValue::Float64(value) => Some(UniqueKey::F64(value.to_bits())),
34 AnyValue::String(value) => Some(UniqueKey::String(value.to_string())),
35 AnyValue::StringOwned(value) => Some(UniqueKey::String(value.to_string())),
36 other => Some(UniqueKey::Other(other.to_string())),
37 }
38}
39
/// Flags duplicate values in every column configured with `unique: true`,
/// using polars' vectorized duplicate-detection kernels.
///
/// A row receives a "duplicate value" error when its value occurs more
/// than once in the column AND the row is not the first occurrence, so
/// only second-and-later occurrences are reported. Nulls are masked out
/// and never count as duplicates of each other.
///
/// Returns one `Vec<RowError>` per dataframe row (empty when the row is
/// clean). Fails with `RunError` when a configured column is missing from
/// `indices`/`df` or a polars kernel errors.
pub fn unique_errors(
    df: &DataFrame,
    columns: &[config::ColumnConfig],
    indices: &ColumnIndex,
) -> FloeResult<Vec<Vec<RowError>>> {
    let mut errors_per_row = vec![Vec::new(); df.height()];
    let unique_columns: Vec<&config::ColumnConfig> = columns
        .iter()
        .filter(|col| col.unique == Some(true))
        .collect();
    if unique_columns.is_empty() {
        return Ok(errors_per_row);
    }

    for column in unique_columns {
        let index = indices.get(&column.name).ok_or_else(|| {
            Box::new(RunError(format!("unique column {} not found", column.name)))
        })?;
        let series = df.select_at_idx(*index).ok_or_else(|| {
            Box::new(RunError(format!("unique column {} not found", column.name)))
        })?;
        let series = series.as_materialized_series();
        // Column is empty or all-null: nothing can be duplicated.
        let non_null = series.len().saturating_sub(series.null_count());
        if non_null == 0 {
            continue;
        }
        // True for every row whose value appears more than once in the
        // column (including the first occurrence).
        let mut duplicate_mask = is_duplicated(series).map_err(|err| {
            Box::new(RunError(format!(
                "unique column {} read failed: {err}",
                column.name
            )))
        })?;
        // Restrict both masks to non-null entries so null rows are never
        // reported as duplicates.
        let not_null = series.is_not_null();
        duplicate_mask = &duplicate_mask & &not_null;
        // True only for the first occurrence of each distinct value.
        let mut first_mask = is_first_distinct(series).map_err(|err| {
            Box::new(RunError(format!(
                "unique column {} read failed: {err}",
                column.name
            )))
        })?;
        first_mask = &first_mask & &not_null;
        // Duplicated AND not the first occurrence -> report the row.
        let mask = duplicate_mask & !first_mask;
        for (row_idx, is_dup) in mask.into_iter().enumerate() {
            if is_dup == Some(true) {
                errors_per_row[row_idx].push(RowError::new(
                    "unique",
                    &column.name,
                    "duplicate value",
                ));
            }
        }
    }

    Ok(errors_per_row)
}
95
96pub fn unique_errors_sparse(
97 df: &DataFrame,
98 columns: &[config::ColumnConfig],
99 indices: &ColumnIndex,
100) -> FloeResult<SparseRowErrors> {
101 let mut errors = SparseRowErrors::new(df.height());
102 if df.height() == 0 {
103 return Ok(errors);
104 }
105 let unique_columns: Vec<&config::ColumnConfig> = columns
106 .iter()
107 .filter(|col| col.unique == Some(true))
108 .collect();
109 if unique_columns.is_empty() {
110 return Ok(errors);
111 }
112
113 for column in unique_columns {
114 let index = indices.get(&column.name).ok_or_else(|| {
115 Box::new(RunError(format!(
116 "unique column {} not found in dataframe",
117 column.name
118 )))
119 })?;
120 let series = df
121 .select_at_idx(*index)
122 .ok_or_else(|| {
123 Box::new(RunError(format!(
124 "unique column {} not found in dataframe",
125 column.name
126 )))
127 })?
128 .as_materialized_series()
129 .rechunk();
130 let mut seen: HashSet<UniqueKey> = HashSet::new();
131 for (row_idx, value) in series.iter().enumerate() {
132 let key = match unique_key(value) {
133 Some(key) => key,
134 None => continue,
135 };
136 if seen.contains(&key) {
137 errors.add_error(
138 row_idx,
139 RowError::new("unique", &column.name, "duplicate value"),
140 );
141 } else {
142 seen.insert(key);
143 }
144 }
145 }
146
147 Ok(errors)
148}
149
/// Cross-call uniqueness state: remembers, per unique column name, every
/// non-null value observed so far, so that successive dataframe chunks can
/// be validated against values seen in earlier chunks.
#[derive(Debug, Default)]
pub struct UniqueTracker {
    // column name -> set of keys already seen in previous `apply*` calls
    seen: HashMap<String, HashSet<UniqueKey>>,
}
154
155impl UniqueTracker {
156 pub fn new(columns: &[config::ColumnConfig]) -> Self {
157 let mut seen = HashMap::new();
158 for column in columns.iter().filter(|col| col.unique == Some(true)) {
159 seen.insert(column.name.clone(), HashSet::new());
160 }
161 Self { seen }
162 }
163
164 pub fn is_empty(&self) -> bool {
165 self.seen.is_empty()
166 }
167
168 pub fn apply(
169 &mut self,
170 df: &DataFrame,
171 columns: &[config::ColumnConfig],
172 ) -> FloeResult<Vec<Vec<RowError>>> {
173 let mut errors_per_row = vec![Vec::new(); df.height()];
174 if df.height() == 0 {
175 return Ok(errors_per_row);
176 }
177 let unique_columns: Vec<&config::ColumnConfig> = columns
178 .iter()
179 .filter(|col| col.unique == Some(true))
180 .collect();
181 if unique_columns.is_empty() {
182 return Ok(errors_per_row);
183 }
184
185 for column in unique_columns {
186 let series = df.column(&column.name).map_err(|err| {
187 Box::new(RunError(format!(
188 "unique column {} not found: {err}",
189 column.name
190 )))
191 })?;
192 let series = series.as_materialized_series().rechunk();
193 let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
194 Box::new(RunError(format!(
195 "unique column {} not tracked",
196 column.name
197 )))
198 })?;
199 for (row_idx, value) in series.iter().enumerate() {
200 let key = match unique_key(value) {
201 Some(key) => key,
202 None => continue,
203 };
204 if seen.contains(&key) {
205 errors_per_row[row_idx].push(RowError::new(
206 "unique",
207 &column.name,
208 "duplicate value",
209 ));
210 } else {
211 seen.insert(key);
212 }
213 }
214 }
215
216 Ok(errors_per_row)
217 }
218
219 pub fn apply_sparse(
220 &mut self,
221 df: &DataFrame,
222 columns: &[config::ColumnConfig],
223 ) -> FloeResult<SparseRowErrors> {
224 let mut errors = SparseRowErrors::new(df.height());
225 if df.height() == 0 {
226 return Ok(errors);
227 }
228 let unique_columns: Vec<&config::ColumnConfig> = columns
229 .iter()
230 .filter(|col| col.unique == Some(true))
231 .collect();
232 if unique_columns.is_empty() {
233 return Ok(errors);
234 }
235
236 for column in unique_columns {
237 let series = df.column(&column.name).map_err(|err| {
238 Box::new(RunError(format!(
239 "unique column {} not found: {err}",
240 column.name
241 )))
242 })?;
243 let series = series.as_materialized_series().rechunk();
244 let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
245 Box::new(RunError(format!(
246 "unique column {} not tracked",
247 column.name
248 )))
249 })?;
250 for (row_idx, value) in series.iter().enumerate() {
251 let key = match unique_key(value) {
252 Some(key) => key,
253 None => continue,
254 };
255 if seen.contains(&key) {
256 errors.add_error(
257 row_idx,
258 RowError::new("unique", &column.name, "duplicate value"),
259 );
260 } else {
261 seen.insert(key);
262 }
263 }
264 }
265
266 Ok(errors)
267 }
268}
269
270pub fn unique_counts(
271 df: &DataFrame,
272 columns: &[config::ColumnConfig],
273) -> FloeResult<Vec<(String, u64)>> {
274 if df.height() == 0 {
275 return Ok(Vec::new());
276 }
277
278 let unique_columns: Vec<&config::ColumnConfig> = columns
279 .iter()
280 .filter(|col| col.unique == Some(true))
281 .collect();
282 if unique_columns.is_empty() {
283 return Ok(Vec::new());
284 }
285
286 let mut counts = Vec::new();
287 for column in unique_columns {
288 let series = df.column(&column.name).map_err(|err| {
289 Box::new(RunError(format!(
290 "unique column {} not found: {err}",
291 column.name
292 )))
293 })?;
294 let non_null = series.len().saturating_sub(series.null_count());
295 if non_null == 0 {
296 continue;
297 }
298 let unique = series.drop_nulls().n_unique().map_err(|err| {
299 Box::new(RunError(format!(
300 "unique column {} read failed: {err}",
301 column.name
302 )))
303 })?;
304 let violations = non_null.saturating_sub(unique) as u64;
305 if violations > 0 {
306 counts.push((column.name.clone(), violations));
307 }
308 }
309
310 Ok(counts)
311}