Skip to main content

deltalake_core/operations/
mod.rs

1//! High level operations API to interact with Delta tables
2//!
3//! At the heart of the high level operations APIs is the [`DeltaOps`] struct,
4//! which consumes a [`DeltaTable`] and exposes methods to attain builders for
5//! several high level operations. The specific builder structs allow fine-tuning
6//! the operations' behaviors and will return an updated table potentially in conjunction
7//! with a [data stream][datafusion::physical_plan::SendableRecordBatchStream],
8//! if the operation returns data as well.
9use std::collections::HashMap;
10use std::sync::Arc;
11
12#[cfg(feature = "datafusion")]
13use arrow::array::RecordBatch;
14use async_trait::async_trait;
15#[cfg(feature = "datafusion")]
16pub use datafusion::physical_plan::common::collect as collect_sendable_stream;
17use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
18use url::Url;
19use uuid::Uuid;
20
21use self::{
22    add_column::AddColumnBuilder, add_feature::AddTableFeatureBuilder, create::CreateBuilder,
23    filesystem_check::FileSystemCheckBuilder, restore::RestoreBuilder,
24    set_tbl_properties::SetTablePropertiesBuilder,
25    update_field_metadata::UpdateFieldMetadataBuilder,
26    update_table_metadata::UpdateTableMetadataBuilder, vacuum::VacuumBuilder,
27};
28#[cfg(feature = "datafusion")]
29use self::{
30    constraints::ConstraintBuilder, delete::DeleteBuilder, drop_constraints::DropConstraintBuilder,
31    load::LoadBuilder, load_cdf::CdfLoadBuilder, merge::MergeBuilder, optimize::OptimizeBuilder,
32    update::UpdateBuilder, write::WriteBuilder,
33};
34use crate::DeltaTable;
35#[cfg(feature = "datafusion")]
36use crate::delta_datafusion::Expression;
37use crate::errors::{DeltaResult, DeltaTableError};
38use crate::logstore::LogStoreRef;
39use crate::operations::generate::GenerateBuilder;
40use crate::table::builder::DeltaTableBuilder;
41use crate::table::config::{DEFAULT_NUM_INDEX_COLS, TablePropertiesExt as _};
42
43pub mod add_column;
44pub mod add_feature;
45pub mod convert_to_delta;
46pub mod create;
47pub mod drop_constraints;
48pub mod filesystem_check;
49pub mod generate;
50pub mod restore;
51pub mod update_field_metadata;
52pub mod update_table_metadata;
53pub mod vacuum;
54
55#[cfg(feature = "datafusion")]
56mod cdc;
57#[cfg(feature = "datafusion")]
58pub mod constraints;
59#[cfg(feature = "datafusion")]
60pub mod delete;
61#[cfg(feature = "datafusion")]
62mod load;
63#[cfg(feature = "datafusion")]
64pub mod load_cdf;
65#[cfg(feature = "datafusion")]
66pub mod merge;
67#[cfg(feature = "datafusion")]
68pub mod optimize;
69pub mod set_tbl_properties;
70#[cfg(feature = "datafusion")]
71pub mod update;
72#[cfg(feature = "datafusion")]
73pub mod write;
74
75#[cfg(all(test, feature = "datafusion"))]
76mod session_fallback_policy_tests;
77
78impl DeltaTable {
79    /// Create a new [`DeltaOps`] instance, operating on [`DeltaTable`] at given URL.
80    ///
81    /// ```
82    /// use deltalake_core::DeltaTable;
83    /// use url::Url;
84    ///
85    /// async {
86    ///     let url = Url::parse("memory:///").unwrap();
87    ///     let ops = DeltaTable::try_from_url(url).await.unwrap();
88    /// };
89    /// ```
90    pub async fn try_from_url(uri: Url) -> DeltaResult<Self> {
91        let mut table = DeltaTableBuilder::from_url(uri)?.build()?;
92        // We allow for uninitialized locations, since we may want to create the table
93        match table.load().await {
94            Ok(_) => Ok(table),
95            Err(DeltaTableError::NotATable(_)) => Ok(table),
96            Err(err) => Err(err),
97        }
98    }
99
100    /// Create a [`DeltaTable`] instance from URL with storage options
101    pub async fn try_from_url_with_storage_options(
102        uri: Url,
103        storage_options: HashMap<String, String>,
104    ) -> DeltaResult<Self> {
105        let mut table = DeltaTableBuilder::from_url(uri)?
106            .with_storage_options(storage_options)
107            .build()?;
108        // We allow for uninitialized locations, since we may want to create the table
109        match table.load().await {
110            Ok(_) => Ok(table),
111            Err(DeltaTableError::NotATable(_)) => Ok(table),
112            Err(err) => Err(err),
113        }
114    }
115
116    #[must_use]
117    pub fn create(&self) -> CreateBuilder {
118        CreateBuilder::default().with_log_store(self.log_store())
119    }
120
121    #[must_use]
122    pub fn restore(self) -> RestoreBuilder {
123        RestoreBuilder::new(
124            self.log_store(),
125            self.state.clone().map(|state| state.snapshot),
126        )
127    }
128
129    /// Vacuum stale files from delta table
130    #[must_use]
131    pub fn vacuum(self) -> VacuumBuilder {
132        VacuumBuilder::new(
133            self.log_store(),
134            self.state.clone().map(|state| state.snapshot),
135        )
136    }
137
138    /// Audit active files with files present on the filesystem
139    #[must_use]
140    pub fn filesystem_check(self) -> FileSystemCheckBuilder {
141        FileSystemCheckBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
142    }
143
144    /// Enable a table feature for a table
145    #[must_use]
146    pub fn add_feature(self) -> AddTableFeatureBuilder {
147        AddTableFeatureBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
148    }
149
150    /// Set table properties
151    #[must_use]
152    pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
153        SetTablePropertiesBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
154    }
155
156    /// Add new columns
157    #[must_use]
158    pub fn add_columns(self) -> AddColumnBuilder {
159        AddColumnBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
160    }
161
162    /// Update field metadata
163    #[must_use]
164    pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
165        UpdateFieldMetadataBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
166    }
167
168    /// Update table metadata
169    #[must_use]
170    pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
171        UpdateTableMetadataBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
172    }
173
174    /// Generate a symlink_format_manifest for other engines
175    pub fn generate(self) -> GenerateBuilder {
176        GenerateBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
177    }
178}
179
#[cfg(feature = "datafusion")]
impl DeltaTable {
    /// Load data from a DeltaTable
    ///
    /// Borrows the table; the current state (if any) is cloned into the returned
    /// [`LoadBuilder`].
    #[must_use]
    pub fn scan_table(&self) -> LoadBuilder {
        LoadBuilder::new(
            self.log_store(),
            self.state.clone().map(|state| state.snapshot),
        )
    }

