Skip to main content

zenith_session/
gc.rs

1//! Object garbage collection.
2//!
3//! Deletes content-addressed objects that are no longer referenced by the
4//! Tier-1 session journal or the Tier-2 version history. An object is reachable
5//! iff its hash appears as the `snapshot` of some live record in either manifest.
6
7use std::collections::BTreeSet;
8
9use crate::adapter::Fs;
10use crate::error::SessionError;
11use crate::layout::StorePaths;
12use crate::manifest::read_records;
13use crate::session::journal_path;
14
15// ── Report ─────────────────────────────────────────────────────────────────────
16
/// Summary of a [`gc`] run.
///
/// Both counters are numbers of object files. Only entries the collector
/// actually examined are counted: a directory entry whose name is not valid
/// UTF-8 is skipped and appears in neither field.
///
/// `Default` yields the all-zero report, which is what a run over a document
/// without an object directory returns.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GcReport {
    /// Object files deleted (no longer referenced).
    pub deleted: usize,
    /// Object files kept (still referenced).
    pub kept: usize,
}
25
26// ── Public API ─────────────────────────────────────────────────────────────────
27
28/// Delete objects for `doc_id` that are not referenced by the Tier-1 journal or
29/// the Tier-2 versions. Safe to call any time; a no-op if there is no object dir.
30pub fn gc(fs: &impl Fs, paths: &StorePaths, doc_id: &str) -> Result<GcReport, SessionError> {
31    // Collect the reachable object hashes from BOTH manifests.
32    let mut referenced: BTreeSet<String> = BTreeSet::new();
33    for r in read_records(fs, &journal_path(paths, doc_id))? {
34        referenced.insert(r.snapshot);
35    }
36    for r in read_records(fs, &paths.versions_file(doc_id))? {
37        referenced.insert(r.snapshot);
38    }
39
40    let odir = paths.objects_dir(doc_id);
41    if !fs.exists(&odir) {
42        return Ok(GcReport {
43            deleted: 0,
44            kept: 0,
45        });
46    }
47
48    let mut deleted = 0usize;
49    let mut kept = 0usize;
50    // objects_dir/<shard>/<rest>
51    for shard in fs.read_dir(&odir)? {
52        // The shard's directory-name is the first 2 hex chars of the hash.
53        let shard_name = match shard.file_name().and_then(|n| n.to_str()) {
54            Some(s) => s.to_owned(),
55            None => continue, // non-utf8 dir name: not one of ours, skip
56        };
57        for obj in fs.read_dir(&shard)? {
58            let file_name = match obj.file_name().and_then(|n| n.to_str()) {
59                Some(s) => s.to_owned(),
60                None => continue,
61            };
62            let hash = format!("{shard_name}{file_name}");
63            if referenced.contains(&hash) {
64                kept += 1;
65            } else {
66                fs.remove(&obj)?;
67                deleted += 1;
68            }
69        }
70    }
71    Ok(GcReport { deleted, kept })
72}
73
74// ── Tests ──────────────────────────────────────────────────────────────────────
75
#[cfg(test)]
mod tests {
    use std::time::{Duration, UNIX_EPOCH};

    use super::*;
    use crate::adapter::{FakeClock, FakeRng, MemFs};
    use crate::tier2::VersionMeta;
    use crate::{session, store, tier2};

    /// Document id used by every test in this module.
    const DOC: &str = "doc1";

    /// Fresh fixture: empty in-memory filesystem, store rooted at `/data`,
    /// a clock fixed 1000 ms after the Unix epoch, and a zero-seeded RNG.
    fn setup() -> (MemFs, StorePaths, FakeClock, FakeRng) {
        (
            MemFs::new(),
            StorePaths::new("/data"),
            FakeClock(UNIX_EPOCH + Duration::from_millis(1000)),
            FakeRng(0),
        )
    }

    /// With nothing on disk, a run neither deletes nor keeps anything.
    #[test]
    fn gc_empty_is_noop() {
        let (fs, paths, ..) = setup();

        let expected = GcReport {
            deleted: 0,
            kept: 0,
        };
        assert_eq!(gc(&fs, &paths, DOC).unwrap(), expected);
    }

    /// An object referenced by a Tier-2 version survives collection.
    #[test]
    fn gc_keeps_version_referenced() {
        let (fs, paths, clock, _) = setup();
        tier2::record_version(&fs, &paths, &clock, DOC, b"V1", VersionMeta::default()).unwrap();

        let report = gc(&fs, &paths, DOC).unwrap();
        assert_eq!(report.deleted, 0);
        assert!(report.kept >= 1);

        // The version's content is still readable after the run.
        assert_eq!(
            tier2::version_content(&fs, &paths, DOC, "v0").unwrap(),
            b"V1"
        );
    }

    /// An object referenced by the Tier-1 session journal survives collection.
    #[test]
    fn gc_keeps_session_referenced() {
        let (fs, paths, clock, rng) = setup();
        session::record_state(&fs, &paths, &clock, &rng, DOC, b"S1", None).unwrap();

        let report = gc(&fs, &paths, DOC).unwrap();
        assert_eq!(report.deleted, 0);
        assert!(report.kept >= 1);

        // The session's current content is still b"S1" after the run.
        assert_eq!(
            session::current_content(&fs, &paths, DOC).unwrap(),
            Some(b"S1".to_vec())
        );
    }

    /// An object that no manifest mentions is deleted.
    #[test]
    fn gc_removes_unreferenced() {
        let (fs, paths, ..) = setup();
        let hash = store::put_object(&fs, &paths, DOC, b"orphan").unwrap();

        let expected = GcReport {
            deleted: 1,
            kept: 0,
        };
        assert_eq!(gc(&fs, &paths, DOC).unwrap(), expected);

        // Fetching the orphan must now fail.
        assert!(store::get_object(&fs, &paths, DOC, &hash).is_err());
    }

    /// One referenced object and one orphan: exactly the orphan is removed.
    #[test]
    fn gc_mixed() {
        let (fs, paths, clock, _) = setup();
        // Referenced: recorded as a version.
        tier2::record_version(&fs, &paths, &clock, DOC, b"kept", VersionMeta::default()).unwrap();
        // Unreferenced: written straight into the object store.
        store::put_object(&fs, &paths, DOC, b"orphan").unwrap();

        let expected = GcReport {
            deleted: 1,
            kept: 1,
        };
        assert_eq!(gc(&fs, &paths, DOC).unwrap(), expected);

        // The version is still readable...
        assert_eq!(
            tier2::version_content(&fs, &paths, DOC, "v0").unwrap(),
            b"kept"
        );
        // ...while the orphan is gone.
        let orphan_hash = store::object_hash(b"orphan");
        assert!(store::get_object(&fs, &paths, DOC, &orphan_hash).is_err());
    }

    /// Identical content recorded in both tiers is kept.
    #[test]
    fn gc_keeps_object_shared_by_both_tiers() {
        let (fs, paths, clock, rng) = setup();
        // Same content → same hash → one object file shared by both tiers.
        session::record_state(&fs, &paths, &clock, &rng, DOC, b"shared", None).unwrap();
        tier2::record_version(&fs, &paths, &clock, DOC, b"shared", VersionMeta::default())
            .unwrap();

        let report = gc(&fs, &paths, DOC).unwrap();
        assert_eq!(report.deleted, 0);
        assert!(report.kept >= 1);
    }
}
185}