use chrono::{DateTime, TimeDelta, Utc};
use futures::{stream, StreamExt, TryStreamExt};
use lance_core::{Error, Result};
use lance_table::{
format::{Index, Manifest},
io::{
deletion::deletion_file_path,
manifest::{read_manifest, read_manifest_indexes},
},
};
use object_store::path::Path;
use std::{
collections::{HashMap, HashSet},
future,
sync::{Mutex, MutexGuard},
};
use crate::{utils::temporal::utc_now, Dataset};
use super::refs::TagContents;
/// Sets of files referenced by a group of manifests.
///
/// Paths are stored relative to the dataset base (see `remove_prefix`); indices are
/// tracked by the string form of their UUID rather than by path.
#[derive(Clone, Debug, Default)]
struct ReferencedFiles {
    // Data files referenced by fragments (built from `data_dir().child(file.path)`).
    data_paths: HashSet<Path>,
    // Deletion files referenced by fragments.
    delete_paths: HashSet<Path>,
    // Transaction files, i.e. `_transactions/<manifest.transaction_file>`.
    tx_paths: HashSet<Path>,
    // UUIDs (as strings) of the indices listed by the manifests.
    index_uuids: HashSet<String>,
}
/// Summary of what a cleanup run deleted.
#[derive(Clone, Debug, Default)]
pub struct RemovalStats {
    /// Total size in bytes of all deleted files, including the deleted manifests.
    pub bytes_removed: u64,
    /// Number of old manifests (versions) that were deleted.
    pub old_versions: u64,
}
/// Returns `path` expressed relative to `prefix`.
///
/// If `path` does not start with `prefix`, an unchanged clone of `path` is returned.
fn remove_prefix(path: &Path, prefix: &Path) -> Path {
    match path.prefix_match(prefix) {
        Some(relative_parts) => Path::from_iter(relative_parts),
        None => path.clone(),
    }
}
/// Parameters for a single cleanup run over `dataset`.
#[derive(Clone, Debug)]
struct CleanupTask<'a> {
    dataset: &'a Dataset,
    // Manifests with a timestamp earlier than this are candidates for removal. It is also
    // passed to `read_dir_all` to bound which files are listed for deletion.
    before: DateTime<Utc>,
    // When true, recently modified unreferenced files are deleted even if no old manifest
    // is known to reference them (they might belong to an in-progress write).
    delete_unverified: bool,
    // When true, `run` returns an error instead of deleting if tagged old versions are found.
    error_if_old_versions_tagged: bool,
}
/// Result of scanning every manifest of the dataset.
#[derive(Clone, Debug, Default)]
struct CleanupInspection {
    // Manifests outside the working set; these are deleted at the end of the run.
    old_manifests: Vec<Path>,
    // Files referenced by working-set manifests; these must be kept.
    referenced_files: ReferencedFiles,
    // Files referenced by old manifests; these may be deleted even when recently modified.
    verified_files: ReferencedFiles,
    // Tagged versions recorded while inspecting manifests (see `process_manifest_file`).
    tagged_old_versions: HashSet<u64>,
}
/// Unreferenced files modified within this many days are treated as possibly belonging to
/// an in-progress write. They are only deleted if an old manifest references them or if
/// `delete_unverified` is set.
const UNVERIFIED_THRESHOLD_DAYS: i64 = 7;
impl<'a> CleanupTask<'a> {
    /// Creates a cleanup task for `dataset`.
    ///
    /// * `before` - manifests whose timestamp is earlier than this are candidates for removal,
    ///   unless they are the dataset's latest version or are tagged.
    /// * `delete_unverified` - when `false`, unreferenced files modified within the last
    ///   [`UNVERIFIED_THRESHOLD_DAYS`] days are only removed if an old manifest references
    ///   them, because they may belong to a write that is still in progress.
    /// * `error_if_old_versions_tagged` - when `true`, [`Self::run`] returns an error (and
    ///   deletes nothing) if a version that would otherwise be cleaned up is tagged.
    fn new(
        dataset: &'a Dataset,
        before: DateTime<Utc>,
        delete_unverified: bool,
        error_if_old_versions_tagged: bool,
    ) -> Self {
        Self {
            dataset,
            before,
            delete_unverified,
            error_if_old_versions_tagged,
        }
    }

    /// Runs the cleanup.
    ///
    /// The run has three steps:
    /// 1. Inspect every manifest.
    /// 2. If requested, refuse to continue when old versions are tagged.
    /// 3. Delete unreferenced files and old manifests.
    async fn run(self) -> Result<RemovalStats> {
        let tags = self.dataset.tags.list().await?;
        let tagged_versions: HashSet<u64> = tags.values().map(|v| v.version).collect();
        let inspection = self.process_manifests(&tagged_versions).await?;
        if self.error_if_old_versions_tagged && !inspection.tagged_old_versions.is_empty() {
            return Err(tagged_old_versions_cleanup_error(
                &tags,
                &inspection.tagged_old_versions,
            ));
        }
        self.delete_unreferenced_files(inspection).await
    }

