deltalake_core/operations/
mod.rs1use std::collections::HashMap;
10#[cfg(feature = "datafusion")]
11use std::num::NonZeroU64;
12use std::sync::Arc;
13
14#[cfg(feature = "datafusion")]
15use arrow::array::RecordBatch;
16use async_trait::async_trait;
17#[cfg(feature = "datafusion")]
18pub use datafusion::physical_plan::common::collect as collect_sendable_stream;
19use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
20use url::Url;
21use uuid::Uuid;
22
23use self::{
24 add_column::AddColumnBuilder, add_feature::AddTableFeatureBuilder, create::CreateBuilder,
25 filesystem_check::FileSystemCheckBuilder, restore::RestoreBuilder,
26 set_tbl_properties::SetTablePropertiesBuilder,
27 update_field_metadata::UpdateFieldMetadataBuilder,
28 update_table_metadata::UpdateTableMetadataBuilder, vacuum::VacuumBuilder,
29};
30#[cfg(feature = "datafusion")]
31use self::{
32 constraints::ConstraintBuilder, delete::DeleteBuilder, drop_constraints::DropConstraintBuilder,
33 load::LoadBuilder, load_cdf::CdfLoadBuilder, merge::MergeBuilder, optimize::OptimizeBuilder,
34 update::UpdateBuilder, write::WriteBuilder,
35};
36use crate::DeltaTable;
37#[cfg(feature = "datafusion")]
38use crate::delta_datafusion::Expression;
39use crate::errors::{DeltaResult, DeltaTableError};
40use crate::logstore::LogStoreRef;
41use crate::operations::generate::GenerateBuilder;
42use crate::table::builder::DeltaTableBuilder;
43use crate::table::config::{DEFAULT_NUM_INDEX_COLS, TablePropertiesExt as _};
44
45pub mod add_column;
46pub mod add_feature;
47pub mod convert_to_delta;
48pub mod create;
49pub mod drop_constraints;
50pub mod filesystem_check;
51pub mod generate;
52pub mod restore;
53pub mod update_field_metadata;
54pub mod update_table_metadata;
55pub mod vacuum;
56
57#[cfg(feature = "datafusion")]
58mod cdc;
59#[cfg(feature = "datafusion")]
60pub mod constraints;
61#[cfg(feature = "datafusion")]
62pub mod delete;
63#[cfg(feature = "datafusion")]
64mod load;
65#[cfg(feature = "datafusion")]
66pub mod load_cdf;
67#[cfg(feature = "datafusion")]
68pub mod merge;
69#[cfg(feature = "datafusion")]
70pub mod optimize;
71pub mod set_tbl_properties;
72#[cfg(feature = "datafusion")]
73pub mod update;
74#[cfg(feature = "datafusion")]
75pub mod write;
76
77#[cfg(all(test, feature = "datafusion"))]
78mod session_fallback_policy_tests;
79
80impl DeltaTable {
81 pub async fn try_from_url(uri: Url) -> DeltaResult<Self> {
93 let mut table = DeltaTableBuilder::from_url(uri)?.build()?;
94 match table.load().await {
96 Ok(_) => Ok(table),
97 Err(DeltaTableError::NotATable(_)) => Ok(table),
98 Err(err) => Err(err),
99 }
100 }
101
102 pub async fn try_from_url_with_storage_options(
104 uri: Url,
105 storage_options: HashMap<String, String>,
106 ) -> DeltaResult<Self> {
107 let mut table = DeltaTableBuilder::from_url(uri)?
108 .with_storage_options(storage_options)
109 .build()?;
110 match table.load().await {
112 Ok(_) => Ok(table),
113 Err(DeltaTableError::NotATable(_)) => Ok(table),
114 Err(err) => Err(err),
115 }
116 }
117
118 #[must_use]
119 pub fn create(&self) -> CreateBuilder {
120 CreateBuilder::default().with_log_store(self.log_store())
121 }
122
123 #[must_use]
124 pub fn restore(self) -> RestoreBuilder {
125 RestoreBuilder::new(
126 self.log_store(),
127 self.state.clone().map(|state| state.snapshot),
128 )
129 }
130
131 #[must_use]
133 pub fn vacuum(self) -> VacuumBuilder {
134 VacuumBuilder::new(
135 self.log_store(),
136 self.state.clone().map(|state| state.snapshot),
137 )
138 }
139
140 #[must_use]
142 pub fn filesystem_check(self) -> FileSystemCheckBuilder {
143 FileSystemCheckBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
144 }
145
146 #[must_use]
148 pub fn add_feature(self) -> AddTableFeatureBuilder {
149 AddTableFeatureBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
150 }
151
152 #[must_use]
154 pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
155 SetTablePropertiesBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
156 }
157
158 #[must_use]
160 pub fn add_columns(self) -> AddColumnBuilder {
161 AddColumnBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
162 }
163
164 #[must_use]
166 pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
167 UpdateFieldMetadataBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
168 }
169
170 #[must_use]
172 pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
173 UpdateTableMetadataBuilder::new(self.log_store(), self.state.clone().map(|s| s.snapshot))
174 }
175
176 pub fn generate(self) -> GenerateBuilder {
178 GenerateBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
179 }
180}
181
#[cfg(feature = "datafusion")]
impl DeltaTable {
    /// Returns a [`LoadBuilder`] built from this table's log store and, if state
    /// is present, a clone of that state's snapshot.
    ///
    /// The table is only borrowed, so the state has to be cloned here.
    #[must_use]
    pub fn scan_table(&self) -> LoadBuilder {
        LoadBuilder::new(
            self.log_store(),
            self.state.clone().map(|state| state.snapshot),
        )
    }

