use std::collections::HashMap;
use std::sync::{Arc, LazyLock};
use delta_kernel_derive::{internal_api, IntoEngineData, ToSchema};
use serde::{Deserialize, Serialize};
use url::Url;
use visitors::{MetadataVisitor, ProtocolVisitor};
use self::deletion_vector::DeletionVectorDescriptor;
use crate::expressions::{MapData, Scalar, StructData};
use crate::schema::{DataType, MapType, SchemaRef, StructField, StructType, ToSchema as _};
use crate::table_features::{
FeatureType, IntoTableFeature, TableFeature, MIN_VALID_RW_VERSION,
TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, EvaluationHandlerExtension as _, FileMeta,
IntoEngineData, RowVisitor as _,
};
/// Kernel crate version (from Cargo) embedded into `commitInfo` actions.
const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Operation name recorded in `commitInfo` when the caller supplies none.
const UNKNOWN_OPERATION: &str = "UNKNOWN";
pub mod deletion_vector;
pub mod deletion_vector_writer;
pub mod set_transaction;
// The visitors module is public only when the `internal-api` feature is
// enabled; otherwise it stays crate-private.
#[cfg(feature = "internal-api")]
pub mod visitors;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod visitors;
// Top-level field names of each action wrapper as they appear in the Delta
// log JSON / checkpoint schemas.
#[internal_api]
pub(crate) const ADD_NAME: &str = "add";
#[internal_api]
pub(crate) const REMOVE_NAME: &str = "remove";
#[internal_api]
pub(crate) const METADATA_NAME: &str = "metaData";
#[internal_api]
pub(crate) const PROTOCOL_NAME: &str = "protocol";
#[internal_api]
pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
#[internal_api]
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
#[internal_api]
pub(crate) const CDC_NAME: &str = "cdc";
#[internal_api]
pub(crate) const SIDECAR_NAME: &str = "sidecar";
#[internal_api]
pub(crate) const CHECKPOINT_METADATA_NAME: &str = "checkpointMetadata";
#[internal_api]
pub(crate) const DOMAIN_METADATA_NAME: &str = "domainMetadata";
/// Domain-metadata domains starting with this prefix are reserved for
/// Delta-internal use (see [`DomainMetadata::is_internal`]).
pub(crate) const INTERNAL_DOMAIN_PREFIX: &str = "delta.";
/// Schema of actions that may appear in a commit file: one nullable top-level
/// field per action type (checkpoint-only actions excluded).
static COMMIT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(StructType::new_unchecked([
        StructField::nullable(ADD_NAME, Add::to_schema()),
        StructField::nullable(REMOVE_NAME, Remove::to_schema()),
        StructField::nullable(METADATA_NAME, Metadata::to_schema()),
        StructField::nullable(PROTOCOL_NAME, Protocol::to_schema()),
        StructField::nullable(SET_TRANSACTION_NAME, SetTransaction::to_schema()),
        StructField::nullable(COMMIT_INFO_NAME, CommitInfo::to_schema()),
        StructField::nullable(CDC_NAME, Cdc::to_schema()),
        StructField::nullable(DOMAIN_METADATA_NAME, DomainMetadata::to_schema()),
    ]))
});
/// Superset of [`COMMIT_SCHEMA`] that additionally includes the actions that
/// only appear in checkpoints (checkpointMetadata, sidecar).
static ALL_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(StructType::new_unchecked(
        get_commit_schema().fields().cloned().chain([
            StructField::nullable(CHECKPOINT_METADATA_NAME, CheckpointMetadata::to_schema()),
            StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
        ]),
    ))
});
/// Single-field log schema containing only the `add` action wrapper.
static LOG_ADD_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(StructType::new_unchecked([StructField::nullable(
        ADD_NAME,
        Add::to_schema(),
    )]))
});
/// Single-field log schema containing only the `remove` action wrapper.
static LOG_REMOVE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(StructType::new_unchecked([StructField::nullable(
        REMOVE_NAME,
        Remove::to_schema(),
    )]))
});
/// Single-field log schema containing only the `commitInfo` action wrapper.
static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(StructType::new_unchecked([StructField::nullable(
        COMMIT_INFO_NAME,
        CommitInfo::to_schema(),
    )]))
});
/// Single-field log schema containing only the `txn` action wrapper.
static LOG_TXN_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(StructType::new_unchecked([StructField::nullable(
        SET_TRANSACTION_NAME,
        SetTransaction::to_schema(),
    )]))
});
/// Single-field log schema containing only the `domainMetadata` action wrapper.
static LOG_DOMAIN_METADATA_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(StructType::new_unchecked([StructField::nullable(
        DOMAIN_METADATA_NAME,
        DomainMetadata::to_schema(),
    )]))
});
/// Schema of all actions that may appear in a commit file.
#[internal_api]
pub(crate) fn get_commit_schema() -> &'static SchemaRef {
    &COMMIT_SCHEMA
}
/// Schema of all known actions, including checkpoint-only ones.
#[internal_api]
#[allow(dead_code)]
pub(crate) fn get_all_actions_schema() -> &'static SchemaRef {
    &ALL_ACTIONS_SCHEMA
}
/// Log schema containing only the `add` action.
#[internal_api]
pub(crate) fn get_log_add_schema() -> &'static SchemaRef {
    &LOG_ADD_SCHEMA
}
/// Log schema containing only the `remove` action.
pub(crate) fn get_log_remove_schema() -> &'static SchemaRef {
    &LOG_REMOVE_SCHEMA
}
/// Log schema containing only the `commitInfo` action.
pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
    &LOG_COMMIT_INFO_SCHEMA
}
/// Log schema containing only the `txn` (setTransaction) action.
pub(crate) fn get_log_txn_schema() -> &'static SchemaRef {
    &LOG_TXN_SCHEMA
}
/// Log schema containing only the `domainMetadata` action.
pub(crate) fn get_log_domain_metadata_schema() -> &'static SchemaRef {
    &LOG_DOMAIN_METADATA_SCHEMA
}
/// Returns true if the given log schema selects either of the file actions
/// (`add` or `remove`).
#[internal_api]
pub(crate) fn schema_contains_file_actions(schema: &SchemaRef) -> bool {
    [ADD_NAME, REMOVE_NAME]
        .into_iter()
        .any(|action| schema.contains(action))
}
/// Wraps an arbitrary file schema in a single nullable top-level `add` field,
/// matching the shape of the log's add-action schema.
pub(crate) fn as_log_add_schema(schema: SchemaRef) -> SchemaRef {
    let add_field = StructField::nullable(ADD_NAME, schema);
    Arc::new(StructType::new_unchecked([add_field]))
}
/// The `format` field of a [`Metadata`] action: the file format of the table's
/// data files plus any format-specific options.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
#[internal_api]
pub(crate) struct Format {
    // Name of the encoding for files in this table (e.g. "parquet").
    pub(crate) provider: String,
    // Format-specific options; empty for parquet tables.
    pub(crate) options: HashMap<String, String>,
}
impl Default for Format {
fn default() -> Self {
Self {
provider: String::from("parquet"),
options: HashMap::new(),
}
}
}
impl TryFrom<Format> for Scalar {
    type Error = Error;
    /// Converts a [`Format`] into a struct `Scalar` whose fields follow the
    /// order of `Format::to_schema()` (provider, options). Fails only if the
    /// options map cannot be represented with the string->string map type.
    fn try_from(format: Format) -> DeltaResult<Self> {
        let provider = Scalar::from(format.provider);
        let options = MapData::try_new(
            // Non-nullable values: format options never map to null.
            MapType::new(DataType::STRING, DataType::STRING, false),
            format.options,
        )
        .map(Scalar::Map)?;
        Ok(Scalar::Struct(StructData::try_new(
            // Field list and value list must stay in the same order.
            Format::to_schema().into_fields().collect(),
            vec![provider, options],
        )?))
    }
}
/// The `metaData` action: table-level metadata (id, schema, partitioning,
/// configuration). Fields are private; use the accessors on `impl Metadata`.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
#[internal_api]
pub(crate) struct Metadata {
    // Unique table identifier (a UUID string for tables created here).
    id: String,
    // Optional user-facing table name.
    name: Option<String>,
    // Optional user-facing table description.
    description: Option<String>,
    // Data file format; defaults to parquet.
    format: Format,
    // Table schema serialized as JSON (see `parse_schema`).
    schema_string: String,
    // Names of the partition columns, in partitioning order.
    partition_columns: Vec<String>,
    // Creation time in epoch milliseconds, when known.
    created_time: Option<i64>,
    // Table properties/configuration (see `parse_table_properties`).
    configuration: HashMap<String, String>,
}
impl Metadata {
    /// Creates a new `Metadata` action with a freshly generated random table
    /// id and the default (parquet) format.
    ///
    /// # Errors
    /// Returns `Error::Schema` if `schema` contains a metadata column —
    /// metadata columns are synthesized at read time and must not appear in
    /// the persisted table schema. Also propagates JSON serialization errors
    /// for the schema string.
    #[internal_api]
    pub(crate) fn try_new(
        name: Option<String>,
        description: Option<String>,
        schema: SchemaRef,
        partition_columns: Vec<String>,
        created_time: i64,
        configuration: HashMap<String, String>,
    ) -> DeltaResult<Self> {
        if let Some(metadata_field) = schema.fields().find(|field| field.is_metadata_column()) {
            return Err(Error::Schema(format!(
                "Table schema must not contain metadata columns. Found metadata column: '{}'",
                metadata_field.name
            )));
        }
        Ok(Self {
            id: uuid::Uuid::new_v4().to_string(),
            name,
            description,
            format: Format::default(),
            // The schema is persisted in the log as a JSON string.
            schema_string: serde_json::to_string(&schema)?,
            partition_columns,
            created_time: Some(created_time),
            configuration,
        })
    }
    /// Extracts the metadata action (if any) from a batch of engine data by
    /// visiting its rows.
    #[internal_api]
    pub(crate) fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Metadata>> {
        let mut visitor = MetadataVisitor::default();
        visitor.visit_rows_of(data)?;
        Ok(visitor.metadata)
    }
    /// Unique table identifier.
    #[internal_api]
    #[allow(dead_code)]
    pub(crate) fn id(&self) -> &str {
        &self.id
    }
    /// User-facing table name, if set.
    #[internal_api]
    #[allow(dead_code)]
    pub(crate) fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }
    /// User-facing table description, if set.
    #[internal_api]
    #[allow(dead_code)]
    pub(crate) fn description(&self) -> Option<&str> {
        self.description.as_deref()
    }
    /// Table creation time in epoch milliseconds, if known.
    #[internal_api]
    #[allow(dead_code)]
    pub(crate) fn created_time(&self) -> Option<i64> {
        self.created_time
    }
    /// Raw table configuration (string key/value pairs).
    #[internal_api]
    pub(crate) fn configuration(&self) -> &HashMap<String, String> {
        &self.configuration
    }
    /// Name of the data file format provider (e.g. "parquet").
    #[internal_api]
    #[allow(dead_code)]
    pub(crate) fn format_provider(&self) -> &str {
        &self.format.provider
    }
    /// Raw JSON-serialized table schema.
    // NOTE(review): returning `&String` rather than `&str` is a clippy smell,
    // but changing the return type is visible to callers — left as-is.
    #[internal_api]
    pub(crate) fn schema_string(&self) -> &String {
        &self.schema_string
    }
    /// Deserializes `schema_string` back into a [`StructType`].
    #[internal_api]
    pub(crate) fn parse_schema(&self) -> DeltaResult<StructType> {
        Ok(serde_json::from_str(&self.schema_string)?)
    }
    /// Partition column names, in partitioning order.
    #[internal_api]
    pub(crate) fn partition_columns(&self) -> &[String] {
        &self.partition_columns
    }
    /// Parses the raw configuration into typed [`TableProperties`].
    /// Unknown keys are retained by that type rather than dropped.
    #[internal_api]
    pub(crate) fn parse_table_properties(&self) -> TableProperties {
        TableProperties::from(self.configuration.iter())
    }
    /// Test-only constructor that performs no validation of the schema string
    /// or metadata-column rules.
    #[cfg(test)]
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new_unchecked(
        id: impl Into<String>,
        name: Option<String>,
        description: Option<String>,
        format: Format,
        schema_string: impl Into<String>,
        partition_columns: Vec<String>,
        created_time: Option<i64>,
        configuration: HashMap<String, String>,
    ) -> Self {
        Self {
            id: id.into(),
            name,
            description,
            format,
            schema_string: schema_string.into(),
            partition_columns,
            created_time,
            configuration,
        }
    }
}
impl IntoEngineData for Metadata {
    /// Converts this metadata action into a single-row engine data batch.
    ///
    /// The value order below must exactly match the field order of `schema`
    /// (normally `Metadata::to_schema()`): id, name, description,
    /// format.provider, format.options, schemaString, partitionColumns,
    /// createdTime, configuration.
    fn into_engine_data(
        self,
        schema: SchemaRef,
        engine: &dyn Engine,
    ) -> DeltaResult<Box<dyn EngineData>> {
        let values = [
            self.id.into(),
            self.name.into(),
            self.description.into(),
            self.format.provider.into(),
            // Map/array conversions are fallible, hence try_into.
            self.format.options.try_into()?,
            self.schema_string.into(),
            self.partition_columns.try_into()?,
            self.created_time.into(),
            self.configuration.try_into()?,
        ];
        engine.evaluation_handler().create_one(schema, &values)
    }
}
/// The `protocol` action: minimum reader/writer versions plus (for versions
/// 3/7) the explicit table feature lists. Construct via `try_new*`, which
/// enforce the version/feature consistency rules.
#[derive(
    Default, Debug, Clone, PartialEq, Eq, ToSchema, Serialize, Deserialize, IntoEngineData,
)]
#[serde(rename_all = "camelCase")]
#[internal_api]
pub(crate) struct Protocol {
    min_reader_version: i32,
    min_writer_version: i32,
    // Present iff min_reader_version == 3; omitted from JSON when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    reader_features: Option<Vec<TableFeature>>,
    // Present iff min_writer_version == 7; omitted from JSON when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    writer_features: Option<Vec<TableFeature>>,
}
/// Converts an optional iterable of feature-like values into an optional
/// `Vec<TableFeature>`; `None` stays `None`. Unrecognized names become
/// `TableFeature::Unknown`, so this never fails.
fn parse_features(
    features: Option<impl IntoIterator<Item = impl IntoTableFeature>>,
) -> Option<Vec<TableFeature>> {
    features.map(|features| {
        features
            .into_iter()
            .map(|feature| feature.into_table_feature())
            .collect()
    })
}
impl Protocol {
    /// Creates a protocol at the "table features" versions (reader 3, writer
    /// 7) with explicit reader and writer feature lists.
    pub(crate) fn try_new_modern(
        reader_features: impl IntoIterator<Item = impl IntoTableFeature>,
        writer_features: impl IntoIterator<Item = impl IntoTableFeature>,
    ) -> DeltaResult<Self> {
        Self::try_new(
            TABLE_FEATURES_MIN_READER_VERSION,
            TABLE_FEATURES_MIN_WRITER_VERSION,
            Some(reader_features),
            Some(writer_features),
        )
    }
    /// Test helper: creates a legacy (pre-table-features) protocol with no
    /// feature lists.
    #[cfg(test)]
    pub(crate) fn try_new_legacy(
        min_reader_version: i32,
        min_writer_version: i32,
    ) -> DeltaResult<Self> {
        Self::try_new(
            min_reader_version,
            min_writer_version,
            TableFeature::NO_LIST,
            TableFeature::NO_LIST,
        )
    }
    /// Creates a protocol, validating all version/feature invariants:
    ///
    /// - both versions must be >= `MIN_VALID_RW_VERSION`;
    /// - reader features must be present iff reader version is 3, and writer
    ///   features iff writer version is 7;
    /// - every reader feature must be ReaderWriter (or Unknown) and must also
    ///   appear in the writer features;
    /// - writer features may be WriterOnly/Unknown; a ReaderWriter writer
    ///   feature must also be listed as a reader feature — except the legacy
    ///   case of `columnMapping` with reader version 2.
    ///
    /// # Errors
    /// `Error::InvalidProtocol` describing the first violated rule.
    pub(crate) fn try_new(
        min_reader_version: i32,
        min_writer_version: i32,
        reader_features: Option<impl IntoIterator<Item = impl IntoTableFeature>>,
        writer_features: Option<impl IntoIterator<Item = impl IntoTableFeature>>,
    ) -> DeltaResult<Self> {
        require!(
            min_reader_version >= MIN_VALID_RW_VERSION,
            Error::InvalidProtocol(format!(
                "min_reader_version must be >= {MIN_VALID_RW_VERSION}, got {min_reader_version}"
            ))
        );
        require!(
            min_writer_version >= MIN_VALID_RW_VERSION,
            Error::InvalidProtocol(format!(
                "min_writer_version must be >= {MIN_VALID_RW_VERSION}, got {min_writer_version}"
            ))
        );
        let reader_features = parse_features(reader_features);
        let writer_features = parse_features(writer_features);
        // Feature lists are mandatory at the table-features versions and
        // forbidden otherwise.
        if min_reader_version == TABLE_FEATURES_MIN_READER_VERSION {
            require!(
                reader_features.is_some(),
                Error::invalid_protocol(
                    "Reader features must be present when minimum reader version = 3"
                )
            );
        } else {
            require!(
                reader_features.is_none(),
                Error::invalid_protocol(
                    "Reader features must not be present when minimum reader version != 3"
                )
            );
        }
        if min_writer_version == TABLE_FEATURES_MIN_WRITER_VERSION {
            require!(
                writer_features.is_some(),
                Error::invalid_protocol(
                    "Writer features must be present when minimum writer version = 7"
                )
            );
        } else {
            require!(
                writer_features.is_none(),
                Error::invalid_protocol(
                    "Writer features must not be present when minimum writer version != 7"
                )
            );
        }
        // Cross-check the two lists for consistency.
        match (&reader_features, &writer_features) {
            (Some(reader_features), Some(writer_features)) => {
                // Every reader feature must be ReaderWriter (or Unknown) and
                // must also be declared as a writer feature.
                let check_r = reader_features.iter().all(|feature| {
                    matches!(
                        feature.feature_type(),
                        FeatureType::ReaderWriter | FeatureType::Unknown
                    ) && writer_features.contains(feature)
                });
                require!(
                    check_r,
                    Error::invalid_protocol(
                        "Reader features must contain only ReaderWriter features that are also listed in writer features"
                    )
                );
                // A ReaderWriter feature in the writer list must also appear
                // in the reader list.
                let check_w = writer_features
                    .iter()
                    .all(|feature| match feature.feature_type() {
                        FeatureType::WriterOnly | FeatureType::Unknown => true,
                        FeatureType::ReaderWriter => reader_features.contains(feature),
                    });
                require!(
                    check_w,
                    Error::invalid_protocol(
                        "Writer features must be Writer-only or also listed in reader features"
                    )
                );
                Ok(())
            }
            (None, None) => Ok(()),
            (None, Some(writer_features)) => {
                // Legacy reader version + writer features: only columnMapping
                // (which legacy reader version 2 implies) may be ReaderWriter.
                let is_valid = writer_features.iter().all(|feature| {
                    match feature.feature_type() {
                        FeatureType::WriterOnly | FeatureType::Unknown => true,
                        FeatureType::ReaderWriter => {
                            min_reader_version == 2 && feature == &TableFeature::ColumnMapping
                        }
                    }
                });
                require!(
                    is_valid,
                    Error::invalid_protocol(
                        "Writer features must be Writer-only or also listed in reader features"
                    )
                );
                Ok(())
            }
            (Some(_), None) => Err(Error::invalid_protocol(
                "Reader features should be present in writer features",
            )),
        }?;
        Ok(Protocol {
            min_reader_version,
            min_writer_version,
            reader_features,
            writer_features,
        })
    }
    /// Extracts the protocol action (if any) from a batch of engine data by
    /// visiting its rows.
    pub(crate) fn try_new_from_data(data: &dyn EngineData) -> DeltaResult<Option<Protocol>> {
        let mut visitor = ProtocolVisitor::default();
        visitor.visit_rows_of(data)?;
        Ok(visitor.protocol)
    }
    /// Minimum reader version required to read this table.
    #[internal_api]
    pub(crate) fn min_reader_version(&self) -> i32 {
        self.min_reader_version
    }
    /// Minimum writer version required to write this table.
    #[internal_api]
    pub(crate) fn min_writer_version(&self) -> i32 {
        self.min_writer_version
    }
    /// Reader feature list (present only at reader version 3).
    #[internal_api]
    pub(crate) fn reader_features(&self) -> Option<&[TableFeature]> {
        self.reader_features.as_deref()
    }
    /// Writer feature list (present only at writer version 7).
    #[internal_api]
    pub(crate) fn writer_features(&self) -> Option<&[TableFeature]> {
        self.writer_features.as_deref()
    }
    /// True if `feature` is listed among the writer features. Checking the
    /// writer list suffices because every reader feature must also appear
    /// there (enforced by `try_new`).
    pub(crate) fn has_table_feature(&self, feature: &TableFeature) -> bool {
        self.writer_features()
            .is_some_and(|features| features.contains(feature))
    }
    /// Test-only constructor that skips all invariant validation.
    #[cfg(test)]
    pub(crate) fn new_unchecked(
        min_reader_version: i32,
        min_writer_version: i32,
        reader_features: Option<Vec<TableFeature>>,
        writer_features: Option<Vec<TableFeature>>,
    ) -> Self {
        Self {
            min_reader_version,
            min_writer_version,
            reader_features,
            writer_features,
        }
    }
}
/// The `commitInfo` action: free-form provenance information about a commit.
/// All fields are optional per the Delta spec.
#[derive(Debug, Clone, PartialEq, Eq, ToSchema, IntoEngineData)]
#[internal_api]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
pub(crate) struct CommitInfo {
    // Wall-clock commit time in epoch milliseconds.
    pub(crate) timestamp: Option<i64>,
    // Monotonic in-commit timestamp (when the ICT feature is enabled).
    pub(crate) in_commit_timestamp: Option<i64>,
    // Operation name (e.g. "WRITE"); "UNKNOWN" when unspecified.
    pub(crate) operation: Option<String>,
    pub(crate) operation_parameters: Option<HashMap<String, String>>,
    // Kernel version string, e.g. "v0.x.y".
    pub(crate) kernel_version: Option<String>,
    // Some(true) for blind appends; absent otherwise (never Some(false)).
    pub(crate) is_blind_append: Option<bool>,
    // Engine-provided identification string.
    pub(crate) engine_info: Option<String>,
    // Random UUID identifying this transaction attempt.
    pub(crate) txn_id: Option<String>,
}
impl CommitInfo {
pub(crate) fn new(
timestamp: i64,
in_commit_timestamp: Option<i64>,
operation: Option<String>,
engine_info: Option<String>,
is_blind_append: bool,
) -> Self {
Self {
timestamp: Some(timestamp),
in_commit_timestamp,
operation: Some(operation.unwrap_or_else(|| UNKNOWN_OPERATION.to_string())),
operation_parameters: Some(HashMap::new()),
kernel_version: Some(format!("v{KERNEL_VERSION}")),
is_blind_append: is_blind_append.then_some(true),
engine_info,
txn_id: Some(uuid::Uuid::new_v4().to_string()),
}
}
}
/// The `add` action: adds a data file to the table (or re-declares it with
/// new metadata such as a deletion vector).
#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
#[cfg_attr(
    test,
    derive(Serialize, Deserialize, Default),
    serde(rename_all = "camelCase")
)]
#[internal_api]
pub(crate) struct Add {
    // Relative path of the file within the table root (URL-encoded).
    pub(crate) path: String,
    // Partition column name -> serialized value; values may be null.
    #[allow_null_container_values]
    pub(crate) partition_values: HashMap<String, String>,
    // File size in bytes.
    pub(crate) size: i64,
    // File modification time in epoch milliseconds.
    pub(crate) modification_time: i64,
    // False when the file merely reorganizes existing data (e.g. OPTIMIZE).
    pub(crate) data_change: bool,
    // JSON-serialized file statistics, when collected.
    // NOTE(review): the fields below are `pub` while those above are
    // pub(crate) — presumably intentional exposure; verify.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub stats: Option<String>,
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub tags: Option<HashMap<String, Option<String>>>,
    // Deletion vector marking rows of this file as logically deleted.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub deletion_vector: Option<DeletionVectorDescriptor>,
    // Row-tracking fields.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub base_row_id: Option<i64>,
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub default_row_commit_version: Option<i64>,
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub clustering_provider: Option<String>,
}
impl Add {
    /// Unique identifier of this file's deletion vector, if one is attached.
    #[internal_api]
    #[allow(dead_code)]
    pub(crate) fn dv_unique_id(&self) -> Option<String> {
        let dv = self.deletion_vector.as_ref()?;
        Some(dv.unique_id())
    }
}
/// The `remove` action: a tombstone logically deleting a previously added
/// file. Most fields beyond `path`/`data_change` are optional.
#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
#[internal_api]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
pub(crate) struct Remove {
    // Relative path of the removed file (matches the original add).
    pub(crate) path: String,
    // When the file was deleted, in epoch milliseconds.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) deletion_timestamp: Option<i64>,
    // False when the removal is a pure reorganization (no logical change).
    pub(crate) data_change: bool,
    // True when partition_values/size/tags were copied from the add action.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) extended_file_metadata: Option<bool>,
    #[allow_null_container_values]
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) partition_values: Option<HashMap<String, String>>,
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) size: Option<i64>,
    // NOTE(review): `stats` is `pub` while sibling fields are pub(crate) —
    // looks inconsistent; confirm whether the wider visibility is needed.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub stats: Option<String>,
    #[allow_null_container_values]
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) tags: Option<HashMap<String, String>>,
    // Deletion vector that was attached to the removed file, if any.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,
    // Row-tracking fields.
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) base_row_id: Option<i64>,
    #[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
    pub(crate) default_row_commit_version: Option<i64>,
}
/// The `cdc` action: a change-data-capture file holding change records for a
/// commit (written when change data feed is enabled).
#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
#[internal_api]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
pub(crate) struct Cdc {
    // Relative path of the CDC file within the table root.
    pub path: String,
    #[allow_null_container_values]
    pub partition_values: HashMap<String, String>,
    // File size in bytes.
    pub size: i64,
    // Always false for cdc files: they never change the table's data.
    pub data_change: bool,
    #[allow_null_container_values]
    pub tags: Option<HashMap<String, String>>,
}
/// The `txn` action: records the highest version committed by an idempotent
/// writer application, keyed by application id.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema, IntoEngineData)]
#[serde(rename_all = "camelCase")]
#[internal_api]
pub(crate) struct SetTransaction {
    // Unique identifier of the writing application.
    pub(crate) app_id: String,
    // Application-defined, monotonically increasing version.
    pub(crate) version: i64,
    // Last update time in epoch milliseconds; used for txn expiration.
    pub(crate) last_updated: Option<i64>,
}
impl SetTransaction {
pub(crate) fn new(app_id: String, version: i64, last_updated: Option<i64>) -> Self {
Self {
app_id,
version,
last_updated,
}
}
}
/// The `sidecar` action: points a V2 checkpoint manifest at a sidecar file
/// holding file actions, stored under the log's `_sidecars/` directory.
#[derive(ToSchema, Debug, PartialEq)]
#[internal_api]
pub(crate) struct Sidecar {
    // Path of the sidecar file, resolved relative to `_delta_log/_sidecars/`.
    pub path: String,
    pub size_in_bytes: i64,
    // Modification time in epoch milliseconds.
    pub modification_time: i64,
    #[allow_null_container_values]
    pub tags: Option<HashMap<String, String>>,
}
impl Sidecar {
    /// Resolves this sidecar's path against the table's log root, producing a
    /// [`FileMeta`] for reading the sidecar file. Sidecars live in the
    /// `_sidecars/` subdirectory of the log root.
    ///
    /// # Errors
    /// URL-join failures, or a generic error when `size_in_bytes` does not
    /// fit in `usize` (e.g. negative).
    pub(crate) fn to_filemeta(&self, log_root: &Url) -> DeltaResult<FileMeta> {
        Ok(FileMeta {
            location: log_root.join("_sidecars/")?.join(&self.path)?,
            last_modified: self.modification_time,
            // The spec stores size as i64; FileMeta wants usize.
            size: self.size_in_bytes.try_into().map_err(|_| {
                Error::generic(format!(
                    "Failed to convert sidecar size {} to usize",
                    self.size_in_bytes
                ))
            })?,
        })
    }
}
/// The `checkpointMetadata` action: appears only in V2 checkpoint files and
/// records the table version the checkpoint represents.
#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
#[internal_api]
pub(crate) struct CheckpointMetadata {
    // Table version captured by this checkpoint.
    pub(crate) version: i64,
    #[allow_null_container_values]
    pub(crate) tags: Option<HashMap<String, String>>,
}
/// The `domainMetadata` action: an arbitrary configuration string scoped to a
/// named domain; `removed` marks a tombstone.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema, IntoEngineData)]
#[internal_api]
pub(crate) struct DomainMetadata {
    // Domain name; `delta.`-prefixed domains are reserved (see is_internal).
    domain: String,
    // Domain-specific configuration payload (opaque string).
    configuration: String,
    // True when this action removes the domain's metadata.
    removed: bool,
}
impl DomainMetadata {
pub(crate) fn new(domain: String, configuration: String) -> Self {
Self {
domain,
configuration,
removed: false,
}
}
pub(crate) fn remove(domain: String, configuration: String) -> Self {
Self {
domain,
configuration,
removed: true,
}
}
#[allow(unused)]
#[internal_api]
pub(crate) fn is_internal(&self) -> bool {
self.domain.starts_with(INTERNAL_DOMAIN_PREFIX)
}
#[internal_api]
pub(crate) fn domain(&self) -> &str {
&self.domain
}
#[internal_api]
pub(crate) fn configuration(&self) -> &str {
&self.configuration
}
pub(crate) fn is_removed(&self) -> bool {
self.removed
}
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use serde_json::json;
use super::set_transaction::is_set_txn_expired;
use super::*;
use crate::arrow::array::{
Array, BooleanArray, Int32Array, Int64Array, ListArray, ListBuilder, MapBuilder,
MapFieldNames, RecordBatch, StringArray, StringBuilder, StructArray,
};
use crate::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
use crate::arrow::json::ReaderBuilder;
use crate::engine::arrow_data::EngineDataArrowExt as _;
use crate::engine::arrow_expression::ArrowEvaluationHandler;
use crate::schema::{ArrayType, DataType, MapType, StructField};
use crate::utils::test_utils::assert_result_error_with_message;
use crate::{
Engine, EvaluationHandler, IntoEngineData, JsonHandler, ParquetHandler, StorageHandler,
};
    // Minimal test Engine that only supplies an evaluation handler; the other
    // handlers are unimplemented because these tests never touch I/O.
    struct ExprEngine(Arc<dyn EvaluationHandler>);
    impl ExprEngine {
        fn new() -> Self {
            ExprEngine(Arc::new(ArrowEvaluationHandler))
        }
    }
    impl Engine for ExprEngine {
        fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler> {
            self.0.clone()
        }
        fn json_handler(&self) -> Arc<dyn JsonHandler> {
            unimplemented!()
        }
        fn parquet_handler(&self) -> Arc<dyn ParquetHandler> {
            unimplemented!()
        }
        fn storage_handler(&self) -> Arc<dyn StorageHandler> {
            unimplemented!()
        }
    }
    // Builds an Arrow string->string MapBuilder using Delta's "key_value"
    // entry naming; `nullable_values` controls the value field's nullability.
    fn create_string_map_builder(
        nullable_values: bool,
    ) -> MapBuilder<StringBuilder, StringBuilder> {
        MapBuilder::new(
            Some(MapFieldNames {
                entry: "key_value".to_string(),
                key: "key".to_string(),
                value: "value".to_string(),
            }),
            StringBuilder::new(),
            StringBuilder::new(),
        )
        .with_values_field(Field::new(
            "value".to_string(),
            ArrowDataType::Utf8,
            nullable_values,
        ))
    }
    // A txn is expired when its last_updated is at or before the expiration
    // timestamp; missing expiration or missing last_updated never expires.
    #[rstest]
    #[case::no_expiration_configured(None, Some(1000), false)]
    #[case::null_last_updated_never_expires(Some(5000), None, false)]
    #[case::both_none(None, None, false)]
    #[case::last_updated_before_expiration(Some(2000), Some(1000), true)]
    #[case::last_updated_at_expiration(Some(1000), Some(1000), true)]
    #[case::last_updated_after_expiration(Some(2000), Some(3000), false)]
    fn test_is_set_txn_expired(
        #[case] expiration_timestamp: Option<i64>,
        #[case] last_updated: Option<i64>,
        #[case] expected: bool,
    ) {
        assert_eq!(
            is_set_txn_expired(expiration_timestamp, last_updated),
            expected
        );
    }
#[test]
fn test_metadata_schema() {
let schema = get_commit_schema()
.project(&[METADATA_NAME])
.expect("Couldn't get metaData field");
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"metaData",
StructType::new_unchecked([
StructField::not_null("id", DataType::STRING),
StructField::nullable("name", DataType::STRING),
StructField::nullable("description", DataType::STRING),
StructField::not_null(
"format",
StructType::new_unchecked([
StructField::not_null("provider", DataType::STRING),
StructField::not_null(
"options",
MapType::new(DataType::STRING, DataType::STRING, false),
),
]),
),
StructField::not_null("schemaString", DataType::STRING),
StructField::not_null("partitionColumns", ArrayType::new(DataType::STRING, false)),
StructField::nullable("createdTime", DataType::LONG),
StructField::not_null(
"configuration",
MapType::new(DataType::STRING, DataType::STRING, false),
),
]),
)]));
assert_eq!(schema, expected);
}
#[test]
fn test_add_schema() {
let schema = get_commit_schema()
.project(&[ADD_NAME])
.expect("Couldn't get add field");
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"add",
StructType::new_unchecked([
StructField::not_null("path", DataType::STRING),
StructField::not_null(
"partitionValues",
MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::not_null("size", DataType::LONG),
StructField::not_null("modificationTime", DataType::LONG),
StructField::not_null("dataChange", DataType::BOOLEAN),
StructField::nullable("stats", DataType::STRING),
StructField::nullable(
"tags",
MapType::new(DataType::STRING, DataType::STRING, true),
),
deletion_vector_field(),
StructField::nullable("baseRowId", DataType::LONG),
StructField::nullable("defaultRowCommitVersion", DataType::LONG),
StructField::nullable("clusteringProvider", DataType::STRING),
]),
)]));
assert_eq!(schema, expected);
}
    // Expected-schema helpers shared by the action-schema tests below.
    fn tags_field() -> StructField {
        StructField::nullable(
            "tags",
            MapType::new(DataType::STRING, DataType::STRING, true),
        )
    }
    fn partition_values_field() -> StructField {
        StructField::nullable(
            "partitionValues",
            MapType::new(DataType::STRING, DataType::STRING, true),
        )
    }
    fn deletion_vector_field() -> StructField {
        StructField::nullable(
            "deletionVector",
            DataType::struct_type_unchecked([
                StructField::not_null("storageType", DataType::STRING),
                StructField::not_null("pathOrInlineDv", DataType::STRING),
                StructField::nullable("offset", DataType::INTEGER),
                StructField::not_null("sizeInBytes", DataType::INTEGER),
                StructField::not_null("cardinality", DataType::LONG),
            ]),
        )
    }
