deltalake_core/operations/
mod.rs1use std::collections::HashMap;
10use std::sync::Arc;
11
12#[cfg(feature = "datafusion")]
13use arrow::array::RecordBatch;
14use async_trait::async_trait;
15#[cfg(feature = "datafusion")]
16pub use datafusion::physical_plan::common::collect as collect_sendable_stream;
17use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
18use url::Url;
19use uuid::Uuid;
20
21use self::{
22 add_column::AddColumnBuilder, add_feature::AddTableFeatureBuilder, create::CreateBuilder,
23 filesystem_check::FileSystemCheckBuilder, restore::RestoreBuilder,
24 set_tbl_properties::SetTablePropertiesBuilder,
25 update_field_metadata::UpdateFieldMetadataBuilder,
26 update_table_metadata::UpdateTableMetadataBuilder, vacuum::VacuumBuilder,
27};
28#[cfg(feature = "datafusion")]
29use self::{
30 constraints::ConstraintBuilder, delete::DeleteBuilder, drop_constraints::DropConstraintBuilder,
31 load::LoadBuilder, load_cdf::CdfLoadBuilder, merge::MergeBuilder, optimize::OptimizeBuilder,
32 update::UpdateBuilder, write::WriteBuilder,
33};
34use crate::DeltaTable;
35#[cfg(feature = "datafusion")]
36use crate::delta_datafusion::Expression;
37use crate::errors::{DeltaResult, DeltaTableError};
38use crate::logstore::LogStoreRef;
39use crate::operations::generate::GenerateBuilder;
40use crate::table::builder::DeltaTableBuilder;
41use crate::table::config::{DEFAULT_NUM_INDEX_COLS, TablePropertiesExt as _};
42
43pub mod add_column;
44pub mod add_feature;
45pub mod convert_to_delta;
46pub mod create;
47pub mod drop_constraints;
48pub mod filesystem_check;
49pub mod generate;
50pub mod restore;
51pub mod update_field_metadata;
52pub mod update_table_metadata;
53pub mod vacuum;
54
55#[cfg(feature = "datafusion")]
56mod cdc;
57#[cfg(feature = "datafusion")]
58pub mod constraints;
59#[cfg(feature = "datafusion")]
60pub mod delete;
61#[cfg(feature = "datafusion")]
62mod load;
63#[cfg(feature = "datafusion")]
64pub mod load_cdf;
65#[cfg(feature = "datafusion")]
66pub mod merge;
67#[cfg(feature = "datafusion")]
68pub mod optimize;
69pub mod set_tbl_properties;
70#[cfg(feature = "datafusion")]
71pub mod update;
72#[cfg(feature = "datafusion")]
73pub mod write;
74
75#[cfg(all(test, feature = "datafusion"))]
76mod session_fallback_policy_tests;
77
78impl DeltaTable {
79 pub async fn try_from_url(uri: Url) -> DeltaResult<Self> {
91 let mut table = DeltaTableBuilder::from_url(uri)?.build()?;
92 match table.load().await {
94 Ok(_) => Ok(table),
95 Err(DeltaTableError::NotATable(_)) => Ok(table),
96 Err(err) => Err(err),
97 }
98 }
99
100 pub async fn try_from_url_with_storage_options(
102 uri: Url,
103 storage_options: HashMap<String, String>,
104 ) -> DeltaResult<Self> {
105 let mut table = DeltaTableBuilder::from_url(uri)?
106 .with_storage_options(storage_options)
107 .build()?;
108 match table.load().await {
110 Ok(_) => Ok(table),
111 Err(DeltaTableError::NotATable(_)) => Ok(table),
112 Err(err) => Err(err),
113 }
114 }
115
116 #[must_use]
117 pub fn create(&self) -> CreateBuilder {
118 CreateBuilder::default().with_log_store(self.log_store())
119 }
120
121 #[must_use]
122 pub fn restore(self) -> RestoreBuilder {
123 RestoreBuilder::new(
124 self.log_store(),
125 self.state.clone().map(|state| state.snapshot),
126 )
127 }
128
129 #[must_use]
131 pub fn vacuum(self) -> VacuumBuilder {
132 VacuumBuilder::new(
133 self.log_store(),
134 self.state.clone().map(|state| state.snapshot),
135 )
136 }
137
138 #[must_use]
140 pub fn filesystem_check(self) -> FileSystemCheckBuilder {
141 FileSystemCheckBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
142 }
143
144 #[must_use]
146 pub fn add_feature(self) -> AddTableFeatureBuilder {
147 AddTableFeatureBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
148 }
149
150 #[must_use]
152 pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
153 SetTablePropertiesBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
154 }
155
156 #[must_use]
158 pub fn add_columns(self) -> AddColumnBuilder {
159 AddColumnBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
160 }
161
162 #[must_use]
164 pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
165 UpdateFieldMetadataBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
166 }
167
168 #[must_use]
170 pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
171 UpdateTableMetadataBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
172 }
173
174 pub fn generate(self) -> GenerateBuilder {
176 GenerateBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
177 }
178}
179
#[cfg(feature = "datafusion")]
impl DeltaTable {
    /// Returns a [`LoadBuilder`] for this table's log store and a clone of its snapshot
    /// (if any). The table is only borrowed, so the state has to be cloned here.
    #[must_use]
    pub fn scan_table(&self) -> LoadBuilder {
        LoadBuilder::new(
            self.log_store(),
            self.state.clone().map(|state| state.snapshot),
        )
    }

