Skip to main content

floe_core/checks/unique.rs

1use polars::prelude::{is_duplicated, is_first_distinct, AnyValue, DataFrame};
2use std::collections::{HashMap, HashSet};
3
4use super::{ColumnIndex, RowError};
5use crate::errors::RunError;
6use crate::{config, FloeResult};
7
8pub fn unique_errors(
9    df: &DataFrame,
10    columns: &[config::ColumnConfig],
11    indices: &ColumnIndex,
12) -> FloeResult<Vec<Vec<RowError>>> {
13    let mut errors_per_row = vec![Vec::new(); df.height()];
14    let unique_columns: Vec<&config::ColumnConfig> = columns
15        .iter()
16        .filter(|col| col.unique == Some(true))
17        .collect();
18    if unique_columns.is_empty() {
19        return Ok(errors_per_row);
20    }
21
22    for column in unique_columns {
23        let index = indices.get(&column.name).ok_or_else(|| {
24            Box::new(RunError(format!("unique column {} not found", column.name)))
25        })?;
26        let series = df.select_at_idx(*index).ok_or_else(|| {
27            Box::new(RunError(format!("unique column {} not found", column.name)))
28        })?;
29        let series = series.as_materialized_series();
30        let non_null = series.len().saturating_sub(series.null_count());
31        if non_null == 0 {
32            continue;
33        }
34        let mut duplicate_mask = is_duplicated(series).map_err(|err| {
35            Box::new(RunError(format!(
36                "unique column {} read failed: {err}",
37                column.name
38            )))
39        })?;
40        let not_null = series.is_not_null();
41        duplicate_mask = &duplicate_mask & &not_null;
42        let mut first_mask = is_first_distinct(series).map_err(|err| {
43            Box::new(RunError(format!(
44                "unique column {} read failed: {err}",
45                column.name
46            )))
47        })?;
48        first_mask = &first_mask & &not_null;
49        let mask = duplicate_mask & !first_mask;
50        for (row_idx, is_dup) in mask.into_iter().enumerate() {
51            if is_dup == Some(true) {
52                errors_per_row[row_idx].push(RowError::new(
53                    "unique",
54                    &column.name,
55                    "duplicate value",
56                ));
57            }
58        }
59    }
60
61    Ok(errors_per_row)
62}
63
64#[derive(Debug, Default)]
65pub struct UniqueTracker {
66    seen: HashMap<String, HashSet<String>>,
67}
68
69impl UniqueTracker {
70    pub fn new(columns: &[config::ColumnConfig]) -> Self {
71        let mut seen = HashMap::new();
72        for column in columns.iter().filter(|col| col.unique == Some(true)) {
73            seen.insert(column.name.clone(), HashSet::new());
74        }
75        Self { seen }
76    }
77
78    pub fn is_empty(&self) -> bool {
79        self.seen.is_empty()
80    }
81
82    pub fn apply(
83        &mut self,
84        df: &DataFrame,
85        columns: &[config::ColumnConfig],
86    ) -> FloeResult<Vec<Vec<RowError>>> {
87        let mut errors_per_row = vec![Vec::new(); df.height()];
88        if df.height() == 0 {
89            return Ok(errors_per_row);
90        }
91        let unique_columns: Vec<&config::ColumnConfig> = columns
92            .iter()
93            .filter(|col| col.unique == Some(true))
94            .collect();
95        if unique_columns.is_empty() {
96            return Ok(errors_per_row);
97        }
98
99        for column in unique_columns {
100            let series = df.column(&column.name).map_err(|err| {
101                Box::new(RunError(format!(
102                    "unique column {} not found: {err}",
103                    column.name
104                )))
105            })?;
106            let series = series.as_materialized_series().rechunk();
107            let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
108                Box::new(RunError(format!(
109                    "unique column {} not tracked",
110                    column.name
111                )))
112            })?;
113            for (row_idx, value) in series.iter().enumerate() {
114                if matches!(value, AnyValue::Null) {
115                    continue;
116                }
117                let key = value.to_string();
118                if seen.contains(&key) {
119                    errors_per_row[row_idx].push(RowError::new(
120                        "unique",
121                        &column.name,
122                        "duplicate value",
123                    ));
124                } else {
125                    seen.insert(key);
126                }
127            }
128        }
129
130        Ok(errors_per_row)
131    }
132}
133
134pub fn unique_counts(
135    df: &DataFrame,
136    columns: &[config::ColumnConfig],
137) -> FloeResult<Vec<(String, u64)>> {
138    if df.height() == 0 {
139        return Ok(Vec::new());
140    }
141
142    let unique_columns: Vec<&config::ColumnConfig> = columns
143        .iter()
144        .filter(|col| col.unique == Some(true))
145        .collect();
146    if unique_columns.is_empty() {
147        return Ok(Vec::new());
148    }
149
150    let mut counts = Vec::new();
151    for column in unique_columns {
152        let series = df.column(&column.name).map_err(|err| {
153            Box::new(RunError(format!(
154                "unique column {} not found: {err}",
155                column.name
156            )))
157        })?;
158        let non_null = series.len().saturating_sub(series.null_count());
159        if non_null == 0 {
160            continue;
161        }
162        let unique = series.drop_nulls().n_unique().map_err(|err| {
163            Box::new(RunError(format!(
164                "unique column {} read failed: {err}",
165                column.name
166            )))
167        })?;
168        let violations = non_null.saturating_sub(unique) as u64;
169        if violations > 0 {
170            counts.push((column.name.clone(), violations));
171        }
172    }
173
174    Ok(counts)
175}