deltalake_core/operations/
mod.rs1use async_trait::async_trait;
10use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
11use std::collections::HashMap;
12use std::sync::Arc;
13use update_field_metadata::UpdateFieldMetadataBuilder;
14use uuid::Uuid;
15
16use add_feature::AddTableFeatureBuilder;
17#[cfg(feature = "datafusion")]
18use arrow_array::RecordBatch;
19#[cfg(feature = "datafusion")]
20pub use datafusion::physical_plan::common::collect as collect_sendable_stream;
21
22use self::add_column::AddColumnBuilder;
23use self::create::CreateBuilder;
24use self::filesystem_check::FileSystemCheckBuilder;
25#[cfg(feature = "datafusion")]
26use self::optimize::OptimizeBuilder;
27use self::restore::RestoreBuilder;
28use self::set_tbl_properties::SetTablePropertiesBuilder;
29use self::update_table_metadata::UpdateTableMetadataBuilder;
30use self::vacuum::VacuumBuilder;
31#[cfg(feature = "datafusion")]
32use self::{
33 constraints::ConstraintBuilder, datafusion_utils::Expression, delete::DeleteBuilder,
34 drop_constraints::DropConstraintBuilder, load::LoadBuilder, load_cdf::CdfLoadBuilder,
35 merge::MergeBuilder, update::UpdateBuilder, write::WriteBuilder,
36};
37use crate::errors::{DeltaResult, DeltaTableError};
38use crate::logstore::LogStoreRef;
39use crate::table::builder::ensure_table_uri;
40use crate::table::builder::DeltaTableBuilder;
41use crate::table::config::{TablePropertiesExt as _, DEFAULT_NUM_INDEX_COLS};
42use crate::DeltaTable;
43use url::Url;
44
45pub mod add_column;
46pub mod add_feature;
47pub mod convert_to_delta;
48pub mod create;
49pub mod drop_constraints;
50pub mod filesystem_check;
51pub mod restore;
52pub mod update_field_metadata;
53pub mod update_table_metadata;
54pub mod vacuum;
55
56#[cfg(feature = "datafusion")]
57mod cdc;
58#[cfg(feature = "datafusion")]
59pub mod constraints;
60#[cfg(feature = "datafusion")]
61pub mod delete;
62#[cfg(feature = "datafusion")]
63mod load;
64#[cfg(feature = "datafusion")]
65pub mod load_cdf;
66#[cfg(feature = "datafusion")]
67pub mod merge;
68#[cfg(feature = "datafusion")]
69pub mod optimize;
70pub mod set_tbl_properties;
71#[cfg(feature = "datafusion")]
72pub mod update;
73#[cfg(feature = "datafusion")]
74pub mod write;
75
76#[async_trait]
77pub trait CustomExecuteHandler: Send + Sync {
78 async fn pre_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;
80
81 async fn post_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;
83
84 async fn before_post_commit_hook(
86 &self,
87 log_store: &LogStoreRef,
88 file_operation: bool,
89 operation_id: Uuid,
90 ) -> DeltaResult<()>;
91
92 async fn after_post_commit_hook(
94 &self,
95 log_store: &LogStoreRef,
96 file_operation: bool,
97 operation_id: Uuid,
98 ) -> DeltaResult<()>;
99}
100
101#[allow(unused)]
102pub(crate) trait Operation<State>: std::future::IntoFuture {
105 fn log_store(&self) -> &LogStoreRef;
106 fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>>;
107 async fn pre_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
108 if let Some(handler) = self.get_custom_execute_handler() {
109 handler.pre_execute(self.log_store(), operation_id).await
110 } else {
111 Ok(())
112 }
113 }
114
115 async fn post_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
116 if let Some(handler) = self.get_custom_execute_handler() {
117 handler.post_execute(self.log_store(), operation_id).await
118 } else {
119 Ok(())
120 }
121 }
122
123 fn get_operation_id(&self) -> uuid::Uuid {
124 Uuid::new_v4()
125 }
126}
127
128pub struct DeltaOps(pub DeltaTable);
130
131impl DeltaOps {
132 pub async fn try_from_uri(uri: Url) -> DeltaResult<Self> {
144 let mut table = DeltaTableBuilder::from_uri(uri)?.build()?;
145 match table.load().await {
147 Ok(_) => Ok(table.into()),
148 Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
149 Err(err) => Err(err),
150 }
151 }
152
153 #[deprecated(note = "Use try_from_uri with url::Url instead")]
163 pub async fn try_from_uri_str(uri: impl AsRef<str>) -> DeltaResult<Self> {
164 let url = ensure_table_uri(uri)?;
165 Self::try_from_uri(url).await
166 }
167
168 pub async fn try_from_uri_with_storage_options(
170 uri: Url,
171 storage_options: HashMap<String, String>,
172 ) -> DeltaResult<Self> {
173 let mut table = DeltaTableBuilder::from_uri(uri)?
174 .with_storage_options(storage_options)
175 .build()?;
176 match table.load().await {
178 Ok(_) => Ok(table.into()),
179 Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
180 Err(err) => Err(err),
181 }
182 }
183
184 #[deprecated(note = "Use try_from_uri_with_storage_options with url::Url instead")]
186 pub async fn try_from_uri_str_with_storage_options(
187 uri: impl AsRef<str>,
188 storage_options: HashMap<String, String>,
189 ) -> DeltaResult<Self> {
190 let url = ensure_table_uri(uri)?;
191 Self::try_from_uri_with_storage_options(url, storage_options).await
192 }
193
194 #[must_use]
205 pub fn new_in_memory() -> Self {
206 let url = Url::parse("memory:///").unwrap();
207 DeltaTableBuilder::from_uri(url)
208 .unwrap()
209 .build()
210 .unwrap()
211 .into()
212 }
213
214 #[must_use]
226 pub fn create(self) -> CreateBuilder {
227 CreateBuilder::default().with_log_store(self.0.log_store)
228 }
229
230 #[cfg(feature = "datafusion")]
232 #[must_use]
233 pub fn load(self) -> LoadBuilder {
234 LoadBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
235 }
236
237 #[cfg(feature = "datafusion")]
239 #[must_use]
240 pub fn load_cdf(self) -> CdfLoadBuilder {
241 CdfLoadBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
242 }
243
244 #[cfg(feature = "datafusion")]
246 #[must_use]
247 pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
248 WriteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
249 .with_input_batches(batches)
250 }
251
252 #[must_use]
254 pub fn vacuum(self) -> VacuumBuilder {
255 VacuumBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
256 }
257
258 #[must_use]
260 pub fn filesystem_check(self) -> FileSystemCheckBuilder {
261 FileSystemCheckBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
262 }
263
264 #[cfg(feature = "datafusion")]
266 #[must_use]
267 pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
268 OptimizeBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
269 }
270
271 #[cfg(feature = "datafusion")]
273 #[must_use]
274 pub fn delete(self) -> DeleteBuilder {
275 DeleteBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
276 }
277
278 #[cfg(feature = "datafusion")]
280 #[must_use]
281 pub fn update(self) -> UpdateBuilder {
282 UpdateBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
283 }
284
285 #[must_use]
287 pub fn restore(self) -> RestoreBuilder {
288 RestoreBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
289 }
290
291 #[cfg(feature = "datafusion")]
293 #[must_use]
294 pub fn merge<E: Into<Expression>>(
295 self,
296 source: datafusion::prelude::DataFrame,
297 predicate: E,
298 ) -> MergeBuilder {
299 MergeBuilder::new(
300 self.0.log_store,
301 self.0.state.unwrap().snapshot,
302 predicate.into(),
303 source,
304 )
305 }
306
307 #[cfg(feature = "datafusion")]
309 #[must_use]
310 pub fn add_constraint(self) -> ConstraintBuilder {
311 ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
312 }
313
314 #[must_use]
316 pub fn add_feature(self) -> AddTableFeatureBuilder {
317 AddTableFeatureBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
318 }
319
320 #[cfg(feature = "datafusion")]
322 #[must_use]
323 pub fn drop_constraints(self) -> DropConstraintBuilder {
324 DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
325 }
326
327 pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
329 SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
330 }
331
332 pub fn add_columns(self) -> AddColumnBuilder {
334 AddColumnBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
335 }
336
337 pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
339 UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
340 }
341
342 pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
344 UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot)
345 }
346}
347
348impl From<DeltaTable> for DeltaOps {
349 fn from(table: DeltaTable) -> Self {
350 Self(table)
351 }
352}
353
354impl From<DeltaOps> for DeltaTable {
355 fn from(ops: DeltaOps) -> Self {
356 ops.0
357 }
358}
359
360impl AsRef<DeltaTable> for DeltaOps {
361 fn as_ref(&self) -> &DeltaTable {
362 &self.0
363 }
364}
365
366pub fn get_num_idx_cols_and_stats_columns(
370 config: Option<&TableProperties>,
371 configuration: HashMap<String, Option<String>>,
372) -> (DataSkippingNumIndexedCols, Option<Vec<String>>) {
373 let (num_index_cols, stats_columns) = match &config {
374 Some(conf) => (
375 conf.num_indexed_cols(),
376 conf.data_skipping_stats_columns
377 .clone()
378 .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
379 ),
380 _ => (
381 configuration
382 .get("delta.dataSkippingNumIndexedCols")
383 .and_then(|v| {
384 v.as_ref()
385 .and_then(|vv| vv.parse::<u64>().ok())
386 .map(DataSkippingNumIndexedCols::NumColumns)
387 })
388 .unwrap_or(DataSkippingNumIndexedCols::NumColumns(
389 DEFAULT_NUM_INDEX_COLS,
390 )),
391 configuration
392 .get("delta.dataSkippingStatsColumns")
393 .and_then(|v| {
394 v.as_ref()
395 .map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<String>>())
396 }),
397 ),
398 };
399 (
400 num_index_cols,
401 stats_columns
402 .clone()
403 .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
404 )
405}
406
#[cfg(feature = "datafusion")]
/// Resolve the target file size for the table.
///
/// Typed table properties are used when `config` is `Some`. Otherwise
/// `delta.targetFileSize` is read from the raw `configuration` map. A missing, null,
/// unparsable, or zero value falls back to `DEFAULT_TARGET_FILE_SIZE` instead of
/// panicking, because the raw map may contain arbitrary user-supplied text.
pub(crate) fn get_target_file_size(
    config: Option<&TableProperties>,
    configuration: &HashMap<String, Option<String>>,
) -> u64 {
    match config {
        Some(conf) => conf.target_file_size().get(),
        None => configuration
            .get("delta.targetFileSize")
            .and_then(|value| value.as_deref())
            .and_then(|value| value.trim().parse::<u64>().ok())
            // The typed path yields a non-zero size (`.get()` on the property); keep that
            // invariant here as well.
            .filter(|&size| size > 0)
            .unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
    }
}
423
#[cfg(feature = "datafusion")]
mod datafusion_utils {
    use datafusion::catalog::Session;
    use datafusion::common::DFSchema;
    use datafusion::logical_expr::Expr;