    // The builders below consume `self`, so the snapshot is moved out of `self.state` rather
    // than cloned. `self.log_store()` is evaluated first and only borrows `self` temporarily.

    /// Returns a [`CdfLoadBuilder`] for this table's log store and snapshot (if any).
    #[must_use]
    pub fn scan_cdf(self) -> CdfLoadBuilder {
        CdfLoadBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Returns a [`WriteBuilder`] for this table's log store and snapshot (if any), with
    /// `batches` registered as its input batches.
    #[must_use]
    pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
        WriteBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
            .with_input_batches(batches)
    }

    /// Returns an [`OptimizeBuilder`] for this table's log store and snapshot (if any).
    #[must_use]
    pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
        OptimizeBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Returns a [`DeleteBuilder`] for this table's log store and snapshot (if any).
    #[must_use]
    pub fn delete(self) -> DeleteBuilder {
        DeleteBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Returns an [`UpdateBuilder`] for this table's log store and snapshot (if any).
    #[must_use]
    pub fn update(self) -> UpdateBuilder {
        UpdateBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Returns a [`MergeBuilder`] for this table's log store and snapshot (if any).
    ///
    /// `predicate` is converted into an [`Expression`] and handed to the builder together
    /// with the `source` data frame.
    #[must_use]
    pub fn merge<E: Into<Expression>>(
        self,
        source: datafusion::prelude::DataFrame,
        predicate: E,
    ) -> MergeBuilder {
        MergeBuilder::new(
            self.log_store(),
            self.state.map(|s| s.snapshot),
            predicate.into(),
            source,
        )
    }

    /// Returns a [`ConstraintBuilder`] for this table's log store and snapshot (if any).
    #[must_use]
    pub fn add_constraint(self) -> ConstraintBuilder {
        ConstraintBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Returns a [`DropConstraintBuilder`] for this table's log store and snapshot (if any).
    #[must_use]
    pub fn drop_constraints(self) -> DropConstraintBuilder {
        DropConstraintBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }
}
247
/// Caller-supplied hooks that receive an operation's log store and operation id.
///
/// Implementations must be `Send + Sync`. Each hook returns a `DeltaResult<()>`.
#[async_trait]
pub trait CustomExecuteHandler: Send + Sync {
    /// Hook that the crate-internal `Operation::pre_execute` default forwards to when a
    /// handler is configured.
    async fn pre_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;

    /// Hook that the crate-internal `Operation::post_execute` default forwards to when a
    /// handler is configured.
    async fn post_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;

    /// NOTE(review): the call site is not visible in this file; by its name this presumably
    /// runs just before the post-commit hook — confirm. The meaning of `file_operation` is
    /// likewise not established here.
    async fn before_post_commit_hook(
        &self,
        log_store: &LogStoreRef,
        file_operation: bool,
        operation_id: Uuid,
    ) -> DeltaResult<()>;

    /// NOTE(review): the call site is not visible in this file; by its name this presumably
    /// runs just after the post-commit hook — confirm. The meaning of `file_operation` is
    /// likewise not established here.
    async fn after_post_commit_hook(
        &self,
        log_store: &LogStoreRef,
        file_operation: bool,
        operation_id: Uuid,
    ) -> DeltaResult<()>;
}
272
273#[allow(unused)]
274pub(crate) trait Operation: std::future::IntoFuture {
277 fn log_store(&self) -> &LogStoreRef;
278 fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>>;
279 async fn pre_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
280 if let Some(handler) = self.get_custom_execute_handler() {
281 handler.pre_execute(self.log_store(), operation_id).await
282 } else {
283 Ok(())
284 }
285 }
286
287 async fn post_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
288 if let Some(handler) = self.get_custom_execute_handler() {
289 handler.post_execute(self.log_store(), operation_id).await
290 } else {
291 Ok(())
292 }
293 }
294
295 fn get_operation_id(&self) -> uuid::Uuid {
296 Uuid::new_v4()
297 }
298}
299
/// Deprecated newtype wrapper around a [`DeltaTable`]; the wrapped table is the public
/// field `0`.
#[deprecated(note = "Use methods directly on DeltaTable instead, e.g. `delta_table.create()`")]
pub struct DeltaOps(pub DeltaTable);
303
304#[allow(deprecated)]
305impl DeltaOps {
306 pub async fn try_from_url(uri: Url) -> DeltaResult<Self> {
318 let mut table = DeltaTableBuilder::from_url(uri)?.build()?;
319 match table.load().await {
321 Ok(_) => Ok(table.into()),
322 Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
323 Err(err) => Err(err),
324 }
325 }
326
327 pub async fn try_from_url_with_storage_options(
329 uri: Url,
330 storage_options: HashMap<String, String>,
331 ) -> DeltaResult<Self> {
332 let mut table = DeltaTableBuilder::from_url(uri)?
333 .with_storage_options(storage_options)
334 .build()?;
335 match table.load().await {
337 Ok(_) => Ok(table.into()),
338 Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
339 Err(err) => Err(err),
340 }
341 }
342
343 #[must_use]
354 pub fn new_in_memory() -> Self {
355 let url = Url::parse("memory:///").unwrap();
356 DeltaTableBuilder::from_url(url)
357 .unwrap()
358 .build()
359 .unwrap()
360 .into()
361 }
362
363 #[must_use]
375 #[deprecated(note = "Use [`DeltaTable::create`] instead")]
376 pub fn create(self) -> CreateBuilder {
377 CreateBuilder::default().with_log_store(self.0.log_store)
378 }
379
380 #[deprecated(note = "Use [`DeltaTable::generate`] instead")]
382 pub fn generate(self) -> GenerateBuilder {
383 GenerateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
384 }
385
386 #[cfg(feature = "datafusion")]
388 #[must_use]
389 #[deprecated(note = "Use [`DeltaTable::scan`] instead")]
390 pub fn load(self) -> LoadBuilder {
391 LoadBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
392 }
393
394 #[cfg(feature = "datafusion")]
396 #[must_use]
397 #[deprecated(note = "Use [`DeltaTable::scan_cdf`] instead")]
398 pub fn load_cdf(self) -> CdfLoadBuilder {
399 CdfLoadBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
400 }
401
402 #[cfg(feature = "datafusion")]
404 #[must_use]
405 #[deprecated(note = "Use [`DeltaTable::write`] instead")]
406 pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
407 WriteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
408 .with_input_batches(batches)
409 }
410
411 #[must_use]
413 #[deprecated(note = "Use [`DeltaTable::vacuum`] instead")]
414 pub fn vacuum(self) -> VacuumBuilder {
415 VacuumBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
416 }
417
418 #[must_use]
420 #[deprecated(note = "Use [`DeltaTable::filesystem_check`] instead")]
421 pub fn filesystem_check(self) -> FileSystemCheckBuilder {
422 FileSystemCheckBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
423 }
424
425 #[cfg(feature = "datafusion")]
427 #[must_use]
428 #[deprecated(note = "Use [`DeltaTable::optimize`] instead")]
429 pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
430 OptimizeBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
431 }
432
433 #[cfg(feature = "datafusion")]
435 #[must_use]
436 #[deprecated(note = "Use [`DeltaTable::delete`] instead")]
437 pub fn delete(self) -> DeleteBuilder {
438 DeleteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
439 }
440
441 #[cfg(feature = "datafusion")]
443 #[must_use]
444 #[deprecated(note = "Use [`DeltaTable::update`] instead")]
445 pub fn update(self) -> UpdateBuilder {
446 UpdateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
447 }
448
449 #[must_use]
451 #[deprecated(note = "Use [`DeltaTable::restore`] instead")]
452 pub fn restore(self) -> RestoreBuilder {
453 RestoreBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
454 }
455
456 #[cfg(feature = "datafusion")]
458 #[must_use]
459 #[deprecated(note = "Use [`DeltaTable::merge`] instead")]
460 pub fn merge<E: Into<Expression>>(
461 self,
462 source: datafusion::prelude::DataFrame,
463 predicate: E,
464 ) -> MergeBuilder {
465 MergeBuilder::new(
466 self.0.log_store,
467 self.0.state.map(|s| s.snapshot),
468 predicate.into(),
469 source,
470 )
471 }
472
473 #[cfg(feature = "datafusion")]
475 #[must_use]
476 #[deprecated(note = "Use [`DeltaTable::add_constraint`] instead")]
477 pub fn add_constraint(self) -> ConstraintBuilder {
478 ConstraintBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
479 }
480
481 #[must_use]
483 #[deprecated(note = "Use [`DeltaTable::add_feature`] instead")]
484 pub fn add_feature(self) -> AddTableFeatureBuilder {
485 AddTableFeatureBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
486 }
487
488 #[cfg(feature = "datafusion")]
490 #[must_use]
491 #[deprecated(note = "Use [`DeltaTable::drop_constraints`] instead")]
492 pub fn drop_constraints(self) -> DropConstraintBuilder {
493 DropConstraintBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
494 }
495
496 #[deprecated(note = "Use [`DeltaTable::set_tbl_properties`] instead")]
498 pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
499 SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
500 }
501
502 #[deprecated(note = "Use [`DeltaTable::add_columns`] instead")]
504 pub fn add_columns(self) -> AddColumnBuilder {
505 AddColumnBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
506 }
507
508 #[deprecated(note = "Use [`DeltaTable::update_field_metadata`] instead")]
510 pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
511 UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
512 }
513
514 #[deprecated(note = "Use [`DeltaTable::update_table_metadata`] instead")]
516 pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
517 UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
518 }
519}
520
521#[allow(deprecated)]
522impl From<DeltaTable> for DeltaOps {
523 fn from(table: DeltaTable) -> Self {
524 Self(table)
525 }
526}
527
528#[allow(deprecated)]
529impl From<DeltaOps> for DeltaTable {
530 fn from(ops: DeltaOps) -> Self {
531 ops.0
532 }
533}
534
535#[allow(deprecated)]
536impl AsRef<DeltaTable> for DeltaOps {
537 fn as_ref(&self) -> &DeltaTable {
538 &self.0
539 }
540}
541
542pub fn get_num_idx_cols_and_stats_columns(
546 config: Option<&TableProperties>,
547 configuration: HashMap<String, Option<String>>,
548) -> (DataSkippingNumIndexedCols, Option<Vec<String>>) {
549 let (num_index_cols, stats_columns) = match &config {
550 Some(conf) => (
551 conf.num_indexed_cols(),
552 conf.data_skipping_stats_columns
553 .clone()
554 .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
555 ),
556 _ => (
557 configuration
558 .get("delta.dataSkippingNumIndexedCols")
559 .and_then(|v| {
560 v.as_ref()
561 .and_then(|vv| vv.parse::<u64>().ok())
562 .map(DataSkippingNumIndexedCols::NumColumns)
563 })
564 .unwrap_or(DataSkippingNumIndexedCols::NumColumns(
565 DEFAULT_NUM_INDEX_COLS,
566 )),
567 configuration
568 .get("delta.dataSkippingStatsColumns")
569 .and_then(|v| {
570 v.as_ref()
571 .map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<String>>())
572 }),
573 ),
574 };
575 (
576 num_index_cols,
577 stats_columns
578 .clone()
579 .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
580 )
581}
582
/// Resolves the target file size.
///
/// When `config` is present its `target_file_size()` is used. Otherwise
/// `delta.targetFileSize` is read from `configuration`; a missing, `None`, or unparsable
/// value falls back to `DEFAULT_TARGET_FILE_SIZE` instead of panicking.
#[cfg(feature = "datafusion")]
pub(crate) fn get_target_file_size(
    config: Option<&TableProperties>,
    configuration: &HashMap<String, Option<String>>,
) -> u64 {
    match config {
        Some(conf) => conf.target_file_size().get(),
        None => configuration
            .get("delta.targetFileSize")
            .and_then(|value| value.as_deref())
            // A malformed user-supplied value must not abort the operation.
            .and_then(|value| value.parse::<u64>().ok())
            .unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
    }
}