deltalake_core/operations/
mod.rs

1//! High level operations API to interact with Delta tables
2//!
3//! At the heart of the high level operations APIs is the [`DeltaOps`] struct,
4//! which consumes a [`DeltaTable`] and exposes methods to attain builders for
5//! several high level operations. The specific builder structs allow fine-tuning
6//! the operations' behaviors and will return an updated table potentially in conjunction
7//! with a [data stream][datafusion::physical_plan::SendableRecordBatchStream],
8//! if the operation returns data as well.
9use async_trait::async_trait;
10use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
11use std::collections::HashMap;
12use std::sync::Arc;
13use update_field_metadata::UpdateFieldMetadataBuilder;
14use uuid::Uuid;
15
16use add_feature::AddTableFeatureBuilder;
17#[cfg(feature = "datafusion")]
18use arrow_array::RecordBatch;
19#[cfg(feature = "datafusion")]
20pub use datafusion::physical_plan::common::collect as collect_sendable_stream;
21
22use self::add_column::AddColumnBuilder;
23use self::create::CreateBuilder;
24use self::filesystem_check::FileSystemCheckBuilder;
25#[cfg(feature = "datafusion")]
26use self::optimize::OptimizeBuilder;
27use self::restore::RestoreBuilder;
28use self::set_tbl_properties::SetTablePropertiesBuilder;
29use self::update_table_metadata::UpdateTableMetadataBuilder;
30use self::vacuum::VacuumBuilder;
31#[cfg(feature = "datafusion")]
32use self::{
33    constraints::ConstraintBuilder, datafusion_utils::Expression, delete::DeleteBuilder,
34    drop_constraints::DropConstraintBuilder, load::LoadBuilder, load_cdf::CdfLoadBuilder,
35    merge::MergeBuilder, update::UpdateBuilder, write::WriteBuilder,
36};
37use crate::errors::{DeltaResult, DeltaTableError};
38use crate::logstore::LogStoreRef;
39use crate::table::builder::ensure_table_uri;
40use crate::table::builder::DeltaTableBuilder;
41use crate::table::config::{TablePropertiesExt as _, DEFAULT_NUM_INDEX_COLS};
42use crate::DeltaTable;
43use url::Url;
44
45pub mod add_column;
46pub mod add_feature;
47pub mod convert_to_delta;
48pub mod create;
49pub mod drop_constraints;
50pub mod filesystem_check;
51pub mod restore;
52pub mod update_field_metadata;
53pub mod update_table_metadata;
54pub mod vacuum;
55
56#[cfg(feature = "datafusion")]
57mod cdc;
58#[cfg(feature = "datafusion")]
59pub mod constraints;
60#[cfg(feature = "datafusion")]
61pub mod delete;
62#[cfg(feature = "datafusion")]
63mod load;
64#[cfg(feature = "datafusion")]
65pub mod load_cdf;
66#[cfg(feature = "datafusion")]
67pub mod merge;
68#[cfg(feature = "datafusion")]
69pub mod optimize;
70pub mod set_tbl_properties;
71#[cfg(feature = "datafusion")]
72pub mod update;
73#[cfg(feature = "datafusion")]
74pub mod write;
75
/// Hooks that let callers run custom code around a delta operation and around
/// its post commit hook.
///
/// Every hook receives the [`LogStoreRef`] of the table being operated on and the
/// `operation_id` identifying the operation, and may abort by returning an error.
#[async_trait]
pub trait CustomExecuteHandler: Send + Sync {
    /// Execute arbitrary code at the start of a delta operation
    async fn pre_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;

    /// Execute arbitrary code at the end of a delta operation
    async fn post_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;

    /// Execute arbitrary code at the start of the post commit hook
    ///
    /// NOTE(review): `file_operation` presumably tells the handler whether the
    /// commit touched data files -- confirm against the callers.
    async fn before_post_commit_hook(
        &self,
        log_store: &LogStoreRef,
        file_operation: bool,
        operation_id: Uuid,
    ) -> DeltaResult<()>;

