use iceberg_rust_spec::{
spec::{
partition::PartitionSpec,
schema::Schema,
snapshot::{Snapshot, SnapshotReference},
sort::SortOrder,
table_metadata::TableMetadata,
view_metadata::{GeneralViewMetadata, Version},
},
table_metadata::SnapshotLog,
view_metadata::Materialization,
};
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
use crate::error::Error;
use super::identifier::Identifier;
/// A commit request for a single table: the preconditions to check against the
/// table's current metadata and the updates to apply to it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommitTable {
/// Table the commit targets.
pub identifier: Identifier,
/// Preconditions, checked by `check_table_requirements`.
pub requirements: Vec<TableRequirement>,
/// Changes to apply, in list order, via `apply_table_updates`.
pub updates: Vec<TableUpdate>,
}
/// A commit request for a single view: the preconditions to check against the
/// view's current metadata and the updates to apply to it.
///
/// `T` is the [`Materialization`] type carried by the view versions in
/// [`ViewUpdate::AddViewVersion`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CommitView<T: Materialization> {
/// View the commit targets.
pub identifier: Identifier,
/// Preconditions, checked by `check_view_requirements`.
pub requirements: Vec<ViewRequirement>,
/// Changes to apply, in list order, via `apply_view_updates`.
pub updates: Vec<ViewUpdate<T>>,
}
/// A single change to table metadata, as carried in [`CommitTable::updates`].
///
/// Serialized as an internally tagged object: the `action` field holds the
/// variant name in kebab-case (e.g. `"add-schema"`), and every field name is
/// kebab-case as well (e.g. `last_column_id` is written as `"last-column-id"`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
tag = "action",
rename_all = "kebab-case",
rename_all_fields = "kebab-case"
)]
pub enum TableUpdate {
/// Replaces the table UUID. Kept as a string here; `apply_table_updates`
/// parses it into a [`Uuid`] and fails if it is malformed.
AssignUuid {
uuid: String,
},
/// Requests a specific table format version. `apply_table_updates` only
/// accepts a value equal to the current version; changing it is not supported.
UpgradeFormatVersion {
format_version: i32,
},
/// Adds (or replaces) a schema, keyed by its schema id.
AddSchema {
schema: Schema,
/// When present, stored as the table's `last_column_id`; when absent that
/// field is left unchanged.
last_column_id: Option<i32>,
},
/// Sets the current schema. `-1` selects the schema added by an earlier
/// `AddSchema` in the same update list.
SetCurrentSchema {
schema_id: i32,
},
/// Adds (or replaces) a partition spec, keyed by its spec id.
AddSpec {
spec: PartitionSpec,
},
/// Sets the default partition spec. `-1` selects the spec added by an earlier
/// `AddSpec` in the same update list.
SetDefaultSpec {
spec_id: i32,
},
/// Adds (or replaces) a sort order, keyed by its order id.
AddSortOrder {
sort_order: SortOrder,
},
/// Sets the default sort order. `-1` selects the sort order added by an
/// earlier `AddSortOrder` in the same update list.
SetDefaultSortOrder {
sort_order_id: i32,
},
/// Adds a snapshot. When applied, a snapshot-log entry is appended and the
/// table's `last_sequence_number` is set to the snapshot's sequence number.
AddSnapshot {
snapshot: Snapshot,
},
/// Creates or replaces the ref named `ref_name`. Setting `main` also sets the
/// table's current snapshot id.
SetSnapshotRef {
ref_name: String,
/// Flattened: the reference's own fields appear directly in this object
/// next to `action` and `ref-name`.
#[serde(flatten)]
snapshot_reference: SnapshotReference,
},
/// Removes the listed snapshots together with their snapshot-log entries.
RemoveSnapshots {
snapshot_ids: Vec<i64>,
},
/// Removes the ref named `ref_name`; a missing ref is not an error.
RemoveSnapshotRef {
ref_name: String,
},
/// Replaces the table's base location.
SetLocation {
location: String,
},
/// Merges `updates` into the table properties; existing keys are overwritten.
SetProperties {
updates: HashMap<String, String>,
},
/// Removes the listed property keys; missing keys are ignored.
RemoveProperties {
removals: Vec<String>,
},
}
/// A precondition checked against the current table metadata by
/// `check_table_requirements`.
///
/// Serialized as an internally tagged object whose `type` field holds the
/// variant name in kebab-case (e.g. `"assert-table-uuid"`); field names are
/// kebab-case too.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
tag = "type",
rename_all = "kebab-case",
rename_all_fields = "kebab-case"
)]
pub enum TableRequirement {
/// Always treated as satisfied by `check_table_requirements`.
/// NOTE(review): the Iceberg REST spec defines this as "the table must not
/// already exist"; that is not enforced here — confirm callers check it.
AssertCreate,
/// The table UUID must equal `uuid`.
AssertTableUuid {
uuid: Uuid,
},
/// The ref named `ref` must exist and point at `snapshot_id`; a missing ref
/// fails the check.
AssertRefSnapshotId {
/// Raw identifier because `ref` is a Rust keyword.
r#ref: String,
snapshot_id: i64,
},
/// Compared against the table's `last_column_id`.
AssertLastAssignedFieldId {
last_assigned_field_id: i32,
},
/// Compared against the table's `current_schema_id`.
AssertCurrentSchemaId {
current_schema_id: i32,
},
/// Compared against the table's `last_partition_id`.
AssertLastAssignedPartitionId {
last_assigned_partition_id: i32,
},
/// Compared against the table's `default_spec_id`.
AssertDefaultSpecId {
default_spec_id: i32,
},
/// Compared against the table's `default_sort_order_id`.
AssertDefaultSortOrderId {
default_sort_order_id: i32,
},
}
/// A single change to view metadata, as carried in [`CommitView::updates`].
///
/// Serialized as an internally tagged object: the `action` field holds the
/// variant name in kebab-case (e.g. `"add-view-version"`), and field names are
/// kebab-case too. `T` is the [`Materialization`] type forwarded to [`Version`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
tag = "action",
rename_all = "kebab-case",
rename_all_fields = "kebab-case"
)]
pub enum ViewUpdate<T: Materialization> {
/// Replaces the view UUID. Kept as a string here; `apply_view_updates` parses
/// it into a [`Uuid`] and fails if it is malformed.
AssignUuid {
uuid: String,
},
/// Requests a specific view format version. `apply_view_updates` only accepts
/// a value equal to the current version; changing it is not supported.
UpgradeFormatVersion {
format_version: i32,
},
/// Adds (or replaces) a schema, keyed by its schema id.
AddSchema {
schema: Schema,
/// Ignored by `apply_view_updates`.
last_column_id: Option<i32>,
},
/// Replaces the view's location.
SetLocation {
location: String,
},
/// Merges `updates` into the view properties; existing keys are overwritten.
SetProperties {
updates: HashMap<String, String>,
},
/// Removes the listed property keys; missing keys are ignored.
RemoveProperties {
removals: Vec<String>,
},
/// Adds (or replaces) a view version, keyed by its `version_id`.
AddViewVersion {
view_version: Version<T>,
},
/// Sets the view's current version id.
SetCurrentViewVersion {
view_version_id: i64,
},
}
/// A precondition checked against the current view metadata by
/// `check_view_requirements`.
///
/// Serialized as an internally tagged object whose `type` field holds the
/// variant name in kebab-case (e.g. `"assert-view-uuid"`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(
tag = "type",
rename_all = "kebab-case",
rename_all_fields = "kebab-case"
)]
pub enum ViewRequirement {
/// The view UUID must equal `uuid`.
AssertViewUuid {
uuid: Uuid,
},
}
/// Returns `true` when every requirement in `requirements` holds for `metadata`.
///
/// An empty list is trivially satisfied. Checking stops at the first
/// requirement that does not hold.
pub fn check_table_requirements(
    requirements: &[TableRequirement],
    metadata: &TableMetadata,
) -> bool {
    for requirement in requirements {
        let holds = match requirement {
            // Accepted unconditionally; table existence is not inspected here.
            TableRequirement::AssertCreate => true,
            TableRequirement::AssertTableUuid { uuid } => metadata.table_uuid == *uuid,
            TableRequirement::AssertRefSnapshotId { r#ref, snapshot_id } => {
                // A ref that does not exist cannot match any snapshot id.
                match metadata.refs.get(r#ref) {
                    Some(reference) => reference.snapshot_id == *snapshot_id,
                    None => false,
                }
            }
            TableRequirement::AssertLastAssignedFieldId {
                last_assigned_field_id,
            } => metadata.last_column_id == *last_assigned_field_id,
            TableRequirement::AssertCurrentSchemaId { current_schema_id } => {
                metadata.current_schema_id == *current_schema_id
            }
            TableRequirement::AssertLastAssignedPartitionId {
                last_assigned_partition_id,
            } => metadata.last_partition_id == *last_assigned_partition_id,
            TableRequirement::AssertDefaultSpecId { default_spec_id } => {
                metadata.default_spec_id == *default_spec_id
            }
            TableRequirement::AssertDefaultSortOrderId {
                default_sort_order_id,
            } => metadata.default_sort_order_id == *default_sort_order_id,
        };
        if !holds {
            return false;
        }
    }
    true
}
/// Returns `true` when every requirement in `requirements` holds for `metadata`.
///
/// An empty list is trivially satisfied.
pub fn check_view_requirements<T: Materialization + Eq + 'static>(
    requirements: &[ViewRequirement],
    metadata: &GeneralViewMetadata<T>,
) -> bool {
    // Satisfied unless some requirement is violated.
    let violated = |requirement: &ViewRequirement| match requirement {
        ViewRequirement::AssertViewUuid { uuid } => metadata.view_uuid != *uuid,
    };
    !requirements.iter().any(violated)
}
pub fn apply_table_updates(
metadata: &mut TableMetadata,
updates: Vec<TableUpdate>,
) -> Result<(), Error> {
let mut added_schema_id = None;
let mut added_spec_id = None;
let mut added_sort_order_id = None;
for update in updates {
match update {
TableUpdate::UpgradeFormatVersion { format_version } => {
if i32::from(metadata.format_version) != format_version {
unimplemented!("Table format upgrade");
}
}
TableUpdate::AssignUuid { uuid } => {
metadata.table_uuid = Uuid::parse_str(&uuid)?;
}
TableUpdate::AddSchema {
schema,
last_column_id,
} => {
let schema_id = *schema.schema_id();
metadata.schemas.insert(schema_id, schema);
added_schema_id = Some(schema_id);
if let Some(last_column_id) = last_column_id {
metadata.last_column_id = last_column_id;
}
}
TableUpdate::SetCurrentSchema { schema_id } => {
if schema_id == -1 {
if let Some(added_schema_id) = added_schema_id {
metadata.current_schema_id = added_schema_id;
} else {
return Err(Error::InvalidFormat(
"Cannot set current schema to -1 without adding a schema first"
.to_string(),
));
}
} else {
metadata.current_schema_id = schema_id;
}
}
TableUpdate::AddSpec { spec } => {
let spec_id = *spec.spec_id();
metadata.partition_specs.insert(spec_id, spec);
added_spec_id = Some(spec_id);
}
TableUpdate::SetDefaultSpec { spec_id } => {
if spec_id == -1 {
if let Some(added_spec_id) = added_spec_id {
metadata.default_spec_id = added_spec_id;
} else {
return Err(Error::InvalidFormat(
"Cannot set default spec to -1 without adding a spec first".to_string(),
));
}
} else {
metadata.default_spec_id = spec_id;
}
}
TableUpdate::AddSortOrder { sort_order } => {
let sort_order_id = sort_order.order_id;
metadata.sort_orders.insert(sort_order_id, sort_order);
added_sort_order_id = Some(sort_order_id);
}
TableUpdate::SetDefaultSortOrder { sort_order_id } => {
if sort_order_id == -1 {
if let Some(added_sort_order_id) = added_sort_order_id {
metadata.default_sort_order_id = added_sort_order_id;
} else {
return Err(Error::InvalidFormat(
"Cannot set default sort order to -1 without adding a sort order first"
.to_string(),
));
}
} else {
metadata.default_sort_order_id = sort_order_id;
}
}
TableUpdate::AddSnapshot { snapshot } => {
metadata.snapshot_log.push(SnapshotLog {
snapshot_id: *snapshot.snapshot_id(),
timestamp_ms: *snapshot.timestamp_ms(),
});
metadata.last_sequence_number = *snapshot.sequence_number();
metadata.snapshots.insert(*snapshot.snapshot_id(), snapshot);
}
TableUpdate::SetSnapshotRef {
ref_name,
snapshot_reference,
} => {
if ref_name == "main" {
metadata.current_snapshot_id = Some(snapshot_reference.snapshot_id);
}
metadata.refs.insert(ref_name, snapshot_reference);
}
TableUpdate::RemoveSnapshots { snapshot_ids } => {
for id in snapshot_ids {
metadata.snapshots.remove(&id);
}
metadata
.snapshot_log
.retain(|e| metadata.snapshots.contains_key(&e.snapshot_id));
}
TableUpdate::RemoveSnapshotRef { ref_name } => {
metadata.refs.remove(&ref_name);
}
TableUpdate::SetLocation { location } => {
metadata.location = location;
}
TableUpdate::SetProperties { updates } => {
metadata.properties.extend(updates);
}
TableUpdate::RemoveProperties { removals } => {
for rem in removals {
metadata.properties.remove(&rem);
}
}
};
}
metadata.last_updated_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
Ok(())
}
pub fn apply_view_updates<T: Materialization + 'static>(
metadata: &mut GeneralViewMetadata<T>,
updates: Vec<ViewUpdate<T>>,
) -> Result<(), Error> {
for update in updates {
match update {
ViewUpdate::UpgradeFormatVersion { format_version } => {
if i32::from(metadata.format_version.clone()) != format_version {
unimplemented!("Upgrade of format version");
}
}
ViewUpdate::AssignUuid { uuid } => {
metadata.view_uuid = Uuid::parse_str(&uuid)?;
}
ViewUpdate::AddSchema {
schema,
last_column_id: _,
} => {
metadata.schemas.insert(*schema.schema_id(), schema);
}
ViewUpdate::SetLocation { location } => {
metadata.location = location;
}
ViewUpdate::SetProperties { updates } => {
metadata.properties.extend(updates);
}
ViewUpdate::RemoveProperties { removals } => {
for rem in removals {
metadata.properties.remove(&rem);
}
}
ViewUpdate::AddViewVersion { view_version } => {
metadata
.versions
.insert(view_version.version_id, view_version);
}
ViewUpdate::SetCurrentViewVersion { view_version_id } => {
metadata.current_version_id = view_version_id;
}
};
}
Ok(())
}