use std::any::type_name;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::str::FromStr;
use std::sync::LazyLock;
use self::deletion_vector::DeletionVectorDescriptor;
use crate::actions::schemas::GetStructField;
use crate::schema::{SchemaRef, StructType};
use crate::table_features::{
ReaderFeatures, WriterFeatures, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES,
};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::{DeltaResult, EngineData, Error, RowVisitor as _};
use visitors::{MetadataVisitor, ProtocolVisitor};
use delta_kernel_derive::Schema;
use serde::{Deserialize, Serialize};
// Deletion-vector descriptor type and helpers (see [`DeletionVectorDescriptor`]).
pub mod deletion_vector;
// The `txn` (set-transaction) action helpers.
pub mod set_transaction;
// Derived schemas for the action types; crate-internal.
pub(crate) mod schemas;
// Row visitors that extract actions from engine data. The module is public only
// when the `developer-visibility` feature is enabled.
#[cfg(feature = "developer-visibility")]
pub mod visitors;
#[cfg(not(feature = "developer-visibility"))]
pub(crate) mod visitors;
// Top-level field names of each action as they appear in a Delta log entry.
// Each constant is widened to `pub` under the `developer-visibility` feature.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const ADD_NAME: &str = "add";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const REMOVE_NAME: &str = "remove";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const METADATA_NAME: &str = "metaData";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const PROTOCOL_NAME: &str = "protocol";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const CDC_NAME: &str = "cdc";
// Lazily-built schema containing only the `add` action.
static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
    LazyLock::new(|| StructType::new([Option::<Add>::get_struct_field(ADD_NAME)]).into());

// Lazily-built schema for a full Delta log batch: one nullable struct field per
// supported action type.
static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    StructType::new([
        Option::<Add>::get_struct_field(ADD_NAME),
        Option::<Remove>::get_struct_field(REMOVE_NAME),
        Option::<Metadata>::get_struct_field(METADATA_NAME),
        Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
        Option::<SetTransaction>::get_struct_field(SET_TRANSACTION_NAME),
        Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME),
        Option::<Cdc>::get_struct_field(CDC_NAME),
    ])
    .into()
});

// Lazily-built schema containing only the `commitInfo` action.
static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    StructType::new([Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME)]).into()
});
/// Returns the schema covering every action type, used when reading log batches.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn get_log_schema() -> &'static SchemaRef {
    &LOG_SCHEMA
}
/// Returns the schema containing only the `add` action.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn get_log_add_schema() -> &'static SchemaRef {
    &LOG_ADD_SCHEMA
}
/// Returns the schema containing only the `commitInfo` action.
pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
    &LOG_COMMIT_INFO_SCHEMA
}
/// The storage format of a table's data files, plus any format-specific options.
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
pub struct Format {
    /// Name of the file format; `"parquet"` by default (see the `Default` impl).
    pub provider: String,
    /// Format-specific options keyed by name.
    pub options: HashMap<String, String>,
}
impl Default for Format {
fn default() -> Self {
Self {
provider: String::from("parquet"),
options: HashMap::new(),
}
}
}
/// The `metaData` action: table-level metadata recorded in the Delta log.
#[derive(Debug, Default, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
pub struct Metadata {
    /// Unique identifier of the table.
    pub id: String,
    /// User-provided table name, if any.
    pub name: Option<String>,
    /// User-provided table description, if any.
    pub description: Option<String>,
    /// Format of the table's data files (e.g. parquet).
    pub format: Format,
    /// The table schema as a JSON string; parse with [`Metadata::parse_schema`].
    pub schema_string: String,
    /// Column names the table is partitioned by, in order.
    pub partition_columns: Vec<String>,
    /// Creation time of the table (epoch-based timestamp — units not pinned here).
    pub created_time: Option<i64>,
    /// Raw table configuration; interpret via [`Metadata::parse_table_properties`].
    pub configuration: HashMap<String, String>,
}
impl Metadata {
    /// Extracts the metadata action (if any) from a batch of engine data by
    /// running a [`MetadataVisitor`] over its rows.
    pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Metadata>> {
        let mut visitor = MetadataVisitor::default();
        visitor.visit_rows_of(data)?;
        Ok(visitor.metadata)
    }

