use std::collections::{HashMap, HashSet};
use std::fmt::{self, Display};
use std::str::FromStr;
use delta_kernel::schema::{DataType, StructField};
use delta_kernel::table_features::TableFeature;
use serde::{Deserialize, Serialize};
use crate::TableProperty;
use crate::kernel::{DeltaResult, error::Error};
use crate::kernel::{StructType, StructTypeExt, Version};
pub use delta_kernel::actions::{Metadata, Protocol};
pub fn new_metadata(
schema: &StructType,
partition_columns: impl IntoIterator<Item = impl ToString>,
configuration: impl IntoIterator<Item = (impl ToString, impl ToString)>,
) -> DeltaResult<Metadata> {
let value = serde_json::json!({
"id": uuid::Uuid::new_v4().to_string(),
"name": None::<String>,
"description": None::<String>,
"format": { "provider": "parquet", "options": {} },
"schemaString": serde_json::to_string(schema)?,
"partitionColumns": partition_columns.into_iter().map(|c| c.to_string()).collect::<Vec<_>>(),
"configuration": configuration.into_iter().map(|(k, v)| (k.to_string(), v.to_string())).collect::<HashMap<_, _>>(),
"createdTime": chrono::Utc::now().timestamp_millis(),
});
Ok(serde_json::from_value(value)?)
}
/// Consuming builder helpers over the kernel [`Metadata`] action.
///
/// Each method consumes the metadata and returns a rebuilt copy with one
/// value replaced; all other fields are carried over unchanged.
pub trait MetadataExt {
    /// Replaces the table id (`id` field).
    fn with_table_id(self, table_id: String) -> DeltaResult<Metadata>;
    /// Replaces the table name.
    fn with_name(self, name: String) -> DeltaResult<Metadata>;
    /// Replaces the table description.
    fn with_description(self, description: String) -> DeltaResult<Metadata>;
    /// Replaces the table schema (re-serialized into `schemaString`).
    fn with_schema(self, schema: &StructType) -> DeltaResult<Metadata>;
    /// Inserts (or overwrites) a single configuration entry.
    fn add_config_key(self, key: String, value: String) -> DeltaResult<Metadata>;
    /// Removes a single configuration entry; a missing key is a no-op.
    fn remove_config_key(self, key: &str) -> DeltaResult<Metadata>;
}
impl MetadataExt for Metadata {
fn with_table_id(self, table_id: String) -> DeltaResult<Metadata> {
let value = serde_json::json!({
"id": table_id,
"name": self.name(),
"description": self.description(),
"format": { "provider": "parquet", "options": {} },
"schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
"partitionColumns": self.partition_columns(),
"configuration": self.configuration(),
"createdTime": self.created_time(),
});
Ok(serde_json::from_value(value)?)
}
fn with_name(self, name: String) -> DeltaResult<Metadata> {
let value = serde_json::json!({
"id": self.id(),
"name": name,
"description": self.description(),
"format": { "provider": "parquet", "options": {} },
"schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
"partitionColumns": self.partition_columns(),
"configuration": self.configuration(),
"createdTime": self.created_time(),
});
Ok(serde_json::from_value(value)?)
}
fn with_description(self, description: String) -> DeltaResult<Metadata> {
let value = serde_json::json!({
"id": self.id(),
"name": self.name(),
"description": description,
"format": { "provider": "parquet", "options": {} },
"schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
"partitionColumns": self.partition_columns(),
"configuration": self.configuration(),
"createdTime": self.created_time(),
});
Ok(serde_json::from_value(value)?)
}
fn with_schema(self, schema: &StructType) -> DeltaResult<Metadata> {
let value = serde_json::json!({
"id": self.id(),
"name": self.name(),
"description": self.description(),
"format": { "provider": "parquet", "options": {} },
"schemaString": serde_json::to_string(schema)?,
"partitionColumns": self.partition_columns(),
"configuration": self.configuration(),
"createdTime": self.created_time(),
});
Ok(serde_json::from_value(value)?)
}
fn add_config_key(self, key: String, value: String) -> DeltaResult<Metadata> {
let mut config = self.configuration().clone();
config.insert(key, value);
let value = serde_json::json!({
"id": self.id(),
"name": self.name(),
"description": self.description(),
"format": { "provider": "parquet", "options": {} },
"schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
"partitionColumns": self.partition_columns(),
"configuration": config,
"createdTime": self.created_time(),
});
Ok(serde_json::from_value(value)?)
}
fn remove_config_key(self, key: &str) -> DeltaResult<Metadata> {
let mut config = self.configuration().clone();
config.remove(key);
let value = serde_json::json!({
"id": self.id(),
"name": self.name(),
"description": self.description(),
"format": { "provider": "parquet", "options": {} },
"schemaString": serde_json::to_string(&self.parse_schema().unwrap())?,
"partitionColumns": self.partition_columns(),
"configuration": config,
"createdTime": self.created_time(),
});
Ok(serde_json::from_value(value)?)
}
}
/// Returns `true` when any of the given fields — recursively through arrays,
/// maps, and nested structs — uses the `TIMESTAMP_NTZ` primitive type.
///
/// Used to decide whether the `timestampNtz` table feature must be enabled.
pub fn contains_timestampntz<'a>(mut fields: impl Iterator<Item = &'a StructField>) -> bool {
    fn _check_type(dtype: &DataType) -> bool {
        match dtype {
            &DataType::TIMESTAMP_NTZ => true,
            DataType::Array(inner) => _check_type(inner.element_type()),
            // A TIMESTAMP_NTZ may also hide inside map keys or values.
            DataType::Map(inner) => {
                _check_type(inner.key_type()) || _check_type(inner.value_type())
            }
            DataType::Struct(inner) => inner.fields().any(|f| _check_type(f.data_type())),
            _ => false,
        }
    }
    fields.any(|f| _check_type(f.data_type()))
}
/// Read helpers and consuming builders over the kernel [`Protocol`] action.
///
/// The kernel type is immutable; the builder methods round-trip through
/// [`ProtocolInner`] to produce an updated copy.
pub(crate) trait ProtocolExt {
    /// Reader features as an owned set, if the protocol declares any.
    fn reader_features_set(&self) -> Option<HashSet<TableFeature>>;
    /// Writer features as an owned set, if the protocol declares any.
    fn writer_features_set(&self) -> Option<HashSet<TableFeature>>;
    /// Adds reader features (bumps `minReaderVersion` to 3 when non-empty).
    fn append_reader_features(self, reader_features: &[TableFeature]) -> Protocol;
    /// Adds writer features (bumps `minWriterVersion` to 7 when non-empty).
    fn append_writer_features(self, writer_features: &[TableFeature]) -> Protocol;
    /// Mirrors feature-gating table properties into the protocol's feature lists.
    fn move_table_properties_into_features(
        self,
        configuration: &HashMap<String, String>,
    ) -> Protocol;
    /// Enables features required by column metadata in `schema`
    /// (timestampNtz, generated columns, invariants).
    fn apply_column_metadata_to_protocol(self, schema: &StructType) -> DeltaResult<Protocol>;
    /// Applies protocol-relevant table properties (min versions, CDF,
    /// deletion vectors) to the protocol.
    fn apply_properties_to_protocol(
        self,
        new_properties: &HashMap<String, String>,
        raise_if_not_exists: bool,
    ) -> DeltaResult<Protocol>;
}
impl ProtocolExt for Protocol {
    fn reader_features_set(&self) -> Option<HashSet<TableFeature>> {
        let features = self.reader_features()?;
        Some(features.iter().cloned().collect())
    }

