supertable_core/
action.rs1use crate::manifest::{ManifestEntryStatus, ManifestFile, ManifestList, Snapshot};
7use crate::metadata::TableMetadata;
8use crate::storage::Storage;
9use anyhow::Result;
10use chrono::{Duration, Utc};
11use std::collections::HashSet;
12
13pub struct ExpireSnapshots<'a> {
15 metadata: &'a mut TableMetadata,
16 min_snapshots_to_keep: usize,
17 max_snapshot_age_ms: Option<i64>,
18}
19
20impl<'a> ExpireSnapshots<'a> {
21 pub fn new(metadata: &'a mut TableMetadata, _storage: &'a Storage) -> Self {
22 Self {
23 metadata,
24 min_snapshots_to_keep: 1,
25 max_snapshot_age_ms: None,
26 }
27 }
28
29 pub fn with_min_snapshots_to_keep(mut self, min: usize) -> Self {
30 self.min_snapshots_to_keep = min;
31 self
32 }
33
34 pub fn with_max_snapshot_age(mut self, duration: Duration) -> Self {
35 self.max_snapshot_age_ms = Some(duration.num_milliseconds());
36 self
37 }
38
39 pub async fn execute(self) -> Result<Vec<i64>> {
43 if self.metadata.snapshots.is_empty() {
44 return Ok(Vec::new());
45 }
46
47 let now = Utc::now().timestamp_millis();
48 let mut expired_ids = Vec::new();
49
50 let mut snapshots = self.metadata.snapshots.clone();
52 snapshots.sort_by_key(|s| s.timestamp_ms);
53
54 let current_snapshot_id = self.metadata.current_snapshot_id;
55
56 let to_expire_candidates: Vec<Snapshot> = snapshots
58 .into_iter()
59 .filter(|s| Some(s.snapshot_id) != current_snapshot_id)
60 .collect();
61
62 let num_to_keep = self
63 .min_snapshots_to_keep
64 .saturating_sub(if current_snapshot_id.is_some() { 1 } else { 0 });
65 let mut kept_count = 0;
66
67 for snapshot in to_expire_candidates.into_iter().rev() {
68 let is_too_old = if let Some(max_age) = self.max_snapshot_age_ms {
69 (now - snapshot.timestamp_ms) > max_age
70 } else {
71 false
72 };
73
74 if kept_count < num_to_keep || !is_too_old {
75 kept_count += 1;
76 } else {
77 expired_ids.push(snapshot.snapshot_id);
78 }
79 }
80
81 if expired_ids.is_empty() {
82 return Ok(Vec::new());
83 }
84
85 self.metadata
87 .snapshots
88 .retain(|s| !expired_ids.contains(&s.snapshot_id));
89 self.metadata
90 .snapshot_log
91 .retain(|e| !expired_ids.contains(&e.snapshot_id));
92 self.metadata.increment_sequence();
93
94 Ok(expired_ids)
95 }
96}
97
98pub struct RemoveOrphanFiles<'a> {
100 metadata: &'a TableMetadata,
101 storage: &'a Storage,
102}
103
104impl<'a> RemoveOrphanFiles<'a> {
105 pub fn new(metadata: &'a TableMetadata, storage: &'a Storage) -> Self {
106 Self { metadata, storage }
107 }
108
109 pub async fn execute(self) -> Result<Vec<String>> {
113 let mut referenced_files = HashSet::new();
115
116 for snapshot in &self.metadata.snapshots {
117 referenced_files.insert(snapshot.manifest_list.clone());
119
120 let manifest_list = ManifestList::load(&snapshot.manifest_list, self.storage).await?;
122 for entry in &manifest_list.entries {
123 referenced_files.insert(entry.manifest_path.clone());
124
125 let manifest = ManifestFile::load(&entry.manifest_path, self.storage).await?;
127 for m_entry in &manifest.entries {
128 if m_entry.status != ManifestEntryStatus::Deleted {
129 referenced_files.insert(m_entry.data_file.file_path.clone());
130 }
131 }
132 }
133 }
134
135 let all_files = self.storage.list_files(&self.metadata.location).await?;
137
138 let mut orphans = Vec::new();
140 for file in all_files {
141 if !referenced_files.contains(&file) {
142 orphans.push(file.clone());
143 self.storage.delete(&file).await?;
145 }
146 }
147
148 Ok(orphans)
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::Operation;
    use crate::schema::Schema;
    use object_store::memory::InMemory;
    use std::sync::Arc;

    /// Builds an empty table and an in-memory storage backend.
    async fn setup() -> (TableMetadata, Storage) {
        let schema = Schema::builder(1).build();
        let metadata = TableMetadata::builder("test", schema).build();
        let storage = Storage::new(Arc::new(InMemory::new()));
        (metadata, storage)
    }

    /// Adds append snapshots with ids `1..=count`, where snapshot `i` is `i` days old.
    fn add_daily_snapshots(metadata: &mut TableMetadata, count: i64) {
        const DAY_MS: i64 = 24 * 3600 * 1000;
        let now = Utc::now().timestamp_millis();
        for i in 1..=count {
            let mut snapshot = Snapshot::builder(i, format!("/m{}.json", i))
                .with_operation(Operation::Append)
                .build();
            snapshot.timestamp_ms = now - i * DAY_MS;
            metadata.add_snapshot(snapshot);
        }
    }

    #[tokio::test]
    async fn test_expire_snapshots() -> Result<()> {
        let (mut metadata, storage) = setup().await;
        add_daily_snapshots(&mut metadata, 5);
        assert_eq!(metadata.snapshots.len(), 5);

        let expired = ExpireSnapshots::new(&mut metadata, &storage)
            .with_min_snapshots_to_keep(2)
            .with_max_snapshot_age(Duration::days(1))
            .execute()
            .await?;

        assert_eq!(expired.len(), 3);
        assert_eq!(metadata.snapshots.len(), 2);
        // The newest snapshot is always protected by the retention minimum.
        assert!(metadata.snapshots.iter().any(|s| s.snapshot_id == 1));
        // Expired snapshots disappear from both the snapshot list and the snapshot log.
        assert!(metadata
            .snapshots
            .iter()
            .all(|s| !expired.contains(&s.snapshot_id)));
        assert!(metadata
            .snapshot_log
            .iter()
            .all(|e| !expired.contains(&e.snapshot_id)));

        Ok(())
    }

    #[tokio::test]
    async fn test_expire_snapshots_without_max_age_keeps_everything() -> Result<()> {
        let (mut metadata, storage) = setup().await;
        add_daily_snapshots(&mut metadata, 3);

        let expired = ExpireSnapshots::new(&mut metadata, &storage)
            .with_min_snapshots_to_keep(1)
            .execute()
            .await?;

        assert!(expired.is_empty());
        assert_eq!(metadata.snapshots.len(), 3);

        Ok(())
    }

    #[tokio::test]
    async fn test_expire_snapshots_on_empty_table() -> Result<()> {
        let (mut metadata, storage) = setup().await;

        let expired = ExpireSnapshots::new(&mut metadata, &storage)
            .with_max_snapshot_age(Duration::days(1))
            .execute()
            .await?;

        assert!(expired.is_empty());
        assert!(metadata.snapshots.is_empty());

        Ok(())
    }
}