Skip to main content

deltalake_core/operations/
vacuum.rs

1//! Vacuum a Delta table
2//!
//! Run the Vacuum command on the Delta table: delete files that are no longer referenced by the table and are older than the retention threshold.
4//! We do not recommend that you set a retention interval shorter than 7 days, because old snapshots
5//! and uncommitted files can still be in use by concurrent readers or writers to the table.
6//!
7//! If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be
8//! corrupted when vacuum deletes files that have not yet been committed.
//! If `retention_period` is not set, the table's `configuration.deletedFileRetentionDuration`
//! is used; if that is missing too, the default value of 7 days applies.
11//!
//! After running vacuum you can no longer time travel to a version older than
//! the specified retention period.
14//!
15//! Warning: Vacuum does not support partitioned tables on Windows. This is due
16//! to Windows not using unix style paths. See #682
17//!
18//! # Example
//! ```rust ignore
//! let table = open_table(Url::from_directory_path("/abs/path/to/table").unwrap()).await?;
//! let (table, metrics) =
//!     VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone())).await?;
//! ```
23
24use std::collections::HashSet;
25use std::fmt::Debug;
26use std::sync::Arc;
27
28use chrono::{Duration, Utc};
29use futures::future::{BoxFuture, ready};
30use futures::{StreamExt, TryStreamExt};
31use object_store::{Error, ObjectStore, path::Path};
32use serde::Serialize;
33use tracing::*;
34
35use super::{CustomExecuteHandler, Operation};
36use crate::errors::{DeltaResult, DeltaTableError};
37use crate::kernel::transaction::{CommitBuilder, CommitProperties};
38use crate::kernel::{EagerSnapshot, TombstoneView, Version, resolve_snapshot};
39use crate::logstore::{LogStore, LogStoreRef};
40use crate::protocol::DeltaOperation;
41use crate::table::config::TablePropertiesExt as _;
42use crate::table::state::DeltaTableState;
43use crate::{DeltaTable, DeltaTableConfig};
44
45/// Errors that can occur during vacuum
46#[derive(thiserror::Error, Debug)]
47enum VacuumError {
48    /// Error returned when Vacuum retention period is below the safe threshold
49    #[error(
50        "Invalid retention period, minimum retention for vacuum is configured to be greater than {} hours, got {} hours", .min, .provided
51    )]
52    InvalidVacuumRetentionPeriod {
53        /// User provided retention on vacuum call
54        provided: i64,
55        /// Minimal retention configured in delta table config
56        min: i64,
57    },
58
59    /// Error returned
60    #[error(transparent)]
61    DeltaTable(#[from] DeltaTableError),
62}
63
64impl From<VacuumError> for DeltaTableError {
65    fn from(err: VacuumError) -> Self {
66        DeltaTableError::GenericError {
67            source: Box::new(err),
68        }
69    }
70}
71
/// Abstraction over "now", so the time used by vacuum can be substituted
pub trait Clock: Debug + Send + Sync {
    /// Returns the current time as milliseconds since the epoch
    fn current_timestamp_millis(&self) -> i64;
}
77
/// Type of Vacuum operation to perform
///
/// The enum carries no data, so it is `Copy` and `Eq` in addition to `Clone` and `PartialEq`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum VacuumMode {
    /// The `lite` mode will only remove files which are referenced in the `_delta_log` associated
    /// with `remove` action
    #[default]
    Lite,
    /// A `full` mode vacuum will remove _all_ data files no longer actively referenced in the
    /// `_delta_log` table. For example, if parquet files exist in the table directory but are no
    /// longer mentioned as `add` actions in the transaction log, then this mode will scan storage
    /// and remove those files.
    Full,
}
91
92/// Vacuum a Delta table with the given options
93/// See this module's documentation for more information
94pub struct VacuumBuilder {
95    /// A snapshot of the to-be-vacuumed table's state
96    snapshot: Option<EagerSnapshot>,
97    /// Delta object store for handling data files
98    log_store: LogStoreRef,
99    /// Period of stale files allowed.
100    retention_period: Option<Duration>,
101    /// Validate the retention period is not below the retention period configured in the table
102    enforce_retention_duration: bool,
103    /// Keep files associated with particular versions
104    keep_versions: Option<Vec<Version>>,
105    /// Don't delete the files. Just determine which files can be deleted
106    dry_run: bool,
107    /// Mode of vacuum that should be run
108    mode: VacuumMode,
109    /// Override the source of time
110    clock: Option<Arc<dyn Clock>>,
111    /// Additional information to add to the commit
112    commit_properties: CommitProperties,
113    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
114}
115
116impl super::Operation for VacuumBuilder {
117    fn log_store(&self) -> &LogStoreRef {
118        &self.log_store
119    }
120    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
121        self.custom_execute_handler.clone()
122    }
123}
124
/// Outcome of a Vacuum operation, including which files were deleted
#[derive(Debug, Default)]
pub struct VacuumMetrics {
    /// `true` when the operation was a dry run
    pub dry_run: bool,
    /// Paths of the files that were deleted (for a dry run, the files that would be deleted)
    pub files_deleted: Vec<String>,
}
133
134/// Details for the Vacuum start operation for the transaction log
135#[derive(Serialize)]
136#[serde(rename_all = "camelCase")]
137pub struct VacuumStartOperationMetrics {
138    /// The number of files that will be deleted
139    pub num_files_to_delete: i64,
140    /// Size of the data to be deleted in bytes
141    pub size_of_data_to_delete: i64,
142}
143
144/// Details for the Vacuum End operation for the transaction log
145#[derive(Serialize)]
146#[serde(rename_all = "camelCase")]
147pub struct VacuumEndOperationMetrics {
148    /// The number of actually deleted files
149    pub num_deleted_files: i64,
150    /// The number of actually vacuumed directories
151    pub num_vacuumed_directories: i64,
152}
153
154/// Methods to specify various vacuum options and to execute the operation
155impl VacuumBuilder {
156    /// Create a new [`VacuumBuilder`]
157    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
158        VacuumBuilder {
159            snapshot,
160            log_store,
161            retention_period: None,
162            enforce_retention_duration: true,
163            keep_versions: None,
164            dry_run: false,
165            mode: VacuumMode::Lite,
166            clock: None,
167            commit_properties: CommitProperties::default(),
168            custom_execute_handler: None,
169        }
170    }
171
172    /// Override the default retention period for which files are deleted.
173    pub fn with_retention_period(mut self, retention_period: Duration) -> Self {
174        self.retention_period = Some(retention_period);
175        self
176    }
177
178    /// Specify table versions that we want to keep for time travel.
179    /// This will prevent deletion of files required by these versions.
180    pub fn with_keep_versions(mut self, versions: &[Version]) -> Self {
181        warn!("Using experimental API VacuumBuilder::with_keep_versions");
182        self.keep_versions = Some(versions.to_vec());
183        self
184    }
185
186    /// Override the default vacuum mode (lite)
187    pub fn with_mode(mut self, mode: VacuumMode) -> Self {
188        self.mode = mode;
189        self
190    }
191
192    /// Only determine which files should be deleted
193    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
194        self.dry_run = dry_run;
195        self
196    }
197
198    /// Check if the specified retention period is less than the table's minimum
199    pub fn with_enforce_retention_duration(mut self, enforce: bool) -> Self {
200        self.enforce_retention_duration = enforce;
201        self
202    }
203
204    /// add a time source for testing
205    #[doc(hidden)]
206    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
207        self.clock = Some(clock);
208        self
209    }
210
211    /// Additional metadata to be added to commit info
212    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
213        self.commit_properties = commit_properties;
214        self
215    }
216
217    /// Set a custom execute handler, for pre and post execution
218    pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
219        self.custom_execute_handler = Some(handler);
220        self
221    }
222
223    /// Determine which files can be deleted. Does not actually perform the deletion
224    async fn create_vacuum_plan(
225        &self,
226        snapshot: &EagerSnapshot,
227    ) -> Result<VacuumPlan, VacuumError> {
228        if self.mode == VacuumMode::Full {
229            info!(
230                "Vacuum configured to run with 'VacuumMode::Full'. It will scan for orphaned parquet files in the Delta table directory and remove those as well!"
231            );
232        }
233
234        let min_retention = Duration::milliseconds(
235            snapshot
236                .table_properties()
237                .deleted_file_retention_duration()
238                .as_millis() as i64,
239        );
240        let retention_period = self.retention_period.unwrap_or(min_retention);
241        let enforce_retention_duration = self.enforce_retention_duration;
242
243        if enforce_retention_duration && retention_period < min_retention {
244            return Err(VacuumError::InvalidVacuumRetentionPeriod {
245                provided: retention_period.num_hours(),
246                min: min_retention.num_hours(),
247            });
248        }
249
250        let now_millis = match &self.clock {
251            Some(clock) => clock.current_timestamp_millis(),
252            None => Utc::now().timestamp_millis(),
253        };
254
255        let keep_files = match &self.keep_versions {
256            Some(versions) => {
257                let mut sorted_versions = versions.clone();
258                sorted_versions.sort();
259                let mut sorted_versions = sorted_versions.into_iter();
260                match sorted_versions.next() {
261                    Some(initial_version) => {
262                        let mut keep_files: HashSet<String> = HashSet::new();
263                        let mut state = DeltaTableState::try_new(
264                            &self.log_store,
265                            DeltaTableConfig::default(),
266                            Some(initial_version),
267                        )
268                        .await?;
269                        let mut record_keep_files = |version: Version, state: &DeltaTableState| {
270                            let files: Vec<String> = state
271                                .log_data()
272                                .into_iter()
273                                .map(|add| add.object_store_path())
274                                .map(|path| path.to_string())
275                                .collect();
276                            debug!("keep version:{version}\n, {files:#?}");
277                            keep_files.extend(files);
278                        };
279
280                        record_keep_files(initial_version, &state);
281                        for version in sorted_versions {
282                            state.update(&self.log_store, Some(version)).await?;
283                            record_keep_files(version, &state);
284                        }
285
286                        keep_files
287                    }
288                    None => HashSet::new(),
289                }
290            }
291            _ => HashSet::new(),
292        };
293
294        let mut file_count = 0;
295
296        let tombstone_retention_timestamp = now_millis - retention_period.num_milliseconds();
297        let (expired_tombstones, tombstone_path_sets) = if self.mode == VacuumMode::Full {
298            collect_full_mode_tombstones(snapshot, tombstone_retention_timestamp, &self.log_store)
299                .await?
300        } else {
301            (
302                get_stale_files(snapshot, retention_period, now_millis, &self.log_store).await?,
303                TombstonePathSets::default(),
304            )
305        };
306        let valid_files: HashSet<_> = snapshot
307            .file_views(self.log_store.as_ref(), None)
308            .map_ok(|f| f.object_store_path())
309            .try_collect()
310            .await?;
311
312        let partition_columns = snapshot.metadata().partition_columns();
313
314        let mut files_to_delete = vec![];
315        let mut file_sizes = vec![];
316
317        // VacuumMode::Lite file set
318        // Expired tombstones are *always deleted (*unless in keep list)
319        for tombs in expired_tombstones.iter() {
320            let path = Path::from(tombs.path().to_string());
321            if ok_to_delete(&path, &valid_files, &keep_files, partition_columns)? {
322                files_to_delete.push(path);
323                file_sizes.push(tombs.size().unwrap_or(0));
324            }
325        }
326
327        if self.mode == VacuumMode::Full {
328            let object_store = self.log_store.object_store(None);
329
330            let list_span = info_span!("list_files", operation = "vacuum");
331            let mut all_files = list_span.in_scope(|| object_store.list(None));
332
333            while let Some(obj_meta) = all_files.next().await {
334                // TODO should we allow NotFound here in case we have a temporary commit file in the list
335                let obj_meta = obj_meta.map_err(DeltaTableError::from)?;
336                if tombstone_path_sets
337                    .expired_tombstone_paths
338                    .contains(&obj_meta.location)
339                {
340                    debug!(
341                        "The file {:?} is already queued as an expired tombstone",
342                        &obj_meta.location,
343                    );
344                    continue;
345                }
346
347                if !ok_to_delete(
348                    &obj_meta.location,
349                    &valid_files,
350                    &keep_files,
351                    partition_columns,
352                )? {
353                    continue;
354                }
355
356                if tombstone_path_sets
357                    .all_tombstone_paths
358                    .contains(&obj_meta.location)
359                {
360                    debug!(
361                        "The file {:?} has a recent tombstone, keeping it until tombstone retention expires",
362                        &obj_meta.location,
363                    );
364                    continue;
365                }
366
367                // At this point the path is untracked by the Delta log, so full mode falls back
368                // to physical object age to protect recent concurrent-writer output.
369                let file_age_millis = now_millis - obj_meta.last_modified.timestamp_millis();
370                if file_age_millis < retention_period.num_milliseconds() {
371                    debug!(
372                        "The file {:?} is an untracked recent file, protecting it from vacuum",
373                        &obj_meta.location,
374                    );
375                    continue;
376                }
377
378                debug!(
379                    "The file {:?} is an untracked stale orphan and will be vacuumed in full mode",
380                    &obj_meta.location
381                );
382                files_to_delete.push(obj_meta.location);
383                file_sizes.push(obj_meta.size as i64);
384                file_count += 1;
385            }
386        }
387        info!(
388            files_scanned = file_count,
389            files_to_delete = files_to_delete.len(),
390            "vacuum file listing completed"
391        );
392
393        Ok(VacuumPlan {
394            files_to_delete,
395            file_sizes,
396            retention_check_enabled: enforce_retention_duration,
397            default_retention_millis: min_retention.num_milliseconds(),
398            specified_retention_millis: Some(retention_period.num_milliseconds()),
399        })
400    }
401}
402
403impl std::future::IntoFuture for VacuumBuilder {
404    type Output = DeltaResult<(DeltaTable, VacuumMetrics)>;
405    type IntoFuture = BoxFuture<'static, Self::Output>;
406
407    fn into_future(self) -> Self::IntoFuture {
408        let this = self;
409        Box::pin(async move {
410            let snapshot =
411                resolve_snapshot(&this.log_store, this.snapshot.clone(), true, None).await?;
412            let plan = this.create_vacuum_plan(&snapshot).await?;
413
414            if this.dry_run {
415                return Ok((
416                    DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)),
417                    VacuumMetrics {
418                        files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(),
419                        dry_run: true,
420                    },
421                ));
422            }
423
424            let operation_id = this.get_operation_id();
425            this.pre_execute(operation_id).await?;
426
427            let result = plan
428                .execute(
429                    this.log_store.clone(),
430                    &snapshot,
431                    this.commit_properties.clone(),
432                    operation_id,
433                    this.get_custom_execute_handler(),
434                )
435                .await?;
436
437            this.post_execute(operation_id).await?;
438
439            Ok(match result {
440                Some((snapshot, metrics)) => (
441                    DeltaTable::new_with_state(this.log_store, snapshot),
442                    metrics,
443                ),
444                None => (
445                    DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)),
446                    Default::default(),
447                ),
448            })
449        })
450    }
451}
452
453/// Encapsulate which files are to be deleted and the parameters used to make that decision
454struct VacuumPlan {
455    /// What files are to be deleted
456    pub files_to_delete: Vec<Path>,
457    /// Size of each file which to delete
458    pub file_sizes: Vec<i64>,
459    /// If retention check is enabled
460    pub retention_check_enabled: bool,
461    /// Default retention in milliseconds
462    pub default_retention_millis: i64,
463    /// Overridden retention in milliseconds
464    pub specified_retention_millis: Option<i64>,
465}
466
467impl VacuumPlan {
468    /// Execute the vacuum plan and delete files from underlying storage
469    pub async fn execute(
470        self,
471        store: LogStoreRef,
472        snapshot: &EagerSnapshot,
473        mut commit_properties: CommitProperties,
474        operation_id: uuid::Uuid,
475        handle: Option<Arc<dyn CustomExecuteHandler>>,
476    ) -> Result<Option<(DeltaTableState, VacuumMetrics)>, DeltaTableError> {
477        if self.files_to_delete.is_empty() {
478            return Ok(None);
479        }
480
481        let start_operation = DeltaOperation::VacuumStart {
482            retention_check_enabled: self.retention_check_enabled,
483            specified_retention_millis: self.specified_retention_millis,
484            default_retention_millis: self.default_retention_millis,
485        };
486
487        let end_operation = DeltaOperation::VacuumEnd {
488            status: String::from("COMPLETED"), // Maybe this should be FAILED when vacuum has error during the files, not sure how to check for this
489        };
490
491        let start_metrics = VacuumStartOperationMetrics {
492            num_files_to_delete: self.files_to_delete.len() as i64,
493            size_of_data_to_delete: self.file_sizes.iter().sum(),
494        };
495
496        // Begin VACUUM START COMMIT
497        let mut start_props = CommitProperties::default();
498        start_props.app_metadata = commit_properties.app_metadata.clone();
499        start_props.app_metadata.insert(
500            "operationMetrics".to_owned(),
501            serde_json::to_value(start_metrics)?,
502        );
503
504        let last_commit = CommitBuilder::from(start_props)
505            .with_operation_id(operation_id)
506            .with_post_commit_hook_handler(handle.clone())
507            .build(Some(snapshot), store.clone(), start_operation)
508            .await?;
509        // Finish VACUUM START COMMIT
510
511        let locations = futures::stream::iter(self.files_to_delete)
512            .map(Result::Ok)
513            .boxed();
514
515        let files_deleted = store
516            .object_store(Some(operation_id))
517            .delete_stream(locations)
518            .map(|res| match res {
519                Ok(path) => Ok(path.to_string()),
520                Err(Error::NotFound { path, .. }) => Ok(path),
521                Err(err) => Err(err),
522            })
523            .try_collect::<Vec<_>>()
524            .await?;
525
526        // Create end metadata
527        let end_metrics = VacuumEndOperationMetrics {
528            num_deleted_files: files_deleted.len() as i64,
529            num_vacuumed_directories: 0, // Set to zero since we only remove files not dirs
530        };
531
532        // Begin VACUUM END COMMIT
533        commit_properties.app_metadata.insert(
534            "operationMetrics".to_owned(),
535            serde_json::to_value(end_metrics)?,
536        );
537        let last_commit = CommitBuilder::from(commit_properties)
538            .with_operation_id(operation_id)
539            .with_post_commit_hook_handler(handle)
540            .build(Some(&last_commit.snapshot), store.clone(), end_operation)
541            .await?;
542        // Finish VACUUM END COMMIT
543
544        Ok(Some((
545            last_commit.snapshot,
546            VacuumMetrics {
547                files_deleted,
548                dry_run: false,
549            },
550        )))
551    }
552}
553
554#[derive(Debug, Default, PartialEq, Eq)]
555struct TombstonePathSets {
556    expired_tombstone_paths: HashSet<Path>,
557    all_tombstone_paths: HashSet<Path>,
558}
559
560impl TombstonePathSets {
561    fn record(&mut self, path: Path, is_expired: bool) {
562        if is_expired {
563            self.expired_tombstone_paths.insert(path.clone());
564        }
565        self.all_tombstone_paths.insert(path);
566    }
567}
568
569/// Whether a path should be hidden for delta-related file operations, such as Vacuum.
570/// Names of the form partitionCol=[value] are partition directories, and should be
571/// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter)
572/// indexes and these must be deleted when the data they are tied to is deleted.
573fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result<bool, DeltaTableError> {
574    let path_name = path.to_string();
575    Ok((path_name.starts_with('.') || path_name.starts_with('_'))
576        && !path_name.starts_with("_delta_index")
577        && !path_name.starts_with("_change_data")
578        && !partition_columns
579            .iter()
580            .any(|partition_column| path_name.starts_with(partition_column)))
581}
582
583/// Returns true if the file at `location` is a candidate for deletion.
584/// A file should NOT be deleted if it is still tracked in the table,
585/// associated with a kept version, or is a hidden directory.
586fn ok_to_delete(
587    location: &Path,
588    valid_files: &HashSet<Path>,
589    keep_files: &HashSet<String>,
590    partition_columns: &[String],
591) -> Result<bool, DeltaTableError> {
592    Ok(
593        !(valid_files.contains(location) // file is still being tracked in table
594        || keep_files.contains(&location.to_string()) // file is associated with a version that we are keeping
595        || is_hidden_directory(partition_columns, location)?),
596    )
597}
598
599async fn collect_full_mode_tombstones(
600    snapshot: &EagerSnapshot,
601    tombstone_retention_timestamp: i64,
602    store: &dyn LogStore,
603) -> DeltaResult<(Vec<TombstoneView>, TombstonePathSets)> {
604    snapshot
605        .snapshot()
606        .tombstones(store)
607        .try_fold(
608            (Vec::new(), TombstonePathSets::default()),
609            |(mut expired_tombstones, mut tombstone_path_sets), tombstone| {
610                let is_expired = is_tombstone_expired(&tombstone, tombstone_retention_timestamp);
611                let path = Path::from(tombstone.path().to_string());
612                tombstone_path_sets.record(path, is_expired);
613                if is_expired {
614                    expired_tombstones.push(tombstone);
615                }
616                ready(Ok((expired_tombstones, tombstone_path_sets)))
617            },
618        )
619        .await
620}
621
622/// List files no longer referenced by a Delta table and are older than the retention threshold.
623async fn get_stale_files(
624    snapshot: &EagerSnapshot,
625    retention_period: Duration,
626    now_timestamp_millis: i64,
627    store: &dyn LogStore,
628) -> DeltaResult<Vec<TombstoneView>> {
629    let tombstone_retention_timestamp = now_timestamp_millis - retention_period.num_milliseconds();
630    snapshot
631        .snapshot()
632        .tombstones(store)
633        .try_filter(|tombstone| {
634            ready(is_tombstone_expired(
635                tombstone,
636                tombstone_retention_timestamp,
637            ))
638        })
639        .try_collect::<Vec<_>>()
640        .await
641}
642
643fn is_tombstone_expired(tombstone: &TombstoneView, tombstone_retention_timestamp: i64) -> bool {
644    tombstone.deletion_timestamp().unwrap_or(0) < tombstone_retention_timestamp
645}
646
647#[cfg(test)]
648mod tests {
649    use object_store::{ObjectStoreExt as _, PutPayload, local::LocalFileSystem, memory::InMemory};
650    use serde_json::json;
651
652    use super::*;
653    use crate::kernel::Action;
654    use crate::kernel::transaction::CommitBuilder;
655    use crate::protocol::SaveMode;
656    use crate::writer::test_utils::create_initialized_table;
657    use crate::writer::{DeltaWriter, JsonWriter};
658    use crate::{ensure_table_uri, open_table};
659    use std::path::Path;
660    use std::{
661        fs::{FileTimes, OpenOptions},
662        io::Read,
663        time::{Duration as StdDuration, SystemTime, UNIX_EPOCH},
664    };
665    use url::Url;
666
667    #[tokio::test]
668    async fn test_vacuum_full() -> DeltaResult<()> {
669        let table_path = Path::new("../test/tests/data/simple_commit");
670        let table_uri =
671            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
672        let table = open_table(table_uri).await?;
673
674        let (_table, result) =
675            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
676                .with_retention_period(Duration::hours(0))
677                .with_dry_run(true)
678                .with_mode(VacuumMode::Lite)
679                .with_enforce_retention_duration(false)
680                .await?;
681        // When running lite, this table with superfluous parquet files should not have anything to
682        // delete
683        assert!(result.files_deleted.is_empty());
684
685        let (_table, result) =
686            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
687                .with_retention_period(Duration::hours(0))
688                .with_dry_run(true)
689                .with_mode(VacuumMode::Full)
690                .with_enforce_retention_duration(false)
691                .await?;
692        let mut files_deleted = result.files_deleted.clone();
693        files_deleted.sort();
694        // When running with full, these superfluous parquet files which are not actually
695        // referenced in the _delta_log commits should be considered for the
696        // low-orbit ion-cannon
697        assert_eq!(
698            files_deleted,
699            vec![
700                "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet",
701                "part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet",
702                "part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet",
703                "part-00001-4327c977-2734-4477-9507-7ccf67924649-c000.snappy.parquet",
704            ]
705        );
706        Ok(())
707    }
708
709    /// This test simply ensures that with_keep_versions invocation of [VacuumBuilder] removes
710    /// fewer files than a full vacuum.
711    #[tokio::test]
712    async fn test_vacuum_keep_version_sanity_check() -> DeltaResult<()> {
713        let table_loc = "../test/tests/data/simple_table";
714        let table_uri = ensure_table_uri(table_loc).unwrap();
715        let table = open_table(table_uri).await?;
716        let versions_to_keep = vec![3];
717
718        // First, vacuum without keeping any particular versions
719        let (_table, result) =
720            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
721                .with_retention_period(Duration::hours(0))
722                .with_dry_run(true)
723                .with_mode(VacuumMode::Full)
724                .with_enforce_retention_duration(false)
725                .await?;
726
727        // Our simple_table has 32 data files in it which could be vacuumed.
728        assert_eq!(32, result.files_deleted.len());
729
730        // Next, vacuum with specific versions retained
731        let (_table, result) =
732            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
733                .with_retention_period(Duration::hours(0))
734                .with_keep_versions(&versions_to_keep)
735                .with_dry_run(true)
736                .with_mode(VacuumMode::Full)
737                .with_enforce_retention_duration(false)
738                .await?;
739        assert_ne!(
740            32,
741            result.files_deleted.len(),
742            "with_keep_versions should have fewer files deleted than a full vacuum"
743        );
744
745        Ok(())
746    }
747
748    /// This test ensures that with_keep_versions invocations retain files which are removed within
749    /// the context of the kept ranges
750    #[tokio::test]
751    async fn test_vacuum_keep_version_add_removes() -> DeltaResult<()> {
752        let table_loc = "../test/tests/data/simple_table";
753        let table_uri = ensure_table_uri(table_loc).unwrap();
754        let table = open_table(table_uri).await?;
755        let versions_to_keep = vec![2, 3];
756
757        // First, vacuum without keeping any particular versions
758        let (_table, result) =
759            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
760                .with_retention_period(Duration::hours(0))
761                .with_dry_run(true)
762                .with_mode(VacuumMode::Full)
763                .with_enforce_retention_duration(false)
764                .await?;
765
766        // Our simple_table has 32 data files in it which could be vacuumed.
767        assert_eq!(32, result.files_deleted.len());
768
769        // Next, vacuum with specific versions retained
770        let (_table, result) =
771            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
772                .with_retention_period(Duration::hours(0))
773                .with_keep_versions(&versions_to_keep)
774                .with_dry_run(true)
775                .with_mode(VacuumMode::Full)
776                .with_enforce_retention_duration(false)
777                .await?;
778        assert_ne!(
779            32,
780            result.files_deleted.len(),
781            "with_keep_versions should have fewer files deleted than a full vacuum"
782        );
783
784        let kept_files = vec![
785            // Adds from v3
786            "part-00000-f17fcbf5-e0dc-40ba-adae-ce66d1fcaef6-c000.snappy.parquet",
787            "part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet",
788            // Removes from v3, these were add in v2
789            "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet",
790            "part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet",
791        ];
792
793        for kept in kept_files {
794            assert!(
795                !result.files_deleted.contains(&kept.to_string()),
796                "files_deleted contains something which should be kept!: {:#?} {kept}",
797                result.files_deleted
798            )
799        }
800        Ok(())
801    }
802
803    #[tokio::test]
804    async fn test_vacuum_keep_versions_descending_order() -> DeltaResult<()> {
805        let table_loc = "../test/tests/data/simple_table";
806        let table_uri = ensure_table_uri(table_loc).unwrap();
807        let table = open_table(table_uri).await?;
808
809        let (_table, ascending_result) =
810            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
811                .with_retention_period(Duration::hours(0))
812                .with_keep_versions(&[0, 1, 2, 3])
813                .with_dry_run(true)
814                .with_mode(VacuumMode::Full)
815                .with_enforce_retention_duration(false)
816                .await?;
817
818        let (_table, descending_result) =
819            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
820                .with_retention_period(Duration::hours(0))
821                .with_keep_versions(&[3, 2, 1, 0])
822                .with_dry_run(true)
823                .with_mode(VacuumMode::Full)
824                .with_enforce_retention_duration(false)
825                .await?;
826
827        let mut ascending_files = ascending_result.files_deleted;
828        ascending_files.sort();
829        let mut descending_files = descending_result.files_deleted;
830        descending_files.sort();
831
832        assert_eq!(descending_files, ascending_files);
833        Ok(())
834    }
835
836    // This test will do some table operations after executing a vacuum with versions to ensure
837    // that the table is still functional, can be read, checkpointed, etc.
838    #[cfg(feature = "datafusion")]
839    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
840    async fn test_vacuum_keep_version_validity() {
841        use datafusion::prelude::SessionContext;
842        use object_store::GetResultPayload;
843        let store = InMemory::new();
844        let source = LocalFileSystem::new_with_prefix("../test/tests/data/simple_table").unwrap();
845        let mut stream = source.list(None);
846
847        while let Some(Ok(entity)) = stream.next().await {
848            let mut contents = vec![];
849            match source.get(&entity.location).await.unwrap().payload {
850                GetResultPayload::File(mut fd, _path) => {
851                    fd.read_to_end(&mut contents).unwrap();
852                }
853                _ => panic!("We should only be dealing in files!"),
854            }
855            let content = bytes::Bytes::from(contents);
856            store
857                .put(&entity.location, PutPayload::from_bytes(content))
858                .await
859                .unwrap();
860        }
861
862        let table_url = url::Url::parse("memory:///").unwrap();
863        let mut table = crate::DeltaTableBuilder::from_url(table_url.clone())
864            .unwrap()
865            .with_storage_backend(Arc::new(store), table_url)
866            .build()
867            .unwrap();
868        table.load().await.unwrap();
869
870        let (mut table, result) = VacuumBuilder::new(
871            table.log_store(),
872            Some(table.snapshot().unwrap().snapshot.clone()),
873        )
874        .with_retention_period(Duration::hours(0))
875        .with_keep_versions(&[2, 3])
876        .with_mode(VacuumMode::Full)
877        .with_enforce_retention_duration(false)
878        .await
879        .unwrap();
880        // Our simple_table has 32 data files in it, and we shouldn't have deleted them all!
881        assert_ne!(32, result.files_deleted.len());
882
883        // Can we checkpoint it?
884        crate::checkpoints::create_checkpoint(&table, None)
885            .await
886            .unwrap();
887        table.load().await.unwrap();
888        assert_eq!(Some(6), table.version());
889
890        let ctx = SessionContext::new();
891        table.update_datafusion_session(&ctx.state()).unwrap();
892        ctx.register_table("test", table.table_provider().await.unwrap())
893            .unwrap();
894        let _batches = ctx
895            .sql("SELECT * FROM test")
896            .await
897            .unwrap()
898            .collect()
899            .await
900            .unwrap();
901    }
902
903    #[tokio::test]
904    async fn vacuum_delta_8_0_table() -> DeltaResult<()> {
905        let table_path = Path::new("../test/tests/data/delta-0.8.0");
906        let table_uri =
907            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
908        let table = open_table(table_uri).await.unwrap();
909
910        let result = VacuumBuilder::new(
911            table.log_store(),
912            Some(table.snapshot().unwrap().snapshot.clone()),
913        )
914        .with_retention_period(Duration::hours(1))
915        .with_dry_run(true)
916        .await;
917
918        assert!(result.is_err());
919
920        let table_path = Path::new("../test/tests/data/delta-0.8.0");
921        let table_uri =
922            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
923        let table = open_table(table_uri).await.unwrap();
924
925        let (table, result) = VacuumBuilder::new(
926            table.log_store(),
927            Some(table.snapshot().unwrap().snapshot.clone()),
928        )
929        .with_retention_period(Duration::hours(0))
930        .with_dry_run(true)
931        .with_enforce_retention_duration(false)
932        .await?;
933        // do not enforce retention duration check with 0 hour will purge all files
934        assert_eq!(
935            result.files_deleted,
936            vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"]
937        );
938
939        let (table, result) = VacuumBuilder::new(
940            table.log_store(),
941            Some(table.snapshot().unwrap().snapshot.clone()),
942        )
943        .with_retention_period(Duration::hours(169))
944        .with_dry_run(true)
945        .await?;
946
947        assert_eq!(
948            result.files_deleted,
949            vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"]
950        );
951
952        let retention_hours = SystemTime::now()
953            .duration_since(SystemTime::UNIX_EPOCH)
954            .unwrap()
955            .as_secs()
956            / 3600;
957        let empty: Vec<String> = Vec::new();
958        let (_table, result) = VacuumBuilder::new(
959            table.log_store(),
960            Some(table.snapshot().unwrap().snapshot.clone()),
961        )
962        .with_retention_period(Duration::hours(retention_hours as i64))
963        .with_dry_run(true)
964        .await?;
965
966        assert_eq!(result.files_deleted, empty);
967        Ok(())
968    }
969
970    /// Mock clock for testing time-dependent vacuum behavior
971    #[derive(Debug, Clone)]
972    struct MockClock {
973        timestamp_millis: i64,
974    }
975
976    impl MockClock {
977        fn new(timestamp_millis: i64) -> Self {
978            Self { timestamp_millis }
979        }
980    }
981
982    impl Clock for MockClock {
983        fn current_timestamp_millis(&self) -> i64 {
984            self.timestamp_millis
985        }
986    }
987
988    fn set_last_modified(path: &Path, last_modified: SystemTime) {
989        let file = OpenOptions::new().write(true).open(path).unwrap();
990        let times = FileTimes::new()
991            .set_accessed(last_modified)
992            .set_modified(last_modified);
993        file.set_times(times).unwrap();
994    }
995
996    #[tokio::test]
997    async fn test_vacuum_full_recent_tombstones_are_not_treated_as_orphans() -> DeltaResult<()> {
998        let temp_dir = tempfile::tempdir().unwrap();
999        let table_path = temp_dir.path().to_str().unwrap();
1000        let mut table = create_initialized_table(table_path, &[]).await;
1001        let current_time = SystemTime::now();
1002        let current_time_millis =
1003            current_time.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
1004        let stale_time = current_time - StdDuration::from_secs(10);
1005        let recent_time = current_time - StdDuration::from_secs(1);
1006        let original_data = json!({
1007            "id": "A",
1008            "value": 1,
1009            "modified": "2021-02-01"
1010        });
1011        let replacement_data = json!({
1012            "id": "B",
1013            "value": 2,
1014            "modified": "2021-02-02"
1015        });
1016
1017        let mut writer = JsonWriter::for_table(&table)?;
1018        writer.write(vec![original_data]).await?;
1019        writer.flush_and_commit(&mut table).await?;
1020
1021        let tombstoned_paths: Vec<_> = table
1022            .snapshot()?
1023            .log_data()
1024            .into_iter()
1025            .map(|add| add.object_store_path().to_string())
1026            .collect();
1027        assert_eq!(tombstoned_paths.len(), 1);
1028        let recent_tombstone_path = tombstoned_paths[0].clone();
1029        set_last_modified(&temp_dir.path().join(&recent_tombstone_path), stale_time);
1030
1031        let stale_orphan_path = "orphan-old.parquet";
1032        std::fs::write(temp_dir.path().join(stale_orphan_path), b"stale orphan").unwrap();
1033        set_last_modified(&temp_dir.path().join(stale_orphan_path), stale_time);
1034
1035        let remove_actions = table
1036            .snapshot()?
1037            .snapshot()
1038            .file_views(&table.log_store(), None)
1039            .map_ok(|file| {
1040                let mut remove = file.remove_action(true);
1041                remove.deletion_timestamp = Some(current_time_millis);
1042                Action::Remove(remove)
1043            })
1044            .try_collect::<Vec<_>>()
1045            .await?;
1046        let mut overwrite_writer = JsonWriter::for_table(&table)?;
1047        overwrite_writer.write(vec![replacement_data]).await?;
1048        let add_actions = overwrite_writer.flush().await?.into_iter().map(Action::Add);
1049        let mut actions = remove_actions;
1050        actions.extend(add_actions);
1051        let operation = DeltaOperation::Write {
1052            mode: SaveMode::Overwrite,
1053            partition_by: None,
1054            predicate: None,
1055        };
1056        CommitBuilder::default()
1057            .with_actions(actions)
1058            .build(
1059                Some(table.snapshot()?),
1060                table.log_store().clone(),
1061                operation,
1062            )
1063            .await?;
1064        table.update_state().await?;
1065
1066        let recent_orphan_path = "orphan-recent.parquet";
1067        std::fs::write(temp_dir.path().join(recent_orphan_path), b"recent orphan").unwrap();
1068        set_last_modified(&temp_dir.path().join(recent_orphan_path), recent_time);
1069
1070        let (_table, result) =
1071            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
1072                .with_retention_period(Duration::seconds(5))
1073                .with_dry_run(true)
1074                .with_mode(VacuumMode::Full)
1075                .with_enforce_retention_duration(false)
1076                .with_clock(Arc::new(MockClock::new(current_time_millis)))
1077                .await?;
1078
1079        assert!(
1080            !result.files_deleted.contains(&recent_tombstone_path),
1081            "recent tombstone was treated like an orphan: {:?}",
1082            result.files_deleted
1083        );
1084        assert!(
1085            result
1086                .files_deleted
1087                .contains(&stale_orphan_path.to_string()),
1088            "stale orphan should still be vacuum eligible: {:?}",
1089            result.files_deleted
1090        );
1091        assert!(
1092            !result
1093                .files_deleted
1094                .contains(&recent_orphan_path.to_string()),
1095            "recent orphan should still be protected: {:?}",
1096            result.files_deleted
1097        );
1098
1099        Ok(())
1100    }
1101
1102    /// Test that recently written uncommitted files are protected from deletion in Full mode
1103    /// This tests the fix for the race condition where concurrent writer's files could be deleted
1104    #[tokio::test]
1105    async fn test_vacuum_full_protects_recent_uncommitted_files() -> DeltaResult<()> {
1106        use chrono::DateTime;
1107        use object_store::GetResultPayload;
1108
1109        let store = InMemory::new();
1110        let source = LocalFileSystem::new_with_prefix("../test/tests/data/simple_table").unwrap();
1111        let mut stream = source.list(None);
1112
1113        while let Some(Ok(entity)) = stream.next().await {
1114            let mut contents = vec![];
1115            match source.get(&entity.location).await.unwrap().payload {
1116                GetResultPayload::File(mut fd, _path) => {
1117                    fd.read_to_end(&mut contents).unwrap();
1118                }
1119                _ => panic!("We should only be dealing in files!"),
1120            }
1121            let content = bytes::Bytes::from(contents);
1122            store
1123                .put(&entity.location, PutPayload::from_bytes(content))
1124                .await
1125                .unwrap();
1126        }
1127
1128        // Add a "recently written" orphaned file that simulates an uncommitted file
1129        let recent_file_path = object_store::path::Path::from("uncommitted-recent.parquet");
1130        store
1131            .put(
1132                &recent_file_path,
1133                PutPayload::from_bytes(bytes::Bytes::from("test data")),
1134            )
1135            .await
1136            .unwrap();
1137
1138        let table_url = url::Url::parse("memory:///").unwrap();
1139        let mut table = crate::DeltaTableBuilder::from_url(table_url.clone())
1140            .unwrap()
1141            .with_storage_backend(Arc::new(store), table_url)
1142            .build()
1143            .unwrap();
1144        table.load().await.unwrap();
1145
1146        // Set current time to 10 days after epoch
1147        let current_time = DateTime::from_timestamp(10 * 24 * 3600, 0)
1148            .unwrap()
1149            .timestamp_millis();
1150        let mock_clock = Arc::new(MockClock::new(current_time));
1151
1152        // Run vacuum with 7-day retention in Full mode
1153        // The recent file should NOT be deleted because it's too new
1154        let (_table, result) = VacuumBuilder::new(
1155            table.log_store(),
1156            Some(table.snapshot().unwrap().snapshot.clone()),
1157        )
1158        .with_retention_period(Duration::days(7))
1159        .with_dry_run(true)
1160        .with_mode(VacuumMode::Full)
1161        .with_enforce_retention_duration(false)
1162        .with_clock(mock_clock)
1163        .await
1164        .unwrap();
1165
1166        // The recent uncommitted file should NOT be in the deletion list
1167        assert!(
1168            !result.files_deleted.contains(&recent_file_path.to_string()),
1169            "Recent uncommitted file should be protected from deletion, but found in deletion list: {:?}",
1170            result.files_deleted
1171        );
1172
1173        Ok(())
1174    }
1175}