use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use chrono::{Duration, Utc};
use futures::future::{BoxFuture, ready};
use futures::{StreamExt, TryStreamExt};
use object_store::{Error, ObjectStore, path::Path};
use serde::Serialize;
use tracing::*;
use super::{CustomExecuteHandler, Operation};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::{EagerSnapshot, TombstoneView, Version, resolve_snapshot};
use crate::logstore::{LogStore, LogStoreRef};
use crate::protocol::DeltaOperation;
use crate::table::config::TablePropertiesExt as _;
use crate::table::state::DeltaTableState;
use crate::{DeltaTable, DeltaTableConfig};
/// Errors raised while planning or validating a vacuum operation.
#[derive(thiserror::Error, Debug)]
enum VacuumError {
    /// The requested retention period is shorter than the table's configured minimum.
    #[error(
        "Invalid retention period, minimum retention for vacuum is configured to be greater than {} hours, got {} hours", .min, .provided
    )]
    InvalidVacuumRetentionPeriod {
        // Retention period requested by the caller, in hours.
        provided: i64,
        // Minimum retention configured on the table, in hours.
        min: i64,
    },
    /// Any other error bubbled up from the Delta table layer.
    #[error(transparent)]
    DeltaTable(#[from] DeltaTableError),
}
impl From<VacuumError> for DeltaTableError {
fn from(err: VacuumError) -> Self {
DeltaTableError::GenericError {
source: Box::new(err),
}
}
}
/// Abstraction over "now" so tests can inject a deterministic clock.
pub trait Clock: Debug + Send + Sync {
    /// Current timestamp as milliseconds since the Unix epoch.
    fn current_timestamp_millis(&self) -> i64;
}
/// Controls how aggressively vacuum scans for files to delete.
#[derive(Debug, Default, Clone, PartialEq)]
pub enum VacuumMode {
    /// Only consider files referenced by expired tombstones in the Delta log.
    #[default]
    Lite,
    /// Additionally list the table directory and remove stale untracked
    /// (orphaned) files.
    Full,
}
/// Builder for the vacuum operation: deletes files no longer referenced by the
/// Delta table that are older than the retention threshold.
pub struct VacuumBuilder {
    // Snapshot to plan against; resolved from the log store when `None`.
    snapshot: Option<EagerSnapshot>,
    // Access to the Delta log and the underlying object store.
    log_store: LogStoreRef,
    // Caller-specified retention period; falls back to the table property.
    retention_period: Option<Duration>,
    // When true, reject retention periods below the table's configured minimum.
    enforce_retention_duration: bool,
    // Experimental: table versions whose files must never be vacuumed.
    keep_versions: Option<Vec<Version>>,
    // When true, only report what would be deleted without deleting anything.
    dry_run: bool,
    // Lite (tombstones only) or Full (also untracked orphans).
    mode: VacuumMode,
    // Injectable time source; defaults to `Utc::now()` when `None`.
    clock: Option<Arc<dyn Clock>>,
    // Extra metadata/properties attached to the vacuum start/end commits.
    commit_properties: CommitProperties,
    // Optional hook invoked around execution for custom integrations.
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
impl super::Operation for VacuumBuilder {
    /// Log store backing this operation.
    fn log_store(&self) -> &LogStoreRef {
        &self.log_store
    }

