1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
/*!
* Defines the different [Operation]s on a [View].
*/
use iceberg_rust_spec::{
spec::{
schema::SchemaBuilder,
types::StructType,
view_metadata::{GeneralViewMetadata, Summary, Version, ViewRepresentation, REF_PREFIX},
},
view_metadata::{FullIdentifier, Lineage, Materialization},
};
use std::{
collections::HashMap,
time::{SystemTime, UNIX_EPOCH},
};
use crate::{
catalog::commit::{ViewRequirement, ViewUpdate},
error::Error,
sql::find_relations,
};
/// View operation
pub enum Operation {
/// Update vresion
UpdateRepresentation {
/// Representation to add
representation: ViewRepresentation,
/// Schema of the representation
schema: StructType,
/// Branch where to add the representation
branch: Option<String>,
},
/// Update view properties
UpdateProperties(Vec<(String, String)>),
}
impl Operation {
/// Execute operation
pub async fn execute<T: Materialization>(
self,
metadata: &GeneralViewMetadata<T>,
) -> Result<(Option<ViewRequirement>, Vec<ViewUpdate<T>>), Error> {
match self {
Operation::UpdateRepresentation {
representation,
schema,
branch,
} => {
let version = metadata.current_version(branch.as_deref())?;
let version_id = metadata.versions.keys().max().unwrap_or(&0) + 1;
let schema_id = metadata.schemas.keys().max().unwrap_or(&0) + 1;
let last_column_id = schema.iter().map(|x| x.id).max().unwrap_or(0);
let relations = find_relations(match &representation {
ViewRepresentation::Sql {
sql,
dialect: _dialect,
} => sql,
})?;
let lineage = Lineage::from_iter(
relations
.into_iter()
.enumerate()
.map(|(i, name)| {
Ok::<_, Error>((
FullIdentifier::parse(
&name,
version.default_namespace().as_deref(),
version.default_catalog().as_deref(),
)?,
i as i64,
))
})
.collect::<Result<Vec<_>, _>>()?,
);
let version = Version {
version_id,
schema_id,
summary: Summary {
operation: iceberg_rust_spec::spec::view_metadata::Operation::Replace,
engine_name: None,
engine_version: None,
},
representations: vec![representation],
default_catalog: version.default_catalog.clone(),
default_namespace: version.default_namespace.clone(),
timestamp_ms: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros() as i64,
storage_table: version.storage_table.clone(),
lineage: Some(lineage),
};
let branch_name = branch.unwrap_or("main".to_string());
Ok((
Some(ViewRequirement::AssertViewUuid {
uuid: metadata.view_uuid,
}),
vec![
ViewUpdate::AddViewVersion {
view_version: version,
},
ViewUpdate::SetCurrentViewVersion {
view_version_id: version_id,
},
ViewUpdate::AddSchema {
schema: SchemaBuilder::default()
.with_schema_id(schema_id)
.with_fields(schema)
.build()
.map_err(iceberg_rust_spec::error::Error::from)?,
last_column_id: Some(last_column_id),
},
ViewUpdate::SetProperties {
updates: HashMap::from_iter(vec![(
REF_PREFIX.to_string() + &branch_name,
version_id.to_string(),
)]),
},
],
))
}
Operation::UpdateProperties(entries) => Ok((
None,
vec![ViewUpdate::SetProperties {
updates: HashMap::from_iter(entries),
}],
)),
}
}
}