Skip to main content

floe_core/checks/
unique.rs

1use polars::prelude::{is_duplicated, is_first_distinct, DataFrame};
2
3use super::{ColumnIndex, RowError};
4use crate::errors::RunError;
5use crate::{config, FloeResult};
6
7pub fn unique_errors(
8    df: &DataFrame,
9    columns: &[config::ColumnConfig],
10    indices: &ColumnIndex,
11) -> FloeResult<Vec<Vec<RowError>>> {
12    let mut errors_per_row = vec![Vec::new(); df.height()];
13    let unique_columns: Vec<&config::ColumnConfig> = columns
14        .iter()
15        .filter(|col| col.unique == Some(true))
16        .collect();
17    if unique_columns.is_empty() {
18        return Ok(errors_per_row);
19    }
20
21    for column in unique_columns {
22        let index = indices.get(&column.name).ok_or_else(|| {
23            Box::new(RunError(format!("unique column {} not found", column.name)))
24        })?;
25        let series = df.select_at_idx(*index).ok_or_else(|| {
26            Box::new(RunError(format!("unique column {} not found", column.name)))
27        })?;
28        let series = series.as_materialized_series();
29        let non_null = series.len().saturating_sub(series.null_count());
30        if non_null == 0 {
31            continue;
32        }
33        let mut duplicate_mask = is_duplicated(series).map_err(|err| {
34            Box::new(RunError(format!(
35                "unique column {} read failed: {err}",
36                column.name
37            )))
38        })?;
39        let not_null = series.is_not_null();
40        duplicate_mask = &duplicate_mask & &not_null;
41        let mut first_mask = is_first_distinct(series).map_err(|err| {
42            Box::new(RunError(format!(
43                "unique column {} read failed: {err}",
44                column.name
45            )))
46        })?;
47        first_mask = &first_mask & &not_null;
48        let mask = duplicate_mask & !first_mask;
49        for (row_idx, is_dup) in mask.into_iter().enumerate() {
50            if is_dup == Some(true) {
51                errors_per_row[row_idx].push(RowError::new(
52                    "unique",
53                    &column.name,
54                    "duplicate value",
55                ));
56            }
57        }
58    }
59
60    Ok(errors_per_row)
61}
62
63pub fn unique_counts(
64    df: &DataFrame,
65    columns: &[config::ColumnConfig],
66) -> FloeResult<Vec<(String, u64)>> {
67    if df.height() == 0 {
68        return Ok(Vec::new());
69    }
70
71    let unique_columns: Vec<&config::ColumnConfig> = columns
72        .iter()
73        .filter(|col| col.unique == Some(true))
74        .collect();
75    if unique_columns.is_empty() {
76        return Ok(Vec::new());
77    }
78
79    let mut counts = Vec::new();
80    for column in unique_columns {
81        let series = df.column(&column.name).map_err(|err| {
82            Box::new(RunError(format!(
83                "unique column {} not found: {err}",
84                column.name
85            )))
86        })?;
87        let non_null = series.len().saturating_sub(series.null_count());
88        if non_null == 0 {
89            continue;
90        }
91        let unique = series.drop_nulls().n_unique().map_err(|err| {
92            Box::new(RunError(format!(
93                "unique column {} read failed: {err}",
94                column.name
95            )))
96        })?;
97        let violations = non_null.saturating_sub(unique) as u64;
98        if violations > 0 {
99            counts.push((column.name.clone(), violations));
100        }
101    }
102
103    Ok(counts)
104}