// floe-core 0.3.2
//
// Core library for Floe, a YAML-driven technical ingestion tool.
// Documentation
use polars::prelude::{AnyValue, DataFrame};

use super::{ColumnIndex, RowError, SparseRowErrors};
use crate::errors::RunError;
use crate::FloeResult;

/// Builds, for every row of `df`, the list of `not_null` violations found in
/// `required_cols`.
///
/// The returned vector always has `df.height()` entries; entry `i` holds one
/// [`RowError`] per required column whose value is null in row `i`, in the
/// order the columns appear in `required_cols`. Each error is created with
/// `RowError::new("not_null", <column name>, "required value missing")`.
///
/// When `required_cols` is empty, every per-row list is empty.
///
/// # Errors
///
/// Returns a boxed [`RunError`] when a required column has no entry in
/// `indices`, or when the index stored for it does not resolve to a column
/// through `DataFrame::select_at_idx`.
pub fn not_null_errors(
    df: &DataFrame,
    required_cols: &[String],
    indices: &ColumnIndex,
) -> FloeResult<Vec<Vec<RowError>>> {
    let mut errors_per_row = vec![Vec::new(); df.height()];
    if required_cols.is_empty() {
        return Ok(errors_per_row);
    }

    // One null mask per required column, in the same order as `required_cols`.
    // Collecting into a `Result` stops at the first column that cannot be resolved.
    let null_masks = required_cols
        .iter()
        .map(|name| -> FloeResult<_> {
            let position = indices.get(name).ok_or_else(|| {
                Box::new(RunError(format!(
                    "required column {name} not found in dataframe"
                )))
            })?;
            let column = df.select_at_idx(*position).ok_or_else(|| {
                Box::new(RunError(format!(
                    "required column {name} not found in dataframe"
                )))
            })?;
            Ok(column.is_null())
        })
        .collect::<FloeResult<Vec<_>>>()?;

    for (row_idx, row_errors) in errors_per_row.iter_mut().enumerate() {
        // A mask lookup that yields nothing is treated as "not null".
        let missing = required_cols
            .iter()
            .zip(&null_masks)
            .filter(|(_, mask)| mask.get(row_idx).unwrap_or(false))
            .map(|(col, _)| RowError::new("not_null", col, "required value missing"));
        row_errors.extend(missing);
    }

    Ok(errors_per_row)
}

/// Records `not_null` violations for `required_cols` into a
/// [`SparseRowErrors`] created with `df.height()`.
///
/// Rows are visited in ascending order and, within a row, columns are visited
/// in the order given by `required_cols`. For every null value found,
/// `add_error` is called with the row index and
/// `RowError::new("not_null", <column name>, "required value missing")`.
///
/// When `required_cols` is empty or `df` has no rows, the freshly created
/// container is returned untouched and `indices` is never consulted.
///
/// # Errors
///
/// Returns a boxed [`RunError`] when a required column has no entry in
/// `indices`, or when the index stored for it does not resolve to a column
/// through `DataFrame::select_at_idx`.
pub fn not_null_errors_sparse(
    df: &DataFrame,
    required_cols: &[String],
    indices: &ColumnIndex,
) -> FloeResult<SparseRowErrors> {
    let row_count = df.height();
    let mut errors = SparseRowErrors::new(row_count);
    if row_count == 0 || required_cols.is_empty() {
        return Ok(errors);
    }

    // Pair each column name with its null mask, failing on the first column
    // that cannot be resolved.
    let null_masks = required_cols
        .iter()
        .map(|name| -> FloeResult<_> {
            // Both lookup failures report the same message.
            let missing = || {
                Box::new(RunError(format!(
                    "required column {name} not found in dataframe"
                )))
            };
            let position = indices.get(name).ok_or_else(missing)?;
            let column = df.select_at_idx(*position).ok_or_else(missing)?;
            Ok((name, column.is_null()))
        })
        .collect::<FloeResult<Vec<_>>>()?;

    for row_idx in 0..row_count {
        // A mask lookup that yields nothing is treated as "not null".
        let null_cols = null_masks
            .iter()
            .filter(|(_, mask)| mask.get(row_idx).unwrap_or(false));
        for (col, _) in null_cols {
            errors.add_error(
                row_idx,
                RowError::new("not_null", col, "required value missing"),
            );
        }
    }

    Ok(errors)
}

/// Returns the number of null values in each required column that has at
/// least one null.
///
/// The result lists `(column name, null count)` pairs in the order the
/// columns appear in `required_cols`; columns with zero nulls are left out.
/// An empty list is returned when `required_cols` is empty or `df` has no
/// rows.
///
/// # Errors
///
/// Returns a boxed [`RunError`] when a required column cannot be looked up in
/// the frame produced by `DataFrame::null_count`.
pub fn not_null_counts(df: &DataFrame, required_cols: &[String]) -> FloeResult<Vec<(String, u64)>> {
    let mut counts = Vec::new();
    if df.height() == 0 || required_cols.is_empty() {
        return Ok(counts);
    }

    let per_column_nulls = df.null_count();
    for name in required_cols {
        // Read the first cell of the column's null-count series; a failed
        // read is treated as zero.
        let cell = per_column_nulls
            .column(name)
            .map_err(|err| {
                Box::new(RunError(format!("required column {name} not found: {err}")))
            })?
            .get(0)
            .unwrap_or(AnyValue::UInt32(0));

        // Normalise the supported integer representations to `u64`; negative
        // values clamp to zero and any other value kind counts as zero.
        let violations: u64 = match cell {
            AnyValue::UInt64(n) => n,
            AnyValue::UInt32(n) => u64::from(n),
            AnyValue::Int64(n) => n.max(0) as u64,
            AnyValue::Int32(n) => n.max(0) as u64,
            _ => 0,
        };

        if violations == 0 {
            continue;
        }
        counts.push((name.clone(), violations));
    }
    Ok(counts)
}