Skip to main content

floe_core/checks/
unique.rs

1use polars::prelude::{is_duplicated, is_first_distinct, AnyValue, DataFrame};
2use std::collections::{HashMap, HashSet};
3
4use super::{ColumnIndex, RowError, SparseRowErrors};
5use crate::errors::RunError;
6use crate::{config, FloeResult};
7
/// Owned, hashable form of a single non-null cell, used to detect values that
/// repeat within one column.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum UniqueKey {
    /// A boolean cell.
    Bool(bool),
    /// A signed integer cell widened to `i64`.
    I64(i64),
    /// An unsigned integer cell widened to `u64`.
    U64(u64),
    /// A floating-point cell stored as a `u64` bit pattern, so it can be
    /// hashed and compared with `Eq`.
    F64(u64),
    /// A string cell copied into an owned `String`.
    String(String),
    /// Fallback for any other value, keyed by its text rendering.
    Other(String),
}
17
18fn unique_key(value: AnyValue) -> Option<UniqueKey> {
19    match value {
20        AnyValue::Null => None,
21        AnyValue::Boolean(value) => Some(UniqueKey::Bool(value)),
22        AnyValue::Int8(value) => Some(UniqueKey::I64(value as i64)),
23        AnyValue::Int16(value) => Some(UniqueKey::I64(value as i64)),
24        AnyValue::Int32(value) => Some(UniqueKey::I64(value as i64)),
25        AnyValue::Int64(value) => Some(UniqueKey::I64(value)),
26        AnyValue::Int128(value) => Some(UniqueKey::Other(value.to_string())),
27        AnyValue::UInt8(value) => Some(UniqueKey::U64(value as u64)),
28        AnyValue::UInt16(value) => Some(UniqueKey::U64(value as u64)),
29        AnyValue::UInt32(value) => Some(UniqueKey::U64(value as u64)),
30        AnyValue::UInt64(value) => Some(UniqueKey::U64(value)),
31        AnyValue::UInt128(value) => Some(UniqueKey::Other(value.to_string())),
32        AnyValue::Float32(value) => Some(UniqueKey::F64((value as f64).to_bits())),
33        AnyValue::Float64(value) => Some(UniqueKey::F64(value.to_bits())),
34        AnyValue::String(value) => Some(UniqueKey::String(value.to_string())),
35        AnyValue::StringOwned(value) => Some(UniqueKey::String(value.to_string())),
36        other => Some(UniqueKey::Other(other.to_string())),
37    }
38}
39
40pub fn unique_errors(
41    df: &DataFrame,
42    columns: &[config::ColumnConfig],
43    indices: &ColumnIndex,
44) -> FloeResult<Vec<Vec<RowError>>> {
45    let mut errors_per_row = vec![Vec::new(); df.height()];
46    let unique_columns: Vec<&config::ColumnConfig> = columns
47        .iter()
48        .filter(|col| col.unique == Some(true))
49        .collect();
50    if unique_columns.is_empty() {
51        return Ok(errors_per_row);
52    }
53
54    for column in unique_columns {
55        let index = indices.get(&column.name).ok_or_else(|| {
56            Box::new(RunError(format!("unique column {} not found", column.name)))
57        })?;
58        let series = df.select_at_idx(*index).ok_or_else(|| {
59            Box::new(RunError(format!("unique column {} not found", column.name)))
60        })?;
61        let series = series.as_materialized_series();
62        let non_null = series.len().saturating_sub(series.null_count());
63        if non_null == 0 {
64            continue;
65        }
66        let mut duplicate_mask = is_duplicated(series).map_err(|err| {
67            Box::new(RunError(format!(
68                "unique column {} read failed: {err}",
69                column.name
70            )))
71        })?;
72        let not_null = series.is_not_null();
73        duplicate_mask = &duplicate_mask & &not_null;
74        let mut first_mask = is_first_distinct(series).map_err(|err| {
75            Box::new(RunError(format!(
76                "unique column {} read failed: {err}",
77                column.name
78            )))
79        })?;
80        first_mask = &first_mask & &not_null;
81        let mask = duplicate_mask & !first_mask;
82        for (row_idx, is_dup) in mask.into_iter().enumerate() {
83            if is_dup == Some(true) {
84                errors_per_row[row_idx].push(RowError::new(
85                    "unique",
86                    &column.name,
87                    "duplicate value",
88                ));
89            }
90        }
91    }
92
93    Ok(errors_per_row)
94}
95
96pub fn unique_errors_sparse(
97    df: &DataFrame,
98    columns: &[config::ColumnConfig],
99    indices: &ColumnIndex,
100) -> FloeResult<SparseRowErrors> {
101    let mut errors = SparseRowErrors::new(df.height());
102    if df.height() == 0 {
103        return Ok(errors);
104    }
105    let unique_columns: Vec<&config::ColumnConfig> = columns
106        .iter()
107        .filter(|col| col.unique == Some(true))
108        .collect();
109    if unique_columns.is_empty() {
110        return Ok(errors);
111    }
112
113    for column in unique_columns {
114        let index = indices.get(&column.name).ok_or_else(|| {
115            Box::new(RunError(format!(
116                "unique column {} not found in dataframe",
117                column.name
118            )))
119        })?;
120        let series = df
121            .select_at_idx(*index)
122            .ok_or_else(|| {
123                Box::new(RunError(format!(
124                    "unique column {} not found in dataframe",
125                    column.name
126                )))
127            })?
128            .as_materialized_series()
129            .rechunk();
130        let mut seen: HashSet<UniqueKey> = HashSet::new();
131        for (row_idx, value) in series.iter().enumerate() {
132            let key = match unique_key(value) {
133                Some(key) => key,
134                None => continue,
135            };
136            if seen.contains(&key) {
137                errors.add_error(
138                    row_idx,
139                    RowError::new("unique", &column.name, "duplicate value"),
140                );
141            } else {
142                seen.insert(key);
143            }
144        }
145    }
146
147    Ok(errors)
148}
149
150#[derive(Debug, Default)]
151pub struct UniqueTracker {
152    seen: HashMap<String, HashSet<UniqueKey>>,
153}
154
155impl UniqueTracker {
156    pub fn new(columns: &[config::ColumnConfig]) -> Self {
157        let mut seen = HashMap::new();
158        for column in columns.iter().filter(|col| col.unique == Some(true)) {
159            seen.insert(column.name.clone(), HashSet::new());
160        }
161        Self { seen }
162    }
163
164    pub fn is_empty(&self) -> bool {
165        self.seen.is_empty()
166    }
167
168    pub fn apply(
169        &mut self,
170        df: &DataFrame,
171        columns: &[config::ColumnConfig],
172    ) -> FloeResult<Vec<Vec<RowError>>> {
173        let mut errors_per_row = vec![Vec::new(); df.height()];
174        if df.height() == 0 {
175            return Ok(errors_per_row);
176        }
177        let unique_columns: Vec<&config::ColumnConfig> = columns
178            .iter()
179            .filter(|col| col.unique == Some(true))
180            .collect();
181        if unique_columns.is_empty() {
182            return Ok(errors_per_row);
183        }
184
185        for column in unique_columns {
186            let series = df.column(&column.name).map_err(|err| {
187                Box::new(RunError(format!(
188                    "unique column {} not found: {err}",
189                    column.name
190                )))
191            })?;
192            let series = series.as_materialized_series().rechunk();
193            let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
194                Box::new(RunError(format!(
195                    "unique column {} not tracked",
196                    column.name
197                )))
198            })?;
199            for (row_idx, value) in series.iter().enumerate() {
200                let key = match unique_key(value) {
201                    Some(key) => key,
202                    None => continue,
203                };
204                if seen.contains(&key) {
205                    errors_per_row[row_idx].push(RowError::new(
206                        "unique",
207                        &column.name,
208                        "duplicate value",
209                    ));
210                } else {
211                    seen.insert(key);
212                }
213            }
214        }
215
216        Ok(errors_per_row)
217    }
218
219    pub fn apply_sparse(
220        &mut self,
221        df: &DataFrame,
222        columns: &[config::ColumnConfig],
223    ) -> FloeResult<SparseRowErrors> {
224        let mut errors = SparseRowErrors::new(df.height());
225        if df.height() == 0 {
226            return Ok(errors);
227        }
228        let unique_columns: Vec<&config::ColumnConfig> = columns
229            .iter()
230            .filter(|col| col.unique == Some(true))
231            .collect();
232        if unique_columns.is_empty() {
233            return Ok(errors);
234        }
235
236        for column in unique_columns {
237            let series = df.column(&column.name).map_err(|err| {
238                Box::new(RunError(format!(
239                    "unique column {} not found: {err}",
240                    column.name
241                )))
242            })?;
243            let series = series.as_materialized_series().rechunk();
244            let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
245                Box::new(RunError(format!(
246                    "unique column {} not tracked",
247                    column.name
248                )))
249            })?;
250            for (row_idx, value) in series.iter().enumerate() {
251                let key = match unique_key(value) {
252                    Some(key) => key,
253                    None => continue,
254                };
255                if seen.contains(&key) {
256                    errors.add_error(
257                        row_idx,
258                        RowError::new("unique", &column.name, "duplicate value"),
259                    );
260                } else {
261                    seen.insert(key);
262                }
263            }
264        }
265
266        Ok(errors)
267    }
268}
269
270pub fn unique_counts(
271    df: &DataFrame,
272    columns: &[config::ColumnConfig],
273) -> FloeResult<Vec<(String, u64)>> {
274    if df.height() == 0 {
275        return Ok(Vec::new());
276    }
277
278    let unique_columns: Vec<&config::ColumnConfig> = columns
279        .iter()
280        .filter(|col| col.unique == Some(true))
281        .collect();
282    if unique_columns.is_empty() {
283        return Ok(Vec::new());
284    }
285
286    let mut counts = Vec::new();
287    for column in unique_columns {
288        let series = df.column(&column.name).map_err(|err| {
289            Box::new(RunError(format!(
290                "unique column {} not found: {err}",
291                column.name
292            )))
293        })?;
294        let non_null = series.len().saturating_sub(series.null_count());
295        if non_null == 0 {
296            continue;
297        }
298        let unique = series.drop_nulls().n_unique().map_err(|err| {
299            Box::new(RunError(format!(
300                "unique column {} read failed: {err}",
301                column.name
302            )))
303        })?;
304        let violations = non_null.saturating_sub(unique) as u64;
305        if violations > 0 {
306            counts.push((column.name.clone(), violations));
307        }
308    }
309
310    Ok(counts)
311}