    /// Reads every manifest of the dataset, up to `io_parallelism` at a time, and classifies
    /// the files each one references into a [`CleanupInspection`].
    async fn process_manifests(
        &'a self,
        tagged_versions: &HashSet<u64>,
    ) -> Result<CleanupInspection> {
        let inspection = Mutex::new(CleanupInspection::default());
        self.dataset
            .commit_handler
            .list_manifests(&self.dataset.base, &self.dataset.object_store.inner)
            .await?
            .try_for_each_concurrent(self.dataset.object_store.io_parallelism(), |path| {
                self.process_manifest_file(path, &inspection, tagged_versions)
            })
            .await?;
        // All concurrent tasks have completed; the mutex is only poisoned if a holder panicked.
        Ok(inspection.into_inner().unwrap())
    }

    /// Loads the manifest at `path` and records it in `inspection`.
    ///
    /// A manifest is *old* when it is not the latest version and its timestamp is earlier
    /// than `self.before`. Old, untagged manifests are queued for deletion; tagged old
    /// manifests are kept but reported in `tagged_old_versions`.
    async fn process_manifest_file(
        &self,
        path: Path,
        inspection: &Mutex<CleanupInspection>,
        tagged_versions: &HashSet<u64>,
    ) -> Result<()> {
        let manifest = read_manifest(&self.dataset.object_store, &path).await?;
        let dataset_version = self.dataset.version().version;
        // Versions at or beyond the one we opened are the latest (or were committed after
        // cleanup started) and are never considered old.
        let is_latest = dataset_version <= manifest.version;
        let is_old = !is_latest && manifest.timestamp() < self.before;
        let is_tagged = tagged_versions.contains(&manifest.version);
        // A tag keeps an otherwise-old version alive.
        let in_working_set = !is_old || is_tagged;
        let indexes = read_manifest_indexes(&self.dataset.object_store, &path, &manifest).await?;
        // Lock only after the last `.await`: this std mutex guard must not span an await.
        let mut inspection = inspection.lock().unwrap();
        // Only tags that actually block removal of an old version are reported. Tags on the
        // latest or on recent versions must not trigger the tagged-old-versions error.
        if is_tagged && is_old {
            inspection.tagged_old_versions.insert(manifest.version);
        }
        self.process_manifest(&manifest, &indexes, in_working_set, &mut inspection)?;
        if !in_working_set {
            inspection.old_manifests.push(path);
        }
        Ok(())
    }

    /// Records the files referenced by `manifest` into `inspection`.
    ///
    /// The recorded files are the data files, deletion files, transaction file and the
    /// UUIDs of `indexes`. Working-set manifests add to `referenced_files` (files that must
    /// be kept). Old manifests add to `verified_files` (files known to belong to old
    /// versions).
    ///
    /// # Errors
    /// Propagates a failure to parse the `_transactions` path.
    fn process_manifest(
        &self,
        manifest: &Manifest,
        indexes: &[Index],
        in_working_set: bool,
        inspection: &mut MutexGuard<CleanupInspection>,
    ) -> Result<()> {
        let referenced_files = if in_working_set {
            &mut inspection.referenced_files
        } else {
            &mut inspection.verified_files
        };
        for fragment in manifest.fragments.iter() {
            for file in fragment.files.iter() {
                let full_data_path = self.dataset.data_dir().child(file.path.as_str());
                let relative_data_path = remove_prefix(&full_data_path, &self.dataset.base);
                referenced_files.data_paths.insert(relative_data_path);
            }
            if let Some(delfile) = fragment.deletion_file.as_ref() {
                let delpath = deletion_file_path(&self.dataset.base, fragment.id, delfile);
                let relative_path = remove_prefix(&delpath, &self.dataset.base);
                referenced_files.delete_paths.insert(relative_path);
            }
        }
        if let Some(relative_tx_path) = &manifest.transaction_file {
            referenced_files
                .tx_paths
                .insert(Path::parse("_transactions")?.child(relative_tx_path.as_str()));
        }
        referenced_files
            .index_uuids
            .extend(indexes.iter().map(|index| index.uuid.to_string()));
        Ok(())
    }

    /// Deletes unreferenced files and then the old manifests, and reports what was removed.
    ///
    /// The unreferenced files are those last modified before `self.before` that no
    /// working-set manifest references.
    async fn delete_unreferenced_files(
        &self,
        mut inspection: CleanupInspection,
    ) -> Result<RemovalStats> {
        // Move the old-manifest list out first. The listing stream below only needs shared
        // access to the referenced/verified sets, so no clone is required.
        let old_manifests = std::mem::take(&mut inspection.old_manifests);
        let num_old_manifests = old_manifests.len();
        let removal_stats = Mutex::new(RemovalStats::default());
        // Files modified after this instant may belong to an in-progress write.
        let verification_threshold = utc_now()
            - TimeDelta::try_days(UNVERIFIED_THRESHOLD_DAYS).expect("TimeDelta::try_days");
        let unreferenced_paths = self
            .dataset
            .object_store
            .read_dir_all(&self.dataset.base, Some(self.before))
            .await?
            .try_filter_map(|obj_meta| {
                let maybe_in_progress =
                    !self.delete_unverified && obj_meta.last_modified >= verification_threshold;
                let path_to_remove =
                    self.path_if_not_referenced(obj_meta.location, maybe_in_progress, &inspection);
                if matches!(path_to_remove, Ok(Some(..))) {
                    removal_stats.lock().unwrap().bytes_removed += obj_meta.size as u64;
                }
                future::ready(path_to_remove)
            })
            .boxed();
        // Sizes are read before deletion; any error here is surfaced after the deletes run.
        let manifest_bytes_removed = stream::iter(&old_manifests)
            .map(|path| self.dataset.object_store.size(path))
            .buffer_unordered(self.dataset.object_store.io_parallelism())
            .try_fold(0, |acc, size| async move { Ok(acc + (size as u64)) })
            .await;
        let old_manifests_stream = stream::iter(old_manifests).map(Result::<Path>::Ok).boxed();
        // Unreferenced files are queued ahead of the old manifests.
        let all_paths_to_remove =
            stream::iter(vec![unreferenced_paths, old_manifests_stream]).flatten();
        self.dataset
            .object_store
            .remove_stream(all_paths_to_remove.boxed())
            .try_for_each(|_| future::ready(Ok(())))
            .await?;
        let mut removal_stats = removal_stats.into_inner().unwrap();
        removal_stats.old_versions = num_old_manifests as u64;
        removal_stats.bytes_removed += manifest_bytes_removed?;
        Ok(removal_stats)
    }

