use crate::catalog::Catalog;
use crate::error::{Error, Result};
use crate::spec::Snapshot;
use crate::table::Table;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
/// Thresholds that decide which snapshots a cleanup is allowed to expire.
///
/// Values are set through the `with_*` builder methods and read back through
/// the accessors of the same name.
#[derive(Debug, Clone)]
pub struct CleanupOptions {
    /// Age threshold in days; the planner keeps snapshots younger than this.
    older_than_days: u32,
    /// How many of the most recent snapshots the planner always keeps.
    retain_last: usize,
}

impl Default for CleanupOptions {
    /// Equivalent to [`CleanupOptions::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl CleanupOptions {
    /// Creates options with the stock settings: a 7 day age threshold and the
    /// 10 most recent snapshots retained.
    pub fn new() -> Self {
        Self {
            older_than_days: 7,
            retain_last: 10,
        }
    }

    /// Returns the options with the age threshold replaced by `days`.
    pub fn with_older_than_days(self, days: u32) -> Self {
        Self {
            older_than_days: days,
            ..self
        }
    }

    /// Returns the options with the retained-snapshot count replaced by `count`.
    pub fn with_retain_last(self, count: usize) -> Self {
        Self {
            retain_last: count,
            ..self
        }
    }

    /// Age threshold in days.
    pub fn older_than_days(&self) -> u32 {
        self.older_than_days
    }

    /// Number of most recent snapshots that are always retained.
    pub fn retain_last(&self) -> usize {
        self.retain_last
    }
}
/// Flattened, serializable view of one snapshot used when reporting a cleanup
/// plan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// Identifier reported by the snapshot.
    pub snapshot_id: i64,
    /// Timestamp reported by the snapshot, in milliseconds.
    pub timestamp_ms: i64,
    /// Age in fractional days, measured against the `now_ms` supplied when
    /// this value was built.
    pub age_days: f64,
    /// Operation name taken from the snapshot summary.
    pub operation: String,
    /// Whether this snapshot is the table's current snapshot.
    pub is_current: bool,
    /// Names of the refs that point at this snapshot.
    pub refs: Vec<String>,
}

impl SnapshotInfo {
    /// Builds the report entry for `snapshot`.
    ///
    /// * `current_snapshot_id` - id of the current snapshot, if there is one;
    ///   used only to set `is_current`.
    /// * `refs` - ref names to record for this snapshot, stored as given.
    /// * `now_ms` - reference time in milliseconds from which the age is
    ///   measured.
    fn from_snapshot(
        snapshot: &Snapshot,
        current_snapshot_id: Option<i64>,
        refs: Vec<String>,
        now_ms: i64,
    ) -> Self {
        const MILLIS_PER_DAY: f64 = 24.0 * 60.0 * 60.0 * 1000.0;

        let snapshot_id = snapshot.snapshot_id();
        let timestamp_ms = snapshot.timestamp_ms();
        let elapsed_ms = now_ms - timestamp_ms;

        Self {
            snapshot_id,
            timestamp_ms,
            age_days: elapsed_ms as f64 / MILLIS_PER_DAY,
            operation: snapshot.summary().operation().to_string(),
            is_current: matches!(current_snapshot_id, Some(id) if id == snapshot_id),
            refs,
        }
    }
}
/// Why the cleanup planner decided to keep a snapshot.
///
/// The planner reports a single reason per retained snapshot: the first rule
/// that matched.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum RetentionReason {
/// The snapshot is the table's current snapshot.
CurrentSnapshot,
/// The snapshot is among the most recent `retain_last` snapshots.
WithinRetainCount,
/// The snapshot is younger than the configured age threshold.
NotOldEnough,
/// At least one ref points at the snapshot; the payload holds the ref
/// names joined with `", "`.
ReferencedByRef(String),
}
/// A snapshot the planner decided to keep, paired with the reason for keeping it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetainedSnapshot {
/// Details of the retained snapshot.
pub info: SnapshotInfo,
/// The rule that caused the snapshot to be retained.
pub reason: RetentionReason,
}
/// Outcome of planning a snapshot cleanup: what would be expired, what is
/// kept, and the settings the decision was based on.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupPlan {
    /// Snapshots selected for removal.
    pub snapshots_to_remove: Vec<SnapshotInfo>,
    /// Snapshots that are kept, each with the reason it was kept.
    pub snapshots_to_retain: Vec<RetainedSnapshot>,
    /// Number of snapshots the table had when the plan was built.
    pub total_snapshots: usize,
    /// Age threshold, in days, that was in effect for this plan.
    pub older_than_days: u32,
    /// Retained-snapshot count that was in effect for this plan.
    pub retain_last: usize,
}

