use std::sync::Arc;
use futures::future::BoxFuture;
use super::{CustomExecuteHandler, Operation};
use crate::DeltaTable;
use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
use crate::kernel::{Action, EagerSnapshot, MetadataExt, resolve_snapshot};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTableError};
/// Builder for the operation that removes a named constraint from a table.
///
/// The builder is configured through its `with_*` methods and executed by
/// awaiting it (it implements [`std::future::IntoFuture`]).
pub struct DropConstraintBuilder {
    // --- what to drop -------------------------------------------------------
    /// Name of the constraint to remove; it is looked up in the table
    /// configuration under the key `delta.constraints.{name}`.
    name: Option<String>,
    /// When `true`, executing the builder fails if the constraint is absent;
    /// when `false`, an absent constraint makes the operation a no-op.
    raise_if_not_exists: bool,

    // --- where to drop it ---------------------------------------------------
    /// Optional already-loaded table snapshot handed to `resolve_snapshot`
    /// when the operation executes.
    snapshot: Option<EagerSnapshot>,
    /// Log store the commit is written through.
    log_store: LogStoreRef,

    // --- how to commit ------------------------------------------------------
    /// Properties used to seed the `CommitBuilder` for the resulting commit.
    commit_properties: CommitProperties,
    /// Optional handler exposed through `Operation::get_custom_execute_handler`
    /// and registered as the commit's post-commit hook handler.
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
impl super::Operation for DropConstraintBuilder {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}
impl DropConstraintBuilder {
    /// Create a builder bound to `log_store`, optionally seeded with an
    /// already-loaded `snapshot`.
    ///
    /// No constraint name is set initially, a missing constraint is treated as
    /// an error, commit properties are the defaults, and no custom execute
    /// handler is installed.
    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
        Self {
            log_store,
            snapshot,
            name: None,
            raise_if_not_exists: true,
            custom_execute_handler: None,
            commit_properties: CommitProperties::default(),
        }
    }

    /// Set the name of the constraint to drop.
    pub fn with_constraint<S: Into<String>>(self, name: S) -> Self {
        Self {
            name: Some(name.into()),
            ..self
        }
    }

    /// Choose whether executing the operation errors (`true`) or silently
    /// succeeds (`false`) when the named constraint does not exist.
    pub fn with_raise_if_not_exists(self, raise: bool) -> Self {
        Self {
            raise_if_not_exists: raise,
            ..self
        }
    }

    /// Replace the commit properties used for the resulting commit.
    pub fn with_commit_properties(self, commit_properties: CommitProperties) -> Self {
        Self {
            commit_properties,
            ..self
        }
    }

    /// Install a custom execute handler for this operation.
    pub fn with_custom_execute_handler(self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
        Self {
            custom_execute_handler: Some(handler),
            ..self
        }
    }
}
impl std::future::IntoFuture for DropConstraintBuilder {
    type Output = DeltaResult<DeltaTable>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    /// Execute the drop-constraint operation.
    ///
    /// Steps, in order:
    /// 1. Resolve the table snapshot and check that the table can be written to.
    /// 2. Require that a constraint name was supplied.
    /// 3. Run the `pre_execute` hook.
    /// 4. Look up `delta.constraints.{name}` in the table configuration. If it
    ///    is absent, either fail or return the table unchanged, depending on
    ///    `raise_if_not_exists`.
    /// 5. Otherwise commit a metadata action with the key removed, run the
    ///    `post_execute` hook, and return the table at the committed snapshot.
    ///
    /// # Errors
    ///
    /// * `DeltaTableError::Generic("No name provided")` when no constraint name
    ///   was set on the builder.
    /// * `DeltaTableError::Generic` when the constraint does not exist and
    ///   `raise_if_not_exists` is `true`.
    /// * Any error propagated from snapshot resolution, the protocol check, the
    ///   execute hooks, the metadata update, or the commit.
    fn into_future(self) -> Self::IntoFuture {
        let this = self;
        Box::pin(async move {
            let snapshot =
                resolve_snapshot(&this.log_store, this.snapshot.clone(), false, None).await?;
            PROTOCOL.can_write_to(&snapshot)?;

            // Build the error lazily so the happy path does not allocate it.
            let name = this
                .name
                .clone()
                .ok_or_else(|| DeltaTableError::Generic("No name provided".to_string()))?;

            let operation_id = this.get_operation_id();
            this.pre_execute(operation_id).await?;

            let configuration_key = format!("delta.constraints.{name}");
            if !snapshot
                .metadata()
                .configuration()
                .contains_key(&configuration_key)
            {
                if this.raise_if_not_exists {
                    return Err(DeltaTableError::Generic(format!(
                        "Constraint with name '{name}' does not exist."
                    )));
                }
                // Nothing to drop: return the table at the resolved snapshot
                // without writing a commit.
                // NOTE(review): `pre_execute` has already run at this point but
                // `post_execute` is not called on this path; confirm that the
                // handler contract allows this.
                return Ok(DeltaTable::new_with_state(
                    this.log_store,
                    DeltaTableState::new(snapshot),
                ));
            }

            // Only clone the metadata once we know a commit is required.
            let metadata = snapshot
                .metadata()
                .clone()
                .remove_config_key(&configuration_key)?;

            // `name` is not needed past this point, so move it into the operation.
            let operation = DeltaOperation::DropConstraint { name };
            let actions = vec![Action::Metadata(metadata)];

            let commit = CommitBuilder::from(this.commit_properties.clone())
                .with_operation_id(operation_id)
                .with_post_commit_hook_handler(this.get_custom_execute_handler())
                .with_actions(actions)
                .build(Some(&snapshot), this.log_store.clone(), operation)
                .await?;

            this.post_execute(operation_id).await?;

            Ok(DeltaTable::new_with_state(
                this.log_store,
                commit.snapshot(),
            ))
        })
    }
}
#[cfg(feature = "datafusion")]
#[cfg(test)]
mod tests {
    use crate::writer::test_utils::{create_bare_table, get_record_batch};
    use crate::{DeltaResult, DeltaTable};

