Skip to main content

floe_core/checks/
unique.rs

1use polars::prelude::{AnyValue, DataFrame, Series};
2use std::collections::{BTreeMap, HashMap, HashSet};
3
4use super::{ColumnIndex, RowError, SparseRowErrors};
5use crate::errors::RunError;
6use crate::{config, FloeResult};
7
/// Maximum number of duplicate-key samples included per constraint by `UniqueTracker::results`.
8const UNIQUE_SAMPLE_LIMIT: usize = 5;
9
/// Hashable, comparable stand-in for one non-null cell value taking part in a uniqueness check.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UniqueKey {
    Bool(bool),
    I64(i64),
    U64(u64),
    /// Bit pattern of an `f64`; stored as bits so the enum can derive `Eq` and `Hash`.
    F64(u64),
    String(String),
    /// Textual rendering of a value that has no dedicated variant.
    Other(String),
}

impl UniqueKey {
    /// Renders the key as plain text (used when building duplicate samples).
    fn as_string(&self) -> String {
        match self {
            Self::String(text) | Self::Other(text) => text.to_owned(),
            Self::F64(bits) => f64::from_bits(*bits).to_string(),
            Self::U64(number) => number.to_string(),
            Self::I64(number) => number.to_string(),
            Self::Bool(flag) => flag.to_string(),
        }
    }
}
31
/// Ordered list of per-column keys identifying one row under a constraint
/// (one entry per constraint column, in column order).
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33struct CompositeKey(Vec<UniqueKey>);
34
/// A uniqueness rule over one or more columns.
35#[derive(Debug, Clone)]
36pub struct UniqueConstraint {
    /// Column names looked up in the `DataFrame` when building row keys.
37    pub runtime_columns: Vec<String>,
    /// Column names used for error labels and for reported results and samples.
38    pub report_columns: Vec<String>,
    /// When `true`, rows flagged as duplicates are also added to the caller's forced-reject set.
39    pub enforce_reject: bool,
40}
41
/// One duplicated key together with how often it was flagged.
42#[derive(Debug, Clone)]
43pub struct UniqueConstraintSample {
    /// Report column name mapped to the rendered key value for that column.
44    pub values: BTreeMap<String, String>,
    /// Number of rows flagged as duplicates for this key.
45    pub count: u64,
46}
47
/// Aggregated outcome for one constraint, as produced by `UniqueTracker::results`.
48#[derive(Debug, Clone)]
49pub struct UniqueConstraintResult {
    /// Report column names of the constraint.
50    pub columns: Vec<String>,
    /// Total number of rows flagged as duplicates.
51    pub duplicates_count: u64,
    /// Flagged rows whose key was not among the seeded keys.
52    pub batch_duplicates_count: u64,
    /// Flagged rows whose key was among the seeded keys.
53    pub target_duplicates_count: u64,
    /// Currently filled with the same value as `duplicates_count`.
54    pub affected_rows_count: u64,
    /// At most `UNIQUE_SAMPLE_LIMIT` duplicated keys, highest count first.
55    pub samples: Vec<UniqueConstraintSample>,
56}
57
/// Mutable bookkeeping kept for one constraint while data is processed.
58#[derive(Debug, Clone)]
59struct ConstraintState {
    /// The rule being tracked.
60    constraint: UniqueConstraint,
    /// Every key observed so far, including seeded keys.
61    seen: HashSet<CompositeKey>,
    /// Keys loaded through `UniqueTracker::seed_from_df`; used to classify a duplicate
    /// as a target duplicate rather than a batch duplicate.
62    seeded_keys: HashSet<CompositeKey>,
    /// Total number of rows flagged as duplicates.
63    duplicates_count: u64,
    /// Flagged rows whose key was not seeded.
64    batch_duplicates_count: u64,
    /// Flagged rows whose key was seeded.
65    target_duplicates_count: u64,
    /// Per-key count of flagged rows, used to build samples.
66    sample_counts: HashMap<CompositeKey, u64>,
67}
68
/// Tracks uniqueness constraints; observed keys are retained between calls, so
/// duplicates are detected across successive `DataFrame`s passed to the same tracker.
69#[derive(Debug, Default)]
70pub struct UniqueTracker {
    /// One state entry per constraint, in the order the constraints were supplied.
71    states: Vec<ConstraintState>,
72}
73
74impl UniqueTracker {
75    pub fn new(columns: &[config::ColumnConfig]) -> Self {
76        let constraints = legacy_unique_constraints(columns)
77            .into_iter()
78            .map(|column| UniqueConstraint {
79                runtime_columns: vec![column.clone()],
80                report_columns: vec![column],
81                enforce_reject: false,
82            })
83            .collect::<Vec<_>>();
84        Self::with_constraints(constraints)
85    }
86
87    pub fn with_constraints(constraints: Vec<UniqueConstraint>) -> Self {
88        let states = constraints
89            .into_iter()
90            .map(|constraint| ConstraintState {
91                constraint,
92                seen: HashSet::new(),
93                seeded_keys: HashSet::new(),
94                duplicates_count: 0,
95                batch_duplicates_count: 0,
96                target_duplicates_count: 0,
97                sample_counts: HashMap::new(),
98            })
99            .collect();
100        Self { states }
101    }
102
103    pub fn is_empty(&self) -> bool {
104        self.states.is_empty()
105    }
106
107    pub fn runtime_columns(&self) -> Vec<String> {
108        let mut seen = HashSet::new();
109        let mut columns = Vec::new();
110        for state in &self.states {
111            for column in &state.constraint.runtime_columns {
112                if seen.insert(column.clone()) {
113                    columns.push(column.clone());
114                }
115            }
116        }
117        columns
118    }
119
120    pub fn seed_from_df(&mut self, df: &DataFrame) -> FloeResult<()> {
121        if df.height() == 0 || self.states.is_empty() {
122            return Ok(());
123        }
124        for state in &mut self.states {
125            let columns = load_constraint_columns(df, &state.constraint.runtime_columns)?;
126            for row_idx in 0..df.height() {
127                let key = match composite_key_from_row(&columns, row_idx)? {
128                    Some(key) => key,
129                    None => continue,
130                };
131                state.seeded_keys.insert(key.clone());
132                state.seen.insert(key);
133            }
134        }
135        Ok(())
136    }
137
138    pub fn apply(
139        &mut self,
140        df: &DataFrame,
141        columns: &[config::ColumnConfig],
142    ) -> FloeResult<Vec<Vec<RowError>>> {
143        let mut errors_per_row = vec![Vec::new(); df.height()];
144        let sparse = self.apply_sparse(df, columns)?;
145        for (row_idx, row_errors) in sparse.iter() {
146            if let Some(slot) = errors_per_row.get_mut(*row_idx) {
147                slot.extend(row_errors.clone());
148            }
149        }
150        Ok(errors_per_row)
151    }
152
153    pub fn apply_sparse(
154        &mut self,
155        df: &DataFrame,
156        _columns: &[config::ColumnConfig],
157    ) -> FloeResult<SparseRowErrors> {
158        let mut forced_reject_rows = HashSet::new();
159        self.apply_sparse_with_forced_rejects(df, _columns, &mut forced_reject_rows)
160    }
161
162    pub fn apply_sparse_with_forced_rejects(
163        &mut self,
164        df: &DataFrame,
165        _columns: &[config::ColumnConfig],
166        forced_reject_rows: &mut HashSet<usize>,
167    ) -> FloeResult<SparseRowErrors> {
168        let mut errors = SparseRowErrors::new(df.height());
169        if df.height() == 0 || self.states.is_empty() {
170            return Ok(errors);
171        }
172
173        for state in &mut self.states {
174            let columns = load_constraint_columns(df, &state.constraint.runtime_columns)?;
175            let report_columns = state.constraint.report_columns.clone();
176            let (constraint_repr, message) = if report_columns.len() == 1 {
177                (report_columns[0].clone(), "duplicate value")
178            } else {
179                (format!("[{}]", report_columns.join(",")), "duplicate key")
180            };
181            for row_idx in 0..df.height() {
182                let key = match composite_key_from_row(&columns, row_idx)? {
183                    Some(key) => key,
184                    None => continue,
185                };
186                if state.seen.contains(&key) {
187                    errors.add_error(row_idx, RowError::new("unique", &constraint_repr, message));
188                    if state.constraint.enforce_reject {
189                        forced_reject_rows.insert(row_idx);
190                    }
191                    state.duplicates_count += 1;
192                    if state.seeded_keys.contains(&key) {
193                        state.target_duplicates_count += 1;
194                    } else {
195                        state.batch_duplicates_count += 1;
196                    }
197                    let counter = state.sample_counts.entry(key).or_insert(0);
198                    *counter += 1;
199                } else {
200                    state.seen.insert(key);
201                }
202            }
203        }
204
205        Ok(errors)
206    }
207
208    pub fn results(&self) -> Vec<UniqueConstraintResult> {
209        self.states
210            .iter()
211            .map(|state| {
212                let mut sample_counts = state
213                    .sample_counts
214                    .iter()
215                    .map(|(key, count)| (key, *count))
216                    .collect::<Vec<_>>();
217                sample_counts.sort_by(|left, right| {
218                    right
219                        .1
220                        .cmp(&left.1)
221                        .then_with(|| format!("{:?}", left.0).cmp(&format!("{:?}", right.0)))
222                });
223                let samples = sample_counts
224                    .into_iter()
225                    .take(UNIQUE_SAMPLE_LIMIT)
226                    .map(|(key, count)| {
227                        let mut values = BTreeMap::new();
228                        for (idx, value) in key.0.iter().enumerate() {
229                            if let Some(column_name) = state.constraint.report_columns.get(idx) {
230                                values.insert(column_name.clone(), value.as_string());
231                            }
232                        }
233                        UniqueConstraintSample { values, count }
234                    })
235                    .collect::<Vec<_>>();
236                UniqueConstraintResult {
237                    columns: state.constraint.report_columns.clone(),
238                    duplicates_count: state.duplicates_count,
239                    batch_duplicates_count: state.batch_duplicates_count,
240                    target_duplicates_count: state.target_duplicates_count,
241                    affected_rows_count: state.duplicates_count,
242                    samples,
243                }
244            })
245            .collect()
246    }
247}
248
249pub fn unique_errors(
250    df: &DataFrame,
251    columns: &[config::ColumnConfig],
252    _indices: &ColumnIndex,
253) -> FloeResult<Vec<Vec<RowError>>> {
254    let mut tracker = UniqueTracker::new(columns);
255    tracker.apply(df, columns)
256}
257
258pub fn unique_errors_sparse(
259    df: &DataFrame,
260    columns: &[config::ColumnConfig],
261    _indices: &ColumnIndex,
262) -> FloeResult<SparseRowErrors> {
263    let mut tracker = UniqueTracker::new(columns);
264    tracker.apply_sparse(df, columns)
265}
266
267pub fn unique_counts(
268    df: &DataFrame,
269    columns: &[config::ColumnConfig],
270) -> FloeResult<Vec<(String, u64)>> {
271    if df.height() == 0 {
272        return Ok(Vec::new());
273    }
274
275    let unique_columns: Vec<&config::ColumnConfig> = columns
276        .iter()
277        .filter(|col| col.unique == Some(true))
278        .collect();
279    if unique_columns.is_empty() {
280        return Ok(Vec::new());
281    }
282
283    let mut counts = Vec::new();
284    for column in unique_columns {
285        let series = df.column(&column.name).map_err(|err| {
286            Box::new(RunError(format!(
287                "unique column {} not found: {err}",
288                column.name
289            )))
290        })?;
291        let non_null = series.len().saturating_sub(series.null_count());
292        if non_null == 0 {
293            continue;
294        }
295        let unique = series.drop_nulls().n_unique().map_err(|err| {
296            Box::new(RunError(format!(
297                "unique column {} read failed: {err}",
298                column.name
299            )))
300        })?;
301        let violations = non_null.saturating_sub(unique) as u64;
302        if violations > 0 {
303            counts.push((column.name.clone(), violations));
304        }
305    }
306
307    Ok(counts)
308}
309
310pub fn resolve_schema_unique_keys(schema: &config::SchemaConfig) -> Vec<Vec<String>> {
311    let mut seen = HashSet::new();
312    let mut constraints = Vec::new();
313
314    if let Some(unique_keys) = schema.unique_keys.as_ref() {
315        for key in unique_keys {
316            let normalized = key
317                .iter()
318                .map(|column| column.trim().to_string())
319                .collect::<Vec<_>>();
320            if normalized.is_empty() {
321                continue;
322            }
323            let signature = normalized.join("\u{1f}");
324            if seen.insert(signature) {
325                constraints.push(normalized);
326            }
327        }
328    } else {
329        for column in legacy_unique_constraints(&schema.columns) {
330            let constraint = vec![column];
331            let signature = constraint.join("\u{1f}");
332            if seen.insert(signature) {
333                constraints.push(constraint);
334            }
335        }
336    }
337
338    if let Some(primary_key) = schema.primary_key.as_ref() {
339        let normalized = primary_key
340            .iter()
341            .map(|column| column.trim().to_string())
342            .collect::<Vec<_>>();
343        if !normalized.is_empty() {
344            let signature = normalized.join("\u{1f}");
345            if seen.insert(signature) {
346                constraints.push(normalized);
347            }
348        }
349    }
350
351    constraints
352}
353
354fn legacy_unique_constraints(columns: &[config::ColumnConfig]) -> Vec<String> {
355    columns
356        .iter()
357        .filter(|col| col.unique == Some(true))
358        .map(|col| col.name.trim().to_string())
359        .filter(|name| !name.is_empty())
360        .collect()
361}
362
363fn load_constraint_columns(df: &DataFrame, columns: &[String]) -> FloeResult<Vec<Series>> {
364    let mut output = Vec::with_capacity(columns.len());
365    for column in columns {
366        let series = df.column(column).map_err(|err| {
367            Box::new(RunError(format!(
368                "unique constraint column {} not found: {err}",
369                column
370            )))
371        })?;
372        output.push(series.as_materialized_series().rechunk());
373    }
374    Ok(output)
375}
376
377fn composite_key_from_row(columns: &[Series], row_idx: usize) -> FloeResult<Option<CompositeKey>> {
378    let mut key = Vec::with_capacity(columns.len());
379    for series in columns {
380        let value = series.get(row_idx).map_err(|err| {
381            Box::new(RunError(format!(
382                "unique constraint read failed at row {}: {err}",
383                row_idx
384            )))
385        })?;
386        let Some(value) = unique_key(value) else {
387            return Ok(None);
388        };
389        key.push(value);
390    }
391    Ok(Some(CompositeKey(key)))
392}
393
394fn unique_key(value: AnyValue) -> Option<UniqueKey> {
395    match value {
396        AnyValue::Null => None,
397        AnyValue::Boolean(value) => Some(UniqueKey::Bool(value)),
398        AnyValue::Int8(value) => Some(UniqueKey::I64(value as i64)),
399        AnyValue::Int16(value) => Some(UniqueKey::I64(value as i64)),
400        AnyValue::Int32(value) => Some(UniqueKey::I64(value as i64)),
401        AnyValue::Int64(value) => Some(UniqueKey::I64(value)),
402        AnyValue::Int128(value) => Some(UniqueKey::Other(value.to_string())),
403        AnyValue::UInt8(value) => Some(UniqueKey::U64(value as u64)),
404        AnyValue::UInt16(value) => Some(UniqueKey::U64(value as u64)),
405        AnyValue::UInt32(value) => Some(UniqueKey::U64(value as u64)),
406        AnyValue::UInt64(value) => Some(UniqueKey::U64(value)),
407        AnyValue::UInt128(value) => Some(UniqueKey::Other(value.to_string())),
408        AnyValue::Float32(value) => Some(UniqueKey::F64((value as f64).to_bits())),
409        AnyValue::Float64(value) => Some(UniqueKey::F64(value.to_bits())),
410        AnyValue::String(value) => Some(UniqueKey::String(value.to_string())),
411        AnyValue::StringOwned(value) => Some(UniqueKey::String(value.to_string())),
412        other => Some(UniqueKey::Other(other.to_string())),
413    }
414}