use super::refs::TagContents;
use crate::dataset::TRANSACTIONS_DIR;
use crate::{Dataset, utils::temporal::utc_now};
use chrono::{DateTime, TimeDelta, Utc};
use dashmap::DashSet;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt, stream};
use humantime::parse_duration;
use lance_core::{
Error, Result,
utils::tracing::{
AUDIT_MODE_DELETE, AUDIT_MODE_DELETE_UNVERIFIED, AUDIT_TYPE_DATA, AUDIT_TYPE_DELETION,
AUDIT_TYPE_INDEX, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
},
};
use lance_table::{
format::{IndexMetadata, Manifest},
io::{
commit::ManifestLocation,
deletion::deletion_file_path,
manifest::{read_manifest, read_manifest_indexes},
},
};
use object_store::path::Path;
use object_store::{Error as ObjectStoreError, ObjectMeta};
use std::fmt::Debug;
use std::{
collections::{HashMap, HashSet},
future,
sync::{Mutex, MutexGuard},
time::Duration,
};
use tokio::time::{MissedTickBehavior, interval};
use tokio_stream::wrappers::IntervalStream;
use tracing::{Span, debug, info, instrument};
/// Sets of files, keyed by dataset-relative path (or index UUID), that some
/// group of manifests references.
#[derive(Clone, Debug, Default)]
struct ReferencedFiles {
    // Data file paths relative to the dataset base (under `data/`).
    data_paths: HashSet<Path>,
    // Deletion file paths relative to the dataset base.
    delete_paths: HashSet<Path>,
    // Transaction file paths (under the transactions directory).
    tx_paths: HashSet<Path>,
    // UUIDs of referenced indices (directory names under `_indices/`).
    index_uuids: HashSet<String>,
}
/// Summary of what a cleanup run removed.
#[derive(Clone, Debug, Default)]
pub struct RemovalStats {
    /// Total bytes reclaimed, including the old manifest files themselves.
    pub bytes_removed: u64,
    /// Number of old manifest versions deleted.
    pub old_versions: u64,
    /// Number of data files deleted.
    pub data_files_removed: u64,
    /// Number of transaction files deleted.
    pub transaction_files_removed: u64,
    /// Number of index files deleted.
    pub index_files_removed: u64,
    /// Number of deletion files deleted.
    pub deletion_files_removed: u64,
}
/// Category of a removed file; used to bump the matching `RemovalStats`
/// counter when a file is queued for deletion.
#[derive(Clone, Copy, Debug)]
enum RemovedFileType {
    Data,
    Transaction,
    Index,
    Deletion,
}
/// Strip `prefix` from `path`, returning the relative remainder.
///
/// If `path` is not under `prefix`, the original path is returned unchanged.
fn remove_prefix(path: &Path, prefix: &Path) -> Path {
    // `match` on the Option avoids the non-idiomatic is_none()/unwrap() pair.
    match path.prefix_match(prefix) {
        Some(relative_parts) => Path::from_iter(relative_parts),
        None => path.clone(),
    }
}
/// One cleanup run over a single dataset, driven by a `CleanupPolicy`.
#[derive(Clone, Debug)]
struct CleanupTask<'a> {
    dataset: &'a Dataset,
    policy: CleanupPolicy,
}
/// Intermediate result of scanning all manifests, computed before anything
/// is deleted.
#[derive(Clone, Debug, Default)]
struct CleanupInspection {
    // Manifest path -> version for manifests that are out of the working set
    // and can be removed.
    old_manifests: HashMap<Path, u64>,
    // Files referenced by retained (working-set) manifests; never deleted.
    referenced_files: ReferencedFiles,
    // Files referenced only by old manifests; verified safe to delete.
    verified_files: ReferencedFiles,
    // Versions that the policy would clean but that are protected by a tag.
    tagged_old_versions: HashSet<u64>,
    // Oldest commit timestamp among retained manifests, if any were seen.
    earliest_retained_manifest_time: Option<DateTime<Utc>>,
}
// Files modified within this many days are treated as possibly belonging to
// an in-progress commit and are kept unless `delete_unverified` is set.
const UNVERIFIED_THRESHOLD_DAYS: i64 = 7;
// Paths removed per delete request on stores that batch deletes; used to
// convert a request-rate limit into a per-path interval.
const S3_DELETE_STREAM_BATCH_SIZE: u64 = 1_000;
const AZURE_DELETE_STREAM_BATCH_SIZE: u64 = 256;
impl<'a> CleanupTask<'a> {
fn new(dataset: &'a Dataset, policy: CleanupPolicy) -> Self {
Self { dataset, policy }
}
/// Execute the cleanup: discover referenced branches, optionally cascade
/// into them, inspect all manifests, and delete everything unreferenced.
///
/// Returns the stats of the deletion pass on this dataset.
async fn run(self) -> Result<RemovalStats> {
    let referenced_branches: Vec<(String, u64)> = self.find_referenced_branches().await?;
    if self.policy.clean_referenced_branches {
        // NOTE(review): the stats returned by cleaning referenced branches
        // are discarded and not folded into the returned totals — confirm
        // this is intentional.
        self.clean_referenced_branches(&referenced_branches).await?;
    }
    let tags = self.dataset.tags().list().await?;
    let current_branch = &self.dataset.manifest.branch;
    // Only tags pointing at the current branch protect versions here.
    let tagged_versions: HashSet<u64> = tags
        .values()
        .filter(|tag| match (tag.branch.as_ref(), current_branch.as_ref()) {
            (Some(branch_of_tag), Some(current_branch)) => branch_of_tag == current_branch,
            (None, None) => true,
            _ => false,
        })
        .map(|tag_content| tag_content.version)
        .collect();
    let mut inspection = self.process_manifests(&tagged_versions).await?;
    if self.policy.error_if_tagged_old_versions && !inspection.tagged_old_versions.is_empty() {
        return Err(tagged_old_versions_cleanup_error(
            &tags,
            &inspection.tagged_old_versions,
        ));
    }
    if !referenced_branches.is_empty() {
        inspection = self
            .retain_branch_lineage_files(inspection, &referenced_branches)
            .await?;
    }
    // The previous field-by-field accumulation into a fresh default
    // `RemovalStats` was a no-op; the deletion pass's stats are the totals.
    self.delete_unreferenced_files(inspection).await
}
/// List every manifest of the dataset and inspect each one concurrently,
/// accumulating the results in a shared, lock-protected inspection.
#[instrument(level = "debug", skip_all)]
async fn process_manifests(
    &'a self,
    tagged_versions: &HashSet<u64>,
) -> Result<CleanupInspection> {
    let shared = Mutex::new(CleanupInspection::default());
    let store = &self.dataset.object_store;
    self.dataset
        .commit_handler
        .list_manifest_locations(&self.dataset.base, store, false)
        .try_for_each_concurrent(store.io_parallelism(), |loc| {
            self.process_manifest_file(loc, &shared, tagged_versions)
        })
        .await?;
    Ok(shared.into_inner().unwrap())
}
/// Inspect one manifest file and record which files it references.
///
/// Working-set manifests (latest, tagged, or too new to clean) contribute to
/// `referenced_files`; all others are queued in `old_manifests` for deletion
/// and contribute to `verified_files`.
async fn process_manifest_file(
    &self,
    location: ManifestLocation,
    inspection: &Mutex<CleanupInspection>,
    tagged_versions: &HashSet<u64>,
) -> Result<()> {
    let manifest =
        read_manifest(&self.dataset.object_store, &location.path, location.size).await?;
    let dataset_version = self.dataset.version().version;
    // Anything at or beyond the currently checked-out version counts as latest.
    let is_latest = dataset_version <= manifest.version;
    let is_tagged = tagged_versions.contains(&manifest.version);
    let in_working_set = is_latest || !self.policy.should_clean(&manifest) || is_tagged;
    let indexes =
        read_manifest_indexes(&self.dataset.object_store, &location, &manifest).await?;
    let mut inspection = inspection.lock().unwrap();
    // Record versions that only a tag is keeping alive so the caller can
    // surface an error if configured to do so.
    if is_tagged && !is_latest && self.policy.should_clean(&manifest) {
        inspection.tagged_old_versions.insert(manifest.version);
    }
    self.process_manifest(&manifest, &indexes, in_working_set, &mut inspection)?;
    if !in_working_set {
        inspection
            .old_manifests
            .insert(location.path.clone(), manifest.version);
    } else {
        // Track the oldest commit timestamp among retained manifests; the
        // map_or + min collapses the previous if/else chain.
        let commit_ts = manifest.timestamp();
        inspection.earliest_retained_manifest_time = Some(
            inspection
                .earliest_retained_manifest_time
                .map_or(commit_ts, |ts| ts.min(commit_ts)),
        );
    }
    Ok(())
}
/// Record every file referenced by `manifest` — data files, deletion files,
/// the transaction file, and index UUIDs — into the inspection.
///
/// Working-set manifests feed `referenced_files` (never deleted); old
/// manifests feed `verified_files` (deletable once the manifest goes away).
/// Takes `&[IndexMetadata]` instead of `&Vec<…>` (callers coerce).
fn process_manifest(
    &self,
    manifest: &Manifest,
    indexes: &[IndexMetadata],
    in_working_set: bool,
    inspection: &mut MutexGuard<CleanupInspection>,
) -> Result<()> {
    let referenced_files = if in_working_set {
        &mut inspection.referenced_files
    } else {
        &mut inspection.verified_files
    };
    for fragment in manifest.fragments.iter() {
        // Data files are stored relative to the dataset base.
        for file in fragment.files.iter() {
            let full_data_path = self.dataset.data_dir().child(file.path.as_str());
            let relative_data_path = remove_prefix(&full_data_path, &self.dataset.base);
            referenced_files.data_paths.insert(relative_data_path);
        }
        // A fragment may carry at most one deletion file.
        let delpath = fragment
            .deletion_file
            .as_ref()
            .map(|delfile| deletion_file_path(&self.dataset.base, fragment.id, delfile));
        if let Some(delpath) = delpath {
            let relative_path = remove_prefix(&delpath, &self.dataset.base);
            referenced_files.delete_paths.insert(relative_path);
        }
    }
    if let Some(relative_tx_path) = &manifest.transaction_file {
        referenced_files
            .tx_paths
            .insert(Path::parse(TRANSACTIONS_DIR)?.child(relative_tx_path.as_str()));
    }
    // Indices are tracked by UUID, not by individual file path.
    for index in indexes {
        let uuid_str = index.uuid.to_string();
        referenced_files.index_uuids.insert(uuid_str);
    }
    Ok(())
}
/// Delete every file the inspection found unreferenced, plus the old
/// manifests themselves, and report what was removed.
#[instrument(
    level = "debug",
    skip_all,
    fields(
        old_versions = inspection.old_manifests.len(),
        bytes_removed = tracing::field::Empty,
        data_files_removed = tracing::field::Empty,
        transaction_files_removed = tracing::field::Empty,
        index_files_removed = tracing::field::Empty,
        deletion_files_removed = tracing::field::Empty
    )
)]
async fn delete_unreferenced_files(
    &self,
    inspection: CleanupInspection,
) -> Result<RemovalStats> {
    let removal_stats = Mutex::new(RemovalStats::default());
    // Files newer than this may belong to an in-flight commit; they are only
    // removed when the policy explicitly allows deleting unverified files.
    let verification_threshold = utc_now()
        - TimeDelta::try_days(UNVERIFIED_THRESHOLD_DAYS).expect("TimeDelta::try_days");
    // True when the error is an object-store NotFound wrapped in Error::IO.
    let is_not_found_err = |e: &Error| {
        matches!(
            e,
            Error::IO { source,.. }
            if source
                .downcast_ref::<ObjectStoreError>()
                .map(|os_err| matches!(os_err, ObjectStoreError::NotFound {.. }))
                .unwrap_or(false)
        )
    };
    // Lists `dir` and yields paths safe to remove, updating the shared stats
    // as a side effect. A missing directory is treated as empty.
    let build_listing_stream = |dir: Path, file_type: Option<RemovedFileType>| {
        let inspection_ref = &inspection;
        let removal_stats_ref = &removal_stats;
        self.dataset
            .object_store
            .read_dir_all(&dir, inspection.earliest_retained_manifest_time)
            .map_ok(|obj| stream::once(future::ready(Ok(obj))).boxed())
            .or_else(|e| {
                if is_not_found_err(&e) {
                    future::ready(Ok(stream::empty::<Result<ObjectMeta>>().boxed()))
                } else {
                    future::ready(Err(e))
                }
            })
            .try_flatten()
            .try_filter_map(move |obj_meta| {
                // Recent files might belong to an unfinished commit unless
                // the policy says unverified files may be deleted.
                let maybe_in_progress = !self.policy.delete_unverified
                    && obj_meta.last_modified >= verification_threshold;
                let path_to_remove = self.path_if_not_referenced(
                    obj_meta.location,
                    maybe_in_progress,
                    inspection_ref,
                );
                // Account for the file before yielding it for deletion.
                if matches!(path_to_remove, Ok(Some(..))) {
                    let mut stats = removal_stats_ref.lock().unwrap();
                    stats.bytes_removed += obj_meta.size;
                    if let Some(file_type) = file_type {
                        match file_type {
                            RemovedFileType::Data => stats.data_files_removed += 1,
                            RemovedFileType::Transaction => {
                                stats.transaction_files_removed += 1
                            }
                            RemovedFileType::Index => stats.index_files_removed += 1,
                            RemovedFileType::Deletion => stats.deletion_files_removed += 1,
                        }
                    }
                }
                future::ready(path_to_remove)
            })
            .boxed()
    };
    // One listing stream per directory that can hold removable files. The
    // versions dir has no counter: manifests are accounted for separately.
    let streams = vec![
        build_listing_stream(self.dataset.versions_dir(), None),
        build_listing_stream(
            self.dataset.transactions_dir(),
            Some(RemovedFileType::Transaction),
        ),
        build_listing_stream(self.dataset.data_dir(), Some(RemovedFileType::Data)),
        build_listing_stream(self.dataset.indices_dir(), Some(RemovedFileType::Index)),
        build_listing_stream(
            self.dataset.deletions_dir(),
            Some(RemovedFileType::Deletion),
        ),
    ];
    let unreferenced_paths = stream::iter(streams).flatten().boxed();
    let old_manifests = inspection.old_manifests.clone();
    let num_old_manifests = old_manifests.len();
    // Sum the manifests' sizes (concurrently) before they are deleted.
    let manifest_bytes_removed = stream::iter(old_manifests.keys())
        .map(|path| self.dataset.object_store.size(path))
        .collect::<Vec<_>>()
        .await;
    let manifest_bytes_removed = stream::iter(manifest_bytes_removed)
        .buffer_unordered(self.dataset.object_store.io_parallelism())
        .try_fold(0, |acc, size| async move { Ok(acc + (size)) })
        .await;
    let old_manifests_stream = stream::iter(old_manifests.into_keys())
        .map(|path| {
            info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_MANIFEST, path = path.as_ref());
            Ok(path)
        })
        .boxed();
    let all_paths_to_remove =
        stream::iter(vec![unreferenced_paths, old_manifests_stream]).flatten();
    // Optionally throttle deletions by zipping the path stream with a ticker.
    let paths_to_delete: BoxStream<Result<Path>> = if let Some(rate) =
        self.policy.delete_rate_limit
    {
        let duration = calculate_duration(self.dataset.object_store.scheme().to_string(), rate);
        let mut ticker = interval(duration);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        IntervalStream::new(ticker)
            .zip(all_paths_to_remove)
            .map(|(_, path)| path)
            .boxed()
    } else {
        all_paths_to_remove.boxed()
    };
    let delete_fut = self
        .dataset
        .object_store
        .remove_stream(paths_to_delete)
        .try_for_each(|_| future::ready(Ok(())));
    delete_fut.await?;
    let mut removal_stats = removal_stats.into_inner().unwrap();
    removal_stats.old_versions = num_old_manifests as u64;
    removal_stats.bytes_removed += manifest_bytes_removed?;
    // Publish the final numbers on the tracing span declared above.
    let span = Span::current();
    span.record("bytes_removed", removal_stats.bytes_removed);
    span.record("data_files_removed", removal_stats.data_files_removed);
    span.record(
        "transaction_files_removed",
        removal_stats.transaction_files_removed,
    );
    span.record("index_files_removed", removal_stats.index_files_removed);
    span.record(
        "deletion_files_removed",
        removal_stats.deletion_files_removed,
    );
    Ok(removal_stats)
}
/// Decide whether a listed file may be deleted.
///
/// General pattern per category: keep the file if a retained manifest
/// references it; delete if unverified deletion applies
/// (`!maybe_in_progress`); otherwise delete only when an old, removed
/// manifest is known to have referenced it (`verified_files`); else keep.
/// Returns `Ok(Some(path))` to delete, `Ok(None)` to keep.
fn path_if_not_referenced(
    &self,
    path: Path,
    maybe_in_progress: bool,
    inspection: &CleanupInspection,
) -> Result<Option<Path>> {
    let relative_path = remove_prefix(&path, &self.dataset.base);
    // Temporary commit scratch files: delete unless possibly in progress.
    if relative_path.as_ref().starts_with("_versions/.tmp") {
        if maybe_in_progress {
            return Ok(None);
        } else {
            return Ok(Some(path));
        }
    }
    // Index files are matched by the UUID path segment, not the full path.
    if relative_path.as_ref().starts_with("_indices") {
        if let Some(uuid) = relative_path.parts().nth(1) {
            if inspection
                .referenced_files
                .index_uuids
                .contains(uuid.as_ref())
            {
                return Ok(None);
            } else if !maybe_in_progress {
                info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_INDEX, path = path.to_string());
                return Ok(Some(path));
            } else if inspection
                .verified_files
                .index_uuids
                .contains(uuid.as_ref())
            {
                info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_INDEX, path = path.to_string());
                return Ok(Some(path));
            }
            // Falls through to the extension match (which keeps the file).
        } else {
            return Ok(None);
        }
    }
    match path.extension() {
        Some("lance") => {
            // Regular data files live under `data/`.
            if relative_path.as_ref().starts_with("data") {
                if inspection
                    .referenced_files
                    .data_paths
                    .contains(&relative_path)
                {
                    Ok(None)
                } else if !maybe_in_progress {
                    info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DATA, path = path.to_string());
                    Ok(Some(path))
                } else if inspection
                    .verified_files
                    .data_paths
                    .contains(&relative_path)
                {
                    info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DATA, path = path.to_string());
                    Ok(Some(path))
                } else {
                    Ok(None)
                }
            } else {
                Ok(None)
            }
        }
        Some("blob") => {
            // Blob files follow `data/<data_file_key>/<blob_file>`; their
            // fate is tied to the referenced-ness of the parent data file
            // `data/<data_file_key>.lance`.
            if !relative_path.as_ref().starts_with("data") {
                debug!(
                    path = relative_path.as_ref(),
                    "Will not garbage collect blob file because it does not follow convention"
                );
                return Ok(None);
            }
            let mut parts = relative_path.parts();
            let data_dir = parts.next();
            let data_file_key = parts.next();
            let blob_file = parts.next();
            if !matches!(data_dir, Some(dir) if dir.as_ref() == "data")
                || data_file_key.is_none()
                || blob_file.is_none()
            {
                debug!(
                    path = relative_path.as_ref(),
                    "Will not garbage collect blob file because it does not follow convention"
                );
                return Ok(None);
            }
            // Exactly three path segments are expected; anything deeper is
            // not ours to collect.
            if parts.next().is_some() {
                debug!(
                    path = relative_path.as_ref(),
                    "Will not garbage collect blob file because it does not follow convention"
                );
                return Ok(None);
            }
            let data_file_key = data_file_key.expect("checked is_some");
            let Ok(parent_data_path) =
                Path::parse(format!("data/{}.lance", data_file_key.as_ref()))
            else {
                debug!(
                    path = relative_path.as_ref(),
                    derived_parent = format!("data/{}.lance", data_file_key.as_ref()),
                    "Will not garbage collect blob file because derived parent data file path is invalid"
                );
                return Ok(None);
            };
            if inspection
                .referenced_files
                .data_paths
                .contains(&parent_data_path)
            {
                Ok(None)
            } else if !maybe_in_progress {
                info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DATA, path = path.to_string());
                Ok(Some(path))
            } else if inspection
                .verified_files
                .data_paths
                .contains(&parent_data_path)
            {
                info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DATA, path = path.to_string());
                Ok(Some(path))
            } else {
                Ok(None)
            }
        }
        Some("manifest") => {
            // Old manifests are deleted via `old_manifests`, never here.
            Ok(None)
        }
        Some("arrow") | Some("bin") => {
            // Deletion files live under `_deletions/`.
            if relative_path.as_ref().starts_with("_deletions") {
                if inspection
                    .referenced_files
                    .delete_paths
                    .contains(&relative_path)
                {
                    Ok(None)
                } else if !maybe_in_progress {
                    info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE_UNVERIFIED, r#type=AUDIT_TYPE_DELETION, path = path.to_string());
                    Ok(Some(path))
                } else if inspection
                    .verified_files
                    .delete_paths
                    .contains(&relative_path)
                {
                    info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_DELETE, r#type=AUDIT_TYPE_DELETION, path = path.to_string());
                    Ok(Some(path))
                } else {
                    Ok(None)
                }
            } else {
                Ok(None)
            }
        }
        Some("txn") => {
            // NOTE(review): unlike the other categories, transaction file
            // deletions emit no TRACE_FILE_AUDIT event — confirm intentional.
            if relative_path.as_ref().starts_with(TRANSACTIONS_DIR) {
                if inspection
                    .referenced_files
                    .tx_paths
                    .contains(&relative_path)
                {
                    Ok(None)
                } else if !maybe_in_progress
                    || inspection.verified_files.tx_paths.contains(&relative_path)
                {
                    Ok(Some(path))
                } else {
                    Ok(None)
                }
            } else {
                Ok(None)
            }
        }
        // Unknown extensions are never touched.
        _ => Ok(None),
    }
}
/// Find child branches whose referenced (fork-point) version in this
/// dataset would be cleaned under the current policy.
///
/// Returns `(branch_name, referenced_version)` pairs. Manifests that fail
/// to load are skipped (best effort) rather than failing the whole run.
async fn find_referenced_branches(&self) -> Result<Vec<(String, u64)>> {
    let current_branch_id = self.dataset.branch_identifier().await?;
    let all_branches = self.dataset.branches().list().await?;
    let children = current_branch_id.collect_referenced_versions(&all_branches);
    // Filled concurrently by the per-branch tasks below.
    let referenced_branches: DashSet<String> = DashSet::new();
    let tasks: Vec<_> = children
        .iter()
        .map(|(branch_name, referenced_version)| {
            let dataset = &self.dataset;
            let policy = &self.policy;
            let referenced_branches = &referenced_branches;
            async move {
                let manifest_location = dataset
                    .commit_handler
                    .resolve_version_location(
                        &dataset.base,
                        *referenced_version,
                        &dataset.object_store.inner,
                    )
                    .await?;
                let manifest = read_manifest(
                    &dataset.object_store,
                    &manifest_location.path,
                    manifest_location.size,
                )
                .await;
                // Only branches whose fork point the policy would clean are
                // relevant; unreadable manifests are ignored.
                if let Ok(manifest) = manifest
                    && policy.should_clean(&manifest)
                {
                    referenced_branches.insert(branch_name.clone());
                }
                Ok::<(), Error>(())
            }
        })
        .collect();
    try_join_all(tasks).await?;
    // Preserve the original (branch, version) pairing for the matches.
    let referenced_branches = children
        .iter()
        .filter(|(branch_name, _)| referenced_branches.contains(branch_name))
        .cloned()
        .collect();
    Ok(referenced_branches)
}
/// Cascade cleanup into each referenced branch, accumulating their stats.
///
/// Branches sharing a fork-point version form a "chain"; each chain is
/// cleaned sequentially while distinct chains run concurrently.
async fn clean_referenced_branches(
    &self,
    referenced_branches: &[(String, u64)],
) -> Result<RemovalStats> {
    let final_stats = Mutex::new(RemovalStats::default());
    // Group branch names by their fork-point version id.
    let mut branches_chains = HashMap::new();
    for (branch, id) in referenced_branches {
        branches_chains
            .entry(*id)
            .or_insert_with(Vec::new)
            .push(branch.clone());
    }
    let tasks: Vec<_> = branches_chains
        .values()
        .map(|branch_chain| {
            let final_stats = &final_stats;
            async move {
                for branch in branch_chain {
                    // Check out the branch and run a cascaded cleanup
                    // (cascade runs never recurse further).
                    let branch_dataset = self
                        .dataset
                        .checkout_version((branch.as_str(), None))
                        .await?;
                    if let Some(stats) = cleanup_cascade_branch(
                        &branch_dataset,
                        branch_dataset.manifest.as_ref(),
                    )
                    .await?
                    {
                        let mut stats_guard = final_stats.lock().unwrap();
                        stats_guard.bytes_removed += stats.bytes_removed;
                        stats_guard.old_versions += stats.old_versions;
                        stats_guard.data_files_removed += stats.data_files_removed;
                        stats_guard.transaction_files_removed +=
                            stats.transaction_files_removed;
                        stats_guard.index_files_removed += stats.index_files_removed;
                        stats_guard.deletion_files_removed += stats.deletion_files_removed;
                    }
                }
                Ok::<(), Error>(())
            }
        })
        .collect();
    try_join_all(tasks).await?;
    Ok(final_stats.into_inner().unwrap())
}
/// Walk every referenced branch's manifests and pull any files they share
/// with this dataset back into the protected (`referenced_files`) set.
async fn retain_branch_lineage_files(
    &self,
    inspection: CleanupInspection,
    referenced_branches: &[(String, u64)],
) -> Result<CleanupInspection> {
    let inspection = Mutex::new(inspection);
    // Branches are processed one at a time; manifests within a branch are
    // inspected concurrently.
    for (branch, root_version_number) in referenced_branches {
        let branch_location = self.dataset.branch_location().find_branch(Some(branch))?;
        self.dataset
            .commit_handler
            .list_manifest_locations(&branch_location.path, &self.dataset.object_store, false)
            .try_for_each_concurrent(self.dataset.object_store.io_parallelism(), |location| {
                self.process_branch_referenced_manifests(
                    location,
                    *root_version_number,
                    &inspection,
                )
            })
            .await?;
    }
    Ok(inspection.into_inner().unwrap())
}
/// Protect files in this dataset that a branch manifest still references.
///
/// For each data file, deletion file, and index whose `base_id` resolves to
/// a base path equal to this dataset's URI, the entry is moved from
/// `verified_files` (deletable) into `referenced_files` (protected). If
/// anything was referenced, the manifest at the branch's fork-point version
/// is also removed from the deletion queue.
async fn process_branch_referenced_manifests(
    &self,
    location: ManifestLocation,
    referenced_version: u64,
    inspection: &Mutex<CleanupInspection>,
) -> Result<()> {
    let manifest =
        read_manifest(&self.dataset.object_store, &location.path, location.size).await?;
    let indexes =
        read_manifest_indexes(&self.dataset.object_store, &location, &manifest).await?;
    let mut inspection = inspection.lock().unwrap();
    let mut is_referenced = false;
    for fragment in manifest.fragments.iter() {
        // Data files that the branch borrows from this dataset.
        for file in fragment.files.iter() {
            if let Some(base_id) = file.base_id {
                let base_path = manifest.base_paths.get(&base_id);
                if let Some(base_path) = base_path
                    && base_path.path == self.dataset.uri
                {
                    let full_data_path = self.dataset.data_dir().child(file.path.as_str());
                    let relative_data_path = remove_prefix(&full_data_path, &self.dataset.base);
                    inspection
                        .verified_files
                        .data_paths
                        .remove(&relative_data_path);
                    inspection
                        .referenced_files
                        .data_paths
                        .insert(relative_data_path);
                    is_referenced = true;
                }
            }
        }
        // Deletion files that the branch borrows from this dataset.
        if let Some(del_file) = fragment.deletion_file.as_ref()
            && let Some(base_id) = del_file.base_id
        {
            let base_path = manifest.base_paths.get(&base_id);
            if let Some(base_path) = base_path {
                let deletion_path = fragment.deletion_file.as_ref().map(|deletion_file| {
                    deletion_file_path(&self.dataset.base, fragment.id, deletion_file)
                });
                if base_path.path == self.dataset.uri {
                    if let Some(deletion_path) = deletion_path {
                        let relative_del_path =
                            remove_prefix(&deletion_path, &self.dataset.base);
                        inspection
                            .verified_files
                            .delete_paths
                            .remove(&relative_del_path);
                        inspection
                            .referenced_files
                            .delete_paths
                            .insert(relative_del_path);
                    }
                    is_referenced = true;
                }
            }
        }
    }
    // Indices that the branch borrows from this dataset, tracked by UUID.
    for index in indexes {
        if let Some(base_id) = index.base_id {
            let base_path = manifest.base_paths.get(&base_id);
            if let Some(base_path) = base_path
                && base_path.path == self.dataset.uri
            {
                let uuid_str = index.uuid.to_string();
                inspection.verified_files.index_uuids.remove(&uuid_str);
                inspection.referenced_files.index_uuids.insert(uuid_str);
                is_referenced = true;
            }
        }
    }
    if is_referenced {
        // Keep the fork-point manifest alive: drop it from the deletion map.
        inspection
            .old_manifests
            .retain(|_path, version_number| *version_number != referenced_version);
    }
    Ok(())
}
}
/// Compute the tick interval that limits deletions to `rate` delete
/// requests per second for the given object-store scheme.
///
/// Stores that batch deletes (S3, Azure) remove many paths per request, so
/// the per-path interval is shortened by the batch size. `rate` is clamped
/// to at least 1.
fn calculate_duration(scheme: String, rate: u64) -> Duration {
    // Lowercase once instead of twice.
    let scheme_lower = scheme.to_lowercase();
    let batch_size = if scheme_lower.contains("s3") {
        S3_DELETE_STREAM_BATCH_SIZE
    } else if scheme_lower.contains("az") {
        AZURE_DELETE_STREAM_BATCH_SIZE
    } else {
        1
    };
    let effective_rate = rate.max(1);
    // Paths removable per second at the requested request rate.
    let path_rate = effective_rate * batch_size;
    info!(
        "delete_rate_limit enabled: limit {} delete requests/sec",
        effective_rate
    );
    // Nanoseconds between paths, rounded up and never zero.
    let duration_ns = 1_000_000_000u64.div_ceil(path_rate).max(1);
    Duration::from_nanos(duration_ns)
}
/// Configuration controlling which versions and files a cleanup run removes.
#[derive(Clone, Debug)]
pub struct CleanupPolicy {
    /// Only clean versions committed before this timestamp.
    pub before_timestamp: Option<DateTime<Utc>>,
    /// Only clean versions numbered strictly below this version.
    pub before_version: Option<u64>,
    /// Also delete recent files that cannot be verified as belonging to a
    /// removed version (otherwise they are kept as possibly in-progress).
    pub delete_unverified: bool,
    /// Fail the run instead of skipping when tags protect old versions.
    pub error_if_tagged_old_versions: bool,
    /// Cascade cleanup into branches referenced by this branch's lineage.
    pub clean_referenced_branches: bool,
    /// Maximum delete requests per second, if throttling is desired.
    pub delete_rate_limit: Option<u64>,
}
impl CleanupPolicy {
    /// Whether `manifest` is old enough to clean under this policy.
    ///
    /// Every configured bound must hold; an unset bound never blocks
    /// cleaning.
    pub fn should_clean(&self, manifest: &Manifest) -> bool {
        let timestamp_ok = self
            .before_timestamp
            .map_or(true, |cutoff| manifest.timestamp() < cutoff);
        let version_ok = self
            .before_version
            .map_or(true, |cutoff| manifest.version < cutoff);
        timestamp_ok && version_ok
    }
}
impl Default for CleanupPolicy {
fn default() -> Self {
Self {
before_timestamp: None,
before_version: None,
delete_unverified: false,
error_if_tagged_old_versions: true,
clean_referenced_branches: false,
delete_rate_limit: None,
}
}
}
/// Fluent builder for [`CleanupPolicy`], starting from the defaults.
#[derive(Default)]
pub struct CleanupPolicyBuilder {
    policy: CleanupPolicy,
}
impl CleanupPolicyBuilder {
    /// Enable or disable cascading cleanup into referenced branches.
    pub fn clean_referenced_branches(mut self, clean_referenced_branches: bool) -> Self {
        self.policy.clean_referenced_branches = clean_referenced_branches;
        self
    }
    /// Clean versions committed strictly before `timestamp`.
    pub fn before_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
        self.policy.before_timestamp = Some(timestamp);
        self
    }
    /// Keep the newest `n` versions and mark everything older for cleanup.
    ///
    /// `n` is clamped to at least 1 (the current version is never cleanable
    /// anyway); previously `n == 0` indexed one past the end of the version
    /// list and panicked. An empty version list now leaves the bound at the
    /// (vacuous) oldest version instead of panicking on `versions[0]`.
    pub async fn retain_n_versions(mut self, dataset: &Dataset, n: usize) -> Result<Self> {
        let versions = dataset.versions().await?;
        let n = n.max(1);
        self.policy.before_version = if versions.len() <= n {
            versions.first().map(|v| v.version)
        } else {
            Some(versions[versions.len() - n].version)
        };
        Ok(self)
    }
    /// Allow deleting recent files that cannot be verified as belonging to
    /// a removed version.
    pub fn delete_unverified(mut self, delete: bool) -> Self {
        self.policy.delete_unverified = delete;
        self
    }
    /// Fail instead of skipping when tags protect old versions.
    pub fn error_if_tagged_old_versions(mut self, error: bool) -> Self {
        self.policy.error_if_tagged_old_versions = error;
        self
    }
    /// Limit deletions to `rate` requests per second; `rate` must be > 0.
    pub fn delete_rate_limit(mut self, rate: u64) -> Result<Self> {
        if rate == 0 {
            return Err(Error::Cleanup {
                message: format!("delete_rate_limit must be greater than 0, got {}", rate),
            });
        }
        self.policy.delete_rate_limit = Some(rate);
        Ok(self)
    }
    /// Finalize the policy.
    pub fn build(self) -> CleanupPolicy {
        self.policy
    }
}
/// Remove old, unreferenced versions of `dataset` according to `policy`.
pub async fn cleanup_old_versions(
    dataset: &Dataset,
    policy: CleanupPolicy,
) -> Result<RemovalStats> {
    CleanupTask::new(dataset, policy).run().await
}
/// Run auto-cleanup for `dataset` if the manifest's config enables it.
///
/// Returns `None` when no cleanup policy is configured for this commit.
pub async fn auto_cleanup_hook(
    dataset: &Dataset,
    manifest: &Manifest,
) -> Result<Option<RemovalStats>> {
    match build_cleanup_policy(dataset, manifest).await? {
        None => Ok(None),
        Some(policy) => Ok(Some(dataset.cleanup_with_policy(policy).await?)),
    }
}
/// Run cleanup on a branch as part of a cascade from its parent.
///
/// Cascaded runs never recurse into further branches and never hard-fail on
/// tagged old versions. Returns `None` when no policy is configured.
pub async fn cleanup_cascade_branch(
    dataset: &Dataset,
    manifest: &Manifest,
) -> Result<Option<RemovalStats>> {
    match build_cleanup_policy(dataset, manifest).await? {
        None => Ok(None),
        Some(mut policy) => {
            policy.clean_referenced_branches = false;
            policy.error_if_tagged_old_versions = false;
            Ok(Some(dataset.cleanup_with_policy(policy).await?))
        }
    }
}
pub async fn build_cleanup_policy(
dataset: &Dataset,
manifest: &Manifest,
) -> Result<Option<CleanupPolicy>> {
if let Some(interval) = manifest.config.get("lance.auto_cleanup.interval") {
let interval: u64 = match interval.parse() {
Ok(i) => i,
Err(e) => {
return Err(Error::Cleanup {
message: format!(
"Error encountered while parsing lance.auto_cleanup.interval as u64: {}",
e
),
});
}
};
if interval != 0 && !manifest.version.is_multiple_of(interval) {
return Ok(None);
}
} else {
return Ok(None);
}
let mut builder = CleanupPolicyBuilder::default();
if let Some(older_than) = manifest.config.get("lance.auto_cleanup.older_than") {
let std_older_than = match parse_duration(older_than) {
Ok(t) => t,
Err(e) => {
return Err(Error::Cleanup {
message: format!(
"Error encountered while parsing lance.auto_cleanup.older_than as std::time::Duration: {}",
e
),
});
}
};
let timestamp = utc_now() - TimeDelta::from_std(std_older_than).unwrap_or(TimeDelta::MAX);
builder = builder.before_timestamp(timestamp);
}
if let Some(retain_versions) = manifest.config.get("lance.auto_cleanup.retain_versions") {
let retain_versions: usize = match retain_versions.parse() {
Ok(n) => n,
Err(e) => {
return Err(Error::Cleanup {
message: format!(
"Error encountered while parsing lance.auto_cleanup.retain_versions as u64: {}",
e
),
});
}
};
builder = builder.retain_n_versions(dataset, retain_versions).await?;
}
if let Some(referenced_branch) = manifest.config.get("lance.auto_cleanup.referenced_branch") {
let clean_referenced: bool = match referenced_branch.parse() {
Ok(b) => b,
Err(e) => {
return Err(Error::Cleanup {
message: format!(
"Error encountered while parsing lance.auto_cleanup.referenced_branch as bool: {}",
e
),
});
}
};
builder = builder.clean_referenced_branches(clean_referenced);
}
if let Some(delete_rate_limit) = manifest.config.get("lance.auto_cleanup.delete_rate_limit") {
let rate: u64 = match delete_rate_limit.parse() {
Ok(r) => r,
Err(e) => {
return Err(Error::Cleanup {
message: format!(
"Error encountered while parsing lance.auto_cleanup.delete_rate_limit as u64: {}",
e
),
});
}
};
builder = match builder.delete_rate_limit(rate) {
Ok(b) => b,
Err(e) => return Err(e),
};
}
Ok(Some(builder.build()))
}
/// Build the error reported when tags protect versions marked for cleanup,
/// listing the offending tag -> version pairs.
fn tagged_old_versions_cleanup_error(
    tags: &HashMap<String, TagContents>,
    tagged_old_versions: &HashSet<u64>,
) -> Error {
    // Collect only the tags that point at a version scheduled for cleanup.
    let mut unreferenced_tags: HashMap<String, u64> = HashMap::new();
    for (name, contents) in tags {
        if tagged_old_versions.contains(&contents.version) {
            unreferenced_tags.insert(name.clone(), contents.version);
        }
    }
    Error::Cleanup {
        message: format!(
            "{} tagged version(s) have been marked for cleanup. Either set `error_if_tagged_old_versions=false` or delete the following tag(s) to enable cleanup: {:?}",
            unreferenced_tags.len(),
            unreferenced_tags
        ),
    }
}
#[cfg(test)]
mod tests {
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use super::*;
use crate::blob::{BlobArrayBuilder, blob_field};
use crate::{
dataset::{ReadParams, WriteMode, WriteParams, builder::DatasetBuilder},
index::vector::VectorIndexParams,
};
use all_asserts::{assert_gt, assert_lt};
use arrow::compute;
use arrow_array::{
Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use datafusion::common::assert_contains;
use lance_core::utils::tempfile::TempStrDir;
use lance_core::utils::testing::{ProxyObjectStore, ProxyObjectStorePolicy};
use lance_index::{DatasetIndexExt, IndexType};
use lance_io::object_store::{
ObjectStore, ObjectStoreParams, ObjectStoreRegistry, WrappingObjectStore,
};
use lance_linalg::distance::MetricType;
use lance_table::io::commit::RenameCommitHandler;
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector, some_batch};
use mock_instant::thread_local::MockClock;
/// Test object-store wrapper that records and replays file modification
/// times so tests can control how "old" a file appears.
#[derive(Debug)]
struct MockObjectStore {
    // Mutable policy hooks applied to every store operation.
    policy: Arc<Mutex<ProxyObjectStorePolicy>>,
    // Recorded write times, substituted into returned `ObjectMeta`.
    last_modified_times: Arc<Mutex<HashMap<Path, DateTime<Utc>>>>,
}
impl WrappingObjectStore for MockObjectStore {
    // Wrap the real store in a proxy that applies this mock's policy hooks.
    fn wrap(
        &self,
        _storage_prefix: &str,
        original: Arc<dyn object_store::ObjectStore>,
    ) -> Arc<dyn object_store::ObjectStore> {
        Arc::new(ProxyObjectStore::new(original, self.policy.clone()))
    }
}
impl MockObjectStore {
    /// Create the mock store with its timestamp-recording policy installed.
    pub(crate) fn new() -> Self {
        let instance = Self {
            policy: Arc::new(Mutex::new(ProxyObjectStorePolicy::new())),
            last_modified_times: Arc::new(Mutex::new(HashMap::new())),
        };
        instance.add_timestamp_policy();
        instance
    }
    /// Install two hooks: one records the current (mockable) time whenever
    /// an operation touches a path, the other substitutes that recorded
    /// time as `last_modified` in any `ObjectMeta` returned for the path.
    fn add_timestamp_policy(&self) {
        let mut policy = self.policy.lock().unwrap();
        let times_map = self.last_modified_times.clone();
        policy.set_before_policy(
            "record_file_time",
            Arc::new(move |_, path| {
                let mut times_map = times_map.lock().unwrap();
                times_map.insert(path.clone(), utc_now());
                Ok(())
            }),
        );
        let times_map = self.last_modified_times.clone();
        policy.set_obj_meta_policy(
            "add_recorded_file_time",
            Arc::new(move |_, meta| {
                let mut meta = meta;
                if let Some(recorded) = times_map.lock().unwrap().get(&meta.location) {
                    meta.last_modified = *recorded;
                }
                Ok(meta)
            }),
        );
    }
}
/// Counts of each file category in a dataset directory, used in assertions.
#[derive(Debug, PartialEq, Clone, Copy)]
struct FileCounts {
    num_data_files: usize,
    num_manifest_files: usize,
    num_index_files: usize,
    num_delete_files: usize,
    num_tx_files: usize,
    // Total size of all files, in bytes.
    num_bytes: u64,
}
/// Test fixture: a dataset in a temp directory, accessed through
/// `MockObjectStore` so file timestamps can be controlled.
struct MockDatasetFixture {
    // Held only to keep the temp directory alive for the fixture's lifetime.
    _tmpdir: TempStrDir,
    dataset_path: String,
    mock_store: Arc<MockObjectStore>,
}
impl MockDatasetFixture {
/// Create a fixture with a fresh temp directory and mock store.
fn try_new() -> Result<Self> {
    let tmpdir = TempStrDir::default();
    let tmpdir_path = tmpdir.as_str();
    // Temp paths without a leading slash (e.g. Windows drive paths) get
    // one prepended so the URI below is well-formed.
    let path_prefix = if tmpdir_path.starts_with('/') {
        ""
    } else {
        "/"
    };
    let dataset_path = format!("file-object-store://{path_prefix}{tmpdir_path}/my_db");
    Ok(Self {
        _tmpdir: tmpdir,
        dataset_path,
        mock_store: Arc::new(MockObjectStore::new()),
    })
}
/// Store params that install this fixture's mock object-store wrapper.
fn os_params(&self) -> ObjectStoreParams {
    ObjectStoreParams {
        object_store_wrapper: Some(self.mock_store.clone()),
        ..Default::default()
    }
}
/// Write `data` to the fixture dataset with the given mode, using the mock
/// store and a rename-based commit handler.
async fn write_data_impl(
    &self,
    data: impl RecordBatchReader + Send + 'static,
    mode: WriteMode,
) -> Result<()> {
    Dataset::write(
        data,
        &self.dataset_path,
        Some(WriteParams {
            store_params: Some(self.os_params()),
            commit_handler: Some(Arc::new(RenameCommitHandler)),
            mode,
            ..Default::default()
        }),
    )
    .await?;
    Ok(())
}
/// Write the standard test batch with the given write mode.
async fn write_some_data_impl(&self, mode: WriteMode) -> Result<()> {
    // `write_data_impl` already returns `Result<()>`; the previous
    // `?` followed by `Ok(())` just re-wrapped the same value.
    self.write_data_impl(some_batch(), mode).await
}
/// Create the dataset with the standard test batch.
async fn create_some_data(&self) -> Result<()> {
    self.write_some_data_impl(WriteMode::Create).await
}
/// Overwrite the dataset with the standard test batch.
async fn overwrite_some_data(&self) -> Result<()> {
    self.write_some_data_impl(WriteMode::Overwrite).await
}
/// Append the standard test batch to the dataset.
async fn append_some_data(&self) -> Result<()> {
    self.write_some_data_impl(WriteMode::Append).await
}
/// Create the dataset from caller-provided batches.
async fn create_with_data(
    &self,
    data: impl RecordBatchReader + Send + 'static,
) -> Result<()> {
    self.write_data_impl(data, WriteMode::Create).await
}
/// Append caller-provided batches to the dataset.
async fn append_data(&self, data: impl RecordBatchReader + Send + 'static) -> Result<()> {
    self.write_data_impl(data, WriteMode::Append).await
}
/// Overwrite the dataset with caller-provided batches.
async fn overwrite_data(
    &self,
    data: impl RecordBatchReader + Send + 'static,
) -> Result<()> {
    self.write_data_impl(data, WriteMode::Overwrite).await
}
/// Delete rows matching `predicate` from the dataset.
async fn delete_data(&self, predicate: &str) -> Result<()> {
    let mut db = self.open().await?;
    db.delete(predicate).await?;
    Ok(())
}
/// Build an IVF-PQ vector index named "some_index" on the `indexable`
/// column of the fixture dataset.
async fn create_some_index(&self) -> Result<()> {
    let mut db = self.open().await?;
    // No need to box the params only to re-borrow them with `&*`.
    let index_params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 5);
    db.create_index(
        &["indexable"],
        IndexType::Vector,
        Some("some_index".to_owned()),
        &index_params,
        false,
    )
    .await?;
    Ok(())
}
/// Install a policy that fails any store operation whose name contains
/// "copy"; registered as "block_commit" to block commits in tests.
fn block_commits(&mut self) {
    let mut policy = self.mock_store.policy.lock().unwrap();
    policy.set_before_policy(
        "block_commit",
        Arc::new(|op, _| -> Result<()> {
            if op.contains("copy") {
                return Err(Error::internal("Copy blocked".to_string()));
            }
            Ok(())
        }),
    );
}
/// Install a policy that fails deletion of any `.manifest` file.
fn block_delete_manifest(&mut self) {
    let mut policy = self.mock_store.policy.lock().unwrap();
    policy.set_before_policy(
        "block_delete_manifest",
        Arc::new(|op, path| -> Result<()> {
            if op.contains("delete") && path.extension() == Some("manifest") {
                Err(Error::internal("Delete manifest blocked".to_string()))
            } else {
                Ok(())
            }
        }),
    );
}
/// Remove the manifest-delete blocking policy installed above.
fn unblock_delete_manifest(&mut self) {
    let mut policy = self.mock_store.policy.lock().unwrap();
    policy.clear_before_policy("block_delete_manifest");
}
/// Runs cleanup removing versions older than `before`, with all other
/// policy options left at their defaults.
async fn run_cleanup(&self, before: DateTime<Utc>) -> Result<RemovalStats> {
    let policy = CleanupPolicyBuilder::default()
        .before_timestamp(before)
        .build();
    let db = self.open().await?;
    cleanup_old_versions(&db, policy).await
}
/// Runs cleanup with a fully caller-specified policy.
async fn run_cleanup_with_policy(&self, policy: CleanupPolicy) -> Result<RemovalStats> {
    let ds = self.open().await?;
    cleanup_old_versions(&ds, policy).await
}
/// Runs cleanup before `before`, optionally overriding the
/// `delete_unverified` (default false) and `error_if_tagged_old_versions`
/// (default true) policy flags.
async fn run_cleanup_with_override(
    &self,
    before: DateTime<Utc>,
    delete_unverified: Option<bool>,
    error_if_tagged_old_versions: Option<bool>,
) -> Result<RemovalStats> {
    let policy = CleanupPolicyBuilder::default()
        .before_timestamp(before)
        .delete_unverified(delete_unverified.unwrap_or(false))
        .error_if_tagged_old_versions(error_if_tagged_old_versions.unwrap_or(true))
        .build();
    let db = self.open().await?;
    cleanup_old_versions(&db, policy).await
}
/// Opens the fixture's dataset with the mock store options applied.
async fn open(&self) -> Result<Box<Dataset>> {
    let read_params = ReadParams {
        store_options: Some(self.os_params()),
        ..Default::default()
    };
    let ds = DatasetBuilder::from_uri(&self.dataset_path)
        .with_read_params(read_params)
        .load()
        .await?;
    Ok(Box::new(ds))
}
/// Loads the fixture's primary dataset from its root path.
async fn load(&self) -> Result<Dataset> {
    self.load_dataset(&self.dataset_path).await
}
/// Loads a dataset at an arbitrary `uri` using the fixture's store options.
async fn load_dataset(&self, uri: &str) -> Result<Dataset> {
    let read_params = ReadParams {
        store_options: Some(self.os_params()),
        ..Default::default()
    };
    DatasetBuilder::from_uri(uri)
        .with_read_params(read_params)
        .load()
        .await
}
/// Creates a branch named `branch_name` from `source_ref` on `from_dataset`,
/// then loads and returns the branch as a fresh dataset handle.
async fn create_branch_and_load<V: Into<crate::dataset::refs::Ref>>(
    &self,
    from_dataset: &mut Dataset,
    branch_name: &str,
    source_ref: V,
) -> Result<Dataset> {
    let created = from_dataset
        .create_branch(branch_name, source_ref, Some(self.os_params()))
        .await?;
    let branch_uri = created.uri.clone();
    self.load_dataset(&branch_uri).await
}
/// Walks every object under the dataset root and tallies files by
/// extension (data/manifest/deletion/index/transaction) plus total bytes.
async fn count_files(&self) -> Result<FileCounts> {
    let registry = Arc::new(ObjectStoreRegistry::default());
    let (store, root) =
        ObjectStore::from_uri_and_params(registry, &self.dataset_path, &self.os_params())
            .await?;
    let mut counts = FileCounts {
        num_data_files: 0,
        num_delete_files: 0,
        num_index_files: 0,
        num_manifest_files: 0,
        num_tx_files: 0,
        num_bytes: 0,
    };
    let mut entries = store.read_dir_all(&root, None);
    while let Some(entry) = entries.try_next().await? {
        counts.num_bytes += entry.size;
        match entry.location.extension() {
            Some("lance") => counts.num_data_files += 1,
            Some("manifest") => counts.num_manifest_files += 1,
            // Deletion files come in both arrow and bin flavors.
            Some("arrow") | Some("bin") => counts.num_delete_files += 1,
            Some("idx") => counts.num_index_files += 1,
            Some("txn") => counts.num_tx_files += 1,
            _ => {}
        }
    }
    Ok(counts)
}
/// Counts `.blob` sidecar files anywhere under the dataset root.
async fn count_blob_files(&self) -> Result<usize> {
    let registry = Arc::new(ObjectStoreRegistry::default());
    let (store, root) =
        ObjectStore::from_uri_and_params(registry, &self.dataset_path, &self.os_params())
            .await?;
    let mut entries = store.read_dir_all(&root, None);
    let mut total = 0usize;
    while let Some(entry) = entries.try_next().await? {
        if matches!(entry.location.extension(), Some("blob")) {
            total += 1;
        }
    }
    Ok(total)
}
/// Returns the number of live rows in the dataset's latest version.
async fn count_rows(&self) -> Result<usize> {
    let ds = self.open().await?;
    ds.count_rows(None).await
}
}
/// Builds a one-row reader with an `id` column and a blob column holding
/// `blob_len` zero bytes — used to exercise blob sidecar file handling.
fn blob_v2_batch(blob_len: usize) -> Box<dyn RecordBatchReader + Send> {
    let mut blob_builder = BlobArrayBuilder::new(1);
    blob_builder.push_bytes(vec![0u8; blob_len]).unwrap();
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        blob_field("blob", true),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            blob_builder.finish().unwrap(),
        ],
    )
    .unwrap();
    Box::new(RecordBatchIterator::new(std::iter::once(Ok(batch)), schema))
}
// After an overwrite, version 1's data file is no longer referenced by the
// latest manifest. Aging the clock 10 days and cleaning everything older
// than 8 days should remove exactly that one old version, while the live
// version's files survive.
#[tokio::test]
async fn cleanup_unreferenced_data_files() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
fixture.overwrite_some_data().await.unwrap();
// Advance mock time so both writes above are "10 days old".
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
let before_count = fixture.count_files().await.unwrap();
let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
.await
.unwrap();
let after_count = fixture.count_files().await.unwrap();
assert_eq!(removed.old_versions, 1);
assert_eq!(removed.data_files_removed, 1);
// Reported byte count must match the actual on-disk delta.
assert_eq!(
removed.bytes_removed,
before_count.num_bytes - after_count.num_bytes
);
assert_lt!(after_count.num_data_files, before_count.num_data_files);
assert_lt!(
after_count.num_manifest_files,
before_count.num_manifest_files
);
assert_lt!(after_count.num_tx_files, before_count.num_tx_files);
// The live version must keep at least one of each file type.
assert_gt!(after_count.num_manifest_files, 0);
assert_gt!(after_count.num_data_files, 0);
assert_gt!(after_count.num_tx_files, 0);
}
// Blob sidecar files written by an overwritten version become unreferenced;
// after aging past the cleanup horizon they should all be deleted. The
// second (small) write stays under the sidecar threshold, so zero blob
// files remain afterwards — TODO confirm the 1 KiB payload indeed produces
// no sidecar.
#[tokio::test]
async fn cleanup_blob_v2_sidecar_files() {
let fixture = MockDatasetFixture::try_new().unwrap();
// Large blob payload: expected to spill into .blob sidecar files.
Dataset::write(
blob_v2_batch(100 * 1024),
&fixture.dataset_path,
Some(WriteParams {
store_params: Some(fixture.os_params()),
commit_handler: Some(Arc::new(RenameCommitHandler)),
mode: WriteMode::Create,
data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
..Default::default()
}),
)
.await
.unwrap();
assert_gt!(fixture.count_blob_files().await.unwrap(), 0);
// Overwrite with a small payload, orphaning the large blobs.
Dataset::write(
blob_v2_batch(1024),
&fixture.dataset_path,
Some(WriteParams {
store_params: Some(fixture.os_params()),
commit_handler: Some(Arc::new(RenameCommitHandler)),
mode: WriteMode::Overwrite,
data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
..Default::default()
}),
)
.await
.unwrap();
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
fixture
.run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
.await
.unwrap();
assert_eq!(fixture.count_blob_files().await.unwrap(), 0);
}
// Same setup as above, but without advancing the clock: even *recent*
// orphaned blob sidecars can be removed because the cutoff timestamp is
// set slightly in the future and the files are verified unreferenced.
#[tokio::test]
async fn cleanup_recent_blob_v2_sidecar_files_when_verified() {
let fixture = MockDatasetFixture::try_new().unwrap();
Dataset::write(
blob_v2_batch(100 * 1024),
&fixture.dataset_path,
Some(WriteParams {
store_params: Some(fixture.os_params()),
commit_handler: Some(Arc::new(RenameCommitHandler)),
mode: WriteMode::Create,
data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
..Default::default()
}),
)
.await
.unwrap();
// Overwrite immediately; the large blobs are now unreferenced.
Dataset::write(
blob_v2_batch(1024),
&fixture.dataset_path,
Some(WriteParams {
store_params: Some(fixture.os_params()),
commit_handler: Some(Arc::new(RenameCommitHandler)),
mode: WriteMode::Overwrite,
data_storage_version: Some(lance_file::version::LanceFileVersion::V2_2),
..Default::default()
}),
)
.await
.unwrap();
// Cutoff 1s in the future makes even the just-written old version eligible.
fixture
.run_cleanup(utc_now() + TimeDelta::seconds(1))
.await
.unwrap();
assert_eq!(fixture.count_blob_files().await.unwrap(), 0);
}
// Only the version older than the cutoff is removed; the two appends made
// after the clock jump are newer than the cutoff and must be preserved.
// Appended data files remain referenced by later versions, so no data
// files are deleted at all.
#[tokio::test]
async fn do_not_cleanup_newer_data() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
// Jump 10 days: the initial version is old, the appends below are new.
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
fixture.append_some_data().await.unwrap();
fixture.append_some_data().await.unwrap();
let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 3);
assert_eq!(before_count.num_manifest_files, 3);
let before = utc_now() - TimeDelta::try_days(7).unwrap();
let removed = fixture.run_cleanup(before).await.unwrap();
let after_count = fixture.count_files().await.unwrap();
assert_eq!(removed.old_versions, 1);
assert_eq!(
removed.bytes_removed,
before_count.num_bytes - after_count.num_bytes
);
// All data files are still referenced by the surviving versions.
assert_eq!(after_count.num_data_files, 3);
assert_eq!(after_count.num_manifest_files, 2);
assert_eq!(after_count.num_tx_files, 2);
}
// With the default `error_if_tagged_old_versions=true`, cleanup must fail
// whenever a tag pins a version inside the cleanup window, and the error
// message must report how many tags are in the way. Deleting tags one by
// one shrinks the count until cleanup finally succeeds.
#[tokio::test]
async fn cleanup_error_when_tagged_old_versions() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
fixture.overwrite_some_data().await.unwrap();
fixture.overwrite_some_data().await.unwrap();
let dataset = *(fixture.open().await.unwrap());
dataset.tags().create("old-tag", 1).await.unwrap();
dataset.tags().create("another-old-tag", 2).await.unwrap();
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
// A cutoff older than every version finds nothing to clean, so no error.
let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(20).unwrap())
.await
.unwrap();
assert_eq!(removed.old_versions, 0);
// Both tagged versions fall inside this window: error mentions 2 tags.
let mut cleanup_error = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
.await
.err()
.unwrap();
assert_contains!(
cleanup_error.to_string(),
"Cleanup error: 2 tagged version(s) have been marked for cleanup. Either set `error_if_tagged_old_versions=false` or delete the following tag(s) to enable cleanup:"
);
dataset.tags().delete("old-tag").await.unwrap();
cleanup_error = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
.await
.err()
.unwrap();
assert_contains!(
cleanup_error.to_string(),
"Cleanup error: 1 tagged version(s) have been marked for cleanup. Either set `error_if_tagged_old_versions=false` or delete the following tag(s) to enable cleanup:"
);
dataset.tags().delete("another-old-tag").await.unwrap();
// No tags left in the window: both old versions are removed.
let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
.await
.unwrap();
assert_eq!(removed.old_versions, 2);
}
// With `error_if_tagged_old_versions=false`, cleanup silently skips tagged
// versions instead of erroring. Versions become removable one at a time as
// their tags are deleted; version 3 stays protected by "tag-latest"
// throughout.
#[tokio::test]
async fn cleanup_around_tagged_old_versions() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
fixture.overwrite_some_data().await.unwrap();
fixture.overwrite_some_data().await.unwrap();
let dataset = *(fixture.open().await.unwrap());
dataset.tags().create("old-tag", 1).await.unwrap();
dataset.tags().create("another-old-tag", 2).await.unwrap();
dataset.tags().create("tag-latest", 3).await.unwrap();
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
// Every old version is tagged, so nothing is removed.
let mut removed = fixture
.run_cleanup_with_override(
utc_now() - TimeDelta::try_days(8).unwrap(),
None,
Some(false),
)
.await
.unwrap();
assert_eq!(removed.old_versions, 0);
// Untag version 1: it alone becomes removable.
dataset.tags().delete("old-tag").await.unwrap();
removed = fixture
.run_cleanup_with_override(
utc_now() - TimeDelta::try_days(8).unwrap(),
None,
Some(false),
)
.await
.unwrap();
assert_eq!(removed.old_versions, 1);
// Untag version 2: one more version removed on the next pass.
dataset.tags().delete("another-old-tag").await.unwrap();
removed = fixture
.run_cleanup_with_override(
utc_now() - TimeDelta::try_days(8).unwrap(),
None,
Some(false),
)
.await
.unwrap();
assert_eq!(removed.old_versions, 1);
}
/// Asserts the dataset currently holds exactly `num_expected_files` each of
/// data files, manifests, and transaction files.
async fn check_num_files(fixture: &MockDatasetFixture, num_expected_files: usize) {
    let counts = fixture.count_files().await.unwrap();
    assert_eq!(counts.num_data_files, num_expected_files);
    assert_eq!(counts.num_manifest_files, num_expected_files);
    assert_eq!(counts.num_tx_files, num_expected_files);
}
// Exercises auto-cleanup driven by dataset config: versions accumulate
// until the configured interval/age are both crossed, then commits start
// trimming old versions automatically. The second half updates the config
// and verifies the new interval/age take effect.
#[tokio::test]
async fn auto_cleanup_old_versions() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
// Read the defaults the dataset was created with.
let dataset_config = &fixture.open().await.unwrap().manifest.config;
let cleanup_interval: usize = dataset_config
.get("lance.auto_cleanup.interval")
.unwrap()
.parse()
.unwrap();
let cleanup_older_than = TimeDelta::from_std(
parse_duration(dataset_config.get("lance.auto_cleanup.older_than").unwrap()).unwrap(),
)
.unwrap();
// Nothing is old enough yet, so files accumulate linearly.
for num_expected_files in 2..2 * cleanup_interval {
fixture.overwrite_some_data().await.unwrap();
check_num_files(&fixture, num_expected_files).await;
}
// Age everything past the configured threshold; auto-cleanup now trims
// on each commit, so counts restart from a small baseline.
MockClock::set_system_time(
(cleanup_older_than + TimeDelta::minutes(1))
.to_std()
.unwrap(),
);
for num_expected_files in 2..cleanup_interval {
fixture.overwrite_some_data().await.unwrap();
check_num_files(&fixture, num_expected_files).await;
}
let mut dataset = *(fixture.open().await.unwrap());
let mut new_autoclean_params = HashMap::new();
// New age threshold expressed in humantime syntax.
let new_cleanup_older_than_str = "1month 2days 2h 42min 6sec";
let new_cleanup_older_than =
TimeDelta::from_std(parse_duration(new_cleanup_older_than_str).unwrap()).unwrap();
new_autoclean_params.insert(
"lance.auto_cleanup.older_than".to_string(),
new_cleanup_older_than_str.to_string(),
);
let new_cleanup_interval = 5;
new_autoclean_params.insert(
"lance.auto_cleanup.interval".to_string(),
new_cleanup_interval.to_string(),
);
let config_updates = new_autoclean_params
.into_iter()
.map(|(k, v)| (k, Some(v)))
.collect::<HashMap<String, Option<String>>>();
dataset.update_config(config_updates).await.unwrap();
// Advance past both the old and the new age thresholds.
MockClock::set_system_time(
(cleanup_older_than + new_cleanup_older_than + TimeDelta::minutes(2))
.to_std()
.unwrap(),
);
fixture.overwrite_some_data().await.unwrap();
// With the new config in force, counts again grow up to the new interval.
for num_expected_files in 2..new_cleanup_interval {
fixture.overwrite_some_data().await.unwrap();
check_num_files(&fixture, num_expected_files).await;
}
}
// Setting `lance.auto_cleanup.interval` to "0" with `retain_versions=1`
// appears to make auto-cleanup run on every commit, keeping only the
// current version plus the one just superseded (file count pins at 2) —
// NOTE(review): exact interval-0 semantics should be confirmed against the
// auto-cleanup implementation.
#[tokio::test]
async fn test_auto_cleanup_interval_zero() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
fixture.overwrite_some_data().await.unwrap();
fixture.overwrite_some_data().await.unwrap();
// Three versions exist before the config change.
check_num_files(&fixture, 3).await;
let mut dataset = fixture.open().await.unwrap();
let mut config_updates = HashMap::new();
config_updates.insert(
"lance.auto_cleanup.interval".to_string(),
Some("0".to_string()),
);
config_updates.insert(
"lance.auto_cleanup.retain_versions".to_string(),
Some("1".to_string()),
);
dataset
.update_config(config_updates)
.replace()
.await
.unwrap();
fixture.overwrite_some_data().await.unwrap();
fixture.overwrite_some_data().await.unwrap();
// Auto-cleanup now caps the version count at 2 on every commit.
check_num_files(&fixture, 2).await;
fixture.overwrite_some_data().await.unwrap();
check_num_files(&fixture, 2).await;
}
// Files referenced only by a superseded version can be removed even when
// they are just one second old, because the cleanup can verify they are
// unreferenced by the current manifest.
#[tokio::test]
async fn cleanup_recent_verified_files() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
// Only one second passes between the two versions.
MockClock::set_system_time(TimeDelta::try_seconds(1).unwrap().to_std().unwrap());
fixture.overwrite_some_data().await.unwrap();
let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 2);
assert_eq!(before_count.num_manifest_files, 2);
// Cutoff is "now": the superseded version is eligible despite being new.
let before = utc_now();
let removed = fixture.run_cleanup(before).await.unwrap();
let after_count = fixture.count_files().await.unwrap();
assert_eq!(removed.old_versions, 1);
assert_eq!(
removed.bytes_removed,
before_count.num_bytes - after_count.num_bytes
);
assert_eq!(after_count.num_data_files, 1);
assert_eq!(after_count.num_manifest_files, 1);
}
// Data files left behind by a *failed* commit cannot be verified as
// unreferenced. They are deleted only if the caller explicitly opts in
// (`delete_unverified=Some(true)`) or the files are older than the
// unverified-age threshold; otherwise they must be left alone.
#[tokio::test]
async fn dont_cleanup_recent_unverified_files() {
// (override flag, whether the orphaned files are past the age threshold)
for (override_opt, old_files) in [
(Some(false), false), (Some(true), false), (None, true), (None, false), ] {
// Reset mock time so each case starts from the same baseline.
MockClock::set_system_time(std::time::Duration::from_secs(0));
let mut fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
fixture.block_commits();
// The failed append leaves an orphaned data + tx file behind.
assert!(fixture.append_some_data().await.is_err());
let age = if old_files {
TimeDelta::try_days(UNVERIFIED_THRESHOLD_DAYS + 1).unwrap()
} else {
TimeDelta::try_days(UNVERIFIED_THRESHOLD_DAYS - 1).unwrap()
};
MockClock::set_system_time(age.to_std().unwrap());
let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 2);
assert_eq!(before_count.num_manifest_files, 1);
let before = utc_now();
let removed = fixture
.run_cleanup_with_override(before, override_opt, None)
.await
.unwrap();
// Deletion happens iff explicitly requested or the files aged out.
let should_delete = override_opt.unwrap_or(false) || old_files;
let after_count = fixture.count_files().await.unwrap();
// No committed version was removed in any case.
assert_eq!(removed.old_versions, 0);
assert_eq!(
removed.bytes_removed,
before_count.num_bytes - after_count.num_bytes
);
if should_delete {
assert_gt!(removed.bytes_removed, 0);
} else {
assert_eq!(removed.bytes_removed, 0);
}
}
}
// An index built on data that is later overwritten becomes dead along with
// its version; cleanup should remove the index files (both generations)
// and the old data, leaving only the post-overwrite version.
#[tokio::test]
async fn cleanup_old_index() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
fixture.create_some_index().await.unwrap();
// Age the data+index versions, then overwrite so they are unreferenced.
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
fixture.overwrite_some_data().await.unwrap();
let before_count = fixture.count_files().await.unwrap();
// NOTE(review): two .idx files exist after one create_index call —
// presumably an index plus auxiliary file; confirm against index layout.
assert_eq!(before_count.num_index_files, 2);
assert_eq!(before_count.num_data_files, 2);
assert_eq!(before_count.num_manifest_files, 3);
let before = utc_now() - TimeDelta::try_days(8).unwrap();
let removed = fixture.run_cleanup(before).await.unwrap();
let after_count = fixture.count_files().await.unwrap();
// Both the data version and the index version are removed.
assert_eq!(removed.old_versions, 2);
assert_eq!(
removed.bytes_removed,
before_count.num_bytes - after_count.num_bytes
);
assert_eq!(after_count.num_index_files, 0);
assert_eq!(after_count.num_data_files, 1);
assert_eq!(after_count.num_manifest_files, 1);
assert_eq!(after_count.num_tx_files, 1);
}
// Deletion files belonging to overwritten versions are removed, while the
// deletion file of the live version survives; the remaining rows must still
// reflect the post-overwrite delete (16 rows written, 8 kept).
#[tokio::test]
async fn clean_old_delete_files() {
let fixture = MockDatasetFixture::try_new().unwrap();
let mut data_gen = BatchGenerator::new().col(Box::new(
IncrementingInt32::new().named("filter_me".to_owned()),
));
fixture.create_with_data(data_gen.batch(16)).await.unwrap();
fixture.append_data(data_gen.batch(16)).await.unwrap();
// First deletion file, attached to the soon-to-be-old version.
fixture.delete_data("filter_me < 20").await.unwrap();
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
fixture.overwrite_data(data_gen.batch(16)).await.unwrap();
// Second deletion file, attached to the live version.
fixture.delete_data("filter_me >= 40").await.unwrap();
let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 3);
assert_eq!(before_count.num_delete_files, 2);
assert_eq!(before_count.num_manifest_files, 5);
let before = utc_now() - TimeDelta::try_days(8).unwrap();
let removed = fixture.run_cleanup(before).await.unwrap();
let after_count = fixture.count_files().await.unwrap();
// create + append + first delete = 3 removable versions.
assert_eq!(removed.old_versions, 3);
assert_eq!(
removed.bytes_removed,
before_count.num_bytes - after_count.num_bytes
);
assert_eq!(after_count.num_data_files, 1);
assert_eq!(after_count.num_delete_files, 1);
assert_eq!(after_count.num_manifest_files, 2);
assert_eq!(after_count.num_tx_files, 2);
// Live data: batch rows 32..48, with "filter_me >= 40" deleted → 8 rows.
let row_count_after = fixture.count_rows().await.unwrap();
assert_eq!(row_count_after, 8);
}
// The per-type counters in RemovalStats must match the actual change in
// on-disk file counts, and each counter must be non-zero because the setup
// produces removable files of every type (data, tx, index, deletion).
#[tokio::test]
async fn cleanup_collects_removed_file_metrics() {
let fixture = MockDatasetFixture::try_new().unwrap();
let row_count = 512;
let mut data_gen = BatchGenerator::new()
.col(Box::new(
IncrementingInt32::new().named("filter_me".to_owned()),
))
.col(Box::new(RandomVector::new().named("indexable".to_owned())));
fixture
.create_with_data(data_gen.batch(row_count))
.await
.unwrap();
fixture
.append_data(data_gen.batch(row_count))
.await
.unwrap();
fixture.create_some_index().await.unwrap();
fixture.delete_data("filter_me < 20").await.unwrap();
// Everything above is now old; everything below stays live.
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
fixture
.overwrite_data(data_gen.batch(row_count))
.await
.unwrap();
fixture.delete_data("filter_me >= 40").await.unwrap();
let before_count = fixture.count_files().await.unwrap();
let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(8).unwrap())
.await
.unwrap();
let after_count = fixture.count_files().await.unwrap();
// Ground-truth deltas from the directory walk, one per file type.
let data_files_removed = (before_count.num_data_files - after_count.num_data_files) as u64;
let transaction_files_removed =
(before_count.num_tx_files - after_count.num_tx_files) as u64;
let index_files_removed =
(before_count.num_index_files - after_count.num_index_files) as u64;
let deletion_files_removed =
(before_count.num_delete_files - after_count.num_delete_files) as u64;
assert_eq!(removed.data_files_removed, data_files_removed);
assert_eq!(removed.transaction_files_removed, transaction_files_removed);
assert_eq!(removed.index_files_removed, index_files_removed);
assert_eq!(removed.deletion_files_removed, deletion_files_removed);
assert_gt!(removed.data_files_removed, 0);
assert_gt!(removed.transaction_files_removed, 0);
assert_gt!(removed.index_files_removed, 0);
assert_gt!(removed.deletion_files_removed, 0);
}
// Data and index files that are still referenced by the latest version must
// never be removed, no matter how old they are by wall-clock time.
#[tokio::test]
async fn dont_clean_index_data_files() {
let fixture = MockDatasetFixture::try_new().unwrap();
// Clock is advanced BEFORE writing, so the files are written "old" but
// remain referenced by the current manifest.
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
fixture.create_some_data().await.unwrap();
fixture.create_some_index().await.unwrap();
let before_count = fixture.count_files().await.unwrap();
let before = utc_now() - TimeDelta::try_days(8).unwrap();
let removed = fixture.run_cleanup(before).await.unwrap();
assert_eq!(removed.old_versions, 0);
assert_eq!(removed.bytes_removed, 0);
let after_count = fixture.count_files().await.unwrap();
// Nothing at all may have changed on disk.
assert_eq!(before_count, after_count);
}
// A failed commit leaves an orphaned data file and tx file. Once they age
// past the unverified threshold, cleanup removes them even though no
// committed version is deleted (old_versions stays 0).
#[tokio::test]
async fn cleanup_failed_commit_data_file() {
let mut fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
fixture.block_commits();
// The append writes files but its commit is blocked → orphans.
assert!(fixture.append_some_data().await.is_err());
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 2);
assert_eq!(before_count.num_manifest_files, 1);
assert_eq!(before_count.num_tx_files, 2);
let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
.await
.unwrap();
let after_count = fixture.count_files().await.unwrap();
// No committed version removed, but the orphaned data file is gone.
assert_eq!(removed.old_versions, 0);
assert_eq!(removed.data_files_removed, 1);
assert_eq!(
removed.bytes_removed,
before_count.num_bytes - after_count.num_bytes
);
assert_eq!(after_count.num_data_files, 1);
assert_eq!(after_count.num_manifest_files, 1);
assert_eq!(after_count.num_tx_files, 1);
}
// Orphaned files from a *recent* failed commit look identical to an
// in-progress write, so cleanup must leave them untouched until they age
// past the unverified threshold.
#[tokio::test]
async fn dont_cleanup_in_progress_write() {
let mut fixture = MockDatasetFixture::try_new().unwrap();
// Clock advanced BEFORE the failed write: the orphans are recent
// relative to mock "now".
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
fixture.create_some_data().await.unwrap();
fixture.block_commits();
assert!(fixture.append_some_data().await.is_err());
let before_count = fixture.count_files().await.unwrap();
let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
.await
.unwrap();
assert_eq!(removed.old_versions, 0);
assert_eq!(removed.bytes_removed, 0);
assert_eq!(removed.data_files_removed, 0);
let after_count = fixture.count_files().await.unwrap();
// Disk state must be completely untouched.
assert_eq!(before_count, after_count);
}
// Cleanup is resumable: if manifest deletion fails mid-run (after data
// files were already deleted), a second run after the fault clears must
// finish the job, and its byte accounting is relative to the partial state.
#[tokio::test]
async fn can_recover_delete_failure() {
let mut fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
fixture.overwrite_some_data().await.unwrap();
// Injected fault: deleting .manifest files will error.
fixture.block_delete_manifest();
let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 2);
assert_eq!(before_count.num_manifest_files, 2);
assert!(
fixture
.run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
.await
.is_err()
);
// Partial state: data file already removed, manifest still present.
let mid_count = fixture.count_files().await.unwrap();
assert_eq!(mid_count.num_data_files, 1);
assert_eq!(mid_count.num_manifest_files, 2);
fixture.unblock_delete_manifest();
let removed = fixture
.run_cleanup(utc_now() - TimeDelta::try_days(7).unwrap())
.await
.unwrap();
let after_count = fixture.count_files().await.unwrap();
assert_eq!(removed.old_versions, 1);
// Byte accounting measures only what the second run removed.
assert_eq!(
removed.bytes_removed,
mid_count.num_bytes - after_count.num_bytes
);
assert_eq!(after_count.num_data_files, 1);
assert_eq!(after_count.num_manifest_files, 1);
}
// A pure retain-N policy (no timestamp cutoff): of 5 versions, the 3 most
// recent survive and the 2 oldest are removed.
#[tokio::test]
async fn cleanup_and_retain_3_recent_versions() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
// Space the 4 overwrites 10 seconds apart so versions have distinct times.
let mut time = 10i64;
for _ in 0..4 {
MockClock::set_system_time(TimeDelta::try_seconds(time).unwrap().to_std().unwrap());
time += 10i64;
fixture.overwrite_some_data().await.unwrap();
}
let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 5);
assert_eq!(before_count.num_manifest_files, 5);
let policy = CleanupPolicyBuilder::default()
.retain_n_versions(&fixture.open().await.unwrap(), 3)
.await
.unwrap()
.build();
let removed = fixture.run_cleanup_with_policy(policy).await.unwrap();
let after_count = fixture.count_files().await.unwrap();
assert_eq!(removed.old_versions, 2);
assert_eq!(
removed.bytes_removed,
before_count.num_bytes - after_count.num_bytes
);
assert_eq!(after_count.num_data_files, 3);
assert_eq!(after_count.num_manifest_files, 3);
}
// When a timestamp cutoff and retain-N are combined, a version is removed
// only if it satisfies BOTH conditions (older than the cutoff AND outside
// the retained window) — each constraint alone removes nothing here.
#[tokio::test]
async fn cleanup_before_ts_and_retain_n_recent_versions() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
// 5 versions, one per day.
let mut time = 1i64;
for _ in 0..4 {
MockClock::set_system_time(TimeDelta::try_days(time).unwrap().to_std().unwrap());
time += 1i64;
fixture.overwrite_some_data().await.unwrap();
}
let before_count = fixture.count_files().await.unwrap();
assert_eq!(before_count.num_data_files, 5);
assert_eq!(before_count.num_manifest_files, 5);
// Cutoff predates every version: timestamp condition never satisfied.
let policy = CleanupPolicyBuilder::default()
.before_timestamp(utc_now() - TimeDelta::try_days(6).unwrap())
.retain_n_versions(&fixture.open().await.unwrap(), 3)
.await
.unwrap()
.build();
let removed = fixture.run_cleanup_with_policy(policy).await.unwrap();
assert_eq!(removed.old_versions, 0);
// Retain window covers all 5 versions: retain condition never satisfied.
let policy = CleanupPolicyBuilder::default()
.before_timestamp(utc_now())
.retain_n_versions(&fixture.open().await.unwrap(), 10)
.await
.unwrap()
.build();
let removed = fixture.run_cleanup_with_policy(policy).await.unwrap();
assert_eq!(removed.old_versions, 0);
// Both conditions overlap on the 2 oldest versions: those are removed.
let policy = CleanupPolicyBuilder::default()
.before_timestamp(utc_now() - TimeDelta::try_days(2).unwrap())
.retain_n_versions(&fixture.open().await.unwrap(), 3)
.await
.unwrap()
.build();
let removed = fixture.run_cleanup_with_policy(policy).await.unwrap();
let after_count = fixture.count_files().await.unwrap();
assert_eq!(removed.old_versions, 2);
assert_eq!(
removed.bytes_removed,
before_count.num_bytes - after_count.num_bytes
);
assert_eq!(after_count.num_data_files, 3);
assert_eq!(after_count.num_manifest_files, 3);
}
// Cleanup must only touch directories it manages: foreign files placed
// alongside the dataset (media, notes, branch payloads) survive even an
// aggressive cleanup, while temp files inside _versions are removed.
#[tokio::test]
async fn cleanup_preserves_unmanaged_dirs_and_files() {
let fixture = MockDatasetFixture::try_new().unwrap();
fixture.create_some_data().await.unwrap();
let registry = Arc::new(ObjectStoreRegistry::default());
let (os, base) =
ObjectStore::from_uri_and_params(registry, &fixture.dataset_path, &fixture.os_params())
.await
.unwrap();
// Foreign files in unmanaged directories — must all survive.
let img = base.child("images").child("clip.mp4");
let misc = base.child("misc").child("notes.txt");
let branch_file = base.child("tree").child("branchA").child("data.bin");
os.put(&img, b"video").await.unwrap();
os.put(&misc, b"notes").await.unwrap();
os.put(&branch_file, b"branch").await.unwrap();
// A stray temp file inside the managed _versions dir — must be removed.
let tmp_manifest = base.child("_versions").child(".tmp").child("orphan");
os.put(&tmp_manifest, b"tmp").await.unwrap();
// Remove the transactions dir to exercise cleanup without tx files.
os.remove_dir_all(base.child(TRANSACTIONS_DIR))
.await
.unwrap();
fixture
.run_cleanup_with_override(utc_now(), Some(true), Some(false))
.await
.unwrap();
assert!(!os.exists(&tmp_manifest).await.unwrap());
assert!(os.exists(&img).await.unwrap());
assert!(os.exists(&misc).await.unwrap());
assert!(os.exists(&branch_file).await.unwrap());
}
// Test harness bundling a main dataset and four branches created from it
// (see `build_lineage_datasets` for the exact lineage).
struct LineageSetup {
main: BranchDatasetFixture,
branch1: BranchDatasetFixture,
branch2: BranchDatasetFixture,
branch3: BranchDatasetFixture,
branch4: BranchDatasetFixture,
}
impl LineageSetup {
// Asserts every branch's on-disk file counts match its cached snapshot.
pub async fn assert_all_unchanged(&mut self) {
self.main.assert_not_changed().await.unwrap();
self.branch1.assert_not_changed().await.unwrap();
self.branch2.assert_not_changed().await.unwrap();
self.branch3.assert_not_changed().await.unwrap();
self.branch4.assert_not_changed().await.unwrap();
}
// Asserts only the named branches are unchanged; panics on unknown names.
pub async fn assert_unchanged(&mut self, branches: &[&str]) {
for &b in branches {
match b {
"main" => self.main.assert_not_changed().await.unwrap(),
"branch1" => self.branch1.assert_not_changed().await.unwrap(),
"branch2" => self.branch2.assert_not_changed().await.unwrap(),
"branch3" => self.branch3.assert_not_changed().await.unwrap(),
"branch4" => self.branch4.assert_not_changed().await.unwrap(),
_ => panic!("unknown branch: {}", b),
}
}
}
// Turns on aggressive auto-cleanup (every commit, retain 1 version,
// branch-aware) on all five datasets, then refreshes cached counts.
pub async fn enable_auto_cleanup(&mut self) -> Result<()> {
let updates = [
("lance.auto_cleanup.interval", "1"),
("lance.auto_cleanup.retain_versions", "1"),
("lance.auto_cleanup.referenced_branch", "true"),
];
self.main.dataset.update_config(updates).await?;
self.branch1.dataset.update_config(updates).await?;
self.branch2.dataset.update_config(updates).await?;
self.branch3.dataset.update_config(updates).await?;
self.branch4.dataset.update_config(updates).await?;
self.main.refresh().await?;
self.branch1.refresh().await?;
self.branch2.refresh().await?;
self.branch3.refresh().await?;
self.branch4.refresh().await?;
Ok(())
}
// Clears all auto-cleanup config keys (None deletes the key) on all five
// datasets, then refreshes cached counts.
pub async fn disable_auto_cleanup(&mut self) -> Result<()> {
let updates = [
("lance.auto_cleanup.interval", None),
("lance.auto_cleanup.retain_versions", None),
("lance.auto_cleanup.older_than", None),
];
self.main.dataset.update_config(updates).await?;
self.branch1.dataset.update_config(updates).await?;
self.branch2.dataset.update_config(updates).await?;
self.branch3.dataset.update_config(updates).await?;
self.branch4.dataset.update_config(updates).await?;
self.main.refresh().await?;
self.branch1.refresh().await?;
self.branch2.refresh().await?;
self.branch3.refresh().await?;
self.branch4.refresh().await?;
Ok(())
}
}
// Builds a branch lineage for cleanup tests:
//   main ─┬─ branch1 ── dev/branch2 ── feature/nathan/branch3
//         └─ branch4   (created from a later main version)
// Each dataset gets extra data written after branching; auto-cleanup is
// disabled before returning so tests control cleanup explicitly.
async fn build_lineage_datasets() -> Result<LineageSetup> {
let fixture = Arc::new(MockDatasetFixture::try_new()?);
MockClock::set_system_time(TimeDelta::try_seconds(1).unwrap().to_std().unwrap());
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray};
use arrow_schema::{DataType, Field};
// Seed batch: 50 rows of (id, "text_{id}").
let ids = Int32Array::from_iter_values(0..50i32);
let texts = StringArray::from_iter_values((0..50i32).map(|i| format!("text_{}", i)));
let schema = Arc::new(arrow_schema::Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("text", DataType::Utf8, false),
]));
let batch =
RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(texts)]).unwrap();
let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema);
Dataset::write(
reader,
&fixture.dataset_path,
Some(WriteParams {
mode: WriteMode::Create,
store_params: Some(fixture.os_params()),
..Default::default()
}),
)
.await?;
let mut main = BranchDatasetFixture::new(fixture.clone(), fixture.load().await?);
main.create_text_index().await?;
main.write_data().await?;
// branch1 forks from main's current version.
let mut branch1 = BranchDatasetFixture::new(
fixture.clone(),
fixture
.create_branch_and_load(&mut main.dataset, "branch1", (None, None))
.await?,
);
branch1.write_data().await?;
// dev/branch2 forks from branch1 (branch names may contain slashes).
let mut branch2 = BranchDatasetFixture::new(
fixture.clone(),
fixture
.create_branch_and_load(&mut branch1.dataset, "dev/branch2", ("branch1", None))
.await?,
);
branch2.write_data().await?;
let mut branch3 = BranchDatasetFixture::new(
fixture.clone(),
fixture
.create_branch_and_load(
&mut branch2.dataset,
"feature/nathan/branch3",
("dev/branch2", None),
)
.await?,
);
branch3.write_data().await?;
// Advance main past the branch points, then fork branch4 from it.
main.write_data().await?;
let mut branch4 = BranchDatasetFixture::new(
fixture.clone(),
fixture
.create_branch_and_load(&mut main.dataset, "branch4", (None, None))
.await?,
);
branch4.write_data().await?;
let mut lineage = LineageSetup {
main,
branch1,
branch2,
branch3,
branch4,
};
// Tests drive cleanup manually; auto-cleanup would skew file counts.
lineage.disable_auto_cleanup().await?;
Ok(lineage)
}
// One dataset (main or a branch) plus a cached snapshot of its file counts,
// used by change-detection assertions.
struct BranchDatasetFixture {
fixture: Arc<MockDatasetFixture>,
dataset: Dataset,
counts: FileCounts,
}
impl BranchDatasetFixture {
/// Wraps a dataset with a zeroed file-count snapshot; call `refresh`
/// to populate the counts from disk.
fn new(fixture: Arc<MockDatasetFixture>, dataset: Dataset) -> Self {
    let counts = FileCounts {
        num_manifest_files: 0,
        num_data_files: 0,
        num_tx_files: 0,
        num_delete_files: 0,
        num_index_files: 0,
        num_bytes: 0,
    };
    Self {
        fixture,
        dataset,
        counts,
    }
}
/// Builds (replacing any existing) an inverted index on the `text` column.
async fn create_text_index(&mut self) -> Result<()> {
    use lance_index::scalar::InvertedIndexParams;
    use lance_index::{DatasetIndexExt, IndexType};
    self.dataset
        .create_index(
            &["text"],
            IndexType::Inverted,
            None,
            &InvertedIndexParams::default(),
            true,
        )
        .await?;
    Ok(())
}
/// Appends a small batch, deletes one row, then runs an incremental index
/// optimization so the index has fresh deltas folded in.
async fn append_delete_and_optimize_index(&mut self) -> Result<()> {
    use lance_index::optimize::OptimizeOptions;
    self.write_batch(5).await?;
    self.delete_last_row().await?;
    self.dataset
        .optimize_indices(&OptimizeOptions::append())
        .await?;
    Ok(())
}
/// Appends `rows` rows of (id, "text_{id}") and re-checks-out the latest
/// version so `self.dataset` reflects the new commit.
async fn write_batch(&mut self, rows: i32) -> Result<()> {
    use crate::dataset::WriteParams;
    use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, StringArray};
    use arrow_schema::{DataType, Field};
    let schema = Arc::new(arrow_schema::Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("text", DataType::Utf8, false),
    ]));
    let id_values = Int32Array::from_iter_values(0..rows);
    let text_values = StringArray::from_iter_values((0..rows).map(|i| format!("text_{}", i)));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(id_values), Arc::new(text_values)],
    )
    .unwrap();
    let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), schema);
    let params = WriteParams {
        mode: WriteMode::Append,
        store_params: Some(self.fixture.os_params()),
        ..Default::default()
    };
    self.dataset.append(reader, Some(params)).await?;
    self.dataset.checkout_latest().await?;
    Ok(())
}
async fn delete_last_row(&mut self) -> Result<()> {
let batch = self.dataset.scan().with_row_id().try_into_batch().await?;
if batch.num_rows() > 0 {
let row_id_col = batch.column_by_name(lance_core::ROW_ID).unwrap();
let uint64_array = row_id_col.as_any().downcast_ref::<UInt64Array>().unwrap();
let max_row_id = compute::max(uint64_array).unwrap_or(0);
self.dataset
.delete(&format!("_rowid = {}", max_row_id))
.await?;
}
Ok(())
}
async fn refresh(&mut self) -> Result<()> {
use futures::TryStreamExt;
let branch_path = self.dataset.base.clone();
async fn count_dir(
os: &ObjectStore,
dir: &Path,
exts: Option<&[&str]>,
) -> Result<usize> {
let mut count = 0usize;
let mut s = os.read_dir_all(dir, None);
while let Some(meta) = s.try_next().await? {
match exts {
Some(exts) => {
if let Some(e) = meta.location.extension()
&& exts.contains(&e)
{
count += 1;
}
}
None => count += 1,
}
}
Ok(count)
}
let manifest_dir = branch_path.child("_versions");
self.counts.num_manifest_files = count_dir(
&self.dataset.object_store,
&manifest_dir,
Some(&["manifest"]),
)
.await
.unwrap_or(0);
let txn_dir = branch_path.child("_transactions");
self.counts.num_tx_files =
count_dir(&self.dataset.object_store, &txn_dir, Some(&["txn"]))
.await
.unwrap_or(0);
let idx_dir = branch_path.child(crate::dataset::INDICES_DIR);
self.counts.num_index_files = count_dir(&self.dataset.object_store, &idx_dir, None)
.await
.unwrap_or(0);
let del_dir = branch_path.child("_deletions");
self.counts.num_delete_files = count_dir(
&self.dataset.object_store,
&del_dir,
Some(&["arrow", "bin"]),
)
.await
.unwrap_or(0);
let data_dir = branch_path.child(crate::dataset::DATA_DIR);
self.counts.num_data_files =
count_dir(&self.dataset.object_store, &data_dir, Some(&["lance"]))
.await
.unwrap_or(0);
Ok(())
}
async fn count_data(&self) -> Result<usize> {
use futures::TryStreamExt;
let mut count = 0usize;
let mut s = self.dataset.scan().try_into_stream().await?;
while let Some(_batch) = s.try_next().await? {
count += 1;
}
Ok(count)
}
async fn assert_not_changed(&mut self) -> Result<()> {
let pre_counts = self.counts;
let pre_data_count = self.count_data().await?;
self.refresh().await?;
assert_eq!(
self.counts.num_manifest_files,
pre_counts.num_manifest_files
);
assert_eq!(self.counts.num_data_files, pre_counts.num_data_files);
assert_eq!(self.counts.num_tx_files, pre_counts.num_tx_files);
assert_eq!(self.counts.num_delete_files, pre_counts.num_delete_files);
assert_eq!(self.counts.num_index_files, pre_counts.num_index_files);
assert_eq!(self.count_data().await?, pre_data_count);
Ok(())
}
async fn write_data(&mut self) -> Result<()> {
self.append_delete_and_optimize_index().await?;
self.refresh().await
}
async fn compact(&mut self) -> Result<()> {
use crate::dataset::optimize::{CompactionOptions, compact_files};
compact_files(&mut self.dataset, CompactionOptions::default(), None).await?;
self.refresh().await
}
async fn run_cleanup(&mut self) -> Result<RemovalStats> {
let policy = CleanupPolicyBuilder::default()
.error_if_tagged_old_versions(false)
.retain_n_versions(&self.dataset, 1)
.await?
.build();
self.run_cleanup_inner(policy).await
}
async fn run_cleanup_with_referenced_branches(&mut self) -> Result<RemovalStats> {
let policy = CleanupPolicyBuilder::default()
.error_if_tagged_old_versions(false)
.clean_referenced_branches(true)
.retain_n_versions(&self.dataset, 1)
.await?
.build();
self.run_cleanup_inner(policy).await
}
async fn run_cleanup_inner(&mut self, policy: CleanupPolicy) -> Result<RemovalStats> {
let pre_count = self.count_data().await?;
self.dataset.checkout_latest().await?;
let stats = cleanup_old_versions(&self.dataset, policy).await;
self.refresh().await?;
assert_eq!(self.count_data().await?, pre_count);
stats
}
}
#[tokio::test]
async fn cleanup_lineage_branch1() {
let mut setup = build_lineage_datasets().await.unwrap();
setup.branch1.write_data().await.unwrap();
setup.branch1.run_cleanup().await.unwrap();
assert_eq!(setup.branch1.counts.num_manifest_files, 2);
assert_eq!(setup.branch1.counts.num_data_files, 2);
assert_eq!(setup.branch1.counts.num_tx_files, 1);
assert_eq!(setup.branch1.counts.num_delete_files, 2);
assert_eq!(setup.branch1.counts.num_index_files, 23);
setup.assert_all_unchanged().await;
setup.branch1.compact().await.unwrap();
setup.branch1.run_cleanup().await.unwrap();
assert_eq!(setup.branch1.counts.num_manifest_files, 2);
assert_eq!(setup.branch1.counts.num_data_files, 2);
assert_eq!(setup.branch1.counts.num_tx_files, 1);
assert_eq!(setup.branch1.counts.num_delete_files, 1);
assert_eq!(setup.branch1.counts.num_index_files, 23);
setup.assert_all_unchanged().await;
setup.branch2.compact().await.unwrap();
setup.branch3.compact().await.unwrap();
setup.branch3.run_cleanup().await.unwrap();
setup.branch2.run_cleanup().await.unwrap();
assert_eq!(setup.branch2.counts.num_manifest_files, 1);
assert_eq!(setup.branch2.counts.num_data_files, 1);
assert_eq!(setup.branch2.counts.num_tx_files, 1);
assert_eq!(setup.branch2.counts.num_delete_files, 0);
assert_eq!(setup.branch2.counts.num_index_files, 13);
assert_eq!(setup.branch3.counts.num_manifest_files, 1);
assert_eq!(setup.branch3.counts.num_data_files, 1);
assert_eq!(setup.branch3.counts.num_tx_files, 1);
assert_eq!(setup.branch3.counts.num_delete_files, 0);
assert_eq!(setup.branch3.counts.num_index_files, 16);
setup.branch1.run_cleanup().await.unwrap();
assert_eq!(setup.branch1.counts.num_manifest_files, 1);
assert_eq!(setup.branch1.counts.num_data_files, 1);
assert_eq!(setup.branch1.counts.num_tx_files, 1);
assert_eq!(setup.branch1.counts.num_delete_files, 0);
assert_eq!(setup.branch1.counts.num_index_files, 13);
setup.assert_all_unchanged().await;
}
#[tokio::test]
async fn cleanup_lineage_branch3() {
let mut setup = build_lineage_datasets().await.unwrap();
setup.branch3.write_data().await.unwrap();
setup.branch3.run_cleanup().await.unwrap();
assert_eq!(setup.branch3.counts.num_manifest_files, 1);
assert_eq!(setup.branch3.counts.num_data_files, 2);
assert_eq!(setup.branch3.counts.num_tx_files, 1);
assert_eq!(setup.branch3.counts.num_delete_files, 2);
assert_eq!(setup.branch3.counts.num_index_files, 19);
setup
.assert_unchanged(&["branch1", "branch2", "branch4", "main"])
.await;
setup.branch2.compact().await.unwrap();
setup.branch2.run_cleanup().await.unwrap();
assert_eq!(setup.branch2.counts.num_manifest_files, 2);
assert_eq!(setup.branch2.counts.num_data_files, 2);
assert_eq!(setup.branch2.counts.num_tx_files, 1);
assert_eq!(setup.branch2.counts.num_delete_files, 1);
assert_eq!(setup.branch2.counts.num_index_files, 13);
setup.branch3.compact().await.unwrap();
setup.branch3.run_cleanup().await.unwrap();
assert_eq!(setup.branch3.counts.num_manifest_files, 1);
assert_eq!(setup.branch3.counts.num_data_files, 1);
assert_eq!(setup.branch3.counts.num_tx_files, 1);
assert_eq!(setup.branch3.counts.num_delete_files, 0);
assert_eq!(setup.branch3.counts.num_index_files, 19);
setup
.assert_unchanged(&["branch1", "branch2", "branch4", "main"])
.await;
setup.branch2.compact().await.unwrap();
setup.branch2.run_cleanup().await.unwrap();
assert_eq!(setup.branch2.counts.num_manifest_files, 1);
assert_eq!(setup.branch2.counts.num_data_files, 1);
assert_eq!(setup.branch2.counts.num_tx_files, 1);
assert_eq!(setup.branch2.counts.num_delete_files, 0);
assert_eq!(setup.branch2.counts.num_index_files, 13);
}
#[tokio::test]
async fn cleanup_lineage_branch4() {
let mut setup = build_lineage_datasets().await.unwrap();
setup.branch4.write_data().await.unwrap();
setup.branch4.run_cleanup().await.unwrap();
assert_eq!(setup.branch4.counts.num_manifest_files, 1);
assert_eq!(setup.branch4.counts.num_data_files, 2);
assert_eq!(setup.branch4.counts.num_tx_files, 1);
assert_eq!(setup.branch4.counts.num_delete_files, 2);
assert_eq!(setup.branch4.counts.num_index_files, 16);
setup.assert_all_unchanged().await;
setup.main.compact().await.unwrap();
setup.main.run_cleanup().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 3);
assert_eq!(setup.main.counts.num_data_files, 4);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 2);
assert_eq!(setup.main.counts.num_index_files, 17);
setup.branch4.compact().await.unwrap();
setup.branch4.run_cleanup().await.unwrap();
assert_eq!(setup.branch4.counts.num_manifest_files, 1);
assert_eq!(setup.branch4.counts.num_data_files, 1);
assert_eq!(setup.branch4.counts.num_tx_files, 1);
assert_eq!(setup.branch4.counts.num_delete_files, 0);
assert_eq!(setup.branch4.counts.num_index_files, 16);
setup.assert_all_unchanged().await;
setup.main.run_cleanup().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 2);
assert_eq!(setup.main.counts.num_data_files, 3);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 1);
assert_eq!(setup.main.counts.num_index_files, 17);
}
#[tokio::test]
async fn cleanup_lineage_main() {
let mut setup = build_lineage_datasets().await.unwrap();
setup.main.write_data().await.unwrap();
setup.main.run_cleanup().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 3);
assert_eq!(setup.main.counts.num_data_files, 4);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 3);
assert_eq!(setup.main.counts.num_index_files, 30);
setup.assert_all_unchanged().await;
setup.main.compact().await.unwrap();
setup.main.run_cleanup().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 3);
assert_eq!(setup.main.counts.num_data_files, 4);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 2);
assert_eq!(setup.main.counts.num_index_files, 30);
setup.assert_all_unchanged().await;
setup.branch1.write_data().await.unwrap();
setup.branch1.compact().await.unwrap();
setup.branch2.write_data().await.unwrap();
setup.branch2.compact().await.unwrap();
setup.branch2.run_cleanup().await.unwrap();
assert_eq!(setup.branch2.counts.num_manifest_files, 2);
assert_eq!(setup.branch2.counts.num_data_files, 2);
assert_eq!(setup.branch2.counts.num_tx_files, 1);
assert_eq!(setup.branch2.counts.num_delete_files, 1);
assert_eq!(setup.branch2.counts.num_index_files, 29);
setup.branch1.run_cleanup().await.unwrap();
assert_eq!(setup.branch1.counts.num_manifest_files, 2);
assert_eq!(setup.branch1.counts.num_data_files, 2);
assert_eq!(setup.branch1.counts.num_tx_files, 1);
assert_eq!(setup.branch1.counts.num_delete_files, 1);
assert_eq!(setup.branch1.counts.num_index_files, 13);
setup.main.run_cleanup().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 3);
assert_eq!(setup.main.counts.num_data_files, 4);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 2);
assert_eq!(setup.main.counts.num_index_files, 23);
setup.branch3.write_data().await.unwrap();
setup.branch3.compact().await.unwrap();
setup.branch3.run_cleanup().await.unwrap();
assert_eq!(setup.branch3.counts.num_manifest_files, 1);
assert_eq!(setup.branch3.counts.num_data_files, 1);
assert_eq!(setup.branch3.counts.num_tx_files, 1);
assert_eq!(setup.branch3.counts.num_delete_files, 0);
assert_eq!(setup.branch3.counts.num_index_files, 19);
setup.main.run_cleanup().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 3);
assert_eq!(setup.main.counts.num_data_files, 4);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 2);
assert_eq!(setup.main.counts.num_index_files, 23);
setup.branch1.run_cleanup().await.unwrap();
assert_eq!(setup.branch1.counts.num_manifest_files, 2);
assert_eq!(setup.branch1.counts.num_data_files, 2);
assert_eq!(setup.branch1.counts.num_tx_files, 1);
assert_eq!(setup.branch1.counts.num_delete_files, 1);
assert_eq!(setup.branch1.counts.num_index_files, 13);
setup.branch2.run_cleanup().await.unwrap();
assert_eq!(setup.branch2.counts.num_manifest_files, 1);
assert_eq!(setup.branch2.counts.num_data_files, 1);
assert_eq!(setup.branch2.counts.num_tx_files, 1);
assert_eq!(setup.branch2.counts.num_delete_files, 0);
assert_eq!(setup.branch2.counts.num_index_files, 16);
setup.branch1.run_cleanup().await.unwrap();
assert_eq!(setup.branch1.counts.num_manifest_files, 1);
assert_eq!(setup.branch1.counts.num_data_files, 1);
assert_eq!(setup.branch1.counts.num_tx_files, 1);
assert_eq!(setup.branch1.counts.num_delete_files, 0);
assert_eq!(setup.branch1.counts.num_index_files, 13);
setup.main.run_cleanup().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 2);
assert_eq!(setup.main.counts.num_data_files, 4);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 2);
assert_eq!(setup.main.counts.num_index_files, 23);
setup.branch4.write_data().await.unwrap();
setup.branch4.compact().await.unwrap();
setup.branch4.run_cleanup().await.unwrap();
assert_eq!(setup.branch4.counts.num_manifest_files, 1);
assert_eq!(setup.branch4.counts.num_data_files, 1);
assert_eq!(setup.branch4.counts.num_tx_files, 1);
assert_eq!(setup.branch4.counts.num_delete_files, 0);
assert_eq!(setup.branch4.counts.num_index_files, 16);
setup.main.run_cleanup().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 1);
assert_eq!(setup.main.counts.num_data_files, 1);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 0);
assert_eq!(setup.main.counts.num_index_files, 13);
}
#[tokio::test]
async fn auto_clean_referenced_branches_from_branch2() {
let mut setup = build_lineage_datasets().await.unwrap();
setup.branch3.write_data().await.unwrap();
setup.enable_auto_cleanup().await.unwrap();
setup
.branch2
.run_cleanup_with_referenced_branches()
.await
.unwrap();
setup.branch3.refresh().await.unwrap();
assert_eq!(setup.branch2.counts.num_manifest_files, 2);
assert_eq!(setup.branch2.counts.num_data_files, 1);
assert_eq!(setup.branch2.counts.num_tx_files, 1);
assert_eq!(setup.branch2.counts.num_delete_files, 1);
assert_eq!(setup.branch2.counts.num_index_files, 13);
assert_eq!(setup.branch3.counts.num_manifest_files, 1);
assert_eq!(setup.branch3.counts.num_data_files, 2);
assert_eq!(setup.branch3.counts.num_tx_files, 1);
assert_eq!(setup.branch3.counts.num_delete_files, 2);
assert_eq!(setup.branch3.counts.num_index_files, 19);
setup
.assert_unchanged(&["branch1", "branch4", "main"])
.await;
setup.disable_auto_cleanup().await.unwrap();
setup.branch2.write_data().await.unwrap();
setup.branch2.compact().await.unwrap();
setup.branch3.compact().await.unwrap();
setup.enable_auto_cleanup().await.unwrap();
setup
.branch2
.run_cleanup_with_referenced_branches()
.await
.unwrap();
setup.branch3.refresh().await.unwrap();
assert_eq!(setup.branch2.counts.num_manifest_files, 1);
assert_eq!(setup.branch2.counts.num_data_files, 1);
assert_eq!(setup.branch2.counts.num_tx_files, 1);
assert_eq!(setup.branch2.counts.num_delete_files, 0);
assert_eq!(setup.branch2.counts.num_index_files, 16);
assert_eq!(setup.branch3.counts.num_manifest_files, 1);
assert_eq!(setup.branch3.counts.num_data_files, 1);
assert_eq!(setup.branch3.counts.num_tx_files, 1);
assert_eq!(setup.branch3.counts.num_delete_files, 0);
assert_eq!(setup.branch3.counts.num_index_files, 19);
setup
.assert_unchanged(&["branch1", "branch4", "main"])
.await;
}
#[tokio::test]
async fn auto_clean_referenced_branches_from_main() {
let mut setup = build_lineage_datasets().await.unwrap();
setup.enable_auto_cleanup().await.unwrap();
setup.main.write_data().await.unwrap();
setup
.main
.run_cleanup_with_referenced_branches()
.await
.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 3);
assert_eq!(setup.main.counts.num_data_files, 4);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 3);
assert_eq!(setup.main.counts.num_index_files, 13);
setup.main.compact().await.unwrap();
setup
.main
.run_cleanup_with_referenced_branches()
.await
.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 3);
assert_eq!(setup.main.counts.num_data_files, 4);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 2);
assert_eq!(setup.main.counts.num_index_files, 13);
setup.branch4.compact().await.unwrap();
setup
.main
.run_cleanup_with_referenced_branches()
.await
.unwrap();
setup.branch4.refresh().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 2);
assert_eq!(setup.main.counts.num_data_files, 3);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 1);
assert_eq!(setup.main.counts.num_index_files, 13);
assert_eq!(setup.branch4.counts.num_manifest_files, 1);
assert_eq!(setup.branch4.counts.num_data_files, 1);
assert_eq!(setup.branch4.counts.num_tx_files, 1);
assert_eq!(setup.branch4.counts.num_delete_files, 0);
assert_eq!(setup.branch4.counts.num_index_files, 13);
setup.branch1.write_data().await.unwrap();
setup.branch1.compact().await.unwrap();
setup
.main
.run_cleanup_with_referenced_branches()
.await
.unwrap();
setup.branch1.refresh().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 2);
assert_eq!(setup.main.counts.num_data_files, 3);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 1);
assert_eq!(setup.main.counts.num_index_files, 13);
assert_eq!(setup.branch1.counts.num_manifest_files, 2);
assert_eq!(setup.branch1.counts.num_data_files, 2);
assert_eq!(setup.branch1.counts.num_tx_files, 1);
assert_eq!(setup.branch1.counts.num_delete_files, 1);
assert_eq!(setup.branch1.counts.num_index_files, 13);
setup.branch2.write_data().await.unwrap();
setup.branch2.compact().await.unwrap();
setup
.main
.run_cleanup_with_referenced_branches()
.await
.unwrap();
setup.branch2.refresh().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 2);
assert_eq!(setup.main.counts.num_data_files, 3);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 1);
assert_eq!(setup.main.counts.num_index_files, 13);
assert_eq!(setup.branch1.counts.num_manifest_files, 2);
assert_eq!(setup.branch1.counts.num_data_files, 2);
assert_eq!(setup.branch1.counts.num_tx_files, 1);
assert_eq!(setup.branch1.counts.num_delete_files, 1);
assert_eq!(setup.branch1.counts.num_index_files, 13);
assert_eq!(setup.branch2.counts.num_manifest_files, 2);
assert_eq!(setup.branch2.counts.num_data_files, 2);
assert_eq!(setup.branch2.counts.num_tx_files, 1);
assert_eq!(setup.branch2.counts.num_delete_files, 1);
assert_eq!(setup.branch2.counts.num_index_files, 16);
setup.branch3.write_data().await.unwrap();
setup.branch3.compact().await.unwrap();
setup
.main
.run_cleanup_with_referenced_branches()
.await
.unwrap();
setup.branch1.refresh().await.unwrap();
setup.branch2.refresh().await.unwrap();
setup.branch3.refresh().await.unwrap();
assert_eq!(setup.main.counts.num_manifest_files, 1);
assert_eq!(setup.main.counts.num_data_files, 1);
assert_eq!(setup.main.counts.num_tx_files, 1);
assert_eq!(setup.main.counts.num_delete_files, 0);
assert_eq!(setup.main.counts.num_index_files, 13);
assert_eq!(setup.branch1.counts.num_manifest_files, 1);
assert_eq!(setup.branch1.counts.num_data_files, 1);
assert_eq!(setup.branch1.counts.num_tx_files, 1);
assert_eq!(setup.branch1.counts.num_delete_files, 0);
assert_eq!(setup.branch1.counts.num_index_files, 13);
assert_eq!(setup.branch2.counts.num_manifest_files, 1);
assert_eq!(setup.branch2.counts.num_data_files, 1);
assert_eq!(setup.branch2.counts.num_tx_files, 1);
assert_eq!(setup.branch2.counts.num_delete_files, 0);
assert_eq!(setup.branch2.counts.num_index_files, 16);
assert_eq!(setup.branch3.counts.num_manifest_files, 1);
assert_eq!(setup.branch3.counts.num_data_files, 1);
assert_eq!(setup.branch3.counts.num_tx_files, 1);
assert_eq!(setup.branch3.counts.num_delete_files, 0);
assert_eq!(setup.branch3.counts.num_index_files, 19);
setup.assert_unchanged(&["branch4"]).await;
}
#[tokio::test]
async fn auto_clean_referenced_branches_with_tags() {
    // Verifies that tagged versions survive referenced-branch cleanup and
    // that deleting a tag allows the next cleanup to reclaim its files.
    let mut setup = build_lineage_datasets().await.unwrap();
    // Pin the current branch3 and main versions with tags.
    setup
        .branch3
        .dataset
        .tags()
        .create("branch3-tag", setup.branch3.dataset.version().version)
        .await
        .unwrap();
    setup
        .main
        .dataset
        .tags()
        .create("main-tag", setup.main.dataset.version().version)
        .await
        .unwrap();
    // Compact every branch so cleanup has old files to reclaim.
    setup.branch1.compact().await.unwrap();
    setup.branch2.compact().await.unwrap();
    setup.branch3.compact().await.unwrap();
    setup.branch4.compact().await.unwrap();
    setup.main.compact().await.unwrap();
    setup.enable_auto_cleanup().await.unwrap();
    // First cleanup: tagged versions keep extra manifests/data alive on
    // main and branch2.
    setup
        .main
        .run_cleanup_with_referenced_branches()
        .await
        .unwrap();
    setup.branch1.refresh().await.unwrap();
    setup.branch2.refresh().await.unwrap();
    setup.branch3.refresh().await.unwrap();
    setup.branch4.refresh().await.unwrap();
    assert_eq!(setup.main.counts.num_manifest_files, 3);
    assert_eq!(setup.main.counts.num_data_files, 4);
    assert_eq!(setup.main.counts.num_tx_files, 2);
    assert_eq!(setup.main.counts.num_delete_files, 2);
    assert_eq!(setup.main.counts.num_index_files, 20);
    assert_eq!(setup.branch2.counts.num_manifest_files, 2);
    assert_eq!(setup.branch2.counts.num_data_files, 2);
    assert_eq!(setup.branch2.counts.num_tx_files, 1);
    assert_eq!(setup.branch2.counts.num_delete_files, 1);
    assert_eq!(setup.branch2.counts.num_index_files, 13);
    // NOTE(review): the next five assertions duplicate the branch2 block
    // above verbatim; branch1 and branch3 are otherwise unchecked in this
    // round, so one of these blocks was likely meant for branch1 (or
    // branch3). Confirm the intended branch and expected counts.
    assert_eq!(setup.branch2.counts.num_manifest_files, 2);
    assert_eq!(setup.branch2.counts.num_data_files, 2);
    assert_eq!(setup.branch2.counts.num_tx_files, 1);
    assert_eq!(setup.branch2.counts.num_delete_files, 1);
    assert_eq!(setup.branch2.counts.num_index_files, 13);
    assert_eq!(setup.branch4.counts.num_manifest_files, 1);
    assert_eq!(setup.branch4.counts.num_data_files, 1);
    assert_eq!(setup.branch4.counts.num_tx_files, 1);
    assert_eq!(setup.branch4.counts.num_delete_files, 0);
    assert_eq!(setup.branch4.counts.num_index_files, 13);
    // Dropping the branch3 tag lets the next cleanup shrink the branches.
    setup
        .branch3
        .dataset
        .tags()
        .delete("branch3-tag")
        .await
        .unwrap();
    setup
        .main
        .run_cleanup_with_referenced_branches()
        .await
        .unwrap();
    setup.branch1.refresh().await.unwrap();
    setup.branch2.refresh().await.unwrap();
    setup.branch3.refresh().await.unwrap();
    setup.branch4.refresh().await.unwrap();
    // main still holds extra files because "main-tag" is in place.
    assert_eq!(setup.main.counts.num_manifest_files, 2);
    assert_eq!(setup.main.counts.num_data_files, 4);
    assert_eq!(setup.main.counts.num_tx_files, 2);
    assert_eq!(setup.main.counts.num_delete_files, 2);
    assert_eq!(setup.main.counts.num_index_files, 20);
    assert_eq!(setup.branch1.counts.num_manifest_files, 1);
    assert_eq!(setup.branch1.counts.num_data_files, 1);
    assert_eq!(setup.branch1.counts.num_tx_files, 1);
    assert_eq!(setup.branch1.counts.num_delete_files, 0);
    assert_eq!(setup.branch1.counts.num_index_files, 10);
    assert_eq!(setup.branch2.counts.num_manifest_files, 1);
    assert_eq!(setup.branch2.counts.num_data_files, 1);
    assert_eq!(setup.branch2.counts.num_tx_files, 1);
    assert_eq!(setup.branch2.counts.num_delete_files, 0);
    assert_eq!(setup.branch2.counts.num_index_files, 13);
    assert_eq!(setup.branch3.counts.num_manifest_files, 1);
    assert_eq!(setup.branch3.counts.num_data_files, 1);
    assert_eq!(setup.branch3.counts.num_tx_files, 1);
    assert_eq!(setup.branch3.counts.num_delete_files, 0);
    assert_eq!(setup.branch3.counts.num_index_files, 16);
    assert_eq!(setup.branch4.counts.num_manifest_files, 1);
    assert_eq!(setup.branch4.counts.num_data_files, 1);
    assert_eq!(setup.branch4.counts.num_tx_files, 1);
    assert_eq!(setup.branch4.counts.num_delete_files, 0);
    assert_eq!(setup.branch4.counts.num_index_files, 13);
    // Dropping the main tag finally lets main collapse to one version.
    setup.main.dataset.tags().delete("main-tag").await.unwrap();
    setup
        .main
        .run_cleanup_with_referenced_branches()
        .await
        .unwrap();
    setup.branch2.refresh().await.unwrap();
    setup.branch3.refresh().await.unwrap();
    setup.branch4.refresh().await.unwrap();
    assert_eq!(setup.main.counts.num_manifest_files, 1);
    assert_eq!(setup.main.counts.num_data_files, 1);
    assert_eq!(setup.main.counts.num_tx_files, 1);
    assert_eq!(setup.main.counts.num_delete_files, 0);
    assert_eq!(setup.main.counts.num_index_files, 10);
    assert_eq!(setup.branch2.counts.num_manifest_files, 1);
    assert_eq!(setup.branch2.counts.num_data_files, 1);
    assert_eq!(setup.branch2.counts.num_tx_files, 1);
    assert_eq!(setup.branch2.counts.num_delete_files, 0);
    assert_eq!(setup.branch2.counts.num_index_files, 13);
    assert_eq!(setup.branch3.counts.num_manifest_files, 1);
    assert_eq!(setup.branch3.counts.num_data_files, 1);
    assert_eq!(setup.branch3.counts.num_tx_files, 1);
    assert_eq!(setup.branch3.counts.num_delete_files, 0);
    assert_eq!(setup.branch3.counts.num_index_files, 16);
    assert_eq!(setup.branch4.counts.num_manifest_files, 1);
    assert_eq!(setup.branch4.counts.num_data_files, 1);
    assert_eq!(setup.branch4.counts.num_tx_files, 1);
    assert_eq!(setup.branch4.counts.num_delete_files, 0);
    assert_eq!(setup.branch4.counts.num_index_files, 13);
}
#[test]
fn test_calculate_duration_s3() {
    // A typical rate maps to ceil(1e9 / (rate * batch_size)) ns per tick.
    let rate = 100;
    let expected_ns = 1_000_000_000u64.div_ceil(rate * S3_DELETE_STREAM_BATCH_SIZE);
    assert_eq!(
        calculate_duration("s3".to_string(), rate),
        Duration::from_nanos(expected_ns)
    );
    // A rate of zero is treated the same as a rate of one.
    assert_eq!(
        calculate_duration("s3".to_string(), 0),
        calculate_duration("s3".to_string(), 1)
    );
    // Very large rates bottom out at a single nanosecond.
    assert_eq!(
        calculate_duration("s3".to_string(), 2_000_000),
        Duration::from_nanos(1)
    );
}
#[tokio::test]
async fn test_cleanup_with_rate_limit() {
    let fixture = MockDatasetFixture::try_new().unwrap();
    // One initial write plus four overwrites yields several old versions.
    fixture.create_some_data().await.unwrap();
    for _ in 0..4 {
        fixture.overwrite_some_data().await.unwrap();
    }
    // Advance the mock clock so earlier versions fall outside retention.
    MockClock::set_system_time(TimeDelta::try_days(10).unwrap().to_std().unwrap());
    let policy = CleanupPolicyBuilder::default()
        .before_timestamp(utc_now() - TimeDelta::try_days(8).unwrap())
        .delete_rate_limit(1)
        .unwrap()
        .build();
    let started = std::time::Instant::now();
    let dataset = fixture.open().await.unwrap();
    let stats = cleanup_old_versions(&dataset, policy).await.unwrap();
    let elapsed = started.elapsed();
    assert!(
        stats.old_versions > 0,
        "expected some old versions to be removed"
    );
    // With a limit of 1 delete/sec, removing several files must take a
    // measurable amount of wall-clock time.
    assert!(
        elapsed.as_millis() >= 2000,
        "expected cleanup to be rate-limited (elapsed: {:?})",
        elapsed
    );
}
}