    /// Table-configuration key under which the constraint named `id` is stored.
    const ID_CONSTRAINT_KEY: &str = "delta.constraints.id";

    /// Return the `name` operation parameter recorded in the table's last commit.
    ///
    /// Panics if the last commit cannot be read, carries no operation
    /// parameters, has no `name` parameter, or the parameter is not a string.
    async fn get_constraint_op_params(table: &mut DeltaTable) -> String {
        let last_commit = table.last_commit().await.unwrap();
        last_commit
            .operation_parameters
            .as_ref()
            .unwrap()
            .get("name")
            .unwrap()
            .as_str()
            .unwrap()
            .to_owned()
    }

    /// Dropping an existing constraint records the operation and removes the
    /// constraint's configuration entry.
    #[tokio::test]
    async fn drop_valid_constraint() -> DeltaResult<()> {
        let batch = get_record_batch(None, false);
        let table = create_bare_table().write(vec![batch]).await?;
        let table = table
            .add_constraint()
            .with_constraint("id", "value < 1000")
            .await?;

        // Precondition: without this the post-drop check could pass vacuously.
        assert!(
            table
                .snapshot()
                .unwrap()
                .metadata()
                .configuration()
                .contains_key(ID_CONSTRAINT_KEY),
            "constraint should be present before it is dropped"
        );

        let mut table = table.drop_constraints().with_constraint("id").await?;

        let expected_name = "id";
        assert_eq!(get_constraint_op_params(&mut table).await, expected_name);
        // Constraints are stored under `delta.constraints.{name}`, so that is
        // the key that must be gone (a bare "id" key never exists).
        assert_eq!(
            table
                .snapshot()
                .unwrap()
                .metadata()
                .configuration()
                .get(ID_CONSTRAINT_KEY),
            None
        );
        Ok(())
    }

    /// Dropping a constraint that does not exist fails by default.
    #[tokio::test]
    async fn drop_invalid_constraint_not_existing() -> DeltaResult<()> {
        let batch = get_record_batch(None, false);
        let write = create_bare_table().write(vec![batch]).await?;
        let table = write
            .drop_constraints()
            .with_constraint("not_existing")
            .await;
        assert!(table.is_err());
        Ok(())
    }

    /// With `raise_if_not_exists(false)`, dropping a missing constraint
    /// succeeds and leaves the table version unchanged.
    #[tokio::test]
    async fn drop_invalid_constraint_ignore() -> DeltaResult<()> {
        let batch = get_record_batch(None, false);
        let write = create_bare_table().write(vec![batch]).await?;
        let version = write.version();
        let table = write
            .drop_constraints()
            .with_constraint("not_existing")
            .with_raise_if_not_exists(false)
            .await?;
        let version_after = table.version();
        assert_eq!(version, version_after);
        Ok(())
    }
}