use std::sync::Arc;
use futures::future::BoxFuture;
use validator::Validate;
use super::{CustomExecuteHandler, Operation};
use crate::DeltaTable;
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::{Action, EagerSnapshot, MetadataExt, resolve_snapshot};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::{DeltaResult, DeltaTableError};
/// A requested change to a table's metadata.
///
/// Both fields are optional, but validation (see the schema-level validator
/// `validate_at_least_one_field`) rejects an update in which neither is set.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Validate)]
#[validate(schema(
function = "validate_at_least_one_field",
message = "No metadata update specified"
))]
pub struct TableMetadataUpdate {
/// New table name, or `None` to leave the name untouched.
/// When present, its length is validated to lie between 1 and 255.
#[validate(length(
min = 1,
max = 255,
message = "Table name cannot be empty and cannot exceed 255 characters"
))]
pub name: Option<String>,
/// New table description, or `None` to leave the description untouched.
/// When present, its length is validated to be at most 4000.
#[validate(length(
max = 4000,
message = "Table description cannot exceed 4000 characters"
))]
pub description: Option<String>,
}
/// Schema-level validator for [`TableMetadataUpdate`].
///
/// Succeeds when at least one of `name` or `description` is set.
///
/// # Errors
///
/// Returns a `ValidationError` with code `no_fields_specified` when both
/// fields are `None`.
fn validate_at_least_one_field(
    update: &TableMetadataUpdate,
) -> Result<(), validator::ValidationError> {
    match (&update.name, &update.description) {
        // Nothing was requested, so there is nothing to apply.
        (None, None) => Err(validator::ValidationError::new("no_fields_specified")),
        _ => Ok(()),
    }
}
/// Builder for an operation that updates a table's metadata.
///
/// Configure it with the `with_*` methods, then await it (it implements
/// `IntoFuture`) to run the update and obtain the resulting `DeltaTable`.
pub struct UpdateTableMetadataBuilder {
/// Optional table snapshot; handed to `resolve_snapshot` when the operation runs.
snapshot: Option<EagerSnapshot>,
/// The metadata change to apply. Running the operation without one fails
/// with a `MetadataError`.
update: Option<TableMetadataUpdate>,
/// Log store used to resolve the snapshot and to write the commit.
log_store: LogStoreRef,
/// Commit properties the `CommitBuilder` for this operation is created from.
commit_properties: CommitProperties,
/// Optional custom handler; exposed through `get_custom_execute_handler`
/// and also installed as the commit's post-commit hook handler.
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
impl super::Operation for UpdateTableMetadataBuilder {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}
impl UpdateTableMetadataBuilder {
    /// Creates a builder bound to `log_store` and an optional `snapshot`.
    ///
    /// No update is set yet, commit properties start at their default, and no
    /// custom execute handler is installed.
    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
        Self {
            snapshot,
            update: None,
            log_store,
            commit_properties: CommitProperties::default(),
            custom_execute_handler: None,
        }
    }

    /// Sets the metadata update to apply, replacing any previously set update.
    pub fn with_update(self, update: TableMetadataUpdate) -> Self {
        Self {
            update: Some(update),
            ..self
        }
    }

    /// Replaces the commit properties used when the operation commits.
    pub fn with_commit_properties(self, commit_properties: CommitProperties) -> Self {
        Self {
            commit_properties,
            ..self
        }
    }

    /// Installs a custom execute handler for this operation.
    pub fn with_custom_execute_handler(self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
        Self {
            custom_execute_handler: Some(handler),
            ..self
        }
    }
}
impl std::future::IntoFuture for UpdateTableMetadataBuilder {
    type Output = DeltaResult<DeltaTable>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    /// Runs the metadata update and yields the table at the committed state.
    ///
    /// Steps, in order: resolve the snapshot, call `pre_execute`, check that an
    /// update was supplied and validate it, apply the requested name and/or
    /// description to a copy of the snapshot's metadata, commit a single
    /// `Metadata` action, then call the custom handler's `post_execute` if a
    /// handler is installed.
    ///
    /// # Errors
    ///
    /// Returns `DeltaTableError::MetadataError` when no update was set or when
    /// the update fails validation. Errors from snapshot resolution,
    /// `pre_execute`, applying the name/description, the commit, and
    /// `post_execute` are propagated unchanged.
    fn into_future(self) -> Self::IntoFuture {
        let this = self;

        Box::pin(async move {
            // The snapshot is cloned rather than moved because `this` is still
            // borrowed as a whole by the two calls that follow.
            let snapshot =
                resolve_snapshot(&this.log_store, this.snapshot.clone(), false, None).await?;

            let operation_id = this.get_operation_id();
            this.pre_execute(operation_id).await?;

            let update = this.update.ok_or_else(|| {
                DeltaTableError::MetadataError("No metadata update specified".to_string())
            })?;
            update
                .validate()
                .map_err(|e| DeltaTableError::MetadataError(e.to_string()))?;

            // Start from the current metadata and overwrite only what was requested.
            let mut metadata = snapshot.metadata().clone();
            if let Some(name) = &update.name {
                metadata = metadata.with_name(name.clone())?;
            }
            if let Some(description) = &update.description {
                metadata = metadata.with_description(description.clone())?;
            }

            let operation = DeltaOperation::UpdateTableMetadata {
                metadata_update: update,
            };
            let actions = vec![Action::Metadata(metadata)];

            // `commit_properties` and `operation` are not needed afterwards, so
            // they are moved into the commit. The handler and log store are
            // cloned because both are used again below.
            let commit = CommitBuilder::from(this.commit_properties)
                .with_actions(actions)
                .with_operation_id(operation_id)
                .with_post_commit_hook_handler(this.custom_execute_handler.clone())
                .build(Some(&snapshot), this.log_store.clone(), operation)
                .await?;

            if let Some(handler) = this.custom_execute_handler {
                handler.post_execute(&this.log_store, operation_id).await?;
            }

            Ok(DeltaTable::new_with_state(
                this.log_store,
                commit.snapshot(),
            ))
        })
    }
}