use iceberg_rust_spec::{
schema::Schema,
spec::{
types::StructType,
view_metadata::{GeneralViewMetadata, Summary, Version, ViewRepresentation, REF_PREFIX},
},
view_metadata::Materialization,
};
use std::{
collections::HashMap,
fmt::Debug,
time::{SystemTime, UNIX_EPOCH},
};
use tracing::{debug, instrument};
use crate::{
catalog::commit::{ViewRequirement, ViewUpdate},
error::Error,
};
/// A single change that can be applied to a view as part of a transaction.
///
/// Each variant is turned into catalog-level `ViewUpdate`s (plus an optional
/// `ViewRequirement`) by `Operation::execute`.
#[derive(Debug)]
pub enum Operation {
/// Create a new view version whose representations are the base version's
/// representations with `representations` upserted by SQL dialect.
UpdateRepresentations {
/// Representations to insert, or to substitute for existing ones with the same dialect.
representations: Vec<ViewRepresentation>,
/// Fields of the view schema; a new schema is added only when these differ
/// from the current schema's fields (or no current schema can be resolved).
schema: StructType,
/// Branch used to look up the current schema and version. When `None`,
/// the reference property is written under the name "main".
branch: Option<String>,
},
/// Set the given `(key, value)` pairs as view properties.
UpdateProperties(Vec<(String, String)>),
}
/// Inserts `new_representation` into a copy of `current_representations`,
/// keyed by SQL dialect.
///
/// Every existing representation whose dialect equals the dialect of
/// `new_representation` is substituted by a clone of it, keeping its position.
/// If no existing representation shares the dialect, the new one is appended
/// at the end. The input slice itself is left untouched.
fn upsert_representation(
    current_representations: &[ViewRepresentation],
    new_representation: ViewRepresentation,
) -> Vec<ViewRepresentation> {
    let ViewRepresentation::Sql {
        dialect: incoming_dialect,
        ..
    } = &new_representation;

    // One extra slot for the case where nothing matches and we append.
    let mut merged: Vec<ViewRepresentation> =
        Vec::with_capacity(current_representations.len() + 1);
    let mut replaced_any = false;

    for existing in current_representations {
        let ViewRepresentation::Sql { dialect, .. } = existing;
        if dialect == incoming_dialect {
            replaced_any = true;
            merged.push(new_representation.clone());
        } else {
            merged.push(existing.clone());
        }
    }

    if !replaced_any {
        merged.push(new_representation);
    }
    merged
}
/// Upserts each of `new_representations`, in order, into a copy of
/// `current_representations`.
///
/// Each incoming representation is applied on top of the result of the
/// previous one via `upsert_representation`, so later entries see the
/// effect of earlier ones.
fn upsert_representations(
    current_representations: &[ViewRepresentation],
    new_representations: &[ViewRepresentation],
) -> Vec<ViewRepresentation> {
    new_representations
        .iter()
        .cloned()
        .fold(current_representations.to_vec(), |merged, incoming| {
            upsert_representation(&merged, incoming)
        })
}
impl Operation {
    /// Translates this operation into the requirement and updates to commit.
    ///
    /// # Returns
    ///
    /// * `UpdateRepresentations` yields an `AssertViewUuid` requirement for the
    ///   view's UUID together with, in order: an optional `AddSchema` (only when
    ///   the supplied schema differs from the current one or no current schema
    ///   could be resolved), `AddViewVersion`, `SetCurrentViewVersion` and a
    ///   `SetProperties` that records the new version id under
    ///   `REF_PREFIX + <branch>` (branch defaults to `"main"`).
    /// * `UpdateProperties` yields no requirement and a single `SetProperties`
    ///   update built from the given entries.
    ///
    /// # Errors
    ///
    /// Propagates the error from `metadata.current_version` when the base
    /// version for the requested branch cannot be resolved.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    #[instrument(
        name = "iceberg_rust::view::transaction::operation::execute",
        level = "debug"
    )]
    pub async fn execute<T: Materialization + Debug>(
        self,
        metadata: &GeneralViewMetadata<T>,
    ) -> Result<(Option<ViewRequirement>, Vec<ViewUpdate<T>>), Error> {
        match self {
            Operation::UpdateRepresentations {
                representations,
                schema,
                branch,
            } => {
                debug!(
                    "Executing UpdateRepresentations operation: representations={}, schema_fields={}, branch={:?}",
                    representations.len(),
                    schema.len(),
                    branch
                );

                // `Some(id)` when the current schema exists and has exactly the
                // requested fields; `None` when a new schema has to be added.
                let unchanged_schema_id = match metadata.current_schema(branch.as_deref()) {
                    Ok(current) if schema == *current.fields() => Some(*current.schema_id()),
                    _ => None,
                };
                let schema_changed = unchanged_schema_id.is_none();

                let base_version = metadata.current_version(branch.as_deref())?;

                // New ids are one past the highest id currently in use.
                let version_id = metadata.versions.keys().max().unwrap_or(&0) + 1;
                let schema_id = unchanged_schema_id
                    .unwrap_or_else(|| metadata.schemas.keys().max().unwrap_or(&0) + 1);
                let last_column_id = schema.iter().map(|x| x.id).max().unwrap_or(0);

                // The field is in milliseconds since the Unix epoch, so the
                // duration must be read with `as_millis`, not `as_micros`.
                let timestamp_ms = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .expect("system clock is set before the Unix epoch")
                    .as_millis() as i64;

                let new_version = Version {
                    version_id,
                    schema_id,
                    summary: Summary {
                        operation: iceberg_rust_spec::spec::view_metadata::Operation::Replace,
                        engine_name: None,
                        engine_version: None,
                    },
                    representations: upsert_representations(
                        base_version.representations(),
                        &representations,
                    ),
                    default_catalog: base_version.default_catalog.clone(),
                    default_namespace: base_version.default_namespace.clone(),
                    timestamp_ms,
                    storage_table: base_version.storage_table.clone(),
                };

                let branch_name = branch.unwrap_or_else(|| "main".to_string());

                let mut view_updates: Vec<ViewUpdate<T>> = Vec::with_capacity(4);
                if schema_changed {
                    view_updates.push(ViewUpdate::AddSchema {
                        schema: Schema::from_struct_type(schema, schema_id, None),
                        last_column_id: Some(last_column_id),
                    });
                }
                view_updates.extend([
                    ViewUpdate::AddViewVersion {
                        view_version: new_version,
                    },
                    ViewUpdate::SetCurrentViewVersion {
                        view_version_id: version_id,
                    },
                    // Record which version the branch reference points at.
                    ViewUpdate::SetProperties {
                        updates: HashMap::from_iter([(
                            REF_PREFIX.to_string() + &branch_name,
                            version_id.to_string(),
                        )]),
                    },
                ]);

                Ok((
                    Some(ViewRequirement::AssertViewUuid {
                        uuid: metadata.view_uuid,
                    }),
                    view_updates,
                ))
            }
            Operation::UpdateProperties(entries) => {
                debug!(
                    "Executing UpdateProperties operation: entries={:?}",
                    entries
                );
                Ok((
                    None,
                    vec![ViewUpdate::SetProperties {
                        updates: HashMap::from_iter(entries),
                    }],
                ))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use iceberg_rust_spec::view_metadata::ViewRepresentation;

    use crate::view::transaction::operation::upsert_representations;

    /// Existing dialects are replaced in place and unknown dialects are
    /// appended, regardless of the order of the incoming representations.
    #[test]
    fn test_upsert_representations() {
        let current = [
            ViewRepresentation::sql("a1", Some("a")),
            ViewRepresentation::sql("b1", Some("b")),
        ];

        // Incoming: replacement for "b" first, then a new dialect "c".
        let replace_then_append = upsert_representations(
            &current,
            &[
                ViewRepresentation::sql("b2", Some("b")),
                ViewRepresentation::sql("c2", Some("c")),
            ],
        );
        assert_eq!(
            replace_then_append,
            vec![
                ViewRepresentation::sql("a1", Some("a")),
                ViewRepresentation::sql("b2", Some("b")),
                ViewRepresentation::sql("c2", Some("c")),
            ]
        );

        // Incoming: new dialect "c" first, then a replacement for "a".
        let append_then_replace = upsert_representations(
            &current,
            &[
                ViewRepresentation::sql("c2", Some("c")),
                ViewRepresentation::sql("a2", Some("a")),
            ],
        );
        assert_eq!(
            append_then_replace,
            vec![
                ViewRepresentation::sql("a2", Some("a")),
                ViewRepresentation::sql("b1", Some("b")),
                ViewRepresentation::sql("c2", Some("c")),
            ]
        );
    }
}