iceberg_rust/catalog/
create.rs

1//! Creation interfaces for Iceberg catalog objects
2//!
3//! This module provides builder-pattern implementations for creating new objects in an Iceberg catalog:
4//!
5//! * Tables with schema, partition specs, and sort orders
6//! * Views with schema and version specifications
7//! * Materialized views with both view metadata and storage tables
8//!
9//! All builders support fluent configuration and handle default values appropriately.
10//! The module ensures proper initialization of metadata like UUIDs and timestamps.
11
12use std::{
13    collections::HashMap,
14    sync::Arc,
15    time::{SystemTime, UNIX_EPOCH},
16};
17
18use derive_builder::Builder;
19use iceberg_rust_spec::{
20    identifier::FullIdentifier,
21    spec::{
22        materialized_view_metadata::MaterializedViewMetadata,
23        partition::{PartitionSpec, DEFAULT_PARTITION_SPEC_ID},
24        schema::{Schema, DEFAULT_SCHEMA_ID},
25        sort::{SortOrder, DEFAULT_SORT_ORDER_ID},
26        table_metadata::TableMetadata,
27        view_metadata::{Version, ViewMetadata, DEFAULT_VERSION_ID},
28    },
29    view_metadata::Materialization,
30};
31use serde::{Deserialize, Serialize};
32use uuid::Uuid;
33
34use crate::{
35    error::Error,
36    materialized_view::{MaterializedView, STORAGE_TABLE_POSTFIX},
37    table::Table,
38    view::View,
39};
40
41use super::{identifier::Identifier, Catalog};
42
43/// Configuration for creating a new Iceberg table in a catalog
44///
45/// This struct contains all the necessary information to create a new table:
46/// * Table name and optional location
47/// * Schema definition
48/// * Optional partition specification
49/// * Optional sort order
50/// * Optional properties
51///
52/// The struct implements Builder pattern for convenient construction and
53/// can be serialized/deserialized using serde.
54#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Builder)]
55#[serde(rename_all = "kebab-case")]
56#[builder(build_fn(name = "create", error = "Error"), setter(prefix = "with"))]
57pub struct CreateTable {
58    #[builder(setter(into))]
59    /// Name of the table
60    pub name: String,
61    /// Location tables base location
62    #[serde(skip_serializing_if = "Option::is_none")]
63    #[builder(setter(into, strip_option), default)]
64    pub location: Option<String>,
65    /// Table schemma
66    pub schema: Schema,
67    /// Partition spec
68    #[serde(skip_serializing_if = "Option::is_none")]
69    #[builder(setter(strip_option), default)]
70    pub partition_spec: Option<PartitionSpec>,
71    /// Sort order
72    #[serde(skip_serializing_if = "Option::is_none")]
73    #[builder(setter(strip_option, name = "with_sort_order"), default)]
74    pub write_order: Option<SortOrder>,
75    /// stage create
76    #[serde(skip_serializing_if = "Option::is_none")]
77    #[builder(setter(strip_option), default)]
78    pub stage_create: Option<bool>,
79    /// Table properties
80    #[serde(skip_serializing_if = "Option::is_none")]
81    #[builder(setter(strip_option, each(name = "with_property")), default)]
82    pub properties: Option<HashMap<String, String>>,
83}
84
85impl CreateTableBuilder {
86    /// Builds and registers a new table in the catalog
87    ///
88    /// # Arguments
89    /// * `namespace` - The namespace where the table will be created
90    /// * `catalog` - The catalog where the table will be registered
91    ///
92    /// # Returns
93    /// * `Ok(Table)` - The newly created table
94    /// * `Err(Error)` - If table creation fails, e.g. due to missing name or catalog errors
95    ///
96    /// This method finalizes the table configuration and registers it in the specified catalog.
97    /// It uses the builder's current state to create the table metadata.
98    pub async fn build(
99        &mut self,
100        namespace: &[String],
101        catalog: Arc<dyn Catalog>,
102    ) -> Result<Table, Error> {
103        let name = self
104            .name
105            .as_ref()
106            .ok_or(Error::NotFound("Name to create table".to_owned()))?;
107        let identifier = Identifier::new(namespace, name);
108
109        let create = self.create()?;
110
111        // Register table in catalog
112        catalog.clone().create_table(identifier, create).await
113    }
114}
115
116impl TryInto<TableMetadata> for CreateTable {
117    type Error = Error;
118    fn try_into(self) -> Result<TableMetadata, Self::Error> {
119        let last_column_id = self.schema.fields().iter().map(|x| x.id).max().unwrap_or(0);
120
121        let last_partition_id = self
122            .partition_spec
123            .as_ref()
124            .and_then(|x| x.fields().iter().map(|x| *x.field_id()).max())
125            .unwrap_or(0);
126
127        Ok(TableMetadata {
128            format_version: Default::default(),
129            table_uuid: Uuid::new_v4(),
130            location: self
131                .location
132                .ok_or(Error::NotFound(format!("Location for table {}", self.name)))?,
133            last_sequence_number: 0,
134            last_updated_ms: SystemTime::now()
135                .duration_since(UNIX_EPOCH)
136                .unwrap()
137                .as_millis() as i64,
138            last_column_id,
139            schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
140            current_schema_id: DEFAULT_SCHEMA_ID,
141            partition_specs: HashMap::from_iter(vec![(
142                DEFAULT_PARTITION_SPEC_ID,
143                self.partition_spec.unwrap_or_default(),
144            )]),
145            default_spec_id: DEFAULT_PARTITION_SPEC_ID,
146            last_partition_id,
147            properties: self.properties.unwrap_or_default(),
148            current_snapshot_id: None,
149            snapshots: HashMap::new(),
150            snapshot_log: Vec::new(),
151            metadata_log: Vec::new(),
152            sort_orders: HashMap::from_iter(vec![(
153                DEFAULT_SORT_ORDER_ID,
154                self.write_order.unwrap_or_default(),
155            )]),
156            default_sort_order_id: DEFAULT_SORT_ORDER_ID,
157            refs: HashMap::new(),
158        })
159    }
160}
161
162/// Configuration for creating a new Iceberg view in a catalog
163///
164/// This struct contains all the necessary information to create a new view:
165/// * View name and optional location
166/// * Schema definition
167/// * View version specification
168/// * Optional properties
169///
170/// # Type Parameters
171/// * `T` - The materialization type for the view, typically `Option<()>` for regular views
172///   or `FullIdentifier` for materialized views
173///
174/// The struct implements Builder pattern for convenient construction and
175/// can be serialized/deserialized using serde.
176#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
177#[serde(rename_all = "kebab-case")]
178#[builder(build_fn(name = "create", error = "Error"), setter(prefix = "with"))]
179pub struct CreateView<T: Materialization> {
180    /// Name of the view
181    #[builder(setter(into))]
182    pub name: String,
183    /// View base location
184    #[serde(skip_serializing_if = "Option::is_none")]
185    #[builder(setter(into, strip_option), default)]
186    pub location: Option<String>,
187    /// Schema of the view
188    pub schema: Schema,
189    /// Viersion of the view
190    pub view_version: Version<T>,
191    /// View properties
192    #[builder(setter(each(name = "with_property")), default)]
193    pub properties: HashMap<String, String>,
194}
195
196impl CreateViewBuilder<Option<()>> {
197    /// Builds and registers a new view in the catalog
198    ///
199    /// # Arguments
200    /// * `namespace` - The namespace where the view will be created
201    /// * `catalog` - The catalog where the view will be registered
202    ///
203    /// # Returns
204    /// * `Ok(View)` - The newly created view
205    /// * `Err(Error)` - If view creation fails, e.g. due to missing name or catalog errors
206    ///
207    /// This method finalizes the view configuration and registers it in the specified catalog.
208    /// It automatically sets default namespace and catalog values if not already specified.
209    pub async fn build(
210        &mut self,
211        namespace: &[String],
212        catalog: Arc<dyn Catalog>,
213    ) -> Result<View, Error> {
214        let name = self
215            .name
216            .as_ref()
217            .ok_or(Error::NotFound("Name to create view".to_owned()))?;
218        let identifier = Identifier::new(namespace, name);
219
220        if let Some(version) = &mut self.view_version {
221            if version.default_namespace().is_empty() {
222                version.default_namespace = namespace.to_vec()
223            }
224            if version.default_catalog().is_none() && !catalog.name().is_empty() {
225                version.default_catalog = Some(catalog.name().to_string())
226            }
227        }
228
229        let create = self.create()?;
230
231        // Register table in catalog
232        catalog.clone().create_view(identifier, create).await
233    }
234}
235
236impl TryInto<ViewMetadata> for CreateView<Option<()>> {
237    type Error = Error;
238    fn try_into(self) -> Result<ViewMetadata, Self::Error> {
239        Ok(ViewMetadata {
240            view_uuid: Uuid::new_v4(),
241            format_version: Default::default(),
242            location: self
243                .location
244                .ok_or(Error::NotFound(format!("Location for view {}", self.name)))?,
245            current_version_id: DEFAULT_VERSION_ID,
246            versions: HashMap::from_iter(vec![(DEFAULT_VERSION_ID, self.view_version)]),
247            version_log: Vec::new(),
248            schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
249            properties: self.properties,
250        })
251    }
252}
253
254impl TryInto<MaterializedViewMetadata> for CreateView<FullIdentifier> {
255    type Error = Error;
256    fn try_into(self) -> Result<MaterializedViewMetadata, Self::Error> {
257        Ok(MaterializedViewMetadata {
258            view_uuid: Uuid::new_v4(),
259            format_version: Default::default(),
260            location: self.location.ok_or(Error::NotFound(format!(
261                "Location for materialized view {}",
262                self.name
263            )))?,
264            current_version_id: DEFAULT_VERSION_ID,
265            versions: HashMap::from_iter(vec![(DEFAULT_VERSION_ID, self.view_version)]),
266            version_log: Vec::new(),
267            schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
268            properties: self.properties,
269        })
270    }
271}
272
273/// Configuration for creating a new materialized view in an Iceberg catalog
274///
275/// This struct contains all the necessary information to create both a materialized view
276/// and its underlying storage table:
277/// * View name and optional location
278/// * Schema definition
279/// * View version specification with storage table reference
280/// * Optional partition specification for the storage table
281/// * Optional sort order for the storage table
282/// * Separate properties for both view and storage table
283///
284/// The struct implements Builder pattern for convenient construction and
285/// can be serialized/deserialized using serde.
286#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
287#[serde(rename_all = "kebab-case")]
288#[builder(build_fn(name = "create", error = "Error"), setter(prefix = "with"))]
289pub struct CreateMaterializedView {
290    /// Name of the view
291    #[builder(setter(into))]
292    pub name: String,
293    /// View base location
294    #[serde(skip_serializing_if = "Option::is_none")]
295    #[builder(setter(into, strip_option), default)]
296    pub location: Option<String>,
297    /// Schema of the view
298    pub schema: Schema,
299    /// Viersion of the view
300    pub view_version: Version<FullIdentifier>,
301    /// View properties
302    #[builder(setter(each(name = "with_property")), default)]
303    pub properties: HashMap<String, String>,
304    /// Partition spec
305    #[serde(skip_serializing_if = "Option::is_none")]
306    #[builder(setter(strip_option), default)]
307    pub partition_spec: Option<PartitionSpec>,
308    /// Sort order
309    #[serde(skip_serializing_if = "Option::is_none")]
310    #[builder(setter(strip_option, name = "with_sort_order"), default)]
311    pub write_order: Option<SortOrder>,
312    /// stage create
313    #[serde(skip_serializing_if = "Option::is_none")]
314    #[builder(setter(strip_option), default)]
315    pub stage_create: Option<bool>,
316    /// Table properties
317    #[serde(skip_serializing_if = "Option::is_none")]
318    #[builder(setter(strip_option, each(name = "with_table_property")), default)]
319    pub table_properties: Option<HashMap<String, String>>,
320}
321
322impl CreateMaterializedViewBuilder {
323    /// Builds and registers a new materialized view in the catalog
324    ///
325    /// # Arguments
326    /// * `namespace` - The namespace where the materialized view will be created
327    /// * `catalog` - The catalog where the materialized view will be registered
328    ///
329    /// # Returns
330    /// * `Ok(MaterializedView)` - The newly created materialized view
331    /// * `Err(Error)` - If view creation fails, e.g. due to missing name or catalog errors
332    ///
333    /// This method finalizes the materialized view configuration and registers it in the specified catalog.
334    /// It automatically:
335    /// * Sets default namespace and catalog values if not specified
336    /// * Creates the underlying storage table with the appropriate name suffix
337    /// * Registers both the view and its storage table in the catalog
338    pub async fn build(
339        &mut self,
340        namespace: &[String],
341        catalog: Arc<dyn Catalog>,
342    ) -> Result<MaterializedView, Error> {
343        let name = self.name.as_ref().ok_or(Error::NotFound(
344            "Name to create materialized view".to_owned(),
345        ))?;
346        let identifier = Identifier::new(namespace, name);
347
348        if let Some(version) = &mut self.view_version {
349            if version.default_namespace().is_empty() {
350                version.default_namespace = namespace.to_vec()
351            }
352            if version.default_catalog().is_none() && !catalog.name().is_empty() {
353                version.default_catalog = Some(catalog.name().to_string())
354            }
355        }
356
357        let mut create = self.create()?;
358
359        let version = Version {
360            version_id: create.view_version.version_id,
361            schema_id: create.view_version.schema_id,
362            timestamp_ms: create.view_version.timestamp_ms,
363            summary: create.view_version.summary.clone(),
364            representations: create.view_version.representations.clone(),
365            default_catalog: create.view_version.default_catalog,
366            default_namespace: create.view_version.default_namespace,
367            storage_table: FullIdentifier::new(
368                None,
369                identifier.namespace(),
370                &(identifier.name().to_string() + STORAGE_TABLE_POSTFIX),
371            ),
372        };
373
374        create.view_version = version;
375
376        // Register materialized view in catalog
377        catalog
378            .clone()
379            .create_materialized_view(identifier.clone(), create)
380            .await
381    }
382}
383
384impl From<CreateMaterializedView> for (CreateView<FullIdentifier>, CreateTable) {
385    fn from(val: CreateMaterializedView) -> Self {
386        let storage_table = val.view_version.storage_table.name().to_owned();
387        (
388            CreateView {
389                name: val.name.clone(),
390                location: val.location.clone(),
391                schema: val.schema.clone(),
392                view_version: val.view_version,
393                properties: val.properties,
394            },
395            CreateTable {
396                name: storage_table,
397                location: val.location,
398                schema: val.schema,
399                partition_spec: val.partition_spec,
400                write_order: val.write_order,
401                stage_create: val.stage_create,
402                properties: val.table_properties,
403            },
404        )
405    }
406}