    fn writer_features_set(&self) -> Option<HashSet<TableFeature>> {
        let features = self.writer_features()?;
        Some(features.iter().cloned().collect())
    }

    fn append_reader_features(self, reader_features: &[TableFeature]) -> Protocol {
        // All mutation happens on the serde-backed mirror type.
        ProtocolInner::from_kernel(&self)
            .append_reader_features(reader_features.iter().cloned())
            .as_kernel()
    }

    fn append_writer_features(self, writer_features: &[TableFeature]) -> Protocol {
        ProtocolInner::from_kernel(&self)
            .append_writer_features(writer_features.iter().cloned())
            .as_kernel()
    }

    fn move_table_properties_into_features(
        self,
        configuration: &HashMap<String, String>,
    ) -> Protocol {
        ProtocolInner::from_kernel(&self)
            .move_table_properties_into_features(configuration)
            .as_kernel()
    }

    fn apply_column_metadata_to_protocol(self, schema: &StructType) -> DeltaResult<Protocol> {
        let updated = ProtocolInner::from_kernel(&self).apply_column_metadata_to_protocol(schema)?;
        Ok(updated.as_kernel())
    }

    fn apply_properties_to_protocol(
        self,
        new_properties: &HashMap<String, String>,
        raise_if_not_exists: bool,
    ) -> DeltaResult<Protocol> {
        let updated = ProtocolInner::from_kernel(&self)
            .apply_properties_to_protocol(new_properties, raise_if_not_exists)?;
        Ok(updated.as_kernel())
    }
}
/// Mutable, serde-backed mirror of the kernel [`Protocol`] action.
///
/// The kernel type is read-only; this struct shares its camelCase JSON shape
/// so the two can be converted by serde round-tripping (see `from_kernel` /
/// `as_kernel` in the impl below).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ProtocolInner {
    /// Minimum reader protocol version required to read the table.
    pub min_reader_version: i32,
    /// Minimum writer protocol version required to write the table.
    pub min_writer_version: i32,
    /// Reader table features; meaningful only when `min_reader_version >= 3`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reader_features: Option<HashSet<TableFeature>>,
    /// Writer table features; meaningful only when `min_writer_version >= 7`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub writer_features: Option<HashSet<TableFeature>>,
}
impl Default for ProtocolInner {
    /// Defaults to reader version 1 / writer version 2 with no table
    /// features, i.e. a plain protocol that does not use feature lists.
    fn default() -> Self {
        Self {
            min_reader_version: 1,
            min_writer_version: 2,
            reader_features: None,
            writer_features: None,
        }
    }
}
impl ProtocolInner {
    /// Creates a protocol with explicit versions and no table features.
    #[cfg(test)]
    pub(crate) fn new(min_reader_version: i32, min_writer_version: i32) -> Self {
        Self {
            min_reader_version,
            min_writer_version,
            reader_features: None,
            writer_features: None,
        }
    }