#[test]
fn test_remove_schema() {
let schema = get_commit_schema()
.project(&[REMOVE_NAME])
.expect("Couldn't get remove field");
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"remove",
StructType::new_unchecked([
StructField::not_null("path", DataType::STRING),
StructField::nullable("deletionTimestamp", DataType::LONG),
StructField::not_null("dataChange", DataType::BOOLEAN),
StructField::nullable("extendedFileMetadata", DataType::BOOLEAN),
partition_values_field(),
StructField::nullable("size", DataType::LONG),
StructField::nullable("stats", DataType::STRING),
tags_field(),
deletion_vector_field(),
StructField::nullable("baseRowId", DataType::LONG),
StructField::nullable("defaultRowCommitVersion", DataType::LONG),
]),
)]));
assert_eq!(schema, expected);
}
#[test]
fn test_cdc_schema() {
let schema = get_commit_schema()
.project(&[CDC_NAME])
.expect("Couldn't get cdc field");
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"cdc",
StructType::new_unchecked([
StructField::not_null("path", DataType::STRING),
StructField::not_null(
"partitionValues",
MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::not_null("size", DataType::LONG),
StructField::not_null("dataChange", DataType::BOOLEAN),
tags_field(),
]),
)]));
assert_eq!(schema, expected);
}
#[test]
fn test_sidecar_schema() {
let schema = Sidecar::to_schema();
let expected = StructType::new_unchecked([
StructField::not_null("path", DataType::STRING),
StructField::not_null("sizeInBytes", DataType::LONG),
StructField::not_null("modificationTime", DataType::LONG),
tags_field(),
]);
assert_eq!(schema, expected);
}
#[test]
fn test_checkpoint_metadata_schema() {
let schema = get_all_actions_schema()
.project(&[CHECKPOINT_METADATA_NAME])
.expect("Couldn't get checkpointMetadata field");
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"checkpointMetadata",
StructType::new_unchecked([
StructField::not_null("version", DataType::LONG),
tags_field(),
]),
)]));
assert_eq!(schema, expected);
}
#[test]
fn test_transaction_schema() {
let schema = get_commit_schema()
.project(&["txn"])
.expect("Couldn't get transaction field");
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"txn",
StructType::new_unchecked([
StructField::not_null("appId", DataType::STRING),
StructField::not_null("version", DataType::LONG),
StructField::nullable("lastUpdated", DataType::LONG),
]),
)]));
assert_eq!(schema, expected);
}
#[test]
fn test_commit_info_schema() {
let schema = get_commit_schema()
.project(&["commitInfo"])
.expect("Couldn't get commitInfo field");
let expected = Arc::new(StructType::new_unchecked(vec![StructField::nullable(
"commitInfo",
StructType::new_unchecked(vec![
StructField::nullable("timestamp", DataType::LONG),
StructField::nullable("inCommitTimestamp", DataType::LONG),
StructField::nullable("operation", DataType::STRING),
StructField::nullable(
"operationParameters",
MapType::new(DataType::STRING, DataType::STRING, false),
),
StructField::nullable("kernelVersion", DataType::STRING),
StructField::nullable("isBlindAppend", DataType::BOOLEAN),
StructField::nullable("engineInfo", DataType::STRING),
StructField::nullable("txnId", DataType::STRING),
]),
)]));
assert_eq!(schema, expected);
}
#[test]
fn test_domain_metadata_schema() {
let schema = get_commit_schema()
.project(&[DOMAIN_METADATA_NAME])
.expect("Couldn't get domainMetadata field");
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
"domainMetadata",
StructType::new_unchecked([
StructField::not_null("domain", DataType::STRING),
StructField::not_null("configuration", DataType::STRING),
StructField::not_null("removed", DataType::BOOLEAN),
]),
)]));
assert_eq!(schema, expected);
}
#[test]
fn test_validate_protocol() {
let invalid_protocols = [
Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: None,
writer_features: Some(vec![]),
},
Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: Some(vec![]),
writer_features: None,
},
Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: None,
writer_features: None,
},
];
for Protocol {
min_reader_version,
min_writer_version,
reader_features,
writer_features,
} in invalid_protocols
{
assert!(matches!(
Protocol::try_new(
min_reader_version,
min_writer_version,
reader_features,
writer_features
),
Err(Error::InvalidProtocol(_)),
));
}
}
    // Versions below MIN_VALID_RW_VERSION (1) are rejected with a message
    // naming whichever version is invalid (reader checked first).
    #[rstest]
    #[case(0, 1)]
    #[case(1, 0)]
    #[case(-1, 2)]
    #[case(1, -1)]
    fn reject_protocol_version_below_minimum(#[case] rv: i32, #[case] wv: i32) {
        let expected = if rv < 1 {
            format!("Invalid protocol action in the delta log: min_reader_version must be >= 1, got {rv}")
        } else {
            format!("Invalid protocol action in the delta log: min_writer_version must be >= 1, got {wv}")
        };
        assert_result_error_with_message(
            Protocol::try_new(rv, wv, TableFeature::NO_LIST, TableFeature::NO_LIST),
            &expected,
        );
    }