    /// Optional custom execution hook registered on the builder.
    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
        self.custom_execute_handler.as_ref().map(Arc::clone)
    }
}
/// Outcome of a vacuum run.
#[derive(Debug, Default)]
pub struct VacuumMetrics {
    /// True when this was a dry run and nothing was actually deleted.
    pub dry_run: bool,
    /// Paths of the files deleted (or that would be deleted in a dry run).
    pub files_deleted: Vec<String>,
}
/// Metrics serialized into the `VACUUM START` commit's `operationMetrics`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VacuumStartOperationMetrics {
    /// Number of files scheduled for deletion.
    pub num_files_to_delete: i64,
    /// Total size in bytes of the files scheduled for deletion.
    pub size_of_data_to_delete: i64,
}
/// Metrics serialized into the `VACUUM END` commit's `operationMetrics`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VacuumEndOperationMetrics {
    /// Number of files actually deleted.
    pub num_deleted_files: i64,
    /// Number of directories removed (currently always reported as 0).
    pub num_vacuumed_directories: i64,
}
impl VacuumBuilder {
    /// Create a new vacuum builder for the given log store and optional snapshot.
    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
        VacuumBuilder {
            snapshot,
            log_store,
            retention_period: None,
            enforce_retention_duration: true,
            keep_versions: None,
            dry_run: false,
            mode: VacuumMode::Lite,
            clock: None,
            commit_properties: CommitProperties::default(),
            custom_execute_handler: None,
        }
    }

    /// Override the retention period used to decide which files are stale.
    pub fn with_retention_period(mut self, retention_period: Duration) -> Self {
        self.retention_period = Some(retention_period);
        self
    }

    /// Experimental: protect all files referenced by the given table versions
    /// from being vacuumed.
    pub fn with_keep_versions(mut self, versions: &[Version]) -> Self {
        warn!("Using experimental API VacuumBuilder::with_keep_versions");
        self.keep_versions = Some(versions.to_vec());
        self
    }

    /// Select [`VacuumMode::Lite`] (default) or [`VacuumMode::Full`].
    pub fn with_mode(mut self, mode: VacuumMode) -> Self {
        self.mode = mode;
        self
    }

    /// When true, only report what would be deleted; nothing is removed.
    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
        self.dry_run = dry_run;
        self
    }

    /// When false, allow retention periods below the table's configured minimum.
    pub fn with_enforce_retention_duration(mut self, enforce: bool) -> Self {
        self.enforce_retention_duration = enforce;
        self
    }

    /// Inject a time source (used by tests for deterministic "now").
    #[doc(hidden)]
    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
        self.clock = Some(clock);
        self
    }

    /// Attach additional commit properties to the vacuum start/end commits.
    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
        self.commit_properties = commit_properties;
        self
    }

    /// Register a custom handler invoked around execution.
    pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
        self.custom_execute_handler = Some(handler);
        self
    }

    /// Build the list of files to delete for this table.
    ///
    /// Deletion candidates are expired tombstones and, in [`VacuumMode::Full`],
    /// stale untracked files found by listing the store — minus anything
    /// referenced by the current snapshot, by an explicitly kept version, or
    /// matching a hidden bookkeeping path.
    async fn create_vacuum_plan(
        &self,
        snapshot: &EagerSnapshot,
    ) -> Result<VacuumPlan, VacuumError> {
        if self.mode == VacuumMode::Full {
            info!(
                "Vacuum configured to run with 'VacuumMode::Full'. It will scan for orphaned parquet files in the Delta table directory and remove those as well!"
            );
        }
        // Table-configured minimum retention for deleted files.
        let min_retention = Duration::milliseconds(
            snapshot
                .table_properties()
                .deleted_file_retention_duration()
                .as_millis() as i64,
        );
        let retention_period = self.retention_period.unwrap_or(min_retention);
        let enforce_retention_duration = self.enforce_retention_duration;
        // Guard against vacuuming files that readers may still need.
        if enforce_retention_duration && retention_period < min_retention {
            return Err(VacuumError::InvalidVacuumRetentionPeriod {
                provided: retention_period.num_hours(),
                min: min_retention.num_hours(),
            });
        }
        let now_millis = match &self.clock {
            Some(clock) => clock.current_timestamp_millis(),
            None => Utc::now().timestamp_millis(),
        };
        // Files referenced by any explicitly kept version; these are never deleted.
        let keep_files = match &self.keep_versions {
            Some(versions) => {
                let mut sorted_versions = versions.clone();
                sorted_versions.sort();
                let mut sorted_versions = sorted_versions.into_iter();
                match sorted_versions.next() {
                    Some(initial_version) => {
                        let mut keep_files: HashSet<String> = HashSet::new();
                        // Load state at the oldest kept version, then walk
                        // forward so each update is an incremental log replay.
                        let mut state = DeltaTableState::try_new(
                            &self.log_store,
                            DeltaTableConfig::default(),
                            Some(initial_version),
                        )
                        .await?;
                        let mut record_keep_files = |version: Version, state: &DeltaTableState| {
                            let files: Vec<String> = state
                                .log_data()
                                .into_iter()
                                .map(|add| add.object_store_path())
                                .map(|path| path.to_string())
                                .collect();
                            debug!("keep version:{version}\n, {files:#?}");
                            keep_files.extend(files);
                        };
                        record_keep_files(initial_version, &state);
                        for version in sorted_versions {
                            state.update(&self.log_store, Some(version)).await?;
                            record_keep_files(version, &state);
                        }
                        keep_files
                    }
                    None => HashSet::new(),
                }
            }
            _ => HashSet::new(),
        };
        // NOTE(review): `file_count` is only incremented for full-mode orphans
        // below, so the `files_scanned` log field under-reports in Lite mode —
        // confirm whether that is intentional.
        let mut file_count = 0;
        let tombstone_retention_timestamp = now_millis - retention_period.num_milliseconds();
        let (expired_tombstones, tombstone_path_sets) = if self.mode == VacuumMode::Full {
            collect_full_mode_tombstones(snapshot, tombstone_retention_timestamp, &self.log_store)
                .await?
        } else {
            (
                get_stale_files(snapshot, retention_period, now_millis, &self.log_store).await?,
                TombstonePathSets::default(),
            )
        };
        // Files referenced by the current snapshot are always retained.
        let valid_files: HashSet<_> = snapshot
            .file_views(self.log_store.as_ref(), None)
            .map_ok(|f| f.object_store_path())
            .try_collect()
            .await?;
        let partition_columns = snapshot.metadata().partition_columns();
        let mut files_to_delete = vec![];
        let mut file_sizes = vec![];
        // Expired tombstones become deletion candidates unless still
        // referenced, explicitly kept, or hidden bookkeeping paths.
        for tombs in expired_tombstones.iter() {
            let path = Path::from(tombs.path().to_string());
            if ok_to_delete(&path, &valid_files, &keep_files, partition_columns)? {
                files_to_delete.push(path);
                file_sizes.push(tombs.size().unwrap_or(0));
            }
        }
        if self.mode == VacuumMode::Full {
            // Full mode: list the entire table prefix and also vacuum stale
            // files that were never committed to the log (orphans).
            let object_store = self.log_store.object_store(None);
            let list_span = info_span!("list_files", operation = "vacuum");
            let mut all_files = list_span.in_scope(|| object_store.list(None));
            while let Some(obj_meta) = all_files.next().await {
                let obj_meta = obj_meta.map_err(DeltaTableError::from)?;
                // Already queued via the expired-tombstone pass above.
                if tombstone_path_sets
                    .expired_tombstone_paths
                    .contains(&obj_meta.location)
                {
                    debug!(
                        "The file {:?} is already queued as an expired tombstone",
                        &obj_meta.location,
                    );
                    continue;
                }
                if !ok_to_delete(
                    &obj_meta.location,
                    &valid_files,
                    &keep_files,
                    partition_columns,
                )? {
                    continue;
                }
                // A not-yet-expired tombstone still protects the file.
                if tombstone_path_sets
                    .all_tombstone_paths
                    .contains(&obj_meta.location)
                {
                    debug!(
                        "The file {:?} has a recent tombstone, keeping it until tombstone retention expires",
                        &obj_meta.location,
                    );
                    continue;
                }
                // Untracked but recently written files may belong to an
                // in-flight commit; protect them via the mtime heuristic.
                let file_age_millis = now_millis - obj_meta.last_modified.timestamp_millis();
                if file_age_millis < retention_period.num_milliseconds() {
                    debug!(
                        "The file {:?} is an untracked recent file, protecting it from vacuum",
                        &obj_meta.location,
                    );
                    continue;
                }
                debug!(
                    "The file {:?} is an untracked stale orphan and will be vacuumed in full mode",
                    &obj_meta.location
                );
                files_to_delete.push(obj_meta.location);
                file_sizes.push(obj_meta.size as i64);
                file_count += 1;
            }
        }
        info!(
            files_scanned = file_count,
            files_to_delete = files_to_delete.len(),
            "vacuum file listing completed"
        );
        Ok(VacuumPlan {
            files_to_delete,
            file_sizes,
            retention_check_enabled: enforce_retention_duration,
            default_retention_millis: min_retention.num_milliseconds(),
            specified_retention_millis: Some(retention_period.num_milliseconds()),
        })
    }
}
impl std::future::IntoFuture for VacuumBuilder {
    type Output = DeltaResult<(DeltaTable, VacuumMetrics)>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    /// Plan and (unless `dry_run`) execute the vacuum, returning the updated
    /// table alongside the deletion metrics.
    fn into_future(self) -> Self::IntoFuture {
        let this = self;
        Box::pin(async move {
            // Resolve the snapshot, loading the latest state when none was given.
            let snapshot =
                resolve_snapshot(&this.log_store, this.snapshot.clone(), true, None).await?;
            let plan = this.create_vacuum_plan(&snapshot).await?;
            // Dry run: report what would be deleted without touching storage.
            if this.dry_run {
                return Ok((
                    DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)),
                    VacuumMetrics {
                        files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(),
                        dry_run: true,
                    },
                ));
            }
            // Custom-handler hooks bracket the actual execution.
            let operation_id = this.get_operation_id();
            this.pre_execute(operation_id).await?;
            let result = plan
                .execute(
                    this.log_store.clone(),
                    &snapshot,
                    this.commit_properties.clone(),
                    operation_id,
                    this.get_custom_execute_handler(),
                )
                .await?;
            this.post_execute(operation_id).await?;
            // `None` means there was nothing to delete; return default metrics.
            Ok(match result {
                Some((snapshot, metrics)) => (
                    DeltaTable::new_with_state(this.log_store, snapshot),
                    metrics,
                ),
                None => (
                    DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)),
                    Default::default(),
                ),
            })
        })
    }
}
/// Concrete deletion plan produced by `VacuumBuilder::create_vacuum_plan`.
struct VacuumPlan {
    /// Files determined safe to delete.
    pub files_to_delete: Vec<Path>,
    /// Sizes in bytes, parallel to `files_to_delete` (0 when unknown).
    pub file_sizes: Vec<i64>,
    /// Whether the minimum-retention check was enforced during planning.
    pub retention_check_enabled: bool,
    /// Table-configured default retention, in milliseconds.
    pub default_retention_millis: i64,
    /// Retention actually used for this run, in milliseconds.
    pub specified_retention_millis: Option<i64>,
}
impl VacuumPlan {
    /// Execute the plan: write a `VACUUM START` commit, delete the planned
    /// files, then write a `VACUUM END` commit carrying the final metrics.
    ///
    /// Returns `Ok(None)` when there is nothing to delete (no commits written).
    pub async fn execute(
        self,
        store: LogStoreRef,
        snapshot: &EagerSnapshot,
        mut commit_properties: CommitProperties,
        operation_id: uuid::Uuid,
        handle: Option<Arc<dyn CustomExecuteHandler>>,
    ) -> Result<Option<(DeltaTableState, VacuumMetrics)>, DeltaTableError> {
        if self.files_to_delete.is_empty() {
            return Ok(None);
        }
        let start_operation = DeltaOperation::VacuumStart {
            retention_check_enabled: self.retention_check_enabled,
            specified_retention_millis: self.specified_retention_millis,
            default_retention_millis: self.default_retention_millis,
        };
        let end_operation = DeltaOperation::VacuumEnd {
            status: String::from("COMPLETED"),
        };
        let start_metrics = VacuumStartOperationMetrics {
            num_files_to_delete: self.files_to_delete.len() as i64,
            size_of_data_to_delete: self.file_sizes.iter().sum(),
        };
        // Start commit carries the caller's app metadata plus the planned metrics.
        let mut start_props = CommitProperties::default();
        start_props.app_metadata = commit_properties.app_metadata.clone();
        start_props.app_metadata.insert(
            "operationMetrics".to_owned(),
            serde_json::to_value(start_metrics)?,
        );
        let last_commit = CommitBuilder::from(start_props)
            .with_operation_id(operation_id)
            .with_post_commit_hook_handler(handle.clone())
            .build(Some(snapshot), store.clone(), start_operation)
            .await?;
        let locations = futures::stream::iter(self.files_to_delete)
            .map(Result::Ok)
            .boxed();
        // Bulk-delete; a missing file is treated as already deleted rather
        // than an error so reruns are idempotent.
        let files_deleted = store
            .object_store(Some(operation_id))
            .delete_stream(locations)
            .map(|res| match res {
                Ok(path) => Ok(path.to_string()),
                Err(Error::NotFound { path, .. }) => Ok(path),
                Err(err) => Err(err),
            })
            .try_collect::<Vec<_>>()
            .await?;
        let end_metrics = VacuumEndOperationMetrics {
            num_deleted_files: files_deleted.len() as i64,
            num_vacuumed_directories: 0,
        };
        commit_properties.app_metadata.insert(
            "operationMetrics".to_owned(),
            serde_json::to_value(end_metrics)?,
        );
        // The end commit is built on the snapshot produced by the start commit.
        let last_commit = CommitBuilder::from(commit_properties)
            .with_operation_id(operation_id)
            .with_post_commit_hook_handler(handle)
            .build(Some(&last_commit.snapshot), store.clone(), end_operation)
            .await?;
        Ok(Some((
            last_commit.snapshot,
            VacuumMetrics {
                files_deleted,
                dry_run: false,
            },
        )))
    }
}
/// Path indexes over the table's tombstones, used by full-mode vacuum to
/// distinguish orphans from files with (possibly still-active) tombstones.
#[derive(Debug, Default, PartialEq, Eq)]
struct TombstonePathSets {
    // Paths whose tombstones have passed the retention cutoff.
    expired_tombstone_paths: HashSet<Path>,
    // Paths of every tombstone, expired or not.
    all_tombstone_paths: HashSet<Path>,
}
impl TombstonePathSets {
    /// Track `path` as having a tombstone; when `is_expired`, it is also
    /// recorded as eligible for deletion.
    fn record(&mut self, path: Path, is_expired: bool) {
        self.all_tombstone_paths.insert(path.clone());
        if is_expired {
            self.expired_tombstone_paths.insert(path);
        }
    }
}
/// Whether `path` is a hidden/bookkeeping path that vacuum must leave alone:
/// a dot- or underscore-prefixed name that is not `_delta_index`,
/// `_change_data`, or a partition-column directory.
fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result<bool, DeltaTableError> {
    let name = path.to_string();
    // Only dot/underscore-prefixed paths can be considered hidden at all.
    if !(name.starts_with('.') || name.starts_with('_')) {
        return Ok(false);
    }
    // These special prefixes are exempt from the hidden classification.
    if name.starts_with("_delta_index") || name.starts_with("_change_data") {
        return Ok(false);
    }
    // Underscore-prefixed partition directories are regular table content.
    let is_partition_dir = partition_columns
        .iter()
        .any(|partition_column| name.starts_with(partition_column.as_str()));
    Ok(!is_partition_dir)
}
/// A file may be vacuumed only when it is not referenced by the current
/// snapshot, not part of an explicitly kept version, and not a hidden path.
fn ok_to_delete(
    location: &Path,
    valid_files: &HashSet<Path>,
    keep_files: &HashSet<String>,
    partition_columns: &[String],
) -> Result<bool, DeltaTableError> {
    if valid_files.contains(location) {
        return Ok(false);
    }
    if keep_files.contains(&location.to_string()) {
        return Ok(false);
    }
    let hidden = is_hidden_directory(partition_columns, location)?;
    Ok(!hidden)
}
/// Stream all tombstones once and split them into the expired set (deletion
/// candidates) plus the path indexes full-mode vacuum needs to protect files
/// whose tombstones have not yet expired.
async fn collect_full_mode_tombstones(
    snapshot: &EagerSnapshot,
    tombstone_retention_timestamp: i64,
    store: &dyn LogStore,
) -> DeltaResult<(Vec<TombstoneView>, TombstonePathSets)> {
    snapshot
        .snapshot()
        .tombstones(store)
        .try_fold(
            (Vec::new(), TombstonePathSets::default()),
            |(mut expired_tombstones, mut tombstone_path_sets), tombstone| {
                let is_expired = is_tombstone_expired(&tombstone, tombstone_retention_timestamp);
                let path = Path::from(tombstone.path().to_string());
                // Every tombstone is indexed; only expired ones become candidates.
                tombstone_path_sets.record(path, is_expired);
                if is_expired {
                    expired_tombstones.push(tombstone);
                }
                ready(Ok((expired_tombstones, tombstone_path_sets)))
            },
        )
        .await
}
/// Collect tombstones whose deletion timestamp falls before the retention
/// cutoff (`now - retention_period`); used by lite-mode vacuum planning.
async fn get_stale_files(
    snapshot: &EagerSnapshot,
    retention_period: Duration,
    now_timestamp_millis: i64,
    store: &dyn LogStore,
) -> DeltaResult<Vec<TombstoneView>> {
    let cutoff = now_timestamp_millis - retention_period.num_milliseconds();
    let tombstones = snapshot.snapshot().tombstones(store);
    tombstones
        .try_filter(|tombstone| ready(is_tombstone_expired(tombstone, cutoff)))
        .try_collect()
        .await
}
/// True when the tombstone's deletion timestamp (missing => epoch 0) predates
/// the retention cutoff.
fn is_tombstone_expired(tombstone: &TombstoneView, tombstone_retention_timestamp: i64) -> bool {
    let deleted_at = tombstone.deletion_timestamp().unwrap_or_default();
    deleted_at < tombstone_retention_timestamp
}
#[cfg(test)]
mod tests {
    use object_store::{ObjectStoreExt as _, PutPayload, local::LocalFileSystem, memory::InMemory};
    use serde_json::json;
    use super::*;
    use crate::kernel::Action;
    use crate::kernel::transaction::CommitBuilder;
    use crate::protocol::SaveMode;
    use crate::writer::test_utils::create_initialized_table;
    use crate::writer::{DeltaWriter, JsonWriter};
    use crate::{ensure_table_uri, open_table};
    use std::path::Path;
    use std::{
        fs::{FileTimes, OpenOptions},
        io::Read,
        time::{Duration as StdDuration, SystemTime, UNIX_EPOCH},
    };
    use url::Url;