    /// Converts a kernel [`Protocol`] into this mutable representation by
    /// round-tripping through JSON; the two types share the same serde shape,
    /// so the conversion is infallible by construction.
    pub(crate) fn from_kernel(value: &Protocol) -> ProtocolInner {
        serde_json::from_value(serde_json::to_value(value).unwrap()).unwrap()
    }

    /// Converts back into the kernel [`Protocol`] (inverse of [`Self::from_kernel`]).
    pub(crate) fn as_kernel(&self) -> Protocol {
        serde_json::from_value(serde_json::to_value(self).unwrap()).unwrap()
    }

    /// Adds the given reader features and bumps `min_reader_version` to 3,
    /// the first version that supports reader feature lists. No-op for an
    /// empty input.
    pub fn append_reader_features(
        mut self,
        reader_features: impl IntoIterator<Item = impl Into<TableFeature>>,
    ) -> Self {
        let new_features: HashSet<TableFeature> =
            reader_features.into_iter().map(Into::into).collect();
        if !new_features.is_empty() {
            self.min_reader_version = 3;
            self.reader_features
                .get_or_insert_with(HashSet::new)
                .extend(new_features);
        }
        self
    }

    /// Adds the given writer features and bumps `min_writer_version` to 7,
    /// the first version that supports writer feature lists. No-op for an
    /// empty input.
    pub fn append_writer_features(
        mut self,
        writer_features: impl IntoIterator<Item = impl Into<TableFeature>>,
    ) -> Self {
        let new_features: HashSet<TableFeature> =
            writer_features.into_iter().map(Into::into).collect();
        if !new_features.is_empty() {
            self.min_writer_version = 7;
            self.writer_features
                .get_or_insert_with(HashSet::new)
                .extend(new_features);
        }
        self
    }