    /// Consumes the table and returns a [`CdfLoadBuilder`] built from its log
    /// store and, if state is present, that state's snapshot.
    #[must_use]
    pub fn scan_cdf(self) -> CdfLoadBuilder {
        CdfLoadBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Consumes the table and returns a [`WriteBuilder`] built from its log store
    /// and snapshot (if any), with `batches` registered as the input batches.
    #[must_use]
    pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
        // `self` is owned, so the state is moved rather than cloned.
        WriteBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
            .with_input_batches(batches)
    }

    /// Consumes the table and returns an [`OptimizeBuilder`] built from its log
    /// store and, if state is present, that state's snapshot.
    #[must_use]
    pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
        OptimizeBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Consumes the table and returns a [`DeleteBuilder`] built from its log
    /// store and, if state is present, that state's snapshot.
    #[must_use]
    pub fn delete(self) -> DeleteBuilder {
        DeleteBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Consumes the table and returns an [`UpdateBuilder`] built from its log
    /// store and, if state is present, that state's snapshot.
    #[must_use]
    pub fn update(self) -> UpdateBuilder {
        UpdateBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Consumes the table and returns a [`MergeBuilder`] for merging `source`
    /// using `predicate`.
    ///
    /// `predicate` is converted into an [`Expression`] before being handed to the
    /// builder, together with the table's log store and snapshot (if any).
    #[must_use]
    pub fn merge<E: Into<Expression>>(
        self,
        source: datafusion::prelude::DataFrame,
        predicate: E,
    ) -> MergeBuilder {
        MergeBuilder::new(
            self.log_store(),
            self.state.map(|s| s.snapshot),
            predicate.into(),
            source,
        )
    }

    /// Consumes the table and returns a [`ConstraintBuilder`] built from its log
    /// store and, if state is present, that state's snapshot.
    #[must_use]
    pub fn add_constraint(self) -> ConstraintBuilder {
        ConstraintBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }

    /// Consumes the table and returns a [`DropConstraintBuilder`] built from its
    /// log store and, if state is present, that state's snapshot.
    #[must_use]
    pub fn drop_constraints(self) -> DropConstraintBuilder {
        DropConstraintBuilder::new(self.log_store(), self.state.map(|s| s.snapshot))
    }
}
249
250#[async_trait]
251pub trait CustomExecuteHandler: Send + Sync {
252 async fn pre_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;
254
255 async fn post_execute(&self, log_store: &LogStoreRef, operation_id: Uuid) -> DeltaResult<()>;
257
258 async fn before_post_commit_hook(
260 &self,
261 log_store: &LogStoreRef,
262 file_operation: bool,
263 operation_id: Uuid,
264 ) -> DeltaResult<()>;
265
266 async fn after_post_commit_hook(
268 &self,
269 log_store: &LogStoreRef,
270 file_operation: bool,
271 operation_id: Uuid,
272 ) -> DeltaResult<()>;
273}
274
275#[allow(unused)]
276pub(crate) trait Operation: std::future::IntoFuture {
279 fn log_store(&self) -> &LogStoreRef;
280 fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>>;
281 async fn pre_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
282 if let Some(handler) = self.get_custom_execute_handler() {
283 handler.pre_execute(self.log_store(), operation_id).await
284 } else {
285 Ok(())
286 }
287 }
288
289 async fn post_execute(&self, operation_id: Uuid) -> DeltaResult<()> {
290 if let Some(handler) = self.get_custom_execute_handler() {
291 handler.post_execute(self.log_store(), operation_id).await
292 } else {
293 Ok(())
294 }
295 }
296
297 fn get_operation_id(&self) -> uuid::Uuid {
298 Uuid::new_v4()
299 }
300}
301
302#[deprecated(note = "Use methods directly on DeltaTable instead, e.g. `delta_table.create()`")]
304pub struct DeltaOps(pub DeltaTable);
305
306#[allow(deprecated)]
307impl DeltaOps {
308 pub async fn try_from_url(uri: Url) -> DeltaResult<Self> {
320 let mut table = DeltaTableBuilder::from_url(uri)?.build()?;
321 match table.load().await {
323 Ok(_) => Ok(table.into()),
324 Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
325 Err(err) => Err(err),
326 }
327 }
328
329 pub async fn try_from_url_with_storage_options(
331 uri: Url,
332 storage_options: HashMap<String, String>,
333 ) -> DeltaResult<Self> {
334 let mut table = DeltaTableBuilder::from_url(uri)?
335 .with_storage_options(storage_options)
336 .build()?;
337 match table.load().await {
339 Ok(_) => Ok(table.into()),
340 Err(DeltaTableError::NotATable(_)) => Ok(table.into()),
341 Err(err) => Err(err),
342 }
343 }
344
345 #[must_use]
356 pub fn new_in_memory() -> Self {
357 let url = Url::parse("memory:///").unwrap();
358 DeltaTableBuilder::from_url(url)
359 .unwrap()
360 .build()
361 .unwrap()
362 .into()
363 }
364
365 #[must_use]
377 #[deprecated(note = "Use [`DeltaTable::create`] instead")]
378 pub fn create(self) -> CreateBuilder {
379 CreateBuilder::default().with_log_store(self.0.log_store)
380 }
381
382 #[deprecated(note = "Use [`DeltaTable::generate`] instead")]
384 pub fn generate(self) -> GenerateBuilder {
385 GenerateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
386 }
387
388 #[cfg(feature = "datafusion")]
390 #[must_use]
391 #[deprecated(note = "Use [`DeltaTable::scan`] instead")]
392 pub fn load(self) -> LoadBuilder {
393 LoadBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
394 }
395
396 #[cfg(feature = "datafusion")]
398 #[must_use]
399 #[deprecated(note = "Use [`DeltaTable::scan_cdf`] instead")]
400 pub fn load_cdf(self) -> CdfLoadBuilder {
401 CdfLoadBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
402 }
403
404 #[cfg(feature = "datafusion")]
406 #[must_use]
407 #[deprecated(note = "Use [`DeltaTable::write`] instead")]
408 pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
409 WriteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
410 .with_input_batches(batches)
411 }
412
413 #[must_use]
415 #[deprecated(note = "Use [`DeltaTable::vacuum`] instead")]
416 pub fn vacuum(self) -> VacuumBuilder {
417 VacuumBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
418 }
419
420 #[must_use]
422 #[deprecated(note = "Use [`DeltaTable::filesystem_check`] instead")]
423 pub fn filesystem_check(self) -> FileSystemCheckBuilder {
424 FileSystemCheckBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
425 }
426
427 #[cfg(feature = "datafusion")]
429 #[must_use]
430 #[deprecated(note = "Use [`DeltaTable::optimize`] instead")]
431 pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
432 OptimizeBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
433 }
434
435 #[cfg(feature = "datafusion")]
437 #[must_use]
438 #[deprecated(note = "Use [`DeltaTable::delete`] instead")]
439 pub fn delete(self) -> DeleteBuilder {
440 DeleteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
441 }
442
443 #[cfg(feature = "datafusion")]
445 #[must_use]
446 #[deprecated(note = "Use [`DeltaTable::update`] instead")]
447 pub fn update(self) -> UpdateBuilder {
448 UpdateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
449 }
450
451 #[must_use]
453 #[deprecated(note = "Use [`DeltaTable::restore`] instead")]
454 pub fn restore(self) -> RestoreBuilder {
455 RestoreBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
456 }
457
458 #[cfg(feature = "datafusion")]
460 #[must_use]
461 #[deprecated(note = "Use [`DeltaTable::merge`] instead")]
462 pub fn merge<E: Into<Expression>>(
463 self,
464 source: datafusion::prelude::DataFrame,
465 predicate: E,
466 ) -> MergeBuilder {
467 MergeBuilder::new(
468 self.0.log_store,
469 self.0.state.map(|s| s.snapshot),
470 predicate.into(),
471 source,
472 )
473 }
474
475 #[cfg(feature = "datafusion")]
477 #[must_use]
478 #[deprecated(note = "Use [`DeltaTable::add_constraint`] instead")]
479 pub fn add_constraint(self) -> ConstraintBuilder {
480 ConstraintBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
481 }
482
483 #[must_use]
485 #[deprecated(note = "Use [`DeltaTable::add_feature`] instead")]
486 pub fn add_feature(self) -> AddTableFeatureBuilder {
487 AddTableFeatureBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
488 }
489
490 #[cfg(feature = "datafusion")]
492 #[must_use]
493 #[deprecated(note = "Use [`DeltaTable::drop_constraints`] instead")]
494 pub fn drop_constraints(self) -> DropConstraintBuilder {
495 DropConstraintBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
496 }
497
498 #[deprecated(note = "Use [`DeltaTable::set_tbl_properties`] instead")]
500 pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder {
501 SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
502 }
503
504 #[deprecated(note = "Use [`DeltaTable::add_columns`] instead")]
506 pub fn add_columns(self) -> AddColumnBuilder {
507 AddColumnBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
508 }
509
510 #[deprecated(note = "Use [`DeltaTable::update_field_metadata`] instead")]
512 pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder {
513 UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
514 }
515
516 #[deprecated(note = "Use [`DeltaTable::update_table_metadata`] instead")]
518 pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder {
519 UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot))
520 }
521}
522
523#[allow(deprecated)]
524impl From<DeltaTable> for DeltaOps {
525 fn from(table: DeltaTable) -> Self {
526 Self(table)
527 }
528}
529
530#[allow(deprecated)]
531impl From<DeltaOps> for DeltaTable {
532 fn from(ops: DeltaOps) -> Self {
533 ops.0
534 }
535}
536
537#[allow(deprecated)]
538impl AsRef<DeltaTable> for DeltaOps {
539 fn as_ref(&self) -> &DeltaTable {
540 &self.0
541 }
542}
543
544pub fn get_num_idx_cols_and_stats_columns(
548 config: Option<&TableProperties>,
549 configuration: HashMap<String, Option<String>>,
550) -> (DataSkippingNumIndexedCols, Option<Vec<String>>) {
551 let (num_index_cols, stats_columns) = match &config {
552 Some(conf) => (
553 conf.num_indexed_cols(),
554 conf.data_skipping_stats_columns
555 .clone()
556 .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
557 ),
558 _ => (
559 configuration
560 .get("delta.dataSkippingNumIndexedCols")
561 .and_then(|v| {
562 v.as_ref()
563 .and_then(|vv| vv.parse::<u64>().ok())
564 .map(DataSkippingNumIndexedCols::NumColumns)
565 })
566 .unwrap_or(DataSkippingNumIndexedCols::NumColumns(
567 DEFAULT_NUM_INDEX_COLS,
568 )),
569 configuration
570 .get("delta.dataSkippingStatsColumns")
571 .and_then(|v| {
572 v.as_ref()
573 .map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<String>>())
574 }),
575 ),
576 };
577 (
578 num_index_cols,
579 stats_columns
580 .clone()
581 .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
582 )
583}
584
/// Resolves the target file size.
///
/// When `config` is present its `target_file_size()` is returned and
/// `configuration` is ignored. Otherwise `delta.targetFileSize` is read from
/// `configuration`; a missing, `None` or unparsable value (including `0`, which
/// is not a valid `NonZeroU64`) falls back to
/// `crate::table::config::DEFAULT_TARGET_FILE_SIZE`.
#[cfg(feature = "datafusion")]
pub(crate) fn get_target_file_size(
    config: Option<&TableProperties>,
    configuration: &HashMap<String, Option<String>>,
) -> NonZeroU64 {
    match config {
        Some(conf) => conf.target_file_size(),
        None => configuration
            .get("delta.targetFileSize")
            // Borrow the stored string; parsing does not need an owned copy.
            .and_then(|raw| raw.as_deref())
            .and_then(|raw| raw.parse::<NonZeroU64>().ok())
            .unwrap_or(crate::table::config::DEFAULT_TARGET_FILE_SIZE),
    }
}