    // Full mode dry run should list orphaned parquet files that Lite mode ignores.
    #[tokio::test]
    async fn test_vacuum_full() -> DeltaResult<()> {
        let table_path = Path::new("../test/tests/data/simple_commit");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = open_table(table_uri).await?;
        // Lite mode: no tombstones expired in this fixture, so nothing to delete.
        let (_table, result) =
            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
                .with_retention_period(Duration::hours(0))
                .with_dry_run(true)
                .with_mode(VacuumMode::Lite)
                .with_enforce_retention_duration(false)
                .await?;
        assert!(result.files_deleted.is_empty());
        // Full mode: the untracked parquet files in the fixture become candidates.
        let (_table, result) =
            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
                .with_retention_period(Duration::hours(0))
                .with_dry_run(true)
                .with_mode(VacuumMode::Full)
                .with_enforce_retention_duration(false)
                .await?;
        let mut files_deleted = result.files_deleted.clone();
        files_deleted.sort();
        assert_eq!(
            files_deleted,
            vec![
                "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet",
                "part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet",
                "part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet",
                "part-00001-4327c977-2734-4477-9507-7ccf67924649-c000.snappy.parquet",
            ]
        );
        Ok(())
    }

    // Keeping one version should shrink the deletion set versus a plain full vacuum.
    #[tokio::test]
    async fn test_vacuum_keep_version_sanity_check() -> DeltaResult<()> {
        let table_loc = "../test/tests/data/simple_table";
        let table_uri = ensure_table_uri(table_loc).unwrap();
        let table = open_table(table_uri).await?;
        let versions_to_keep = vec![3];
        // Baseline: full vacuum with no kept versions deletes 32 files.
        let (_table, result) =
            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
                .with_retention_period(Duration::hours(0))
                .with_dry_run(true)
                .with_mode(VacuumMode::Full)
                .with_enforce_retention_duration(false)
                .await?;
        assert_eq!(32, result.files_deleted.len());
        let (_table, result) =
            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
                .with_retention_period(Duration::hours(0))
                .with_keep_versions(&versions_to_keep)
                .with_dry_run(true)
                .with_mode(VacuumMode::Full)
                .with_enforce_retention_duration(false)
                .await?;
        assert_ne!(
            32,
            result.files_deleted.len(),
            "with_keep_versions should have fewer files deleted than a full vacuum"
        );
        Ok(())
    }

