1use std::{
13 collections::HashMap,
14 sync::Arc,
15 time::{SystemTime, UNIX_EPOCH},
16};
17
18use derive_builder::Builder;
19use iceberg_rust_spec::{
20 identifier::Identifier,
21 spec::{
22 materialized_view_metadata::MaterializedViewMetadata,
23 partition::{PartitionSpec, DEFAULT_PARTITION_SPEC_ID},
24 schema::{Schema, DEFAULT_SCHEMA_ID},
25 sort::{SortOrder, DEFAULT_SORT_ORDER_ID},
26 table_metadata::TableMetadata,
27 view_metadata::{Version, ViewMetadata, DEFAULT_VERSION_ID},
28 },
29 view_metadata::Materialization,
30};
31use serde::{Deserialize, Serialize};
32use uuid::Uuid;
33
34use crate::{
35 error::Error,
36 materialized_view::{MaterializedView, STORAGE_TABLE_POSTFIX},
37 table::Table,
38 view::View,
39};
40
41use super::Catalog;
42
/// Request parameters for creating a new table through a `Catalog`.
///
/// Build it with the derive_builder-generated `CreateTableBuilder`. Its
/// `create()` method runs `CreateTableBuilder::validate` before producing the
/// value. Setters are prefixed with `with_`, and fields are (de)serialized
/// with kebab-case names.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Builder)]
#[serde(rename_all = "kebab-case")]
#[builder(
    build_fn(name = "create", error = "Error", validate = "Self::validate"),
    setter(prefix = "with")
)]
pub struct CreateTable {
    /// Table name, without the namespace.
    #[builder(setter(into))]
    pub name: String,
    /// Table location. It may be left unset here, but converting into
    /// `TableMetadata` fails with `Error::NotFound` while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(into, strip_option), default)]
    pub location: Option<String>,
    /// Table schema; registered under `DEFAULT_SCHEMA_ID` in the table metadata.
    pub schema: Schema,
    /// Optional partition spec. The default spec is used when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(strip_option), default)]
    pub partition_spec: Option<PartitionSpec>,
    /// Optional sort order, set through the builder's `with_sort_order` setter.
    /// The default sort order is used when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(strip_option, name = "with_sort_order"), default)]
    pub write_order: Option<SortOrder>,
    /// Optional stage-create flag. It is not read in this module; presumably it
    /// is interpreted by the catalog implementation (TODO confirm).
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(strip_option), default)]
    pub stage_create: Option<bool>,
    /// Optional table properties. The builder adds single entries via
    /// `with_property`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(strip_option, each(name = "with_property")), default)]
    pub properties: Option<HashMap<String, String>>,
}
87
88impl CreateTableBuilder {
89 fn validate(&self) -> Result<(), Error> {
100 let name = self
101 .name
102 .as_ref()
103 .ok_or(Error::NotFound("Table name is required".to_string()))?;
104
105 let schema = self
106 .schema
107 .as_ref()
108 .ok_or(Error::NotFound("Table schema is required".to_string()))?;
109
110 let field_ids: Vec<i32> = schema.fields().iter().map(|f| f.id).collect();
112 let unique_ids: std::collections::HashSet<_> = field_ids.iter().collect();
113 if field_ids.len() != unique_ids.len() {
114 return Err(Error::InvalidFormat(format!(
115 "Schema for table '{}' contains duplicate field IDs",
116 name
117 )));
118 }
119
120 if let Some(Some(spec)) = &self.partition_spec {
122 for field in spec.fields() {
123 let source_id = field.source_id();
124 if !schema.fields().iter().any(|f| f.id == *source_id) {
125 return Err(Error::NotFound(format!(
126 "Partition field '{}' references non-existent schema field ID {} in table '{}'",
127 field.name(),
128 source_id,
129 name
130 )));
131 }
132 }
133 }
134
135 if let Some(Some(order)) = &self.write_order {
137 for field in &order.fields {
138 let source_id = field.source_id;
139 if !schema.fields().iter().any(|f| f.id == source_id) {
140 return Err(Error::NotFound(format!(
141 "Sort order field references non-existent schema field ID {} in table '{}'",
142 source_id, name
143 )));
144 }
145 }
146 }
147
148 Ok(())
149 }
150
151 pub async fn build(
164 &mut self,
165 namespace: &[String],
166 catalog: Arc<dyn Catalog>,
167 ) -> Result<Table, Error> {
168 let name = self
169 .name
170 .as_ref()
171 .ok_or(Error::NotFound("Name to create table".to_owned()))?;
172 let identifier = Identifier::new(namespace, name);
173
174 let create = self.create()?;
175
176 catalog.clone().create_table(identifier, create).await
178 }
179}
180
181impl TryInto<TableMetadata> for CreateTable {
182 type Error = Error;
183 fn try_into(self) -> Result<TableMetadata, Self::Error> {
184 let last_column_id = self.schema.fields().iter().map(|x| x.id).max().unwrap_or(0);
185
186 let last_partition_id = self
187 .partition_spec
188 .as_ref()
189 .and_then(|x| x.fields().iter().map(|x| *x.field_id()).max())
190 .unwrap_or(0);
191
192 Ok(TableMetadata {
193 format_version: Default::default(),
194 table_uuid: Uuid::new_v4(),
195 location: self
196 .location
197 .ok_or(Error::NotFound(format!("Location for table {}", self.name)))?,
198 last_sequence_number: 0,
199 last_updated_ms: SystemTime::now()
200 .duration_since(UNIX_EPOCH)
201 .unwrap()
202 .as_millis() as i64,
203 last_column_id,
204 schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
205 current_schema_id: DEFAULT_SCHEMA_ID,
206 partition_specs: HashMap::from_iter(vec![(
207 DEFAULT_PARTITION_SPEC_ID,
208 self.partition_spec.unwrap_or_default(),
209 )]),
210 default_spec_id: DEFAULT_PARTITION_SPEC_ID,
211 last_partition_id,
212 properties: self.properties.unwrap_or_default(),
213 current_snapshot_id: None,
214 snapshots: HashMap::new(),
215 snapshot_log: Vec::new(),
216 metadata_log: Vec::new(),
217 sort_orders: HashMap::from_iter(vec![(
218 DEFAULT_SORT_ORDER_ID,
219 self.write_order.unwrap_or_default(),
220 )]),
221 default_sort_order_id: DEFAULT_SORT_ORDER_ID,
222 refs: HashMap::new(),
223 })
224 }
225}
226
/// Request parameters for creating a view through a `Catalog`.
///
/// This module converts `CreateView<Option<()>>` into `ViewMetadata` (plain
/// views) and `CreateView<Identifier>` into `MaterializedViewMetadata`. In the
/// latter, the identifier is presumably the storage table (TODO confirm).
/// Build it with the derive_builder-generated `CreateViewBuilder`, whose
/// setters are prefixed with `with_`. Fields are (de)serialized with
/// kebab-case names.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
#[serde(rename_all = "kebab-case")]
#[builder(build_fn(name = "create", error = "Error"), setter(prefix = "with"))]
pub struct CreateView<T: Materialization> {
    /// View name, without the namespace.
    #[builder(setter(into))]
    pub name: String,
    /// View location. Converting into view metadata fails with
    /// `Error::NotFound` while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(into, strip_option), default)]
    pub location: Option<String>,
    /// Schema registered under `DEFAULT_SCHEMA_ID` in the view metadata.
    pub schema: Schema,
    /// Initial view version, stored under `DEFAULT_VERSION_ID` and made current.
    pub view_version: Version<T>,
    /// View properties. The builder adds single entries via `with_property`.
    #[builder(setter(each(name = "with_property")), default)]
    pub properties: HashMap<String, String>,
}
260
261impl CreateViewBuilder<Option<()>> {
262 pub async fn build(
275 &mut self,
276 namespace: &[String],
277 catalog: Arc<dyn Catalog>,
278 ) -> Result<View, Error> {
279 let name = self
280 .name
281 .as_ref()
282 .ok_or(Error::NotFound("Name to create view".to_owned()))?;
283 let identifier = Identifier::new(namespace, name);
284
285 if let Some(version) = &mut self.view_version {
286 if version.default_namespace().is_empty() {
287 version.default_namespace = namespace.to_vec()
288 }
289 if version.default_catalog().is_none() && !catalog.name().is_empty() {
290 version.default_catalog = Some(catalog.name().to_string())
291 }
292 }
293
294 let create = self.create()?;
295
296 catalog.clone().create_view(identifier, create).await
298 }
299}
300
301impl TryInto<ViewMetadata> for CreateView<Option<()>> {
302 type Error = Error;
303 fn try_into(self) -> Result<ViewMetadata, Self::Error> {
304 Ok(ViewMetadata {
305 view_uuid: Uuid::new_v4(),
306 format_version: Default::default(),
307 location: self
308 .location
309 .ok_or(Error::NotFound(format!("Location for view {}", self.name)))?,
310 current_version_id: DEFAULT_VERSION_ID,
311 versions: HashMap::from_iter(vec![(DEFAULT_VERSION_ID, self.view_version)]),
312 version_log: Vec::new(),
313 schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
314 properties: self.properties,
315 })
316 }
317}
318
319impl TryInto<MaterializedViewMetadata> for CreateView<Identifier> {
320 type Error = Error;
321 fn try_into(self) -> Result<MaterializedViewMetadata, Self::Error> {
322 Ok(MaterializedViewMetadata {
323 view_uuid: Uuid::new_v4(),
324 format_version: Default::default(),
325 location: self.location.ok_or(Error::NotFound(format!(
326 "Location for materialized view {}",
327 self.name
328 )))?,
329 current_version_id: DEFAULT_VERSION_ID,
330 versions: HashMap::from_iter(vec![(DEFAULT_VERSION_ID, self.view_version)]),
331 version_log: Vec::new(),
332 schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
333 properties: self.properties,
334 })
335 }
336}
337
/// Request parameters for creating a materialized view through a `Catalog`.
///
/// It combines the view part (name, location, schema, version, properties)
/// with the storage-table part (partition spec, sort order, stage-create flag,
/// table properties). The `From` impl in this module splits it into a
/// `CreateView<Identifier>` and a `CreateTable`. Build it with the
/// derive_builder-generated `CreateMaterializedViewBuilder`, whose setters are
/// prefixed with `with_`. Fields are (de)serialized with kebab-case names.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
#[serde(rename_all = "kebab-case")]
#[builder(build_fn(name = "create", error = "Error"), setter(prefix = "with"))]
pub struct CreateMaterializedView {
    /// View name, without the namespace.
    #[builder(setter(into))]
    pub name: String,
    /// Location shared by the view and its storage table when split.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(into, strip_option), default)]
    pub location: Option<String>,
    /// Schema shared by the view and its storage table when split.
    pub schema: Schema,
    /// Initial view version. Its `storage_table` names the backing table.
    pub view_version: Version<Identifier>,
    /// View properties. The builder adds single entries via `with_property`.
    #[builder(setter(each(name = "with_property")), default)]
    pub properties: HashMap<String, String>,
    /// Optional partition spec for the storage table.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(strip_option), default)]
    pub partition_spec: Option<PartitionSpec>,
    /// Optional sort order for the storage table, set through `with_sort_order`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(strip_option, name = "with_sort_order"), default)]
    pub write_order: Option<SortOrder>,
    /// Optional stage-create flag, forwarded to the storage table's `CreateTable`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(strip_option), default)]
    pub stage_create: Option<bool>,
    /// Optional storage-table properties. The builder adds single entries via
    /// `with_table_property`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[builder(setter(strip_option, each(name = "with_table_property")), default)]
    pub table_properties: Option<HashMap<String, String>>,
}
386
387impl CreateMaterializedViewBuilder {
388 pub async fn build(
404 &mut self,
405 namespace: &[String],
406 catalog: Arc<dyn Catalog>,
407 ) -> Result<MaterializedView, Error> {
408 let name = self.name.as_ref().ok_or(Error::NotFound(
409 "Name to create materialized view".to_owned(),
410 ))?;
411 let identifier = Identifier::new(namespace, name);
412
413 if let Some(version) = &mut self.view_version {
414 if version.default_namespace().is_empty() {
415 version.default_namespace = namespace.to_vec()
416 }
417 if version.default_catalog().is_none() && !catalog.name().is_empty() {
418 version.default_catalog = Some(catalog.name().to_string())
419 }
420 }
421
422 let mut create = self.create()?;
423
424 let version = Version {
425 version_id: create.view_version.version_id,
426 schema_id: create.view_version.schema_id,
427 timestamp_ms: create.view_version.timestamp_ms,
428 summary: create.view_version.summary.clone(),
429 representations: create.view_version.representations.clone(),
430 default_catalog: create.view_version.default_catalog,
431 default_namespace: create.view_version.default_namespace,
432 storage_table: Identifier::new(
433 identifier.namespace(),
434 &(identifier.name().to_string() + STORAGE_TABLE_POSTFIX),
435 ),
436 };
437
438 create.view_version = version;
439
440 catalog
442 .clone()
443 .create_materialized_view(identifier.clone(), create)
444 .await
445 }
446}
447
448impl From<CreateMaterializedView> for (CreateView<Identifier>, CreateTable) {
449 fn from(val: CreateMaterializedView) -> Self {
450 let storage_table = val.view_version.storage_table.name().to_owned();
451 (
452 CreateView {
453 name: val.name.clone(),
454 location: val.location.clone(),
455 schema: val.schema.clone(),
456 view_version: val.view_version,
457 properties: val.properties,
458 },
459 CreateTable {
460 name: storage_table,
461 location: val.location,
462 schema: val.schema,
463 partition_spec: val.partition_spec,
464 write_order: val.write_order,
465 stage_create: val.stage_create,
466 properties: val.table_properties,
467 },
468 )
469 }
470}
471
#[cfg(test)]
mod tests {
    use super::*;
    use iceberg_rust_spec::spec::{
        partition::{PartitionField, PartitionSpecBuilder, Transform},
        sort::{NullOrder, SortDirection, SortField, SortOrderBuilder},
        types::{PrimitiveType, StructField, Type},
    };

