floe-core 0.3.6

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::sync::Arc;

use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use deltalake::protocol::SaveMode;
use polars::prelude::DataFrame;

use crate::checks::normalize;
use crate::errors::RunError;
use crate::io::write::arrow_convert::{self, ArrowConversionOptions, ArrowTimeEncoding};
use crate::{config, FloeResult};

pub(crate) fn dataframe_to_record_batch(
    df: &DataFrame,
    entity: &config::EntityConfig,
) -> FloeResult<RecordBatch> {
    if entity.schema.columns.is_empty() {
        return dataframe_to_record_batch_all(df);
    }

    let schema_columns = normalize::resolve_output_columns(
        &entity.schema.columns,
        normalize::resolve_normalize_strategy(entity)?.as_deref(),
    );
    dataframe_to_record_batch_with_schema(df, &schema_columns)
}

pub(crate) fn dataframe_to_record_batch_with_schema(
    df: &DataFrame,
    schema_columns: &[config::ColumnConfig],
) -> FloeResult<RecordBatch> {
    let mut fields = Vec::with_capacity(schema_columns.len());
    let mut arrays = Vec::with_capacity(schema_columns.len());
    for column in schema_columns {
        let series = df
            .column(column.name.as_str())
            .map_err(|err| Box::new(RunError(format!("delta column lookup failed: {err}"))))?;
        let series = series.as_materialized_series();
        let array = series_to_arrow_array(series)?;
        let nullable = column.nullable.unwrap_or(true);
        if !nullable && array.null_count() > 0 {
            return Err(Box::new(RunError(format!(
                "delta write rejected nulls for non-nullable column {}",
                column.name
            ))));
        }
        fields.push(Field::new(
            column.name.clone(),
            array.data_type().clone(),
            nullable,
        ));
        arrays.push(array);
    }
    let schema = Arc::new(Schema::new(fields));
    RecordBatch::try_new(schema, arrays).map_err(|err| {
        Box::new(RunError(format!("delta record batch build failed: {err}")))
            as Box<dyn std::error::Error + Send + Sync>
    })
}

pub(crate) fn dataframe_to_record_batch_all(df: &DataFrame) -> FloeResult<RecordBatch> {
    let mut fields = Vec::with_capacity(df.width());
    let mut arrays = Vec::with_capacity(df.width());
    for column in df.get_columns() {
        let series = column.as_materialized_series();
        let name = series.name().to_string();
        let array = series_to_arrow_array(series)?;
        let nullable = array.null_count() > 0;
        fields.push(Field::new(name, array.data_type().clone(), nullable));
        arrays.push(array);
    }
    let schema = Arc::new(Schema::new(fields));
    RecordBatch::try_new(schema, arrays).map_err(|err| {
        Box::new(RunError(format!("delta record batch build failed: {err}")))
            as Box<dyn std::error::Error + Send + Sync>
    })
}

pub(crate) fn save_mode_for_write_mode(mode: config::WriteMode) -> SaveMode {
    match mode {
        config::WriteMode::Overwrite => SaveMode::Overwrite,
        config::WriteMode::Append => SaveMode::Append,
        config::WriteMode::MergeScd1 => SaveMode::Overwrite,
        config::WriteMode::MergeScd2 => SaveMode::Overwrite,
    }
}

fn series_to_arrow_array(series: &polars::prelude::Series) -> FloeResult<ArrayRef> {
    arrow_convert::series_to_arrow_array(
        series,
        ArrowConversionOptions {
            upcast_i8_i16_to_i32: false,
            time_encoding: ArrowTimeEncoding::Nanoseconds,
        },
        |dtype| {
            RunError(format!(
                "delta sink does not support dtype {dtype:?} for {}",
                series.name()
            ))
        },
    )
}