floe-core 0.3.7

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use polars::prelude::{AnyValue, DataFrame, Series};
use std::collections::{BTreeMap, HashMap, HashSet};

use super::{ColumnIndex, RowError, SparseRowErrors};
use crate::errors::RunError;
use crate::{config, FloeResult};

/// Maximum number of duplicate-key samples reported per constraint by
/// `UniqueTracker::results`.
const UNIQUE_SAMPLE_LIMIT: usize = 5;

/// Hashable representation of a single non-null cell value used for
/// uniqueness checks.
///
/// Floats are stored as their `f64` bit pattern so the type can derive `Eq`
/// and `Hash`; values without a dedicated variant are stored as text in
/// `Other`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UniqueKey {
    Bool(bool),
    I64(i64),
    U64(u64),
    F64(u64),
    String(String),
    Other(String),
}

impl UniqueKey {
    /// Renders the key as display text for reports.
    ///
    /// `F64` is decoded from its stored bit pattern before formatting; the
    /// textual variants are returned as an owned copy.
    fn as_string(&self) -> String {
        match self {
            Self::String(text) | Self::Other(text) => text.clone(),
            Self::F64(bits) => f64::from_bits(*bits).to_string(),
            Self::Bool(flag) => flag.to_string(),
            Self::I64(number) => number.to_string(),
            Self::U64(number) => number.to_string(),
        }
    }
}

/// Key for one row under a constraint: one `UniqueKey` per constraint column,
/// in the order the columns were supplied to `composite_key_from_row`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct CompositeKey(Vec<UniqueKey>);

/// Definition of a single uniqueness constraint over one or more columns.
#[derive(Debug, Clone)]
pub struct UniqueConstraint {
    /// Column names looked up in the `DataFrame` when building row keys.
    pub runtime_columns: Vec<String>,
    /// Column names used in error records and in reported results/samples.
    pub report_columns: Vec<String>,
    /// When `true`, rows flagged as duplicates are also added to the
    /// forced-reject set passed to `apply_sparse_with_forced_rejects`.
    pub enforce_reject: bool,
}

/// One sampled duplicate key for a constraint.
#[derive(Debug, Clone)]
pub struct UniqueConstraintSample {
    /// Report column name mapped to the string form of that key component.
    pub values: BTreeMap<String, String>,
    /// Number of rows flagged as duplicates of this key (the first occurrence
    /// of a key is not flagged and therefore not counted).
    pub count: u64,
}

/// Summary of duplicate detection for one constraint, as produced by
/// `UniqueTracker::results`.
#[derive(Debug, Clone)]
pub struct UniqueConstraintResult {
    /// The constraint's report column names.
    pub columns: Vec<String>,
    /// Total number of rows flagged as duplicates.
    pub duplicates_count: u64,
    /// Populated with the same value as `duplicates_count` by
    /// `UniqueTracker::results`.
    pub affected_rows_count: u64,
    /// Up to `UNIQUE_SAMPLE_LIMIT` duplicate keys, most frequent first.
    pub samples: Vec<UniqueConstraintSample>,
}

/// Mutable bookkeeping for one constraint while data is being checked.
#[derive(Debug, Clone)]
struct ConstraintState {
    /// The constraint this state belongs to.
    constraint: UniqueConstraint,
    /// Every key observed so far (seeded or encountered during `apply*`).
    seen: HashSet<CompositeKey>,
    /// Running total of rows flagged as duplicates.
    duplicates_count: u64,
    /// Per-key count of flagged duplicate rows, used to build report samples.
    sample_counts: HashMap<CompositeKey, u64>,
}

/// Tracks uniqueness constraints across successive `DataFrame`s.
///
/// Seen keys are kept between calls, so a key first observed in one frame is
/// flagged as a duplicate when it reappears in a later frame.
#[derive(Debug, Default)]
pub struct UniqueTracker {
    /// One state entry per constraint, in the order the constraints were given.
    states: Vec<ConstraintState>,
}

impl UniqueTracker {
    /// Builds a tracker from per-column `unique` flags.
    ///
    /// Every column name returned by `legacy_unique_constraints` becomes a
    /// single-column constraint whose runtime and report names are identical
    /// and which does not force rejection.
    pub fn new(columns: &[config::ColumnConfig]) -> Self {
        let constraints = legacy_unique_constraints(columns)
            .into_iter()
            .map(|name| UniqueConstraint {
                runtime_columns: vec![name.clone()],
                report_columns: vec![name],
                enforce_reject: false,
            })
            .collect::<Vec<_>>();
        Self::with_constraints(constraints)
    }

    /// Builds a tracker from explicit constraints, each starting with no seen
    /// keys and zeroed counters.
    pub fn with_constraints(constraints: Vec<UniqueConstraint>) -> Self {
        let states = constraints
            .into_iter()
            .map(|constraint| ConstraintState {
                constraint,
                seen: HashSet::new(),
                duplicates_count: 0,
                sample_counts: HashMap::new(),
            })
            .collect();
        Self { states }
    }