    /// Parses `schema_string` (JSON) into a [`StructType`].
    pub fn parse_schema(&self) -> DeltaResult<StructType> {
        Ok(serde_json::from_str(&self.schema_string)?)
    }

    /// Interprets the raw `configuration` map as typed [`TableProperties`].
    /// Unrecognized keys are handled by `TableProperties::from` (defined elsewhere).
    pub fn parse_table_properties(&self) -> TableProperties {
        TableProperties::from(self.configuration.iter())
    }
}
/// The `protocol` action: minimum reader/writer versions and, for versions 3/7,
/// the table features required to access this table.
#[derive(Default, Debug, Clone, PartialEq, Eq, Schema, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Protocol {
    /// Minimum reader protocol version required to read the table.
    min_reader_version: i32,
    /// Minimum writer protocol version required to write the table.
    min_writer_version: i32,
    /// Reader features; expected to be `Some` exactly when `min_reader_version == 3`.
    #[serde(skip_serializing_if = "Option::is_none")]
    reader_features: Option<Vec<String>>,
    /// Writer features; expected to be `Some` exactly when `min_writer_version == 7`.
    #[serde(skip_serializing_if = "Option::is_none")]
    writer_features: Option<Vec<String>>,
}
impl Protocol {
    /// Creates a new `Protocol`, validating that the feature lists are present
    /// when the corresponding version requires them.
    ///
    /// # Errors
    ///
    /// Returns an invalid-protocol error if `min_reader_version == 3` without
    /// reader features, or `min_writer_version == 7` without writer features.
    pub fn try_new(
        min_reader_version: i32,
        min_writer_version: i32,
        reader_features: Option<impl IntoIterator<Item = impl Into<String>>>,
        writer_features: Option<impl IntoIterator<Item = impl Into<String>>>,
    ) -> DeltaResult<Self> {
        if min_reader_version == 3 {
            require!(
                reader_features.is_some(),
                Error::invalid_protocol(
                    "Reader features must be present when minimum reader version = 3"
                )
            );
        }
        if min_writer_version == 7 {
            require!(
                writer_features.is_some(),
                Error::invalid_protocol(
                    "Writer features must be present when minimum writer version = 7"
                )
            );
        }
        // Normalize whatever iterable the caller passed into owned `Vec<String>`s.
        let reader_features = reader_features.map(|f| f.into_iter().map(Into::into).collect());
        let writer_features = writer_features.map(|f| f.into_iter().map(Into::into).collect());
        Ok(Protocol {
            min_reader_version,
            min_writer_version,
            reader_features,
            writer_features,
        })
    }

    /// Extracts the protocol action (if any) from a batch of engine data by
    /// running a [`ProtocolVisitor`] over its rows.
    pub fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Protocol>> {
        let mut visitor = ProtocolVisitor::default();
        visitor.visit_rows_of(data)?;
        Ok(visitor.protocol)
    }

    /// The minimum reader version required by this protocol.
    pub fn min_reader_version(&self) -> i32 {
        self.min_reader_version
    }

    /// The minimum writer version required by this protocol.
    pub fn min_writer_version(&self) -> i32 {
        self.min_writer_version
    }

    /// The reader features listed by this protocol, if any.
    pub fn reader_features(&self) -> Option<&[String]> {
        self.reader_features.as_deref()
    }

    /// The writer features listed by this protocol, if any.
    pub fn writer_features(&self) -> Option<&[String]> {
        self.writer_features.as_deref()
    }