    use crate::delta_datafusion::expr::parse_predicate_expression;
    use crate::DeltaResult;

    /// An expression supplied either as a ready-made DataFusion `Expr` or as SQL text
    /// that is parsed later against a schema.
    #[derive(Debug, Clone)]
    pub enum Expression {
        /// A DataFusion expression, used as-is.
        DataFusion(Expr),
        /// SQL expression text, parsed with `parse_predicate_expression` on conversion.
        String(String),
    }

    impl From<Expr> for Expression {
        fn from(expr: Expr) -> Self {
            Self::DataFusion(expr)
        }
    }

    impl From<&str> for Expression {
        fn from(text: &str) -> Self {
            Self::String(text.to_owned())
        }
    }

    impl From<String> for Expression {
        fn from(text: String) -> Self {
            Self::String(text)
        }
    }

    /// Convert an `Expression` into a DataFusion `Expr`.
    ///
    /// DataFusion expressions pass through unchanged. String expressions are parsed via
    /// `parse_predicate_expression` against `schema` using `session`, and any parse error
    /// is returned.
    pub(crate) fn into_expr(
        expr: Expression,
        schema: &DFSchema,
        session: &dyn Session,
    ) -> DeltaResult<Expr> {
        match expr {
            Expression::String(sql) => parse_predicate_expression(schema, sql, session),
            Expression::DataFusion(ready) => Ok(ready),
        }
    }

    /// Optional-input variant of `into_expr`: `None` maps to `Ok(None)`.
    pub(crate) fn maybe_into_expr(
        expr: Option<Expression>,
        schema: &DFSchema,
        session: &dyn Session,
    ) -> DeltaResult<Option<Expr>> {
        expr.map(|predicate| into_expr(predicate, schema, session))
            .transpose()
    }
}
478}