    // Files referenced by any kept version must survive, including files added
    // in one kept version and removed in another.
    #[tokio::test]
    async fn test_vacuum_keep_version_add_removes() -> DeltaResult<()> {
        let table_loc = "../test/tests/data/simple_table";
        let table_uri = ensure_table_uri(table_loc).unwrap();
        let table = open_table(table_uri).await?;
        let versions_to_keep = vec![2, 3];
        // Baseline full vacuum count for comparison.
        let (_table, result) =
            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
                .with_retention_period(Duration::hours(0))
                .with_dry_run(true)
                .with_mode(VacuumMode::Full)
                .with_enforce_retention_duration(false)
                .await?;
        assert_eq!(32, result.files_deleted.len());
        let (_table, result) =
            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
                .with_retention_period(Duration::hours(0))
                .with_keep_versions(&versions_to_keep)
                .with_dry_run(true)
                .with_mode(VacuumMode::Full)
                .with_enforce_retention_duration(false)
                .await?;
        assert_ne!(
            32,
            result.files_deleted.len(),
            "with_keep_versions should have fewer files deleted than a full vacuum"
        );
        // Files belonging to the kept versions must not appear in the plan.
        let kept_files = vec![
            "part-00000-f17fcbf5-e0dc-40ba-adae-ce66d1fcaef6-c000.snappy.parquet",
            "part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet",
            "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet",
            "part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet",
        ];
        for kept in kept_files {
            assert!(
                !result.files_deleted.contains(&kept.to_string()),
                "files_deleted contains something which should be kept!: {:#?} {kept}",
                result.files_deleted
            )
        }
        Ok(())
    }

