Skip to main content

deltalake_core/operations/
mod.rs

//! High level operations API to interact with Delta tables
//!
//! The high level operations are exposed as methods on [`DeltaTable`], each of which
//! returns a builder for one operation. The deprecated [`DeltaOps`] struct, which
//! consumes a [`DeltaTable`], exposes the same builders. The specific builder structs
//! allow fine-tuning the operations' behaviors and will return an updated table,
//! potentially in conjunction with a
//! [data stream][datafusion::physical_plan::SendableRecordBatchStream],
//! if the operation returns data as well.
9use std::collections::HashMap;
10#[cfg(feature = "datafusion")]
11use std::num::NonZeroU64;
12use std::sync::Arc;
13
14#[cfg(feature = "datafusion")]
15use arrow::array::RecordBatch;
16use async_trait::async_trait;
17#[cfg(feature = "datafusion")]
18pub use datafusion::physical_plan::common::collect as collect_sendable_stream;
19use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
20use url::Url;
21use uuid::Uuid;
22
23use self::{
24    add_column::AddColumnBuilder, add_feature::AddTableFeatureBuilder, create::CreateBuilder,
25    filesystem_check::FileSystemCheckBuilder, restore::RestoreBuilder,
26    set_tbl_properties::SetTablePropertiesBuilder,
27    update_field_metadata::UpdateFieldMetadataBuilder,
28    update_table_metadata::UpdateTableMetadataBuilder, vacuum::VacuumBuilder,
29};
30#[cfg(feature = "datafusion")]
31use self::{
32    constraints::ConstraintBuilder, delete::DeleteBuilder, drop_constraints::DropConstraintBuilder,
33    load::LoadBuilder, load_cdf::CdfLoadBuilder, merge::MergeBuilder, optimize::OptimizeBuilder,
34    update::UpdateBuilder, write::WriteBuilder,
35};
36use crate::DeltaTable;
37#[cfg(feature = "datafusion")]
38use crate::delta_datafusion::Expression;
39use crate::errors::{DeltaResult, DeltaTableError};
40use crate::logstore::LogStoreRef;
41use crate::operations::generate::GenerateBuilder;
42use crate::table::builder::DeltaTableBuilder;
43use crate::table::config::{DEFAULT_NUM_INDEX_COLS, TablePropertiesExt as _};
44
45pub mod add_column;
46pub mod add_feature;
47pub mod convert_to_delta;
48pub mod create;
49pub mod drop_constraints;
50pub mod filesystem_check;
51pub mod generate;
52pub mod restore;
53pub mod update_field_metadata;
54pub mod update_table_metadata;
55pub mod vacuum;
56
57#[cfg(feature = "datafusion")]
58mod cdc;
59#[cfg(feature = "datafusion")]
60pub mod constraints;
61#[cfg(feature = "datafusion")]
62pub mod delete;
63#[cfg(feature = "datafusion")]
64mod load;
65#[cfg(feature = "datafusion")]
66pub mod load_cdf;
67#[cfg(feature = "datafusion")]
68pub mod merge;
69#[cfg(feature = "datafusion")]
70pub mod optimize;
71pub mod set_tbl_properties;
72#[cfg(feature = "datafusion")]
73pub mod update;
74#[cfg(feature = "datafusion")]
75pub mod write;
76
77#[cfg(all(test, feature = "datafusion"))]
78mod session_fallback_policy_tests;
79
80impl DeltaTable {
81    /// Create a new [`DeltaOps`] instance, operating on [`DeltaTable`] at given URL.
82    ///
83    /// ```
84    /// use deltalake_core::DeltaTable;
85    /// use url::Url;
86    ///
87    /// async {
88    ///     let url = Url::parse("memory:///").unwrap();
89    ///     let ops = DeltaTable::try_from_url(url).await.unwrap();
90    /// };
91    /// ```
92    pub async fn try_from_url(uri: Url) -> DeltaResult<Self> {
93        let mut table = DeltaTableBuilder::from_url(uri)?.build()?;
94        // We allow for uninitialized locations, since we may want to create the table
95        match table.load().await {
96            Ok(_) => Ok(table),
97            Err(DeltaTableError::NotATable(_)) => Ok(table),
98            Err(err) => Err(err),
99        }
100    }
101
102    /// Create a [`DeltaTable`] instance from URL with storage options
103    pub async fn try_from_url_with_storage_options(
104        uri: Url,
105        storage_options: HashMap<String, String>,
106    ) -> DeltaResult<Self> {
107        let mut table = DeltaTableBuilder::from_url(uri)?
108            .with_storage_options(storage_options)
109            .build()?;
110        // We allow for uninitialized locations, since we may want to create the table
111        match table.load().await {
112            Ok(_) => Ok(table),
113            Err(DeltaTableError::NotATable(_)) => Ok(table),
114            Err(err) => Err(err),
115        }
116    }
117
118    #[must_use]
119    pub fn create(&self) -> CreateBuilder {
120        CreateBuilder::default().with_log_store(self.log_store())
121    }
122
123    #[must_use]
124    pub fn restore(self) -> RestoreBuilder {
125        RestoreBuilder::new(
126            self.log_store(),
127            self.state.clone().map(|state| state.snapshot),
128        )
129    }
130
131    /// Vacuum stale files from delta table
132    #[must_use]
133    pub fn vacuum(self) -> VacuumBuilder {
134        VacuumBuilder::new(
135            self.log_store(),
136            self.state.clone().map(|state| state.snapshot),
137        )
138    }
139
140    /// Audit active files with files present on the filesystem
141    #[must_use]
142    pub fn filesystem_check(self) -> FileSystemCheckBuilder {
143        FileSystemCheckBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
144    }
145
146    /// Enable a table feature for a table
147    #[must_use]
148    pub fn add_feature(self) -> AddTableFeatureBuilder {
149        AddTableFeatureBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
150    }
151
152    /// Set table properties
153    #[must_use]
154    pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
155        SetTablePropertiesBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
156    }
157
158    /// Add new columns
159    #[must_use]
160    pub fn add_columns(self) -> AddColumnBuilder {
161        AddColumnBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
162    }
163
164    /// Update field metadata
165    #[must_use]
166    pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
167        UpdateFieldMetadataBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
168    }
169
170    /// Update table metadata
171    #[must_use]
172    pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
173        UpdateTableMetadataBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
174    }
175
176    /// Generate a symlink_format_manifest for other engines
177    pub fn generate(self) -> GenerateBuilder {
178        GenerateBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
179    }
180}
181
#[cfg(feature = "datafusion")]
impl DeltaTable {
    /// Load data from a DeltaTable
    ///
    /// Borrows the table, so the current state (if any) is cloned into the builder.
    #[must_use]
    pub fn scan_table(&self) -> LoadBuilder {
        LoadBuilder::new(
            self.log_store(),
            self.state.clone().map(|state| state.snapshot),
        )
    }

