use std::collections::HashSet;
use std::sync::LazyLock;
use delta_kernel::table_features::TableFeature;
use super::{TableReference, TransactionError};
use crate::kernel::{
Action, EagerSnapshot, Protocol, ProtocolExt as _, Schema, contains_timestampntz,
};
use crate::protocol::DeltaOperation;
use crate::table::config::TablePropertiesExt as _;
use tracing::log::*;
/// Reader features the protocol checker associates with a minimum reader version of 2.
static READER_V2: LazyLock<HashSet<TableFeature>> =
    LazyLock::new(|| HashSet::from([TableFeature::ColumnMapping]));
/// Reader features the protocol checker associates with a minimum reader version of 3.
static READER_V3: LazyLock<HashSet<TableFeature>> =
    LazyLock::new(|| HashSet::from([TableFeature::DeletionVectors]));
/// Writer features the protocol checker associates with a minimum writer version of 2
/// when the crate is built with the `datafusion` feature.
#[cfg(feature = "datafusion")]
static WRITER_V2: LazyLock<HashSet<TableFeature>> =
    LazyLock::new(|| HashSet::from([TableFeature::AppendOnly, TableFeature::Invariants]));
/// Writer features the protocol checker associates with a minimum writer version of 2
/// when the crate is built without the `datafusion` feature (invariants are left out).
#[cfg(not(feature = "datafusion"))]
static WRITER_V2: LazyLock<HashSet<TableFeature>> =
    LazyLock::new(|| HashSet::from([TableFeature::AppendOnly]));
/// Writer features the protocol checker associates with a minimum writer version of 3.
static WRITER_V3: LazyLock<HashSet<TableFeature>> = LazyLock::new(|| {
    let features = [
        TableFeature::AppendOnly,
        TableFeature::Invariants,
        TableFeature::CheckConstraints,
    ];
    HashSet::from(features)
});
/// Writer features the protocol checker associates with a minimum writer version of 4.
static WRITER_V4: LazyLock<HashSet<TableFeature>> = LazyLock::new(|| {
    let features = [
        TableFeature::AppendOnly,
        TableFeature::Invariants,
        TableFeature::CheckConstraints,
        TableFeature::ChangeDataFeed,
        TableFeature::GeneratedColumns,
    ];
    HashSet::from(features)
});
/// Writer features the protocol checker associates with a minimum writer version of 5.
static WRITER_V5: LazyLock<HashSet<TableFeature>> = LazyLock::new(|| {
    let features = [
        TableFeature::AppendOnly,
        TableFeature::Invariants,
        TableFeature::CheckConstraints,
        TableFeature::ChangeDataFeed,
        TableFeature::GeneratedColumns,
        TableFeature::ColumnMapping,
    ];
    HashSet::from(features)
});
/// Writer features the protocol checker associates with a minimum writer version of 6.
static WRITER_V6: LazyLock<HashSet<TableFeature>> = LazyLock::new(|| {
    let features = [
        TableFeature::AppendOnly,
        TableFeature::Invariants,
        TableFeature::CheckConstraints,
        TableFeature::ChangeDataFeed,
        TableFeature::GeneratedColumns,
        TableFeature::ColumnMapping,
        TableFeature::IdentityColumns,
    ];
    HashSet::from(features)
});
/// Checks a table's protocol requirements against the reader and writer
/// features this checker was constructed with.
pub struct ProtocolChecker {
/// Reader features this checker treats as supported.
reader_features: HashSet<TableFeature>,
/// Writer features this checker treats as supported.
writer_features: HashSet<TableFeature>,
}
impl ProtocolChecker {
    /// Create a checker that supports exactly the given reader and writer features.
    pub fn new(
        reader_features: HashSet<TableFeature>,
        writer_features: HashSet<TableFeature>,
    ) -> Self {
        Self {
            reader_features,
            writer_features,
        }
    }

    /// Reader protocol version reported as the default; always `1`.
    pub fn default_reader_version(&self) -> i32 {
        1
    }

    /// Writer protocol version reported as the default; always `2`.
    pub fn default_writer_version(&self) -> i32 {
        2
    }

