use crate::manifest::{ManifestEntryStatus, ManifestFile, ManifestList, Snapshot};
use crate::metadata::TableMetadata;
use crate::storage::Storage;
use anyhow::Result;
use chrono::{Duration, Utc};
use std::collections::HashSet;
/// Operation that removes old snapshots from a table's metadata.
///
/// Configure it with the `with_*` builder methods and run it with `execute`.
pub struct ExpireSnapshots<'a> {
/// Table metadata that `execute` mutates in place.
metadata: &'a mut TableMetadata,
/// Minimum number of snapshots to retain; `new` initialises this to 1.
min_snapshots_to_keep: usize,
/// Age limit in milliseconds. While this is `None`, `execute` treats no
/// snapshot as too old.
max_snapshot_age_ms: Option<i64>,
}
impl<'a> ExpireSnapshots<'a> {
pub fn new(metadata: &'a mut TableMetadata, _storage: &'a Storage) -> Self {
Self {
metadata,
min_snapshots_to_keep: 1,
max_snapshot_age_ms: None,
}
}
pub fn with_min_snapshots_to_keep(mut self, min: usize) -> Self {
self.min_snapshots_to_keep = min;
self
}
pub fn with_max_snapshot_age(mut self, duration: Duration) -> Self {
self.max_snapshot_age_ms = Some(duration.num_milliseconds());
self
}
pub async fn execute(self) -> Result<Vec<i64>> {
if self.metadata.snapshots.is_empty() {
return Ok(Vec::new());
}
let now = Utc::now().timestamp_millis();
let mut expired_ids = Vec::new();
let mut snapshots = self.metadata.snapshots.clone();
snapshots.sort_by_key(|s| s.timestamp_ms);
let current_snapshot_id = self.metadata.current_snapshot_id;
let to_expire_candidates: Vec<Snapshot> = snapshots
.into_iter()
.filter(|s| Some(s.snapshot_id) != current_snapshot_id)
.collect();
let num_to_keep = self
.min_snapshots_to_keep
.saturating_sub(if current_snapshot_id.is_some() { 1 } else { 0 });
let mut kept_count = 0;
for snapshot in to_expire_candidates.into_iter().rev() {
let is_too_old = if let Some(max_age) = self.max_snapshot_age_ms {
(now - snapshot.timestamp_ms) > max_age
} else {
false
};
if kept_count < num_to_keep || !is_too_old {
kept_count += 1;
} else {
expired_ids.push(snapshot.snapshot_id);
}
}
if expired_ids.is_empty() {
return Ok(Vec::new());
}
self.metadata
.snapshots
.retain(|s| !expired_ids.contains(&s.snapshot_id));
self.metadata
.snapshot_log
.retain(|e| !expired_ids.contains(&e.snapshot_id));
self.metadata.increment_sequence();
Ok(expired_ids)
}
}
/// Operation that deletes files under the table location that no snapshot
/// in the metadata references.
pub struct RemoveOrphanFiles<'a> {
/// Table metadata whose snapshots define the set of referenced files.
metadata: &'a TableMetadata,
/// Storage used to load manifests, list files and delete orphans.
storage: &'a Storage,
}
impl<'a> RemoveOrphanFiles<'a> {
    /// Creates an orphan-file removal operation for the table described by
    /// `metadata`, using `storage` for all reads, listings and deletions.
    pub fn new(metadata: &'a TableMetadata, storage: &'a Storage) -> Self {
        Self { metadata, storage }
    }

    /// Deletes every file listed under the table location that is not
    /// referenced by any snapshot, and returns the deleted paths.
    ///
    /// A file counts as referenced when it is a snapshot's manifest list, a
    /// manifest named by a manifest list, or a data file of a manifest entry
    /// whose status is not `Deleted`.
    ///
    /// # Errors
    ///
    /// Returns the first error from loading a manifest list or manifest,
    /// listing files, or deleting a file. Files deleted before the failure
    /// stay deleted.
    pub async fn execute(self) -> Result<Vec<String>> {
        let mut referenced_files = HashSet::new();
        // Manifests may be named by several manifest lists; scanning each one
        // once avoids repeated storage reads.
        let mut scanned_manifests = HashSet::new();

        for snapshot in &self.metadata.snapshots {
            referenced_files.insert(snapshot.manifest_list.clone());
            let manifest_list = ManifestList::load(&snapshot.manifest_list, self.storage).await?;

            for entry in &manifest_list.entries {
                if !scanned_manifests.insert(entry.manifest_path.clone()) {
                    // Already loaded through an earlier entry; its path and
                    // data files are recorded.
                    continue;
                }
                referenced_files.insert(entry.manifest_path.clone());

                let manifest = ManifestFile::load(&entry.manifest_path, self.storage).await?;
                referenced_files.extend(
                    manifest
                        .entries
                        .iter()
                        .filter(|m_entry| m_entry.status != ManifestEntryStatus::Deleted)
                        .map(|m_entry| m_entry.data_file.file_path.clone()),
                );
            }
        }

        // NOTE(review): anything `list_files` returns for the table location
        // that is not in `referenced_files` is deleted, which would include
        // non-snapshot files (e.g. table metadata documents) if they live
        // under the same location — confirm the scope of `list_files`.
        let all_files = self.storage.list_files(&self.metadata.location).await?;
        let mut orphans = Vec::new();
        for file in all_files {
            if referenced_files.contains(&file) {
                continue;
            }
            self.storage.delete(&file).await?;
            orphans.push(file);
        }
        Ok(orphans)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::Operation;
    use crate::schema::Schema;
    use object_store::memory::InMemory;
    use std::sync::Arc;

    /// Builds metadata for a table named "test" together with an in-memory
    /// storage backend.
    async fn setup() -> (TableMetadata, Storage) {
        let metadata = TableMetadata::builder("test", Schema::builder(1).build()).build();
        let storage = Storage::new(Arc::new(InMemory::new()));
        (metadata, storage)
    }

    /// Five snapshots aged one to five days, keeping at least two with a
    /// one-day age limit, must leave two snapshots and expire three.
    #[tokio::test]
    async fn test_expire_snapshots() -> Result<()> {
        const MS_PER_DAY: i64 = 1000 * 3600 * 24;

        let (mut metadata, storage) = setup().await;
        for day in 1..=5i64 {
            let mut snapshot = Snapshot::builder(day, format!("/m{}.json", day))
                .with_operation(Operation::Append)
                .build();
            // Snapshot `day` is backdated by that many days.
            snapshot.timestamp_ms = Utc::now().timestamp_millis() - day * MS_PER_DAY;
            metadata.add_snapshot(snapshot);
        }
        assert_eq!(metadata.snapshots.len(), 5);

        let expiration = ExpireSnapshots::new(&mut metadata, &storage)
            .with_min_snapshots_to_keep(2)
            .with_max_snapshot_age(Duration::days(1));
        let expired = expiration.execute().await?;

        assert_eq!(expired.len(), 3);
        assert_eq!(metadata.snapshots.len(), 2);
        Ok(())
    }
}