1use polars::prelude::{is_duplicated, is_first_distinct, AnyValue, DataFrame};
2use std::collections::{HashMap, HashSet};
3
4use super::{ColumnIndex, RowError, SparseRowErrors};
5use crate::errors::RunError;
6use crate::{config, FloeResult};
7
/// Hashable, owned key derived from a single cell value, used for duplicate
/// detection (see `unique_key`). Exists because raw float / `AnyValue` data
/// is not `Eq + Hash` directly.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UniqueKey {
    // Boolean cell value.
    Bool(bool),
    // Any signed integer up to 64 bits, widened to i64.
    I64(i64),
    // Any unsigned integer up to 64 bits, widened to u64.
    U64(u64),
    // Float stored as its raw IEEE-754 bit pattern (f32 widened to f64 first)
    // so the key can be Eq + Hash; values with identical bits compare equal.
    F64(u64),
    // Owned string cell value.
    String(String),
    // Fallback: display rendering of any other value (128-bit ints, dates, …).
    Other(String),
}
17
18fn unique_key(value: AnyValue) -> Option<UniqueKey> {
19 match value {
20 AnyValue::Null => None,
21 AnyValue::Boolean(value) => Some(UniqueKey::Bool(value)),
22 AnyValue::Int8(value) => Some(UniqueKey::I64(value as i64)),
23 AnyValue::Int16(value) => Some(UniqueKey::I64(value as i64)),
24 AnyValue::Int32(value) => Some(UniqueKey::I64(value as i64)),
25 AnyValue::Int64(value) => Some(UniqueKey::I64(value)),
26 AnyValue::Int128(value) => Some(UniqueKey::Other(value.to_string())),
27 AnyValue::UInt8(value) => Some(UniqueKey::U64(value as u64)),
28 AnyValue::UInt16(value) => Some(UniqueKey::U64(value as u64)),
29 AnyValue::UInt32(value) => Some(UniqueKey::U64(value as u64)),
30 AnyValue::UInt64(value) => Some(UniqueKey::U64(value)),
31 AnyValue::UInt128(value) => Some(UniqueKey::Other(value.to_string())),
32 AnyValue::Float32(value) => Some(UniqueKey::F64((value as f64).to_bits())),
33 AnyValue::Float64(value) => Some(UniqueKey::F64(value.to_bits())),
34 AnyValue::String(value) => Some(UniqueKey::String(value.to_string())),
35 AnyValue::StringOwned(value) => Some(UniqueKey::String(value.to_string())),
36 other => Some(UniqueKey::Other(other.to_string())),
37 }
38}
39
40pub fn unique_errors(
41 df: &DataFrame,
42 columns: &[config::ColumnConfig],
43 indices: &ColumnIndex,
44) -> FloeResult<Vec<Vec<RowError>>> {
45 let mut errors_per_row = vec![Vec::new(); df.height()];
46 let unique_columns: Vec<&config::ColumnConfig> = columns
47 .iter()
48 .filter(|col| col.unique == Some(true))
49 .collect();
50 if unique_columns.is_empty() {
51 return Ok(errors_per_row);
52 }
53
54 for column in unique_columns {
55 let index = indices.get(&column.name).ok_or_else(|| {
56 Box::new(RunError(format!("unique column {} not found", column.name)))
57 })?;
58 let series = df.select_at_idx(*index).ok_or_else(|| {
59 Box::new(RunError(format!("unique column {} not found", column.name)))
60 })?;
61 let series = series.as_materialized_series();
62 let non_null = series.len().saturating_sub(series.null_count());
63 if non_null == 0 {
64 continue;
65 }
66 let mut duplicate_mask = is_duplicated(series).map_err(|err| {
67 Box::new(RunError(format!(
68 "unique column {} read failed: {err}",
69 column.name
70 )))
71 })?;
72 let not_null = series.is_not_null();
73 duplicate_mask = &duplicate_mask & ¬_null;
74 let mut first_mask = is_first_distinct(series).map_err(|err| {
75 Box::new(RunError(format!(
76 "unique column {} read failed: {err}",
77 column.name
78 )))
79 })?;
80 first_mask = &first_mask & ¬_null;
81 let mask = duplicate_mask & !first_mask;
82 for (row_idx, is_dup) in mask.into_iter().enumerate() {
83 if is_dup == Some(true) {
84 errors_per_row[row_idx].push(RowError::new(
85 "unique",
86 &column.name,
87 "duplicate value",
88 ));
89 }
90 }
91 }
92
93 Ok(errors_per_row)
94}
95
96pub fn unique_errors_sparse(
97 df: &DataFrame,
98 columns: &[config::ColumnConfig],
99 indices: &ColumnIndex,
100) -> FloeResult<SparseRowErrors> {
101 let mut errors = SparseRowErrors::new(df.height());
102 if df.height() == 0 {
103 return Ok(errors);
104 }
105 let unique_columns: Vec<&config::ColumnConfig> = columns
106 .iter()
107 .filter(|col| col.unique == Some(true))
108 .collect();
109 if unique_columns.is_empty() {
110 return Ok(errors);
111 }
112
113 for column in unique_columns {
114 let index = indices.get(&column.name).ok_or_else(|| {
115 Box::new(RunError(format!(
116 "unique column {} not found in dataframe",
117 column.name
118 )))
119 })?;
120 let series = df
121 .select_at_idx(*index)
122 .ok_or_else(|| {
123 Box::new(RunError(format!(
124 "unique column {} not found in dataframe",
125 column.name
126 )))
127 })?
128 .as_materialized_series()
129 .rechunk();
130 let mut seen: HashSet<UniqueKey> = HashSet::new();
131 for (row_idx, value) in series.iter().enumerate() {
132 let key = match unique_key(value) {
133 Some(key) => key,
134 None => continue,
135 };
136 if seen.contains(&key) {
137 errors.add_error(
138 row_idx,
139 RowError::new("unique", &column.name, "duplicate value"),
140 );
141 } else {
142 seen.insert(key);
143 }
144 }
145 }
146
147 Ok(errors)
148}
149
/// Cross-batch uniqueness state: for each unique-configured column name, the
/// set of keys observed so far. Lets `apply`/`apply_sparse` detect duplicates
/// across multiple dataframes, not just within a single one.
#[derive(Debug, Default)]
pub struct UniqueTracker {
    // Column name -> keys seen in all dataframes processed so far.
    seen: HashMap<String, HashSet<UniqueKey>>,
}
154
155impl UniqueTracker {
156 pub fn new(columns: &[config::ColumnConfig]) -> Self {
157 let mut seen = HashMap::new();
158 for column in columns.iter().filter(|col| col.unique == Some(true)) {
159 seen.insert(column.name.clone(), HashSet::new());
160 }
161 Self { seen }
162 }
163
164 pub fn is_empty(&self) -> bool {
165 self.seen.is_empty()
166 }
167
168 pub fn seed_from_df(
169 &mut self,
170 df: &DataFrame,
171 columns: &[config::ColumnConfig],
172 ) -> FloeResult<()> {
173 if df.height() == 0 || self.seen.is_empty() {
174 return Ok(());
175 }
176 let unique_columns: Vec<&config::ColumnConfig> = columns
177 .iter()
178 .filter(|col| col.unique == Some(true))
179 .collect();
180 if unique_columns.is_empty() {
181 return Ok(());
182 }
183
184 for column in unique_columns {
185 let series = df.column(&column.name).map_err(|err| {
186 Box::new(RunError(format!(
187 "unique column {} not found: {err}",
188 column.name
189 )))
190 })?;
191 let series = series.as_materialized_series().rechunk();
192 let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
193 Box::new(RunError(format!(
194 "unique column {} not tracked",
195 column.name
196 )))
197 })?;
198 for value in series.iter() {
199 let key = match unique_key(value) {
200 Some(key) => key,
201 None => continue,
202 };
203 seen.insert(key);
204 }
205 }
206
207 Ok(())
208 }
209
210 pub fn apply(
211 &mut self,
212 df: &DataFrame,
213 columns: &[config::ColumnConfig],
214 ) -> FloeResult<Vec<Vec<RowError>>> {
215 let mut errors_per_row = vec![Vec::new(); df.height()];
216 if df.height() == 0 {
217 return Ok(errors_per_row);
218 }
219 let unique_columns: Vec<&config::ColumnConfig> = columns
220 .iter()
221 .filter(|col| col.unique == Some(true))
222 .collect();
223 if unique_columns.is_empty() {
224 return Ok(errors_per_row);
225 }
226
227 for column in unique_columns {
228 let series = df.column(&column.name).map_err(|err| {
229 Box::new(RunError(format!(
230 "unique column {} not found: {err}",
231 column.name
232 )))
233 })?;
234 let series = series.as_materialized_series().rechunk();
235 let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
236 Box::new(RunError(format!(
237 "unique column {} not tracked",
238 column.name
239 )))
240 })?;
241 for (row_idx, value) in series.iter().enumerate() {
242 let key = match unique_key(value) {
243 Some(key) => key,
244 None => continue,
245 };
246 if seen.contains(&key) {
247 errors_per_row[row_idx].push(RowError::new(
248 "unique",
249 &column.name,
250 "duplicate value",
251 ));
252 } else {
253 seen.insert(key);
254 }
255 }
256 }
257
258 Ok(errors_per_row)
259 }
260
261 pub fn apply_sparse(
262 &mut self,
263 df: &DataFrame,
264 columns: &[config::ColumnConfig],
265 ) -> FloeResult<SparseRowErrors> {
266 let mut errors = SparseRowErrors::new(df.height());
267 if df.height() == 0 {
268 return Ok(errors);
269 }
270 let unique_columns: Vec<&config::ColumnConfig> = columns
271 .iter()
272 .filter(|col| col.unique == Some(true))
273 .collect();
274 if unique_columns.is_empty() {
275 return Ok(errors);
276 }
277
278 for column in unique_columns {
279 let series = df.column(&column.name).map_err(|err| {
280 Box::new(RunError(format!(
281 "unique column {} not found: {err}",
282 column.name
283 )))
284 })?;
285 let series = series.as_materialized_series().rechunk();
286 let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
287 Box::new(RunError(format!(
288 "unique column {} not tracked",
289 column.name
290 )))
291 })?;
292 for (row_idx, value) in series.iter().enumerate() {
293 let key = match unique_key(value) {
294 Some(key) => key,
295 None => continue,
296 };
297 if seen.contains(&key) {
298 errors.add_error(
299 row_idx,
300 RowError::new("unique", &column.name, "duplicate value"),
301 );
302 } else {
303 seen.insert(key);
304 }
305 }
306 }
307
308 Ok(errors)
309 }
310}
311
312pub fn unique_counts(
313 df: &DataFrame,
314 columns: &[config::ColumnConfig],
315) -> FloeResult<Vec<(String, u64)>> {
316 if df.height() == 0 {
317 return Ok(Vec::new());
318 }
319
320 let unique_columns: Vec<&config::ColumnConfig> = columns
321 .iter()
322 .filter(|col| col.unique == Some(true))
323 .collect();
324 if unique_columns.is_empty() {
325 return Ok(Vec::new());
326 }
327
328 let mut counts = Vec::new();
329 for column in unique_columns {
330 let series = df.column(&column.name).map_err(|err| {
331 Box::new(RunError(format!(
332 "unique column {} not found: {err}",
333 column.name
334 )))
335 })?;
336 let non_null = series.len().saturating_sub(series.null_count());
337 if non_null == 0 {
338 continue;
339 }
340 let unique = series.drop_nulls().n_unique().map_err(|err| {
341 Box::new(RunError(format!(
342 "unique column {} read failed: {err}",
343 column.name
344 )))
345 })?;
346 let violations = non_null.saturating_sub(unique) as u64;
347 if violations > 0 {
348 counts.push((column.name.clone(), violations));
349 }
350 }
351
352 Ok(counts)
353}