    /// Reject the operation when the table properties mark the table as append-only.
    ///
    /// # Errors
    ///
    /// Returns [`TransactionError::DeltaTableAppendOnly`] when the snapshot's
    /// table properties report `append_only()`.
    pub fn check_append_only(&self, snapshot: &EagerSnapshot) -> Result<(), TransactionError> {
        if snapshot.table_properties().append_only() {
            return Err(TransactionError::DeltaTableAppendOnly);
        }
        Ok(())
    }

    /// Check that `schema` may be written to the table with respect to
    /// timestamp-without-timezone columns.
    ///
    /// A schema without such columns always passes. Otherwise the table must
    /// use a writer version above 6 and list
    /// [`TableFeature::TimestampWithoutTimezone`] among its writer features.
    ///
    /// # Errors
    ///
    /// Returns [`TransactionError::TableFeaturesRequired`] when the schema
    /// contains a timestamp-ntz column and the feature is not enabled.
    pub fn check_can_write_timestamp_ntz(
        &self,
        snapshot: &EagerSnapshot,
        schema: &Schema,
    ) -> Result<(), TransactionError> {
        trace!("checking to see if {snapshot:?} can write timestampntz");
        if !contains_timestampntz(schema.fields()) {
            return Ok(());
        }

        let protocol = snapshot.protocol();
        let feature_enabled = match protocol.min_writer_version() {
            // These writer versions carry no explicit feature list in this check.
            0..=6 => false,
            _ => protocol.writer_features().is_some_and(|features| {
                features.contains(&TableFeature::TimestampWithoutTimezone)
            }),
        };

        if feature_enabled {
            Ok(())
        } else {
            Err(TransactionError::TableFeaturesRequired(
                TableFeature::TimestampWithoutTimezone,
            ))
        }
    }

    /// Check that this checker can read the table behind `snapshot`.
    ///
    /// Delegates to [`Self::can_read_from_protocol`] with the snapshot's protocol.
    pub fn can_read_from(&self, snapshot: &dyn TableReference) -> Result<(), TransactionError> {
        self.can_read_from_protocol(snapshot.protocol())
    }

    /// Check that every reader feature required by `protocol` is supported.
    ///
    /// Reader versions 0 and 1 require nothing and version 2 requires the
    /// `READER_V2` set. From version 3 onwards the protocol's own reader
    /// feature list is authoritative.
    ///
    /// # Errors
    ///
    /// Returns [`TransactionError::UnsupportedTableFeatures`] carrying the
    /// required features this checker does not support.
    pub fn can_read_from_protocol(&self, protocol: &Protocol) -> Result<(), TransactionError> {
        trace!(
            "validating that min reader version {} can be read",
            protocol.min_reader_version()
        );
        let required_features: Option<HashSet<TableFeature>> = match protocol.min_reader_version() {
            0 | 1 => None,
            2 => Some(READER_V2.clone()),
            // Version 3 tables declare their requirements explicitly, so the
            // listed features must be honoured rather than a fixed set. When
            // the list is absent, keep the previous `READER_V3` requirement.
            3 => protocol
                .reader_features_set()
                .or_else(|| Some(READER_V3.clone())),
            _ => protocol.reader_features_set(),
        };
        trace!("my reader features: {:?}", self.reader_features);
        trace!("desired reader features: {required_features:?}");
        Self::ensure_supported(required_features, &self.reader_features)
    }

    /// Check that this checker can write to the table behind `snapshot`.
    ///
    /// The table must be readable first. Writer versions 0 and 1 require
    /// nothing, versions 2 through 6 require the matching `WRITER_V*` set, and
    /// higher versions use the protocol's own writer feature list.
    ///
    /// # Errors
    ///
    /// Returns [`TransactionError::UnsupportedTableFeatures`] carrying the
    /// required features this checker does not support, or any error produced
    /// by the read check.
    pub fn can_write_to(&self, snapshot: &dyn TableReference) -> Result<(), TransactionError> {
        self.can_read_from(snapshot)?;
        let protocol = snapshot.protocol();
        let required_features: Option<HashSet<TableFeature>> = match protocol.min_writer_version()
        {
            0 | 1 => None,
            2 => Some(WRITER_V2.clone()),
            3 => Some(WRITER_V3.clone()),
            4 => Some(WRITER_V4.clone()),
            5 => Some(WRITER_V5.clone()),
            6 => Some(WRITER_V6.clone()),
            _ => protocol.writer_features_set(),
        };
        trace!("my writer features: {:?}", self.writer_features);
        trace!("required writer features: {required_features:?}");
        Self::ensure_supported(required_features, &self.writer_features)
    }