    /// Decides whether the listed object at `path` should be deleted.
    ///
    /// Returns `Ok(Some(path))` to delete it and `Ok(None)` to keep it. When
    /// `maybe_in_progress` is set (the object was modified recently), the object is only
    /// deleted if an old manifest is known to reference it. Temporary manifests under
    /// `_versions/.tmp` are deleted only when they are not recent. Manifests and files
    /// outside the known layout are always kept.
    fn path_if_not_referenced(
        &self,
        path: Path,
        maybe_in_progress: bool,
        inspection: &CleanupInspection,
    ) -> Result<Option<Path>> {
        let relative_path = remove_prefix(&path, &self.dataset.base);
        let relative_str: &str = relative_path.as_ref();
        // An object is removable when no working-set manifest references it, and it either
        // cannot be part of an in-progress write or is known to belong to an old version.
        let removable =
            |referenced: bool, verified: bool| !referenced && (!maybe_in_progress || verified);

        if relative_str.starts_with("_versions/.tmp") {
            return Ok((!maybe_in_progress).then_some(path));
        }

        // Index files live under `_indices/<uuid>/...` and are tracked by UUID, not by path.
        // Every undecided case is kept explicitly rather than falling through.
        if relative_str.starts_with("_indices") {
            return Ok(match relative_path.parts().nth(1) {
                Some(uuid_part) => {
                    let uuid: &str = uuid_part.as_ref();
                    removable(
                        inspection.referenced_files.index_uuids.contains(uuid),
                        inspection.verified_files.index_uuids.contains(uuid),
                    )
                    .then_some(path)
                }
                None => None,
            });
        }

        let referenced_files = &inspection.referenced_files;
        let verified_files = &inspection.verified_files;
        let (referenced, verified) = match path.extension() {
            Some("lance") if relative_str.starts_with("data") => (
                referenced_files.data_paths.contains(&relative_path),
                verified_files.data_paths.contains(&relative_path),
            ),
            Some("arrow") | Some("bin") if relative_str.starts_with("_deletions") => (
                referenced_files.delete_paths.contains(&relative_path),
                verified_files.delete_paths.contains(&relative_path),
            ),
            Some("txn") if relative_str.starts_with("_transactions") => (
                referenced_files.tx_paths.contains(&relative_path),
                verified_files.tx_paths.contains(&relative_path),
            ),
            // Manifests are deleted separately via `old_manifests`; anything else is kept.
            _ => return Ok(None),
        };
        Ok(removable(referenced, verified).then_some(path))
    }
}
/// Removes old versions of `dataset` together with files that no retained version uses.
///
/// * `before` - versions whose manifest timestamp is earlier than this are removed. The
///   latest version and tagged versions are always kept.
/// * `delete_unverified` - defaults to `false`. When `true`, recently modified unreferenced
///   files are removed too, even though they might belong to an in-progress write.
/// * `error_if_tagged_old_versions` - defaults to `true`. When set, cleanup is aborted with
///   an error if tagged old versions are detected.
///
/// Returns the number of removed versions and the total number of bytes deleted.
pub async fn cleanup_old_versions(
    dataset: &Dataset,
    before: DateTime<Utc>,
    delete_unverified: Option<bool>,
    error_if_tagged_old_versions: Option<bool>,
) -> Result<RemovalStats> {
    let delete_unverified = delete_unverified.unwrap_or(false);
    let error_if_tagged_old_versions = error_if_tagged_old_versions.unwrap_or(true);
    CleanupTask::new(dataset, before, delete_unverified, error_if_tagged_old_versions)
        .run()
        .await
}
/// Builds the error returned when cleanup is refused because old versions are tagged.
///
/// The message lists every tag in `tags` whose version is in `tagged_old_versions`. Tags
/// are sorted by name so the message is deterministic.
fn tagged_old_versions_cleanup_error(
    tags: &HashMap<String, TagContents>,
    tagged_old_versions: &HashSet<u64>,
) -> Error {
    // A BTreeMap (unlike HashMap) renders in a stable, sorted order. Its `Debug` output has
    // the same `{"name": version, ...}` shape as before.
    let blocking_tags: std::collections::BTreeMap<&str, u64> = tags
        .iter()
        .filter(|(_, contents)| tagged_old_versions.contains(&contents.version))
        .map(|(name, contents)| (name.as_str(), contents.version))
        .collect();
    Error::Cleanup {
        message: format!(
            "{} tagged version(s) have been marked for cleanup. Either set `error_if_tagged_old_versions=false` or delete the following tag(s) to enable cleanup: {:?}",
            blocking_tags.len(),
            blocking_tags
        ),
    }
}
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};
    use arrow_array::RecordBatchReader;
    use datafusion::common::assert_contains;
    use lance_core::utils::testing::{MockClock, ProxyObjectStore, ProxyObjectStorePolicy};
    use lance_index::{DatasetIndexExt, IndexType};
    use lance_io::object_store::{
        ObjectStore, ObjectStoreParams, ObjectStoreRegistry, WrappingObjectStore,
    };
    use lance_linalg::distance::MetricType;
    use lance_testing::datagen::{some_batch, BatchGenerator, IncrementingInt32};
    use snafu::{location, Location};
    use crate::{
        dataset::{builder::DatasetBuilder, ReadParams, WriteMode, WriteParams},
        index::vector::VectorIndexParams,
    };
    use all_asserts::{assert_gt, assert_lt};
    use tempfile::{tempdir, TempDir};
    use super::*;

    /// Object-store wrapper used by the tests. It routes the real store through a
    /// `ProxyObjectStore` whose named policies can record file times and inject failures.
    #[derive(Debug)]
    struct MockObjectStore {
        // Policies consulted by the proxy; tests add and clear named policies at runtime.
        policy: Arc<Mutex<ProxyObjectStorePolicy>>,
        // Recorded "last modified" time per path, maintained by `add_timestamp_policy`.
        last_modified_times: Arc<Mutex<HashMap<Path, DateTime<Utc>>>>,
    }

    impl WrappingObjectStore for MockObjectStore {
        fn wrap(
            &self,
            original: Arc<dyn object_store::ObjectStore>,
        ) -> Arc<dyn object_store::ObjectStore> {
            Arc::new(ProxyObjectStore::new(original, self.policy.clone()))
        }
    }

    impl MockObjectStore {
        /// Creates a mock store with the timestamp-recording policies already installed.
        pub(crate) fn new() -> Self {
            let instance = Self {
                policy: Arc::new(Mutex::new(ProxyObjectStorePolicy::new())),
                last_modified_times: Arc::new(Mutex::new(HashMap::new())),
            };
            instance.add_timestamp_policy();
            instance
        }

        /// Installs two policies so that object metadata reports times taken from
        /// `utc_now()` rather than the filesystem. (Presumably `utc_now()` follows the
        /// fixture's `MockClock`; confirm.)
        ///
        /// - `record_file_time`: a before-hook that stores `utc_now()` for the path of every
        ///   proxied operation. The operation name is ignored, so whichever operations the
        ///   proxy routes through this hook refresh the time.
        /// - `add_recorded_file_time`: overwrites `last_modified` with the recorded time,
        ///   when one exists.
        fn add_timestamp_policy(&self) {
            let mut policy = self.policy.lock().unwrap();
            let times_map = self.last_modified_times.clone();
            policy.set_before_policy(
                "record_file_time",
                Arc::new(move |_, path| {
                    let mut times_map = times_map.lock().unwrap();
                    times_map.insert(path.clone(), utc_now());
                    Ok(())
                }),
            );
            let times_map = self.last_modified_times.clone();
            policy.set_obj_meta_policy(
                "add_recorded_file_time",
                Arc::new(move |_, meta| {
                    let mut meta = meta;
                    if let Some(recorded) = times_map.lock().unwrap().get(&meta.location) {
                        meta.last_modified = *recorded;
                    }
                    Ok(meta)
                }),
            );
        }
    }

    /// File counts by extension (see `count_files`) plus the total byte size.
    #[derive(Debug, PartialEq)]
    struct FileCounts {
        num_data_files: usize,
        num_manifest_files: usize,
        num_index_files: usize,
        num_delete_files: usize,
        num_tx_files: usize,
        num_bytes: u64,
    }

    /// A dataset in a temporary directory, accessed through `MockObjectStore`, plus a mock
    /// clock the tests use to age files and manifests.
    struct MockDatasetFixture<'a> {
        // Held only so the temporary directory lives as long as the fixture.
        _tmpdir: TempDir,
        dataset_path: String,
        mock_store: Arc<MockObjectStore>,
        pub clock: MockClock<'a>,
    }

    impl<'a> MockDatasetFixture<'a> {
        /// Creates a fixture whose dataset lives at `<tmpdir>/my_db`.
        fn try_new() -> Result<Self> {
            let tmpdir = tempdir()?;
            let tmpdir_path = tmpdir.path().as_os_str().to_str().unwrap().to_owned();
            Ok(Self {
                _tmpdir: tmpdir,
                dataset_path: format!("{}/my_db", tmpdir_path),
                mock_store: Arc::new(MockObjectStore::new()),
                clock: MockClock::new(),
            })
        }

        /// Object-store params that route all I/O through the mock store.
        fn os_params(&self) -> ObjectStoreParams {
            ObjectStoreParams {
                object_store_wrapper: Some(self.mock_store.clone()),
                ..Default::default()
            }
        }

        /// Writes `data` to the dataset using `mode`, through the mock store.
        async fn write_data_impl(
            &self,
            data: impl RecordBatchReader + Send + 'static,
            mode: WriteMode,
        ) -> Result<()> {
            Dataset::write(
                data,
                &self.dataset_path,
                Some(WriteParams {
                    store_params: Some(self.os_params()),
                    mode,
                    ..Default::default()
                }),
            )
            .await?;
            Ok(())
        }

        /// Writes the generic `some_batch()` data using `mode`.
        async fn write_some_data_impl(&self, mode: WriteMode) -> Result<()> {
            self.write_data_impl(some_batch(), mode).await?;
            Ok(())
        }

        async fn create_some_data(&self) -> Result<()> {
            self.write_some_data_impl(WriteMode::Create).await
        }

        async fn overwrite_some_data(&self) -> Result<()> {
            self.write_some_data_impl(WriteMode::Overwrite).await
        }

        async fn append_some_data(&self) -> Result<()> {
            self.write_some_data_impl(WriteMode::Append).await
        }

        async fn create_with_data(
            &self,
            data: impl RecordBatchReader + Send + 'static,
        ) -> Result<()> {
            self.write_data_impl(data, WriteMode::Create).await
        }

        async fn append_data(&self, data: impl RecordBatchReader + Send + 'static) -> Result<()> {
            self.write_data_impl(data, WriteMode::Append).await
        }

        async fn overwrite_data(
            &self,
            data: impl RecordBatchReader + Send + 'static,
        ) -> Result<()> {
            self.write_data_impl(data, WriteMode::Overwrite).await
        }

        /// Deletes rows matching `predicate`, producing a new version.
        async fn delete_data(&self, predicate: &str) -> Result<()> {
            let mut db = self.open().await?;
            db.delete(predicate).await?;
            Ok(())
        }

        /// Creates an IVF-PQ vector index named "some_index" on the "indexable" column.
        async fn create_some_index(&self) -> Result<()> {
            let mut db = self.open().await?;
            let index_params = Box::new(VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 5));
            db.create_index(
                &["indexable"],
                IndexType::Vector,
                Some("some_index".to_owned()),
                &*index_params,
                false,
            )
            .await?;
            Ok(())
        }

        /// Makes every object-store operation whose name contains "copy" fail. Tests rely
        /// on this to make later writes fail after data files have been written.
        fn block_commits(&mut self) {
            let mut policy = self.mock_store.policy.lock().unwrap();
            policy.set_before_policy(
                "block_commit",
                Arc::new(|op, _| -> Result<()> {
                    if op.contains("copy") {
                        return Err(Error::Internal {
                            message: "Copy blocked".to_string(),
                            location: location!(),
                        });
                    }
                    Ok(())
                }),
            );
        }

        /// Makes deleting any `.manifest` file fail, to simulate a partially failed cleanup.
        fn block_delete_manifest(&mut self) {
            let mut policy = self.mock_store.policy.lock().unwrap();
            policy.set_before_policy(
                "block_delete_manifest",
                Arc::new(|op, path| -> Result<()> {
                    if op.contains("delete") && path.extension() == Some("manifest") {
                        Err(Error::Internal {
                            message: "Delete manifest blocked".to_string(),
                            location: location!(),
                        })
                    } else {
                        Ok(())
                    }
                }),
            );
        }

        /// Removes the policy installed by `block_delete_manifest`.
        fn unblock_delete_manifest(&mut self) {
            let mut policy = self.mock_store.policy.lock().unwrap();
            policy.clear_before_policy("block_delete_manifest");
        }

        /// Runs cleanup with the default options.
        async fn run_cleanup(&self, before: DateTime<Utc>) -> Result<RemovalStats> {
            let db = self.open().await?;
            cleanup_old_versions(&db, before, None, None).await
        }

        /// Runs cleanup with explicit (optional) overrides for both flags.
        async fn run_cleanup_with_override(
            &self,
            before: DateTime<Utc>,
            delete_unverified: Option<bool>,
            error_if_tagged_old_versions: Option<bool>,
        ) -> Result<RemovalStats> {
            let db = self.open().await?;
            cleanup_old_versions(&db, before, delete_unverified, error_if_tagged_old_versions).await
        }

        /// Opens the latest version of the dataset through the mock store.
        async fn open(&self) -> Result<Box<Dataset>> {
            let ds = DatasetBuilder::from_uri(&self.dataset_path)
                .with_read_params(ReadParams {
                    store_options: Some(self.os_params()),
                    ..Default::default()
                })
                .load()
                .await?;
            Ok(Box::new(ds))
        }

        /// Lists every file under the dataset and tallies it by extension.
        async fn count_files(&self) -> Result<FileCounts> {
            let registry = Arc::new(ObjectStoreRegistry::default());
            let (os, path) =
                ObjectStore::from_uri_and_params(registry, &self.dataset_path, &self.os_params())
                    .await?;
            let mut file_stream = os.read_dir_all(&path, None).await?;
            let mut file_count = FileCounts {
                num_data_files: 0,
                num_delete_files: 0,
                num_index_files: 0,
                num_manifest_files: 0,
                num_tx_files: 0,
                num_bytes: 0,
            };
            while let Some(path) = file_stream.try_next().await? {
                file_count.num_bytes += path.size as u64;
                match path.location.extension() {
                    Some("lance") => file_count.num_data_files += 1,
                    Some("manifest") => file_count.num_manifest_files += 1,
                    Some("arrow") | Some("bin") => file_count.num_delete_files += 1,
                    Some("idx") => file_count.num_index_files += 1,
                    Some("txn") => file_count.num_tx_files += 1,
                    _ => (),
                }
            }
            Ok(file_count)
        }

        /// Counts the rows visible in the latest version.
        async fn count_rows(&self) -> Result<usize> {
            let db = self.open().await?;
            let count = db.count_rows(None).await?;
            Ok(count)
        }
    }

    // An overwritten version's data, manifest and transaction files are removed once old.
    #[tokio::test]
    async fn cleanup_unreferenced_data_files() {
        let fixture = MockDatasetFixture::try_new().unwrap();
        fixture.create_some_data().await.unwrap();
        fixture.overwrite_some_data().await.unwrap();
        fixture
            .clock
            .set_system_time(TimeDelta::try_days(10).unwrap());
        let before_count = fixture.count_files().await.unwrap();
        let removed = fixture
            .run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
            .await
            .unwrap();
        let after_count = fixture.count_files().await.unwrap();
        assert_eq!(removed.old_versions, 1);
        assert_eq!(
            removed.bytes_removed,
            before_count.num_bytes - after_count.num_bytes
        );
        assert_lt!(after_count.num_data_files, before_count.num_data_files);
        assert_lt!(
            after_count.num_manifest_files,
            before_count.num_manifest_files
        );
        assert_lt!(after_count.num_tx_files, before_count.num_tx_files);
        assert_gt!(after_count.num_manifest_files, 0);
        assert_gt!(after_count.num_data_files, 0);
        assert_gt!(after_count.num_tx_files, 0);
    }

    // Only the old manifest goes; data files still referenced by newer versions stay.
    #[tokio::test]
    async fn do_not_cleanup_newer_data() {
        let fixture = MockDatasetFixture::try_new().unwrap();
        fixture.create_some_data().await.unwrap();
        fixture
            .clock
            .set_system_time(TimeDelta::try_days(10).unwrap());
        fixture.append_some_data().await.unwrap();
        fixture.append_some_data().await.unwrap();
        let before_count = fixture.count_files().await.unwrap();
        assert_eq!(before_count.num_data_files, 3);
        assert_eq!(before_count.num_manifest_files, 3);
        let before = utc_now() - TimeDelta::try_days(7).unwrap();
        let removed = fixture.run_cleanup(before).await.unwrap();
        let after_count = fixture.count_files().await.unwrap();
        assert_eq!(removed.old_versions, 1);
        assert_eq!(
            removed.bytes_removed,
            before_count.num_bytes - after_count.num_bytes
        );
        assert_eq!(after_count.num_data_files, 3);
        assert_eq!(after_count.num_manifest_files, 2);
        assert_eq!(after_count.num_tx_files, 2);
    }

    // With default options, tags on old versions make cleanup fail until they are deleted.
    #[tokio::test]
    async fn cleanup_error_when_tagged_old_versions() {
        let fixture = MockDatasetFixture::try_new().unwrap();
        fixture.create_some_data().await.unwrap();
        fixture.overwrite_some_data().await.unwrap();
        fixture.overwrite_some_data().await.unwrap();
        let mut dataset = *(fixture.open().await.unwrap());
        dataset.tags.create("old-tag", 1).await.unwrap();
        dataset.tags.create("another-old-tag", 2).await.unwrap();
        fixture
            .clock
            .set_system_time(TimeDelta::try_days(10).unwrap());
        let mut cleanup_error = fixture
            .run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
            .await
            .err()
            .unwrap();
        assert_contains!(cleanup_error.to_string(), "Cleanup error: 2 tagged version(s) have been marked for cleanup. Either set `error_if_tagged_old_versions=false` or delete the following tag(s) to enable cleanup:");
        dataset.tags.delete("old-tag").await.unwrap();
        cleanup_error = fixture
            .run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
            .await
            .err()
            .unwrap();
        assert_contains!(cleanup_error.to_string(), "Cleanup error: 1 tagged version(s) have been marked for cleanup. Either set `error_if_tagged_old_versions=false` or delete the following tag(s) to enable cleanup:");
        dataset.tags.delete("another-old-tag").await.unwrap();
        let removed = fixture
            .run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
            .await
            .unwrap();
        assert_eq!(removed.old_versions, 2);
    }

    // With error_if_tagged_old_versions=false, tagged versions are skipped, not deleted.
    #[tokio::test]
    async fn cleanup_around_tagged_old_versions() {
        let fixture = MockDatasetFixture::try_new().unwrap();
        fixture.create_some_data().await.unwrap();
        fixture.overwrite_some_data().await.unwrap();
        fixture.overwrite_some_data().await.unwrap();
        let mut dataset = *(fixture.open().await.unwrap());
        dataset.tags.create("old-tag", 1).await.unwrap();
        dataset.tags.create("another-old-tag", 2).await.unwrap();
        dataset.tags.create("tag-latest", 3).await.unwrap();
        fixture
            .clock
            .set_system_time(TimeDelta::try_days(10).unwrap());
        let mut removed = fixture
            .run_cleanup_with_override(
                utc_now() - TimeDelta::try_days(8).unwrap(),
                None,
                Some(false),
            )
            .await
            .unwrap();
        assert_eq!(removed.old_versions, 0);
        dataset.tags.delete("old-tag").await.unwrap();
        removed = fixture
            .run_cleanup_with_override(
                utc_now() - TimeDelta::try_days(8).unwrap(),
                None,
                Some(false),
            )
            .await
            .unwrap();
        assert_eq!(removed.old_versions, 1);
        dataset.tags.delete("another-old-tag").await.unwrap();
        removed = fixture
            .run_cleanup_with_override(
                utc_now() - TimeDelta::try_days(8).unwrap(),
                None,
                Some(false),
            )
            .await
            .unwrap();
        assert_eq!(removed.old_versions, 1);
    }

    // Recent files referenced by an old (verified) manifest are deleted despite their age.
    #[tokio::test]
    async fn cleanup_recent_verified_files() {
        let fixture = MockDatasetFixture::try_new().unwrap();
        fixture.create_some_data().await.unwrap();
        fixture
            .clock
            .set_system_time(TimeDelta::try_seconds(1).unwrap());
        fixture.overwrite_some_data().await.unwrap();
        let before_count = fixture.count_files().await.unwrap();
        assert_eq!(before_count.num_data_files, 2);
        assert_eq!(before_count.num_manifest_files, 2);
        let before = utc_now();
        let removed = fixture.run_cleanup(before).await.unwrap();
        let after_count = fixture.count_files().await.unwrap();
        assert_eq!(removed.old_versions, 1);
        assert_eq!(
            removed.bytes_removed,
            before_count.num_bytes - after_count.num_bytes
        );
        assert_eq!(after_count.num_data_files, 1);
        assert_eq!(after_count.num_manifest_files, 1);
    }

    // Files from a failed write are deleted only if delete_unverified is set or they are
    // older than UNVERIFIED_THRESHOLD_DAYS.
    #[tokio::test]
    async fn dont_cleanup_recent_unverified_files() {
        for (override_opt, old_files) in [
            // delete_unverified explicitly false, recent files: kept.
            (Some(false), false),
            // delete_unverified explicitly true: deleted even though recent.
            (Some(true), false),
            // Default (false), files older than the threshold: deleted.
            (None, true),
            // Default (false), recent files: kept.
            (None, false),
        ] {
            let mut fixture = MockDatasetFixture::try_new().unwrap();
            fixture.create_some_data().await.unwrap();
            fixture.block_commits();
            assert!(fixture.append_some_data().await.is_err());
            let age = if old_files {
                TimeDelta::try_days(UNVERIFIED_THRESHOLD_DAYS + 1).unwrap()
            } else {
                TimeDelta::try_days(UNVERIFIED_THRESHOLD_DAYS - 1).unwrap()
            };
            fixture.clock.set_system_time(age);
            let before_count = fixture.count_files().await.unwrap();
            assert_eq!(before_count.num_data_files, 2);
            assert_eq!(before_count.num_manifest_files, 1);
            let before = utc_now();
            let removed = fixture
                .run_cleanup_with_override(before, override_opt, None)
                .await
                .unwrap();
            let should_delete = override_opt.unwrap_or(false) || old_files;
            let after_count = fixture.count_files().await.unwrap();
            assert_eq!(removed.old_versions, 0);
            assert_eq!(
                removed.bytes_removed,
                before_count.num_bytes - after_count.num_bytes
            );
            if should_delete {
                assert_gt!(removed.bytes_removed, 0);
            } else {
                assert_eq!(removed.bytes_removed, 0);
            }
        }
    }

    // An index only referenced by old versions is removed along with them.
    #[tokio::test]
    async fn cleanup_old_index() {
        let fixture = MockDatasetFixture::try_new().unwrap();
        fixture.create_some_data().await.unwrap();
        fixture.create_some_index().await.unwrap();
        fixture
            .clock
            .set_system_time(TimeDelta::try_days(10).unwrap());
        fixture.overwrite_some_data().await.unwrap();
        let before_count = fixture.count_files().await.unwrap();
        assert_eq!(before_count.num_index_files, 1);
        assert_eq!(before_count.num_data_files, 2);
        assert_eq!(before_count.num_manifest_files, 3);
        let before = utc_now() - TimeDelta::try_days(8).unwrap();
        let removed = fixture.run_cleanup(before).await.unwrap();
        let after_count = fixture.count_files().await.unwrap();
        assert_eq!(removed.old_versions, 2);
        assert_eq!(
            removed.bytes_removed,
            before_count.num_bytes - after_count.num_bytes
        );
        assert_eq!(after_count.num_index_files, 0);
        assert_eq!(after_count.num_data_files, 1);
        assert_eq!(after_count.num_manifest_files, 1);
        assert_eq!(after_count.num_tx_files, 1);
    }

    // Deletion files of old versions are removed; the current one and its rows survive.
    #[tokio::test]
    async fn clean_old_delete_files() {
        let fixture = MockDatasetFixture::try_new().unwrap();
        let mut data_gen = BatchGenerator::new().col(Box::new(
            IncrementingInt32::new().named("filter_me".to_owned()),
        ));
        fixture.create_with_data(data_gen.batch(16)).await.unwrap();
        fixture.append_data(data_gen.batch(16)).await.unwrap();
        fixture.delete_data("filter_me < 20").await.unwrap();
        fixture
            .clock
            .set_system_time(TimeDelta::try_days(10).unwrap());
        fixture.overwrite_data(data_gen.batch(16)).await.unwrap();
        fixture.delete_data("filter_me >= 40").await.unwrap();
        let before_count = fixture.count_files().await.unwrap();
        assert_eq!(before_count.num_data_files, 3);
        assert_eq!(before_count.num_delete_files, 2);
        assert_eq!(before_count.num_manifest_files, 5);
        let before = utc_now() - TimeDelta::try_days(8).unwrap();
        let removed = fixture.run_cleanup(before).await.unwrap();
        let after_count = fixture.count_files().await.unwrap();
        assert_eq!(removed.old_versions, 3);
        assert_eq!(
            removed.bytes_removed,
            before_count.num_bytes - after_count.num_bytes
        );
        assert_eq!(after_count.num_data_files, 1);
        assert_eq!(after_count.num_delete_files, 1);
        assert_eq!(after_count.num_manifest_files, 2);
        assert_eq!(after_count.num_tx_files, 2);
        let row_count_after = fixture.count_rows().await.unwrap();
        assert_eq!(row_count_after, 8);
    }

    // Files of an index used by the current version are never touched.
    #[tokio::test]
    async fn dont_clean_index_data_files() {
        let fixture = MockDatasetFixture::try_new().unwrap();
        fixture
            .clock
            .set_system_time(TimeDelta::try_days(10).unwrap());
        fixture.create_some_data().await.unwrap();
        fixture.create_some_index().await.unwrap();
        let before_count = fixture.count_files().await.unwrap();
        let before = utc_now() - TimeDelta::try_days(8).unwrap();
        let removed = fixture.run_cleanup(before).await.unwrap();
        assert_eq!(removed.old_versions, 0);
        assert_eq!(removed.bytes_removed, 0);
        let after_count = fixture.count_files().await.unwrap();
        assert_eq!(before_count, after_count);
    }

    // Leftovers of an old failed commit (data + transaction file) are removed.
    #[tokio::test]
    async fn cleanup_failed_commit_data_file() {
        let mut fixture = MockDatasetFixture::try_new().unwrap();
        fixture.create_some_data().await.unwrap();
        fixture.block_commits();
        assert!(fixture.append_some_data().await.is_err());
        fixture
            .clock
            .set_system_time(TimeDelta::try_days(10).unwrap());
        let before_count = fixture.count_files().await.unwrap();
        assert_eq!(before_count.num_data_files, 2);
        assert_eq!(before_count.num_manifest_files, 1);
        assert_eq!(before_count.num_tx_files, 2);
        let removed = fixture
            .run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
            .await
            .unwrap();
        let after_count = fixture.count_files().await.unwrap();
        assert_eq!(removed.old_versions, 0);
        assert_eq!(
            removed.bytes_removed,
            before_count.num_bytes - after_count.num_bytes
        );
        assert_eq!(after_count.num_data_files, 1);
        assert_eq!(after_count.num_manifest_files, 1);
        assert_eq!(after_count.num_tx_files, 1);
    }

    // Leftovers of a recent failed commit may be an in-progress write and are kept.
    #[tokio::test]
    async fn dont_cleanup_in_progress_write() {
        let mut fixture = MockDatasetFixture::try_new().unwrap();
        fixture
            .clock
            .set_system_time(TimeDelta::try_days(10).unwrap());
        fixture.create_some_data().await.unwrap();
        fixture.block_commits();
        assert!(fixture.append_some_data().await.is_err());
        let before_count = fixture.count_files().await.unwrap();
        let removed = fixture
            .run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
            .await
            .unwrap();
        assert_eq!(removed.old_versions, 0);
        assert_eq!(removed.bytes_removed, 0);
        let after_count = fixture.count_files().await.unwrap();
        assert_eq!(before_count, after_count);
    }

    // If manifest deletion fails, data files are already gone but the manifests remain.
    // A later run finishes the job.
    #[tokio::test]
    async fn can_recover_delete_failure() {
        let mut fixture = MockDatasetFixture::try_new().unwrap();
        fixture.create_some_data().await.unwrap();
        fixture
            .clock
            .set_system_time(TimeDelta::try_days(10).unwrap());
        fixture.overwrite_some_data().await.unwrap();
        fixture.block_delete_manifest();
        let before_count = fixture.count_files().await.unwrap();
        assert_eq!(before_count.num_data_files, 2);
        assert_eq!(before_count.num_manifest_files, 2);
        assert!(fixture
            .run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
            .await
            .is_err());
        let mid_count = fixture.count_files().await.unwrap();
        assert_eq!(mid_count.num_data_files, 1);
        assert_eq!(mid_count.num_manifest_files, 2);
        fixture.unblock_delete_manifest();
        let removed = fixture
            .run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
            .await
            .unwrap();
        let after_count = fixture.count_files().await.unwrap();
        assert_eq!(removed.old_versions, 1);
        assert_eq!(
            removed.bytes_removed,
            mid_count.num_bytes - after_count.num_bytes
        );
        assert_eq!(after_count.num_data_files, 1);
        assert_eq!(after_count.num_manifest_files, 1);
    }
}