#[test]
fn accept_min_versions() {
let p = Protocol::try_new_legacy(1, 1).unwrap();
assert_eq!(p.min_reader_version(), 1);
assert_eq!(p.min_writer_version(), 1);
}
#[test]
fn test_validate_table_features_invalid() {
let invalid_features = [
(
vec![TableFeature::DeletionVectors],
vec![TableFeature::AppendOnly],
"Reader features must contain only ReaderWriter features that are also listed in writer features",
),
(
vec![TableFeature::DeletionVectors],
vec![],
"Reader features must contain only ReaderWriter features that are also listed in writer features",
),
(
vec![],
vec![TableFeature::DeletionVectors],
"Writer features must be Writer-only or also listed in reader features",
),
(
vec![TableFeature::VariantType],
vec![
TableFeature::VariantType,
TableFeature::DeletionVectors,
],
"Writer features must be Writer-only or also listed in reader features",
),
(
vec![TableFeature::AppendOnly],
vec![TableFeature::AppendOnly],
"Reader features must contain only ReaderWriter features that are also listed in writer features",
),
];
for (reader_features, writer_features, error_msg) in invalid_features {
let res = Protocol::try_new_modern(reader_features, writer_features);
assert!(
matches!(
&res,
Err(Error::InvalidProtocol(error)) if error.to_string().eq(error_msg)
),
"Expected:\t{error_msg}\nBut got:{res:?}\n"
);
}
}
#[test]
fn test_validate_table_features_unknown() {
let protocol = Protocol::try_new_modern(
vec![TableFeature::Unknown("unknown_reader".to_string())],
vec![TableFeature::Unknown("unknown_reader".to_string())],
);
assert!(protocol.is_ok());
let protocol = Protocol::try_new_modern(
TableFeature::EMPTY_LIST,
vec![TableFeature::Unknown("unknown_writer".to_string())],
);
assert!(protocol.is_ok());
}
#[test]
fn test_validate_table_features_valid() {
let valid_features = [
(
vec![TableFeature::DeletionVectors],
vec![TableFeature::DeletionVectors],
),
(vec![], vec![TableFeature::AppendOnly]),
(
vec![TableFeature::VariantType],
vec![TableFeature::VariantType, TableFeature::AppendOnly],
),
(
vec![TableFeature::Unknown("rw".to_string())],
vec![
TableFeature::Unknown("rw".to_string()),
TableFeature::Unknown("w".to_string()),
],
),
(vec![], vec![]),
];
for (reader_features, writer_features) in valid_features {
assert!(Protocol::try_new_modern(reader_features, writer_features).is_ok());
}
}
#[test]
fn test_validate_legacy_column_mapping_valid() {
let protocol = Protocol::try_new(
2,
7,
TableFeature::NO_LIST,
Some(vec![TableFeature::ColumnMapping]),
);
assert!(protocol.is_ok());
}
#[test]
fn test_validate_legacy_writer_only_features_valid() {
let protocol = Protocol::try_new(
1,
7,
TableFeature::NO_LIST,
Some(vec![TableFeature::AppendOnly]),
);
assert!(protocol.is_ok());
}
#[test]
fn test_validate_legacy_column_mapping_with_writer_features_valid() {
    // Mixing a writer-only feature (AppendOnly) with ColumnMapping on a
    // reader-2/writer-7 protocol must be accepted.
    let result = Protocol::try_new(
        2,
        7,
        TableFeature::NO_LIST,
        Some(vec![TableFeature::AppendOnly, TableFeature::ColumnMapping]),
    );
    assert!(result.is_ok());
}
#[test]
fn test_validate_column_mapping_reader_v1_invalid() {
    // ColumnMapping with reader version 1 must be rejected (contrast with the
    // reader-version-2 case above, which succeeds).
    let result = Protocol::try_new(
        1,
        7,
        TableFeature::NO_LIST,
        Some(vec![TableFeature::ColumnMapping]),
    );
    assert!(result.is_err());
}
#[test]
fn test_validate_multiple_readerwriter_features_reader_v2_invalid() {
    // Listing two ReaderWriter features (ColumnMapping + DeletionVectors) on a
    // reader-version-2 protocol must be rejected.
    let result = Protocol::try_new(
        2,
        7,
        TableFeature::NO_LIST,
        Some(vec![
            TableFeature::ColumnMapping,
            TableFeature::DeletionVectors,
        ]),
    );
    assert!(result.is_err());
}
#[test]
fn test_parse_table_feature_never_fails() {
    // Arbitrary strings — including empty and non-ASCII/emoji input — must
    // parse into Unknown features rather than erroring.
    let raw = Some(["", "absurD_)(+13%^⚙️"]);
    let parsed = parse_features(raw);
    let expected = Some(FromIterator::from_iter([
        TableFeature::unknown(""),
        TableFeature::unknown("absurD_)(+13%^⚙️"),
    ]));
    assert_eq!(parsed, expected);
}
#[test]
fn test_into_engine_data() {
    let engine = ExprEngine::new();
    let txn = SetTransaction {
        app_id: "app_id".to_string(),
        version: 0,
        last_updated: None,
    };
    // Serialize the action through the engine and materialize a record batch.
    let actual = txn
        .into_engine_data(SetTransaction::to_schema().into(), &engine)
        .try_into_record_batch()
        .unwrap();
    // Expected arrow layout: appId (non-null), version (non-null),
    // lastUpdated (nullable, here null).
    let expected_schema = Arc::new(Schema::new(vec![
        Field::new("appId", ArrowDataType::Utf8, false),
        Field::new("version", ArrowDataType::Int64, false),
        Field::new("lastUpdated", ArrowDataType::Int64, true),
    ]));
    let expected = RecordBatch::try_new(
        expected_schema,
        vec![
            Arc::new(StringArray::from(vec!["app_id"])),
            Arc::new(Int64Array::from(vec![0_i64])),
            Arc::new(Int64Array::from(vec![None::<i64>])),
        ],
    )
    .unwrap();
    assert_eq!(actual, expected);
}
#[test]
fn test_commit_info_into_engine_data() {
    let engine = ExprEngine::new();
    // Build a CommitInfo with minimal arguments; capture its txn_id before the
    // value is consumed by into_engine_data below.
    let commit_info = CommitInfo::new(0, None, None, None, false);
    let commit_info_txn_id = commit_info.txn_id.clone();
    let engine_data = commit_info.into_engine_data(CommitInfo::to_schema().into(), &engine);
    let record_batch = engine_data.try_into_record_batch().unwrap();
    // Expected operationParameters column: one non-null but empty map entry.
    let mut map_builder = create_string_map_builder(false);
    map_builder.append(true).unwrap();
    let operation_parameters = Arc::new(map_builder.finish());
    // Reuse the actual batch's schema so only the column values are compared.
    // Columns (in order): timestamp 0, a null i64, "UNKNOWN" operation, the
    // empty parameter map, a "v<kernel version>" string, a null bool, a null
    // string, and the captured txn_id.
    let expected = RecordBatch::try_new(
        record_batch.schema(),
        vec![
            Arc::new(Int64Array::from(vec![Some(0)])),
            Arc::new(Int64Array::from(vec![None::<i64>])),
            Arc::new(StringArray::from(vec![Some("UNKNOWN")])),
            operation_parameters,
            Arc::new(StringArray::from(vec![Some(format!("v{KERNEL_VERSION}"))])),
            Arc::new(BooleanArray::from(vec![None::<bool>])),
            Arc::new(StringArray::from(vec![None::<String>])),
            Arc::new(StringArray::from(vec![commit_info_txn_id])),
        ],
    )
    .unwrap();
    assert_eq!(record_batch, expected);
}
#[test]
fn test_domain_metadata_into_engine_data() {
    let engine = ExprEngine::new();
    let action = DomainMetadata {
        domain: "my.domain".to_string(),
        configuration: "config_value".to_string(),
        removed: false,
    };
    // Serialize and materialize; compare against a batch built from the actual
    // batch's own schema so only values are under test.
    let actual = action
        .into_engine_data(DomainMetadata::to_schema().into(), &engine)
        .try_into_record_batch()
        .unwrap();
    let expected = RecordBatch::try_new(
        actual.schema(),
        vec![
            Arc::new(StringArray::from(vec!["my.domain"])),
            Arc::new(StringArray::from(vec!["config_value"])),
            Arc::new(BooleanArray::from(vec![false])),
        ],
    )
    .unwrap();
    assert_eq!(actual, expected);
}
#[test]
fn test_metadata_try_new() {
    let table_schema = Arc::new(StructType::new_unchecked([StructField::not_null(
        "id",
        DataType::INTEGER,
    )]));
    let table_config = HashMap::from([("key1".to_string(), "value1".to_string())]);
    let built = Metadata::try_new(
        Some("test_table".to_string()),
        Some("description".to_string()),
        table_schema.clone(),
        vec!["year".to_string()],
        1234567890,
        table_config.clone(),
    )
    .unwrap();
    // A fresh id is generated; the other fields round-trip from the arguments,
    // with the schema serialized to its JSON string form.
    assert!(!built.id.is_empty());
    assert_eq!(built.name, Some("test_table".to_string()));
    assert_eq!(
        built.schema_string,
        serde_json::to_string(&table_schema).unwrap()
    );
    assert_eq!(built.created_time, Some(1234567890));
    assert_eq!(built.configuration, table_config);
}
#[test]
fn test_metadata_try_new_default() {
    let table_schema = Arc::new(StructType::new_unchecked([StructField::not_null(
        "id",
        DataType::INTEGER,
    )]));
    // With name/description omitted, both stay None but an id is still generated.
    let built = Metadata::try_new(None, None, table_schema, vec![], 0, HashMap::new()).unwrap();
    assert!(!built.id.is_empty());
    assert_eq!(built.name, None);
    assert_eq!(built.description, None);
}
#[test]
fn test_metadata_unique_ids() {
    let table_schema = Arc::new(StructType::new_unchecked([StructField::not_null(
        "id",
        DataType::INTEGER,
    )]));
    // Two Metadata values built from identical inputs must get distinct ids.
    let first =
        Metadata::try_new(None, None, table_schema.clone(), vec![], 0, HashMap::new()).unwrap();
    let second = Metadata::try_new(None, None, table_schema, vec![], 0, HashMap::new()).unwrap();
    assert_ne!(first.id, second.id);
}
#[test]
fn test_format_try_from_scalar() {
    let options = HashMap::from([
        ("path".to_string(), "/delta/table".to_string()),
        ("compressionType".to_string(), "snappy".to_string()),
    ]);
    let format = Format {
        provider: "parquet".to_string(),
        options,
    };
    // Converting a Format into a Scalar must yield a struct whose fields are
    // "provider" (a string) followed by "options" (a map with both entries).
    let Scalar::Struct(struct_data) = Scalar::try_from(format).unwrap() else {
        panic!("Expected struct scalar");
    };
    assert_eq!(struct_data.fields()[0].name(), "provider");
    assert_eq!(struct_data.fields()[1].name(), "options");
    match &struct_data.values()[0] {
        Scalar::String(provider) => assert_eq!(provider, "parquet"),
        _ => panic!("Expected string provider"),
    }
    match &struct_data.values()[1] {
        Scalar::Map(map_data) => assert_eq!(map_data.pairs().len(), 2),
        _ => panic!("Expected map options"),
    }
}
#[test]
fn test_format_default() {
    // The default Format is the parquet provider with no options.
    let expected = Format {
        provider: "parquet".to_string(),
        options: HashMap::new(),
    };
    assert_eq!(Format::default(), expected);
}
#[test]
fn test_format_empty_options() {
    // A Format with no options still converts to a Scalar; the resulting
    // options map is simply empty.
    let format = Format {
        provider: "parquet".to_string(),
        options: HashMap::new(),
    };
    let Scalar::Struct(struct_data) = Scalar::try_from(format).unwrap() else {
        panic!("Expected struct");
    };
    let Scalar::Map(map_data) = &struct_data.values()[1] else {
        panic!("Expected map");
    };
    assert!(map_data.pairs().is_empty());
}
#[test]
fn test_format_special_characters() {
    // Option values with spaces, non-ASCII text, and empty strings must all
    // survive conversion into the options map.
    let options = HashMap::from([
        ("path".to_string(), "/path/with spaces".to_string()),
        ("unicode".to_string(), "测试🎉".to_string()),
        ("empty".to_string(), "".to_string()),
    ]);
    let format = Format {
        provider: "custom".to_string(),
        options,
    };
    let Scalar::Struct(struct_data) = Scalar::try_from(format).unwrap() else {
        panic!("Expected struct");
    };
    let Scalar::Map(map_data) = &struct_data.values()[1] else {
        panic!("Expected map");
    };
    assert_eq!(map_data.pairs().len(), 3);
}
#[test]
fn test_metadata_into_engine_data() {
    let engine = ExprEngine::new();
    let schema = Arc::new(StructType::new_unchecked([StructField::not_null(
        "id",
        DataType::INTEGER,
    )]));
    let test_metadata = Metadata::try_new(
        Some("test".to_string()),
        Some("my table".to_string()),
        schema.clone(),
        vec!["part".to_string()],
        123,
        HashMap::from([("k".to_string(), "v".to_string())]),
    )
    .unwrap();
    // The id is generated inside try_new; capture it before the value is moved.
    let test_id = test_metadata.id.clone();
    let actual = test_metadata
        .into_engine_data(Metadata::to_schema().into(), &engine)
        .unwrap()
        .try_into_record_batch()
        .unwrap();
    // Build the expected batch by parsing an equivalent JSON record against the
    // actual batch's schema, so the comparison exercises values, not a
    // hand-constructed arrow schema. Note schemaString is the serialized form
    // of `schema` and configuration/partitionColumns round-trip the inputs.
    let expected_json = json!({
        "id": test_id,
        "name": "test",
        "description": "my table",
        "format": {
            "provider": "parquet",
            "options": {}
        },
        "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}",
        "partitionColumns": ["part"],
        "createdTime": 123,
        "configuration": {
            "k": "v"
        }
    }).to_string();
    let expected = ReaderBuilder::new(actual.schema())
        .build(expected_json.as_bytes())
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(actual, expected);
}
#[test]
fn test_metadata_with_log_schema() {
    let engine = ExprEngine::new();
    let schema = Arc::new(StructType::new_unchecked([StructField::not_null(
        "id",
        DataType::INTEGER,
    )]));
    let metadata = Metadata::try_new(
        Some("table".to_string()),
        None,
        schema,
        vec![],
        456,
        HashMap::new(),
    )
    .unwrap();
    // Capture the generated id before `metadata` is consumed below.
    let metadata_id = metadata.id.clone();
    // Serialize against the commit schema projected onto just metaData, so the
    // action appears nested under a top-level "metaData" struct column.
    let commit_schema = get_commit_schema().project(&[METADATA_NAME]).unwrap();
    let actual = metadata
        .into_engine_data(commit_schema, &engine)
        .unwrap()
        .try_into_record_batch()
        .unwrap();
    // The omitted description is simply absent from the expected JSON, which is
    // parsed against the actual batch's schema to build the expected batch.
    let expected_json = json!({
        "metaData": {
            "id": metadata_id,
            "name": "table",
            "format": {
                "provider": "parquet",
                "options": {}
            },
            "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}",
            "partitionColumns": [],
            "createdTime": 456,
            "configuration": {}
        }
    }).to_string();
    let expected = ReaderBuilder::new(actual.schema())
        .build(expected_json.as_bytes())
        .unwrap()
        .next()
        .unwrap()
        .unwrap();
    assert_eq!(actual, expected);
}
#[test]
fn test_protocol_into_engine_data() {
    let engine = ExprEngine::new();
    let protocol = Protocol::try_new_modern(
        [TableFeature::DeletionVectors, TableFeature::ColumnMapping],
        [TableFeature::DeletionVectors, TableFeature::ColumnMapping],
    )
    .unwrap();
    // Part 1: serialize with the bare Protocol schema (clone so `protocol` can
    // be reused for the nested-schema case below).
    let engine_data = protocol
        .clone()
        .into_engine_data(Protocol::to_schema().into(), &engine);
    let record_batch = engine_data.try_into_record_batch().unwrap();
    // Arrow list element field shared by both feature columns.
    let list_field = Arc::new(Field::new("element", ArrowDataType::Utf8, false));
    let protocol_fields = vec![
        Field::new("minReaderVersion", ArrowDataType::Int32, false),
        Field::new("minWriterVersion", ArrowDataType::Int32, false),
        Field::new(
            "readerFeatures",
            ArrowDataType::List(list_field.clone()),
            true,
        ),
        Field::new(
            "writerFeatures",
            ArrowDataType::List(list_field.clone()),
            true,
        ),
    ];
    let schema = Arc::new(Schema::new(protocol_fields.clone()));
    // Expected readerFeatures: one list ["deletionVectors", "columnMapping"].
    let string_builder = StringBuilder::new();
    let mut list_builder = ListBuilder::new(string_builder).with_field(list_field.clone());
    list_builder.values().append_value("deletionVectors");
    list_builder.values().append_value("columnMapping");
    list_builder.append(true);
    let reader_features_array = list_builder.finish();
    // Expected writerFeatures: identical content, built as a separate array.
    let string_builder = StringBuilder::new();
    let mut list_builder = ListBuilder::new(string_builder).with_field(list_field.clone());
    list_builder.values().append_value("deletionVectors");
    list_builder.values().append_value("columnMapping");
    list_builder.append(true);
    let writer_features_array = list_builder.finish();
    // The asserted versions for a feature-bearing modern protocol are 3 / 7.
    let expected = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![3])),
            Arc::new(Int32Array::from(vec![7])),
            Arc::new(reader_features_array.clone()),
            Arc::new(writer_features_array.clone()),
        ],
    )
    .unwrap();
    assert_eq!(record_batch, expected);
    // Part 2: serialize with the commit schema projected onto "protocol"; the
    // same four fields should appear nested under one top-level struct column.
    let commit_schema = get_commit_schema().project(&[PROTOCOL_NAME]).unwrap();
    let engine_data = protocol.into_engine_data(commit_schema, &engine);
    let schema = Arc::new(Schema::new(vec![Field::new(
        "protocol",
        ArrowDataType::Struct(protocol_fields.into()),
        true,
    )]));
    let expected = RecordBatch::try_new(
        schema,
        vec![Arc::new(StructArray::from(vec![
            (
                Arc::new(Field::new("minReaderVersion", ArrowDataType::Int32, false)),
                Arc::new(Int32Array::from(vec![3])) as Arc<dyn Array>,
            ),
            (
                Arc::new(Field::new("minWriterVersion", ArrowDataType::Int32, false)),
                Arc::new(Int32Array::from(vec![7])) as Arc<dyn Array>,
            ),
            (
                Arc::new(Field::new(
                    "readerFeatures",
                    ArrowDataType::List(list_field.clone()),
                    true,
                )),
                Arc::new(reader_features_array) as Arc<dyn Array>,
            ),
            (
                Arc::new(Field::new(
                    "writerFeatures",
                    ArrowDataType::List(list_field),
                    true,
                )),
                Arc::new(writer_features_array) as Arc<dyn Array>,
            ),
        ]))],
    )
    .unwrap();
    let record_batch = engine_data.try_into_record_batch().unwrap();
    assert_eq!(record_batch, expected);
}
#[test]
fn test_protocol_into_engine_data_empty_features() {
    let engine = ExprEngine::new();
    // A modern protocol with explicitly empty feature lists (as opposed to
    // absent lists — see the legacy test below).
    let protocol =
        Protocol::try_new_modern(TableFeature::EMPTY_LIST, TableFeature::EMPTY_LIST).unwrap();
    let batch = protocol
        .into_engine_data(Protocol::to_schema().into(), &engine)
        .unwrap()
        .try_into_record_batch()
        .unwrap();
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 4);
    // Columns 2 and 3 (reader/writer features) must each hold a non-null,
    // zero-length list rather than a null.
    for feature_column in [2, 3] {
        let features = batch
            .column(feature_column)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(features.len(), 1);
        assert_eq!(features.value(0).len(), 0);
    }
}
#[test]
fn test_protocol_into_engine_data_no_features() {
    let engine = ExprEngine::new();
    // A legacy (1, 2) protocol carries no feature lists at all.
    let protocol = Protocol::try_new_legacy(1, 2).unwrap();
    let batch = protocol
        .into_engine_data(Protocol::to_schema().into(), &engine)
        .unwrap()
        .try_into_record_batch()
        .unwrap();
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 4);
    // Unlike the empty-list case, both feature columns must be null here.
    assert!(batch.column(2).is_null(0));
    assert!(batch.column(3).is_null(0));
}
#[test]
fn test_schema_contains_file_actions_with_add() {
    // "add" counts as a file action, both alongside other columns and alone.
    let schema = get_commit_schema()
        .project(&[ADD_NAME, PROTOCOL_NAME])
        .unwrap();
    assert!(schema_contains_file_actions(&schema));
    let add_only = schema.project(&[ADD_NAME]).unwrap();
    assert!(schema_contains_file_actions(&add_only));
}
#[test]
fn test_schema_contains_file_actions_with_remove() {
    // "remove" counts as a file action, both alongside other columns and alone.
    let schema = get_commit_schema()
        .project(&[REMOVE_NAME, METADATA_NAME])
        .unwrap();
    assert!(schema_contains_file_actions(&schema));
    let remove_only = schema.project(&[REMOVE_NAME]).unwrap();
    assert!(schema_contains_file_actions(&remove_only));
}
#[test]
fn test_schema_contains_file_actions_with_both() {
    // Having both add and remove still counts as containing file actions.
    let projected = get_commit_schema()
        .project(&[ADD_NAME, REMOVE_NAME])
        .unwrap();
    assert!(schema_contains_file_actions(&projected));
}
#[test]
fn test_schema_contains_file_actions_with_neither() {
    // Only protocol/metaData projected: no file actions present.
    let projected = get_commit_schema()
        .project(&[PROTOCOL_NAME, METADATA_NAME])
        .unwrap();
    assert!(!schema_contains_file_actions(&projected));
}
#[test]
fn test_schema_contains_file_actions_empty_schema() {
    // An empty struct schema trivially contains no file actions.
    let empty = Arc::new(StructType::new_unchecked([]));
    assert!(!schema_contains_file_actions(&empty));
}
#[test]
fn test_add_tags_deserialization_null_case() {
    // An explicit `"tags": null` in the JSON deserializes to None.
    let raw = r#"{"path":"file1.parquet","partitionValues":{},"size":100,"modificationTime":1234567890,"dataChange":true,"tags":null}"#;
    let add: Add = serde_json::from_str(raw).unwrap();
    assert_eq!(add.tags, None);
}
#[test]
fn test_add_tags_deserialization_nullable_values_case() {
    // A tags map may contain explicit null values; such entries are kept in
    // the map but deserialize to None.
    let raw = r#"{"path":"file2.parquet","partitionValues":{},"size":200,"modificationTime":1234567890,"dataChange":true,"tags":{"INSERTION_TIME":"1677811178336000","NULLABLE_TAG":null}}"#;
    let add: Add = serde_json::from_str(raw).unwrap();
    let tags = add.tags.expect("tags should be present");
    assert_eq!(tags.len(), 2);
    assert_eq!(
        tags.get("INSERTION_TIME"),
        Some(&Some("1677811178336000".to_string()))
    );
    assert_eq!(tags.get("NULLABLE_TAG"), Some(&None));
}
#[test]
fn test_add_tags_deserialization_non_null_values_case() {
    // When every tag value is present, each entry deserializes to Some(value).
    let raw = r#"{"path":"file3.parquet","partitionValues":{},"size":300,"modificationTime":1234567890,"dataChange":true,"tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000"}}"#;
    let add: Add = serde_json::from_str(raw).unwrap();
    let tags = add.tags.expect("tags should be present");
    assert_eq!(tags.len(), 2);
    for key in ["INSERTION_TIME", "MIN_INSERTION_TIME"] {
        assert_eq!(tags.get(key), Some(&Some("1677811178336000".to_string())));
    }
}
}