Skip to main content

floe_core/checks/
not_null.rs

1use polars::prelude::{AnyValue, DataFrame};
2
3use super::{ColumnIndex, RowError, SparseRowErrors};
4use crate::errors::RunError;
5use crate::FloeResult;
6
7pub fn not_null_errors(
8    df: &DataFrame,
9    required_cols: &[String],
10    indices: &ColumnIndex,
11) -> FloeResult<Vec<Vec<RowError>>> {
12    let mut errors_per_row = vec![Vec::new(); df.height()];
13    if required_cols.is_empty() {
14        return Ok(errors_per_row);
15    }
16
17    let mut null_masks = Vec::with_capacity(required_cols.len());
18    for name in required_cols {
19        let index = indices.get(name).ok_or_else(|| {
20            Box::new(RunError(format!(
21                "required column {name} not found in dataframe"
22            )))
23        })?;
24        let mask = df
25            .select_at_idx(*index)
26            .ok_or_else(|| {
27                Box::new(RunError(format!(
28                    "required column {name} not found in dataframe"
29                )))
30            })?
31            .is_null();
32        null_masks.push(mask);
33    }
34
35    for (row_idx, errors) in errors_per_row.iter_mut().enumerate() {
36        for (col, mask) in required_cols.iter().zip(null_masks.iter()) {
37            if mask.get(row_idx).unwrap_or(false) {
38                errors.push(RowError::new("not_null", col, "required value missing"));
39            }
40        }
41    }
42
43    Ok(errors_per_row)
44}
45
46pub fn not_null_errors_sparse(
47    df: &DataFrame,
48    required_cols: &[String],
49    indices: &ColumnIndex,
50) -> FloeResult<SparseRowErrors> {
51    let mut errors = SparseRowErrors::new(df.height());
52    if required_cols.is_empty() || df.height() == 0 {
53        return Ok(errors);
54    }
55
56    let mut null_masks = Vec::with_capacity(required_cols.len());
57    for name in required_cols {
58        let index = indices.get(name).ok_or_else(|| {
59            Box::new(RunError(format!(
60                "required column {name} not found in dataframe"
61            )))
62        })?;
63        let mask = df
64            .select_at_idx(*index)
65            .ok_or_else(|| {
66                Box::new(RunError(format!(
67                    "required column {name} not found in dataframe"
68                )))
69            })?
70            .is_null();
71        null_masks.push((name, mask));
72    }
73
74    for row_idx in 0..df.height() {
75        for (col, mask) in null_masks.iter() {
76            if mask.get(row_idx).unwrap_or(false) {
77                errors.add_error(
78                    row_idx,
79                    RowError::new("not_null", col, "required value missing"),
80                );
81            }
82        }
83    }
84
85    Ok(errors)
86}
87
88pub fn not_null_counts(df: &DataFrame, required_cols: &[String]) -> FloeResult<Vec<(String, u64)>> {
89    if required_cols.is_empty() || df.height() == 0 {
90        return Ok(Vec::new());
91    }
92
93    let null_counts = df.null_count();
94    let mut counts = Vec::new();
95    for name in required_cols {
96        let series = null_counts.column(name).map_err(|err| {
97            Box::new(RunError(format!("required column {name} not found: {err}")))
98        })?;
99        let value = series.get(0).unwrap_or(AnyValue::UInt32(0));
100        let violations = match value {
101            AnyValue::UInt32(value) => value as u64,
102            AnyValue::UInt64(value) => value,
103            AnyValue::Int64(value) => value.max(0) as u64,
104            AnyValue::Int32(value) => value.max(0) as u64,
105            AnyValue::Null => 0,
106            _ => 0,
107        };
108        if violations > 0 {
109            counts.push((name.clone(), violations));
110        }
111    }
112    Ok(counts)
113}