deltalake-core 0.32.0

Native Delta Lake implementation in Rust
Documentation
use std::collections::HashMap;

use arrow::array::AsArray as _;
use arrow::datatypes::{Int32Type, Int64Type};
use chrono::Utc;
use delta_kernel::schema::{DataType, PrimitiveType};
use delta_kernel::table_features::TableFeature;
use itertools::Itertools;
use object_store::ObjectMeta;
use object_store::path::Path;
use serde_json::json;

use super::{DataFactory, FileStats, get_parquet_bytes};
use crate::kernel::transaction::PROTOCOL;
use crate::kernel::{Add, Metadata, Protocol, Remove, StructType};
use crate::kernel::{ProtocolInner, partitions_schema};

/// Stateless namespace of constructors for log action values
/// ([`Add`], [`Remove`], [`Protocol`] and [`Metadata`]).
///
/// The type carries no data; all functionality is exposed through
/// associated functions.
pub struct ActionFactory;

impl ActionFactory {
    pub fn add_raw(
        meta: ObjectMeta,
        stats: FileStats,
        partition_values: HashMap<String, Option<String>>,
        data_change: bool,
    ) -> Add {
        Add {
            path: meta.location.to_string(),
            size: meta.size as i64,
            partition_values,
            data_change,
            modification_time: meta.last_modified.timestamp_millis(),
            stats: serde_json::to_string(&stats).ok(),
            tags: Some(HashMap::new()),
            default_row_commit_version: None,
            deletion_vector: None,
            base_row_id: None,
            clustering_provider: None,
        }
    }

    pub fn add(
        schema: &StructType,
        bounds: HashMap<&str, (&str, &str)>,
        partition_columns: Vec<String>,
        data_change: bool,
    ) -> Add {
        let partitions_schema = partitions_schema(schema, &partition_columns).unwrap();
        let partition_values = if let Some(p_schema) = partitions_schema {
            let batch = DataFactory::record_batch(&p_schema, 1, &bounds).unwrap();
            p_schema
                .fields()
                .map(|f| {
                    let value = match f.data_type() {
                        DataType::Primitive(PrimitiveType::String) => {
                            let arr = batch.column_by_name(f.name()).unwrap().as_string::<i32>();
                            Some(arr.value(0).to_string())
                        }
                        DataType::Primitive(PrimitiveType::Integer) => {
                            let arr = batch
                                .column_by_name(f.name())
                                .unwrap()
                                .as_primitive::<Int32Type>();
                            Some(arr.value(0).to_string())
                        }
                        DataType::Primitive(PrimitiveType::Long) => {
                            let arr = batch
                                .column_by_name(f.name())
                                .unwrap()
                                .as_primitive::<Int64Type>();
                            Some(arr.value(0).to_string())
                        }
                        _ => unimplemented!(),
                    };
                    (f.name().to_owned(), value)
                })
                .collect()
        } else {
            HashMap::new()
        };

        let data_schema = StructType::try_new(
            schema
                .fields()
                .filter(|f| !partition_columns.contains(f.name()))
                .cloned(),
        )
        .unwrap();

        let batch = DataFactory::record_batch(&data_schema, 10, &bounds).unwrap();
        let stats = DataFactory::file_stats(&batch).unwrap();
        let path = Path::from(generate_file_name());
        let data = get_parquet_bytes(&batch).unwrap();
        let meta = ObjectMeta {
            location: path.clone(),
            size: data.len() as u64,
            last_modified: Utc::now(),
            e_tag: None,
            version: None,
        };
        ActionFactory::add_raw(meta, stats, partition_values, data_change)
    }

    pub fn remove(add: &Add, data_change: bool) -> Remove {
        add_as_remove(add, data_change)
    }

    pub fn protocol(
        max_reader: Option<i32>,
        max_writer: Option<i32>,
        reader_features: Option<impl IntoIterator<Item = TableFeature>>,
        writer_features: Option<impl IntoIterator<Item = TableFeature>>,
    ) -> Protocol {
        ProtocolInner {
            min_reader_version: max_reader.unwrap_or(PROTOCOL.default_reader_version()),
            min_writer_version: max_writer.unwrap_or(PROTOCOL.default_writer_version()),
            writer_features: writer_features.map(|i| i.into_iter().collect()),
            reader_features: reader_features.map(|i| i.into_iter().collect()),
        }
        .as_kernel()
    }

    pub fn metadata(
        schema: &StructType,
        partition_columns: Option<impl IntoIterator<Item = impl ToString>>,
        configuration: Option<HashMap<String, Option<String>>>,
    ) -> Metadata {
        let value = json!({
            "id": uuid::Uuid::new_v4().hyphenated().to_string(),
            "format": { "provider": "parquet", "options": {} },
            "schemaString": serde_json::to_string(schema).unwrap(),
            "partitionColumns": partition_columns
                .map(|i| i.into_iter().map(|c| c.to_string()).collect_vec())
                .unwrap_or_default(),
            "configuration": configuration.unwrap_or_default(),
            "name": None::<String>,
            "description": None::<String>,
            "createdTime": Some(Utc::now().timestamp_millis()),
        });
        serde_json::from_value(value).unwrap()
    }
}

pub fn add_as_remove(add: &Add, data_change: bool) -> Remove {
    Remove {
        path: add.path.clone(),
        data_change,
        deletion_timestamp: Some(Utc::now().timestamp_millis()),
        size: Some(add.size),
        extended_file_metadata: Some(true),
        partition_values: Some(add.partition_values.clone()),
        tags: add.tags.clone(),
        deletion_vector: add.deletion_vector.clone(),
        base_row_id: add.base_row_id,
        default_row_commit_version: add.default_row_commit_version,
    }
}

/// Returns a fresh parquet file name of the form
/// `part-0001-<hyphenated v4 uuid>.parquet`.
fn generate_file_name() -> String {
    // The hyphenated adapter is formatted directly, so no intermediate `String` is built.
    let file_name = uuid::Uuid::new_v4().hyphenated();
    format!("part-0001-{file_name}.parquet")
}