Skip to main content

deltalake_core/operations/
create.rs

1//! Command for creating a new delta table
2// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use delta_kernel::schema::{ColumnMetadataKey, MetadataValue};
8use futures::TryStreamExt as _;
9use futures::future::BoxFuture;
10use serde_json::Value;
11use uuid::Uuid;
12
13use super::{CustomExecuteHandler, Operation};
14use crate::errors::{DeltaResult, DeltaTableError, unsupported_column_mapping_write};
15use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL, TableReference};
16use crate::kernel::{
17    Action, DataType, MetadataExt, ProtocolExt as _, ProtocolInner, StructField, StructType,
18    new_metadata,
19};
20use crate::logstore::LogStoreRef;
21use crate::protocol::{DeltaOperation, SaveMode};
22use crate::table::builder::ensure_table_uri;
23use crate::table::config::TableProperty;
24use crate::table::normalize_table_url;
25use crate::{DeltaTable, DeltaTableBuilder};
26
27#[derive(thiserror::Error, Debug)]
28enum CreateError {
29    #[error("Location must be provided to create a table.")]
30    MissingLocation,
31
32    #[error("At least one column must be defined to create a table.")]
33    MissingSchema,
34
35    #[error("Please configure table meta data via the CreateBuilder.")]
36    MetadataSpecified,
37
38    #[error("A Delta Lake table already exists at that location.")]
39    TableAlreadyExists,
40
41    #[error("SaveMode `append` is not allowed for create operation.")]
42    AppendNotAllowed,
43}
44
45impl From<CreateError> for DeltaTableError {
46    fn from(err: CreateError) -> Self {
47        DeltaTableError::GenericError {
48            source: Box::new(err),
49        }
50    }
51}
52
53fn data_type_has_column_mapping_metadata(data_type: &DataType) -> bool {
54    match data_type {
55        DataType::Array(array) => data_type_has_column_mapping_metadata(array.element_type()),
56        DataType::Map(map) => {
57            data_type_has_column_mapping_metadata(map.key_type())
58                || data_type_has_column_mapping_metadata(map.value_type())
59        }
60        DataType::Struct(fields) | DataType::Variant(fields) => {
61            fields.fields().any(field_has_column_mapping_metadata)
62        }
63        DataType::Primitive(_) => false,
64    }
65}
66
67fn field_has_column_mapping_metadata(field: &StructField) -> bool {
68    field
69        .metadata()
70        .contains_key(ColumnMetadataKey::ColumnMappingId.as_ref())
71        || field
72            .metadata()
73            .contains_key(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
74        || data_type_has_column_mapping_metadata(field.data_type())
75}
76
77/// Build an operation to create a new [DeltaTable]
78#[derive(Clone)]
79pub struct CreateBuilder {
80    name: Option<String>,
81    location: Option<String>,
82    mode: SaveMode,
83    comment: Option<String>,
84    columns: Vec<StructField>,
85    partition_columns: Option<Vec<String>>,
86    storage_options: Option<HashMap<String, String>>,
87    actions: Vec<Action>,
88    log_store: Option<LogStoreRef>,
89    configuration: HashMap<String, Option<String>>,
90    /// Additional information to add to the commit
91    commit_properties: CommitProperties,
92    raise_if_key_not_exists: bool,
93    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
94}
95
96impl super::Operation for CreateBuilder {
97    fn log_store(&self) -> &LogStoreRef {
98        self.log_store
99            .as_ref()
100            .expect("Logstore shouldn't be none at this stage.")
101    }
102    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
103        self.custom_execute_handler.clone()
104    }
105}
106
107impl Default for CreateBuilder {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
113impl CreateBuilder {
114    /// Create a new [`CreateBuilder`]
115    pub fn new() -> Self {
116        Self {
117            name: None,
118            location: None,
119            mode: SaveMode::ErrorIfExists,
120            comment: None,
121            columns: Default::default(),
122            partition_columns: None,
123            storage_options: None,
124            actions: Default::default(),
125            log_store: None,
126            configuration: Default::default(),
127            commit_properties: CommitProperties::default(),
128            raise_if_key_not_exists: true,
129            custom_execute_handler: None,
130        }
131    }
132
133    /// Specify the table name. Optionally qualified with
134    /// a database name [database_name.] table_name.
135    pub fn with_table_name(mut self, name: impl Into<String>) -> Self {
136        self.name = Some(name.into());
137        self
138    }
139
140    /// Specify the path to the location where table data is stored,
141    /// which could be a path on distributed storage.
142    pub fn with_location(mut self, location: impl Into<String>) -> Self {
143        self.location = Some(location.into());
144        self
145    }
146
147    /// Specify the behavior when a table exists at location
148    pub fn with_save_mode(mut self, save_mode: SaveMode) -> Self {
149        self.mode = save_mode;
150        self
151    }
152
153    /// Comment to describe the table.
154    pub fn with_comment(mut self, comment: impl Into<String>) -> Self {
155        self.comment = Some(comment.into());
156        self
157    }
158
159    /// Specify a column in the table
160    pub fn with_column(
161        mut self,
162        name: impl Into<String>,
163        data_type: DataType,
164        nullable: bool,
165        metadata: Option<HashMap<String, Value>>,
166    ) -> Self {
167        let mut field = StructField::new(name.into(), data_type, nullable);
168        if let Some(meta) = metadata {
169            field = field.with_metadata(meta.iter().map(|(k, v)| {
170                (
171                    k,
172                    if let Value::Number(n) = v {
173                        n.as_i64().map_or_else(
174                            || MetadataValue::String(v.to_string()),
175                            MetadataValue::Number,
176                        )
177                    } else {
178                        MetadataValue::String(v.to_string())
179                    },
180                )
181            }));
182        };
183        self.columns.push(field);
184        self
185    }
186
187    /// Specify columns to append to schema
188    pub fn with_columns(
189        mut self,
190        columns: impl IntoIterator<Item = impl Into<StructField>>,
191    ) -> Self {
192        self.columns.extend(columns.into_iter().map(|c| c.into()));
193        self
194    }
195
196    /// Specify table partitioning
197    pub fn with_partition_columns(
198        mut self,
199        partition_columns: impl IntoIterator<Item = impl Into<String>>,
200    ) -> Self {
201        self.partition_columns = Some(partition_columns.into_iter().map(|s| s.into()).collect());
202        self
203    }
204
205    /// Set options used to initialize storage backend
206    ///
207    /// Options may be passed in the HashMap or set as environment variables.
208    ///
209    /// [crate::table::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
210    /// If an object store is also passed using `with_object_store()` these options will be ignored.
211    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
212        self.storage_options = Some(storage_options);
213        self
214    }
215
216    /// Set configuration on created table
217    pub fn with_configuration(
218        mut self,
219        configuration: impl IntoIterator<Item = (impl Into<String>, Option<impl Into<String>>)>,
220    ) -> Self {
221        self.configuration = configuration
222            .into_iter()
223            .map(|(k, v)| (k.into(), v.map(|s| s.into())))
224            .collect();
225        self
226    }
227
228    /// Specify a table property in the table configuration
229    pub fn with_configuration_property(
230        mut self,
231        key: TableProperty,
232        value: Option<impl Into<String>>,
233    ) -> Self {
234        self.configuration
235            .insert(key.as_ref().into(), value.map(|v| v.into()));
236        self
237    }
238
239    /// Additional metadata to be added to commit info
240    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
241        self.commit_properties = commit_properties;
242        self
243    }
244
245    /// Specify whether to raise an error if the table properties in the configuration are not TableProperties
246    pub fn with_raise_if_key_not_exists(mut self, raise_if_key_not_exists: bool) -> Self {
247        self.raise_if_key_not_exists = raise_if_key_not_exists;
248        self
249    }
250
251    /// Specify additional actions to be added to the commit.
252    ///
253    /// This method is mainly meant for internal use. Manually adding inconsistent
254    /// actions to a create operation may have undesired effects - use with caution.
255    pub fn with_actions(mut self, actions: impl IntoIterator<Item = Action>) -> Self {
256        self.actions.extend(actions);
257        self
258    }
259
260    /// Provide a [`LogStore`] instance
261    pub fn with_log_store(mut self, log_store: LogStoreRef) -> Self {
262        self.log_store = Some(log_store);
263        self
264    }
265
266    /// Set a custom execute handler, for pre and post execution
267    pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
268        self.custom_execute_handler = Some(handler);
269        self
270    }
271
272    /// Consume self into uninitialized table with corresponding create actions and operation meta
273    pub(crate) async fn into_table_and_actions(
274        mut self,
275    ) -> DeltaResult<(DeltaTable, Vec<Action>, DeltaOperation, Uuid)> {
276        if self
277            .actions
278            .iter()
279            .any(|a| matches!(a, Action::Metadata(_)))
280        {
281            return Err(CreateError::MetadataSpecified.into());
282        }
283        if self.columns.is_empty() {
284            return Err(CreateError::MissingSchema.into());
285        }
286        if self
287            .configuration
288            .get(TableProperty::ColumnMappingMode.as_ref())
289            .is_some_and(|value| value.is_some())
290        {
291            return Err(unsupported_column_mapping_write(
292                "CREATE TABLE with delta.columnMapping.mode",
293            ));
294        }
295        if self.columns.iter().any(field_has_column_mapping_metadata) {
296            return Err(unsupported_column_mapping_write(
297                "CREATE TABLE with column mapping metadata",
298            ));
299        }
300
301        let (storage_url, table) = if let Some(log_store) = self.log_store {
302            (
303                normalize_table_url(log_store.root_url()),
304                DeltaTable::new(log_store, Default::default()),
305            )
306        } else {
307            let storage_url =
308                ensure_table_uri(self.location.clone().ok_or(CreateError::MissingLocation)?)?;
309            (
310                storage_url.clone(),
311                DeltaTableBuilder::from_url(storage_url)?
312                    .with_storage_options(self.storage_options.clone().unwrap_or_default())
313                    .build()?,
314            )
315        };
316
317        self.log_store = Some(table.log_store());
318        let operation_id = self.get_operation_id();
319        self.pre_execute(operation_id).await?;
320
321        let configuration = self
322            .configuration
323            .iter()
324            .filter_map(|(k, v)| Some((k.to_string(), v.as_ref()?.to_string())))
325            .collect();
326
327        let current_protocol = ProtocolInner {
328            min_reader_version: PROTOCOL.default_reader_version(),
329            min_writer_version: PROTOCOL.default_writer_version(),
330            reader_features: None,
331            writer_features: None,
332        }
333        .as_kernel();
334
335        let protocol = self
336            .actions
337            .iter()
338            .find(|a| matches!(a, Action::Protocol(_)))
339            .map(|a| match a {
340                Action::Protocol(p) => p.clone(),
341                _ => unreachable!(),
342            })
343            .unwrap_or_else(|| current_protocol);
344
345        let schema = StructType::try_new(self.columns)?;
346
347        let protocol = protocol
348            .apply_properties_to_protocol(&configuration, self.raise_if_key_not_exists)?
349            .apply_column_metadata_to_protocol(&schema)?
350            .move_table_properties_into_features(&configuration);
351
352        let mut metadata = new_metadata(
353            &schema,
354            self.partition_columns.unwrap_or_default(),
355            configuration,
356        )?;
357        if let Some(name) = self.name {
358            metadata = metadata.with_name(name)?;
359        }
360        if let Some(comment) = self.comment {
361            metadata = metadata.with_description(comment)?;
362        }
363
364        let operation = DeltaOperation::Create {
365            mode: self.mode,
366            metadata: metadata.clone(),
367            location: storage_url,
368            protocol: protocol.clone(),
369        };
370
371        let mut actions = vec![Action::Protocol(protocol), Action::Metadata(metadata)];
372
373        actions.extend(
374            self.actions
375                .into_iter()
376                .filter(|a| !matches!(a, Action::Protocol(_))),
377        );
378
379        Ok((table, actions, operation, operation_id))
380    }
381}
382
383impl std::future::IntoFuture for CreateBuilder {
384    type Output = DeltaResult<DeltaTable>;
385    type IntoFuture = BoxFuture<'static, Self::Output>;
386
387    fn into_future(self) -> Self::IntoFuture {
388        let this = self;
389        Box::pin(async move {
390            let handler = this.custom_execute_handler.clone();
391            let mode = &this.mode;
392            let (mut table, mut actions, operation, operation_id) =
393                this.clone().into_table_and_actions().await?;
394
395            let table_state = if table.log_store.is_delta_table_location().await? {
396                match mode {
397                    SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()),
398                    SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()),
399                    SaveMode::Ignore => {
400                        table.load().await?;
401                        return Ok(table);
402                    }
403                    SaveMode::Overwrite => {
404                        table.load().await?;
405                        let remove_actions = table
406                            .snapshot()?
407                            .snapshot()
408                            .file_views(&table.log_store(), None)
409                            .map_ok(|p| p.remove_action(true).into())
410                            .try_collect::<Vec<_>>()
411                            .await?;
412                        actions.extend(remove_actions);
413                        Some(table.snapshot()?)
414                    }
415                }
416            } else {
417                None
418            };
419
420            let version = CommitBuilder::from(this.commit_properties.clone())
421                .with_actions(actions)
422                .with_operation_id(operation_id)
423                .with_post_commit_hook_handler(handler.clone())
424                .build(
425                    table_state.map(|f| f as &dyn TableReference),
426                    table.log_store.clone(),
427                    operation,
428                )
429                .await?
430                .version();
431            table.load_version(version).await?;
432
433            if let Some(handler) = handler {
434                handler
435                    .post_execute(&table.log_store(), operation_id)
436                    .await?;
437            }
438            Ok(table)
439        })
440    }
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use crate::table::config::TableProperty;
    use crate::writer::test_utils::get_delta_schema;
    use tempfile::TempDir;

    // Creating a table in an empty in-memory store yields version 0 with the
    // requested schema, even with `SaveMode::Ignore`.
    #[tokio::test]
    async fn test_create() {
        let table_schema = get_delta_schema();

        let table = DeltaTable::new_in_memory()
            .create()
            .with_columns(table_schema.fields().cloned())
            .with_save_mode(SaveMode::Ignore)
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema)
    }

    // A table can be created through a URL derived from a relative local path.
    #[tokio::test]
    async fn test_create_local_relative_path() {
        let table_schema = get_delta_schema();
        let tmp_dir = TempDir::new_in(".").unwrap();
        let relative_path = format!(
            "./{}",
            tmp_dir.path().file_name().unwrap().to_str().unwrap()
        );
        let table_path = std::path::Path::new(&relative_path).canonicalize().unwrap();
        let table_url = url::Url::from_directory_path(table_path)
            .map_err(|_| DeltaTableError::InvalidTableLocation(relative_path.clone()))
            .unwrap();
        let table = DeltaTable::try_from_url(table_url)
            .await
            .unwrap()
            .create()
            .with_columns(table_schema.fields().cloned())
            .with_save_mode(SaveMode::Ignore)
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema)
    }

    // A relative local path can be passed directly as the table location.
    #[tokio::test]
    async fn test_create_table_local_path() {
        let schema = get_delta_schema();
        let tmp_dir = TempDir::new_in(".").unwrap();
        let relative_path = format!(
            "./{}",
            tmp_dir.path().file_name().unwrap().to_str().unwrap()
        );
        // `relative_path` already carries the `./` prefix, so it is used as is.
        let table = CreateBuilder::new()
            .with_location(relative_path)
            .with_columns(schema.fields().cloned())
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
    }

    // Protocol defaults, protocol overrides via actions, and table properties
    // all end up in the created table.
    #[tokio::test]
    async fn test_create_table_metadata() {
        let schema = get_delta_schema();
        let table = CreateBuilder::new()
            .with_location("memory:///")
            .with_columns(schema.fields().cloned())
            .await
            .unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.version(), 0);
        assert_eq!(
            snapshot.protocol().min_reader_version(),
            PROTOCOL.default_reader_version()
        );
        assert_eq!(
            snapshot.protocol().min_writer_version(),
            PROTOCOL.default_writer_version()
        );
        assert_eq!(snapshot.schema().as_ref(), &schema);

        // check we can overwrite default settings via adding actions
        let protocol = ProtocolInner {
            min_reader_version: 1,
            min_writer_version: 2,
            writer_features: None,
            reader_features: None,
        }
        .as_kernel();
        let table = CreateBuilder::new()
            .with_location("memory:///")
            .with_columns(schema.fields().cloned())
            .with_actions(vec![Action::Protocol(protocol)])
            .await
            .unwrap();
        let snapshot = table.snapshot().unwrap();
        assert_eq!(snapshot.protocol().min_reader_version(), 1);
        assert_eq!(snapshot.protocol().min_writer_version(), 2);

        // check a table property set on the builder is written to the metadata
        let table = CreateBuilder::new()
            .with_location("memory:///")
            .with_columns(schema.fields().cloned())
            .with_configuration_property(TableProperty::AppendOnly, Some("true"))
            .await
            .unwrap();
        let append = table
            .snapshot()
            .unwrap()
            .metadata()
            .configuration()
            .get(TableProperty::AppendOnly.as_ref())
            .unwrap()
            .clone();
        assert_eq!(String::from("true"), append)
    }

    #[cfg(feature = "datafusion")]
    mod datafusion_tests {
        use super::*;

        use crate::writer::test_utils::get_record_batch;

        // Each save mode behaves as expected when a table already exists.
        #[tokio::test]
        async fn test_create_table_save_mode() {
            let tmp_dir = tempfile::tempdir().unwrap();

            let schema = get_delta_schema();
            let table = CreateBuilder::new()
                .with_location(tmp_dir.path().to_str().unwrap())
                .with_columns(schema.fields().cloned())
                .await
                .unwrap();
            assert_eq!(table.version(), Some(0));
            let first_id = table.snapshot().unwrap().metadata().id().to_string();

            let log_store = table.log_store;

            // Check an error is raised when a table exists at location
            let table = CreateBuilder::new()
                .with_log_store(log_store.clone())
                .with_columns(schema.fields().cloned())
                .with_save_mode(SaveMode::ErrorIfExists)
                .await;
            assert!(table.is_err());

            // Check current table is returned when ignore option is chosen.
            let table = CreateBuilder::new()
                .with_log_store(log_store.clone())
                .with_columns(schema.fields().cloned())
                .with_save_mode(SaveMode::Ignore)
                .await
                .unwrap();
            assert_eq!(table.snapshot().unwrap().metadata().id(), first_id);

            // Check table is overwritten
            let table = CreateBuilder::new()
                .with_log_store(log_store)
                .with_columns(schema.fields().cloned())
                .with_save_mode(SaveMode::Overwrite)
                .await
                .unwrap();
            assert_ne!(table.snapshot().unwrap().metadata().id(), first_id)
        }

        // Overwriting an existing table with data removes its files.
        #[tokio::test]
        async fn test_create_or_replace_existing_table() {
            let batch = get_record_batch(None, false);
            let schema = get_delta_schema();
            let table = DeltaTable::new_in_memory()
                .write(vec![batch.clone()])
                .with_save_mode(SaveMode::ErrorIfExists)
                .await
                .unwrap();
            let state = table.snapshot().unwrap();
            assert_eq!(state.version(), 0);
            assert_eq!(state.log_data().num_files(), 1);

            let mut table = table
                .create()
                .with_columns(schema.fields().cloned())
                .with_save_mode(SaveMode::Overwrite)
                .await
                .unwrap();
            table.load().await.unwrap();
            let state = table.snapshot().unwrap();
            assert_eq!(state.version(), 1);
            // Checks if files got removed after overwrite
            assert_eq!(state.log_data().num_files(), 0);
        }

        // Same as above, but the replacement table is partitioned.
        #[tokio::test]
        async fn test_create_or_replace_existing_table_partitioned() {
            let batch = get_record_batch(None, false);
            let schema = get_delta_schema();
            let table = DeltaTable::new_in_memory()
                .write(vec![batch.clone()])
                .with_save_mode(SaveMode::ErrorIfExists)
                .await
                .unwrap();
            let state = table.snapshot().unwrap();
            assert_eq!(state.version(), 0);
            assert_eq!(state.log_data().num_files(), 1);

            let mut table = table
                .create()
                .with_columns(schema.fields().cloned())
                .with_save_mode(SaveMode::Overwrite)
                .with_partition_columns(vec!["id"])
                .await
                .unwrap();
            table.load().await.unwrap();
            let state = table.snapshot().unwrap();
            assert_eq!(state.version(), 1);
            // Checks if files got removed after overwrite
            assert_eq!(state.log_data().num_files(), 0);
        }

        // Unknown configuration keys are rejected unless explicitly allowed.
        #[tokio::test]
        async fn test_create_table_metadata_raise_if_key_not_exists() {
            let schema = get_delta_schema();
            let config: HashMap<String, Option<String>> =
                vec![("key".to_string(), Some("value".to_string()))]
                    .into_iter()
                    .collect();

            // Fail to create table with unknown Delta key
            let table = CreateBuilder::new()
                .with_location("memory:///")
                .with_columns(schema.fields().cloned())
                .with_configuration(config.clone())
                .await;
            assert!(table.is_err());

            // Succeed in creating table with unknown Delta key since we set raise_if_key_not_exists to false
            let table = CreateBuilder::new()
                .with_location("memory:///")
                .with_columns(schema.fields().cloned())
                .with_raise_if_key_not_exists(false)
                .with_configuration(config)
                .await;
            assert!(table.is_ok());

            // Ensure the non-Delta key was set correctly
            let value = table
                .unwrap()
                .snapshot()
                .unwrap()
                .metadata()
                .configuration()
                .get("key")
                .unwrap()
                .clone();
            assert_eq!(String::from("value"), value);
        }
    }
}