Skip to main content

iceberg_rust/catalog/
create.rs

1//! Creation interfaces for Iceberg catalog objects
2//!
3//! This module provides builder-pattern implementations for creating new objects in an Iceberg catalog:
4//!
5//! * Tables with schema, partition specs, and sort orders
6//! * Views with schema and version specifications
7//! * Materialized views with both view metadata and storage tables
8//!
9//! All builders support fluent configuration and handle default values appropriately.
10//! The module ensures proper initialization of metadata like UUIDs and timestamps.
11
12use std::{
13    collections::HashMap,
14    sync::Arc,
15    time::{SystemTime, UNIX_EPOCH},
16};
17
18use derive_builder::Builder;
19use iceberg_rust_spec::{
20    identifier::Identifier,
21    spec::{
22        materialized_view_metadata::MaterializedViewMetadata,
23        partition::{PartitionSpec, DEFAULT_PARTITION_SPEC_ID},
24        schema::{Schema, DEFAULT_SCHEMA_ID},
25        sort::{SortOrder, DEFAULT_SORT_ORDER_ID},
26        table_metadata::TableMetadata,
27        view_metadata::{Version, ViewMetadata, DEFAULT_VERSION_ID},
28    },
29    view_metadata::Materialization,
30};
31use serde::{Deserialize, Serialize};
32use uuid::Uuid;
33
34use crate::{
35    error::Error,
36    materialized_view::{MaterializedView, STORAGE_TABLE_POSTFIX},
37    table::Table,
38    view::View,
39};
40
41use super::Catalog;
42
43/// Configuration for creating a new Iceberg table in a catalog
44///
45/// This struct contains all the necessary information to create a new table:
46/// * Table name and optional location
47/// * Schema definition
48/// * Optional partition specification
49/// * Optional sort order
50/// * Optional properties
51///
52/// The struct implements Builder pattern for convenient construction and
53/// can be serialized/deserialized using serde.
54#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Builder)]
55#[serde(rename_all = "kebab-case")]
56#[builder(
57    build_fn(name = "create", error = "Error", validate = "Self::validate"),
58    setter(prefix = "with")
59)]
60pub struct CreateTable {
61    #[builder(setter(into))]
62    /// Name of the table
63    pub name: String,
64    /// Location tables base location
65    #[serde(skip_serializing_if = "Option::is_none")]
66    #[builder(setter(into, strip_option), default)]
67    pub location: Option<String>,
68    /// Table schemma
69    pub schema: Schema,
70    /// Partition spec
71    #[serde(skip_serializing_if = "Option::is_none")]
72    #[builder(setter(strip_option), default)]
73    pub partition_spec: Option<PartitionSpec>,
74    /// Sort order
75    #[serde(skip_serializing_if = "Option::is_none")]
76    #[builder(setter(strip_option, name = "with_sort_order"), default)]
77    pub write_order: Option<SortOrder>,
78    /// stage create
79    #[serde(skip_serializing_if = "Option::is_none")]
80    #[builder(setter(strip_option), default)]
81    pub stage_create: Option<bool>,
82    /// Table properties
83    #[serde(skip_serializing_if = "Option::is_none")]
84    #[builder(setter(strip_option, each(name = "with_property")), default)]
85    pub properties: Option<HashMap<String, String>>,
86}
87
88impl CreateTableBuilder {
89    /// Validates the table configuration
90    ///
91    /// Performs the following checks:
92    /// * Schema field IDs are unique
93    /// * Partition spec fields reference valid schema fields
94    /// * Sort order fields reference valid schema fields
95    ///
96    /// # Returns
97    /// * `Ok(())` - If all validations pass
98    /// * `Err(Error)` - If any validation fails with contextual error message
99    fn validate(&self) -> Result<(), Error> {
100        let name = self
101            .name
102            .as_ref()
103            .ok_or(Error::NotFound("Table name is required".to_string()))?;
104
105        let schema = self
106            .schema
107            .as_ref()
108            .ok_or(Error::NotFound("Table schema is required".to_string()))?;
109
110        // Validate schema field IDs are unique
111        let field_ids: Vec<i32> = schema.fields().iter().map(|f| f.id).collect();
112        let unique_ids: std::collections::HashSet<_> = field_ids.iter().collect();
113        if field_ids.len() != unique_ids.len() {
114            return Err(Error::InvalidFormat(format!(
115                "Schema for table '{}' contains duplicate field IDs",
116                name
117            )));
118        }
119
120        // Validate partition spec references valid schema fields
121        if let Some(Some(spec)) = &self.partition_spec {
122            for field in spec.fields() {
123                let source_id = field.source_id();
124                if !schema.fields().iter().any(|f| f.id == *source_id) {
125                    return Err(Error::NotFound(format!(
126                            "Partition field '{}' references non-existent schema field ID {} in table '{}'",
127                            field.name(),
128                            source_id,
129                            name
130                        )));
131                }
132            }
133        }
134
135        // Validate sort order references valid schema fields
136        if let Some(Some(order)) = &self.write_order {
137            for field in &order.fields {
138                let source_id = field.source_id;
139                if !schema.fields().iter().any(|f| f.id == source_id) {
140                    return Err(Error::NotFound(format!(
141                        "Sort order field references non-existent schema field ID {} in table '{}'",
142                        source_id, name
143                    )));
144                }
145            }
146        }
147
148        Ok(())
149    }
150
151    /// Builds and registers a new table in the catalog
152    ///
153    /// # Arguments
154    /// * `namespace` - The namespace where the table will be created
155    /// * `catalog` - The catalog where the table will be registered
156    ///
157    /// # Returns
158    /// * `Ok(Table)` - The newly created table
159    /// * `Err(Error)` - If table creation fails, e.g. due to missing name or catalog errors
160    ///
161    /// This method finalizes the table configuration and registers it in the specified catalog.
162    /// It uses the builder's current state to create the table metadata.
163    pub async fn build(
164        &mut self,
165        namespace: &[String],
166        catalog: Arc<dyn Catalog>,
167    ) -> Result<Table, Error> {
168        let name = self
169            .name
170            .as_ref()
171            .ok_or(Error::NotFound("Name to create table".to_owned()))?;
172        let identifier = Identifier::new(namespace, name);
173
174        let create = self.create()?;
175
176        // Register table in catalog
177        catalog.clone().create_table(identifier, create).await
178    }
179}
180
181impl TryInto<TableMetadata> for CreateTable {
182    type Error = Error;
183    fn try_into(self) -> Result<TableMetadata, Self::Error> {
184        let last_column_id = self.schema.fields().iter().map(|x| x.id).max().unwrap_or(0);
185
186        let last_partition_id = self
187            .partition_spec
188            .as_ref()
189            .and_then(|x| x.fields().iter().map(|x| *x.field_id()).max())
190            .unwrap_or(0);
191
192        Ok(TableMetadata {
193            format_version: Default::default(),
194            table_uuid: Uuid::new_v4(),
195            location: self
196                .location
197                .ok_or(Error::NotFound(format!("Location for table {}", self.name)))?,
198            last_sequence_number: 0,
199            last_updated_ms: SystemTime::now()
200                .duration_since(UNIX_EPOCH)
201                .unwrap()
202                .as_millis() as i64,
203            last_column_id,
204            schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
205            current_schema_id: DEFAULT_SCHEMA_ID,
206            partition_specs: HashMap::from_iter(vec![(
207                DEFAULT_PARTITION_SPEC_ID,
208                self.partition_spec.unwrap_or_default(),
209            )]),
210            default_spec_id: DEFAULT_PARTITION_SPEC_ID,
211            last_partition_id,
212            properties: self.properties.unwrap_or_default(),
213            current_snapshot_id: None,
214            snapshots: HashMap::new(),
215            snapshot_log: Vec::new(),
216            metadata_log: Vec::new(),
217            sort_orders: HashMap::from_iter(vec![(
218                DEFAULT_SORT_ORDER_ID,
219                self.write_order.unwrap_or_default(),
220            )]),
221            default_sort_order_id: DEFAULT_SORT_ORDER_ID,
222            refs: HashMap::new(),
223        })
224    }
225}
226
227/// Configuration for creating a new Iceberg view in a catalog
228///
229/// This struct contains all the necessary information to create a new view:
230/// * View name and optional location
231/// * Schema definition
232/// * View version specification
233/// * Optional properties
234///
235/// # Type Parameters
236/// * `T` - The materialization type for the view, typically `Option<()>` for regular views
237///   or `FullIdentifier` for materialized views
238///
239/// The struct implements Builder pattern for convenient construction and
240/// can be serialized/deserialized using serde.
241#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
242#[serde(rename_all = "kebab-case")]
243#[builder(build_fn(name = "create", error = "Error"), setter(prefix = "with"))]
244pub struct CreateView<T: Materialization> {
245    /// Name of the view
246    #[builder(setter(into))]
247    pub name: String,
248    /// View base location
249    #[serde(skip_serializing_if = "Option::is_none")]
250    #[builder(setter(into, strip_option), default)]
251    pub location: Option<String>,
252    /// Schema of the view
253    pub schema: Schema,
254    /// Viersion of the view
255    pub view_version: Version<T>,
256    /// View properties
257    #[builder(setter(each(name = "with_property")), default)]
258    pub properties: HashMap<String, String>,
259}
260
261impl CreateViewBuilder<Option<()>> {
262    /// Builds and registers a new view in the catalog
263    ///
264    /// # Arguments
265    /// * `namespace` - The namespace where the view will be created
266    /// * `catalog` - The catalog where the view will be registered
267    ///
268    /// # Returns
269    /// * `Ok(View)` - The newly created view
270    /// * `Err(Error)` - If view creation fails, e.g. due to missing name or catalog errors
271    ///
272    /// This method finalizes the view configuration and registers it in the specified catalog.
273    /// It automatically sets default namespace and catalog values if not already specified.
274    pub async fn build(
275        &mut self,
276        namespace: &[String],
277        catalog: Arc<dyn Catalog>,
278    ) -> Result<View, Error> {
279        let name = self
280            .name
281            .as_ref()
282            .ok_or(Error::NotFound("Name to create view".to_owned()))?;
283        let identifier = Identifier::new(namespace, name);
284
285        if let Some(version) = &mut self.view_version {
286            if version.default_namespace().is_empty() {
287                version.default_namespace = namespace.to_vec()
288            }
289            if version.default_catalog().is_none() && !catalog.name().is_empty() {
290                version.default_catalog = Some(catalog.name().to_string())
291            }
292        }
293
294        let create = self.create()?;
295
296        // Register table in catalog
297        catalog.clone().create_view(identifier, create).await
298    }
299}
300
301impl TryInto<ViewMetadata> for CreateView<Option<()>> {
302    type Error = Error;
303    fn try_into(self) -> Result<ViewMetadata, Self::Error> {
304        Ok(ViewMetadata {
305            view_uuid: Uuid::new_v4(),
306            format_version: Default::default(),
307            location: self
308                .location
309                .ok_or(Error::NotFound(format!("Location for view {}", self.name)))?,
310            current_version_id: DEFAULT_VERSION_ID,
311            versions: HashMap::from_iter(vec![(DEFAULT_VERSION_ID, self.view_version)]),
312            version_log: Vec::new(),
313            schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
314            properties: self.properties,
315        })
316    }
317}
318
319impl TryInto<MaterializedViewMetadata> for CreateView<Identifier> {
320    type Error = Error;
321    fn try_into(self) -> Result<MaterializedViewMetadata, Self::Error> {
322        Ok(MaterializedViewMetadata {
323            view_uuid: Uuid::new_v4(),
324            format_version: Default::default(),
325            location: self.location.ok_or(Error::NotFound(format!(
326                "Location for materialized view {}",
327                self.name
328            )))?,
329            current_version_id: DEFAULT_VERSION_ID,
330            versions: HashMap::from_iter(vec![(DEFAULT_VERSION_ID, self.view_version)]),
331            version_log: Vec::new(),
332            schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
333            properties: self.properties,
334        })
335    }
336}
337
338/// Configuration for creating a new materialized view in an Iceberg catalog
339///
340/// This struct contains all the necessary information to create both a materialized view
341/// and its underlying storage table:
342/// * View name and optional location
343/// * Schema definition
344/// * View version specification with storage table reference
345/// * Optional partition specification for the storage table
346/// * Optional sort order for the storage table
347/// * Separate properties for both view and storage table
348///
349/// The struct implements Builder pattern for convenient construction and
350/// can be serialized/deserialized using serde.
351#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
352#[serde(rename_all = "kebab-case")]
353#[builder(build_fn(name = "create", error = "Error"), setter(prefix = "with"))]
354pub struct CreateMaterializedView {
355    /// Name of the view
356    #[builder(setter(into))]
357    pub name: String,
358    /// View base location
359    #[serde(skip_serializing_if = "Option::is_none")]
360    #[builder(setter(into, strip_option), default)]
361    pub location: Option<String>,
362    /// Schema of the view
363    pub schema: Schema,
364    /// Viersion of the view
365    pub view_version: Version<Identifier>,
366    /// View properties
367    #[builder(setter(each(name = "with_property")), default)]
368    pub properties: HashMap<String, String>,
369    /// Partition spec
370    #[serde(skip_serializing_if = "Option::is_none")]
371    #[builder(setter(strip_option), default)]
372    pub partition_spec: Option<PartitionSpec>,
373    /// Sort order
374    #[serde(skip_serializing_if = "Option::is_none")]
375    #[builder(setter(strip_option, name = "with_sort_order"), default)]
376    pub write_order: Option<SortOrder>,
377    /// stage create
378    #[serde(skip_serializing_if = "Option::is_none")]
379    #[builder(setter(strip_option), default)]
380    pub stage_create: Option<bool>,
381    /// Table properties
382    #[serde(skip_serializing_if = "Option::is_none")]
383    #[builder(setter(strip_option, each(name = "with_table_property")), default)]
384    pub table_properties: Option<HashMap<String, String>>,
385}
386
387impl CreateMaterializedViewBuilder {
388    /// Builds and registers a new materialized view in the catalog
389    ///
390    /// # Arguments
391    /// * `namespace` - The namespace where the materialized view will be created
392    /// * `catalog` - The catalog where the materialized view will be registered
393    ///
394    /// # Returns
395    /// * `Ok(MaterializedView)` - The newly created materialized view
396    /// * `Err(Error)` - If view creation fails, e.g. due to missing name or catalog errors
397    ///
398    /// This method finalizes the materialized view configuration and registers it in the specified catalog.
399    /// It automatically:
400    /// * Sets default namespace and catalog values if not specified
401    /// * Creates the underlying storage table with the appropriate name suffix
402    /// * Registers both the view and its storage table in the catalog
403    pub async fn build(
404        &mut self,
405        namespace: &[String],
406        catalog: Arc<dyn Catalog>,
407    ) -> Result<MaterializedView, Error> {
408        let name = self.name.as_ref().ok_or(Error::NotFound(
409            "Name to create materialized view".to_owned(),
410        ))?;
411        let identifier = Identifier::new(namespace, name);
412
413        if let Some(version) = &mut self.view_version {
414            if version.default_namespace().is_empty() {
415                version.default_namespace = namespace.to_vec()
416            }
417            if version.default_catalog().is_none() && !catalog.name().is_empty() {
418                version.default_catalog = Some(catalog.name().to_string())
419            }
420        }
421
422        let mut create = self.create()?;
423
424        let version = Version {
425            version_id: create.view_version.version_id,
426            schema_id: create.view_version.schema_id,
427            timestamp_ms: create.view_version.timestamp_ms,
428            summary: create.view_version.summary.clone(),
429            representations: create.view_version.representations.clone(),
430            default_catalog: create.view_version.default_catalog,
431            default_namespace: create.view_version.default_namespace,
432            storage_table: Identifier::new(
433                identifier.namespace(),
434                &(identifier.name().to_string() + STORAGE_TABLE_POSTFIX),
435            ),
436        };
437
438        create.view_version = version;
439
440        // Register materialized view in catalog
441        catalog
442            .clone()
443            .create_materialized_view(identifier.clone(), create)
444            .await
445    }
446}
447
448impl From<CreateMaterializedView> for (CreateView<Identifier>, CreateTable) {
449    fn from(val: CreateMaterializedView) -> Self {
450        let storage_table = val.view_version.storage_table.name().to_owned();
451        (
452            CreateView {
453                name: val.name.clone(),
454                location: val.location.clone(),
455                schema: val.schema.clone(),
456                view_version: val.view_version,
457                properties: val.properties,
458            },
459            CreateTable {
460                name: storage_table,
461                location: val.location,
462                schema: val.schema,
463                partition_spec: val.partition_spec,
464                write_order: val.write_order,
465                stage_create: val.stage_create,
466                properties: val.table_properties,
467            },
468        )
469    }
470}
471
#[cfg(test)]
mod tests {
    use super::*;
    use iceberg_rust_spec::spec::{
        partition::{PartitionField, PartitionSpecBuilder, Transform},
        sort::{NullOrder, SortDirection, SortField, SortOrderBuilder},
        types::{PrimitiveType, StructField, Type},
    };