    /// Mirrors feature-gating table properties (e.g. `delta.appendOnly`) into
    /// the protocol's feature lists.
    ///
    /// Only applies when the protocol already uses table features
    /// (`min_writer_version >= 7` / `min_reader_version >= 3`); lower protocol
    /// versions encode these capabilities in the version number alone.
    pub fn move_table_properties_into_features(
        mut self,
        configuration: &HashMap<String, String>,
    ) -> Self {
        /// `true` iff the property value is (case-insensitively) "true".
        fn parse_bool(value: &str) -> bool {
            value.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v)
        }
        if self.min_writer_version >= 7 {
            // NOTE: no boolean pre-filter here. `delta.checkpointPolicy` carries
            // the non-boolean value "v2"; filtering on boolean-parseable values
            // first would drop it before it could map to `V2Checkpoint`. The
            // boolean-valued keys each guard with `parse_bool` below instead.
            let mut converted_writer_features = configuration
                .iter()
                .filter_map(|(key, value)| match key.as_str() {
                    "delta.enableChangeDataFeed" if parse_bool(value) => {
                        Some(TableFeature::ChangeDataFeed)
                    }
                    "delta.appendOnly" if parse_bool(value) => Some(TableFeature::AppendOnly),
                    "delta.enableDeletionVectors" if parse_bool(value) => {
                        Some(TableFeature::DeletionVectors)
                    }
                    "delta.enableRowTracking" if parse_bool(value) => {
                        Some(TableFeature::RowTracking)
                    }
                    "delta.checkpointPolicy" if value == "v2" => Some(TableFeature::V2Checkpoint),
                    _ => None,
                })
                .collect::<HashSet<TableFeature>>();
            // Any `delta.constraints.*` property implies CHECK constraints.
            if configuration
                .keys()
                .any(|key| key.starts_with("delta.constraints."))
            {
                converted_writer_features.insert(TableFeature::CheckConstraints);
            }
            self.writer_features
                .get_or_insert_with(HashSet::new)
                .extend(converted_writer_features);
        }
        if self.min_reader_version >= 3 {
            let converted_reader_features = configuration
                .iter()
                .filter_map(|(key, value)| match key.as_str() {
                    "delta.enableDeletionVectors" if parse_bool(value) => {
                        Some(TableFeature::DeletionVectors)
                    }
                    "delta.checkpointPolicy" if value == "v2" => Some(TableFeature::V2Checkpoint),
                    _ => None,
                })
                .collect::<HashSet<TableFeature>>();
            self.reader_features
                .get_or_insert_with(HashSet::new)
                .extend(converted_reader_features);
        }
        self
    }

    /// Enables the protocol features required by column metadata in `schema`:
    /// timestamps without timezone, generated columns, and invariants.
    pub fn apply_column_metadata_to_protocol(mut self, schema: &StructType) -> DeltaResult<Self> {
        let generated_cols = schema.get_generated_columns()?;
        let invariants = schema.get_invariants()?;
        if self.contains_timestampntz(schema.fields()) {
            self = self.enable_timestamp_ntz()
        }
        if !generated_cols.is_empty() {
            self = self.enable_generated_columns()
        }
        if !invariants.is_empty() {
            self = self.enable_invariants()
        }
        Ok(self)
    }

    /// Applies protocol-relevant table properties (minimum versions, change
    /// data feed, deletion vectors) to this protocol.
    ///
    /// # Errors
    /// Returns [`Error::Generic`] when a recognized property has an invalid
    /// value, or — if `raise_if_not_exists` is set — when a key is not a
    /// known [`TableProperty`].
    pub fn apply_properties_to_protocol(
        mut self,
        new_properties: &HashMap<String, String>,
        raise_if_not_exists: bool,
    ) -> DeltaResult<Self> {
        let mut parsed_properties: HashMap<TableProperty, String> = HashMap::new();
        for (key, value) in new_properties {
            if let Ok(parsed_key) = key.parse::<TableProperty>() {
                parsed_properties.insert(parsed_key, value.to_string());
            } else if raise_if_not_exists {
                return Err(Error::Generic(format!(
                    "Error parsing property '{key}':'{value}'",
                )));
            }
        }
        if let Some(min_reader_version) = parsed_properties.get(&TableProperty::MinReaderVersion) {
            match min_reader_version.parse::<i32>() {
                // Only ever raise the version; never downgrade the protocol.
                Ok(version @ 1..=3) => {
                    if version > self.min_reader_version {
                        self.min_reader_version = version
                    }
                }
                // Out-of-range values and parse failures are reported the same way.
                _ => {
                    return Err(Error::Generic(format!(
                        "delta.minReaderVersion = '{min_reader_version}' is invalid, valid values are ['1','2','3']"
                    )));
                }
            }
        }
        if let Some(min_writer_version) = parsed_properties.get(&TableProperty::MinWriterVersion) {
            match min_writer_version.parse::<i32>() {
                Ok(version @ 2..=7) => {
                    if version > self.min_writer_version {
                        self.min_writer_version = version
                    }
                }
                _ => {
                    return Err(Error::Generic(format!(
                        "delta.minWriterVersion = '{min_writer_version}' is invalid, valid values are ['2','3','4','5','6','7']"
                    )));
                }
            }
        }
        if let Some(enable_cdf) = parsed_properties.get(&TableProperty::EnableChangeDataFeed) {
            match enable_cdf.to_ascii_lowercase().parse::<bool>() {
                Ok(true) => {
                    if self.min_writer_version >= 7 {
                        // Feature-based protocol: record the explicit feature.
                        self.writer_features
                            .get_or_insert_with(HashSet::new)
                            .insert(TableFeature::ChangeDataFeed);
                    } else if self.min_writer_version <= 3 {
                        // Legacy protocol: CDF requires at least writer version 4.
                        self.min_writer_version = 4
                    }
                }
                Ok(false) => {}
                _ => {
                    return Err(Error::Generic(format!(
                        "delta.enableChangeDataFeed = '{enable_cdf}' is invalid, valid values are ['true']"
                    )));
                }
            }
        }
        if let Some(enable_dv) = parsed_properties.get(&TableProperty::EnableDeletionVectors) {
            match enable_dv.to_ascii_lowercase().parse::<bool>() {
                Ok(true) => {
                    // Deletion vectors are both a reader and a writer feature
                    // and require the feature-based protocol versions.
                    self.min_reader_version = 3;
                    self.min_writer_version = 7;
                    self.writer_features
                        .get_or_insert_with(HashSet::new)
                        .insert(TableFeature::DeletionVectors);
                    self.reader_features
                        .get_or_insert_with(HashSet::new)
                        .insert(TableFeature::DeletionVectors);
                }
                Ok(false) => {}
                _ => {
                    return Err(Error::Generic(format!(
                        "delta.enableDeletionVectors = '{enable_dv}' is invalid, valid values are ['true']"
                    )));
                }
            }
        }
        Ok(self)
    }

    /// Delegates to the free function of the same name.
    fn contains_timestampntz<'a>(&self, fields: impl Iterator<Item = &'a StructField>) -> bool {
        contains_timestampntz(fields)
    }

    /// Enables `timestampNtz` as both a reader and a writer feature.
    fn enable_timestamp_ntz(mut self) -> Self {
        self = self.append_reader_features([TableFeature::TimestampWithoutTimezone]);
        self = self.append_writer_features([TableFeature::TimestampWithoutTimezone]);
        self
    }

    /// Generated columns require writer version >= 4; on feature-based
    /// protocols (version 7) the explicit feature flag is added as well.
    fn enable_generated_columns(mut self) -> Self {
        if self.min_writer_version < 4 {
            self.min_writer_version = 4;
        }
        if self.min_writer_version >= 7 {
            self = self.append_writer_features([TableFeature::GeneratedColumns]);
        }
        self
    }

    /// Invariants are implicit below writer version 7; on feature-based
    /// protocols the explicit feature flag is added.
    fn enable_invariants(mut self) -> Self {
        if self.min_writer_version >= 7 {
            self = self.append_writer_features([TableFeature::Invariants]);
        }
        self
    }
}
/// Delta table features known to this crate, serialized in their Delta
/// protocol (camelCase) spelling.
///
/// Mirrors a subset of the kernel's `TableFeature`; use
/// `TryFrom<&TableFeatures>` to convert to the kernel type.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum TableFeatures {
    /// Logical-to-physical column name mapping.
    ColumnMapping,
    /// Deletion vectors (row-level deletes without rewriting files).
    DeletionVectors,
    /// Timestamps without timezone (serialized as "timestampNtz").
    #[serde(rename = "timestampNtz")]
    TimestampWithoutTimezone,
    /// Version 2 checkpoints.
    V2Checkpoint,
    /// Append-only tables.
    AppendOnly,
    /// Column invariants.
    Invariants,
    /// CHECK constraints.
    CheckConstraints,
    /// Change data feed.
    ChangeDataFeed,
    /// Generated columns.
    GeneratedColumns,
    /// Identity columns.
    IdentityColumns,
    /// Row tracking.
    RowTracking,
    /// Domain-specific metadata actions.
    DomainMetadata,
    /// Iceberg compatibility, version 1.
    IcebergCompatV1,
    /// Materialized partition columns.
    MaterializePartitionColumns,
}
impl FromStr for TableFeatures {
type Err = ();
fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
"columnMapping" => Ok(TableFeatures::ColumnMapping),
"deletionVectors" => Ok(TableFeatures::DeletionVectors),
"timestampNtz" => Ok(TableFeatures::TimestampWithoutTimezone),
"v2Checkpoint" => Ok(TableFeatures::V2Checkpoint),
"appendOnly" => Ok(TableFeatures::AppendOnly),
"invariants" => Ok(TableFeatures::Invariants),
"checkConstraints" => Ok(TableFeatures::CheckConstraints),
"changeDataFeed" => Ok(TableFeatures::ChangeDataFeed),
"generatedColumns" => Ok(TableFeatures::GeneratedColumns),
"identityColumns" => Ok(TableFeatures::IdentityColumns),
"rowTracking" => Ok(TableFeatures::RowTracking),
"domainMetadata" => Ok(TableFeatures::DomainMetadata),
"icebergCompatV1" => Ok(TableFeatures::IcebergCompatV1),
"materializePartitionColumns" => Ok(TableFeatures::MaterializePartitionColumns),
_ => Err(()),
}
}
}
impl AsRef<str> for TableFeatures {
fn as_ref(&self) -> &str {
match self {
TableFeatures::ColumnMapping => "columnMapping",
TableFeatures::DeletionVectors => "deletionVectors",
TableFeatures::TimestampWithoutTimezone => "timestampNtz",
TableFeatures::V2Checkpoint => "v2Checkpoint",
TableFeatures::AppendOnly => "appendOnly",
TableFeatures::Invariants => "invariants",
TableFeatures::CheckConstraints => "checkConstraints",
TableFeatures::ChangeDataFeed => "changeDataFeed",
TableFeatures::GeneratedColumns => "generatedColumns",
TableFeatures::IdentityColumns => "identityColumns",
TableFeatures::RowTracking => "rowTracking",
TableFeatures::DomainMetadata => "domainMetadata",
TableFeatures::IcebergCompatV1 => "icebergCompatV1",
TableFeatures::MaterializePartitionColumns => "materializePartitionColumns",
}
}
}
impl fmt::Display for TableFeatures {
    /// Displays the Delta protocol (camelCase) name of the feature.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_ref())
    }
}
impl TryFrom<&TableFeatures> for TableFeature {
    type Error = strum::ParseError;

