1use std::collections::HashMap;
5use std::sync::Arc;
6
7use delta_kernel::schema::{ColumnMetadataKey, MetadataValue};
8use futures::TryStreamExt as _;
9use futures::future::BoxFuture;
10use serde_json::Value;
11use uuid::Uuid;
12
13use super::{CustomExecuteHandler, Operation};
14use crate::errors::{DeltaResult, DeltaTableError, unsupported_column_mapping_write};
15use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL, TableReference};
16use crate::kernel::{
17 Action, DataType, MetadataExt, ProtocolExt as _, ProtocolInner, StructField, StructType,
18 new_metadata,
19};
20use crate::logstore::LogStoreRef;
21use crate::protocol::{DeltaOperation, SaveMode};
22use crate::table::builder::ensure_table_uri;
23use crate::table::config::TableProperty;
24use crate::table::normalize_table_url;
25use crate::{DeltaTable, DeltaTableBuilder};
26
27#[derive(thiserror::Error, Debug)]
28enum CreateError {
29 #[error("Location must be provided to create a table.")]
30 MissingLocation,
31
32 #[error("At least one column must be defined to create a table.")]
33 MissingSchema,
34
35 #[error("Please configure table meta data via the CreateBuilder.")]
36 MetadataSpecified,
37
38 #[error("A Delta Lake table already exists at that location.")]
39 TableAlreadyExists,
40
41 #[error("SaveMode `append` is not allowed for create operation.")]
42 AppendNotAllowed,
43}
44
45impl From<CreateError> for DeltaTableError {
46 fn from(err: CreateError) -> Self {
47 DeltaTableError::GenericError {
48 source: Box::new(err),
49 }
50 }
51}
52
53fn data_type_has_column_mapping_metadata(data_type: &DataType) -> bool {
54 match data_type {
55 DataType::Array(array) => data_type_has_column_mapping_metadata(array.element_type()),
56 DataType::Map(map) => {
57 data_type_has_column_mapping_metadata(map.key_type())
58 || data_type_has_column_mapping_metadata(map.value_type())
59 }
60 DataType::Struct(fields) | DataType::Variant(fields) => {
61 fields.fields().any(field_has_column_mapping_metadata)
62 }
63 DataType::Primitive(_) => false,
64 }
65}
66
67fn field_has_column_mapping_metadata(field: &StructField) -> bool {
68 field
69 .metadata()
70 .contains_key(ColumnMetadataKey::ColumnMappingId.as_ref())
71 || field
72 .metadata()
73 .contains_key(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
74 || data_type_has_column_mapping_metadata(field.data_type())
75}
76
77#[derive(Clone)]
79pub struct CreateBuilder {
80 name: Option<String>,
81 location: Option<String>,
82 mode: SaveMode,
83 comment: Option<String>,
84 columns: Vec<StructField>,
85 partition_columns: Option<Vec<String>>,
86 storage_options: Option<HashMap<String, String>>,
87 actions: Vec<Action>,
88 log_store: Option<LogStoreRef>,
89 configuration: HashMap<String, Option<String>>,
90 commit_properties: CommitProperties,
92 raise_if_key_not_exists: bool,
93 custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
94}
95
96impl super::Operation for CreateBuilder {
97 fn log_store(&self) -> &LogStoreRef {
98 self.log_store
99 .as_ref()
100 .expect("Logstore shouldn't be none at this stage.")
101 }
102 fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
103 self.custom_execute_handler.clone()
104 }
105}
106
107impl Default for CreateBuilder {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112
113impl CreateBuilder {
114 pub fn new() -> Self {
116 Self {
117 name: None,
118 location: None,
119 mode: SaveMode::ErrorIfExists,
120 comment: None,
121 columns: Default::default(),
122 partition_columns: None,
123 storage_options: None,
124 actions: Default::default(),
125 log_store: None,
126 configuration: Default::default(),
127 commit_properties: CommitProperties::default(),
128 raise_if_key_not_exists: true,
129 custom_execute_handler: None,
130 }
131 }
132
133 pub fn with_table_name(mut self, name: impl Into<String>) -> Self {
136 self.name = Some(name.into());
137 self
138 }
139
140 pub fn with_location(mut self, location: impl Into<String>) -> Self {
143 self.location = Some(location.into());
144 self
145 }
146
147 pub fn with_save_mode(mut self, save_mode: SaveMode) -> Self {
149 self.mode = save_mode;
150 self
151 }
152
153 pub fn with_comment(mut self, comment: impl Into<String>) -> Self {
155 self.comment = Some(comment.into());
156 self
157 }
158
159 pub fn with_column(
161 mut self,
162 name: impl Into<String>,
163 data_type: DataType,
164 nullable: bool,
165 metadata: Option<HashMap<String, Value>>,
166 ) -> Self {
167 let mut field = StructField::new(name.into(), data_type, nullable);
168 if let Some(meta) = metadata {
169 field = field.with_metadata(meta.iter().map(|(k, v)| {
170 (
171 k,
172 if let Value::Number(n) = v {
173 n.as_i64().map_or_else(
174 || MetadataValue::String(v.to_string()),
175 MetadataValue::Number,
176 )
177 } else {
178 MetadataValue::String(v.to_string())
179 },
180 )
181 }));
182 };
183 self.columns.push(field);
184 self
185 }
186
187 pub fn with_columns(
189 mut self,
190 columns: impl IntoIterator<Item = impl Into<StructField>>,
191 ) -> Self {
192 self.columns.extend(columns.into_iter().map(|c| c.into()));
193 self
194 }
195
196 pub fn with_partition_columns(
198 mut self,
199 partition_columns: impl IntoIterator<Item = impl Into<String>>,
200 ) -> Self {
201 self.partition_columns = Some(partition_columns.into_iter().map(|s| s.into()).collect());
202 self
203 }
204
205 pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
212 self.storage_options = Some(storage_options);
213 self
214 }
215
216 pub fn with_configuration(
218 mut self,
219 configuration: impl IntoIterator<Item = (impl Into<String>, Option<impl Into<String>>)>,
220 ) -> Self {
221 self.configuration = configuration
222 .into_iter()
223 .map(|(k, v)| (k.into(), v.map(|s| s.into())))
224 .collect();
225 self
226 }
227
228 pub fn with_configuration_property(
230 mut self,
231 key: TableProperty,
232 value: Option<impl Into<String>>,
233 ) -> Self {
234 self.configuration
235 .insert(key.as_ref().into(), value.map(|v| v.into()));
236 self
237 }
238
239 pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
241 self.commit_properties = commit_properties;
242 self
243 }
244
245 pub fn with_raise_if_key_not_exists(mut self, raise_if_key_not_exists: bool) -> Self {
247 self.raise_if_key_not_exists = raise_if_key_not_exists;
248 self
249 }
250
251 pub fn with_actions(mut self, actions: impl IntoIterator<Item = Action>) -> Self {
256 self.actions.extend(actions);
257 self
258 }
259
260 pub fn with_log_store(mut self, log_store: LogStoreRef) -> Self {
262 self.log_store = Some(log_store);
263 self
264 }
265
266 pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
268 self.custom_execute_handler = Some(handler);
269 self
270 }
271
272 pub(crate) async fn into_table_and_actions(
274 mut self,
275 ) -> DeltaResult<(DeltaTable, Vec<Action>, DeltaOperation, Uuid)> {
276 if self
277 .actions
278 .iter()
279 .any(|a| matches!(a, Action::Metadata(_)))
280 {
281 return Err(CreateError::MetadataSpecified.into());
282 }
283 if self.columns.is_empty() {
284 return Err(CreateError::MissingSchema.into());
285 }
286 if self
287 .configuration
288 .get(TableProperty::ColumnMappingMode.as_ref())
289 .is_some_and(|value| value.is_some())
290 {
291 return Err(unsupported_column_mapping_write(
292 "CREATE TABLE with delta.columnMapping.mode",
293 ));
294 }
295 if self.columns.iter().any(field_has_column_mapping_metadata) {
296 return Err(unsupported_column_mapping_write(
297 "CREATE TABLE with column mapping metadata",
298 ));
299 }
300
301 let (storage_url, table) = if let Some(log_store) = self.log_store {
302 (
303 normalize_table_url(log_store.root_url()),
304 DeltaTable::new(log_store, Default::default()),
305 )
306 } else {
307 let storage_url =
308 ensure_table_uri(self.location.clone().ok_or(CreateError::MissingLocation)?)?;
309 (
310 storage_url.clone(),
311 DeltaTableBuilder::from_url(storage_url)?
312 .with_storage_options(self.storage_options.clone().unwrap_or_default())
313 .build()?,
314 )
315 };
316
317 self.log_store = Some(table.log_store());
318 let operation_id = self.get_operation_id();
319 self.pre_execute(operation_id).await?;
320
321 let configuration = self
322 .configuration
323 .iter()
324 .filter_map(|(k, v)| Some((k.to_string(), v.as_ref()?.to_string())))
325 .collect();
326
327 let current_protocol = ProtocolInner {
328 min_reader_version: PROTOCOL.default_reader_version(),
329 min_writer_version: PROTOCOL.default_writer_version(),
330 reader_features: None,
331 writer_features: None,
332 }
333 .as_kernel();
334
335 let protocol = self
336 .actions
337 .iter()
338 .find(|a| matches!(a, Action::Protocol(_)))
339 .map(|a| match a {
340 Action::Protocol(p) => p.clone(),
341 _ => unreachable!(),
342 })
343 .unwrap_or_else(|| current_protocol);
344
345 let schema = StructType::try_new(self.columns)?;
346
347 let protocol = protocol
348 .apply_properties_to_protocol(&configuration, self.raise_if_key_not_exists)?
349 .apply_column_metadata_to_protocol(&schema)?
350 .move_table_properties_into_features(&configuration);
351
352 let mut metadata = new_metadata(
353 &schema,
354 self.partition_columns.unwrap_or_default(),
355 configuration,
356 )?;
357 if let Some(name) = self.name {
358 metadata = metadata.with_name(name)?;
359 }
360 if let Some(comment) = self.comment {
361 metadata = metadata.with_description(comment)?;
362 }
363
364 let operation = DeltaOperation::Create {
365 mode: self.mode,
366 metadata: metadata.clone(),
367 location: storage_url,
368 protocol: protocol.clone(),
369 };
370
371 let mut actions = vec![Action::Protocol(protocol), Action::Metadata(metadata)];
372
373 actions.extend(
374 self.actions
375 .into_iter()
376 .filter(|a| !matches!(a, Action::Protocol(_))),
377 );
378
379 Ok((table, actions, operation, operation_id))
380 }
381}
382
383impl std::future::IntoFuture for CreateBuilder {
384 type Output = DeltaResult<DeltaTable>;
385 type IntoFuture = BoxFuture<'static, Self::Output>;
386
387 fn into_future(self) -> Self::IntoFuture {
388 let this = self;
389 Box::pin(async move {
390 let handler = this.custom_execute_handler.clone();
391 let mode = &this.mode;
392 let (mut table, mut actions, operation, operation_id) =
393 this.clone().into_table_and_actions().await?;
394
395 let table_state = if table.log_store.is_delta_table_location().await? {
396 match mode {
397 SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()),
398 SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()),
399 SaveMode::Ignore => {
400 table.load().await?;
401 return Ok(table);
402 }
403 SaveMode::Overwrite => {
404 table.load().await?;
405 let remove_actions = table
406 .snapshot()?
407 .snapshot()
408 .file_views(&table.log_store(), None)
409 .map_ok(|p| p.remove_action(true).into())
410 .try_collect::<Vec<_>>()
411 .await?;
412 actions.extend(remove_actions);
413 Some(table.snapshot()?)
414 }
415 }
416 } else {
417 None
418 };
419
420 let version = CommitBuilder::from(this.commit_properties.clone())
421 .with_actions(actions)
422 .with_operation_id(operation_id)
423 .with_post_commit_hook_handler(handler.clone())
424 .build(
425 table_state.map(|f| f as &dyn TableReference),
426 table.log_store.clone(),
427 operation,
428 )
429 .await?
430 .version();
431 table.load_version(version).await?;
432
433 if let Some(handler) = handler {
434 handler
435 .post_execute(&table.log_store(), operation_id)
436 .await?;
437 }
438 Ok(table)
439 })
440 }
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use crate::table::config::TableProperty;
    use crate::writer::test_utils::get_delta_schema;
    use tempfile::TempDir;

    /// Creating an in-memory table yields version 0 with the requested schema.
    #[tokio::test]
    async fn test_create() {
        let table_schema = get_delta_schema();

        let table = DeltaTable::new_in_memory()
            .create()
            .with_columns(table_schema.fields().cloned())
            .with_save_mode(SaveMode::Ignore)
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema)
    }

    /// A table opened from a (canonicalized) relative local directory can be created.
    #[tokio::test]
    async fn test_create_local_relative_path() {
        let table_schema = get_delta_schema();
        let tmp_dir = TempDir::new_in(".").unwrap();
        let relative_path = format!(
            "./{}",
            tmp_dir.path().file_name().unwrap().to_str().unwrap()
        );
        let table_path = std::path::Path::new(&relative_path).canonicalize().unwrap();
        let table_url = url::Url::from_directory_path(table_path)
            .map_err(|_| DeltaTableError::InvalidTableLocation(relative_path.clone()))
            .unwrap();
        let table = DeltaTable::try_from_url(table_url)
            .await
            .unwrap()
            .create()
            .with_columns(table_schema.fields().cloned())
            .with_save_mode(SaveMode::Ignore)
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema)
    }

    /// `with_location` accepts a relative local path.
    #[tokio::test]
    async fn test_create_table_local_path() {
        let schema = get_delta_schema();
        let tmp_dir = TempDir::new_in(".").unwrap();
        // Already `./`-prefixed; pass it through as-is instead of prefixing again.
        let relative_path = format!(
            "./{}",
            tmp_dir.path().file_name().unwrap().to_str().unwrap()
        );
        let table = CreateBuilder::new()
            .with_location(relative_path.as_str())
            .with_columns(schema.fields().cloned())
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
    }

    /// Checks the default protocol, an explicit protocol override, and table properties.
    #[tokio::test]
    async fn test_create_table_metadata() {
        let schema = get_delta_schema();
        let table = CreateBuilder::new()
            .with_location("memory:///")
            .with_columns(schema.fields().cloned())
            .await
            .unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.version(), 0);
        assert_eq!(
            snapshot.protocol().min_reader_version(),
            PROTOCOL.default_reader_version()
        );
        assert_eq!(
            snapshot.protocol().min_writer_version(),
            PROTOCOL.default_writer_version()
        );
        assert_eq!(snapshot.schema().as_ref(), &schema);

        // A user-supplied protocol action overrides the default protocol.
        let protocol = ProtocolInner {
            min_reader_version: 1,
            min_writer_version: 2,
            writer_features: None,
            reader_features: None,
        }
        .as_kernel();
        let table = CreateBuilder::new()
            .with_location("memory:///")
            .with_columns(schema.fields().cloned())
            .with_actions(vec![Action::Protocol(protocol)])
            .await
            .unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.protocol().min_reader_version(), 1);
        assert_eq!(snapshot.protocol().min_writer_version(), 2);

        // Table properties end up in the metadata configuration.
        let table = CreateBuilder::new()
            .with_location("memory:///")
            .with_columns(schema.fields().cloned())
            .with_configuration_property(TableProperty::AppendOnly, Some("true"))
            .await
            .unwrap();
        let append = table
            .snapshot()
            .unwrap()
            .metadata()
            .configuration()
            .get(TableProperty::AppendOnly.as_ref())
            .unwrap()
            .clone();
        assert_eq!(String::from("true"), append)
    }

    #[cfg(feature = "datafusion")]
    mod datafusion_tests {
        use super::*;

        use crate::writer::test_utils::get_record_batch;

        /// Exercises ErrorIfExists / Ignore / Overwrite against an existing table.
        #[tokio::test]
        async fn test_create_table_save_mode() {
            let tmp_dir = tempfile::tempdir().unwrap();

            let schema = get_delta_schema();
            let table = CreateBuilder::new()
                .with_location(tmp_dir.path().to_str().unwrap())
                .with_columns(schema.fields().cloned())
                .await
                .unwrap();
            assert_eq!(table.version(), Some(0));
            let first_id = table.snapshot().unwrap().metadata().id().to_string();

            let log_store = table.log_store;

            // Default error-if-exists mode fails on an existing table.
            let table = CreateBuilder::new()
                .with_log_store(log_store.clone())
                .with_columns(schema.fields().cloned())
                .with_save_mode(SaveMode::ErrorIfExists)
                .await;
            assert!(table.is_err());

            // Ignore mode returns the existing table unchanged.
            let table = CreateBuilder::new()
                .with_log_store(log_store.clone())
                .with_columns(schema.fields().cloned())
                .with_save_mode(SaveMode::Ignore)
                .await
                .unwrap();
            assert_eq!(table.snapshot().unwrap().metadata().id(), first_id);

            // Overwrite mode writes fresh metadata (new table id).
            let table = CreateBuilder::new()
                .with_log_store(log_store)
                .with_columns(schema.fields().cloned())
                .with_save_mode(SaveMode::Overwrite)
                .await
                .unwrap();
            assert_ne!(table.snapshot().unwrap().metadata().id(), first_id)
        }

        /// Overwriting an existing table removes all its data files.
        #[tokio::test]
        async fn test_create_or_replace_existing_table() {
            let batch = get_record_batch(None, false);
            let schema = get_delta_schema();
            let table = DeltaTable::new_in_memory()
                .write(vec![batch.clone()])
                .with_save_mode(SaveMode::ErrorIfExists)
                .await
                .unwrap();
            let state = table.snapshot().unwrap();
            assert_eq!(state.version(), 0);
            assert_eq!(state.log_data().num_files(), 1);

            let mut table = table
                .create()
                .with_columns(schema.fields().cloned())
                .with_save_mode(SaveMode::Overwrite)
                .await
                .unwrap();
            table.load().await.unwrap();
            let state = table.snapshot().unwrap();
            assert_eq!(state.version(), 1);
            assert_eq!(state.log_data().num_files(), 0);
        }

        /// Overwriting with new partition columns also removes all data files.
        #[tokio::test]
        async fn test_create_or_replace_existing_table_partitioned() {
            let batch = get_record_batch(None, false);
            let schema = get_delta_schema();
            let table = DeltaTable::new_in_memory()
                .write(vec![batch.clone()])
                .with_save_mode(SaveMode::ErrorIfExists)
                .await
                .unwrap();
            let state = table.snapshot().unwrap();
            assert_eq!(state.version(), 0);
            assert_eq!(state.log_data().num_files(), 1);

            let mut table = table
                .create()
                .with_columns(schema.fields().cloned())
                .with_save_mode(SaveMode::Overwrite)
                .with_partition_columns(vec!["id"])
                .await
                .unwrap();
            table.load().await.unwrap();
            let state = table.snapshot().unwrap();
            assert_eq!(state.version(), 1);
            assert_eq!(state.log_data().num_files(), 0);
        }

        /// Unknown configuration keys error by default and are kept when allowed.
        #[tokio::test]
        async fn test_create_table_metadata_raise_if_key_not_exists() {
            let schema = get_delta_schema();
            let config: HashMap<String, Option<String>> =
                vec![("key".to_string(), Some("value".to_string()))]
                    .into_iter()
                    .collect();

            let table = CreateBuilder::new()
                .with_location("memory:///")
                .with_columns(schema.fields().cloned())
                .with_configuration(config.clone())
                .await;
            assert!(table.is_err());

            let table = CreateBuilder::new()
                .with_location("memory:///")
                .with_columns(schema.fields().cloned())
                .with_raise_if_key_not_exists(false)
                .with_configuration(config)
                .await;
            assert!(table.is_ok());

            let value = table
                .unwrap()
                .snapshot()
                .unwrap()
                .metadata()
                .configuration()
                .get("key")
                .unwrap()
                .clone();
            assert_eq!(String::from("value"), value);
        }
    }
}