    /// Returns `true` when the tracker holds no constraints.
    pub fn is_empty(&self) -> bool {
        self.states.is_empty()
    }

    /// Returns the runtime column names referenced by any constraint,
    /// de-duplicated and in first-appearance order.
    pub fn runtime_columns(&self) -> Vec<String> {
        // Probe the dedupe set with borrowed names so only names that are
        // actually returned get cloned.
        let mut seen: HashSet<&str> = HashSet::new();
        let mut ordered = Vec::new();
        for state in &self.states {
            for column in &state.constraint.runtime_columns {
                if seen.insert(column.as_str()) {
                    ordered.push(column.clone());
                }
            }
        }
        ordered
    }

    /// Records every key present in `df` as already seen, without flagging
    /// anything or touching the duplicate counters.
    ///
    /// Rows whose key contains a null are skipped.
    ///
    /// # Errors
    ///
    /// Propagates failures from loading a constraint column or reading a cell.
    pub fn seed_from_df(&mut self, df: &DataFrame) -> FloeResult<()> {
        let height = df.height();
        if height == 0 || self.states.is_empty() {
            return Ok(());
        }
        for state in &mut self.states {
            let columns = load_constraint_columns(df, &state.constraint.runtime_columns)?;
            for row_idx in 0..height {
                if let Some(key) = composite_key_from_row(&columns, row_idx)? {
                    state.seen.insert(key);
                }
            }
        }
        Ok(())
    }

    /// Checks `df` against all constraints and returns a dense list with one
    /// (possibly empty) error vector per row.
    ///
    /// # Errors
    ///
    /// Propagates any error from `apply_sparse`.
    pub fn apply(
        &mut self,
        df: &DataFrame,
        columns: &[config::ColumnConfig],
    ) -> FloeResult<Vec<Vec<RowError>>> {
        let sparse = self.apply_sparse(df, columns)?;
        let mut errors_per_row = vec![Vec::new(); df.height()];
        for (row_idx, row_errors) in sparse.iter() {
            // Indices outside the frame height are ignored rather than panicking.
            if let Some(slot) = errors_per_row.get_mut(*row_idx) {
                slot.extend(row_errors.clone());
            }
        }
        Ok(errors_per_row)
    }

    /// Checks `df` against all constraints, discarding the forced-reject set.
    ///
    /// # Errors
    ///
    /// Propagates any error from `apply_sparse_with_forced_rejects`.
    pub fn apply_sparse(
        &mut self,
        df: &DataFrame,
        _columns: &[config::ColumnConfig],
    ) -> FloeResult<SparseRowErrors> {
        let mut discarded_rejects = HashSet::new();
        self.apply_sparse_with_forced_rejects(df, _columns, &mut discarded_rejects)
    }

    /// Checks `df` against all constraints and records a `unique` error for
    /// every row whose key was already seen.
    ///
    /// The first occurrence of a key is remembered but not flagged. Rows whose
    /// key contains a null are skipped. For constraints with `enforce_reject`
    /// set, flagged row indices are also inserted into `forced_reject_rows`.
    /// `_columns` is not used.
    ///
    /// # Errors
    ///
    /// Propagates failures from loading a constraint column or reading a cell.
    pub fn apply_sparse_with_forced_rejects(
        &mut self,
        df: &DataFrame,
        _columns: &[config::ColumnConfig],
        forced_reject_rows: &mut HashSet<usize>,
    ) -> FloeResult<SparseRowErrors> {
        let height = df.height();
        let mut errors = SparseRowErrors::new(height);
        if height == 0 || self.states.is_empty() {
            return Ok(errors);
        }

        for state in &mut self.states {
            let columns = load_constraint_columns(df, &state.constraint.runtime_columns)?;
            // Borrow the report names instead of cloning them; only the
            // rendered label is needed inside the row loop.
            let report_columns = &state.constraint.report_columns;
            let (constraint_repr, message) = if report_columns.len() == 1 {
                (report_columns[0].clone(), "duplicate value")
            } else {
                (format!("[{}]", report_columns.join(",")), "duplicate key")
            };
            let enforce_reject = state.constraint.enforce_reject;

            for row_idx in 0..height {
                let Some(key) = composite_key_from_row(&columns, row_idx)? else {
                    continue;
                };
                if !state.seen.contains(&key) {
                    // First sighting: remember it, nothing to report.
                    state.seen.insert(key);
                    continue;
                }
                errors.add_error(row_idx, RowError::new("unique", &constraint_repr, message));
                if enforce_reject {
                    forced_reject_rows.insert(row_idx);
                }
                state.duplicates_count += 1;
                *state.sample_counts.entry(key).or_insert(0) += 1;
            }
        }

        Ok(errors)
    }