    /// Converts via the protocol (camelCase) name; fails when the kernel
    /// does not recognize the feature name.
    fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
        TableFeature::try_from(value.as_ref())
    }
}
impl TableFeatures {
    /// Splits the feature into its (reader, writer) kernel representation.
    ///
    /// Writer-only features yield `(None, Some(_))`; features gating both
    /// reads and writes appear in both positions. Features the kernel does
    /// not recognize yield `(None, None)`.
    pub fn to_reader_writer_features(&self) -> (Option<TableFeature>, Option<TableFeature>) {
        let Ok(feature) = TableFeature::try_from(self) else {
            return (None, None);
        };
        match feature {
            TableFeature::AppendOnly
            | TableFeature::Invariants
            | TableFeature::CheckConstraints
            | TableFeature::ChangeDataFeed
            | TableFeature::GeneratedColumns
            | TableFeature::IdentityColumns
            | TableFeature::InCommitTimestamp
            | TableFeature::RowTracking
            | TableFeature::DomainMetadata
            | TableFeature::IcebergCompatV1
            | TableFeature::IcebergCompatV2
            | TableFeature::ClusteredTable
            | TableFeature::MaterializePartitionColumns => (None, Some(feature)),
            TableFeature::CatalogManaged
            | TableFeature::CatalogOwnedPreview
            | TableFeature::ColumnMapping
            | TableFeature::DeletionVectors
            | TableFeature::TimestampWithoutTimezone
            | TableFeature::TypeWidening
            | TableFeature::TypeWideningPreview
            | TableFeature::V2Checkpoint
            | TableFeature::VacuumProtocolCheck
            | TableFeature::VariantType
            | TableFeature::VariantTypePreview
            | TableFeature::VariantShreddingPreview => (Some(feature.clone()), Some(feature)),
            TableFeature::Unknown(_) => (None, None),
        }
    }
}
/// Where the bytes of a deletion vector live, encoded as a single letter in
/// the Delta log (see [`DeletionVectorDescriptor::path_or_inline_dv`]).
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq, Default)]
pub enum StorageType {
    /// `"u"`: stored in a file whose relative path is derived from a UUID.
    #[serde(rename = "u")]
    #[default]
    UuidRelativePath,
    /// `"i"`: the vector data is inlined in the descriptor itself.
    #[serde(rename = "i")]
    Inline,
    /// `"p"`: stored at an absolute path.
    #[serde(rename = "p")]
    AbsolutePath,
}
impl FromStr for StorageType {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"u" => Ok(Self::UuidRelativePath),
"i" => Ok(Self::Inline),
"p" => Ok(Self::AbsolutePath),
_ => Err(Error::DeletionVector(format!(
"Unknown storage format: '{s}'."
))),
}
}
}
impl AsRef<str> for StorageType {
fn as_ref(&self) -> &str {
match self {
Self::UuidRelativePath => "u",
Self::Inline => "i",
Self::AbsolutePath => "p",
}
}
}
impl Display for StorageType {
    /// Displays the single-letter code ("u", "i", or "p").
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_ref())
    }
}
/// Describes a deletion vector attached to an `add`/`remove` file action.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DeletionVectorDescriptor {
    /// How to interpret `path_or_inline_dv` (UUID-relative path, inline
    /// data, or absolute path).
    pub storage_type: StorageType,
    /// Path to — or inline encoding of — the deletion vector data, depending
    /// on `storage_type` (per the Delta protocol's deletion vector format).
    pub path_or_inline_dv: String,
    /// Byte offset of the vector data within its file; absent for inline DVs.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub offset: Option<i32>,
    /// Size of the serialized deletion vector in bytes.
    pub size_in_bytes: i32,
    /// Number of rows marked as deleted by this vector.
    pub cardinality: i64,
}
/// The `add` action: a data file that is logically part of the table.
///
/// `path` is stored percent-encoded in the log and transparently decoded in
/// memory by the `serde_path` module.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "camelCase")]
pub struct Add {
    /// Relative path of the file (decoded in memory, encoded on disk).
    #[serde(with = "serde_path")]
    pub path: String,
    /// Partition column name -> value; `None` encodes a null partition value.
    pub partition_values: HashMap<String, Option<String>>,
    /// File size in bytes.
    pub size: i64,
    /// File modification time, epoch milliseconds.
    pub modification_time: i64,
    /// Whether this action changes the logical content of the table.
    pub data_change: bool,
    /// JSON-encoded file statistics, if collected.
    pub stats: Option<String>,
    /// Arbitrary user/engine tags.
    pub tags: Option<HashMap<String, Option<String>>>,
    /// Deletion vector masking rows of this file, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_vector: Option<DeletionVectorDescriptor>,
    /// First row id of this file (row-tracking feature).
    pub base_row_id: Option<i64>,
    /// Default row commit version (row-tracking feature).
    pub default_row_commit_version: Option<i64>,
    /// Name of the clustering implementation, when the table is clustered.
    pub clustering_provider: Option<String>,
}
/// The `remove` action: marks a file as logically deleted (a tombstone).
///
/// NOTE(review): `Eq` is derived without `PartialEq`; this only compiles if a
/// manual `PartialEq` impl exists elsewhere in the crate — confirm.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, Default)]
#[serde(rename_all = "camelCase")]
pub struct Remove {
    /// Relative path of the removed file (decoded in memory, encoded on disk).
    #[serde(with = "serde_path")]
    pub path: String,
    /// Whether this action changes the logical content of the table.
    pub data_change: bool,
    /// When the file was deleted, epoch milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_timestamp: Option<i64>,
    /// Whether the optional file metadata fields below are populated.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extended_file_metadata: Option<bool>,
    /// Partition column name -> value of the removed file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub partition_values: Option<HashMap<String, Option<String>>>,
    /// File size in bytes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub size: Option<i64>,
    /// Arbitrary user/engine tags.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,
    /// Deletion vector that was attached to the removed file, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_vector: Option<DeletionVectorDescriptor>,
    /// First row id of the removed file (row-tracking feature).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub base_row_id: Option<i64>,
    /// Default row commit version (row-tracking feature).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_row_commit_version: Option<i64>,
}
/// The `cdc` action: a change-data file written for the change data feed.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct AddCDCFile {
    /// Relative path of the change file (decoded in memory, encoded on disk).
    #[serde(with = "serde_path")]
    pub path: String,
    /// File size in bytes.
    pub size: i64,
    /// Partition column name -> value; `None` encodes a null partition value.
    pub partition_values: HashMap<String, Option<String>>,
    /// Whether this action changes the logical content of the table.
    pub data_change: bool,
    /// Arbitrary user/engine tags.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,
}
/// The `txn` action: records the latest `version` committed by application
/// `app_id`, enabling idempotent writes.
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Transaction {
    /// Unique identifier of the writing application.
    pub app_id: String,
    /// Application-defined version of the latest transaction for `app_id`.
    pub version: i64,
    /// When the transaction action was created, epoch milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_updated: Option<i64>,
}
impl Transaction {
pub fn new(app_id: impl ToString, version: i64) -> Self {
Self::new_with_last_update(app_id, version, None)
}
pub fn new_with_last_update(
app_id: impl ToString,
version: i64,
last_updated: Option<i64>,
) -> Self {
Transaction {
app_id: app_id.to_string(),
version,
last_updated,
}
}
}
/// The `commitInfo` action: informational metadata about a commit.
///
/// All fields are optional; unknown keys are preserved in `info` via the
/// flattened map so round-trips do not lose data.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct CommitInfo {
    /// When the commit was created, epoch milliseconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<i64>,
    /// Id of the user committing the change.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_id: Option<String>,
    /// Name of the user committing the change.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_name: Option<String>,
    /// Name of the operation (e.g. "WRITE", "MERGE").
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation: Option<String>,
    /// Operation-specific parameters, as free-form JSON values.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub operation_parameters: Option<HashMap<String, serde_json::Value>>,
    /// Table version the transaction read from.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub read_version: Option<Version>,
    /// Isolation level under which the commit was made.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub isolation_level: Option<IsolationLevel>,
    /// Whether the commit only appended data (no reads of existing files).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_blind_append: Option<bool>,
    /// Identifier of the engine that produced the commit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub engine_info: Option<String>,
    /// Any additional, unrecognized top-level keys (flattened on (de)serialization).
    #[serde(flatten, default)]
    pub info: HashMap<String, serde_json::Value>,
    /// Free-form metadata supplied by the user.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_metadata: Option<String>,
}
/// The `domainMetadata` action: domain-scoped configuration stored in the log.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DomainMetadata {
    /// Identifier of the metadata domain.
    pub domain: String,
    /// Domain-specific configuration payload (a string, typically JSON).
    pub configuration: String,
    /// Whether this action removes the domain's metadata.
    pub removed: bool,
}
/// The `checkpointMetadata` action carried by V2 checkpoint files.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
pub struct CheckpointMetadata {
    /// Checkpoint flavor.
    /// NOTE(review): the V2 checkpoint spec describes a `version` field here;
    /// this struct uses a string `flavor` — confirm against the writer side.
    pub flavor: String,
    /// Arbitrary user/engine tags.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,
}
/// The `sidecar` action: references a sidecar file holding file actions for a
/// V2 checkpoint.
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Sidecar {
    /// Name of the sidecar file (relative to the checkpoint's sidecar directory).
    pub file_name: String,
    /// Size of the sidecar file in bytes.
    pub size_in_bytes: i64,
    /// Modification time of the sidecar file, epoch milliseconds.
    pub modification_time: i64,
    /// Type of actions contained in the file (serialized as `type`).
    #[serde(rename = "type")]
    pub sidecar_type: String,
    /// Arbitrary user/engine tags.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, Option<String>>>,
}
/// Isolation level a commit was performed under, as recorded in `commitInfo`.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Default)]
pub enum IsolationLevel {
    /// Strictest level: full serializability of reads and writes.
    #[default]
    Serializable,
    /// Serializability of writes only; reads may observe a weaker ordering.
    WriteSerializable,
    /// Each transaction sees a consistent snapshot of the table.
    SnapshotIsolation,
}
impl AsRef<str> for IsolationLevel {
fn as_ref(&self) -> &str {
match self {
Self::Serializable => "Serializable",
Self::WriteSerializable => "WriteSerializable",
Self::SnapshotIsolation => "SnapshotIsolation",
}
}
}
impl FromStr for IsolationLevel {
    type Err = Error;