    /// True if `feature` appears in this protocol's reader feature list.
    pub fn has_reader_feature(&self, feature: &ReaderFeatures) -> bool {
        self.reader_features()
            .is_some_and(|features| features.iter().any(|f| f == feature.as_ref()))
    }

    /// True if `feature` appears in this protocol's writer feature list.
    pub fn has_writer_feature(&self, feature: &WriterFeatures) -> bool {
        self.writer_features()
            .is_some_and(|features| features.iter().any(|f| f == feature.as_ref()))
    }

    /// Checks that this kernel can read a table with this protocol.
    ///
    /// Note the arm order matters: each guard narrows on `min_reader_version`
    /// before falling through to the "unsupported version" catch-all.
    pub fn ensure_read_supported(&self) -> DeltaResult<()> {
        match &self.reader_features {
            // Version 3 with features: every listed feature must be supported.
            Some(reader_features) if self.min_reader_version == 3 => {
                ensure_supported_features(reader_features, &SUPPORTED_READER_FEATURES)
            }
            // Version 3 without features is malformed (try_new rejects it too).
            None if self.min_reader_version == 3 => Err(Error::internal_error(
                "Reader features must be present when minimum reader version = 3",
            )),
            // Versions 1 and 2 need no feature list.
            None if self.min_reader_version == 1 || self.min_reader_version == 2 => Ok(()),
            // Versions 1 and 2 must not carry a feature list.
            Some(_) if self.min_reader_version == 1 || self.min_reader_version == 2 => {
                Err(Error::internal_error(
                    "Reader features must not be present when minimum reader version = 1 or 2",
                ))
            }
            // Any other reader version is not supported by this kernel.
            _ => Err(Error::Unsupported(format!(
                "Unsupported minimum reader version {}",
                self.min_reader_version
            ))),
        }
    }

