1use std::{
13 collections::HashMap,
14 sync::Arc,
15 time::{SystemTime, UNIX_EPOCH},
16};
17
18use derive_builder::Builder;
19use iceberg_rust_spec::{
20 identifier::FullIdentifier,
21 spec::{
22 materialized_view_metadata::MaterializedViewMetadata,
23 partition::{PartitionSpec, DEFAULT_PARTITION_SPEC_ID},
24 schema::{Schema, DEFAULT_SCHEMA_ID},
25 sort::{SortOrder, DEFAULT_SORT_ORDER_ID},
26 table_metadata::TableMetadata,
27 view_metadata::{Version, ViewMetadata, DEFAULT_VERSION_ID},
28 },
29 view_metadata::Materialization,
30};
31use serde::{Deserialize, Serialize};
32use uuid::Uuid;
33
34use crate::{
35 error::Error,
36 materialized_view::{MaterializedView, STORAGE_TABLE_POSTFIX},
37 table::Table,
38 view::View,
39};
40
41use super::{identifier::Identifier, Catalog};
42
43#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Builder)]
55#[serde(rename_all = "kebab-case")]
56#[builder(build_fn(name = "create", error = "Error"), setter(prefix = "with"))]
57pub struct CreateTable {
58 #[builder(setter(into))]
59 pub name: String,
61 #[serde(skip_serializing_if = "Option::is_none")]
63 #[builder(setter(into, strip_option), default)]
64 pub location: Option<String>,
65 pub schema: Schema,
67 #[serde(skip_serializing_if = "Option::is_none")]
69 #[builder(setter(strip_option), default)]
70 pub partition_spec: Option<PartitionSpec>,
71 #[serde(skip_serializing_if = "Option::is_none")]
73 #[builder(setter(strip_option, name = "with_sort_order"), default)]
74 pub write_order: Option<SortOrder>,
75 #[serde(skip_serializing_if = "Option::is_none")]
77 #[builder(setter(strip_option), default)]
78 pub stage_create: Option<bool>,
79 #[serde(skip_serializing_if = "Option::is_none")]
81 #[builder(setter(strip_option, each(name = "with_property")), default)]
82 pub properties: Option<HashMap<String, String>>,
83}
84
85impl CreateTableBuilder {
86 pub async fn build(
99 &mut self,
100 namespace: &[String],
101 catalog: Arc<dyn Catalog>,
102 ) -> Result<Table, Error> {
103 let name = self
104 .name
105 .as_ref()
106 .ok_or(Error::NotFound("Name to create table".to_owned()))?;
107 let identifier = Identifier::new(namespace, name);
108
109 let create = self.create()?;
110
111 catalog.clone().create_table(identifier, create).await
113 }
114}
115
116impl TryInto<TableMetadata> for CreateTable {
117 type Error = Error;
118 fn try_into(self) -> Result<TableMetadata, Self::Error> {
119 let last_column_id = self.schema.fields().iter().map(|x| x.id).max().unwrap_or(0);
120
121 let last_partition_id = self
122 .partition_spec
123 .as_ref()
124 .and_then(|x| x.fields().iter().map(|x| *x.field_id()).max())
125 .unwrap_or(0);
126
127 Ok(TableMetadata {
128 format_version: Default::default(),
129 table_uuid: Uuid::new_v4(),
130 location: self
131 .location
132 .ok_or(Error::NotFound(format!("Location for table {}", self.name)))?,
133 last_sequence_number: 0,
134 last_updated_ms: SystemTime::now()
135 .duration_since(UNIX_EPOCH)
136 .unwrap()
137 .as_millis() as i64,
138 last_column_id,
139 schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
140 current_schema_id: DEFAULT_SCHEMA_ID,
141 partition_specs: HashMap::from_iter(vec![(
142 DEFAULT_PARTITION_SPEC_ID,
143 self.partition_spec.unwrap_or_default(),
144 )]),
145 default_spec_id: DEFAULT_PARTITION_SPEC_ID,
146 last_partition_id,
147 properties: self.properties.unwrap_or_default(),
148 current_snapshot_id: None,
149 snapshots: HashMap::new(),
150 snapshot_log: Vec::new(),
151 metadata_log: Vec::new(),
152 sort_orders: HashMap::from_iter(vec![(
153 DEFAULT_SORT_ORDER_ID,
154 self.write_order.unwrap_or_default(),
155 )]),
156 default_sort_order_id: DEFAULT_SORT_ORDER_ID,
157 refs: HashMap::new(),
158 })
159 }
160}
161
162#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
177#[serde(rename_all = "kebab-case")]
178#[builder(build_fn(name = "create", error = "Error"), setter(prefix = "with"))]
179pub struct CreateView<T: Materialization> {
180 #[builder(setter(into))]
182 pub name: String,
183 #[serde(skip_serializing_if = "Option::is_none")]
185 #[builder(setter(into, strip_option), default)]
186 pub location: Option<String>,
187 pub schema: Schema,
189 pub view_version: Version<T>,
191 #[builder(setter(each(name = "with_property")), default)]
193 pub properties: HashMap<String, String>,
194}
195
196impl CreateViewBuilder<Option<()>> {
197 pub async fn build(
210 &mut self,
211 namespace: &[String],
212 catalog: Arc<dyn Catalog>,
213 ) -> Result<View, Error> {
214 let name = self
215 .name
216 .as_ref()
217 .ok_or(Error::NotFound("Name to create view".to_owned()))?;
218 let identifier = Identifier::new(namespace, name);
219
220 if let Some(version) = &mut self.view_version {
221 if version.default_namespace().is_empty() {
222 version.default_namespace = namespace.to_vec()
223 }
224 if version.default_catalog().is_none() && !catalog.name().is_empty() {
225 version.default_catalog = Some(catalog.name().to_string())
226 }
227 }
228
229 let create = self.create()?;
230
231 catalog.clone().create_view(identifier, create).await
233 }
234}
235
236impl TryInto<ViewMetadata> for CreateView<Option<()>> {
237 type Error = Error;
238 fn try_into(self) -> Result<ViewMetadata, Self::Error> {
239 Ok(ViewMetadata {
240 view_uuid: Uuid::new_v4(),
241 format_version: Default::default(),
242 location: self
243 .location
244 .ok_or(Error::NotFound(format!("Location for view {}", self.name)))?,
245 current_version_id: DEFAULT_VERSION_ID,
246 versions: HashMap::from_iter(vec![(DEFAULT_VERSION_ID, self.view_version)]),
247 version_log: Vec::new(),
248 schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
249 properties: self.properties,
250 })
251 }
252}
253
254impl TryInto<MaterializedViewMetadata> for CreateView<FullIdentifier> {
255 type Error = Error;
256 fn try_into(self) -> Result<MaterializedViewMetadata, Self::Error> {
257 Ok(MaterializedViewMetadata {
258 view_uuid: Uuid::new_v4(),
259 format_version: Default::default(),
260 location: self.location.ok_or(Error::NotFound(format!(
261 "Location for materialized view {}",
262 self.name
263 )))?,
264 current_version_id: DEFAULT_VERSION_ID,
265 versions: HashMap::from_iter(vec![(DEFAULT_VERSION_ID, self.view_version)]),
266 version_log: Vec::new(),
267 schemas: HashMap::from_iter(vec![(DEFAULT_SCHEMA_ID, self.schema)]),
268 properties: self.properties,
269 })
270 }
271}
272
273#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Builder)]
287#[serde(rename_all = "kebab-case")]
288#[builder(build_fn(name = "create", error = "Error"), setter(prefix = "with"))]
289pub struct CreateMaterializedView {
290 #[builder(setter(into))]
292 pub name: String,
293 #[serde(skip_serializing_if = "Option::is_none")]
295 #[builder(setter(into, strip_option), default)]
296 pub location: Option<String>,
297 pub schema: Schema,
299 pub view_version: Version<FullIdentifier>,
301 #[builder(setter(each(name = "with_property")), default)]
303 pub properties: HashMap<String, String>,
304 #[serde(skip_serializing_if = "Option::is_none")]
306 #[builder(setter(strip_option), default)]
307 pub partition_spec: Option<PartitionSpec>,
308 #[serde(skip_serializing_if = "Option::is_none")]
310 #[builder(setter(strip_option, name = "with_sort_order"), default)]
311 pub write_order: Option<SortOrder>,
312 #[serde(skip_serializing_if = "Option::is_none")]
314 #[builder(setter(strip_option), default)]
315 pub stage_create: Option<bool>,
316 #[serde(skip_serializing_if = "Option::is_none")]
318 #[builder(setter(strip_option, each(name = "with_table_property")), default)]
319 pub table_properties: Option<HashMap<String, String>>,
320}
321
322impl CreateMaterializedViewBuilder {
323 pub async fn build(
339 &mut self,
340 namespace: &[String],
341 catalog: Arc<dyn Catalog>,
342 ) -> Result<MaterializedView, Error> {
343 let name = self.name.as_ref().ok_or(Error::NotFound(
344 "Name to create materialized view".to_owned(),
345 ))?;
346 let identifier = Identifier::new(namespace, name);
347
348 if let Some(version) = &mut self.view_version {
349 if version.default_namespace().is_empty() {
350 version.default_namespace = namespace.to_vec()
351 }
352 if version.default_catalog().is_none() && !catalog.name().is_empty() {
353 version.default_catalog = Some(catalog.name().to_string())
354 }
355 }
356
357 let mut create = self.create()?;
358
359 let version = Version {
360 version_id: create.view_version.version_id,
361 schema_id: create.view_version.schema_id,
362 timestamp_ms: create.view_version.timestamp_ms,
363 summary: create.view_version.summary.clone(),
364 representations: create.view_version.representations.clone(),
365 default_catalog: create.view_version.default_catalog,
366 default_namespace: create.view_version.default_namespace,
367 storage_table: FullIdentifier::new(
368 None,
369 identifier.namespace(),
370 &(identifier.name().to_string() + STORAGE_TABLE_POSTFIX),
371 ),
372 };
373
374 create.view_version = version;
375
376 catalog
378 .clone()
379 .create_materialized_view(identifier.clone(), create)
380 .await
381 }
382}
383
384impl From<CreateMaterializedView> for (CreateView<FullIdentifier>, CreateTable) {
385 fn from(val: CreateMaterializedView) -> Self {
386 let storage_table = val.view_version.storage_table.name().to_owned();
387 (
388 CreateView {
389 name: val.name.clone(),
390 location: val.location.clone(),
391 schema: val.schema.clone(),
392 view_version: val.view_version,
393 properties: val.properties,
394 },
395 CreateTable {
396 name: storage_table,
397 location: val.location,
398 schema: val.schema,
399 partition_spec: val.partition_spec,
400 write_order: val.write_order,
401 stage_create: val.stage_create,
402 properties: val.table_properties,
403 },
404 )
405 }
406}