    /// Load a table with CDF Enabled
    #[must_use]
    pub fn scan_cdf(self) -> CdfLoadBuilder {
        CdfLoadBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Write data to Delta table
    ///
    /// The given `batches` are registered as the input of the returned [`WriteBuilder`].
    #[must_use]
    pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
        // `self` is consumed, so the state is moved into the builder rather than cloned.
        WriteBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
            .with_input_batches(batches)
    }

    /// Optimize the Delta table
    ///
    /// Returns an [`OptimizeBuilder`] to configure and run the operation.
    #[must_use]
    pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
        OptimizeBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Delete data from Delta table
    #[must_use]
    pub fn delete(self) -> DeleteBuilder {
        DeleteBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Update data from Delta table
    #[must_use]
    pub fn update(self) -> UpdateBuilder {
        UpdateBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Merge `source` data into the Delta table
    ///
    /// `predicate` is converted into an [`Expression`] and handed to the returned
    /// [`MergeBuilder`] together with `source`.
    #[must_use]
    pub fn merge<E: Into<Expression>>(
        self,
        source: datafusion::prelude::DataFrame,
        predicate: E,
    ) -> MergeBuilder {
        MergeBuilder::new(
            self.log_store(),
            self.state.map(|s| s.snapshot),
            predicate.into(),
            source,
        )
    }

    /// Add a check constraint to a table
    #[must_use]
    pub fn add_constraint(self) -> ConstraintBuilder {
        ConstraintBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Drops constraints from a table
    #[must_use]
    pub fn drop_constraints(self) -> DropConstraintBuilder {
        DropConstraintBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }
}
249
/// Hooks for running custom code around a delta operation and around its
/// post commit hook.
///
/// Every hook receives the log store of the operation and the id identifying the
/// operation; returning an error reports the hook as failed to the caller.
#[async_trait]
pub trait CustomExecuteHandler: Send + Sync {
    /// Execute arbitrary code at the start of a delta operation
    async fn pre_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;

    /// Execute arbitrary code at the end of a delta operation
    async fn post_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;

    /// Execute arbitrary code at the start of the post commit hook
    // NOTE(review): the meaning of `file_operation` is not visible in this file;
    // presumably it tells whether the commit was a file operation — confirm with callers.
    async fn before_post_commit_hook(
        &self,
        log_store: &LogStoreRef,
        file_operation: bool,
        operation_id: Uuid,
    ) -> DeltaResult<()>;

    /// Execute arbitrary code at the end of the post commit hook
    async fn after_post_commit_hook(
        &self,
        log_store: &LogStoreRef,
        file_operation: bool,
        operation_id: Uuid,
    ) -> DeltaResult<()>;
}
274
275#[allow(unused)]
276/// The [Operation] trait defines common behaviors that all operations builders
277/// should have consistent
278pub(crate) trait Operation: std::future::IntoFuture {
279    fn log_store(&self) -> &LogStoreRef;
280    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>>;
281    async fn pre_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
282        if let Some(handler) = self.get_custom_execute_handler() {
283            handler.pre_execute(self.log_store(), operation_id).await
284        } else {
285            Ok(())
286        }
287    }
288
289    async fn post_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
290        if let Some(handler) = self.get_custom_execute_handler() {
291            handler.post_execute(self.log_store(), operation_id).await
292        } else {
293            Ok(())
294        }
295    }
296
297    fn get_operation_id(&self) -> uuid::Uuid {
298        Uuid::new_v4()
299    }
300}
301
/// High level interface for executing commands against a DeltaTable
///
/// A thin newtype around a [`DeltaTable`]. It is deprecated in favor of calling the
/// builder methods on [`DeltaTable`] directly.
#[deprecated(note = "Use methods directly on DeltaTable instead, e.g. `delta_table.create()`")]
pub struct DeltaOps(pub DeltaTable);
305
306#[allow(deprecated)]
307impl DeltaOps {
308    /// Create a new [`DeltaOps`] instance, operating on [`DeltaTable`] at given URL.
309    ///
310    /// ```
311    /// use deltalake_core::DeltaOps;
312    /// use url::Url;
313    ///
314    /// async {
315    ///     let url = Url::parse("memory:///").unwrap();
316    ///     let ops = DeltaOps::try_from_url(url).await.unwrap();
317    /// };
318    /// ```
319    pub async fn try_from_url(uri: Url) -> DeltaResult<Self> {
320        let mut table = DeltaTableBuilder::from_url(uri)?.build()?;
321        // We allow for uninitialized locations, since we may want to create the table
322        match table.load().await {
323            Ok(_) => Ok(table.into()),
324            Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
325            Err(err) => Err(err),
326        }
327    }
328
329    /// Create a [`DeltaOps`] instance from URL with storage options
330    pub async fn try_from_url_with_storage_options(
331        uri: Url,
332        storage_options: HashMap<String, String>,
333    ) -> DeltaResult<Self> {
334        let mut table = DeltaTableBuilder::from_url(uri)?
335            .with_storage_options(storage_options)
336            .build()?;
337        // We allow for uninitialized locations, since we may want to create the table
338        match table.load().await {
339            Ok(_) => Ok(table.into()),
340            Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
341            Err(err) => Err(err),
342        }
343    }
344
345    /// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
346    ///
347    /// Using this will not persist any changes beyond the lifetime of the table object.
348    /// The main purpose of in-memory tables is for use in testing.
349    ///
350    /// ```
351    /// use deltalake_core::DeltaOps;
352    ///
353    /// let ops = DeltaOps::new_in_memory();
354    /// ```
355    #[must_use]
356    pub fn new_in_memory() -> Self {
357        let url = Url::parse("memory:///").unwrap();
358        DeltaTableBuilder::from_url(url)
359            .unwrap()
360            .build()
361            .unwrap()
362            .into()
363    }
364
365    /// Create a new Delta table
366    ///
367    /// ```
368    /// use deltalake_core::DeltaOps;
369    ///
370    /// async {
371    ///     let ops = DeltaOps::try_from_url(url::Url::parse("memory://").unwrap()).await.unwrap();
372    ///     let table = ops.create().with_table_name("my_table").await.unwrap();
373    ///     assert_eq!(table.version(), Some(0));
374    /// };
375    /// ```
376    #[must_use]
377    #[deprecated(note = "Use [`DeltaTable::create`] instead")]
378    pub fn create(self) -> CreateBuilder {
379        CreateBuilder::default().with_log_store(self.0.log_store)
380    }
381
382    /// Generate a symlink_format_manifest for other engines
383    #[deprecated(note = "Use [`DeltaTable::generate`] instead")]
384    pub fn generate(self) -> GenerateBuilder {
385        GenerateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
386    }
387
388    /// Load data from a DeltaTable
389    #[cfg(feature = "datafusion")]
390    #[must_use]
391    #[deprecated(note = "Use [`DeltaTable::scan`] instead")]
392    pub fn load(self) -> LoadBuilder {
393        LoadBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
394    }
395
396    /// Load a table with CDF Enabled
397    #[cfg(feature = "datafusion")]
398    #[must_use]
399    #[deprecated(note = "Use [`DeltaTable::scan_cdf`] instead")]
400    pub fn load_cdf(self) -> CdfLoadBuilder {
401        CdfLoadBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
402    }
403
404    /// Write data to Delta table
405    #[cfg(feature = "datafusion")]
406    #[must_use]
407    #[deprecated(note = "Use [`DeltaTable::write`] instead")]
408    pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
409        WriteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
410            .with_input_batches(batches)
411    }
412
413    /// Vacuum stale files from delta table
414    #[must_use]
415    #[deprecated(note = "Use [`DeltaTable::vacuum`] instead")]
416    pub fn vacuum(self) -> VacuumBuilder {
417        VacuumBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
418    }
419
420    /// Audit and repair active files with files present on the filesystem
421    #[must_use]
422    #[deprecated(note = "Use [`DeltaTable::filesystem_check`] instead")]
423    pub fn filesystem_check(self) -> FileSystemCheckBuilder {
424        FileSystemCheckBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
425    }
426
427    /// Audit active files with files present on the filesystem
428    #[cfg(feature = "datafusion")]
429    #[must_use]
430    #[deprecated(note = "Use [`DeltaTable::optimize`] instead")]
431    pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
432        OptimizeBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
433    }
434
435    /// Delete data from Delta table
436    #[cfg(feature = "datafusion")]
437    #[must_use]
438    #[deprecated(note = "Use [`DeltaTable::delete`] instead")]
439    pub fn delete(self) -> DeleteBuilder {
440        DeleteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
441    }
442
443    /// Update data from Delta table
444    #[cfg(feature = "datafusion")]
445    #[must_use]
446    #[deprecated(note = "Use [`DeltaTable::update`] instead")]
447    pub fn update(self) -> UpdateBuilder {
448        UpdateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
449    }
450
451    /// Restore delta table to a specified version or datetime
452    #[must_use]
453    #[deprecated(note = "Use [`DeltaTable::restore`] instead")]
454    pub fn restore(self) -> RestoreBuilder {
455        RestoreBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
456    }
457
458    /// Update data from Delta table
459    #[cfg(feature = "datafusion")]
460    #[must_use]
461    #[deprecated(note = "Use [`DeltaTable::merge`] instead")]
462    pub fn merge<E: Into<Expression>>(
463        self,
464        source: datafusion::prelude::DataFrame,
465        predicate: E,
466    ) -> MergeBuilder {
467        MergeBuilder::new(
468            self.0.log_store,
469            self.0.state.map(|s| s.snapshot),
470            predicate.into(),
471            source,
472        )
473    }
474
475    /// Add a check constraint to a table
476    #[cfg(feature = "datafusion")]
477    #[must_use]
478    #[deprecated(note = "Use [`DeltaTable::add_constraint`] instead")]
479    pub fn add_constraint(self) -> ConstraintBuilder {
480        ConstraintBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
481    }
482
483    /// Enable a table feature for a table
484    #[must_use]
485    #[deprecated(note = "Use [`DeltaTable::add_feature`] instead")]
486    pub fn add_feature(self) -> AddTableFeatureBuilder {
487        AddTableFeatureBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
488    }
489
490    /// Drops constraints from a table
491    #[cfg(feature = "datafusion")]
492    #[must_use]
493    #[deprecated(note = "Use [`DeltaTable::drop_constraints`] instead")]
494    pub fn drop_constraints(self) -> DropConstraintBuilder {
495        DropConstraintBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
496    }
497
498    /// Set table properties
499    #[deprecated(note = "Use [`DeltaTable::set_tbl_properties`] instead")]
500    pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
501        SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
502    }
503
504    /// Add new columns
505    #[deprecated(note = "Use [`DeltaTable::add_columns`] instead")]
506    pub fn add_columns(self) -> AddColumnBuilder {
507        AddColumnBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
508    }
509
510    /// Update field metadata
511    #[deprecated(note = "Use [`DeltaTable::update_field_metadata`] instead")]
512    pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
513        UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
514    }
515
516    /// Update table metadata
517    #[deprecated(note = "Use [`DeltaTable::update_table_metadata`] instead")]
518    pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
519        UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
520    }
521}
522
523#[allow(deprecated)]
524impl From<DeltaTable> for DeltaOps {
525    fn from(table: DeltaTable) -> Self {
526        Self(table)
527    }
528}
529
530#[allow(deprecated)]
531impl From<DeltaOps> for DeltaTable {
532    fn from(ops: DeltaOps) -> Self {
533        ops.0
534    }
535}
536
537#[allow(deprecated)]
538impl AsRef<DeltaTable> for DeltaOps {
539    fn as_ref(&self) -> &DeltaTable {
540        &self.0
541    }
542}
543
544/// Get the num_idx_columns and stats_columns from the table configuration in the state
545/// If table_config does not exist (only can occur in the first write action) it takes
546/// the configuration that was passed to the writerBuilder.
547pub fn get_num_idx_cols_and_stats_columns(
548    config: Option<&TableProperties>,
549    configuration: HashMap<String, Option<String>>,
550) -> (DataSkippingNumIndexedCols, Option<Vec<String>>) {
551    let (num_index_cols, stats_columns) = match &config {
552        Some(conf) => (
553            conf.num_indexed_cols(),
554            conf.data_skipping_stats_columns
555                .clone()
556                .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
557        ),
558        _ => (
559            configuration
560                .get("delta.dataSkippingNumIndexedCols")
561                .and_then(|v| {
562                    v.as_ref()
563                        .and_then(|vv| vv.parse::<u64>().ok())
564                        .map(DataSkippingNumIndexedCols::NumColumns)
565                })
566                .unwrap_or(DataSkippingNumIndexedCols::NumColumns(
567                    DEFAULT_NUM_INDEX_COLS,
568                )),
569            configuration
570                .get("delta.dataSkippingStatsColumns")
571                .and_then(|v| {
572                    v.as_ref()
573                        .map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<String>>())
574                }),
575        ),
576    };
577    (
578        num_index_cols,
579        stats_columns
580            .clone()
581            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
582    )
583}
584
/// Get the target_file_size from the table configuration in the state
/// If table_config does not exist (only can occur in the first write action) it takes
/// the configuration that was passed to the writerBuilder.
///
/// When `config` is `None`, `delta.targetFileSize` is read from `configuration`; a
/// missing, empty or unparsable (including zero) entry falls back to
/// `DEFAULT_TARGET_FILE_SIZE`.
#[cfg(feature = "datafusion")]
pub(crate) fn get_target_file_size(
    config: Option<&TableProperties>,
    configuration: &HashMap<String, Option<String>>,
) -> NonZeroU64 {
    match config {
        Some(conf) => conf.target_file_size(),
        None => configuration
            .get("delta.targetFileSize")
            // Borrow the stored string; parsing does not need an owned copy.
            .and_then(|value| value.as_deref())
            .and_then(|value| value.parse::<NonZeroU64>().ok())
            .unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
    }
}