    /// Checks that this kernel can write a table with this protocol. Only
    /// (reader 3, writer 7) tables whose writer features are all supported pass.
    pub fn ensure_write_supported(&self) -> DeltaResult<()> {
        match &self.writer_features {
            Some(writer_features)
                if self.min_reader_version == 3 && self.min_writer_version == 7 =>
            {
                ensure_supported_features(writer_features, &SUPPORTED_WRITER_FEATURES)
            }
            _ => Err(Error::unsupported(
                "Only tables with min reader version 3 and min writer version 7 with no table features are supported."
            )),
        }
    }
}
/// Verifies that every feature name in `table_features` parses to a `T` and is a
/// member of `supported_features`.
///
/// # Errors
///
/// Returns [`Error::Unsupported`] naming the first unparseable feature, or all
/// parsed-but-unsupported features, together with the supported set.
pub(crate) fn ensure_supported_features<T>(
    table_features: &[String],
    supported_features: &HashSet<T>,
) -> DeltaResult<()>
where
    <T as FromStr>::Err: Display,
    T: Debug + FromStr + Hash + Eq,
{
    // Human-readable name for T, e.g. "ReaderFeatures" from its full type path.
    let features_type = type_name::<T>()
        .rsplit("::")
        .next()
        .unwrap_or("table features");
    let make_error = |offending: Vec<String>, kind: &str| {
        let supported: Vec<_> = supported_features.iter().collect();
        Error::Unsupported(format!(
            "{} {} {:?}. Supported {} are {:?}",
            kind, features_type, offending, features_type, supported
        ))
    };
    // Parse each name, failing fast on the first string no `T` variant matches.
    let mut parsed_features: HashSet<T> = HashSet::new();
    for raw in table_features {
        match T::from_str(raw) {
            Ok(feature) => {
                parsed_features.insert(feature);
            }
            Err(_) => return Err(make_error(vec![raw.to_string()], "Unknown")),
        }
    }
    if parsed_features.is_subset(supported_features) {
        Ok(())
    } else {
        // Report every parsed feature the kernel does not support.
        let unsupported: Vec<_> = parsed_features
            .difference(supported_features)
            .map(|f| format!("{:?}", f))
            .collect();
        Err(make_error(unsupported, "Unsupported"))
    }
}
/// The `commitInfo` action: free-form information about the commit that wrote
/// this log entry. All fields are optional.
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct CommitInfo {
    // Commit timestamp (epoch-based; units not pinned here — presumably millis).
    pub(crate) timestamp: Option<i64>,
    // Name of the operation that produced the commit.
    pub(crate) operation: Option<String>,
    // Parameters of that operation, keyed by name.
    pub(crate) operation_parameters: Option<HashMap<String, String>>,
    // Version of the kernel that wrote the commit.
    pub(crate) kernel_version: Option<String>,
    // Engine-supplied key/value commit metadata.
    pub(crate) engine_commit_info: Option<HashMap<String, String>>,
}
/// The `add` action: a data file added to the table.
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
pub struct Add {
    /// Path of the added data file.
    pub path: String,
    /// Partition column values for this file. The attribute tells the `Schema`
    /// derive to treat map values as nullable and drop null entries — TODO
    /// confirm exact derive semantics against `delta_kernel_derive`.
    #[drop_null_container_values]
    pub partition_values: HashMap<String, String>,
    /// Size of the file in bytes.
    pub size: i64,
    /// Modification time of the file (epoch-based timestamp).
    pub modification_time: i64,
    /// Whether this file changes table data (vs. e.g. a rearrangement).
    pub data_change: bool,
    /// Per-file statistics as an opaque string, if recorded.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub stats: Option<String>,
    /// Arbitrary per-file tags.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub tags: Option<HashMap<String, String>>,
    /// Deletion vector marking rows of this file as deleted, if any.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub deletion_vector: Option<DeletionVectorDescriptor>,
    /// Base row id for this file, if row tracking is in use.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub base_row_id: Option<i64>,
    /// Default row commit version for this file, if row tracking is in use.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub default_row_commit_version: Option<i64>,
    /// Name of the clustering implementation, if the table is clustered.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub clustering_provider: Option<String>,
}
impl Add {
    /// Unique id of this file's deletion vector, if one is attached.
    pub fn dv_unique_id(&self) -> Option<String> {
        let dv = self.deletion_vector.as_ref()?;
        Some(dv.unique_id())
    }
}
/// The `remove` action: a data file logically deleted from the table.
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Remove {
    // Path of the removed data file (matches the corresponding `add`).
    pub(crate) path: String,
    // When the file was marked removed (epoch-based timestamp).
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) deletion_timestamp: Option<i64>,
    // Whether the removal changes table data.
    pub(crate) data_change: bool,
    // Whether the optional file-metadata fields below are populated.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) extended_file_metadata: Option<bool>,
    // Partition column values of the removed file, if recorded.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) partition_values: Option<HashMap<String, String>>,
    // Size of the removed file in bytes, if recorded.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) size: Option<i64>,
    // Arbitrary per-file tags.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) tags: Option<HashMap<String, String>>,
    // Deletion vector that was attached to the file, if any.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,
    // Base row id of the file, if row tracking is in use.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) base_row_id: Option<i64>,
    // Default row commit version of the file, if row tracking is in use.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) default_row_commit_version: Option<i64>,
}
/// The `cdc` action: a change-data file recording row-level changes.
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Cdc {
    // Path of the change-data file.
    pub path: String,
    // Partition column values; same derive attribute semantics as `Add`.
    #[drop_null_container_values]
    pub partition_values: HashMap<String, String>,
    // Size of the file in bytes.
    pub size: i64,
    // Always describes a data change by nature of CDC; kept for schema parity.
    pub data_change: bool,
    // Arbitrary per-file tags.
    pub tags: Option<HashMap<String, String>>,
}
/// The `txn` action: records the latest transaction version written by an
/// application, enabling idempotent writes per `app_id`.
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
pub struct SetTransaction {
    /// Unique identifier of the writing application.
    pub app_id: String,
    /// Latest transaction version recorded for that application.
    pub version: i64,
    /// When the transaction was last updated (epoch-based timestamp), if recorded.
    pub last_updated: Option<i64>,
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::schema::{ArrayType, DataType, MapType, StructField};

