Skip to main content

zenith_session/
global.rs

1//! Global cross-document LRU storage cap.
2//!
3//! Bounds the TOTAL bytes of all documents' object stores under `<data_dir>/docs`.
4//! When over the ceiling, evict the least-recently-used documents (by their
5//! `DocMeta.updated_ms`) one whole `docs/<id>/` subtree at a time until under the
6//! cap. The most-recently-used document is never evicted. A document's `.zen`
7//! source lives outside the store and is untouched — eviction only discards local
8//! history, which the next edit re-creates.
9
10use crate::adapter::Fs;
11use crate::error::SessionError;
12use crate::identity::read_meta;
13use crate::layout::StorePaths;
14
15// ── Public types ───────────────────────────────────────────────────────────────
16
/// Report returned by [`enforce_global_cap`].
///
/// Byte totals count stored object bytes only, summed across every document.
/// The [`Default`] value is the "nothing stored, nothing evicted" report: an
/// empty eviction list with both totals at 0.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GlobalCapReport {
    /// Doc ids evicted, in eviction order (oldest first).
    pub evicted: Vec<String>,
    /// Total stored object bytes across all docs before eviction.
    pub bytes_before: u64,
    /// Total stored object bytes across all docs after eviction.
    pub bytes_after: u64,
}
27
28// ── Private helpers ────────────────────────────────────────────────────────────
29
30/// Sum the compressed sizes of all objects stored for `doc_id`.
31///
32/// Walks `<objects_dir>/<shard>/<file>` two levels deep, summing each file's
33/// byte length. Returns 0 if the objects directory does not exist.
34fn doc_object_bytes(fs: &impl Fs, paths: &StorePaths, doc_id: &str) -> Result<u64, SessionError> {
35    let odir = paths.objects_dir(doc_id);
36    if !fs.exists(&odir) {
37        return Ok(0);
38    }
39    let mut total: u64 = 0;
40    for shard in fs.read_dir(&odir)? {
41        for obj in fs.read_dir(&shard)? {
42            let bytes = fs.read(&obj)?;
43            total = total.saturating_add(u64::try_from(bytes.len()).unwrap_or(u64::MAX));
44        }
45    }
46    Ok(total)
47}
48
49// ── Public API ─────────────────────────────────────────────────────────────────
50
51/// Evict least-recently-used documents until total stored bytes <= `max_total_bytes`.
52///
53/// Recency is `DocMeta.updated_ms` (missing meta → treated as oldest, ms 0). The
54/// single most-recently-used document is never evicted (so an active doc can never
55/// be wiped even if it alone exceeds the cap). Returns what was evicted.
56pub fn enforce_global_cap(
57    fs: &impl Fs,
58    paths: &StorePaths,
59    max_total_bytes: u64,
60) -> Result<GlobalCapReport, SessionError> {
61    let droot = paths.docs_root();
62    if !fs.exists(&droot) {
63        return Ok(GlobalCapReport {
64            evicted: Vec::new(),
65            bytes_before: 0,
66            bytes_after: 0,
67        });
68    }
69
70    // Gather (doc_id, bytes, updated_ms) for every doc dir.
71    struct DocEntry {
72        id: String,
73        bytes: u64,
74        updated_ms: u128,
75    }
76    let mut docs: Vec<DocEntry> = Vec::new();
77    for dir in fs.read_dir(&droot)? {
78        let id = match dir.file_name().and_then(|n| n.to_str()) {
79            Some(s) => s.to_owned(),
80            None => continue,
81        };
82        let bytes = doc_object_bytes(fs, paths, &id)?;
83        let updated_ms = read_meta(fs, paths, &id)?
84            .map(|m| m.updated_ms)
85            .unwrap_or(0);
86        docs.push(DocEntry {
87            id,
88            bytes,
89            updated_ms,
90        });
91    }
92
93    let bytes_before: u64 = docs.iter().fold(0u64, |a, d| a.saturating_add(d.bytes));
94    let mut total = bytes_before;
95    let mut evicted: Vec<String> = Vec::new();
96
97    if total <= max_total_bytes {
98        return Ok(GlobalCapReport {
99            evicted,
100            bytes_before,
101            bytes_after: total,
102        });
103    }
104
105    // Evict LRU first: sort ascending by updated_ms, tie-break by id for determinism.
106    docs.sort_by(|a, b| {
107        a.updated_ms
108            .cmp(&b.updated_ms)
109            .then_with(|| a.id.cmp(&b.id))
110    });
111    // The most-recently-used doc is the LAST after this sort; never evict it.
112    let protected_index = docs.len().saturating_sub(1);
113    for (i, d) in docs.iter().enumerate() {
114        if total <= max_total_bytes {
115            break;
116        }
117        if i == protected_index {
118            continue; // never evict the most-recent doc
119        }
120        // Evict the whole docs/<id> subtree.
121        let dir = paths.doc_dir(&d.id);
122        if fs.exists(&dir) {
123            fs.remove(&dir)?;
124        }
125        total = total.saturating_sub(d.bytes);
126        evicted.push(d.id.clone());
127    }
128
129    Ok(GlobalCapReport {
130        evicted,
131        bytes_before,
132        bytes_after: total,
133    })
134}
135
136// ── Tests ──────────────────────────────────────────────────────────────────────
137
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::{FakeClock, FakeRng, MemFs};
    use crate::{identity, store};
    use std::path::Path;
    use std::time::{Duration, UNIX_EPOCH};

    /// Store layout rooted at the fixed data directory used by every test.
    fn make_paths() -> StorePaths {
        StorePaths::new("/data")
    }

    /// A fresh in-memory filesystem paired with the test store layout.
    fn fresh_store() -> (MemFs, StorePaths) {
        (MemFs::new(), make_paths())
    }

    /// Stored object bytes currently measured for `doc_id`.
    fn object_bytes(fs: &MemFs, paths: &StorePaths, doc_id: &str) -> u64 {
        doc_object_bytes(fs, paths, doc_id).unwrap()
    }

    /// Create a doc: reconcile it under the given `doc_id` with the clock set to
    /// `updated_ms` milliseconds after the epoch, then store `content` as one
    /// object via `put_object`. Returns the doc id.
    fn seed_doc(
        fs: &MemFs,
        paths: &StorePaths,
        doc_id: &str,
        content: &[u8],
        updated_ms: u64,
    ) -> String {
        let stamp = UNIX_EPOCH + Duration::from_millis(updated_ms);
        let clock = FakeClock(stamp);
        let rng = FakeRng(0x01);
        // Pass the id explicitly so the test controls it precisely.
        identity::reconcile(
            fs,
            paths,
            &clock,
            &rng,
            Some(doc_id),
            Path::new("/fake/doc.zen"),
        )
        .unwrap();
        store::put_object(fs, paths, doc_id, content).unwrap();
        doc_id.to_owned()
    }

    #[test]
    fn under_cap_evicts_nothing() {
        let (fs, paths) = fresh_store();
        seed_doc(&fs, &paths, "doc-a", &[42u8; 200], 1000);
        seed_doc(&fs, &paths, "doc-b", &[99u8; 200], 2000);

        // A cap far above anything seeded.
        let report = enforce_global_cap(&fs, &paths, 1_000_000).unwrap();

        assert!(report.evicted.is_empty(), "nothing should be evicted");
        assert_eq!(
            report.bytes_after, report.bytes_before,
            "bytes unchanged when under cap"
        );
    }

    #[test]
    fn evicts_lru_first() {
        let (fs, paths) = fresh_store();

        // "old": reconciled at ms 100 and holding a large object.
        let old_payload = vec![0xAAu8; 2000];
        seed_doc(&fs, &paths, "old", &old_payload, 100);

        // "new": a small object first, then meta written via reconcile at ms 5000.
        // Seeded by hand because the object's hash is needed further down.
        let new_hash = store::put_object(&fs, &paths, "new", &[0xBBu8; 50]).unwrap();
        let new_clock = FakeClock(UNIX_EPOCH + Duration::from_millis(5000));
        let new_rng = FakeRng(0x02);
        identity::reconcile(
            &fs,
            &paths,
            &new_clock,
            &new_rng,
            Some("new"),
            Path::new("/n.zen"),
        )
        .unwrap();

        // Choose a cap that "new" fits under on its own but both docs together
        // do not.
        let new_bytes = object_bytes(&fs, &paths, "new");
        let combined = object_bytes(&fs, &paths, "old").saturating_add(new_bytes);
        let cap = new_bytes.saturating_add(1);
        assert!(
            combined > cap,
            "test requires combined > cap (combined={combined}, cap={cap})"
        );

        let report = enforce_global_cap(&fs, &paths, cap).unwrap();

        assert_eq!(report.evicted, vec!["old"], "older doc should be evicted");
        assert!(
            report.bytes_after <= cap,
            "bytes_after ({}) should be <= cap ({})",
            report.bytes_after,
            cap
        );

        // The evicted doc's object can no longer be fetched...
        let old_hash = store::object_hash(&old_payload);
        assert!(
            store::get_object(&fs, &paths, "old", &old_hash).is_err(),
            "old doc's object must be gone"
        );
        // ...while the surviving doc's object is intact.
        let survivor = store::get_object(&fs, &paths, "new", &new_hash).unwrap();
        assert_eq!(survivor, vec![0xBBu8; 50], "new doc's object must survive");
    }

    #[test]
    fn never_evicts_most_recent() {
        let (fs, paths) = fresh_store();

        // The only doc in the store, and it alone is bigger than the cap.
        let payload = vec![0xCCu8; 2000];
        seed_doc(&fs, &paths, "solo", &payload, 9999);
        let solo_bytes = object_bytes(&fs, &paths, "solo");
        assert!(solo_bytes > 0, "solo doc must have some bytes");

        // One byte short of what the doc occupies.
        let cap = solo_bytes.saturating_sub(1);
        let report = enforce_global_cap(&fs, &paths, cap).unwrap();

        assert!(
            report.evicted.is_empty(),
            "most-recent (and only) doc must never be evicted"
        );
        assert_eq!(
            report.bytes_after, report.bytes_before,
            "bytes unchanged when protected"
        );
    }

    #[test]
    fn empty_store_noop() {
        let (fs, paths) = fresh_store();

        // Nothing was ever written, so the docs root itself does not exist.
        let report = enforce_global_cap(&fs, &paths, 0).unwrap();

        assert!(report.evicted.is_empty());
        assert_eq!(report.bytes_before, 0);
        assert_eq!(report.bytes_after, 0);
    }

    #[test]
    fn missing_meta_treated_as_oldest() {
        let (fs, paths) = fresh_store();

        // "no-meta": objects only; reconcile is never called for it, so the cap
        // logic has no meta to read and falls back to updated_ms 0.
        let orphan_payload = vec![0xDDu8; 2000];
        store::put_object(&fs, &paths, "no-meta", &orphan_payload).unwrap();

        // "recent": reconciled at ms 9000 with a small object.
        seed_doc(&fs, &paths, "recent", &[0xEEu8; 50], 9000);

        // Cap admits "recent" alone but not both docs together.
        let recent_bytes = object_bytes(&fs, &paths, "recent");
        let combined = object_bytes(&fs, &paths, "no-meta").saturating_add(recent_bytes);
        let cap = recent_bytes.saturating_add(1);
        assert!(
            combined > cap,
            "test requires combined > cap (combined={combined}, cap={cap})"
        );

        let report = enforce_global_cap(&fs, &paths, cap).unwrap();

        assert_eq!(
            report.evicted,
            vec!["no-meta"],
            "meta-less doc (updated_ms=0) must be evicted first"
        );
        assert!(
            report.bytes_after <= cap,
            "bytes_after ({}) should be <= cap ({})",
            report.bytes_after,
            cap
        );
    }
}