    /// Parses an isolation level name, case-insensitively; snake_case
    /// spellings of the two-word levels are accepted as well.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let lowered = s.to_ascii_lowercase();
        match lowered.as_str() {
            "serializable" => Ok(Self::Serializable),
            "writeserializable" | "write_serializable" => Ok(Self::WriteSerializable),
            "snapshotisolation" | "snapshot_isolation" => Ok(Self::SnapshotIsolation),
            _ => Err(Error::Generic("Invalid string for IsolationLevel".into())),
        }
    }
}
pub(crate) mod serde_path {
    //! Percent-encoding aware (de)serialization for the `path` field of file
    //! actions, wired in via `#[serde(with = "serde_path")]`: paths are kept
    //! decoded in memory and percent-encoded when written to the log.
    use std::str::Utf8Error;
    use percent_encoding::{AsciiSet, CONTROLS, percent_decode_str, percent_encode};
    use serde::{self, Deserialize, Deserializer, Serialize, Serializer};

    /// Deserializes a possibly percent-encoded path into its decoded form.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: Deserializer<'de>,
    {
        let encoded = String::deserialize(deserializer)?;
        decode_path(&encoded).map_err(serde::de::Error::custom)
    }

    /// Serializes a path, percent-encoding the characters in `INVALID`.
    pub fn serialize<S>(value: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        String::serialize(&encode_path(value), serializer)
    }