    // Each schema test projects one action out of the full log schema and
    // compares it against a hand-built expected struct.
    #[test]
    fn test_metadata_schema() {
        let schema = get_log_schema()
            .project(&[METADATA_NAME])
            .expect("Couldn't get metaData field");
        let expected = Arc::new(StructType::new([StructField::new(
            "metaData",
            StructType::new([
                StructField::new("id", DataType::STRING, false),
                StructField::new("name", DataType::STRING, true),
                StructField::new("description", DataType::STRING, true),
                StructField::new(
                    "format",
                    StructType::new([
                        StructField::new("provider", DataType::STRING, false),
                        StructField::new(
                            "options",
                            MapType::new(DataType::STRING, DataType::STRING, false),
                            false,
                        ),
                    ]),
                    false,
                ),
                StructField::new("schemaString", DataType::STRING, false),
                StructField::new(
                    "partitionColumns",
                    ArrayType::new(DataType::STRING, false),
                    false,
                ),
                StructField::new("createdTime", DataType::LONG, true),
                StructField::new(
                    "configuration",
                    MapType::new(DataType::STRING, DataType::STRING, false),
                    false,
                ),
            ]),
            true,
        )]));
        assert_eq!(schema, expected);
    }

    #[test]
    fn test_add_schema() {
        let schema = get_log_schema()
            .project(&[ADD_NAME])
            .expect("Couldn't get add field");
        let expected = Arc::new(StructType::new([StructField::new(
            "add",
            StructType::new([
                StructField::new("path", DataType::STRING, false),
                // `drop_null_container_values` on Add makes map values nullable.
                StructField::new(
                    "partitionValues",
                    MapType::new(DataType::STRING, DataType::STRING, true),
                    false,
                ),
                StructField::new("size", DataType::LONG, false),
                StructField::new("modificationTime", DataType::LONG, false),
                StructField::new("dataChange", DataType::BOOLEAN, false),
                StructField::new("stats", DataType::STRING, true),
                StructField::new(
                    "tags",
                    MapType::new(DataType::STRING, DataType::STRING, false),
                    true,
                ),
                deletion_vector_field(),
                StructField::new("baseRowId", DataType::LONG, true),
                StructField::new("defaultRowCommitVersion", DataType::LONG, true),
                StructField::new("clusteringProvider", DataType::STRING, true),
            ]),
            true,
        )]));
        assert_eq!(schema, expected);
    }

    // Shared expected-field builders for the schema tests below.
    fn tags_field() -> StructField {
        StructField::new(
            "tags",
            MapType::new(DataType::STRING, DataType::STRING, false),
            true,
        )
    }

    fn partition_values_field() -> StructField {
        StructField::new(
            "partitionValues",
            MapType::new(DataType::STRING, DataType::STRING, false),
            true,
        )
    }

    fn deletion_vector_field() -> StructField {
        StructField::new(
            "deletionVector",
            DataType::struct_type([
                StructField::new("storageType", DataType::STRING, false),
                StructField::new("pathOrInlineDv", DataType::STRING, false),
                StructField::new("offset", DataType::INTEGER, true),
                StructField::new("sizeInBytes", DataType::INTEGER, false),
                StructField::new("cardinality", DataType::LONG, false),
            ]),
            true,
        )
    }