    /// Table name shared by all builder tests
    const TABLE_NAME: &str = "test_table";
    /// Table location shared by all builder tests
    const TABLE_LOCATION: &str = "/test/location";

    /// Creates an undocumented struct field with a primitive type
    fn primitive_field(
        id: i32,
        name: &str,
        required: bool,
        primitive: PrimitiveType,
    ) -> StructField {
        StructField {
            id,
            name: name.to_string(),
            required,
            field_type: Type::Primitive(primitive),
            doc: None,
        }
    }

    /// Helper function to create a simple valid schema for testing
    fn create_test_schema() -> Schema {
        Schema::builder()
            .with_struct_field(primitive_field(1, "id", true, PrimitiveType::Long))
            .with_struct_field(primitive_field(2, "name", false, PrimitiveType::String))
            .with_struct_field(primitive_field(
                3,
                "timestamp",
                false,
                PrimitiveType::Timestamp,
            ))
            .build()
            .unwrap()
    }

    /// Helper function to create a schema with duplicate field IDs (invalid)
    fn create_duplicate_field_id_schema() -> Schema {
        Schema::builder()
            .with_struct_field(primitive_field(1, "id", true, PrimitiveType::Long))
            // Same ID as the field above
            .with_struct_field(primitive_field(1, "name", false, PrimitiveType::String))
            .build()
            .unwrap()
    }

