Skip to main content

floe_core/checks/
unique.rs

1use polars::prelude::{AnyValue, DataFrame, Series};
2use std::collections::{BTreeMap, HashMap, HashSet};
3
4use super::{ColumnIndex, RowError, SparseRowErrors};
5use crate::errors::RunError;
6use crate::{config, FloeResult};
7
/// Maximum number of duplicated keys reported as samples for each constraint.
const UNIQUE_SAMPLE_LIMIT: usize = 5;
9
/// Hashable, type-normalized form of one cell value, used as a component of a
/// uniqueness key.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum UniqueKey {
    /// Boolean cell.
    Bool(bool),
    /// Signed integer cell held as 64 bits.
    I64(i64),
    /// Unsigned integer cell held as 64 bits.
    U64(u64),
    /// Floating-point cell held as an `f64` bit pattern so it can be hashed.
    F64(u64),
    /// String cell.
    String(String),
    /// Any other value, kept as its textual rendering.
    Other(String),
}

impl UniqueKey {
    /// Renders this key component as text (floats are decoded from their bits first).
    fn as_string(&self) -> String {
        match self {
            Self::Bool(flag) => flag.to_string(),
            Self::I64(number) => number.to_string(),
            Self::U64(number) => number.to_string(),
            Self::F64(bits) => f64::from_bits(*bits).to_string(),
            Self::String(text) | Self::Other(text) => text.clone(),
        }
    }
}
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33struct CompositeKey(Vec<UniqueKey>);
34
/// A uniqueness rule over one or more columns.
#[derive(Clone, Debug)]
pub struct UniqueConstraint {
    /// Column names used to look the values up in the data frame being checked.
    pub runtime_columns: Vec<String>,
    /// Column names used in error messages and in reported results; matched to
    /// `runtime_columns` by position.
    pub report_columns: Vec<String>,
    /// When `true`, rows that duplicate an earlier key are also recorded in the
    /// caller-supplied forced-reject set.
    pub enforce_reject: bool,
}
41
/// One example of a duplicated key, as exposed in constraint results.
#[derive(Clone, Debug)]
pub struct UniqueConstraintSample {
    /// Report column name mapped to the textual rendering of that key component.
    pub values: BTreeMap<String, String>,
    /// Number of rows flagged as duplicates of this key.
    pub count: u64,
}
47
48#[derive(Debug, Clone)]
49pub struct UniqueConstraintResult {
50    pub columns: Vec<String>,
51    pub duplicates_count: u64,
52    pub affected_rows_count: u64,
53    pub samples: Vec<UniqueConstraintSample>,
54}
55
56#[derive(Debug, Clone)]
57struct ConstraintState {
58    constraint: UniqueConstraint,
59    seen: HashSet<CompositeKey>,
60    duplicates_count: u64,
61    sample_counts: HashMap<CompositeKey, u64>,
62}
63
64#[derive(Debug, Default)]
65pub struct UniqueTracker {
66    states: Vec<ConstraintState>,
67}
68
69impl UniqueTracker {
70    pub fn new(columns: &[config::ColumnConfig]) -> Self {
71        let constraints = legacy_unique_constraints(columns)
72            .into_iter()
73            .map(|column| UniqueConstraint {
74                runtime_columns: vec![column.clone()],
75                report_columns: vec![column],
76                enforce_reject: false,
77            })
78            .collect::<Vec<_>>();
79        Self::with_constraints(constraints)
80    }
81
82    pub fn with_constraints(constraints: Vec<UniqueConstraint>) -> Self {
83        let states = constraints
84            .into_iter()
85            .map(|constraint| ConstraintState {
86                constraint,
87                seen: HashSet::new(),
88                duplicates_count: 0,
89                sample_counts: HashMap::new(),
90            })
91            .collect();
92        Self { states }
93    }
94
95    pub fn is_empty(&self) -> bool {
96        self.states.is_empty()
97    }
98
99    pub fn runtime_columns(&self) -> Vec<String> {
100        let mut seen = HashSet::new();
101        let mut columns = Vec::new();
102        for state in &self.states {
103            for column in &state.constraint.runtime_columns {
104                if seen.insert(column.clone()) {
105                    columns.push(column.clone());
106                }
107            }
108        }
109        columns
110    }
111
112    pub fn seed_from_df(&mut self, df: &DataFrame) -> FloeResult<()> {
113        if df.height() == 0 || self.states.is_empty() {
114            return Ok(());
115        }
116        for state in &mut self.states {
117            let columns = load_constraint_columns(df, &state.constraint.runtime_columns)?;
118            for row_idx in 0..df.height() {
119                let key = match composite_key_from_row(&columns, row_idx)? {
120                    Some(key) => key,
121                    None => continue,
122                };
123                state.seen.insert(key);
124            }
125        }
126        Ok(())
127    }
128
129    pub fn apply(
130        &mut self,
131        df: &DataFrame,
132        columns: &[config::ColumnConfig],
133    ) -> FloeResult<Vec<Vec<RowError>>> {
134        let mut errors_per_row = vec![Vec::new(); df.height()];
135        let sparse = self.apply_sparse(df, columns)?;
136        for (row_idx, row_errors) in sparse.iter() {
137            if let Some(slot) = errors_per_row.get_mut(*row_idx) {
138                slot.extend(row_errors.clone());
139            }
140        }
141        Ok(errors_per_row)
142    }
143
144    pub fn apply_sparse(
145        &mut self,
146        df: &DataFrame,
147        _columns: &[config::ColumnConfig],
148    ) -> FloeResult<SparseRowErrors> {
149        let mut forced_reject_rows = HashSet::new();
150        self.apply_sparse_with_forced_rejects(df, _columns, &mut forced_reject_rows)
151    }
152
153    pub fn apply_sparse_with_forced_rejects(
154        &mut self,
155        df: &DataFrame,
156        _columns: &[config::ColumnConfig],
157        forced_reject_rows: &mut HashSet<usize>,
158    ) -> FloeResult<SparseRowErrors> {
159        let mut errors = SparseRowErrors::new(df.height());
160        if df.height() == 0 || self.states.is_empty() {
161            return Ok(errors);
162        }
163
164        for state in &mut self.states {
165            let columns = load_constraint_columns(df, &state.constraint.runtime_columns)?;
166            let report_columns = state.constraint.report_columns.clone();
167            let (constraint_repr, message) = if report_columns.len() == 1 {
168                (report_columns[0].clone(), "duplicate value")
169            } else {
170                (format!("[{}]", report_columns.join(",")), "duplicate key")
171            };
172            for row_idx in 0..df.height() {
173                let key = match composite_key_from_row(&columns, row_idx)? {
174                    Some(key) => key,
175                    None => continue,
176                };
177                if state.seen.contains(&key) {
178                    errors.add_error(row_idx, RowError::new("unique", &constraint_repr, message));
179                    if state.constraint.enforce_reject {
180                        forced_reject_rows.insert(row_idx);
181                    }
182                    state.duplicates_count += 1;
183                    let counter = state.sample_counts.entry(key).or_insert(0);
184                    *counter += 1;
185                } else {
186                    state.seen.insert(key);
187                }
188            }
189        }
190
191        Ok(errors)
192    }
193
194    pub fn results(&self) -> Vec<UniqueConstraintResult> {
195        self.states
196            .iter()
197            .map(|state| {
198                let mut sample_counts = state
199                    .sample_counts
200                    .iter()
201                    .map(|(key, count)| (key, *count))
202                    .collect::<Vec<_>>();
203                sample_counts.sort_by(|left, right| {
204                    right
205                        .1
206                        .cmp(&left.1)
207                        .then_with(|| format!("{:?}", left.0).cmp(&format!("{:?}", right.0)))
208                });
209                let samples = sample_counts
210                    .into_iter()
211                    .take(UNIQUE_SAMPLE_LIMIT)
212                    .map(|(key, count)| {
213                        let mut values = BTreeMap::new();
214                        for (idx, value) in key.0.iter().enumerate() {
215                            if let Some(column_name) = state.constraint.report_columns.get(idx) {
216                                values.insert(column_name.clone(), value.as_string());
217                            }
218                        }
219                        UniqueConstraintSample { values, count }
220                    })
221                    .collect::<Vec<_>>();
222                UniqueConstraintResult {
223                    columns: state.constraint.report_columns.clone(),
224                    duplicates_count: state.duplicates_count,
225                    affected_rows_count: state.duplicates_count,
226                    samples,
227                }
228            })
229            .collect()
230    }
231}
232
233pub fn unique_errors(
234    df: &DataFrame,
235    columns: &[config::ColumnConfig],
236    _indices: &ColumnIndex,
237) -> FloeResult<Vec<Vec<RowError>>> {
238    let mut tracker = UniqueTracker::new(columns);
239    tracker.apply(df, columns)
240}
241
242pub fn unique_errors_sparse(
243    df: &DataFrame,
244    columns: &[config::ColumnConfig],
245    _indices: &ColumnIndex,
246) -> FloeResult<SparseRowErrors> {
247    let mut tracker = UniqueTracker::new(columns);
248    tracker.apply_sparse(df, columns)
249}
250
251pub fn unique_counts(
252    df: &DataFrame,
253    columns: &[config::ColumnConfig],
254) -> FloeResult<Vec<(String, u64)>> {
255    if df.height() == 0 {
256        return Ok(Vec::new());
257    }
258
259    let unique_columns: Vec<&config::ColumnConfig> = columns
260        .iter()
261        .filter(|col| col.unique == Some(true))
262        .collect();
263    if unique_columns.is_empty() {
264        return Ok(Vec::new());
265    }
266
267    let mut counts = Vec::new();
268    for column in unique_columns {
269        let series = df.column(&column.name).map_err(|err| {
270            Box::new(RunError(format!(
271                "unique column {} not found: {err}",
272                column.name
273            )))
274        })?;
275        let non_null = series.len().saturating_sub(series.null_count());
276        if non_null == 0 {
277            continue;
278        }
279        let unique = series.drop_nulls().n_unique().map_err(|err| {
280            Box::new(RunError(format!(
281                "unique column {} read failed: {err}",
282                column.name
283            )))
284        })?;
285        let violations = non_null.saturating_sub(unique) as u64;
286        if violations > 0 {
287            counts.push((column.name.clone(), violations));
288        }
289    }
290
291    Ok(counts)
292}
293
294pub fn resolve_schema_unique_keys(schema: &config::SchemaConfig) -> Vec<Vec<String>> {
295    let mut seen = HashSet::new();
296    let mut constraints = Vec::new();
297
298    if let Some(unique_keys) = schema.unique_keys.as_ref() {
299        for key in unique_keys {
300            let normalized = key
301                .iter()
302                .map(|column| column.trim().to_string())
303                .collect::<Vec<_>>();
304            if normalized.is_empty() {
305                continue;
306            }
307            let signature = normalized.join("\u{1f}");
308            if seen.insert(signature) {
309                constraints.push(normalized);
310            }
311        }
312    } else {
313        for column in legacy_unique_constraints(&schema.columns) {
314            let constraint = vec![column];
315            let signature = constraint.join("\u{1f}");
316            if seen.insert(signature) {
317                constraints.push(constraint);
318            }
319        }
320    }
321
322    if let Some(primary_key) = schema.primary_key.as_ref() {
323        let normalized = primary_key
324            .iter()
325            .map(|column| column.trim().to_string())
326            .collect::<Vec<_>>();
327        if !normalized.is_empty() {
328            let signature = normalized.join("\u{1f}");
329            if seen.insert(signature) {
330                constraints.push(normalized);
331            }
332        }
333    }
334
335    constraints
336}
337
338fn legacy_unique_constraints(columns: &[config::ColumnConfig]) -> Vec<String> {
339    columns
340        .iter()
341        .filter(|col| col.unique == Some(true))
342        .map(|col| col.name.trim().to_string())
343        .filter(|name| !name.is_empty())
344        .collect()
345}
346
347fn load_constraint_columns(df: &DataFrame, columns: &[String]) -> FloeResult<Vec<Series>> {
348    let mut output = Vec::with_capacity(columns.len());
349    for column in columns {
350        let series = df.column(column).map_err(|err| {
351            Box::new(RunError(format!(
352                "unique constraint column {} not found: {err}",
353                column
354            )))
355        })?;
356        output.push(series.as_materialized_series().rechunk());
357    }
358    Ok(output)
359}
360
361fn composite_key_from_row(columns: &[Series], row_idx: usize) -> FloeResult<Option<CompositeKey>> {
362    let mut key = Vec::with_capacity(columns.len());
363    for series in columns {
364        let value = series.get(row_idx).map_err(|err| {
365            Box::new(RunError(format!(
366                "unique constraint read failed at row {}: {err}",
367                row_idx
368            )))
369        })?;
370        let Some(value) = unique_key(value) else {
371            return Ok(None);
372        };
373        key.push(value);
374    }
375    Ok(Some(CompositeKey(key)))
376}
377
378fn unique_key(value: AnyValue) -> Option<UniqueKey> {
379    match value {
380        AnyValue::Null => None,
381        AnyValue::Boolean(value) => Some(UniqueKey::Bool(value)),
382        AnyValue::Int8(value) => Some(UniqueKey::I64(value as i64)),
383        AnyValue::Int16(value) => Some(UniqueKey::I64(value as i64)),
384        AnyValue::Int32(value) => Some(UniqueKey::I64(value as i64)),
385        AnyValue::Int64(value) => Some(UniqueKey::I64(value)),
386        AnyValue::Int128(value) => Some(UniqueKey::Other(value.to_string())),
387        AnyValue::UInt8(value) => Some(UniqueKey::U64(value as u64)),
388        AnyValue::UInt16(value) => Some(UniqueKey::U64(value as u64)),
389        AnyValue::UInt32(value) => Some(UniqueKey::U64(value as u64)),
390        AnyValue::UInt64(value) => Some(UniqueKey::U64(value)),
391        AnyValue::UInt128(value) => Some(UniqueKey::Other(value.to_string())),
392        AnyValue::Float32(value) => Some(UniqueKey::F64((value as f64).to_bits())),
393        AnyValue::Float64(value) => Some(UniqueKey::F64(value.to_bits())),
394        AnyValue::String(value) => Some(UniqueKey::String(value.to_string())),
395        AnyValue::StringOwned(value) => Some(UniqueKey::String(value.to_string())),
396        other => Some(UniqueKey::Other(other.to_string())),
397    }
398}