    #[test]
    fn test_remove_schema() {
        let schema = get_log_schema()
            .project(&[REMOVE_NAME])
            .expect("Couldn't get remove field");
        let expected = Arc::new(StructType::new([StructField::new(
            "remove",
            StructType::new([
                StructField::new("path", DataType::STRING, false),
                StructField::new("deletionTimestamp", DataType::LONG, true),
                StructField::new("dataChange", DataType::BOOLEAN, false),
                StructField::new("extendedFileMetadata", DataType::BOOLEAN, true),
                partition_values_field(),
                StructField::new("size", DataType::LONG, true),
                tags_field(),
                deletion_vector_field(),
                StructField::new("baseRowId", DataType::LONG, true),
                StructField::new("defaultRowCommitVersion", DataType::LONG, true),
            ]),
            true,
        )]));
        assert_eq!(schema, expected);
    }

    #[test]
    fn test_cdc_schema() {
        let schema = get_log_schema()
            .project(&[CDC_NAME])
            // fixed: message previously said "remove" (copy-paste error)
            .expect("Couldn't get cdc field");
        let expected = Arc::new(StructType::new([StructField::new(
            "cdc",
            StructType::new([
                StructField::new("path", DataType::STRING, false),
                StructField::new(
                    "partitionValues",
                    MapType::new(DataType::STRING, DataType::STRING, true),
                    false,
                ),
                StructField::new("size", DataType::LONG, false),
                StructField::new("dataChange", DataType::BOOLEAN, false),
                tags_field(),
            ]),
            true,
        )]));
        assert_eq!(schema, expected);
    }

    #[test]
    fn test_transaction_schema() {
        let schema = get_log_schema()
            .project(&[SET_TRANSACTION_NAME])
            .expect("Couldn't get transaction field");
        let expected = Arc::new(StructType::new([StructField::new(
            "txn",
            StructType::new([
                StructField::new("appId", DataType::STRING, false),
                StructField::new("version", DataType::LONG, false),
                StructField::new("lastUpdated", DataType::LONG, true),
            ]),
            true,
        )]));
        assert_eq!(schema, expected);
    }

    #[test]
    fn test_commit_info_schema() {
        let schema = get_log_schema()
            .project(&[COMMIT_INFO_NAME])
            .expect("Couldn't get commitInfo field");
        let expected = Arc::new(StructType::new(vec![StructField::new(
            "commitInfo",
            StructType::new(vec![
                StructField::new("timestamp", DataType::LONG, true),
                StructField::new("operation", DataType::STRING, true),
                StructField::new(
                    "operationParameters",
                    MapType::new(DataType::STRING, DataType::STRING, false),
                    true,
                ),
                StructField::new("kernelVersion", DataType::STRING, true),
                StructField::new(
                    "engineCommitInfo",
                    MapType::new(DataType::STRING, DataType::STRING, false),
                    true,
                ),
            ]),
            true,
        )]));
        assert_eq!(schema, expected);
    }

    // `try_new` must reject (3, 7) protocols missing either feature list.
    #[test]
    fn test_validate_protocol() {
        let invalid_protocols = [
            Protocol {
                min_reader_version: 3,
                min_writer_version: 7,
                reader_features: None,
                writer_features: Some(vec![]),
            },
            Protocol {
                min_reader_version: 3,
                min_writer_version: 7,
                reader_features: Some(vec![]),
                writer_features: None,
            },
            Protocol {
                min_reader_version: 3,
                min_writer_version: 7,
                reader_features: None,
                writer_features: None,
            },
        ];
        for Protocol {
            min_reader_version,
            min_writer_version,
            reader_features,
            writer_features,
        } in invalid_protocols
        {
            assert!(matches!(
                Protocol::try_new(
                    min_reader_version,
                    min_writer_version,
                    reader_features,
                    writer_features
                ),
                Err(Error::InvalidProtocol(_)),
            ));
        }
    }