    /// Returns a table builder with name, location and the given schema set
    fn table_builder_with(schema: Schema) -> CreateTableBuilder {
        let mut builder = CreateTableBuilder::default();
        builder
            .with_name(TABLE_NAME)
            .with_location(TABLE_LOCATION)
            .with_schema(schema);
        builder
    }

    /// Builds a partition spec (spec ID 1) with one identity-transformed field
    /// (field ID 1000) that reads from `source_id`
    fn identity_partition_spec(source_id: i32, name: &str) -> PartitionSpec {
        PartitionSpecBuilder::default()
            .with_spec_id(1)
            .with_partition_field(PartitionField::new(
                source_id,
                1000,
                name,
                Transform::Identity,
            ))
            .build()
            .unwrap()
    }

    /// Builds a sort order (order ID 1) with one ascending, nulls-first,
    /// identity-transformed field that reads from `source_id`
    fn ascending_sort_order(source_id: i32) -> SortOrder {
        SortOrderBuilder::default()
            .with_order_id(1)
            .with_sort_field(SortField {
                source_id,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .build()
            .unwrap()
    }

    #[test]
    fn test_create_table_builder_valid() {
        let result = table_builder_with(create_test_schema()).create();

        assert!(result.is_ok(), "Valid table creation should succeed");
        let create_table = result.unwrap();
        assert_eq!(create_table.name, "test_table");
        assert_eq!(create_table.location, Some("/test/location".to_string()));
    }

    #[test]
    fn test_create_table_builder_missing_name() {
        let mut builder = CreateTableBuilder::default();
        builder
            .with_location(TABLE_LOCATION)
            .with_schema(create_test_schema());
        let result = builder.create();

        assert!(result.is_err(), "Table creation without name should fail");
    }

    #[test]
    fn test_create_table_builder_missing_schema() {
        let mut builder = CreateTableBuilder::default();
        builder.with_name(TABLE_NAME).with_location(TABLE_LOCATION);
        let result = builder.create();

        assert!(result.is_err(), "Table creation without schema should fail");
    }

    #[test]
    fn test_create_table_validation_duplicate_field_ids() {
        let result = table_builder_with(create_duplicate_field_id_schema()).create();

        assert!(
            result.is_err(),
            "Table creation with duplicate field IDs should fail"
        );
        let err = result.unwrap_err();
        assert!(
            matches!(err, Error::InvalidFormat(_)),
            "Error should be InvalidFormat, got: {:?}",
            err
        );
    }

    #[test]
    fn test_create_table_validation_invalid_partition_spec() {
        // Source field ID 999 does not exist in the test schema
        let invalid_partition_spec = identity_partition_spec(999, "invalid_partition");

        let result = table_builder_with(create_test_schema())
            .with_partition_spec(invalid_partition_spec)
            .create();

        assert!(
            result.is_err(),
            "Table creation with invalid partition spec should fail"
        );
        let err = result.unwrap_err();
        assert!(
            matches!(err, Error::NotFound(_)),
            "Error should be NotFound for invalid partition field reference, got: {:?}",
            err
        );
    }

    #[test]
    fn test_create_table_validation_valid_partition_spec() {
        // Source field ID 1 exists in the test schema
        let partition_spec = identity_partition_spec(1, "id_partition");

        let result = table_builder_with(create_test_schema())
            .with_partition_spec(partition_spec)
            .create();

        assert!(
            result.is_ok(),
            "Table creation with valid partition spec should succeed"
        );
    }

    #[test]
    fn test_create_table_validation_invalid_sort_order() {
        // Source field ID 999 does not exist in the test schema
        let invalid_sort_order = ascending_sort_order(999);

        let result = table_builder_with(create_test_schema())
            .with_sort_order(invalid_sort_order)
            .create();

        assert!(
            result.is_err(),
            "Table creation with invalid sort order should fail"
        );
        let err = result.unwrap_err();
        assert!(
            matches!(err, Error::NotFound(_)),
            "Error should be NotFound for invalid sort order field reference, got: {:?}",
            err
        );
    }

    #[test]
    fn test_create_table_validation_valid_sort_order() {
        // Source field ID 1 exists in the test schema
        let sort_order = ascending_sort_order(1);

        let result = table_builder_with(create_test_schema())
            .with_sort_order(sort_order)
            .create();

        assert!(
            result.is_ok(),
            "Table creation with valid sort order should succeed"
        );
    }

    #[test]
    fn test_create_table_try_into_metadata() {
        let create_table = table_builder_with(create_test_schema())
            .create()
            .unwrap();

        let metadata: Result<TableMetadata, Error> = create_table.try_into();
        assert!(
            metadata.is_ok(),
            "Conversion to TableMetadata should succeed"
        );

        let metadata = metadata.unwrap();
        assert_eq!(metadata.location, "/test/location");
        assert_eq!(metadata.current_schema_id, DEFAULT_SCHEMA_ID);
        assert_eq!(metadata.schemas.len(), 1);
        assert_eq!(metadata.default_spec_id, DEFAULT_PARTITION_SPEC_ID);
        assert_eq!(metadata.default_sort_order_id, DEFAULT_SORT_ORDER_ID);
        // Highest field ID in the test schema
        assert_eq!(metadata.last_column_id, 3);
    }

    #[test]
    fn test_create_table_serialization_round_trip() {
        let create_table = table_builder_with(create_test_schema())
            .with_partition_spec(identity_partition_spec(1, "id_partition"))
            .create()
            .unwrap();

        // Serialize to JSON and read the value back
        let json = serde_json::to_string(&create_table).unwrap();
        let deserialized: CreateTable = serde_json::from_str(&json).unwrap();

        // The round trip must not lose or change anything
        assert_eq!(create_table, deserialized);
    }
}