    /// Check that `actions` may be committed to the table for `operation`.
    ///
    /// The table must be writable. If append-only is in effect, any `Remove`
    /// action with `data_change` set is rejected, except for `Restore` and
    /// `FileSystemCheck` operations.
    ///
    /// # Errors
    ///
    /// * [`TransactionError::TableFeaturesRequired`] when the writer version
    ///   is 7 or above and the protocol has no writer feature list.
    /// * [`TransactionError::DeltaTableAppendOnly`] when a data-changing
    ///   remove is attempted on an append-only table.
    /// * Any error produced by [`Self::can_write_to`].
    pub fn can_commit(
        &self,
        snapshot: &dyn TableReference,
        actions: &[Action],
        operation: &DeltaOperation,
    ) -> Result<(), TransactionError> {
        self.can_write_to(snapshot)?;

        let protocol = snapshot.protocol();
        let writer_version = protocol.min_writer_version();
        let append_only_enabled = if writer_version < 2 {
            // Append-only is not enforced below writer version 2.
            false
        } else if writer_version < 7 {
            snapshot.config().append_only()
        } else {
            // The feature list is checked first so that a missing list is
            // reported before the table property is consulted.
            protocol
                .writer_features()
                .ok_or(TransactionError::TableFeaturesRequired(
                    TableFeature::AppendOnly,
                ))?
                .contains(&TableFeature::AppendOnly)
                && snapshot.config().append_only()
        };

        let exempt_operation = matches!(
            operation,
            DeltaOperation::Restore { .. } | DeltaOperation::FileSystemCheck { .. }
        );
        if append_only_enabled && !exempt_operation {
            let removes_data = actions
                .iter()
                .any(|action| matches!(action, Action::Remove(remove) if remove.data_change));
            if removes_data {
                return Err(TransactionError::DeltaTableAppendOnly);
            }
        }
        Ok(())
    }