impl CleanupPlan {
    /// Returns `true` when the plan selects no snapshot for removal.
    pub fn is_empty(&self) -> bool {
        let Self {
            snapshots_to_remove,
            ..
        } = self;
        snapshots_to_remove.is_empty()
    }
}
/// Summary returned after a cleanup plan has been executed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupResult {
/// Number of snapshots from the plan's removal list.
pub snapshots_removed: usize,
/// Number of snapshots the plan kept.
pub snapshots_retained: usize,
/// Ids of the snapshots that were submitted for expiration.
pub removed_snapshot_ids: Vec<i64>,
/// Manifest-list locations recorded for the removed snapshots.
pub orphaned_manifest_lists: Vec<String>,
}
pub fn plan_snapshot_cleanup(table: &Table, options: &CleanupOptions) -> Result<CleanupPlan> {
let metadata = table.metadata();
let snapshots = metadata.snapshots();
if snapshots.is_empty() {
return Ok(CleanupPlan {
snapshots_to_remove: vec![],
snapshots_to_retain: vec![],
total_snapshots: 0,
older_than_days: options.older_than_days,
retain_last: options.retain_last,
});
}
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| Error::unexpected(format!("Failed to get current time: {}", e)))?
.as_millis() as i64;
let current_snapshot_id = metadata.current_snapshot_id();
let age_threshold_ms = (options.older_than_days as i64) * 24 * 60 * 60 * 1000;
let mut snapshot_refs: std::collections::HashMap<i64, Vec<String>> =
std::collections::HashMap::new();
for (ref_name, snapshot_ref) in metadata.refs() {
snapshot_refs
.entry(snapshot_ref.snapshot_id())
.or_default()
.push(ref_name.clone());
}
let mut snapshot_infos: Vec<SnapshotInfo> = snapshots
.iter()
.map(|s| {
SnapshotInfo::from_snapshot(
s,
current_snapshot_id,
snapshot_refs
.get(&s.snapshot_id())
.cloned()
.unwrap_or_default(),
now_ms,
)
})
.collect();
snapshot_infos.sort_by(|a, b| b.timestamp_ms.cmp(&a.timestamp_ms));
let mut snapshots_to_remove = Vec::new();
let mut snapshots_to_retain = Vec::new();
let retain_last_ids: HashSet<i64> = snapshot_infos
.iter()
.take(options.retain_last)
.map(|s| s.snapshot_id)
.collect();
for info in snapshot_infos {
let retention_reason = if info.is_current {
Some(RetentionReason::CurrentSnapshot)
} else if !info.refs.is_empty() {
Some(RetentionReason::ReferencedByRef(info.refs.join(", ")))
} else if retain_last_ids.contains(&info.snapshot_id) {
Some(RetentionReason::WithinRetainCount)
} else if (now_ms - info.timestamp_ms) < age_threshold_ms {
Some(RetentionReason::NotOldEnough)
} else {
None
};
if let Some(reason) = retention_reason {
snapshots_to_retain.push(RetainedSnapshot { info, reason });
} else {
snapshots_to_remove.push(info);
}
}
Ok(CleanupPlan {
total_snapshots: snapshots.len(),
snapshots_to_remove,
snapshots_to_retain,
older_than_days: options.older_than_days,
retain_last: options.retain_last,
})
}
/// Carries out `plan` by asking `catalog` to expire the snapshots it selected
/// for removal.
///
/// When the plan removes nothing, the catalog is not contacted and a result
/// with zero removals is returned. Otherwise the manifest-list locations of
/// the affected snapshots are read from `table`'s metadata and reported in
/// `orphaned_manifest_lists`; this function does not delete those files
/// itself.
///
/// # Errors
///
/// Propagates any error returned by `Catalog::expire_snapshots`.
pub async fn execute_snapshot_cleanup<C: Catalog>(
    table: &Table,
    catalog: &C,
    plan: CleanupPlan,
) -> Result<CleanupResult> {
    let snapshots_retained = plan.snapshots_to_retain.len();

    if plan.is_empty() {
        return Ok(CleanupResult {
            snapshots_removed: 0,
            snapshots_retained,
            removed_snapshot_ids: Vec::new(),
            orphaned_manifest_lists: Vec::new(),
        });
    }

    // Ids in plan order for the catalog call, plus a set for fast membership
    // checks while scanning the table's snapshots.
    let removed_snapshot_ids: Vec<i64> = plan
        .snapshots_to_remove
        .iter()
        .map(|info| info.snapshot_id)
        .collect();
    let doomed: HashSet<i64> = removed_snapshot_ids.iter().copied().collect();

    let mut orphaned_manifest_lists: Vec<String> = Vec::new();
    for snapshot in table.metadata().snapshots().iter() {
        if doomed.contains(&snapshot.snapshot_id()) {
            orphaned_manifest_lists.push(snapshot.manifest_list().to_string());
        }
    }

    catalog
        .expire_snapshots(table.identifier(), &removed_snapshot_ids)
        .await?;

    Ok(CleanupResult {
        snapshots_removed: plan.snapshots_to_remove.len(),
        snapshots_retained,
        removed_snapshot_ids,
        orphaned_manifest_lists,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::spec::{Snapshot, Summary};

    /// Fixed reference time shared by the snapshot-info tests.
    const NOW_MS: i64 = 1700000000000;
    /// Length of one day in milliseconds.
    const ONE_DAY_MS: i64 = 24 * 60 * 60 * 1000;

    /// Builds an "append" snapshot with the given id and timestamp, optionally
    /// linked to a parent snapshot.
    fn create_test_snapshot(id: i64, timestamp_ms: i64, parent_id: Option<i64>) -> Snapshot {
        let manifest_list = format!("s3://bucket/metadata/snap-{}.avro", id);
        let base = Snapshot::builder()
            .with_snapshot_id(id)
            .with_timestamp_ms(timestamp_ms)
            .with_manifest_list(&manifest_list)
            .with_summary(Summary::builder().set("operation", "append").build());
        let builder = match parent_id {
            Some(parent) => base.with_parent_snapshot_id(parent),
            None => base,
        };
        builder.build().unwrap()
    }

    #[test]
    fn test_cleanup_options_defaults() {
        let options = CleanupOptions::new();

        assert_eq!(options.older_than_days(), 7);
        assert_eq!(options.retain_last(), 10);
    }

    #[test]
    fn test_cleanup_options_builder() {
        let options = CleanupOptions::new()
            .with_older_than_days(14)
            .with_retain_last(5);

        assert_eq!(options.older_than_days(), 14);
        assert_eq!(options.retain_last(), 5);
    }

    #[test]
    fn test_snapshot_info_age_calculation() {
        // A snapshot taken exactly one day before the reference time.
        let snapshot = create_test_snapshot(1, NOW_MS - ONE_DAY_MS, None);

        let info = SnapshotInfo::from_snapshot(&snapshot, None, vec![], NOW_MS);

        assert!((info.age_days - 1.0).abs() < 0.01);
        assert!(!info.is_current);
        assert!(info.refs.is_empty());
    }

    #[test]
    fn test_snapshot_info_current_flag() {
        let snapshot = create_test_snapshot(42, NOW_MS - 1000, None);

        // Matching id: flagged as current.
        let matching = SnapshotInfo::from_snapshot(&snapshot, Some(42), vec![], NOW_MS);
        assert!(matching.is_current);

        // Different id: not current.
        let other = SnapshotInfo::from_snapshot(&snapshot, Some(99), vec![], NOW_MS);
        assert!(!other.is_current);
    }
}