    pub const _DELIMITER: &str = "/";
    pub const _DELIMITER_BYTE: u8 = _DELIMITER.as_bytes()[0];

    /// Characters that must be percent-encoded when writing paths to the log
    /// (all control characters plus the set below).
    const INVALID: &AsciiSet = &CONTROLS
        .add(b'\\')
        .add(b'{')
        .add(b'^')
        .add(b'}')
        .add(b'%')
        .add(b'`')
        .add(b']')
        .add(b'"')
        .add(b'>')
        .add(b'[')
        .add(b'<')
        .add(b'#')
        .add(b'|')
        .add(b'\r')
        .add(b'\n')
        .add(b'*')
        .add(b'?');

    /// Percent-encodes every byte of `path` that falls in `INVALID`.
    fn encode_path(path: &str) -> String {
        percent_encode(path.as_bytes(), INVALID).to_string()
    }

    /// Decodes a percent-encoded path; fails if the result is not valid UTF-8.
    pub fn decode_path(path: &str) -> Result<String, Utf8Error> {
        Ok(percent_decode_str(path).decode_utf8()?.to_string())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::PrimitiveType;

    /// Smoke test: primitive type names deserialize from their JSON spelling.
    #[test]
    fn test_primitive() {
        let types: PrimitiveType = serde_json::from_str("\"string\"").unwrap();
        println!("{types:?}");
    }

    /// Feature names the kernel does not recognize (here "catalogOwned") must
    /// round-trip as `TableFeature::Unknown` instead of failing to deserialize.
    #[test]
    fn test_deserialize_protocol() {
        let raw = serde_json::json!(
            {
                "minReaderVersion": 3,
                "minWriterVersion": 7,
                "readerFeatures": ["catalogOwned"],
                "writerFeatures": ["catalogOwned", "invariants", "appendOnly"]
            }
        );
        let protocol: Protocol = serde_json::from_value(raw).unwrap();
        assert_eq!(protocol.min_reader_version(), 3);
        assert_eq!(protocol.min_writer_version(), 7);
        assert_eq!(
            protocol.reader_features(),
            Some(vec![TableFeature::Unknown("catalogOwned".to_owned())].as_slice())
        );
        assert_eq!(
            protocol.writer_features(),
            Some(
                vec![
                    TableFeature::Unknown("catalogOwned".to_owned()),
                    TableFeature::Invariants,
                    TableFeature::AppendOnly
                ]
                .as_slice()
            )
        );
    }
}