use std::sync::Arc;
use iceberg_rust_spec::spec::{
materialized_view_metadata::MaterializedViewMetadata, schema::Schema,
};
use object_store::ObjectStore;
use crate::{
catalog::{
bucket::Bucket, create::CreateMaterializedViewBuilder, identifier::Identifier,
tabular::Tabular, Catalog,
},
error::Error,
};
use self::{storage_table::StorageTable, transaction::Transaction as MaterializedViewTransaction};
mod storage_table;
pub mod transaction;
// NOTE(review): not referenced in this section — presumably appended to a view's name to derive
// the name of its storage table; confirm against the call sites.
/// Postfix literal (`"__storage"`) associated with storage tables.
pub static STORAGE_TABLE_POSTFIX: &str = "__storage";
// NOTE(review): not referenced in this section — looks like a property key that marks a table as
// the storage table of a materialized view; confirm against the call sites.
/// Key literal (`"materialize.storage_table"`) associated with storage tables.
pub static STORAGE_TABLE_FLAG: &str = "materialize.storage_table";
/// Handle to a materialized view, bundling its identifier, its metadata and the catalog
/// it is accessed through.
#[derive(Debug)]
pub struct MaterializedView {
/// Identifier of the view, as handed to [`MaterializedView::new`].
identifier: Identifier,
/// Metadata of the view; its `location` is used to pick the object store bucket and its
/// current version names the storage table.
metadata: MaterializedViewMetadata,
/// Catalog used to obtain the object store and to load the storage table.
catalog: Arc<dyn Catalog>,
}
/// State of the storage table of a materialized view.
// NOTE(review): this enum is only declared in this section; the variant descriptions below are
// inferred from the names and should be confirmed against the code that produces them.
#[derive(Debug)]
pub enum StorageTableState {
/// Presumably: the storage table is up to date — TODO confirm.
Fresh,
/// Presumably: the storage table is stale; the meaning of the `i64` payload is not visible
/// here (possibly a snapshot id) — TODO confirm.
Outdated(i64),
/// Presumably: the storage table cannot be used in its current state — TODO confirm.
Invalid,
}
impl MaterializedView {
pub fn builder() -> CreateMaterializedViewBuilder {
CreateMaterializedViewBuilder::default()
}
pub async fn new(
identifier: Identifier,
catalog: Arc<dyn Catalog>,
metadata: MaterializedViewMetadata,
) -> Result<Self, Error> {
Ok(MaterializedView {
identifier,
metadata,
catalog,
})
}
pub fn identifier(&self) -> &Identifier {
&self.identifier
}
pub fn catalog(&self) -> Arc<dyn Catalog> {
self.catalog.clone()
}
pub fn object_store(&self) -> Arc<dyn ObjectStore> {
self.catalog
.object_store(Bucket::from_path(&self.metadata.location).unwrap())
}
pub fn current_schema(&self, branch: Option<&str>) -> Result<&Schema, Error> {
self.metadata.current_schema(branch).map_err(Error::from)
}
pub fn metadata(&self) -> &MaterializedViewMetadata {
&self.metadata
}
pub fn new_transaction(&mut self, branch: Option<&str>) -> MaterializedViewTransaction {
MaterializedViewTransaction::new(self, branch)
}
pub async fn storage_table(&self) -> Result<StorageTable, Error> {
let identifier: Identifier = self
.metadata()
.current_version(None)?
.storage_table()
.into();
if let Tabular::Table(table) = self.catalog().load_tabular(&identifier).await? {
Ok(StorageTable::new(table))
} else {
Err(Error::InvalidFormat("storage table".to_string()))
}
}
}