Skip to main content

floe_core/checks/
unique.rs

1use polars::prelude::{is_duplicated, is_first_distinct, AnyValue, DataFrame};
2use std::collections::{HashMap, HashSet};
3
4use super::{ColumnIndex, RowError, SparseRowErrors};
5use crate::errors::RunError;
6use crate::{config, FloeResult};
7
/// Hashable, owned stand-in for a single non-null cell value.
///
/// Values are normalised into a handful of variants so they can be stored in a
/// `HashSet` while scanning a column for duplicates.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
enum UniqueKey {
    /// Boolean cell.
    Bool(bool),
    /// Signed integer cell of up to 64 bits, widened to `i64`.
    I64(i64),
    /// Unsigned integer cell of up to 64 bits, widened to `u64`.
    U64(u64),
    /// Floating-point cell, stored as the `u64` bit pattern of its `f64` form
    /// because `f64` itself implements neither `Eq` nor `Hash`.
    F64(u64),
    /// String cell, copied into an owned `String`.
    String(String),
    /// Any other value, keyed by its `to_string()` rendering.
    Other(String),
}
17
18fn unique_key(value: AnyValue) -> Option<UniqueKey> {
19    match value {
20        AnyValue::Null => None,
21        AnyValue::Boolean(value) => Some(UniqueKey::Bool(value)),
22        AnyValue::Int8(value) => Some(UniqueKey::I64(value as i64)),
23        AnyValue::Int16(value) => Some(UniqueKey::I64(value as i64)),
24        AnyValue::Int32(value) => Some(UniqueKey::I64(value as i64)),
25        AnyValue::Int64(value) => Some(UniqueKey::I64(value)),
26        AnyValue::Int128(value) => Some(UniqueKey::Other(value.to_string())),
27        AnyValue::UInt8(value) => Some(UniqueKey::U64(value as u64)),
28        AnyValue::UInt16(value) => Some(UniqueKey::U64(value as u64)),
29        AnyValue::UInt32(value) => Some(UniqueKey::U64(value as u64)),
30        AnyValue::UInt64(value) => Some(UniqueKey::U64(value)),
31        AnyValue::UInt128(value) => Some(UniqueKey::Other(value.to_string())),
32        AnyValue::Float32(value) => Some(UniqueKey::F64((value as f64).to_bits())),
33        AnyValue::Float64(value) => Some(UniqueKey::F64(value.to_bits())),
34        AnyValue::String(value) => Some(UniqueKey::String(value.to_string())),
35        AnyValue::StringOwned(value) => Some(UniqueKey::String(value.to_string())),
36        other => Some(UniqueKey::Other(other.to_string())),
37    }
38}
39
40pub fn unique_errors(
41    df: &DataFrame,
42    columns: &[config::ColumnConfig],
43    indices: &ColumnIndex,
44) -> FloeResult<Vec<Vec<RowError>>> {
45    let mut errors_per_row = vec![Vec::new(); df.height()];
46    let unique_columns: Vec<&config::ColumnConfig> = columns
47        .iter()
48        .filter(|col| col.unique == Some(true))
49        .collect();
50    if unique_columns.is_empty() {
51        return Ok(errors_per_row);
52    }
53
54    for column in unique_columns {
55        let index = indices.get(&column.name).ok_or_else(|| {
56            Box::new(RunError(format!("unique column {} not found", column.name)))
57        })?;
58        let series = df.select_at_idx(*index).ok_or_else(|| {
59            Box::new(RunError(format!("unique column {} not found", column.name)))
60        })?;
61        let series = series.as_materialized_series();
62        let non_null = series.len().saturating_sub(series.null_count());
63        if non_null == 0 {
64            continue;
65        }
66        let mut duplicate_mask = is_duplicated(series).map_err(|err| {
67            Box::new(RunError(format!(
68                "unique column {} read failed: {err}",
69                column.name
70            )))
71        })?;
72        let not_null = series.is_not_null();
73        duplicate_mask = &duplicate_mask & &not_null;
74        let mut first_mask = is_first_distinct(series).map_err(|err| {
75            Box::new(RunError(format!(
76                "unique column {} read failed: {err}",
77                column.name
78            )))
79        })?;
80        first_mask = &first_mask & &not_null;
81        let mask = duplicate_mask & !first_mask;
82        for (row_idx, is_dup) in mask.into_iter().enumerate() {
83            if is_dup == Some(true) {
84                errors_per_row[row_idx].push(RowError::new(
85                    "unique",
86                    &column.name,
87                    "duplicate value",
88                ));
89            }
90        }
91    }
92
93    Ok(errors_per_row)
94}
95
96pub fn unique_errors_sparse(
97    df: &DataFrame,
98    columns: &[config::ColumnConfig],
99    indices: &ColumnIndex,
100) -> FloeResult<SparseRowErrors> {
101    let mut errors = SparseRowErrors::new(df.height());
102    if df.height() == 0 {
103        return Ok(errors);
104    }
105    let unique_columns: Vec<&config::ColumnConfig> = columns
106        .iter()
107        .filter(|col| col.unique == Some(true))
108        .collect();
109    if unique_columns.is_empty() {
110        return Ok(errors);
111    }
112
113    for column in unique_columns {
114        let index = indices.get(&column.name).ok_or_else(|| {
115            Box::new(RunError(format!(
116                "unique column {} not found in dataframe",
117                column.name
118            )))
119        })?;
120        let series = df
121            .select_at_idx(*index)
122            .ok_or_else(|| {
123                Box::new(RunError(format!(
124                    "unique column {} not found in dataframe",
125                    column.name
126                )))
127            })?
128            .as_materialized_series()
129            .rechunk();
130        let mut seen: HashSet<UniqueKey> = HashSet::new();
131        for (row_idx, value) in series.iter().enumerate() {
132            let key = match unique_key(value) {
133                Some(key) => key,
134                None => continue,
135            };
136            if seen.contains(&key) {
137                errors.add_error(
138                    row_idx,
139                    RowError::new("unique", &column.name, "duplicate value"),
140                );
141            } else {
142                seen.insert(key);
143            }
144        }
145    }
146
147    Ok(errors)
148}
149
150#[derive(Debug, Default)]
151pub struct UniqueTracker {
152    seen: HashMap<String, HashSet<UniqueKey>>,
153}
154
155impl UniqueTracker {
156    pub fn new(columns: &[config::ColumnConfig]) -> Self {
157        let mut seen = HashMap::new();
158        for column in columns.iter().filter(|col| col.unique == Some(true)) {
159            seen.insert(column.name.clone(), HashSet::new());
160        }
161        Self { seen }
162    }
163
164    pub fn is_empty(&self) -> bool {
165        self.seen.is_empty()
166    }
167
168    pub fn seed_from_df(
169        &mut self,
170        df: &DataFrame,
171        columns: &[config::ColumnConfig],
172    ) -> FloeResult<()> {
173        if df.height() == 0 || self.seen.is_empty() {
174            return Ok(());
175        }
176        let unique_columns: Vec<&config::ColumnConfig> = columns
177            .iter()
178            .filter(|col| col.unique == Some(true))
179            .collect();
180        if unique_columns.is_empty() {
181            return Ok(());
182        }
183
184        for column in unique_columns {
185            let series = df.column(&column.name).map_err(|err| {
186                Box::new(RunError(format!(
187                    "unique column {} not found: {err}",
188                    column.name
189                )))
190            })?;
191            let series = series.as_materialized_series().rechunk();
192            let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
193                Box::new(RunError(format!(
194                    "unique column {} not tracked",
195                    column.name
196                )))
197            })?;
198            for value in series.iter() {
199                let key = match unique_key(value) {
200                    Some(key) => key,
201                    None => continue,
202                };
203                seen.insert(key);
204            }
205        }
206
207        Ok(())
208    }
209
210    pub fn apply(
211        &mut self,
212        df: &DataFrame,
213        columns: &[config::ColumnConfig],
214    ) -> FloeResult<Vec<Vec<RowError>>> {
215        let mut errors_per_row = vec![Vec::new(); df.height()];
216        if df.height() == 0 {
217            return Ok(errors_per_row);
218        }
219        let unique_columns: Vec<&config::ColumnConfig> = columns
220            .iter()
221            .filter(|col| col.unique == Some(true))
222            .collect();
223        if unique_columns.is_empty() {
224            return Ok(errors_per_row);
225        }
226
227        for column in unique_columns {
228            let series = df.column(&column.name).map_err(|err| {
229                Box::new(RunError(format!(
230                    "unique column {} not found: {err}",
231                    column.name
232                )))
233            })?;
234            let series = series.as_materialized_series().rechunk();
235            let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
236                Box::new(RunError(format!(
237                    "unique column {} not tracked",
238                    column.name
239                )))
240            })?;
241            for (row_idx, value) in series.iter().enumerate() {
242                let key = match unique_key(value) {
243                    Some(key) => key,
244                    None => continue,
245                };
246                if seen.contains(&key) {
247                    errors_per_row[row_idx].push(RowError::new(
248                        "unique",
249                        &column.name,
250                        "duplicate value",
251                    ));
252                } else {
253                    seen.insert(key);
254                }
255            }
256        }
257
258        Ok(errors_per_row)
259    }
260
261    pub fn apply_sparse(
262        &mut self,
263        df: &DataFrame,
264        columns: &[config::ColumnConfig],
265    ) -> FloeResult<SparseRowErrors> {
266        let mut errors = SparseRowErrors::new(df.height());
267        if df.height() == 0 {
268            return Ok(errors);
269        }
270        let unique_columns: Vec<&config::ColumnConfig> = columns
271            .iter()
272            .filter(|col| col.unique == Some(true))
273            .collect();
274        if unique_columns.is_empty() {
275            return Ok(errors);
276        }
277
278        for column in unique_columns {
279            let series = df.column(&column.name).map_err(|err| {
280                Box::new(RunError(format!(
281                    "unique column {} not found: {err}",
282                    column.name
283                )))
284            })?;
285            let series = series.as_materialized_series().rechunk();
286            let seen = self.seen.get_mut(&column.name).ok_or_else(|| {
287                Box::new(RunError(format!(
288                    "unique column {} not tracked",
289                    column.name
290                )))
291            })?;
292            for (row_idx, value) in series.iter().enumerate() {
293                let key = match unique_key(value) {
294                    Some(key) => key,
295                    None => continue,
296                };
297                if seen.contains(&key) {
298                    errors.add_error(
299                        row_idx,
300                        RowError::new("unique", &column.name, "duplicate value"),
301                    );
302                } else {
303                    seen.insert(key);
304                }
305            }
306        }
307
308        Ok(errors)
309    }
310}
311
312pub fn unique_counts(
313    df: &DataFrame,
314    columns: &[config::ColumnConfig],
315) -> FloeResult<Vec<(String, u64)>> {
316    if df.height() == 0 {
317        return Ok(Vec::new());
318    }
319
320    let unique_columns: Vec<&config::ColumnConfig> = columns
321        .iter()
322        .filter(|col| col.unique == Some(true))
323        .collect();
324    if unique_columns.is_empty() {
325        return Ok(Vec::new());
326    }
327
328    let mut counts = Vec::new();
329    for column in unique_columns {
330        let series = df.column(&column.name).map_err(|err| {
331            Box::new(RunError(format!(
332                "unique column {} not found: {err}",
333                column.name
334            )))
335        })?;
336        let non_null = series.len().saturating_sub(series.null_count());
337        if non_null == 0 {
338            continue;
339        }
340        let unique = series.drop_nulls().n_unique().map_err(|err| {
341            Box::new(RunError(format!(
342                "unique column {} read failed: {err}",
343                column.name
344            )))
345        })?;
346        let violations = non_null.saturating_sub(unique) as u64;
347        if violations > 0 {
348            counts.push((column.name.clone(), violations));
349        }
350    }
351
352    Ok(counts)
353}