    /// Execute arbitrary code at the end of the post commit hook
    ///
    /// NOTE(review): `file_operation` presumably tells the handler whether the
    /// commit touched data files -- confirm against the callers.
    async fn after_post_commit_hook(
        &self,
        log_store: &LogStoreRef,
        file_operation: bool,
        operation_id: Uuid,
    ) -> DeltaResult<()>;
}
100
101#[allow(unused)]
102/// The [Operation] trait defines common behaviors that all operations builders
103/// should have consistent
104pub(crate) trait Operation<State>: std::future::IntoFuture {
105    fn log_store(&self) -> &LogStoreRef;
106    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>>;
107    async fn pre_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
108        if let Some(handler) = self.get_custom_execute_handler() {
109            handler.pre_execute(self.log_store(), operation_id).await
110        } else {
111            Ok(())
112        }
113    }
114
115    async fn post_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
116        if let Some(handler) = self.get_custom_execute_handler() {
117            handler.post_execute(self.log_store(), operation_id).await
118        } else {
119            Ok(())
120        }
121    }
122
123    fn get_operation_id(&self) -> uuid::Uuid {
124        Uuid::new_v4()
125    }
126}
127
128/// High level interface for executing commands against a DeltaTable
129pub struct DeltaOps(pub DeltaTable);
130
131impl DeltaOps {
132    /// Create a new [`DeltaOps`] instance, operating on [`DeltaTable`] at given URL.
133    ///
134    /// ```
135    /// use deltalake_core::DeltaOps;
136    /// use url::Url;
137    ///
138    /// async {
139    ///     let url = Url::parse("memory:///").unwrap();
140    ///     let ops = DeltaOps::try_from_uri(url).await.unwrap();
141    /// };
142    /// ```
143    pub async fn try_from_uri(uri: Url) -> DeltaResult<Self> {
144        let mut table = DeltaTableBuilder::from_uri(uri)?.build()?;
145        // We allow for uninitialized locations, since we may want to create the table
146        match table.load().await {
147            Ok(_) => Ok(table.into()),
148            Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
149            Err(err) => Err(err),
150        }
151    }
152
153    /// Create a new [`DeltaOps`] instance, operating on [`DeltaTable`] at given uri string (deprecated).
154    ///
155    /// ```
156    /// use deltalake_core::DeltaOps;
157    ///
158    /// async {
159    ///     let ops = DeltaOps::try_from_uri_str("memory:///").await.unwrap();
160    /// };
161    /// ```
162    #[deprecated(note = "Use try_from_uri with url::Url instead")]
163    pub async fn try_from_uri_str(uri: impl AsRef<str>) -> DeltaResult<Self> {
164        let url = ensure_table_uri(uri)?;
165        Self::try_from_uri(url).await
166    }
167
168    /// Create a [`DeltaOps`] instance from URL with storage options
169    pub async fn try_from_uri_with_storage_options(
170        uri: Url,
171        storage_options: HashMap<String, String>,
172    ) -> DeltaResult<Self> {
173        let mut table = DeltaTableBuilder::from_uri(uri)?
174            .with_storage_options(storage_options)
175            .build()?;
176        // We allow for uninitialized locations, since we may want to create the table
177        match table.load().await {
178            Ok(_) => Ok(table.into()),
179            Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
180            Err(err) => Err(err),
181        }
182    }
183
184    /// Create a [`DeltaOps`] instance from uri string with storage options (deprecated)
185    #[deprecated(note = "Use try_from_uri_with_storage_options with url::Url instead")]
186    pub async fn try_from_uri_str_with_storage_options(
187        uri: impl AsRef<str>,
188        storage_options: HashMap<String, String>,
189    ) -> DeltaResult<Self> {
190        let url = ensure_table_uri(uri)?;
191        Self::try_from_uri_with_storage_options(url, storage_options).await
192    }
193
194    /// Create a new [`DeltaOps`] instance, backed by an un-initialized in memory table
195    ///
196    /// Using this will not persist any changes beyond the lifetime of the table object.
197    /// The main purpose of in-memory tables is for use in testing.
198    ///
199    /// ```
200    /// use deltalake_core::DeltaOps;
201    ///
202    /// let ops = DeltaOps::new_in_memory();
203    /// ```
204    #[must_use]
205    pub fn new_in_memory() -> Self {
206        let url = Url::parse("memory:///").unwrap();
207        DeltaTableBuilder::from_uri(url)
208            .unwrap()
209            .build()
210            .unwrap()
211            .into()
212    }
213
214    /// Create a new Delta table
215    ///
216    /// ```
217    /// use deltalake_core::DeltaOps;
218    ///
219    /// async {
220    ///     let ops = DeltaOps::try_from_uri(url::Url::parse("memory://").unwrap()).await.unwrap();
221    ///     let table = ops.create().with_table_name("my_table").await.unwrap();
222    ///     assert_eq!(table.version(), Some(0));
223    /// };
224    /// ```
225    #[must_use]
226    pub fn create(self) -> CreateBuilder {
227        CreateBuilder::default().with_log_store(self.0.log_store)
228    }
229
230    /// Load data from a DeltaTable
231    #[cfg(feature = "datafusion")]
232    #[must_use]
233    pub fn load(self) -> LoadBuilder {
234        LoadBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
235    }
236
237    /// Load a table with CDF Enabled
238    #[cfg(feature = "datafusion")]
239    #[must_use]
240    pub fn load_cdf(self) -> CdfLoadBuilder {
241        CdfLoadBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
242    }
243
244    /// Write data to Delta table
245    #[cfg(feature = "datafusion")]
246    #[must_use]
247    pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
248        WriteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
249            .with_input_batches(batches)
250    }
251
252    /// Vacuum stale files from delta table
253    #[must_use]
254    pub fn vacuum(self) -> VacuumBuilder {
255        VacuumBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
256    }
257
258    /// Audit active files with files present on the filesystem
259    #[must_use]
260    pub fn filesystem_check(self) -> FileSystemCheckBuilder {
261        FileSystemCheckBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
262    }
263
264    /// Audit active files with files present on the filesystem
265    #[cfg(feature = "datafusion")]
266    #[must_use]
267    pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
268        OptimizeBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
269    }
270
271    /// Delete data from Delta table
272    #[cfg(feature = "datafusion")]
273    #[must_use]
274    pub fn delete(self) -> DeleteBuilder {
275        DeleteBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
276    }
277
278    /// Update data from Delta table
279    #[cfg(feature = "datafusion")]
280    #[must_use]
281    pub fn update(self) -> UpdateBuilder {
282        UpdateBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
283    }
284
285    /// Restore delta table to a specified version or datetime
286    #[must_use]
287    pub fn restore(self) -> RestoreBuilder {
288        RestoreBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
289    }
290
291    /// Update data from Delta table
292    #[cfg(feature = "datafusion")]
293    #[must_use]
294    pub fn merge<E: Into<Expression>>(
295        self,
296        source: datafusion::prelude::DataFrame,
297        predicate: E,
298    ) -> MergeBuilder {
299        MergeBuilder::new(
300            self.0.log_store,
301            self.0.state.unwrap().snapshot,
302            predicate.into(),
303            source,
304        )
305    }
306
307    /// Add a check constraint to a table
308    #[cfg(feature = "datafusion")]
309    #[must_use]
310    pub fn add_constraint(self) -> ConstraintBuilder {
311        ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
312    }
313
314    /// Enable a table feature for a table
315    #[must_use]
316    pub fn add_feature(self) -> AddTableFeatureBuilder {
317        AddTableFeatureBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
318    }
319
320    /// Drops constraints from a table
321    #[cfg(feature = "datafusion")]
322    #[must_use]
323    pub fn drop_constraints(self) -> DropConstraintBuilder {
324        DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
325    }
326
327    /// Set table properties
328    pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
329        SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
330    }
331
332    /// Add new columns
333    pub fn add_columns(self) -> AddColumnBuilder {
334        AddColumnBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
335    }
336
337    /// Update field metadata
338    pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
339        UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
340    }
341
342    /// Update table metadata
343    pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
344        UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
345    }
346}
347
348impl From<DeltaTable> for DeltaOps {
349    fn from(table: DeltaTable) -> Self {
350        Self(table)
351    }
352}
353
354impl From<DeltaOps> for DeltaTable {
355    fn from(ops: DeltaOps) -> Self {
356        ops.0
357    }
358}
359
360impl AsRef<DeltaTable> for DeltaOps {
361    fn as_ref(&self) -> &DeltaTable {
362        &self.0
363    }
364}
365
366/// Get the num_idx_columns and stats_columns from the table configuration in the state
367/// If table_config does not exist (only can occur in the first write action) it takes
368/// the configuration that was passed to the writerBuilder.
369pub fn get_num_idx_cols_and_stats_columns(
370    config: Option<&TableProperties>,
371    configuration: HashMap<String, Option<String>>,
372) -> (DataSkippingNumIndexedCols, Option<Vec<String>>) {
373    let (num_index_cols, stats_columns) = match &config {
374        Some(conf) => (
375            conf.num_indexed_cols(),
376            conf.data_skipping_stats_columns
377                .clone()
378                .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
379        ),
380        _ => (
381            configuration
382                .get("delta.dataSkippingNumIndexedCols")
383                .and_then(|v| {
384                    v.as_ref()
385                        .and_then(|vv| vv.parse::<u64>().ok())
386                        .map(DataSkippingNumIndexedCols::NumColumns)
387                })
388                .unwrap_or(DataSkippingNumIndexedCols::NumColumns(
389                    DEFAULT_NUM_INDEX_COLS,
390                )),
391            configuration
392                .get("delta.dataSkippingStatsColumns")
393                .and_then(|v| {
394                    v.as_ref()
395                        .map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<String>>())
396                }),
397        ),
398    };
399    (
400        num_index_cols,
401        stats_columns
402            .clone()
403            .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
404    )
405}
406
/// Get the target_file_size from the table configuration in the state.
///
/// If `config` does not exist (only can occur in the first write action) the value
/// of `delta.targetFileSize` is taken from `configuration`, the configuration that
/// was passed to the writerBuilder. A missing, `None` or unparsable value falls
/// back to `DEFAULT_TARGET_FILE_SIZE` instead of panicking, since the value is
/// user supplied.
#[cfg(feature = "datafusion")]
pub(crate) fn get_target_file_size(
    config: Option<&TableProperties>,
    configuration: &HashMap<String, Option<String>>,
) -> u64 {
    match config {
        Some(conf) => conf.target_file_size().get(),
        None => configuration
            .get("delta.targetFileSize")
            .and_then(|raw| raw.as_deref())
            .and_then(|raw| raw.parse::<u64>().ok())
            .unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
    }
}
423
#[cfg(feature = "datafusion")]
mod datafusion_utils {
    use datafusion::logical_expr::Expr;
    use datafusion::{catalog::Session, common::DFSchema};