    // v2Checkpoint is not a supported reader feature, for any reader version.
    #[test]
    fn test_v2_checkpoint_unsupported() {
        let protocol = Protocol::try_new(
            3,
            7,
            Some([ReaderFeatures::V2Checkpoint]),
            // fixed for consistency with sibling tests: writer features use
            // WriterFeatures (serializes to the same "v2Checkpoint" string)
            Some([WriterFeatures::V2Checkpoint]),
        )
        .unwrap();
        assert!(protocol.ensure_read_supported().is_err());

        let protocol = Protocol::try_new(
            4,
            7,
            Some([ReaderFeatures::V2Checkpoint]),
            Some([WriterFeatures::V2Checkpoint]),
        )
        .unwrap();
        assert!(protocol.ensure_read_supported().is_err());
    }

    #[test]
    fn test_ensure_read_supported() {
        // (3, 7) with empty feature lists: readable.
        let protocol = Protocol {
            min_reader_version: 3,
            min_writer_version: 7,
            reader_features: Some(vec![]),
            writer_features: Some(vec![]),
        };
        assert!(protocol.ensure_read_supported().is_ok());

        // Unsupported reader feature: not readable.
        let empty_features: [String; 0] = [];
        let protocol = Protocol::try_new(
            3,
            7,
            Some([ReaderFeatures::V2Checkpoint]),
            Some(&empty_features),
        )
        .unwrap();
        assert!(protocol.ensure_read_supported().is_err());

        // Unsupported *writer* feature does not block reading.
        let protocol = Protocol::try_new(
            3,
            7,
            Some(&empty_features),
            Some([WriterFeatures::V2Checkpoint]),
        )
        .unwrap();
        assert!(protocol.ensure_read_supported().is_ok());

        let protocol = Protocol::try_new(
            3,
            7,
            Some([ReaderFeatures::V2Checkpoint]),
            Some([WriterFeatures::V2Checkpoint]),
        )
        .unwrap();
        assert!(protocol.ensure_read_supported().is_err());

        // Reader versions 1 and 2 need no feature list.
        let protocol = Protocol {
            min_reader_version: 1,
            min_writer_version: 7,
            reader_features: None,
            writer_features: None,
        };
        assert!(protocol.ensure_read_supported().is_ok());

        let protocol = Protocol {
            min_reader_version: 2,
            min_writer_version: 7,
            reader_features: None,
            writer_features: None,
        };
        assert!(protocol.ensure_read_supported().is_ok());
    }

    #[test]
    fn test_ensure_write_supported() {
        // (3, 7) with no features: writable.
        let protocol = Protocol {
            min_reader_version: 3,
            min_writer_version: 7,
            reader_features: Some(vec![]),
            writer_features: Some(vec![]),
        };
        assert!(protocol.ensure_write_supported().is_ok());

        // Deletion vectors are not yet writable by the kernel.
        let protocol = Protocol::try_new(
            3,
            7,
            Some([ReaderFeatures::DeletionVectors]),
            Some([WriterFeatures::DeletionVectors]),
        )
        .unwrap();
        assert!(protocol.ensure_write_supported().is_err());
    }

    #[test]
    fn test_ensure_supported_features() {
        let supported_features = [
            ReaderFeatures::ColumnMapping,
            ReaderFeatures::DeletionVectors,
        ]
        .into_iter()
        .collect();
        let table_features = vec![ReaderFeatures::ColumnMapping.to_string()];
        ensure_supported_features(&table_features, &supported_features).unwrap();

        // An unknown feature name errs with either HashSet iteration order.
        let table_features = vec![ReaderFeatures::ColumnMapping.to_string(), "idk".to_string()];
        let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
        match error {
            Error::Unsupported(e) if e ==
                "Unknown ReaderFeatures [\"idk\"]. Supported ReaderFeatures are [ColumnMapping, DeletionVectors]"
                => {},
            Error::Unsupported(e) if e ==
                "Unknown ReaderFeatures [\"idk\"]. Supported ReaderFeatures are [DeletionVectors, ColumnMapping]"
                => {},
            _ => panic!("Expected unsupported error"),
        }
    }
}