    /// Summarises the duplicates found so far, one result per constraint.
    ///
    /// Samples are ordered by descending duplicate count, ties broken by the
    /// key's `Debug` text, and truncated to `UNIQUE_SAMPLE_LIMIT`.
    pub fn results(&self) -> Vec<UniqueConstraintResult> {
        self.states
            .iter()
            .map(|state| {
                let report_columns = &state.constraint.report_columns;
                let mut ranked = state
                    .sample_counts
                    .iter()
                    .map(|(key, count)| (key, *count))
                    .collect::<Vec<_>>();
                // Cache the sort key so the Debug text is built once per entry
                // rather than twice per comparison. Same stable ordering as
                // comparing (count desc, debug text asc) directly.
                ranked.sort_by_cached_key(|(key, count)| {
                    (std::cmp::Reverse(*count), format!("{:?}", key))
                });
                let samples = ranked
                    .into_iter()
                    .take(UNIQUE_SAMPLE_LIMIT)
                    .map(|(key, count)| {
                        // `zip` stops at the shorter side, so key components
                        // without a matching report column are dropped.
                        let values = report_columns
                            .iter()
                            .zip(key.0.iter())
                            .map(|(name, part)| (name.clone(), part.as_string()))
                            .collect::<BTreeMap<_, _>>();
                        UniqueConstraintSample { values, count }
                    })
                    .collect::<Vec<_>>();
                UniqueConstraintResult {
                    columns: report_columns.clone(),
                    duplicates_count: state.duplicates_count,
                    affected_rows_count: state.duplicates_count,
                    samples,
                }
            })
            .collect()
    }
}

/// Runs a one-shot uniqueness check over `df` using the columns' `unique`
/// flags and returns one error vector per row.
///
/// A fresh `UniqueTracker` is created for the call, so no state carries over
/// between invocations. `_indices` is not used.
pub fn unique_errors(
    df: &DataFrame,
    columns: &[config::ColumnConfig],
    _indices: &ColumnIndex,
) -> FloeResult<Vec<Vec<RowError>>> {
    UniqueTracker::new(columns).apply(df, columns)
}

/// Runs a one-shot uniqueness check over `df` using the columns' `unique`
/// flags and returns the errors in sparse form.
///
/// A fresh `UniqueTracker` is created for the call, so no state carries over
/// between invocations. `_indices` is not used.
pub fn unique_errors_sparse(
    df: &DataFrame,
    columns: &[config::ColumnConfig],
    _indices: &ColumnIndex,
) -> FloeResult<SparseRowErrors> {
    UniqueTracker::new(columns).apply_sparse(df, columns)
}

/// Counts uniqueness violations per column flagged `unique: true`.
///
/// For each flagged column the violation count is the number of non-null
/// values minus the number of distinct non-null values. Columns with no
/// non-null values or no violations are omitted from the output.
///
/// # Errors
///
/// Returns a `RunError` when a flagged column is missing from `df` or its
/// distinct count cannot be computed.
pub fn unique_counts(
    df: &DataFrame,
    columns: &[config::ColumnConfig],
) -> FloeResult<Vec<(String, u64)>> {
    let mut counts = Vec::new();
    if df.height() == 0 {
        return Ok(counts);
    }

    for column in columns.iter().filter(|col| col.unique == Some(true)) {
        let series = df.column(&column.name).map_err(|err| {
            Box::new(RunError(format!(
                "unique column {} not found: {err}",
                column.name
            )))
        })?;

        let non_null = series.len().saturating_sub(series.null_count());
        if non_null > 0 {
            let distinct = series.drop_nulls().n_unique().map_err(|err| {
                Box::new(RunError(format!(
                    "unique column {} read failed: {err}",
                    column.name
                )))
            })?;
            let violations = non_null.saturating_sub(distinct) as u64;
            if violations > 0 {
                counts.push((column.name.clone(), violations));
            }
        }
    }

    Ok(counts)
}

/// Resolves the effective list of unique-key column groups for a schema.
///
/// Uses `schema.unique_keys` when present, otherwise falls back to one
/// single-column group per column flagged `unique`. The primary key, if any,
/// is appended afterwards. Column names are trimmed, empty groups are skipped,
/// and groups with an identical ordered column list are emitted only once.
pub fn resolve_schema_unique_keys(schema: &config::SchemaConfig) -> Vec<Vec<String>> {
    let mut signatures = HashSet::new();
    let mut resolved = Vec::new();

    // Adds a group unless it is empty or an identical group was already added.
    // The unit-separator join gives each ordered column list its own signature.
    let mut register = |group: Vec<String>| {
        if group.is_empty() {
            return;
        }
        if signatures.insert(group.join("\u{1f}")) {
            resolved.push(group);
        }
    };

    match schema.unique_keys.as_ref() {
        Some(unique_keys) => {
            for key in unique_keys {
                let trimmed = key
                    .iter()
                    .map(|column| column.trim().to_string())
                    .collect::<Vec<_>>();
                register(trimmed);
            }
        }
        None => {
            for column in legacy_unique_constraints(&schema.columns) {
                register(vec![column]);
            }
        }
    }

    if let Some(primary_key) = schema.primary_key.as_ref() {
        let trimmed = primary_key
            .iter()
            .map(|column| column.trim().to_string())
            .collect::<Vec<_>>();
        register(trimmed);
    }

    resolved
}