    /// Load a table with CDF Enabled
    #[must_use]
    pub fn scan_cdf(self) -> CdfLoadBuilder {
        CdfLoadBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Write data to Delta table
    ///
    /// `batches` are handed to the returned [`WriteBuilder`] as its input batches.
    #[must_use]
    pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
        // `self` is consumed, so the state can be moved instead of cloned.
        WriteBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
            .with_input_batches(batches)
    }

    /// Optimize the Delta table; see [`OptimizeBuilder`] for the available options
    #[must_use]
    pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
        OptimizeBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Delete data from Delta table
    #[must_use]
    pub fn delete(self) -> DeleteBuilder {
        DeleteBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Update data from Delta table
    #[must_use]
    pub fn update(self) -> UpdateBuilder {
        UpdateBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Merge data from `source` into the Delta table
    ///
    /// `predicate` is converted into an [`Expression`] and passed, together with
    /// `source`, to the returned [`MergeBuilder`].
    #[must_use]
    pub fn merge<E: Into<Expression>>(
        self,
        source: datafusion::prelude::DataFrame,
        predicate: E,
    ) -> MergeBuilder {
        MergeBuilder::new(
            self.log_store(),
            self.state.map(|s| s.snapshot),
            predicate.into(),
            source,
        )
    }

    /// Add a check constraint to a table
    #[must_use]
    pub fn add_constraint(self) -> ConstraintBuilder {
        ConstraintBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Drops constraints from a table
    #[must_use]
    pub fn drop_constraints(self) -> DropConstraintBuilder {
        DropConstraintBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }
}
247
248#[async_trait]
249pub trait CustomExecuteHandler: Send + Sync {
250    // Execute arbitrary code at the start of a delta operation
251    async fn pre_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;
252
253    // Execute arbitrary code at the end of a delta operation
254    async fn post_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;
255
256    // Execute arbitrary code at the start of the post commit hook
257    async fn before_post_commit_hook(
258        &self,
259        log_store: &LogStoreRef,
260        file_operation: bool,
261        operation_id: Uuid,
262    ) -> DeltaResult<()>;
263
264    // Execute arbitrary code at the end of the post commit hook
265    async fn after_post_commit_hook(
266        &self,
267        log_store: &LogStoreRef,
268        file_operation: bool,
269        operation_id: Uuid,
270    ) -> DeltaResult<()>;
271}
272
273#[allow(unused)]
274/// The [Operation] trait defines common behaviors that all operations builders
275/// should have consistent
276pub(crate) trait Operation: std::future::IntoFuture {
277    fn log_store(&self) -> &LogStoreRef;
278    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>>;
279    async fn pre_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
280        if let Some(handler) = self.get_custom_execute_handler() {
281            handler.pre_execute(self.log_store(), operation_id).await
282        } else {
283            Ok(())
284        }
285    }
286
287    async fn post_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
288        if let Some(handler) = self.get_custom_execute_handler() {
289            handler.post_execute(self.log_store(), operation_id).await
290        } else {
291            Ok(())
292        }
293    }
294
295    fn get_operation_id(&self) -> uuid::Uuid {
296        Uuid::new_v4()
297    }
298}
299
300/// High level interface for executing commands against a DeltaTable
301#[deprecated(note = "Use methods directly on DeltaTable instead, e.g. `delta_table.create()`")]
302pub struct DeltaOps(pub DeltaTable);
303
304#[allow(deprecated)]
305impl DeltaOps {
306    /// Create a new [`DeltaOps`] instance, operating on [`DeltaTable`] at given URL.
307    ///
308    /// ```
309    /// use deltalake_core::DeltaOps;
310    /// use url::Url;
311    ///
312    /// async {
313    ///     let url = Url::parse("memory:///").unwrap();
314    ///     let ops = DeltaOps::try_from_url(url).await.unwrap();
315    /// };
316    /// ```
317    pub async fn try_from_url(uri: Url) -> DeltaResult<Self> {
318        let mut table = DeltaTableBuilder::from_url(uri)?.build()?;
319        // We allow for uninitialized locations, since we may want to create the table
320        match table.load().await {
321            Ok(_) => Ok(table.into()),
322            Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
323            Err(err) => Err(err),
324        }
325    }
326
327    /// Create a [`DeltaOps`] instance from URL with storage options
328    pub async fn try_from_url_with_storage_options(
329        uri: Url,
330        storage_options: HashMap<String, String>,
331    ) -> DeltaResult<Self> {
332        let mut table = DeltaTableBuilder::from_url(uri)?
333            .with_storage_options(storage_options)
334            .build()?;
335        // We allow for uninitialized locations, since we may want to create the table
336        match table.load().await {
337            Ok(_) => Ok(table.into()),
338            Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
339            Err(err) => Err(err),
340        }
341    }
342
343    /// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
344    ///
345    /// Using this will not persist any changes beyond the lifetime of the table object.
346    /// The main purpose of in-memory tables is for use in testing.
347    ///
348    /// ```
349    /// use deltalake_core::DeltaOps;
350    ///
351    /// let ops = DeltaOps::new_in_memory();
352    /// ```
353    #[must_use]
354    pub fn new_in_memory() -> Self {
355        let url = Url::parse("memory:///").unwrap();
356        DeltaTableBuilder::from_url(url)
357            .unwrap()
358            .build()
359            .unwrap()
360            .into()
361    }
362
363    /// Create a new Delta table
364    ///
365    /// ```
366    /// use deltalake_core::DeltaOps;
367    ///
368    /// async {
369    ///     let ops = DeltaOps::try_from_url(url::Url::parse("memory://").unwrap()).await.unwrap();
370    ///     let table = ops.create().with_table_name("my_table").await.unwrap();
371    ///     assert_eq!(table.version(), Some(0));
372    /// };
373    /// ```
374    #[must_use]
375    #[deprecated(note = "Use [`DeltaTable::create`] instead")]
376    pub fn create(self) -> CreateBuilder {
377        CreateBuilder::default().with_log_store(self.0.log_store)
378    }
379
380    /// Generate a symlink_format_manifest for other engines
381    #[deprecated(note = "Use [`DeltaTable::generate`] instead")]
382    pub fn generate(self) -> GenerateBuilder {
383        GenerateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
384    }
385
386    /// Load data from a DeltaTable
387    #[cfg(feature = "datafusion")]
388    #[must_use]
389    #[deprecated(note = "Use [`DeltaTable::scan`] instead")]
390    pub fn load(self) -> LoadBuilder {
391        LoadBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
392    }
393
394    /// Load a table with CDF Enabled
395    #[cfg(feature = "datafusion")]
396    #[must_use]
397    #[deprecated(note = "Use [`DeltaTable::scan_cdf`] instead")]
398    pub fn load_cdf(self) -> CdfLoadBuilder {
399        CdfLoadBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
400    }
401
402    /// Write data to Delta table
403    #[cfg(feature = "datafusion")]
404    #[must_use]
405    #[deprecated(note = "Use [`DeltaTable::write`] instead")]
406    pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
407        WriteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
408            .with_input_batches(batches)
409    }
410
411    /// Vacuum stale files from delta table
412    #[must_use]
413    #[deprecated(note = "Use [`DeltaTable::vacuum`] instead")]
414    pub fn vacuum(self) -> VacuumBuilder {
415        VacuumBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
416    }
417
418    /// Audit and repair active files with files present on the filesystem
419    #[must_use]
420    #[deprecated(note = "Use [`DeltaTable::filesystem_check`] instead")]
421    pub fn filesystem_check(self) -> FileSystemCheckBuilder {
422        FileSystemCheckBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
423    }
424
425    /// Audit active files with files present on the filesystem
426    #[cfg(feature = "datafusion")]
427    #[must_use]
428    #[deprecated(note = "Use [`DeltaTable::optimize`] instead")]
429    pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
430        OptimizeBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
431    }
432
433    /// Delete data from Delta table
434    #[cfg(feature = "datafusion")]
435    #[must_use]
436    #[deprecated(note = "Use [`DeltaTable::delete`] instead")]
437    pub fn delete(self) -> DeleteBuilder {
438        DeleteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
439    }
440
441    /// Update data from Delta table
442    #[cfg(feature = "datafusion")]
443    #[must_use]
444    #[deprecated(note = "Use [`DeltaTable::update`] instead")]
445    pub fn update(self) -> UpdateBuilder {
446        UpdateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
447    }
448
449    /// Restore delta table to a specified version or datetime
450    #[must_use]
451    #[deprecated(note = "Use [`DeltaTable::restore`] instead")]
452    pub fn restore(self) -> RestoreBuilder {
453        RestoreBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
454    }
455
456    /// Update data from Delta table
457    #[cfg(feature = "datafusion")]
458    #[must_use]
459    #[deprecated(note = "Use [`DeltaTable::merge`] instead")]
460    pub fn merge<E: Into<Expression>>(
461        self,
462        source: datafusion::prelude::DataFrame,
463        predicate: E,
464    ) -> MergeBuilder {
465        MergeBuilder::new(
466            self.0.log_store,
467            self.0.state.map(|s| s.snapshot),
468            predicate.into(),
469            source,
470        )
471    }
472
473    /// Add a check constraint to a table
474    #[cfg(feature = "datafusion")]
475    #[must_use]
476    #[deprecated(note = "Use [`DeltaTable::add_constraint`] instead")]
477    pub fn add_constraint(self) -> ConstraintBuilder {
478        ConstraintBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
479    }
480
481    /// Enable a table feature for a table
482    #[must_use]
483    #[deprecated(note = "Use [`DeltaTable::add_feature`] instead")]
484    pub fn add_feature(self) -> AddTableFeatureBuilder {
485        AddTableFeatureBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
486    }
487
488    /// Drops constraints from a table
489    #[cfg(feature = "datafusion")]
490    #[must_use]
491    #[deprecated(note = "Use [`DeltaTable::drop_constraints`] instead")]
492    pub fn drop_constraints(self) -> DropConstraintBuilder {
493        DropConstraintBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
494    }
495
496    /// Set table properties
497    #[deprecated(note = "Use [`DeltaTable::set_tbl_properties`] instead")]
498    pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
499        SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
500    }
501
502    /// Add new columns
503    #[deprecated(note = "Use [`DeltaTable::add_columns`] instead")]
504    pub fn add_columns(self) -> AddColumnBuilder {
505        AddColumnBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
506    }
507
508    /// Update field metadata
509    #[deprecated(note = "Use [`DeltaTable::update_field_metadata`] instead")]
510    pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
511        UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
512    }
513
514    /// Update table metadata
515    #[deprecated(note = "Use [`DeltaTable::update_table_metadata`] instead")]
516    pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
517        UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
518    }
519}
520
521#[allow(deprecated)]
522impl From<DeltaTable> for DeltaOps {
523    fn from(table: DeltaTable) -> Self {
524        Self(table)
525    }
526}
527
528#[allow(deprecated)]
529impl From<DeltaOps> for DeltaTable {
530    fn from(ops: DeltaOps) -> Self {
531        ops.0
532    }
533}
534
535#[allow(deprecated)]
536impl AsRef<DeltaTable> for DeltaOps {
537    fn as_ref(&self) -> &DeltaTable {
538        &self.0
539    }
540}
541
542/// Get the num_idx_columns and stats_columns from the table configuration in the state
543/// If table_config does not exist (only can occur in the first write action) it takes
544/// the configuration that was passed to the writerBuilder.
545pub fn get_num_idx_cols_and_stats_columns(
546    config: Option<&TableProperties>,
547    configuration: HashMap<String, Option<String>>,
548) -> (DataSkippingNumIndexedCols, Option<Vec<String>>) {
549    let (num_index_cols, stats_columns) = match &config {
550        Some(conf) => (
551            conf.num_indexed_cols(),
552            conf.data_skipping_stats_columns
553                .clone()
554                .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
555        ),
556        _ => (
557            configuration
558                .get("delta.dataSkippingNumIndexedCols")
559                .and_then(|v| {
560                    v.as_ref()
561                        .and_then(|vv| vv.parse::<u64>().ok())
562                        .map(DataSkippingNumIndexedCols::NumColumns)
563                })
564                .unwrap_or(DataSkippingNumIndexedCols::NumColumns(
565                    DEFAULT_NUM_INDEX_COLS,
566                )),
567            configuration
568                .get("delta.dataSkippingStatsColumns")
569                .and_then(|v| {
570                    v.as_ref()
571                        .map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<String>>())
572                }),
573        ),
574    };
575    (
576        num_index_cols,
577        stats_columns
578            .clone()
579            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
580    )
581}
582
/// Get the target_file_size from the table configuration in the state
/// If table_config does not exist (only can occur in the first write action) it takes
/// the configuration that was passed to the writerBuilder.
///
/// When `config` is `None`, `delta.targetFileSize` is read from `configuration`. A
/// missing, unset or unparsable value falls back to
/// `crate::table::config::DEFAULT_TARGET_FILE_SIZE` instead of panicking, since the
/// value is user-supplied.
#[cfg(feature = "datafusion")]
pub(crate) fn get_target_file_size(
    config: Option<&TableProperties>,
    configuration: &HashMap<String, Option<String>>,
) -> u64 {
    match config {
        Some(conf) => conf.target_file_size().get(),
        None => configuration
            .get("delta.targetFileSize")
            .and_then(|value| value.as_deref())
            .and_then(|raw| raw.parse::<u64>().ok())
            .unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
    }
}
598}