    use crate::{delta_datafusion::expr::parse_predicate_expression, DeltaResult};

    /// A user supplied expression, given either as a ready-made Datafusion
    /// expression or as a string that still has to be parsed.
    #[derive(Debug, Clone)]
    pub enum Expression {
        /// An already constructed Datafusion [`Expr`]
        DataFusion(Expr),
        /// An expression in textual form
        String(String),
    }

    impl From<Expr> for Expression {
        fn from(expr: Expr) -> Self {
            Self::DataFusion(expr)
        }
    }

    impl From<&str> for Expression {
        fn from(text: &str) -> Self {
            Self::String(text.to_owned())
        }
    }

    impl From<String> for Expression {
        fn from(text: String) -> Self {
            Self::String(text)
        }
    }

    /// Turn an [`Expression`] into a Datafusion [`Expr`].
    ///
    /// A Datafusion expression is returned as is; a string expression is parsed
    /// with `parse_predicate_expression` against `schema` and `session`.
    pub(crate) fn into_expr(
        expr: Expression,
        schema: &DFSchema,
        session: &dyn Session,
    ) -> DeltaResult<Expr> {
        match expr {
            Expression::String(text) => parse_predicate_expression(schema, text, session),
            Expression::DataFusion(ready) => Ok(ready),
        }
    }

    /// Like [`into_expr`], but passes `None` through unchanged.
    pub(crate) fn maybe_into_expr(
        expr: Option<Expression>,
        schema: &DFSchema,
        session: &dyn Session,
    ) -> DeltaResult<Option<Expr>> {
        expr.map(|predicate| into_expr(predicate, schema, session))
            .transpose()
    }
}