1use std::collections::HashSet;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28use chrono::{Duration, Utc};
29use futures::future::{BoxFuture, ready};
30use futures::{StreamExt, TryStreamExt};
31use object_store::{Error, ObjectStore, path::Path};
32use serde::Serialize;
33use tracing::*;
34
35use super::{CustomExecuteHandler, Operation};
36use crate::errors::{DeltaResult, DeltaTableError};
37use crate::kernel::transaction::{CommitBuilder, CommitProperties};
38use crate::kernel::{EagerSnapshot, TombstoneView, Version, resolve_snapshot};
39use crate::logstore::{LogStore, LogStoreRef};
40use crate::protocol::DeltaOperation;
41use crate::table::config::TablePropertiesExt as _;
42use crate::table::state::DeltaTableState;
43use crate::{DeltaTable, DeltaTableConfig};
44
/// Errors that can be raised while building a vacuum plan.
#[derive(thiserror::Error, Debug)]
enum VacuumError {
    /// The requested retention period is shorter than the table's minimum retention
    /// while retention enforcement is enabled.
    #[error(
        "Invalid retention period, minimum retention for vacuum is configured to be greater than {} hours, got {} hours", .min, .provided
    )]
    InvalidVacuumRetentionPeriod {
        /// Retention period that was requested, in whole hours.
        provided: i64,
        /// Minimum retention configured on the table, in whole hours.
        min: i64,
    },

    /// A [`DeltaTableError`] raised while planning, reported transparently.
    #[error(transparent)]
    DeltaTable(#[from] DeltaTableError),
}
63
64impl From<VacuumError> for DeltaTableError {
65 fn from(err: VacuumError) -> Self {
66 DeltaTableError::GenericError {
67 source: Box::new(err),
68 }
69 }
70}
71
/// Source of the "current time" used by vacuum when deciding what has expired.
///
/// When no clock is supplied to the builder, `Utc::now()` is used instead.
pub trait Clock: Debug + Send + Sync {
    /// Returns the current timestamp in milliseconds.
    // NOTE(review): compared against tombstone deletion timestamps and object
    // `last_modified` values, so presumably milliseconds since the Unix epoch.
    fn current_timestamp_millis(&self) -> i64;
}
77
/// Selects how aggressively vacuum looks for files to remove.
///
/// The enum is a plain tag, so it is `Copy` and `Eq` in addition to `Clone` and
/// `PartialEq`; callers can pass it around and compare it without cloning.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum VacuumMode {
    /// Only remove files referenced by expired tombstones in the table log.
    #[default]
    Lite,
    /// Additionally list the table's object store and remove stale files that
    /// are not tracked by the table.
    Full,
}
91
/// Builder for a vacuum operation; awaiting it (via `IntoFuture`) runs the vacuum.
pub struct VacuumBuilder {
    /// Table snapshot to vacuum; resolved from the log store when `None`.
    snapshot: Option<EagerSnapshot>,
    /// Log store of the table being vacuumed.
    log_store: LogStoreRef,
    /// Requested retention period; falls back to the table's
    /// deleted-file retention duration when `None`.
    retention_period: Option<Duration>,
    /// When `true`, a retention period below the table minimum is rejected.
    enforce_retention_duration: bool,
    /// Table versions whose files must never be deleted.
    keep_versions: Option<Vec<Version>>,
    /// When `true`, only report what would be deleted.
    dry_run: bool,
    /// Whether to only honor tombstones or also scan for untracked files.
    mode: VacuumMode,
    /// Clock override; `Utc::now()` is used when `None`.
    clock: Option<Arc<dyn Clock>>,
    /// Properties applied to the commits written by vacuum.
    commit_properties: CommitProperties,
    /// Optional hook handler passed to the commits and returned by `Operation`.
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
115
116impl super::Operation for VacuumBuilder {
117 fn log_store(&self) -> &LogStoreRef {
118 &self.log_store
119 }
120 fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
121 self.custom_execute_handler.clone()
122 }
123}
124
/// Result of a vacuum run as reported to the caller.
#[derive(Debug, Default)]
pub struct VacuumMetrics {
    /// `true` when the run was a dry run and nothing was actually deleted.
    pub dry_run: bool,
    /// Paths of the files that were deleted (or, for a dry run, would be deleted).
    pub files_deleted: Vec<String>,
}
133
/// Operation metrics recorded with the vacuum-start commit (serialized in camelCase).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VacuumStartOperationMetrics {
    /// Number of files the plan intends to delete.
    pub num_files_to_delete: i64,
    /// Sum of the recorded sizes of the files to delete.
    pub size_of_data_to_delete: i64,
}
143
/// Operation metrics recorded with the vacuum-end commit (serialized in camelCase).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VacuumEndOperationMetrics {
    /// Number of paths reported back by the delete stream.
    pub num_deleted_files: i64,
    /// Number of vacuumed directories; this file always records `0`.
    pub num_vacuumed_directories: i64,
}
153
154impl VacuumBuilder {
156 pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
158 VacuumBuilder {
159 snapshot,
160 log_store,
161 retention_period: None,
162 enforce_retention_duration: true,
163 keep_versions: None,
164 dry_run: false,
165 mode: VacuumMode::Lite,
166 clock: None,
167 commit_properties: CommitProperties::default(),
168 custom_execute_handler: None,
169 }
170 }
171
172 pub fn with_retention_period(mut self, retention_period: Duration) -> Self {
174 self.retention_period = Some(retention_period);
175 self
176 }
177
178 pub fn with_keep_versions(mut self, versions: &[Version]) -> Self {
181 warn!("Using experimental API VacuumBuilder::with_keep_versions");
182 self.keep_versions = Some(versions.to_vec());
183 self
184 }
185
186 pub fn with_mode(mut self, mode: VacuumMode) -> Self {
188 self.mode = mode;
189 self
190 }
191
192 pub fn with_dry_run(mut self, dry_run: bool) -> Self {
194 self.dry_run = dry_run;
195 self
196 }
197
198 pub fn with_enforce_retention_duration(mut self, enforce: bool) -> Self {
200 self.enforce_retention_duration = enforce;
201 self
202 }
203
204 #[doc(hidden)]
206 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
207 self.clock = Some(clock);
208 self
209 }
210
211 pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
213 self.commit_properties = commit_properties;
214 self
215 }
216
217 pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
219 self.custom_execute_handler = Some(handler);
220 self
221 }
222
223 async fn create_vacuum_plan(
225 &self,
226 snapshot: &EagerSnapshot,
227 ) -> Result<VacuumPlan, VacuumError> {
228 if self.mode == VacuumMode::Full {
229 info!(
230 "Vacuum configured to run with 'VacuumMode::Full'. It will scan for orphaned parquet files in the Delta table directory and remove those as well!"
231 );
232 }
233
234 let min_retention = Duration::milliseconds(
235 snapshot
236 .table_properties()
237 .deleted_file_retention_duration()
238 .as_millis() as i64,
239 );
240 let retention_period = self.retention_period.unwrap_or(min_retention);
241 let enforce_retention_duration = self.enforce_retention_duration;
242
243 if enforce_retention_duration && retention_period < min_retention {
244 return Err(VacuumError::InvalidVacuumRetentionPeriod {
245 provided: retention_period.num_hours(),
246 min: min_retention.num_hours(),
247 });
248 }
249
250 let now_millis = match &self.clock {
251 Some(clock) => clock.current_timestamp_millis(),
252 None => Utc::now().timestamp_millis(),
253 };
254
255 let keep_files = match &self.keep_versions {
256 Some(versions) => {
257 let mut sorted_versions = versions.clone();
258 sorted_versions.sort();
259 let mut sorted_versions = sorted_versions.into_iter();
260 match sorted_versions.next() {
261 Some(initial_version) => {
262 let mut keep_files: HashSet<String> = HashSet::new();
263 let mut state = DeltaTableState::try_new(
264 &self.log_store,
265 DeltaTableConfig::default(),
266 Some(initial_version),
267 )
268 .await?;
269 let mut record_keep_files = |version: Version, state: &DeltaTableState| {
270 let files: Vec<String> = state
271 .log_data()
272 .into_iter()
273 .map(|add| add.object_store_path())
274 .map(|path| path.to_string())
275 .collect();
276 debug!("keep version:{version}\n, {files:#?}");
277 keep_files.extend(files);
278 };
279
280 record_keep_files(initial_version, &state);
281 for version in sorted_versions {
282 state.update(&self.log_store, Some(version)).await?;
283 record_keep_files(version, &state);
284 }
285
286 keep_files
287 }
288 None => HashSet::new(),
289 }
290 }
291 _ => HashSet::new(),
292 };
293
294 let mut file_count = 0;
295
296 let tombstone_retention_timestamp = now_millis - retention_period.num_milliseconds();
297 let (expired_tombstones, tombstone_path_sets) = if self.mode == VacuumMode::Full {
298 collect_full_mode_tombstones(snapshot, tombstone_retention_timestamp, &self.log_store)
299 .await?
300 } else {
301 (
302 get_stale_files(snapshot, retention_period, now_millis, &self.log_store).await?,
303 TombstonePathSets::default(),
304 )
305 };
306 let valid_files: HashSet<_> = snapshot
307 .file_views(self.log_store.as_ref(), None)
308 .map_ok(|f| f.object_store_path())
309 .try_collect()
310 .await?;
311
312 let partition_columns = snapshot.metadata().partition_columns();
313
314 let mut files_to_delete = vec![];
315 let mut file_sizes = vec![];
316
317 for tombs in expired_tombstones.iter() {
320 let path = Path::from(tombs.path().to_string());
321 if ok_to_delete(&path, &valid_files, &keep_files, partition_columns)? {
322 files_to_delete.push(path);
323 file_sizes.push(tombs.size().unwrap_or(0));
324 }
325 }
326
327 if self.mode == VacuumMode::Full {
328 let object_store = self.log_store.object_store(None);
329
330 let list_span = info_span!("list_files", operation = "vacuum");
331 let mut all_files = list_span.in_scope(|| object_store.list(None));
332
333 while let Some(obj_meta) = all_files.next().await {
334 let obj_meta = obj_meta.map_err(DeltaTableError::from)?;
336 if tombstone_path_sets
337 .expired_tombstone_paths
338 .contains(&obj_meta.location)
339 {
340 debug!(
341 "The file {:?} is already queued as an expired tombstone",
342 &obj_meta.location,
343 );
344 continue;
345 }
346
347 if !ok_to_delete(
348 &obj_meta.location,
349 &valid_files,
350 &keep_files,
351 partition_columns,
352 )? {
353 continue;
354 }
355
356 if tombstone_path_sets
357 .all_tombstone_paths
358 .contains(&obj_meta.location)
359 {
360 debug!(
361 "The file {:?} has a recent tombstone, keeping it until tombstone retention expires",
362 &obj_meta.location,
363 );
364 continue;
365 }
366
367 let file_age_millis = now_millis - obj_meta.last_modified.timestamp_millis();
370 if file_age_millis < retention_period.num_milliseconds() {
371 debug!(
372 "The file {:?} is an untracked recent file, protecting it from vacuum",
373 &obj_meta.location,
374 );
375 continue;
376 }
377
378 debug!(
379 "The file {:?} is an untracked stale orphan and will be vacuumed in full mode",
380 &obj_meta.location
381 );
382 files_to_delete.push(obj_meta.location);
383 file_sizes.push(obj_meta.size as i64);
384 file_count += 1;
385 }
386 }
387 info!(
388 files_scanned = file_count,
389 files_to_delete = files_to_delete.len(),
390 "vacuum file listing completed"
391 );
392
393 Ok(VacuumPlan {
394 files_to_delete,
395 file_sizes,
396 retention_check_enabled: enforce_retention_duration,
397 default_retention_millis: min_retention.num_milliseconds(),
398 specified_retention_millis: Some(retention_period.num_milliseconds()),
399 })
400 }
401}
402
403impl std::future::IntoFuture for VacuumBuilder {
404 type Output = DeltaResult<(DeltaTable, VacuumMetrics)>;
405 type IntoFuture = BoxFuture<'static, Self::Output>;
406
407 fn into_future(self) -> Self::IntoFuture {
408 let this = self;
409 Box::pin(async move {
410 let snapshot =
411 resolve_snapshot(&this.log_store, this.snapshot.clone(), true, None).await?;
412 let plan = this.create_vacuum_plan(&snapshot).await?;
413
414 if this.dry_run {
415 return Ok((
416 DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)),
417 VacuumMetrics {
418 files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(),
419 dry_run: true,
420 },
421 ));
422 }
423
424 let operation_id = this.get_operation_id();
425 this.pre_execute(operation_id).await?;
426
427 let result = plan
428 .execute(
429 this.log_store.clone(),
430 &snapshot,
431 this.commit_properties.clone(),
432 operation_id,
433 this.get_custom_execute_handler(),
434 )
435 .await?;
436
437 this.post_execute(operation_id).await?;
438
439 Ok(match result {
440 Some((snapshot, metrics)) => (
441 DeltaTable::new_with_state(this.log_store, snapshot),
442 metrics,
443 ),
444 None => (
445 DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)),
446 Default::default(),
447 ),
448 })
449 })
450 }
451}
452
/// The outcome of vacuum planning: what to delete and which retention settings
/// were in effect.
struct VacuumPlan {
    /// Paths of the files to delete.
    pub files_to_delete: Vec<Path>,
    /// Sizes of the files to delete, in the same order as `files_to_delete`.
    pub file_sizes: Vec<i64>,
    /// Whether the retention check was enforced when planning.
    pub retention_check_enabled: bool,
    /// The table's minimum retention, in milliseconds.
    pub default_retention_millis: i64,
    /// The retention period that was applied, in milliseconds.
    pub specified_retention_millis: Option<i64>,
}
466
467impl VacuumPlan {
468 pub async fn execute(
470 self,
471 store: LogStoreRef,
472 snapshot: &EagerSnapshot,
473 mut commit_properties: CommitProperties,
474 operation_id: uuid::Uuid,
475 handle: Option<Arc<dyn CustomExecuteHandler>>,
476 ) -> Result<Option<(DeltaTableState, VacuumMetrics)>, DeltaTableError> {
477 if self.files_to_delete.is_empty() {
478 return Ok(None);
479 }
480
481 let start_operation = DeltaOperation::VacuumStart {
482 retention_check_enabled: self.retention_check_enabled,
483 specified_retention_millis: self.specified_retention_millis,
484 default_retention_millis: self.default_retention_millis,
485 };
486
487 let end_operation = DeltaOperation::VacuumEnd {
488 status: String::from("COMPLETED"), };
490
491 let start_metrics = VacuumStartOperationMetrics {
492 num_files_to_delete: self.files_to_delete.len() as i64,
493 size_of_data_to_delete: self.file_sizes.iter().sum(),
494 };
495
496 let mut start_props = CommitProperties::default();
498 start_props.app_metadata = commit_properties.app_metadata.clone();
499 start_props.app_metadata.insert(
500 "operationMetrics".to_owned(),
501 serde_json::to_value(start_metrics)?,
502 );
503
504 let last_commit = CommitBuilder::from(start_props)
505 .with_operation_id(operation_id)
506 .with_post_commit_hook_handler(handle.clone())
507 .build(Some(snapshot), store.clone(), start_operation)
508 .await?;
509 let locations = futures::stream::iter(self.files_to_delete)
512 .map(Result::Ok)
513 .boxed();
514
515 let files_deleted = store
516 .object_store(Some(operation_id))
517 .delete_stream(locations)
518 .map(|res| match res {
519 Ok(path) => Ok(path.to_string()),
520 Err(Error::NotFound { path, .. }) => Ok(path),
521 Err(err) => Err(err),
522 })
523 .try_collect::<Vec<_>>()
524 .await?;
525
526 let end_metrics = VacuumEndOperationMetrics {
528 num_deleted_files: files_deleted.len() as i64,
529 num_vacuumed_directories: 0, };
531
532 commit_properties.app_metadata.insert(
534 "operationMetrics".to_owned(),
535 serde_json::to_value(end_metrics)?,
536 );
537 let last_commit = CommitBuilder::from(commit_properties)
538 .with_operation_id(operation_id)
539 .with_post_commit_hook_handler(handle)
540 .build(Some(&last_commit.snapshot), store.clone(), end_operation)
541 .await?;
542 Ok(Some((
545 last_commit.snapshot,
546 VacuumMetrics {
547 files_deleted,
548 dry_run: false,
549 },
550 )))
551 }
552}
553
/// Paths of the table's tombstones, split by whether they have expired.
#[derive(Debug, Default, PartialEq, Eq)]
struct TombstonePathSets {
    /// Paths of tombstones recorded as expired.
    expired_tombstone_paths: HashSet<Path>,
    /// Paths of every recorded tombstone, expired or not.
    all_tombstone_paths: HashSet<Path>,
}
559
560impl TombstonePathSets {
561 fn record(&mut self, path: Path, is_expired: bool) {
562 if is_expired {
563 self.expired_tombstone_paths.insert(path.clone());
564 }
565 self.all_tombstone_paths.insert(path);
566 }
567}
568
569fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result<bool, DeltaTableError> {
574 let path_name = path.to_string();
575 Ok((path_name.starts_with('.') || path_name.starts_with('_'))
576 && !path_name.starts_with("_delta_index")
577 && !path_name.starts_with("_change_data")
578 && !partition_columns
579 .iter()
580 .any(|partition_column| path_name.starts_with(partition_column)))
581}
582
583fn ok_to_delete(
587 location: &Path,
588 valid_files: &HashSet<Path>,
589 keep_files: &HashSet<String>,
590 partition_columns: &[String],
591) -> Result<bool, DeltaTableError> {
592 Ok(
593 !(valid_files.contains(location) || keep_files.contains(&location.to_string()) || is_hidden_directory(partition_columns, location)?),
596 )
597}
598
599async fn collect_full_mode_tombstones(
600 snapshot: &EagerSnapshot,
601 tombstone_retention_timestamp: i64,
602 store: &dyn LogStore,
603) -> DeltaResult<(Vec<TombstoneView>, TombstonePathSets)> {
604 snapshot
605 .snapshot()
606 .tombstones(store)
607 .try_fold(
608 (Vec::new(), TombstonePathSets::default()),
609 |(mut expired_tombstones, mut tombstone_path_sets), tombstone| {
610 let is_expired = is_tombstone_expired(&tombstone, tombstone_retention_timestamp);
611 let path = Path::from(tombstone.path().to_string());
612 tombstone_path_sets.record(path, is_expired);
613 if is_expired {
614 expired_tombstones.push(tombstone);
615 }
616 ready(Ok((expired_tombstones, tombstone_path_sets)))
617 },
618 )
619 .await
620}
621
622async fn get_stale_files(
624 snapshot: &EagerSnapshot,
625 retention_period: Duration,
626 now_timestamp_millis: i64,
627 store: &dyn LogStore,
628) -> DeltaResult<Vec<TombstoneView>> {
629 let tombstone_retention_timestamp = now_timestamp_millis - retention_period.num_milliseconds();
630 snapshot
631 .snapshot()
632 .tombstones(store)
633 .try_filter(|tombstone| {
634 ready(is_tombstone_expired(
635 tombstone,
636 tombstone_retention_timestamp,
637 ))
638 })
639 .try_collect::<Vec<_>>()
640 .await
641}
642
643fn is_tombstone_expired(tombstone: &TombstoneView, tombstone_retention_timestamp: i64) -> bool {
644 tombstone.deletion_timestamp().unwrap_or(0) < tombstone_retention_timestamp
645}
646
647#[cfg(test)]
648mod tests {
649 use object_store::{ObjectStoreExt as _, PutPayload, local::LocalFileSystem, memory::InMemory};
650 use serde_json::json;
651
652 use super::*;
653 use crate::kernel::Action;
654 use crate::kernel::transaction::CommitBuilder;
655 use crate::protocol::SaveMode;
656 use crate::writer::test_utils::create_initialized_table;
657 use crate::writer::{DeltaWriter, JsonWriter};
658 use crate::{ensure_table_uri, open_table};
659 use std::path::Path;
660 use std::{
661 fs::{FileTimes, OpenOptions},
662 io::Read,
663 time::{Duration as StdDuration, SystemTime, UNIX_EPOCH},
664 };
665 use url::Url;
666
667 #[tokio::test]
668 async fn test_vacuum_full() -> DeltaResult<()> {
669 let table_path = Path::new("../test/tests/data/simple_commit");
670 let table_uri =
671 Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
672 let table = open_table(table_uri).await?;
673
674 let (_table, result) =
675 VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
676 .with_retention_period(Duration::hours(0))
677 .with_dry_run(true)
678 .with_mode(VacuumMode::Lite)
679 .with_enforce_retention_duration(false)
680 .await?;
681 assert!(result.files_deleted.is_empty());
684
685 let (_table, result) =
686 VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
687 .with_retention_period(Duration::hours(0))
688 .with_dry_run(true)
689 .with_mode(VacuumMode::Full)
690 .with_enforce_retention_duration(false)
691 .await?;
692 let mut files_deleted = result.files_deleted.clone();
693 files_deleted.sort();
694 assert_eq!(
698 files_deleted,
699 vec![
700 "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet",
701 "part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet",
702 "part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet",
703 "part-00001-4327c977-2734-4477-9507-7ccf67924649-c000.snappy.parquet",
704 ]
705 );
706 Ok(())
707 }
708
709 #[tokio::test]
712 async fn test_vacuum_keep_version_sanity_check() -> DeltaResult<()> {
713 let table_loc = "../test/tests/data/simple_table";
714 let table_uri = ensure_table_uri(table_loc).unwrap();
715 let table = open_table(table_uri).await?;
716 let versions_to_keep = vec![3];
717
718 let (_table, result) =
720 VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
721 .with_retention_period(Duration::hours(0))
722 .with_dry_run(true)
723 .with_mode(VacuumMode::Full)
724 .with_enforce_retention_duration(false)
725 .await?;
726
727 assert_eq!(32, result.files_deleted.len());
729
730 let (_table, result) =
732 VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
733 .with_retention_period(Duration::hours(0))
734 .with_keep_versions(&versions_to_keep)
735 .with_dry_run(true)
736 .with_mode(VacuumMode::Full)
737 .with_enforce_retention_duration(false)
738 .await?;
739 assert_ne!(
740 32,
741 result.files_deleted.len(),
742 "with_keep_versions should have fewer files deleted than a full vacuum"
743 );
744
745 Ok(())
746 }
747
748 #[tokio::test]
751 async fn test_vacuum_keep_version_add_removes() -> DeltaResult<()> {
752 let table_loc = "../test/tests/data/simple_table";
753 let table_uri = ensure_table_uri(table_loc).unwrap();
754 let table = open_table(table_uri).await?;
755 let versions_to_keep = vec![2, 3];
756
757 let (_table, result) =
759 VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
760 .with_retention_period(Duration::hours(0))
761 .with_dry_run(true)
762 .with_mode(VacuumMode::Full)
763 .with_enforce_retention_duration(false)
764 .await?;
765
766 assert_eq!(32, result.files_deleted.len());
768
769 let (_table, result) =
771 VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
772 .with_retention_period(Duration::hours(0))
773 .with_keep_versions(&versions_to_keep)
774 .with_dry_run(true)
775 .with_mode(VacuumMode::Full)
776 .with_enforce_retention_duration(false)
777 .await?;
778 assert_ne!(
779 32,
780 result.files_deleted.len(),
781 "with_keep_versions should have fewer files deleted than a full vacuum"
782 );
783
784 let kept_files = vec![
785 "part-00000-f17fcbf5-e0dc-40ba-adae-ce66d1fcaef6-c000.snappy.parquet",
787 "part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet",
788 "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet",
790 "part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet",
791 ];
792
793 for kept in kept_files {
794 assert!(
795 !result.files_deleted.contains(&kept.to_string()),
796 "files_deleted contains something which should be kept!: {:#?} {kept}",
797 result.files_deleted
798 )
799 }
800 Ok(())
801 }
802
803 #[tokio::test]
804 async fn test_vacuum_keep_versions_descending_order() -> DeltaResult<()> {
805 let table_loc = "../test/tests/data/simple_table";
806 let table_uri = ensure_table_uri(table_loc).unwrap();
807 let table = open_table(table_uri).await?;
808
809 let (_table, ascending_result) =
810 VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
811 .with_retention_period(Duration::hours(0))
812 .with_keep_versions(&[0, 1, 2, 3])
813 .with_dry_run(true)
814 .with_mode(VacuumMode::Full)
815 .with_enforce_retention_duration(false)
816 .await?;
817
818 let (_table, descending_result) =
819 VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
820 .with_retention_period(Duration::hours(0))
821 .with_keep_versions(&[3, 2, 1, 0])
822 .with_dry_run(true)
823 .with_mode(VacuumMode::Full)
824 .with_enforce_retention_duration(false)
825 .await?;
826
827 let mut ascending_files = ascending_result.files_deleted;
828 ascending_files.sort();
829 let mut descending_files = descending_result.files_deleted;
830 descending_files.sort();
831
832 assert_eq!(descending_files, ascending_files);
833 Ok(())
834 }
835
836 #[cfg(feature = "datafusion")]
839 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
840 async fn test_vacuum_keep_version_validity() {
841 use datafusion::prelude::SessionContext;
842 use object_store::GetResultPayload;
843 let store = InMemory::new();
844 let source = LocalFileSystem::new_with_prefix("../test/tests/data/simple_table").unwrap();
845 let mut stream = source.list(None);
846
847 while let Some(Ok(entity)) = stream.next().await {
848 let mut contents = vec![];
849 match source.get(&entity.location).await.unwrap().payload {
850 GetResultPayload::File(mut fd, _path) => {
851 fd.read_to_end(&mut contents).unwrap();
852 }
853 _ => panic!("We should only be dealing in files!"),
854 }
855 let content = bytes::Bytes::from(contents);
856 store
857 .put(&entity.location, PutPayload::from_bytes(content))
858 .await
859 .unwrap();
860 }
861
862 let table_url = url::Url::parse("memory:///").unwrap();
863 let mut table = crate::DeltaTableBuilder::from_url(table_url.clone())
864 .unwrap()
865 .with_storage_backend(Arc::new(store), table_url)
866 .build()
867 .unwrap();
868 table.load().await.unwrap();
869
870 let (mut table, result) = VacuumBuilder::new(
871 table.log_store(),
872 Some(table.snapshot().unwrap().snapshot.clone()),
873 )
874 .with_retention_period(Duration::hours(0))
875 .with_keep_versions(&[2, 3])
876 .with_mode(VacuumMode::Full)
877 .with_enforce_retention_duration(false)
878 .await
879 .unwrap();
880 assert_ne!(32, result.files_deleted.len());
882
883 crate::checkpoints::create_checkpoint(&table, None)
885 .await
886 .unwrap();
887 table.load().await.unwrap();
888 assert_eq!(Some(6), table.version());
889
890 let ctx = SessionContext::new();
891 table.update_datafusion_session(&ctx.state()).unwrap();
892 ctx.register_table("test", table.table_provider().await.unwrap())
893 .unwrap();
894 let _batches = ctx
895 .sql("SELECT * FROM test")
896 .await
897 .unwrap()
898 .collect()
899 .await
900 .unwrap();
901 }
902
903 #[tokio::test]
904 async fn vacuum_delta_8_0_table() -> DeltaResult<()> {
905 let table_path = Path::new("../test/tests/data/delta-0.8.0");
906 let table_uri =
907 Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
908 let table = open_table(table_uri).await.unwrap();
909
910 let result = VacuumBuilder::new(
911 table.log_store(),
912 Some(table.snapshot().unwrap().snapshot.clone()),
913 )
914 .with_retention_period(Duration::hours(1))
915 .with_dry_run(true)
916 .await;
917
918 assert!(result.is_err());
919
920 let table_path = Path::new("../test/tests/data/delta-0.8.0");
921 let table_uri =
922 Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
923 let table = open_table(table_uri).await.unwrap();
924
925 let (table, result) = VacuumBuilder::new(
926 table.log_store(),
927 Some(table.snapshot().unwrap().snapshot.clone()),
928 )
929 .with_retention_period(Duration::hours(0))
930 .with_dry_run(true)
931 .with_enforce_retention_duration(false)
932 .await?;
933 assert_eq!(
935 result.files_deleted,
936 vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"]
937 );
938
939 let (table, result) = VacuumBuilder::new(
940 table.log_store(),
941 Some(table.snapshot().unwrap().snapshot.clone()),
942 )
943 .with_retention_period(Duration::hours(169))
944 .with_dry_run(true)
945 .await?;
946
947 assert_eq!(
948 result.files_deleted,
949 vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"]
950 );
951
952 let retention_hours = SystemTime::now()
953 .duration_since(SystemTime::UNIX_EPOCH)
954 .unwrap()
955 .as_secs()
956 / 3600;
957 let empty: Vec<String> = Vec::new();
958 let (_table, result) = VacuumBuilder::new(
959 table.log_store(),
960 Some(table.snapshot().unwrap().snapshot.clone()),
961 )
962 .with_retention_period(Duration::hours(retention_hours as i64))
963 .with_dry_run(true)
964 .await?;
965
966 assert_eq!(result.files_deleted, empty);
967 Ok(())
968 }
969
    /// Test clock that always reports the same fixed timestamp.
    #[derive(Debug, Clone)]
    struct MockClock {
        /// The timestamp returned by `current_timestamp_millis`.
        timestamp_millis: i64,
    }
975
976 impl MockClock {
977 fn new(timestamp_millis: i64) -> Self {
978 Self { timestamp_millis }
979 }
980 }
981
982 impl Clock for MockClock {
983 fn current_timestamp_millis(&self) -> i64 {
984 self.timestamp_millis
985 }
986 }
987
988 fn set_last_modified(path: &Path, last_modified: SystemTime) {
989 let file = OpenOptions::new().write(true).open(path).unwrap();
990 let times = FileTimes::new()
991 .set_accessed(last_modified)
992 .set_modified(last_modified);
993 file.set_times(times).unwrap();
994 }
995
    /// Full mode with a 5 second retention must keep a file whose tombstone is
    /// recent (even though the file itself is old), still delete a stale
    /// untracked file, and protect a recently written untracked file.
    #[tokio::test]
    async fn test_vacuum_full_recent_tombstones_are_not_treated_as_orphans() -> DeltaResult<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();
        let mut table = create_initialized_table(table_path, &[]).await;
        // Reference times: "stale" is older than the 5s retention, "recent" is newer.
        let current_time = SystemTime::now();
        let current_time_millis =
            current_time.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
        let stale_time = current_time - StdDuration::from_secs(10);
        let recent_time = current_time - StdDuration::from_secs(1);
        let original_data = json!({
            "id": "A",
            "value": 1,
            "modified": "2021-02-01"
        });
        let replacement_data = json!({
            "id": "B",
            "value": 2,
            "modified": "2021-02-02"
        });

        // Write the initial data file that will later be tombstoned.
        let mut writer = JsonWriter::for_table(&table)?;
        writer.write(vec![original_data]).await?;
        writer.flush_and_commit(&mut table).await?;

        let tombstoned_paths: Vec<_> = table
            .snapshot()?
            .log_data()
            .into_iter()
            .map(|add| add.object_store_path().to_string())
            .collect();
        assert_eq!(tombstoned_paths.len(), 1);
        let recent_tombstone_path = tombstoned_paths[0].clone();
        // Age the file on disk so only its (recent) tombstone can protect it.
        set_last_modified(&temp_dir.path().join(&recent_tombstone_path), stale_time);

        // An untracked file that is older than the retention period.
        let stale_orphan_path = "orphan-old.parquet";
        std::fs::write(temp_dir.path().join(stale_orphan_path), b"stale orphan").unwrap();
        set_last_modified(&temp_dir.path().join(stale_orphan_path), stale_time);

        // Build remove actions for the current files, stamped with "now".
        let remove_actions = table
            .snapshot()?
            .snapshot()
            .file_views(&table.log_store(), None)
            .map_ok(|file| {
                let mut remove = file.remove_action(true);
                remove.deletion_timestamp = Some(current_time_millis);
                Action::Remove(remove)
            })
            .try_collect::<Vec<_>>()
            .await?;
        // Commit an overwrite: remove the original file and add the replacement.
        let mut overwrite_writer = JsonWriter::for_table(&table)?;
        overwrite_writer.write(vec![replacement_data]).await?;
        let add_actions = overwrite_writer.flush().await?.into_iter().map(Action::Add);
        let mut actions = remove_actions;
        actions.extend(add_actions);
        let operation = DeltaOperation::Write {
            mode: SaveMode::Overwrite,
            partition_by: None,
            predicate: None,
        };
        CommitBuilder::default()
            .with_actions(actions)
            .build(
                Some(table.snapshot()?),
                table.log_store().clone(),
                operation,
            )
            .await?;
        table.update_state().await?;

        // An untracked file that is newer than the retention period.
        let recent_orphan_path = "orphan-recent.parquet";
        std::fs::write(temp_dir.path().join(recent_orphan_path), b"recent orphan").unwrap();
        set_last_modified(&temp_dir.path().join(recent_orphan_path), recent_time);

        // Dry-run Full vacuum with the clock pinned to `current_time`.
        let (_table, result) =
            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
                .with_retention_period(Duration::seconds(5))
                .with_dry_run(true)
                .with_mode(VacuumMode::Full)
                .with_enforce_retention_duration(false)
                .with_clock(Arc::new(MockClock::new(current_time_millis)))
                .await?;

        assert!(
            !result.files_deleted.contains(&recent_tombstone_path),
            "recent tombstone was treated like an orphan: {:?}",
            result.files_deleted
        );
        assert!(
            result
                .files_deleted
                .contains(&stale_orphan_path.to_string()),
            "stale orphan should still be vacuum eligible: {:?}",
            result.files_deleted
        );
        assert!(
            !result
                .files_deleted
                .contains(&recent_orphan_path.to_string()),
            "recent orphan should still be protected: {:?}",
            result.files_deleted
        );

        Ok(())
    }