/// Collects the trimmed names of all columns flagged `unique: true`, dropping
/// any whose name is empty after trimming.
fn legacy_unique_constraints(columns: &[config::ColumnConfig]) -> Vec<String> {
    columns
        .iter()
        .filter_map(|col| {
            if col.unique != Some(true) {
                return None;
            }
            let name = col.name.trim();
            if name.is_empty() {
                None
            } else {
                Some(name.to_string())
            }
        })
        .collect()
}

/// Fetches the named columns from `df` as rechunked, materialized `Series`, in
/// the order requested.
///
/// # Errors
///
/// Returns a `RunError` for the first column that cannot be found in `df`.
fn load_constraint_columns(df: &DataFrame, columns: &[String]) -> FloeResult<Vec<Series>> {
    columns
        .iter()
        .map(|column| -> FloeResult<Series> {
            let found = df.column(column).map_err(|err| {
                Box::new(RunError(format!(
                    "unique constraint column {} not found: {err}",
                    column
                )))
            })?;
            Ok(found.as_materialized_series().rechunk())
        })
        .collect()
}

/// Builds the composite key for row `row_idx` from the given constraint columns.
///
/// Returns `Ok(None)` as soon as any column holds a null at that row, so rows
/// with a null key component never take part in uniqueness checks.
///
/// # Errors
///
/// Returns a `RunError` when a cell cannot be read.
fn composite_key_from_row(columns: &[Series], row_idx: usize) -> FloeResult<Option<CompositeKey>> {
    let mut parts = Vec::with_capacity(columns.len());
    for series in columns {
        let cell = series.get(row_idx).map_err(|err| {
            Box::new(RunError(format!(
                "unique constraint read failed at row {}: {err}",
                row_idx
            )))
        })?;
        match unique_key(cell) {
            Some(part) => parts.push(part),
            None => return Ok(None),
        }
    }
    Ok(Some(CompositeKey(parts)))
}

/// Converts one cell value into a hashable uniqueness key.
///
/// Returns `None` for `AnyValue::Null`. Integers are widened to 64 bits
/// (128-bit integers are stored as text), floats are keyed by the bit pattern
/// of their canonical `f64` form, strings are copied, and every other value
/// falls back to its `to_string()` text.
fn unique_key(value: AnyValue) -> Option<UniqueKey> {
    // Keys a float by canonical bits so numerically equal values compare
    // equal: `-0.0` collapses to `0.0` and every NaN collapses to one NaN.
    // Hashing the raw bits would let such duplicates slip through.
    fn float_key(value: f64) -> UniqueKey {
        let canonical = if value.is_nan() {
            f64::NAN
        } else if value == 0.0 {
            0.0
        } else {
            value
        };
        UniqueKey::F64(canonical.to_bits())
    }

    match value {
        AnyValue::Null => None,
        AnyValue::Boolean(value) => Some(UniqueKey::Bool(value)),
        AnyValue::Int8(value) => Some(UniqueKey::I64(i64::from(value))),
        AnyValue::Int16(value) => Some(UniqueKey::I64(i64::from(value))),
        AnyValue::Int32(value) => Some(UniqueKey::I64(i64::from(value))),
        AnyValue::Int64(value) => Some(UniqueKey::I64(value)),
        AnyValue::Int128(value) => Some(UniqueKey::Other(value.to_string())),
        AnyValue::UInt8(value) => Some(UniqueKey::U64(u64::from(value))),
        AnyValue::UInt16(value) => Some(UniqueKey::U64(u64::from(value))),
        AnyValue::UInt32(value) => Some(UniqueKey::U64(u64::from(value))),
        AnyValue::UInt64(value) => Some(UniqueKey::U64(value)),
        AnyValue::UInt128(value) => Some(UniqueKey::Other(value.to_string())),
        AnyValue::Float32(value) => Some(float_key(f64::from(value))),
        AnyValue::Float64(value) => Some(float_key(value)),
        AnyValue::String(value) => Some(UniqueKey::String(value.to_string())),
        AnyValue::StringOwned(value) => Some(UniqueKey::String(value.to_string())),
        other => Some(UniqueKey::Other(other.to_string())),
    }
}