use std::marker::PhantomData;
use std::sync::Arc;
use crate::committer::Committer;
use crate::expressions::ColumnName;
use crate::schema::StructField;
use crate::snapshot::SnapshotRef;
use crate::table_configuration::TableConfiguration;
use crate::table_features::{Operation, TableFeature};
use crate::table_properties::COLUMN_MAPPING_MAX_COLUMN_ID;
use crate::transaction::alter_table::AlterTableTransaction;
use crate::transaction::schema_evolution::{
apply_schema_operations, SchemaEvolutionResult, SchemaOperation,
};
use crate::{DeltaResult, Engine, Error};
pub struct Ready;
pub struct Modifying;
pub trait Chainable: sealed::Sealed {}
impl Chainable for Ready {}
impl Chainable for Modifying {}
mod sealed {
pub trait Sealed {}
impl Sealed for super::Ready {}
impl Sealed for super::Modifying {}
}
pub struct AlterTableTransactionBuilder<S = Ready> {
snapshot: SnapshotRef,
operations: Vec<SchemaOperation>,
_state: PhantomData<S>,
}
impl<S> AlterTableTransactionBuilder<S> {
fn transition<T>(self) -> AlterTableTransactionBuilder<T> {
AlterTableTransactionBuilder {
snapshot: self.snapshot,
operations: self.operations,
_state: PhantomData,
}
}
}
impl AlterTableTransactionBuilder<Ready> {
pub(crate) fn new(snapshot: SnapshotRef) -> Self {
AlterTableTransactionBuilder {
snapshot,
operations: Vec::new(),
_state: PhantomData,
}
}
}
impl<S: Chainable> AlterTableTransactionBuilder<S> {
pub fn add_column(mut self, field: StructField) -> AlterTableTransactionBuilder<Modifying> {
self.operations.push(SchemaOperation::AddColumn { field });
self.transition()
}
pub fn set_nullable(mut self, column: ColumnName) -> AlterTableTransactionBuilder<Modifying> {
self.operations
.push(SchemaOperation::SetNullable { column });
self.transition()
}
}
impl AlterTableTransactionBuilder<Modifying> {
pub fn build(
self,
_engine: &dyn Engine,
committer: Box<dyn Committer>,
) -> DeltaResult<AlterTableTransaction> {
let table_config = self.snapshot.table_configuration();
if table_config.is_feature_enabled(&TableFeature::IcebergCompatV3) {
return Err(Error::unsupported(
"ALTER TABLE is not yet supported on tables with icebergCompatV3 enabled",
));
}
table_config.ensure_operation_supported(Operation::Write)?;
let schema = Arc::unwrap_or_clone(table_config.logical_schema());
let column_mapping_mode = table_config.column_mapping_mode();
let current_max_column_id = table_config.table_properties().column_mapping_max_column_id;
let SchemaEvolutionResult {
schema: evolved_schema,
new_max_column_id,
} = apply_schema_operations(
schema,
self.operations,
column_mapping_mode,
current_max_column_id,
)?;
let mut evolved_metadata = table_config
.metadata()
.clone()
.with_schema(evolved_schema.clone())?;
if let Some(id) = new_max_column_id {
evolved_metadata = evolved_metadata
.with_configuration_entry(COLUMN_MAPPING_MAX_COLUMN_ID, id.to_string());
}
let evolved_table_config = TableConfiguration::try_new_with_schema(
table_config,
evolved_metadata,
evolved_schema,
)?;
AlterTableTransaction::try_new_alter_table(self.snapshot, evolved_table_config, committer)
}
}