//! Actions included in Delta table transaction logs

#![allow(non_camel_case_types)]

#[cfg(feature = "parquet")]
mod parquet_read;

#[cfg(feature = "parquet2")]
pub mod parquet2_read;

use crate::{schema::*, DeltaTableMetaData};
use percent_encoding::percent_decode;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Error returned when an invalid Delta log action is encountered.
#[derive(thiserror::Error, Debug)]
pub enum ActionError {
    /// The action contains an invalid field.
    #[error("Invalid action field: {0}")]
    InvalidField(String),
    /// A parquet log checkpoint file contains an invalid action.
    #[error("Invalid action in parquet row: {0}")]
    InvalidRow(String),
    /// A generic action error. The wrapped error string describes the details.
    #[error("Generic action error: {0}")]
    Generic(String),
    #[cfg(feature = "parquet2")]
    #[error("Failed to parse parquet checkpoint: {}", .source)]
    /// Error returned when parsing checkpoint parquet using the parquet2 crate.
    ParquetParseError {
        /// Parquet error details returned when parsing the checkpoint parquet
        #[from]
        source: parquet2_read::ParseError,
    },
    #[cfg(feature = "parquet")]
    #[error("Failed to parse parquet checkpoint: {}", .source)]
    /// Error returned when parsing checkpoint parquet using the parquet crate.
    ParquetParseError {
        /// Parquet error details returned when parsing the checkpoint parquet
        #[from]
        source: parquet::errors::ParquetError,
    },
}

fn decode_path(raw_path: &str) -> Result<String, ActionError> {
    percent_decode(raw_path.as_bytes())
        .decode_utf8()
        .map(|c| c.to_string())
        .map_err(|e| ActionError::InvalidField(format!("Decode path failed for action: {}", e)))
}

/// Enum used to represent minValues and maxValues in add action statistics.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum ColumnValueStat {
    /// Composite HashMap representation of statistics.
    Column(HashMap<String, ColumnValueStat>),
    /// Json representation of statistics.
    Value(Value),
}

impl ColumnValueStat {
    /// Returns the HashMap representation of the ColumnValueStat.
    pub fn as_column(&self) -> Option<&HashMap<String, ColumnValueStat>> {
        match self {
            ColumnValueStat::Column(m) => Some(m),
            _ => None,
        }
    }

    /// Returns the serde_json representation of the ColumnValueStat.
    pub fn as_value(&self) -> Option<&Value> {
        match self {
            ColumnValueStat::Value(v) => Some(v),
            _ => None,
        }
    }
}

/// Enum used to represent nullCount in add action statistics.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum ColumnCountStat {
    /// Composite HashMap representation of statistics.
    Column(HashMap<String, ColumnCountStat>),
    /// Numeric representation of statistics.
    Value(DeltaDataTypeLong),
}

impl ColumnCountStat {
    /// Returns the HashMap representation of the ColumnCountStat.
    pub fn as_column(&self) -> Option<&HashMap<String, ColumnCountStat>> {
        match self {
            ColumnCountStat::Column(m) => Some(m),
            _ => None,
        }
    }

    /// Returns the numeric value of the ColumnCountStat.
    pub fn as_value(&self) -> Option<DeltaDataTypeLong> {
        match self {
            ColumnCountStat::Value(v) => Some(*v),
            _ => None,
        }
    }
}
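
// A minimal sketch (assumed example values, not data from a real table) of how the untagged
// stat enums above deserialize: nested JSON objects map to the `Column` variant and leaf
// values map to the `Value` variant.
#[cfg(test)]
mod column_stat_sketch {
    use super::*;

    #[test]
    fn nested_null_counts_deserialize_into_column_and_value_variants() {
        let raw = r#"{"a": 1, "nested": {"b": 0}}"#;
        let parsed: HashMap<String, ColumnCountStat> = serde_json::from_str(raw).unwrap();

        // A leaf column maps to the `Value` variant.
        assert_eq!(parsed["a"].as_value(), Some(1));
        // A nested struct column maps to the `Column` variant.
        let nested = parsed["nested"].as_column().unwrap();
        assert_eq!(nested["b"].as_value(), Some(0));
    }
}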

/// Statistics associated with Add actions contained in the Delta log.
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
    /// Number of records in the file associated with the log action.
    pub num_records: DeltaDataTypeLong,

    // start of per column stats
    /// Contains a value smaller than or equal to all values present in the file for all columns.
    pub min_values: HashMap<String, ColumnValueStat>,
    /// Contains a value larger than or equal to all values present in the file for all columns.
    pub max_values: HashMap<String, ColumnValueStat>,
    /// The number of null values for all columns.
    pub null_count: HashMap<String, ColumnCountStat>,
}

/// File stats parsed from raw parquet format.
#[derive(Debug, Default)]
pub struct StatsParsed {
    /// Number of records in the file associated with the log action.
    pub num_records: DeltaDataTypeLong,

    // start of per column stats
    /// Contains a value smaller than or equal to all values present in the file for all columns.
    #[cfg(feature = "parquet")]
    pub min_values: HashMap<String, parquet::record::Field>,
    /// Contains a value smaller than or equal to all values present in the file for all columns.
    #[cfg(feature = "parquet2")]
    pub min_values: HashMap<String, String>,
    /// Contains a value larger than or equal to all values present in the file for all columns.
    #[cfg(feature = "parquet")]
    pub max_values: HashMap<String, parquet::record::Field>,
    /// Contains a value larger than or equal to all values present in the file for all columns.
    #[cfg(feature = "parquet2")]
    pub max_values: HashMap<String, String>,
    /// The number of null values for all columns.
    pub null_count: HashMap<String, DeltaDataTypeLong>,
}

/// Delta log action that describes a parquet data file that is part of the table.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Add {
    /// A relative path, from the root of the table, to a file that should be added to the table
    pub path: String,
    /// The size of this file in bytes
    pub size: DeltaDataTypeLong,
    /// A map from partition column to value for this file
    pub partition_values: HashMap<String, Option<String>>,
    /// Partition values stored in raw parquet struct format. In this struct, the column names
    /// correspond to the partition columns and the values are stored in their corresponding data
    /// type. This is a required field when the table is partitioned and the table property
    /// delta.checkpoint.writeStatsAsStruct is set to true. If the table is not partitioned, this
    /// column can be omitted.
    ///
    /// This field is only available in add action records read from checkpoints
    #[cfg(feature = "parquet")]
    #[serde(skip_serializing, skip_deserializing)]
    pub partition_values_parsed: Option<parquet::record::Row>,
    /// Partition values stored in raw parquet struct format. In this struct, the column names
    /// correspond to the partition columns and the values are stored in their corresponding data
    /// type. This is a required field when the table is partitioned and the table property
    /// delta.checkpoint.writeStatsAsStruct is set to true. If the table is not partitioned, this
    /// column can be omitted.
    ///
    /// This field is only available in add action records read from checkpoints
    #[cfg(feature = "parquet2")]
    #[serde(skip_serializing, skip_deserializing)]
    pub partition_values_parsed: Option<String>,
    /// The time this file was created, as milliseconds since the epoch
    pub modification_time: DeltaDataTypeTimestamp,
    /// When false the file must already be present in the table or the records in the added file
    /// must be contained in one or more remove actions in the same version.
    ///
    /// Streaming queries that are tailing the transaction log can use this flag to skip actions
    /// that would not affect the final results.
    pub data_change: bool,
    /// Contains statistics (e.g., count, min/max values for columns) about the data in this file
    pub stats: Option<String>,
    /// Contains statistics (e.g., count, min/max values for columns) about the data in this file in
    /// raw parquet format. This field needs to be written when statistics are available and the
    /// table property: delta.checkpoint.writeStatsAsStruct is set to true.
    ///
    /// This field is only available in add action records read from checkpoints
    #[cfg(feature = "parquet")]
    #[serde(skip_serializing, skip_deserializing)]
    pub stats_parsed: Option<parquet::record::Row>,
    /// Contains statistics (e.g., count, min/max values for columns) about the data in this file in
    /// raw parquet format. This field needs to be written when statistics are available and the
    /// table property: delta.checkpoint.writeStatsAsStruct is set to true.
    ///
    /// This field is only available in add action records read from checkpoints
    #[cfg(feature = "parquet2")]
    #[serde(skip_serializing, skip_deserializing)]
    pub stats_parsed: Option<String>,
    /// Map containing metadata about this file
    pub tags: Option<HashMap<String, Option<String>>>,
}

impl Add {
    /// Returns the Add action with path decoded.
    pub fn path_decoded(self) -> Result<Self, ActionError> {
        decode_path(&self.path).map(|path| Self { path, ..self })
    }

    /// Get whatever stats are available. Uses the parquet struct `stats_parsed` if present,
    /// falling back to the JSON `stats`.
    pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
        match self.get_stats_parsed() {
            Ok(Some(stats)) => Ok(Some(stats)),
            Ok(None) => self.get_json_stats(),
            Err(e) => {
                log::error!(
                    "Error when reading parquet stats {:?} {e}. Attempting to read json stats",
                    self.stats_parsed
                );
                self.get_json_stats()
            }
        }
    }

    /// Returns the serde_json representation of stats contained in the action if present.
    /// Since stats are defined as optional in the protocol, this may be None.
    pub fn get_json_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
        self.stats
            .as_ref()
            .map_or(Ok(None), |s| serde_json::from_str(s))
    }
}
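
// A minimal sketch of `Add::path_decoded` and `Add::get_json_stats`; the path and stats below
// are assumed example values, not taken from a real table. Paths in the log may be
// percent-encoded, so a literal space arrives as `%20`.
#[cfg(test)]
mod add_action_sketch {
    use super::*;

    #[test]
    fn percent_encoded_path_is_decoded() {
        let action = Add {
            path: "date=2021-01-01/part-00000%20copy.snappy.parquet".to_string(),
            ..Default::default()
        };
        let decoded = action.path_decoded().unwrap();
        assert_eq!(decoded.path, "date=2021-01-01/part-00000 copy.snappy.parquet");
    }

    #[test]
    fn missing_stats_yield_none() {
        // `stats` is optional in the protocol, so parsing stats of a bare action yields `None`.
        let action = Add::default();
        assert!(action.get_json_stats().unwrap().is_none());
    }
}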

/// Describes the data format of files in the table.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct Format {
    /// Name of the encoding for files in this table.
    provider: String,
    /// A map containing configuration options for the format.
    options: HashMap<String, Option<String>>,
}

impl Format {
    /// Allows creation of a new action::Format
    pub fn new(provider: String, options: Option<HashMap<String, Option<String>>>) -> Self {
        let options = options.unwrap_or_default();
        Self { provider, options }
    }

    /// Return the Format provider
    pub fn get_provider(self) -> String {
        self.provider
    }
}

// Assuming this is a more appropriate default than derived Default
impl Default for Format {
    fn default() -> Self {
        Self {
            provider: "parquet".to_string(),
            options: Default::default(),
        }
    }
}
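
// A small sketch of constructing `Format`: `Format::new` with no options matches the
// `Default` impl above, which selects the "parquet" provider with empty options.
#[cfg(test)]
mod format_sketch {
    use super::*;

    #[test]
    fn default_format_is_parquet() {
        let format = Format::default();
        assert_eq!(format.clone().get_provider(), "parquet");
        assert_eq!(Format::new("parquet".to_string(), None), format);
    }
}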

/// Action that describes the metadata of the table.
/// This is a top-level action in Delta log entries.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(rename_all = "camelCase")]
pub struct MetaData {
    /// Unique identifier for this table
    pub id: Guid,
    /// User-provided identifier for this table
    pub name: Option<String>,
    /// User-provided description for this table
    pub description: Option<String>,
    /// Specification of the encoding for the files stored in the table
    pub format: Format,
    /// Schema of the table
    pub schema_string: String,
    /// An array containing the names of columns by which the data should be partitioned
    pub partition_columns: Vec<String>,
    /// The time when this metadata action is created, in milliseconds since the Unix epoch
    pub created_time: Option<DeltaDataTypeTimestamp>,
    /// A map containing configuration options for the table
    pub configuration: HashMap<String, Option<String>>,
}

impl MetaData {
    /// Returns the table schema from the embedded schema string contained within the metadata
    /// action.
    pub fn get_schema(&self) -> Result<Schema, serde_json::error::Error> {
        serde_json::from_str(&self.schema_string)
    }
}
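
// A minimal sketch of `MetaData::get_schema`: `schema_string` carries the table schema in the
// Delta protocol's JSON schema representation. The single-column schema below is an assumed
// example.
#[cfg(test)]
mod metadata_schema_sketch {
    use super::*;

    #[test]
    fn schema_string_parses_into_schema() {
        let metadata = MetaData {
            schema_string: r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}"#.to_string(),
            ..Default::default()
        };
        // Only assert that the embedded JSON parses into a `Schema`.
        assert!(metadata.get_schema().is_ok());
    }
}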

/// Represents a tombstone (deleted file) in the Delta log.
/// This is a top-level action in Delta log entries.
#[derive(Serialize, Deserialize, Clone, Eq, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Remove {
    /// The path of the file that is removed from the table.
    pub path: String,
    /// The timestamp when the remove was added to table state.
    pub deletion_timestamp: Option<DeltaDataTypeTimestamp>,
    /// Whether data is changed by the remove. For example, a table optimize will report this as
    /// false, since it adds and removes files only by combining many files into one.
    pub data_change: bool,
    /// When true the fields `partitionValues`, `size`, and `tags` are present.
    ///
    /// NOTE: Although this field is defined as required in the Scala Delta implementation, some
    /// writers still emit null values, so we keep it as an `Option<bool>` for compatibility.
    /// A map from partition column to value for this file.
    pub partition_values: Option<HashMap<String, Option<String>>>,
    /// Size of this file in bytes
    pub size: Option<DeltaDataTypeLong>,
    /// Map containing metadata about this file
    pub tags: Option<HashMap<String, Option<String>>>,
}

impl Hash for Remove {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.path.hash(state);
    }
}

/// Borrow `Remove` as a `str` so we can look up tombstones in the `DeltaTableState` hashset
/// using the path from an `Add` action.
impl Borrow<str> for Remove {
    fn borrow(&self) -> &str {
        self.path.as_ref()
    }
}
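
// A minimal sketch of the tombstone lookup described above: because `Remove` hashes and borrows
// by `path`, a `HashSet<Remove>` can be probed with a plain `&str` path (e.g. the path of an
// `Add` action). The paths below are assumed example values.
#[cfg(test)]
mod tombstone_lookup_sketch {
    use super::*;
    use std::collections::HashSet;

    #[test]
    fn tombstone_set_is_searchable_by_path() {
        let mut tombstones: HashSet<Remove> = HashSet::new();
        tombstones.insert(Remove {
            path: "part-00001.snappy.parquet".to_string(),
            ..Default::default()
        });

        assert!(tombstones.contains("part-00001.snappy.parquet"));
        assert!(!tombstones.contains("part-00002.snappy.parquet"));
    }
}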

impl PartialEq for Remove {
    fn eq(&self, other: &Self) -> bool {
        self.path == other.path
            && self.deletion_timestamp == other.deletion_timestamp
            && self.data_change == other.data_change
            && self.extended_file_metadata == other.extended_file_metadata
            && self.partition_values == other.partition_values
            && self.size == other.size
            && self.tags == other.tags
    }
}

impl Remove {
    /// Returns the Remove action with path decoded.
    pub fn path_decoded(self) -> Result<Self, ActionError> {
        decode_path(&self.path).map(|path| Self { path, ..self })
    }
}

/// Action used by streaming systems to track progress using application-specific versions to
/// enable idempotency.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Txn {
    /// A unique identifier for the application performing the transaction.
    pub app_id: String,
    /// An application-specific numeric identifier for this transaction.
    pub version: DeltaDataTypeVersion,
    /// The time when this transaction action was created in milliseconds since the Unix epoch.
    pub last_updated: Option<DeltaDataTypeTimestamp>,
}

/// Action used to increase the version of the Delta protocol required to read or write to the
/// table.
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Protocol {
    /// Minimum version of the Delta read protocol a client must implement to correctly read the
    /// table.
    pub min_reader_version: DeltaDataTypeInt,
    /// Minimum version of the Delta write protocol a client must implement to correctly write to
    /// the table.
    pub min_writer_version: DeltaDataTypeInt,
}

type CommitInfo = Map<String, Value>;

/// Represents an action in the Delta log. The Delta log is an aggregate of all actions performed
/// on the table, so the full list of actions is required to properly read a table.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Action {
    /// Changes the current metadata of the table. Must be present in the first version of a table.
    /// Subsequent `metaData` actions completely overwrite previous metadata.
    metaData(MetaData),
    /// Adds a file to the table state.
    add(Add),
    /// Removes a file from the table state.
    remove(Remove),
    /// Used by streaming systems to track progress externally with application specific version
    /// identifiers.
    txn(Txn),
    /// Describes the minimum reader and writer versions required to read or write to the table.
    protocol(Protocol),
    /// Describes commit provenance information for the table.
    commitInfo(CommitInfo),
}
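
// A minimal sketch of how a single line of a commit log maps onto `Action`: each line is a JSON
// object whose single top-level key names the action. The log line below is an assumed example.
#[cfg(test)]
mod action_parse_sketch {
    use super::*;

    #[test]
    fn protocol_log_line_parses_into_action() {
        let line = r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#;
        let action: Action = serde_json::from_str(line).unwrap();
        match action {
            Action::protocol(protocol) => {
                assert_eq!(protocol.min_reader_version, 1);
                assert_eq!(protocol.min_writer_version, 2);
            }
            other => panic!("expected a protocol action, got {:?}", other),
        }
    }
}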

/// Operation performed when creating a new log entry with one or more actions.
/// This is a key element of the `CommitInfo` action.
#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub enum DeltaOperation {
    /// Represents a Delta `Create` operation.
    /// Would usually only create the table; if data is also written,
    /// a `Write` operation is more appropriate.
    Create {
        /// The save mode used during the create.
        mode: SaveMode,
        /// The storage location of the new table
        location: String,
        /// The min reader and writer protocol versions of the table
        protocol: Protocol,
        /// Metadata associated with the new table
        metadata: DeltaTableMetaData,
    },

    /// Represents a Delta `Write` operation.
    /// Write operations will typically only include `Add` actions.
    #[serde(rename_all = "camelCase")]
    Write {
        /// The save mode used during the write.
        mode: SaveMode,
        /// The columns the write is partitioned by.
        partition_by: Option<Vec<String>>,
        /// The predicate used during the write.
        predicate: Option<String>,
    },
    /// Represents a Delta `StreamingUpdate` operation.
    #[serde(rename_all = "camelCase")]
    StreamingUpdate {
        /// The output mode the streaming writer is using.
        output_mode: OutputMode,
        /// The query id of the streaming writer.
        query_id: String,
        /// The epoch id of the written micro-batch.
        epoch_id: i64,
    },

    #[serde(rename_all = "camelCase")]
    /// Represents a `Optimize` operation
    Optimize {
        // TODO: Create a string representation of the filter passed to optimize
        /// The filter used to determine which partitions to filter
        predicate: Option<String>,
        /// Target optimize size
        target_size: DeltaDataTypeLong,
    }, // TODO: Add more operations
}

impl DeltaOperation {
    /// Retrieve basic commit information to be added to Delta commits
    pub fn get_commit_info(&self) -> Map<String, Value> {
        let mut commit_info = Map::<String, Value>::new();
        let operation = match &self {
            DeltaOperation::Create { .. } => "delta-rs.Create",
            DeltaOperation::Write { .. } => "delta-rs.Write",
            DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate",
            DeltaOperation::Optimize { .. } => "delta-rs.Optimize",
        };
        commit_info.insert(
            "operation".to_string(),
            serde_json::Value::String(operation.into()),
        );

        if let Ok(serde_json::Value::Object(map)) = serde_json::to_value(self) {
            commit_info.insert(
                "operationParameters".to_string(),
                map.values().next().unwrap().clone(),
            );
        };

        commit_info
    }
}
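
// A minimal sketch of the commit info assembled above: the operation name is prefixed with
// "delta-rs." and the operation's own fields land under "operationParameters". The write
// below uses assumed example values.
#[cfg(test)]
mod commit_info_sketch {
    use super::*;

    #[test]
    fn write_operation_commit_info() {
        let op = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by: Some(vec!["date".to_string()]),
            predicate: None,
        };
        let commit_info = op.get_commit_info();

        assert_eq!(commit_info["operation"], serde_json::json!("delta-rs.Write"));
        assert_eq!(
            commit_info["operationParameters"]["mode"],
            serde_json::json!("Append")
        );
        assert_eq!(
            commit_info["operationParameters"]["partitionBy"],
            serde_json::json!(["date"])
        );
    }
}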

/// The SaveMode used when performing a DeltaOperation
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SaveMode {
    /// Files will be appended to the target location.
    Append,
    /// The target location will be overwritten.
    Overwrite,
    /// If files exist for the target, the operation must fail.
    ErrorIfExists,
    /// If files exist for the target, the operation must not proceed or change any data.
    Ignore,
}

/// The OutputMode used in streaming operations.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum OutputMode {
    /// Only new rows will be written when new data is available.
    Append,
    /// The full output (all rows) will be written whenever new data is available.
    Complete,
    /// Only rows with updates will be written when new or changed data is available.
    Update,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_load_table_stats() {
        let action = Add {
            stats: Some(
                serde_json::json!({
                    "numRecords": 22,
                    "minValues": {"a": 1, "nested": {"b": 2, "c": "a"}},
                    "maxValues": {"a": 10, "nested": {"b": 20, "c": "z"}},
                    "nullCount": {"a": 1, "nested": {"b": 0, "c": 1}},
                })
                .to_string(),
            ),
            ..Default::default()
        };

        let stats = action.get_stats().unwrap().unwrap();

        assert_eq!(stats.num_records, 22);

        assert_eq!(
            stats.min_values["a"].as_value().unwrap(),
            &serde_json::json!(1)
        );
        assert_eq!(
            stats.min_values["nested"].as_column().unwrap()["b"]
                .as_value()
                .unwrap(),
            &serde_json::json!(2)
        );
        assert_eq!(
            stats.min_values["nested"].as_column().unwrap()["c"]
                .as_value()
                .unwrap(),
            &serde_json::json!("a")
        );

        assert_eq!(
            stats.max_values["a"].as_value().unwrap(),
            &serde_json::json!(10)
        );
        assert_eq!(
            stats.max_values["nested"].as_column().unwrap()["b"]
                .as_value()
                .unwrap(),
            &serde_json::json!(20)
        );
        assert_eq!(
            stats.max_values["nested"].as_column().unwrap()["c"]
                .as_value()
                .unwrap(),
            &serde_json::json!("z")
        );

        assert_eq!(stats.null_count["a"].as_value().unwrap(), 1);
        assert_eq!(
            stats.null_count["nested"].as_column().unwrap()["b"]
                .as_value()
                .unwrap(),
            0
        );
        assert_eq!(
            stats.null_count["nested"].as_column().unwrap()["c"]
                .as_value()
                .unwrap(),
            1
        );
    }
}