use std::sync::Arc;
use delta_kernel::table_features::TableFeature;
use futures::future::BoxFuture;
use itertools::Itertools;
use super::{CustomExecuteHandler, Operation};
use crate::DeltaTable;
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::{EagerSnapshot, ProtocolExt as _, TableFeatures, resolve_snapshot};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::{DeltaResult, DeltaTableError};
pub struct AddTableFeatureBuilder {
snapshot: Option<EagerSnapshot>,
name: Vec<TableFeatures>,
allow_protocol_versions_increase: bool,
log_store: LogStoreRef,
commit_properties: CommitProperties,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
impl super::Operation for AddTableFeatureBuilder {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}
impl AddTableFeatureBuilder {
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
Self {
name: vec![],
allow_protocol_versions_increase: false,
snapshot,
log_store,
commit_properties: CommitProperties::default(),
custom_execute_handler: None,
}
}
pub fn with_feature<S: Into<TableFeatures>>(mut self, name: S) -> Self {
self.name.push(name.into());
self
}
pub fn with_features<S: Into<TableFeatures>>(mut self, name: Vec<S>) -> Self {
self.name
.extend(name.into_iter().map(Into::into).collect_vec());
self
}
pub fn with_allow_protocol_versions_increase(mut self, allow: bool) -> Self {
self.allow_protocol_versions_increase = allow;
self
}
pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
self.commit_properties = commit_properties;
self
}
pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
self.custom_execute_handler = Some(handler);
self
}
}
impl std::future::IntoFuture for AddTableFeatureBuilder {
type Output = DeltaResult<DeltaTable>;
type IntoFuture = BoxFuture<'static, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
let this = self;
Box::pin(async move {
let snapshot =
resolve_snapshot(&this.log_store, this.snapshot.clone(), false, None).await?;
let name = if this.name.is_empty() {
return Err(DeltaTableError::Generic("No features provided".to_string()));
} else {
&this.name
};
let operation_id = this.get_operation_id();
this.pre_execute(operation_id).await?;
let (reader_features, writer_features): (
Vec<Option<TableFeature>>,
Vec<Option<TableFeature>>,
) = name.iter().map(|v| v.to_reader_writer_features()).unzip();
let reader_features = reader_features.into_iter().flatten().collect_vec();
let writer_features = writer_features.into_iter().flatten().collect_vec();
let mut protocol = snapshot.protocol().clone();
if !this.allow_protocol_versions_increase {
if !reader_features.is_empty()
&& !writer_features.is_empty()
&& !(protocol.min_reader_version() == 3 && protocol.min_writer_version() == 7)
{
return Err(DeltaTableError::Generic("Table feature enables reader and writer feature, but reader is not v3, and writer not v7. Set allow_protocol_versions_increase or increase versions explicitly through set_tbl_properties".to_string()));
} else if !reader_features.is_empty() && protocol.min_reader_version() < 3 {
return Err(DeltaTableError::Generic("Table feature enables reader feature, but min_reader is not v3. Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties".to_string()));
} else if !writer_features.is_empty() && protocol.min_writer_version() < 7 {
return Err(DeltaTableError::Generic("Table feature enables writer feature, but min_writer is not v7. Set allow_protocol_versions_increase or increase version explicitly through set_tbl_properties".to_string()));
}
}
protocol = protocol.append_reader_features(&reader_features);
protocol = protocol.append_writer_features(&writer_features);
let operation = DeltaOperation::AddFeature {
name: name.to_vec(),
};
let actions = vec![protocol.into()];
let commit = CommitBuilder::from(this.commit_properties.clone())
.with_actions(actions)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(this.get_custom_execute_handler())
.build(Some(&snapshot), this.log_store.clone(), operation)
.await?;
this.post_execute(operation_id).await?;
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
#[cfg(feature = "datafusion")]
#[cfg(test)]
mod tests {
use crate::{
kernel::TableFeatures,
writer::test_utils::{create_bare_table, get_record_batch},
};
use delta_kernel::DeltaResult;
use delta_kernel::table_features::TableFeature;
#[tokio::test]
async fn add_feature() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let table = create_bare_table()
.write(vec![batch.clone()])
.await
.unwrap();
let result = table
.add_feature()
.with_feature(TableFeatures::ChangeDataFeed)
.with_allow_protocol_versions_increase(true)
.await
.unwrap();
assert!(
&result
.snapshot()
.unwrap()
.protocol()
.writer_features()
.unwrap_or_default()
.contains(&TableFeature::ChangeDataFeed)
);
let result = result
.add_feature()
.with_feature(TableFeatures::DeletionVectors)
.with_allow_protocol_versions_increase(true)
.await
.unwrap();
let current_protocol = &result.snapshot().unwrap().protocol().clone();
assert!(
¤t_protocol
.writer_features()
.unwrap_or_default()
.contains(&TableFeature::DeletionVectors)
);
assert!(
¤t_protocol
.reader_features()
.unwrap_or_default()
.contains(&TableFeature::DeletionVectors)
);
assert_eq!(result.version(), Some(2));
Ok(())
}
#[tokio::test]
async fn add_feature_disallowed_increase() -> DeltaResult<()> {
let batch = get_record_batch(None, false);
let table = create_bare_table()
.write(vec![batch.clone()])
.await
.unwrap();
let result = table
.add_feature()
.with_feature(TableFeatures::ChangeDataFeed)
.await;
assert!(result.is_err());
Ok(())
}
}