use std::collections::HashMap;
use iceberg_rust_spec::{
spec::{
partition::PartitionSpec,
schema::Schema,
snapshot::{Snapshot, SnapshotReference},
sort::SortOrder,
table_metadata::TableMetadata,
view_metadata::{GeneralViewMetadata, Version},
},
view_metadata::Materialization,
};
use serde_derive::{Deserialize, Serialize};
use uuid::Uuid;
use crate::error::Error;
use super::identifier::Identifier;
/// A serializable bundle describing one commit against a table.
///
/// It pairs the table's identifier with a list of requirements and a list of
/// updates. This type does no checking or applying itself; see
/// [`check_table_requirements`] and [`apply_table_updates`] for how the two
/// lists are evaluated against table metadata.
#[derive(Debug, Serialize, Deserialize)]
pub struct CommitTable {
/// Identifier of the table this commit refers to.
pub identifier: Identifier,
/// Preconditions on the table metadata.
pub requirements: Vec<TableRequirement>,
/// Metadata changes carried by the commit.
pub updates: Vec<TableUpdate>,
}
/// A serializable bundle describing one commit against a view.
///
/// It pairs the view's identifier with a list of requirements and a list of
/// updates. The type parameter `T` is passed straight through to
/// [`ViewUpdate`]. See [`check_view_requirements`] and [`apply_view_updates`]
/// for how the two lists are evaluated against view metadata.
#[derive(Debug, Serialize, Deserialize)]
pub struct CommitView<T: Materialization> {
/// Identifier of the view this commit refers to.
pub identifier: Identifier,
/// Preconditions on the view metadata.
pub requirements: Vec<ViewRequirement>,
/// Metadata changes carried by the commit.
pub updates: Vec<ViewUpdate<T>>,
}
/// A single change to table metadata.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// kebab-case under the `action` key, and the variant's fields (also
/// kebab-case) sit next to it. [`apply_table_updates`] defines what each
/// variant does to a `TableMetadata`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(
    tag = "action",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum TableUpdate {
    /// Replace the table UUID. The string is parsed as a UUID when the update
    /// is applied.
    // serde's kebab-case conversion inserts a separator before every uppercase
    // letter, so the derived tag for this variant would be "assign-u-u-i-d".
    // Pin the intended name explicitly, and keep accepting the previously
    // emitted spelling when deserializing.
    #[serde(rename = "assign-uuid", alias = "assign-u-u-i-d")]
    AssignUUID {
        /// Textual form of the new UUID.
        uuid: String,
    },
    /// Change the format version. `apply_table_updates` does not implement
    /// this yet and panics when it encounters it.
    UpgradeFormatVersion {
        /// Requested format version.
        format_version: i32,
    },
    /// Insert a schema, keyed by its schema id.
    AddSchema {
        /// Schema to insert.
        schema: Schema,
        /// When present, overwrites the table's last column id.
        last_column_id: Option<i32>,
    },
    /// Set the current schema id.
    SetCurrentSchema {
        /// Id to store as the current schema id.
        schema_id: i32,
    },
    /// Insert a partition spec, keyed by its spec id.
    AddPartitionSpec {
        /// Partition spec to insert.
        spec: PartitionSpec,
    },
    /// Set the default partition spec id.
    SetDefaultSpec {
        /// Id to store as the default spec id.
        spec_id: i32,
    },
    /// Insert a sort order, keyed by its order id.
    AddSortOrder {
        /// Sort order to insert.
        sort_order: SortOrder,
    },
    /// Set the default sort order id.
    SetDefaultSortOrder {
        /// Id to store as the default sort order id.
        sort_order_id: i32,
    },
    /// Insert a snapshot, keyed by its snapshot id.
    AddSnapshot {
        /// Snapshot to insert.
        snapshot: Snapshot,
    },
    /// Insert or overwrite a named snapshot reference. Setting the ref named
    /// `"main"` also moves the table's current snapshot id.
    // NOTE(review): the REST catalog spec appears to inline the reference's
    // fields next to `ref-name` instead of nesting them under
    // `snapshot-reference` — confirm against the target server.
    SetSnapshotRef {
        /// Name of the reference.
        ref_name: String,
        /// Reference to store under `ref_name`.
        snapshot_reference: SnapshotReference,
    },
    /// Remove the snapshots with the given ids.
    RemoveSnapshots {
        /// Ids of the snapshots to remove.
        snapshot_ids: Vec<i64>,
    },
    /// Remove a named snapshot reference.
    RemoveSnapshotRef {
        /// Name of the reference to remove.
        ref_name: String,
    },
    /// Replace the table location.
    SetLocation {
        /// New location.
        location: String,
    },
    /// Insert or overwrite table properties.
    SetProperties {
        /// Key/value pairs merged into the existing properties.
        updates: HashMap<String, String>,
    },
    /// Remove table properties.
    RemoveProperties {
        /// Keys to remove.
        removals: Vec<String>,
    },
}
/// A precondition on table metadata.
///
/// Serialized as an internally tagged enum: the kebab-case variant name is
/// written under the `type` key, with kebab-case field names next to it.
/// [`check_table_requirements`] defines how each variant is evaluated.
#[derive(Debug, Serialize, Deserialize)]
#[serde(
tag = "type",
rename_all = "kebab-case",
rename_all_fields = "kebab-case"
)]
pub enum TableRequirement {
/// Treated as always satisfied by `check_table_requirements`; that function
/// performs no existence check of its own.
AssertCreate,
/// Satisfied when the table UUID equals `uuid`.
AssertTableUuid {
uuid: Uuid,
},
/// Satisfied when a ref named `ref` exists and points at `snapshot_id`.
/// A missing ref fails the requirement.
AssertRefSnapshotId {
r#ref: String,
snapshot_id: i64,
},
/// Satisfied when the metadata's last column id equals the given value.
AssertLastAssignedFieldId {
last_assigned_field_id: i32,
},
/// Satisfied when the metadata's current schema id equals the given value.
AssertCurrentSchemaId {
current_schema_id: i32,
},
/// Satisfied when the metadata's last partition id equals the given value.
AssertLastAssignedPartitionId {
last_assigned_partition_id: i32,
},
/// Satisfied when the metadata's default spec id equals the given value.
AssertDefaultSpecId {
default_spec_id: i32,
},
/// Satisfied when the metadata's default sort order id equals the given value.
AssertDefaultSortOrderId {
default_sort_order_id: i32,
},
}
/// A single change to view metadata.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// kebab-case under the `action` key, and the variant's fields (also
/// kebab-case) sit next to it. [`apply_view_updates`] defines what each
/// variant does to a `GeneralViewMetadata<T>`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(
    tag = "action",
    rename_all = "kebab-case",
    rename_all_fields = "kebab-case"
)]
pub enum ViewUpdate<T: Materialization> {
    /// Replace the view UUID. The string is parsed as a UUID when the update
    /// is applied.
    // serde's kebab-case conversion inserts a separator before every uppercase
    // letter, so the derived tag for this variant would be "assign-u-u-i-d".
    // Pin the intended name explicitly, and keep accepting the previously
    // emitted spelling when deserializing.
    #[serde(rename = "assign-uuid", alias = "assign-u-u-i-d")]
    AssignUUID {
        /// Textual form of the new UUID.
        uuid: String,
    },
    /// Change the format version. `apply_view_updates` does not implement
    /// this yet and panics when it encounters it.
    UpgradeFormatVersion {
        /// Requested format version.
        format_version: i32,
    },
    /// Insert a schema, keyed by its schema id.
    AddSchema {
        /// Schema to insert.
        schema: Schema,
        /// Carried on the wire but ignored by `apply_view_updates`.
        last_column_id: Option<i32>,
    },
    /// Replace the view location.
    SetLocation {
        /// New location.
        location: String,
    },
    /// Insert or overwrite view properties.
    SetProperties {
        /// Key/value pairs merged into the existing properties.
        updates: HashMap<String, String>,
    },
    /// Remove view properties.
    RemoveProperties {
        /// Keys to remove.
        removals: Vec<String>,
    },
    /// Insert a view version, keyed by its version id.
    AddViewVersion {
        /// Version to insert.
        view_version: Version<T>,
    },
    /// Set the current view version id.
    SetCurrentViewVersion {
        /// Id to store as the current version id.
        view_version_id: i64,
    },
}
/// A precondition on view metadata.
///
/// Serialized as an internally tagged enum: the kebab-case variant name is
/// written under the `type` key, with kebab-case field names next to it.
/// [`check_view_requirements`] defines how each variant is evaluated.
#[derive(Debug, Serialize, Deserialize)]
#[serde(
tag = "type",
rename_all = "kebab-case",
rename_all_fields = "kebab-case"
)]
pub enum ViewRequirement {
/// Satisfied when the view UUID equals `uuid`.
AssertViewUuid {
uuid: Uuid,
},
}
/// Reports whether `metadata` satisfies every entry of `requirements`.
///
/// Requirements are evaluated in order and evaluation stops at the first one
/// that does not hold. An empty slice is trivially satisfied.
///
/// # Returns
///
/// `true` when all requirements hold, `false` as soon as one fails.
pub fn check_table_requirements(
    requirements: &[TableRequirement],
    metadata: &TableMetadata,
) -> bool {
    for requirement in requirements {
        let satisfied = match requirement {
            // Nothing in the metadata is inspected for this variant; it always passes here.
            TableRequirement::AssertCreate => true,
            TableRequirement::AssertTableUuid { uuid } => metadata.table_uuid == *uuid,
            TableRequirement::AssertRefSnapshotId { r#ref, snapshot_id } => {
                // A ref that is not present can never match the expected snapshot.
                match metadata.refs.get(r#ref) {
                    Some(reference) => reference.snapshot_id == *snapshot_id,
                    None => false,
                }
            }
            TableRequirement::AssertLastAssignedFieldId {
                last_assigned_field_id,
            } => metadata.last_column_id == *last_assigned_field_id,
            TableRequirement::AssertCurrentSchemaId { current_schema_id } => {
                metadata.current_schema_id == *current_schema_id
            }
            TableRequirement::AssertLastAssignedPartitionId {
                last_assigned_partition_id,
            } => metadata.last_partition_id == *last_assigned_partition_id,
            TableRequirement::AssertDefaultSpecId { default_spec_id } => {
                metadata.default_spec_id == *default_spec_id
            }
            TableRequirement::AssertDefaultSortOrderId {
                default_sort_order_id,
            } => metadata.default_sort_order_id == *default_sort_order_id,
        };
        if !satisfied {
            return false;
        }
    }
    true
}
/// Reports whether `metadata` satisfies every entry of `requirements`.
///
/// Requirements are evaluated in order and evaluation stops at the first one
/// that does not hold. An empty slice is trivially satisfied.
///
/// # Returns
///
/// `true` when all requirements hold, `false` as soon as one fails.
pub fn check_view_requirements<T: Materialization + Eq + 'static>(
    requirements: &[ViewRequirement],
    metadata: &GeneralViewMetadata<T>,
) -> bool {
    for requirement in requirements {
        let satisfied = match requirement {
            ViewRequirement::AssertViewUuid { uuid } => metadata.view_uuid == *uuid,
        };
        if !satisfied {
            return false;
        }
    }
    true
}
/// Applies `updates` to `metadata`, in order.
///
/// Each update mutates `metadata` in place. Updates applied before a failing
/// one are not rolled back.
///
/// # Errors
///
/// Returns an error when an `AssignUUID` update carries a string that cannot
/// be parsed as a UUID.
///
/// # Panics
///
/// Panics on `UpgradeFormatVersion`, which is not implemented yet.
pub fn apply_table_updates(
    metadata: &mut TableMetadata,
    updates: Vec<TableUpdate>,
) -> Result<(), Error> {
    // Name of the ref whose snapshot is mirrored in `current_snapshot_id`.
    const MAIN_BRANCH: &str = "main";

    for update in updates {
        match update {
            TableUpdate::UpgradeFormatVersion { format_version: _ } => {
                unimplemented!();
            }
            TableUpdate::AssignUUID { uuid } => {
                metadata.table_uuid = Uuid::parse_str(&uuid)?;
            }
            TableUpdate::AddSchema {
                schema,
                last_column_id,
            } => {
                metadata.schemas.insert(*schema.schema_id(), schema);
                if let Some(last_column_id) = last_column_id {
                    metadata.last_column_id = last_column_id;
                }
            }
            TableUpdate::SetCurrentSchema { schema_id } => {
                metadata.current_schema_id = schema_id;
            }
            TableUpdate::AddPartitionSpec { spec } => {
                metadata.partition_specs.insert(*spec.spec_id(), spec);
            }
            TableUpdate::SetDefaultSpec { spec_id } => {
                metadata.default_spec_id = spec_id;
            }
            TableUpdate::AddSortOrder { sort_order } => {
                metadata.sort_orders.insert(sort_order.order_id, sort_order);
            }
            TableUpdate::SetDefaultSortOrder { sort_order_id } => {
                metadata.default_sort_order_id = sort_order_id;
            }
            TableUpdate::AddSnapshot { snapshot } => {
                metadata.snapshots.insert(*snapshot.snapshot_id(), snapshot);
            }
            TableUpdate::SetSnapshotRef {
                ref_name,
                snapshot_reference,
            } => {
                // The current snapshot id tracks the main branch.
                if ref_name == MAIN_BRANCH {
                    metadata.current_snapshot_id = Some(snapshot_reference.snapshot_id);
                }
                metadata.refs.insert(ref_name, snapshot_reference);
            }
            TableUpdate::RemoveSnapshots { snapshot_ids } => {
                for id in snapshot_ids {
                    metadata.snapshots.remove(&id);
                }
            }
            TableUpdate::RemoveSnapshotRef { ref_name } => {
                // Counterpart of `SetSnapshotRef`: once the main branch is
                // gone, the current snapshot id must not keep pointing at the
                // snapshot it used to reference.
                if ref_name == MAIN_BRANCH {
                    metadata.current_snapshot_id = None;
                }
                metadata.refs.remove(&ref_name);
            }
            TableUpdate::SetLocation { location } => {
                metadata.location = location;
            }
            TableUpdate::SetProperties { updates } => {
                metadata.properties.extend(updates);
            }
            TableUpdate::RemoveProperties { removals } => {
                for rem in removals {
                    metadata.properties.remove(&rem);
                }
            }
        }
    }
    Ok(())
}
pub fn apply_view_updates<T: Materialization + 'static>(
metadata: &mut GeneralViewMetadata<T>,
updates: Vec<ViewUpdate<T>>,
) -> Result<(), Error> {
for update in updates {
match update {
ViewUpdate::UpgradeFormatVersion { format_version: _ } => {
unimplemented!();
}
ViewUpdate::AssignUUID { uuid } => {
metadata.view_uuid = Uuid::parse_str(&uuid)?;
}
ViewUpdate::AddSchema {
schema,
last_column_id: _,
} => {
metadata.schemas.insert(*schema.schema_id(), schema);
}
ViewUpdate::SetLocation { location } => {
metadata.location = location;
}
ViewUpdate::SetProperties { updates } => {
metadata.properties.extend(updates);
}
ViewUpdate::RemoveProperties { removals } => {
for rem in removals {
metadata.properties.remove(&rem);
}
}
ViewUpdate::AddViewVersion { view_version } => {
metadata
.versions
.insert(view_version.version_id, view_version);
}
ViewUpdate::SetCurrentViewVersion { view_version_id } => {
metadata.current_version_id = view_version_id;
}
};
}
Ok(())
}