    /// Fail with the features in `required` that are missing from `supported`.
    ///
    /// `None` means the protocol imposes no feature requirements.
    fn ensure_supported(
        required: Option<HashSet<TableFeature>>,
        supported: &HashSet<TableFeature>,
    ) -> Result<(), TransactionError> {
        let Some(required) = required else {
            return Ok(());
        };
        let mut missing = required.difference(supported).peekable();
        if missing.peek().is_some() {
            return Err(TransactionError::UnsupportedTableFeatures(
                missing.cloned().collect(),
            ));
        }
        Ok(())
    }
}
/// Shared [`ProtocolChecker`] built lazily from a fixed set of reader and
/// writer features.
///
/// The change-data-feed, invariants, check-constraints and generated-columns
/// writer features are only included when the crate is built with the
/// `datafusion` feature.
pub static INSTANCE: LazyLock<ProtocolChecker> = LazyLock::new(|| {
    let reader_features = HashSet::from([
        TableFeature::TimestampWithoutTimezone,
        TableFeature::DeletionVectors,
    ]);

    let mut writer_features = HashSet::from([
        TableFeature::AppendOnly,
        TableFeature::TimestampWithoutTimezone,
    ]);
    #[cfg(feature = "datafusion")]
    {
        writer_features.extend([
            TableFeature::ChangeDataFeed,
            TableFeature::Invariants,
            TableFeature::CheckConstraints,
            TableFeature::GeneratedColumns,
        ]);
    }
    writer_features.insert(TableFeature::DeletionVectors);

    ProtocolChecker::new(reader_features, writer_features)
});
// Unit tests for `ProtocolChecker`: append-only enforcement in `can_commit`
// and reader/writer version compatibility in `can_read_from` / `can_write_to`.
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::*;
use crate::TableProperty;
use crate::kernel::DataType as DeltaDataType;
use crate::kernel::{Action, Add, Metadata, PrimitiveType, ProtocolInner, Remove};
use crate::protocol::SaveMode;
use crate::table::state::DeltaTableState;
use crate::test_utils::{ActionFactory, TestSchemas};
// Builds a metadata action over the simple test schema with the given
// optional table configuration.
fn metadata_action(configuration: Option<HashMap<String, Option<String>>>) -> Metadata {
ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, configuration)
}
// Exercises `can_commit` across writer versions and append-only settings
// using three action sets: pure appends, data-changing add+remove, and
// add+remove with `data_change: false`.
#[tokio::test]
async fn test_can_commit_append_only() {
// A single data-changing add.
let append_actions = vec![Action::Add(Add {
path: "test".to_string(),
data_change: true,
..Default::default()
})];
let append_op = DeltaOperation::Write {
mode: SaveMode::Append,
partition_by: None,
predicate: None,
};
// An add plus a data-changing remove.
let change_actions = vec![
Action::Add(Add {
path: "test".to_string(),
data_change: true,
..Default::default()
}),
Action::Remove(Remove {
path: "test".to_string(),
data_change: true,
..Default::default()
}),
];
let change_op = DeltaOperation::Update { predicate: None };
// An add plus a remove, neither of which changes data.
let neutral_actions = vec![
Action::Add(Add {
path: "test".to_string(),
data_change: false,
..Default::default()
}),
Action::Remove(Remove {
path: "test".to_string(),
data_change: false,
..Default::default()
}),
];
let neutral_op = DeltaOperation::Update { predicate: None };
// Builds protocol + metadata actions for a table with the given writer
// version, append-only property value and writer features. For writer
// version 7 the feature list is always `Some`, even when empty.
let create_actions = |writer: i32, append: &str, feat: Vec<TableFeature>| {
vec![
Action::Protocol(
ProtocolInner {
min_reader_version: 1,
min_writer_version: writer,
writer_features: if writer == 7 {
Some(feat.into_iter().collect())
} else if feat.is_empty() {
None
} else {
Some(feat.into_iter().collect())
},
..Default::default()
}
.as_kernel(),
),
metadata_action(Some(HashMap::from([(
TableProperty::AppendOnly.as_ref().to_string(),
Some(append.to_string()),
)])))
.into(),
]
};
let checker = ProtocolChecker::new(HashSet::new(), WRITER_V2.clone());
// Writer version 1 with append-only "true": every action set is accepted.
let actions = create_actions(1, "true", vec![]);
let snapshot = DeltaTableState::from_actions(actions).await.unwrap();
let eager = snapshot.snapshot();
assert!(
checker
.can_commit(eager, &append_actions, &append_op)
.is_ok()
);
assert!(
checker
.can_commit(eager, &change_actions, &change_op)
.is_ok()
);
assert!(
checker
.can_commit(eager, &neutral_actions, &neutral_op)
.is_ok()
);
// Writer version 2 with append-only "true": data-changing removes are rejected.
let actions = create_actions(2, "true", vec![]);
let snapshot = DeltaTableState::from_actions(actions).await.unwrap();
let eager = snapshot.snapshot();
assert!(
checker
.can_commit(eager, &append_actions, &append_op)
.is_ok()
);
assert!(
checker
.can_commit(eager, &change_actions, &change_op)
.is_err()
);
assert!(
checker
.can_commit(eager, &neutral_actions, &neutral_op)
.is_ok()
);
// Writer version 2 with append-only "false": every action set is accepted.
let actions = create_actions(2, "false", vec![]);
let snapshot = DeltaTableState::from_actions(actions).await.unwrap();
let eager = snapshot.snapshot();
assert!(
checker
.can_commit(eager, &append_actions, &append_op)
.is_ok()
);
assert!(
checker
.can_commit(eager, &change_actions, &change_op)
.is_ok()
);
assert!(
checker
.can_commit(eager, &neutral_actions, &neutral_op)
.is_ok()
);
// Writer version 7 with the AppendOnly feature and append-only "true":
// data-changing removes are rejected.
let actions = create_actions(7, "true", vec![TableFeature::AppendOnly]);
let snapshot = DeltaTableState::from_actions(actions).await.unwrap();
let eager = snapshot.snapshot();
assert!(
checker
.can_commit(eager, &append_actions, &append_op)
.is_ok()
);
assert!(
checker
.can_commit(eager, &change_actions, &change_op)
.is_err()
);
assert!(
checker
.can_commit(eager, &neutral_actions, &neutral_op)
.is_ok()
);
// Writer version 7 with the AppendOnly feature but append-only "false":
// every action set is accepted.
let actions = create_actions(7, "false", vec![TableFeature::AppendOnly]);
let snapshot = DeltaTableState::from_actions(actions).await.unwrap();
let eager = snapshot.snapshot();
assert!(
checker
.can_commit(eager, &append_actions, &append_op)
.is_ok()
);
assert!(
checker
.can_commit(eager, &change_actions, &change_op)
.is_ok()
);
assert!(
checker
.can_commit(eager, &neutral_actions, &neutral_op)
.is_ok()
);
// Writer version 7 with an empty feature list and append-only "true":
// the property alone does not enable enforcement.
let actions = create_actions(7, "true", vec![]);
let snapshot = DeltaTableState::from_actions(actions).await.unwrap();
let eager = snapshot.snapshot();
assert!(
checker
.can_commit(eager, &append_actions, &append_op)
.is_ok()
);
assert!(
checker
.can_commit(eager, &change_actions, &change_op)
.is_ok()
);
assert!(
checker
.can_commit(eager, &neutral_actions, &neutral_op)
.is_ok()
);
}
// Builds one checker and one table per protocol level and verifies which
// checker can read from / write to which table.
#[tokio::test]
async fn test_versions() {
// Checker with no features against a reader 1 / writer 1 table.
let checker_1 = ProtocolChecker::new(HashSet::new(), HashSet::new());
let actions = vec![
Action::Protocol(
ProtocolInner {
min_reader_version: 1,
min_writer_version: 1,
..Default::default()
}
.as_kernel(),
),
metadata_action(None).into(),
];
let snapshot_1 = DeltaTableState::from_actions(actions).await.unwrap();
let eager_1 = snapshot_1.snapshot();
assert!(checker_1.can_read_from(eager_1).is_ok());
assert!(checker_1.can_write_to(eager_1).is_ok());
// Checker with READER_V2 features against a reader 2 / writer 1 table.
let checker_2 = ProtocolChecker::new(READER_V2.clone(), HashSet::new());
let actions = vec![
Action::Protocol(
ProtocolInner {
min_reader_version: 2,
min_writer_version: 1,
..Default::default()
}
.as_kernel(),
),
metadata_action(None).into(),
];
let snapshot_2 = DeltaTableState::from_actions(actions).await.unwrap();
let eager_2 = snapshot_2.snapshot();
assert!(checker_1.can_read_from(eager_2).is_err());
assert!(checker_1.can_write_to(eager_2).is_err());
assert!(checker_2.can_read_from(eager_1).is_ok());
assert!(checker_2.can_read_from(eager_2).is_ok());
assert!(checker_2.can_write_to(eager_2).is_ok());
// Checker with READER_V2 + WRITER_V2 against a reader 2 / writer 2 table.
let checker_3 = ProtocolChecker::new(READER_V2.clone(), WRITER_V2.clone());
let actions = vec![
Action::Protocol(
ProtocolInner {
min_reader_version: 2,
min_writer_version: 2,
..Default::default()
}
.as_kernel(),
),
metadata_action(None).into(),
];
let snapshot_3 = DeltaTableState::from_actions(actions).await.unwrap();
let eager_3 = snapshot_3.snapshot();
assert!(checker_1.can_read_from(eager_3).is_err());
assert!(checker_1.can_write_to(eager_3).is_err());
assert!(checker_2.can_read_from(eager_3).is_ok());
assert!(checker_2.can_write_to(eager_3).is_err());
assert!(checker_3.can_read_from(eager_1).is_ok());
assert!(checker_3.can_read_from(eager_2).is_ok());
assert!(checker_3.can_read_from(eager_3).is_ok());
assert!(checker_3.can_write_to(eager_3).is_ok());
// Checker with READER_V2 + WRITER_V3 against a reader 2 / writer 3 table.
let checker_4 = ProtocolChecker::new(READER_V2.clone(), WRITER_V3.clone());
let actions = vec![
Action::Protocol(
ProtocolInner {
min_reader_version: 2,
min_writer_version: 3,
..Default::default()
}
.as_kernel(),
),
metadata_action(None).into(),
];
let snapshot_4 = DeltaTableState::from_actions(actions).await.unwrap();
let eager_4 = snapshot_4.snapshot();
assert!(checker_1.can_read_from(eager_4).is_err());
assert!(checker_1.can_write_to(eager_4).is_err());
assert!(checker_2.can_read_from(eager_4).is_ok());
assert!(checker_2.can_write_to(eager_4).is_err());
assert!(checker_3.can_read_from(eager_4).is_ok());
assert!(checker_3.can_write_to(eager_4).is_err());
assert!(checker_4.can_read_from(eager_1).is_ok());
assert!(checker_4.can_read_from(eager_2).is_ok());
assert!(checker_4.can_read_from(eager_3).is_ok());
assert!(checker_4.can_read_from(eager_4).is_ok());
assert!(checker_4.can_write_to(eager_4).is_ok());
// Checker with READER_V2 + WRITER_V4 against a reader 2 / writer 4 table.
let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
let actions = vec![
Action::Protocol(
ProtocolInner {
min_reader_version: 2,
min_writer_version: 4,
..Default::default()
}
.as_kernel(),
),
metadata_action(None).into(),
];
let snapshot_5 = DeltaTableState::from_actions(actions).await.unwrap();
let eager_5 = snapshot_5.snapshot();
assert!(checker_1.can_read_from(eager_5).is_err());
assert!(checker_1.can_write_to(eager_5).is_err());
assert!(checker_2.can_read_from(eager_5).is_ok());
assert!(checker_2.can_write_to(eager_5).is_err());
assert!(checker_3.can_read_from(eager_5).is_ok());
assert!(checker_3.can_write_to(eager_5).is_err());
assert!(checker_4.can_read_from(eager_5).is_ok());
assert!(checker_4.can_write_to(eager_5).is_err());
assert!(checker_5.can_read_from(eager_1).is_ok());
assert!(checker_5.can_read_from(eager_2).is_ok());
assert!(checker_5.can_read_from(eager_3).is_ok());
assert!(checker_5.can_read_from(eager_4).is_ok());
assert!(checker_5.can_read_from(eager_5).is_ok());
assert!(checker_5.can_write_to(eager_5).is_ok());
// Checker with READER_V2 + WRITER_V5 against a reader 2 / writer 5 table.
let checker_6 = ProtocolChecker::new(READER_V2.clone(), WRITER_V5.clone());
let actions = vec![
Action::Protocol(
ProtocolInner {
min_reader_version: 2,
min_writer_version: 5,
..Default::default()
}
.as_kernel(),
),
metadata_action(None).into(),
];
let snapshot_6 = DeltaTableState::from_actions(actions).await.unwrap();
let eager_6 = snapshot_6.snapshot();
assert!(checker_1.can_read_from(eager_6).is_err());
assert!(checker_1.can_write_to(eager_6).is_err());
assert!(checker_2.can_read_from(eager_6).is_ok());
assert!(checker_2.can_write_to(eager_6).is_err());
assert!(checker_3.can_read_from(eager_6).is_ok());
assert!(checker_3.can_write_to(eager_6).is_err());
assert!(checker_4.can_read_from(eager_6).is_ok());
assert!(checker_4.can_write_to(eager_6).is_err());
assert!(checker_5.can_read_from(eager_6).is_ok());
assert!(checker_5.can_write_to(eager_6).is_err());
assert!(checker_6.can_read_from(eager_1).is_ok());
assert!(checker_6.can_read_from(eager_2).is_ok());
assert!(checker_6.can_read_from(eager_3).is_ok());
assert!(checker_6.can_read_from(eager_4).is_ok());
assert!(checker_6.can_read_from(eager_5).is_ok());
assert!(checker_6.can_read_from(eager_6).is_ok());
assert!(checker_6.can_write_to(eager_6).is_ok());
// Checker with READER_V2 + WRITER_V6 against a reader 2 / writer 6 table.
let checker_7 = ProtocolChecker::new(READER_V2.clone(), WRITER_V6.clone());
let actions = vec![
Action::Protocol(
ProtocolInner {
min_reader_version: 2,
min_writer_version: 6,
..Default::default()
}
.as_kernel(),
),
metadata_action(None).into(),
];
let snapshot_7 = DeltaTableState::from_actions(actions).await.unwrap();
let eager_7 = snapshot_7.snapshot();
assert!(checker_1.can_read_from(eager_7).is_err());
assert!(checker_1.can_write_to(eager_7).is_err());
assert!(checker_2.can_read_from(eager_7).is_ok());
assert!(checker_2.can_write_to(eager_7).is_err());
assert!(checker_3.can_read_from(eager_7).is_ok());
assert!(checker_3.can_write_to(eager_7).is_err());
assert!(checker_4.can_read_from(eager_7).is_ok());
assert!(checker_4.can_write_to(eager_7).is_err());
assert!(checker_5.can_read_from(eager_7).is_ok());
assert!(checker_5.can_write_to(eager_7).is_err());
assert!(checker_6.can_read_from(eager_7).is_ok());
assert!(checker_6.can_write_to(eager_7).is_err());
assert!(checker_7.can_read_from(eager_1).is_ok());
assert!(checker_7.can_read_from(eager_2).is_ok());
assert!(checker_7.can_read_from(eager_3).is_ok());
assert!(checker_7.can_read_from(eager_4).is_ok());
assert!(checker_7.can_read_from(eager_5).is_ok());
assert!(checker_7.can_read_from(eager_6).is_ok());
assert!(checker_7.can_read_from(eager_7).is_ok());
assert!(checker_7.can_write_to(eager_7).is_ok());
}
// A reader 2 / writer 4 protocol with ChangeDataFeed appended as a writer
// feature is writable by a WRITER_V4 checker.
#[tokio::test]
async fn test_minwriter_v4_with_cdf() {
let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
let actions = vec![
Action::Protocol(
ProtocolInner::new(2, 4)
.append_writer_features(vec![TableFeature::ChangeDataFeed])
.as_kernel(),
),
metadata_action(None).into(),
];
let snapshot_5 = DeltaTableState::from_actions(actions).await.unwrap();
let eager_5 = snapshot_5.snapshot();
assert!(checker_5.can_write_to(eager_5).is_ok());
}
// A reader 2 / writer 4 protocol with GeneratedColumns appended as a writer
// feature is writable by a WRITER_V4 checker.
#[tokio::test]
async fn test_minwriter_v4_with_generated_columns() {
let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
let actions = vec![
Action::Protocol(
ProtocolInner::new(2, 4)
.append_writer_features([TableFeature::GeneratedColumns])
.as_kernel(),
),
metadata_action(None).into(),
];
let snapshot_5 = DeltaTableState::from_actions(actions).await.unwrap();
let eager_5 = snapshot_5.snapshot();
assert!(checker_5.can_write_to(eager_5).is_ok());
}
// Creates an in-memory reader 1 / writer 4 table with a column carrying a
// generation expression and change data feed enabled, then checks that a
// WRITER_V4 checker with no reader features can write to it.
#[tokio::test]
async fn test_minwriter_v4_with_generated_columns_and_expressions() {
let checker_5 = ProtocolChecker::new(Default::default(), WRITER_V4.clone());
let actions = vec![Action::Protocol(ProtocolInner::new(1, 4).as_kernel())];
let table = crate::DeltaTable::new_in_memory()
.create()
.with_column(
"value",
DeltaDataType::Primitive(PrimitiveType::Integer),
true,
Some(HashMap::from([(
"delta.generationExpression".into(),
"x IS TRUE".into(),
)])),
)
.with_actions(actions)
.with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
.await
.expect("failed to make a version 4 table with EnableChangeDataFeed");
let eager_5 = table
.snapshot()
.expect("Failed to get snapshot from test table");
assert!(checker_5.can_write_to(eager_5).is_ok());
}
}