    /// Three-column schema with unique field IDs 1 (`id`), 2 (`name`) and
    /// 3 (`timestamp`).
    fn create_test_schema() -> Schema {
        Schema::builder()
            .with_struct_field(StructField {
                id: 1,
                name: "id".to_string(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: None,
            })
            .with_struct_field(StructField {
                id: 2,
                name: "name".to_string(),
                required: false,
                field_type: Type::Primitive(PrimitiveType::String),
                doc: None,
            })
            .with_struct_field(StructField {
                id: 3,
                name: "timestamp".to_string(),
                required: false,
                field_type: Type::Primitive(PrimitiveType::Timestamp),
                doc: None,
            })
            .build()
            .unwrap()
    }

    /// Schema whose two columns both use field ID 1, to exercise the
    /// duplicate-ID validation.
    fn create_duplicate_field_id_schema() -> Schema {
        Schema::builder()
            .with_struct_field(StructField {
                id: 1,
                name: "id".to_string(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: None,
            })
            .with_struct_field(StructField {
                // Deliberately reuses ID 1.
                id: 1,
                name: "name".to_string(),
                required: false,
                field_type: Type::Primitive(PrimitiveType::String),
                doc: None,
            })
            .build()
            .unwrap()
    }

    #[test]
    fn test_create_table_builder_valid() {
        let schema = create_test_schema();
        let mut builder = CreateTableBuilder::default();
        let result = builder
            .with_name("test_table")
            .with_location("/test/location")
            .with_schema(schema)
            .create();

        assert!(result.is_ok(), "Valid table creation should succeed");
        let create_table = result.unwrap();
        assert_eq!(create_table.name, "test_table");
        assert_eq!(create_table.location, Some("/test/location".to_string()));
    }

    #[test]
    fn test_create_table_builder_missing_name() {
        let schema = create_test_schema();
        let mut builder = CreateTableBuilder::default();
        let result = builder
            .with_location("/test/location")
            .with_schema(schema)
            .create();

        assert!(result.is_err(), "Table creation without name should fail");
    }

    #[test]
    fn test_create_table_builder_missing_schema() {
        let mut builder = CreateTableBuilder::default();
        let result = builder
            .with_name("test_table")
            .with_location("/test/location")
            .create();

        assert!(result.is_err(), "Table creation without schema should fail");
    }

    #[test]
    fn test_create_table_validation_duplicate_field_ids() {
        let schema = create_duplicate_field_id_schema();
        let mut builder = CreateTableBuilder::default();
        let result = builder
            .with_name("test_table")
            .with_location("/test/location")
            .with_schema(schema)
            .create();

        assert!(
            result.is_err(),
            "Table creation with duplicate field IDs should fail"
        );
        let err = result.unwrap_err();
        assert!(
            matches!(err, Error::InvalidFormat(_)),
            "Error should be InvalidFormat, got: {:?}",
            err
        );
    }

    #[test]
    fn test_create_table_validation_invalid_partition_spec() {
        let schema = create_test_schema();

        let mut partition_spec_builder = PartitionSpecBuilder::default();
        let invalid_partition_spec = partition_spec_builder
            .with_spec_id(1)
            .with_partition_field(PartitionField::new(
                // Source field 999 does not exist in the test schema.
                999,
                1000,
                "invalid_partition",
                Transform::Identity,
            ))
            .build()
            .unwrap();

        let mut builder = CreateTableBuilder::default();
        let result = builder
            .with_name("test_table")
            .with_location("/test/location")
            .with_schema(schema)
            .with_partition_spec(invalid_partition_spec)
            .create();

        assert!(
            result.is_err(),
            "Table creation with invalid partition spec should fail"
        );
        let err = result.unwrap_err();
        assert!(
            matches!(err, Error::NotFound(_)),
            "Error should be NotFound for invalid partition field reference, got: {:?}",
            err
        );
    }

    #[test]
    fn test_create_table_validation_valid_partition_spec() {
        let schema = create_test_schema();

        let mut partition_spec_builder = PartitionSpecBuilder::default();
        let partition_spec = partition_spec_builder
            .with_spec_id(1)
            .with_partition_field(PartitionField::new(
                // Source field 1 (`id`) exists in the test schema.
                1,
                1000,
                "id_partition",
                Transform::Identity,
            ))
            .build()
            .unwrap();

        let mut builder = CreateTableBuilder::default();
        let result = builder
            .with_name("test_table")
            .with_location("/test/location")
            .with_schema(schema)
            .with_partition_spec(partition_spec)
            .create();

        assert!(
            result.is_ok(),
            "Table creation with valid partition spec should succeed"
        );
    }

    #[test]
    fn test_create_table_validation_invalid_sort_order() {
        let schema = create_test_schema();

        let mut sort_order_builder = SortOrderBuilder::default();
        let invalid_sort_order = sort_order_builder
            .with_order_id(1)
            .with_sort_field(SortField {
                // Source field 999 does not exist in the test schema.
                source_id: 999,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .build()
            .unwrap();

        let mut builder = CreateTableBuilder::default();
        let result = builder
            .with_name("test_table")
            .with_location("/test/location")
            .with_schema(schema)
            .with_sort_order(invalid_sort_order)
            .create();

        assert!(
            result.is_err(),
            "Table creation with invalid sort order should fail"
        );
        let err = result.unwrap_err();
        assert!(
            matches!(err, Error::NotFound(_)),
            "Error should be NotFound for invalid sort order field reference, got: {:?}",
            err
        );
    }

    #[test]
    fn test_create_table_validation_valid_sort_order() {
        let schema = create_test_schema();

        let mut sort_order_builder = SortOrderBuilder::default();
        let sort_order = sort_order_builder
            .with_order_id(1)
            .with_sort_field(SortField {
                // Source field 1 (`id`) exists in the test schema.
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            })
            .build()
            .unwrap();

        let mut builder = CreateTableBuilder::default();
        let result = builder
            .with_name("test_table")
            .with_location("/test/location")
            .with_schema(schema)
            .with_sort_order(sort_order)
            .create();

        assert!(
            result.is_ok(),
            "Table creation with valid sort order should succeed"
        );
    }

    #[test]
    fn test_create_table_try_into_metadata() {
        let schema = create_test_schema();
        let mut builder = CreateTableBuilder::default();
        let create_table = builder
            .with_name("test_table")
            .with_location("/test/location")
            .with_schema(schema.clone())
            .create()
            .unwrap();

        let metadata: Result<TableMetadata, Error> = create_table.try_into();
        assert!(
            metadata.is_ok(),
            "Conversion to TableMetadata should succeed"
        );

        let metadata = metadata.unwrap();
        assert_eq!(metadata.location, "/test/location");
        assert_eq!(metadata.current_schema_id, DEFAULT_SCHEMA_ID);
        assert_eq!(metadata.schemas.len(), 1);
        assert_eq!(metadata.default_spec_id, DEFAULT_PARTITION_SPEC_ID);
        assert_eq!(metadata.default_sort_order_id, DEFAULT_SORT_ORDER_ID);
        // Highest field ID in the test schema.
        assert_eq!(metadata.last_column_id, 3);
    }

    #[test]
    fn test_create_table_try_into_metadata_missing_location() {
        // The builder accepts a missing location; the metadata conversion must not.
        let mut builder = CreateTableBuilder::default();
        let create_table = builder
            .with_name("test_table")
            .with_schema(create_test_schema())
            .create()
            .unwrap();

        let metadata: Result<TableMetadata, Error> = create_table.try_into();
        assert!(
            matches!(metadata, Err(Error::NotFound(_))),
            "Conversion without a location should fail with NotFound"
        );
    }

    #[test]
    fn test_create_table_try_into_metadata_last_partition_id() {
        let mut partition_spec_builder = PartitionSpecBuilder::default();
        let partition_spec = partition_spec_builder
            .with_spec_id(1)
            .with_partition_field(PartitionField::new(
                1,
                1000,
                "id_partition",
                Transform::Identity,
            ))
            .with_partition_field(PartitionField::new(
                2,
                1001,
                "name_partition",
                Transform::Identity,
            ))
            .build()
            .unwrap();

        let mut builder = CreateTableBuilder::default();
        let create_table = builder
            .with_name("test_table")
            .with_location("/test/location")
            .with_schema(create_test_schema())
            .with_partition_spec(partition_spec)
            .create()
            .unwrap();

        let metadata: TableMetadata = create_table.try_into().unwrap();
        // The highest partition field ID is recorded.
        assert_eq!(metadata.last_partition_id, 1001);
    }

    #[test]
    fn test_create_table_serialization_round_trip() {
        let schema = create_test_schema();
        let mut partition_spec_builder = PartitionSpecBuilder::default();
        let partition_spec = partition_spec_builder
            .with_spec_id(1)
            .with_partition_field(PartitionField::new(
                1,
                1000,
                "id_partition",
                Transform::Identity,
            ))
            .build()
            .unwrap();

        let mut builder = CreateTableBuilder::default();
        let create_table = builder
            .with_name("test_table")
            .with_location("/test/location")
            .with_schema(schema)
            .with_partition_spec(partition_spec)
            .create()
            .unwrap();

        let json = serde_json::to_string(&create_table).unwrap();

        let deserialized: CreateTable = serde_json::from_str(&json).unwrap();

        assert_eq!(create_table, deserialized);
    }
}