    // The order of versions passed to with_keep_versions must not matter.
    #[tokio::test]
    async fn test_vacuum_keep_versions_descending_order() -> DeltaResult<()> {
        let table_loc = "../test/tests/data/simple_table";
        let table_uri = ensure_table_uri(table_loc).unwrap();
        let table = open_table(table_uri).await?;
        let (_table, ascending_result) =
            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
                .with_retention_period(Duration::hours(0))
                .with_keep_versions(&[0, 1, 2, 3])
                .with_dry_run(true)
                .with_mode(VacuumMode::Full)
                .with_enforce_retention_duration(false)
                .await?;
        let (_table, descending_result) =
            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
                .with_retention_period(Duration::hours(0))
                .with_keep_versions(&[3, 2, 1, 0])
                .with_dry_run(true)
                .with_mode(VacuumMode::Full)
                .with_enforce_retention_duration(false)
                .await?;
        let mut ascending_files = ascending_result.files_deleted;
        ascending_files.sort();
        let mut descending_files = descending_result.files_deleted;
        descending_files.sort();
        assert_eq!(descending_files, ascending_files);
        Ok(())
    }

    // End-to-end: after a real (non-dry-run) vacuum with kept versions, the
    // table must still checkpoint and be queryable via DataFusion.
    #[cfg(feature = "datafusion")]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_vacuum_keep_version_validity() {
        use datafusion::prelude::SessionContext;
        use object_store::GetResultPayload;
        // Copy the fixture table into an in-memory store so deletes are safe.
        let store = InMemory::new();
        let source = LocalFileSystem::new_with_prefix("../test/tests/data/simple_table").unwrap();
        let mut stream = source.list(None);
        while let Some(Ok(entity)) = stream.next().await {
            let mut contents = vec![];
            match source.get(&entity.location).await.unwrap().payload {
                GetResultPayload::File(mut fd, _path) => {
                    fd.read_to_end(&mut contents).unwrap();
                }
                _ => panic!("We should only be dealing in files!"),
            }
            let content = bytes::Bytes::from(contents);
            store
                .put(&entity.location, PutPayload::from_bytes(content))
                .await
                .unwrap();
        }
        let table_url = url::Url::parse("memory:///").unwrap();
        let mut table = crate::DeltaTableBuilder::from_url(table_url.clone())
            .unwrap()
            .with_storage_backend(Arc::new(store), table_url)
            .build()
            .unwrap();
        table.load().await.unwrap();
        let (mut table, result) = VacuumBuilder::new(
            table.log_store(),
            Some(table.snapshot().unwrap().snapshot.clone()),
        )
        .with_retention_period(Duration::hours(0))
        .with_keep_versions(&[2, 3])
        .with_mode(VacuumMode::Full)
        .with_enforce_retention_duration(false)
        .await
        .unwrap();
        assert_ne!(32, result.files_deleted.len());
        crate::checkpoints::create_checkpoint(&table, None)
            .await
            .unwrap();
        table.load().await.unwrap();
        // Vacuum wrote start+end commits on top of version 4, plus checkpoint.
        assert_eq!(Some(6), table.version());
        let ctx = SessionContext::new();
        table.update_datafusion_session(&ctx.state()).unwrap();
        ctx.register_table("test", table.table_provider().await.unwrap())
            .unwrap();
        let _batches = ctx
            .sql("SELECT * FROM test")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
    }

    // Retention validation and basic lite-mode behavior on a delta 0.8.0 table.
    #[tokio::test]
    async fn vacuum_delta_8_0_table() -> DeltaResult<()> {
        let table_path = Path::new("../test/tests/data/delta-0.8.0");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = open_table(table_uri).await.unwrap();
        // 1 hour is below the table's minimum retention -> must error.
        let result = VacuumBuilder::new(
            table.log_store(),
            Some(table.snapshot().unwrap().snapshot.clone()),
        )
        .with_retention_period(Duration::hours(1))
        .with_dry_run(true)
        .await;
        assert!(result.is_err());
        let table_path = Path::new("../test/tests/data/delta-0.8.0");
        let table_uri =
            Url::from_directory_path(std::fs::canonicalize(table_path).unwrap()).unwrap();
        let table = open_table(table_uri).await.unwrap();
        // With enforcement disabled, a 0-hour retention is accepted.
        let (table, result) = VacuumBuilder::new(
            table.log_store(),
            Some(table.snapshot().unwrap().snapshot.clone()),
        )
        .with_retention_period(Duration::hours(0))
        .with_dry_run(true)
        .with_enforce_retention_duration(false)
        .await?;
        assert_eq!(
            result.files_deleted,
            vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"]
        );
        // 169 hours (> 7-day default) is valid without disabling enforcement.
        let (table, result) = VacuumBuilder::new(
            table.log_store(),
            Some(table.snapshot().unwrap().snapshot.clone()),
        )
        .with_retention_period(Duration::hours(169))
        .with_dry_run(true)
        .await?;
        assert_eq!(
            result.files_deleted,
            vec!["part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"]
        );
        // A retention reaching back to the epoch keeps everything.
        let retention_hours = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
            / 3600;
        let empty: Vec<String> = Vec::new();
        let (_table, result) = VacuumBuilder::new(
            table.log_store(),
            Some(table.snapshot().unwrap().snapshot.clone()),
        )
        .with_retention_period(Duration::hours(retention_hours as i64))
        .with_dry_run(true)
        .await?;
        assert_eq!(result.files_deleted, empty);
        Ok(())
    }

    // Deterministic clock for tests exercising time-based protection logic.
    #[derive(Debug, Clone)]
    struct MockClock {
        timestamp_millis: i64,
    }

    impl MockClock {
        fn new(timestamp_millis: i64) -> Self {
            Self { timestamp_millis }
        }
    }

    impl Clock for MockClock {
        fn current_timestamp_millis(&self) -> i64 {
            self.timestamp_millis
        }
    }

    // Helper: rewrite a file's atime/mtime so age-based heuristics can be tested.
    fn set_last_modified(path: &Path, last_modified: SystemTime) {
        let file = OpenOptions::new().write(true).open(path).unwrap();
        let times = FileTimes::new()
            .set_accessed(last_modified)
            .set_modified(last_modified);
        file.set_times(times).unwrap();
    }

    // Full mode must not treat a file with a recent (unexpired) tombstone as
    // an orphan, even when its mtime looks stale.
    #[tokio::test]
    async fn test_vacuum_full_recent_tombstones_are_not_treated_as_orphans() -> DeltaResult<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();
        let mut table = create_initialized_table(table_path, &[]).await;
        let current_time = SystemTime::now();
        let current_time_millis =
            current_time.duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
        let stale_time = current_time - StdDuration::from_secs(10);
        let recent_time = current_time - StdDuration::from_secs(1);
        let original_data = json!({
            "id": "A",
            "value": 1,
            "modified": "2021-02-01"
        });
        let replacement_data = json!({
            "id": "B",
            "value": 2,
            "modified": "2021-02-02"
        });
        let mut writer = JsonWriter::for_table(&table)?;
        writer.write(vec![original_data]).await?;
        writer.flush_and_commit(&mut table).await?;
        let tombstoned_paths: Vec<_> = table
            .snapshot()?
            .log_data()
            .into_iter()
            .map(|add| add.object_store_path().to_string())
            .collect();
        assert_eq!(tombstoned_paths.len(), 1);
        let recent_tombstone_path = tombstoned_paths[0].clone();
        // Make the soon-to-be-tombstoned file look old on disk; only its
        // (recent) tombstone should protect it.
        set_last_modified(&temp_dir.path().join(&recent_tombstone_path), stale_time);
        // An untracked file with an old mtime: a legitimate vacuum target.
        let stale_orphan_path = "orphan-old.parquet";
        std::fs::write(temp_dir.path().join(stale_orphan_path), b"stale orphan").unwrap();
        set_last_modified(&temp_dir.path().join(stale_orphan_path), stale_time);
        // Overwrite the table, tombstoning the original file at "now".
        let remove_actions = table
            .snapshot()?
            .snapshot()
            .file_views(&table.log_store(), None)
            .map_ok(|file| {
                let mut remove = file.remove_action(true);
                remove.deletion_timestamp = Some(current_time_millis);
                Action::Remove(remove)
            })
            .try_collect::<Vec<_>>()
            .await?;
        let mut overwrite_writer = JsonWriter::for_table(&table)?;
        overwrite_writer.write(vec![replacement_data]).await?;
        let add_actions = overwrite_writer.flush().await?.into_iter().map(Action::Add);
        let mut actions = remove_actions;
        actions.extend(add_actions);
        let operation = DeltaOperation::Write {
            mode: SaveMode::Overwrite,
            partition_by: None,
            predicate: None,
        };
        CommitBuilder::default()
            .with_actions(actions)
            .build(
                Some(table.snapshot()?),
                table.log_store().clone(),
                operation,
            )
            .await?;
        table.update_state().await?;
        // An untracked file written just now: protected by the mtime heuristic.
        let recent_orphan_path = "orphan-recent.parquet";
        std::fs::write(temp_dir.path().join(recent_orphan_path), b"recent orphan").unwrap();
        set_last_modified(&temp_dir.path().join(recent_orphan_path), recent_time);
        let (_table, result) =
            VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone()))
                .with_retention_period(Duration::seconds(5))
                .with_dry_run(true)
                .with_mode(VacuumMode::Full)
                .with_enforce_retention_duration(false)
                .with_clock(Arc::new(MockClock::new(current_time_millis)))
                .await?;
        assert!(
            !result.files_deleted.contains(&recent_tombstone_path),
            "recent tombstone was treated like an orphan: {:?}",
            result.files_deleted
        );
        assert!(
            result
                .files_deleted
                .contains(&stale_orphan_path.to_string()),
            "stale orphan should still be vacuum eligible: {:?}",
            result.files_deleted
        );
        assert!(
            !result
                .files_deleted
                .contains(&recent_orphan_path.to_string()),
            "recent orphan should still be protected: {:?}",
            result.files_deleted
        );
        Ok(())
    }

    // Full mode must not delete a recently written uncommitted file (it may
    // belong to an in-flight commit).
    #[tokio::test]
    async fn test_vacuum_full_protects_recent_uncommitted_files() -> DeltaResult<()> {
        use chrono::DateTime;
        use object_store::GetResultPayload;
        // Copy the fixture table into an in-memory store.
        let store = InMemory::new();
        let source = LocalFileSystem::new_with_prefix("../test/tests/data/simple_table").unwrap();
        let mut stream = source.list(None);
        while let Some(Ok(entity)) = stream.next().await {
            let mut contents = vec![];
            match source.get(&entity.location).await.unwrap().payload {
                GetResultPayload::File(mut fd, _path) => {
                    fd.read_to_end(&mut contents).unwrap();
                }
                _ => panic!("We should only be dealing in files!"),
            }
            let content = bytes::Bytes::from(contents);
            store
                .put(&entity.location, PutPayload::from_bytes(content))
                .await
                .unwrap();
        }
        // Write an uncommitted file "now" (InMemory stamps current time).
        let recent_file_path = object_store::path::Path::from("uncommitted-recent.parquet");
        store
            .put(
                &recent_file_path,
                PutPayload::from_bytes(bytes::Bytes::from("test data")),
            )
            .await
            .unwrap();
        let table_url = url::Url::parse("memory:///").unwrap();
        let mut table = crate::DeltaTableBuilder::from_url(table_url.clone())
            .unwrap()
            .with_storage_backend(Arc::new(store), table_url)
            .build()
            .unwrap();
        table.load().await.unwrap();
        // Mock "now" as 10 days after the epoch with a 7-day retention window;
        // the just-written file's real mtime is far newer and stays protected.
        let current_time = DateTime::from_timestamp(10 * 24 * 3600, 0)
            .unwrap()
            .timestamp_millis();
        let mock_clock = Arc::new(MockClock::new(current_time));
        let (_table, result) = VacuumBuilder::new(
            table.log_store(),
            Some(table.snapshot().unwrap().snapshot.clone()),
        )
        .with_retention_period(Duration::days(7))
        .with_dry_run(true)
        .with_mode(VacuumMode::Full)
        .with_enforce_retention_duration(false)
        .with_clock(mock_clock)
        .await
        .unwrap();
        assert!(
            !result.files_deleted.contains(&recent_file_path.to_string()),
            "Recent uncommitted file should be protected from deletion, but found in deletion list: {:?}",
            result.files_deleted
        );
        Ok(())
    }
}