1101
1102 #[tokio::test]
1105 async fn test_vacuum_full_protects_recent_uncommitted_files() -> DeltaResult<()> {
1106 use chrono::DateTime;
1107 use object_store::GetResultPayload;
1108
1109 let store = InMemory::new();
1110 let source = LocalFileSystem::new_with_prefix("../test/tests/data/simple_table").unwrap();
1111 let mut stream = source.list(None);
1112
1113 while let Some(Ok(entity)) = stream.next().await {
1114 let mut contents = vec![];
1115 match source.get(&entity.location).await.unwrap().payload {
1116 GetResultPayload::File(mut fd, _path) => {
1117 fd.read_to_end(&mut contents).unwrap();
1118 }
1119 _ => panic!("We should only be dealing in files!"),
1120 }
1121 let content = bytes::Bytes::from(contents);
1122 store
1123 .put(&entity.location, PutPayload::from_bytes(content))
1124 .await
1125 .unwrap();
1126 }
1127
1128 let recent_file_path = object_store::path::Path::from("uncommitted-recent.parquet");
1130 store
1131 .put(
1132 &recent_file_path,
1133 PutPayload::from_bytes(bytes::Bytes::from("test data")),
1134 )
1135 .await
1136 .unwrap();
1137
1138 let table_url = url::Url::parse("memory:///").unwrap();
1139 let mut table = crate::DeltaTableBuilder::from_url(table_url.clone())
1140 .unwrap()
1141 .with_storage_backend(Arc::new(store), table_url)
1142 .build()
1143 .unwrap();
1144 table.load().await.unwrap();
1145
1146 let current_time = DateTime::from_timestamp(10 * 24 * 3600, 0)
1148 .unwrap()
1149 .timestamp_millis();
1150 let mock_clock = Arc::new(MockClock::new(current_time));
1151
1152 let (_table, result) = VacuumBuilder::new(
1155 table.log_store(),
1156 Some(table.snapshot().unwrap().snapshot.clone()),
1157 )
1158 .with_retention_period(Duration::days(7))
1159 .with_dry_run(true)
1160 .with_mode(VacuumMode::Full)
1161 .with_enforce_retention_duration(false)
1162 .with_clock(mock_clock)
1163 .await
1164 .unwrap();
1165
1166 assert!(
1168 !result.files_deleted.contains(&recent_file_path.to_string()),
1169 "Recent uncommitted file should be protected from deletion, but found in deletion list: {:?}",
1170 result.files_deleted
1171 );
1172
1173 Ok(())
1174 }
1175}