Skip to main content

supertable_core/
action.rs

1//! # SuperTable Actions
2//!
3//! This module provides high-level table maintenance actions such as
4//! snapshot expiration and orphan file removal.
5
6use crate::manifest::{ManifestEntryStatus, ManifestFile, ManifestList, Snapshot};
7use crate::metadata::TableMetadata;
8use crate::storage::Storage;
9use anyhow::Result;
10use chrono::{Duration, Utc};
11use std::collections::HashSet;
12
13/// Action to expire old snapshots based on retention policies.
14pub struct ExpireSnapshots<'a> {
15    metadata: &'a mut TableMetadata,
16    min_snapshots_to_keep: usize,
17    max_snapshot_age_ms: Option<i64>,
18}
19
20impl<'a> ExpireSnapshots<'a> {
21    pub fn new(metadata: &'a mut TableMetadata, _storage: &'a Storage) -> Self {
22        Self {
23            metadata,
24            min_snapshots_to_keep: 1,
25            max_snapshot_age_ms: None,
26        }
27    }
28
29    pub fn with_min_snapshots_to_keep(mut self, min: usize) -> Self {
30        self.min_snapshots_to_keep = min;
31        self
32    }
33
34    pub fn with_max_snapshot_age(mut self, duration: Duration) -> Self {
35        self.max_snapshot_age_ms = Some(duration.num_milliseconds());
36        self
37    }
38
39    /// Executes the expiration action.
40    ///
41    /// Returns the IDs of the expired snapshots.
42    pub async fn execute(self) -> Result<Vec<i64>> {
43        if self.metadata.snapshots.is_empty() {
44            return Ok(Vec::new());
45        }
46
47        let now = Utc::now().timestamp_millis();
48        let mut expired_ids = Vec::new();
49
50        // Snapshots are usually sorted by ID/Timestamp, but let's be sure
51        let mut snapshots = self.metadata.snapshots.clone();
52        snapshots.sort_by_key(|s| s.timestamp_ms);
53
54        let current_snapshot_id = self.metadata.current_snapshot_id;
55
56        // We never expire the current snapshot
57        let to_expire_candidates: Vec<Snapshot> = snapshots
58            .into_iter()
59            .filter(|s| Some(s.snapshot_id) != current_snapshot_id)
60            .collect();
61
62        let num_to_keep = self
63            .min_snapshots_to_keep
64            .saturating_sub(if current_snapshot_id.is_some() { 1 } else { 0 });
65        let mut kept_count = 0;
66
67        for snapshot in to_expire_candidates.into_iter().rev() {
68            let is_too_old = if let Some(max_age) = self.max_snapshot_age_ms {
69                (now - snapshot.timestamp_ms) > max_age
70            } else {
71                false
72            };
73
74            if kept_count < num_to_keep || !is_too_old {
75                kept_count += 1;
76            } else {
77                expired_ids.push(snapshot.snapshot_id);
78            }
79        }
80
81        if expired_ids.is_empty() {
82            return Ok(Vec::new());
83        }
84
85        // Update metadata
86        self.metadata
87            .snapshots
88            .retain(|s| !expired_ids.contains(&s.snapshot_id));
89        self.metadata
90            .snapshot_log
91            .retain(|e| !expired_ids.contains(&e.snapshot_id));
92        self.metadata.increment_sequence();
93
94        Ok(expired_ids)
95    }
96}
97
98/// Action to delete files that are no longer referenced by any snapshot.
99pub struct RemoveOrphanFiles<'a> {
100    metadata: &'a TableMetadata,
101    storage: &'a Storage,
102}
103
104impl<'a> RemoveOrphanFiles<'a> {
105    pub fn new(metadata: &'a TableMetadata, storage: &'a Storage) -> Self {
106        Self { metadata, storage }
107    }
108
109    /// Executes the removal action.
110    ///
111    /// WARNING: This scans all files in the table location.
112    pub async fn execute(self) -> Result<Vec<String>> {
113        // 1. Collect all referenced files
114        let mut referenced_files = HashSet::new();
115
116        for snapshot in &self.metadata.snapshots {
117            // Add manifest list
118            referenced_files.insert(snapshot.manifest_list.clone());
119
120            // Load manifest list
121            let manifest_list = ManifestList::load(&snapshot.manifest_list, self.storage).await?;
122            for entry in &manifest_list.entries {
123                referenced_files.insert(entry.manifest_path.clone());
124
125                // Load manifest
126                let manifest = ManifestFile::load(&entry.manifest_path, self.storage).await?;
127                for m_entry in &manifest.entries {
128                    if m_entry.status != ManifestEntryStatus::Deleted {
129                        referenced_files.insert(m_entry.data_file.file_path.clone());
130                    }
131                }
132            }
133        }
134
135        // 2. List all files in storage
136        let all_files = self.storage.list_files(&self.metadata.location).await?;
137
138        // 3. Find orphans
139        let mut orphans = Vec::new();
140        for file in all_files {
141            if !referenced_files.contains(&file) {
142                orphans.push(file.clone());
143                // 4. Delete orphan (active)
144                self.storage.delete(&file).await?;
145            }
146        }
147
148        Ok(orphans)
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::Operation;
    use crate::schema::Schema;
    use object_store::memory::InMemory;
    use std::sync::Arc;

    /// Builds empty table metadata backed by in-memory storage.
    async fn setup() -> (TableMetadata, Storage) {
        let schema = Schema::builder(1).build();
        let metadata = TableMetadata::builder("test", schema).build();
        let storage = Storage::new(Arc::new(InMemory::new()));
        (metadata, storage)
    }

    #[tokio::test]
    async fn test_expire_snapshots() -> Result<()> {
        const DAY_MS: i64 = 1000 * 3600 * 24;

        let (mut metadata, storage) = setup().await;

        // Add 5 snapshots whose age decreases as the ID grows:
        // #1 is 5 days old, ..., #5 is 1 day old.
        let now = Utc::now().timestamp_millis();
        for i in 1..=5_i64 {
            let mut snapshot = Snapshot::builder(i, format!("/m{}.json", i))
                .with_operation(Operation::Append)
                .build();
            // Manually set timestamp to simulate age
            snapshot.timestamp_ms = now - (6 - i) * DAY_MS;
            metadata.add_snapshot(snapshot);
        }

        assert_eq!(metadata.snapshots.len(), 5);

        // Expire all but 2
        let mut expired = ExpireSnapshots::new(&mut metadata, &storage)
            .with_min_snapshots_to_keep(2)
            .with_max_snapshot_age(Duration::days(1))
            .execute()
            .await?;

        // #5 is the most recently added snapshot and #4 is the next newest.
        // #1 through #4 are all older than 1 day, but the minimum of 2 keeps
        // #4 alongside #5.
        // Expired: #1, #2, #3.
        expired.sort_unstable();
        assert_eq!(expired, vec![1, 2, 3]);

        let mut remaining: Vec<i64> = metadata.snapshots.iter().map(|s| s.snapshot_id).collect();
        remaining.sort_unstable();
        assert_eq!(remaining, vec![4, 5]);

        Ok(())
    }
}