Skip to main content

zenith_session/
retention.rs

1//! Time-Machine-style retention thinning for Tier-2 version history.
2//!
3//! Named versions (those with a label) and the latest version are kept forever;
4//! all other versions are thinned by age into coarser buckets the older they get
5//! (all recent, then hourly, daily, weekly). Thinning rewrites `versions.jsonl`
6//! and re-links each kept version's parent to the previous kept version so that
7//! `@head~N` walks remain valid. Object GC and storage caps are separate passes.
8
9use std::collections::{BTreeMap, BTreeSet};
10use std::time::UNIX_EPOCH;
11
12use crate::adapter::{Clock, Fs};
13use crate::error::SessionError;
14use crate::gc::{GcReport, gc};
15use crate::layout::StorePaths;
16use crate::manifest::{HistoryRecord, read_records};
17use crate::store::object_size;
18
19// ── Time constants (ms) ───────────────────────────────────────────────────────
20
/// Milliseconds in one hour; the width of an hourly age bucket.
const MS_PER_HOUR: u128 = 60 * 60 * 1_000;
/// Milliseconds in one day; the width of a daily age bucket.
const MS_PER_DAY: u128 = 24 * MS_PER_HOUR;
/// Milliseconds in one week; the width of a weekly age bucket.
const MS_PER_WEEK: u128 = 7 * MS_PER_DAY;
24
25// ── RetentionPolicy ───────────────────────────────────────────────────────────
26
/// Controls the age windows used by [`thin_versions`] and the count/size caps
/// used by [`apply_caps`].
///
/// The three `*_below` thresholds are ages in milliseconds and are checked in
/// the order `keep_all_below`, `hourly_below`, `daily_below`; the first one a
/// version's age falls under decides its tier. `max_versions` is a count and
/// `max_object_bytes` is a byte total. The default matches the Time-Machine
/// windows documented in this module's top-level description.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetentionPolicy {
    /// Below this age, every version is kept (ms).
    pub keep_all_below: u128,
    /// Below this age, keep one per hour (ms).
    pub hourly_below: u128,
    /// Below this age, keep one per day (ms). Versions at or above this age
    /// fall into weekly buckets.
    pub daily_below: u128,
    /// Maximum number of versions to retain (named versions and the latest are
    /// exempt from being dropped, but DO count toward the total).
    pub max_versions: usize,
    /// Maximum total compressed bytes of objects referenced by retained versions.
    pub max_object_bytes: u64,
}
47
48impl Default for RetentionPolicy {
49    fn default() -> Self {
50        Self {
51            keep_all_below: MS_PER_HOUR,
52            hourly_below: MS_PER_DAY,
53            daily_below: 30 * MS_PER_DAY,
54            max_versions: 500,
55            max_object_bytes: 10 * 1024 * 1024,
56        }
57    }
58}
59
60// ── Pure selector ─────────────────────────────────────────────────────────────
61
62/// Return the set of version ids to KEEP under `policy` given the current time.
63///
64/// Named versions and the latest version are always kept; everything else is
65/// thinned to the newest version per age bucket. Versions are assumed to be in
66/// append order (oldest first); "newest in bucket" = highest seq.
67pub fn thin_versions(
68    versions: &[HistoryRecord],
69    now_ms: u128,
70    policy: &RetentionPolicy,
71) -> BTreeSet<String> {
72    let mut keep: BTreeSet<String> = BTreeSet::new();
73    if versions.is_empty() {
74        return keep;
75    }
76
77    // Always keep the latest (last in append order).
78    if let Some(last) = versions.last() {
79        keep.insert(last.id.clone());
80    }
81
82    // bucket key -> (highest seq seen, id of that version)
83    // Tier tag in the key keeps hour/day/week buckets from ever colliding.
84    let mut buckets: BTreeMap<(u8, u128), (u64, String)> = BTreeMap::new();
85
86    for v in versions {
87        // Named versions are always kept.
88        if v.label.is_some() {
89            keep.insert(v.id.clone());
90            continue;
91        }
92
93        let ts = match v.timestamp_ms {
94            Some(t) => t,
95            // No timestamp → cannot bucket; always keep.
96            None => {
97                keep.insert(v.id.clone());
98                continue;
99            }
100        };
101
102        let age = now_ms.saturating_sub(ts);
103        let key = if age < policy.keep_all_below {
104            // keep-all tier: unique bucket per version (seq used as discriminant)
105            (0u8, u128::from(v.seq))
106        } else if age < policy.hourly_below {
107            (1u8, age / MS_PER_HOUR)
108        } else if age < policy.daily_below {
109            (2u8, age / MS_PER_DAY)
110        } else {
111            (3u8, age / MS_PER_WEEK)
112        };
113
114        // Keep the highest-seq version in each bucket.
115        buckets
116            .entry(key)
117            .and_modify(|e| {
118                if v.seq > e.0 {
119                    *e = (v.seq, v.id.clone());
120                }
121            })
122            .or_insert((v.seq, v.id.clone()));
123    }
124
125    for (_, (_, id)) in buckets {
126        keep.insert(id);
127    }
128    keep
129}
130
131// ── Apply (rewrite manifest, re-link parents) ─────────────────────────────────
132
/// Outcome of an [`apply_thinning`] pass.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThinReport {
    /// Number of version records retained.
    pub kept: usize,
    /// Number of version records removed from the manifest.
    pub dropped: usize,
}
139
140/// Rewrite `versions.jsonl` for `doc_id` keeping only the versions whose id is
141/// in `keep` (preserving original order, id and seq), re-linking each kept
142/// record's `parent` to the previous kept record (first kept → None).
143/// Returns `(kept, dropped)`.
144pub(crate) fn rewrite_versions_keeping(
145    fs: &impl Fs,
146    paths: &StorePaths,
147    doc_id: &str,
148    versions: &[HistoryRecord],
149    keep: &BTreeSet<String>,
150) -> Result<(usize, usize), SessionError> {
151    let mut kept_records: Vec<HistoryRecord> = Vec::new();
152    let mut prev_kept_id: Option<String> = None;
153    let mut dropped = 0usize;
154    for v in versions {
155        if keep.contains(&v.id) {
156            let mut r = v.clone();
157            r.parent = prev_kept_id.clone();
158            prev_kept_id = Some(v.id.clone());
159            kept_records.push(r);
160        } else {
161            dropped += 1;
162        }
163    }
164    let mut bytes: Vec<u8> = Vec::new();
165    for r in &kept_records {
166        let mut line = serde_json::to_vec(r)
167            .map_err(|e| SessionError::new(format!("serialize version: {e}")))?;
168        line.push(b'\n');
169        bytes.extend_from_slice(&line);
170    }
171    let vpath = paths.versions_file(doc_id);
172    if let Some(parent) = vpath.parent() {
173        fs.create_dir_all(parent)?;
174    }
175    fs.write(&vpath, &bytes)?;
176    Ok((kept_records.len(), dropped))
177}
178
179/// Apply `policy` to `doc_id`'s versions: drop thinned-out versions and rewrite
180/// `versions.jsonl`, re-linking each kept version's parent to the previous kept
181/// version.
182///
183/// Returns how many were kept/dropped. Orphaned objects are reclaimed by a later
184/// GC pass — not here.
185pub fn apply_thinning(
186    fs: &impl Fs,
187    paths: &StorePaths,
188    clock: &impl Clock,
189    doc_id: &str,
190    policy: &RetentionPolicy,
191) -> Result<ThinReport, SessionError> {
192    let versions = read_records(fs, &paths.versions_file(doc_id))?;
193    if versions.is_empty() {
194        return Ok(ThinReport {
195            kept: 0,
196            dropped: 0,
197        });
198    }
199
200    let now_ms = clock
201        .now()
202        .duration_since(UNIX_EPOCH)
203        .ok()
204        .map(|d| d.as_millis())
205        .unwrap_or(0);
206
207    let keep = thin_versions(&versions, now_ms, policy);
208    let (kept, dropped) = rewrite_versions_keeping(fs, paths, doc_id, &versions, &keep)?;
209    Ok(ThinReport { kept, dropped })
210}
211
212// ── Caps ──────────────────────────────────────────────────────────────────────
213
/// Outcome of an [`apply_caps`] pass.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CapReport {
    /// Number of version records retained.
    pub kept: usize,
    /// Number of version records removed from the manifest.
    pub dropped: usize,
}
220
221/// Enforce the per-doc count and size caps: drop the OLDEST unnamed, non-latest
222/// versions until at most `max_versions` remain AND the total compressed size of
223/// objects referenced by retained versions is at most `max_object_bytes`. Named
224/// versions and the latest version are never dropped. Rewrites `versions.jsonl`
225/// (re-linking parents). Orphaned objects are reclaimed by a later gc pass.
226pub fn apply_caps(
227    fs: &impl Fs,
228    paths: &StorePaths,
229    doc_id: &str,
230    policy: &RetentionPolicy,
231) -> Result<CapReport, SessionError> {
232    let versions = read_records(fs, &paths.versions_file(doc_id))?;
233    if versions.is_empty() {
234        return Ok(CapReport {
235            kept: 0,
236            dropped: 0,
237        });
238    }
239
240    // Precompute each distinct snapshot hash's stored size ONCE (avoid O(n²) IO).
241    let mut sizes: BTreeMap<String, u64> = BTreeMap::new();
242    for v in &versions {
243        if !sizes.contains_key(&v.snapshot) {
244            let sz = object_size(fs, paths, doc_id, &v.snapshot).unwrap_or(0);
245            sizes.insert(v.snapshot.clone(), sz);
246        }
247    }
248
249    let latest_id = versions.last().map(|v| v.id.clone());
250    let mut keep: BTreeSet<String> = versions.iter().map(|v| v.id.clone()).collect();
251
252    // Helper: total bytes of DISTINCT object hashes referenced by the kept set.
253    let referenced_bytes = |keep: &BTreeSet<String>| -> u64 {
254        let mut seen: BTreeSet<&str> = BTreeSet::new();
255        let mut total: u64 = 0;
256        for v in &versions {
257            if keep.contains(&v.id) && seen.insert(v.snapshot.as_str()) {
258                total = total.saturating_add(*sizes.get(&v.snapshot).unwrap_or(&0));
259            }
260        }
261        total
262    };
263
264    // Drop oldest unnamed, non-latest versions until under both caps.
265    for v in &versions {
266        let over =
267            keep.len() > policy.max_versions || referenced_bytes(&keep) > policy.max_object_bytes;
268        if !over {
269            break;
270        }
271        let is_latest = latest_id.as_deref() == Some(v.id.as_str());
272        if v.label.is_none() && !is_latest && keep.contains(&v.id) {
273            keep.remove(&v.id);
274        }
275    }
276
277    let (kept, dropped) = rewrite_versions_keeping(fs, paths, doc_id, &versions, &keep)?;
278    Ok(CapReport { kept, dropped })
279}
280
281// ── Maintain ──────────────────────────────────────────────────────────────────
282
283/// Combined outcome of a [`maintain`] pass.
284#[derive(Debug, Clone, PartialEq)]
285pub struct MaintainReport {
286    pub thinned: ThinReport,
287    pub capped: CapReport,
288    pub collected: GcReport,
289}
290
291/// Full durable-retention pass for `doc_id`: thin by age, enforce caps, then
292/// garbage-collect now-unreferenced objects. The single entry point a host calls
293/// periodically (e.g. on close or on a timer).
294pub fn maintain(
295    fs: &impl Fs,
296    paths: &StorePaths,
297    clock: &impl Clock,
298    doc_id: &str,
299    policy: &RetentionPolicy,
300) -> Result<MaintainReport, SessionError> {
301    let thinned = apply_thinning(fs, paths, clock, doc_id, policy)?;
302    let capped = apply_caps(fs, paths, doc_id, policy)?;
303    let collected = gc(fs, paths, doc_id)?;
304    Ok(MaintainReport {
305        thinned,
306        capped,
307        collected,
308    })
309}
310
311// ── Tests ─────────────────────────────────────────────────────────────────────
312
313#[cfg(test)]
314mod tests {
315    use std::time::{Duration, UNIX_EPOCH};
316
317    use super::*;
318    use crate::adapter::{FakeClock, MemFs};
319    use crate::layout::StorePaths;
320    use crate::manifest::append_record;
321    use crate::tier2::VersionMeta;
322
323    // ── Helper ────────────────────────────────────────────────────────────────
324
325    fn rec(seq: u64, parent: Option<&str>, ts_ms: u128, label: Option<&str>) -> HistoryRecord {
326        let mut r = HistoryRecord::new(
327            format!("v{seq}"),
328            seq,
329            parent.map(str::to_owned),
330            format!("hash{seq}"),
331        );
332        r.timestamp_ms = Some(ts_ms);
333        r.label = label.map(str::to_owned);
334        r
335    }
336
337    fn policy() -> RetentionPolicy {
338        RetentionPolicy::default()
339    }
340
341    /// Seed via tier2::record_version with a fixed-increment FakeClock.
342    fn seed_via_tier2(
343        fs: &MemFs,
344        paths: &StorePaths,
345        doc_id: &str,
346        contents: &[&[u8]],
347        labels: &[Option<&str>],
348        base_ms: u64,
349    ) {
350        for (i, content) in contents.iter().enumerate() {
351            let clock = FakeClock(UNIX_EPOCH + Duration::from_millis(base_ms + i as u64 * 100));
352            let label = labels.get(i).copied().flatten();
353            crate::tier2::record_version(
354                fs,
355                paths,
356                &clock,
357                doc_id,
358                content,
359                VersionMeta {
360                    label,
361                    ..Default::default()
362                },
363            )
364            .unwrap();
365        }
366    }
367
368    // ── Pure thin_versions tests ──────────────────────────────────────────────
369
370    #[test]
371    fn empty_keeps_nothing() {
372        let kept = thin_versions(&[], 1_000_000, &policy());
373        assert!(kept.is_empty());
374    }
375
376    #[test]
377    fn all_recent_kept() {
378        // now = 10 hours in ms; all three versions within 1h of now → all kept
379        let now_ms: u128 = 10 * MS_PER_HOUR;
380        let versions = vec![
381            rec(0, None, now_ms - 10_000, None),            // 10s ago
382            rec(1, Some("v0"), now_ms - 30_000, None),      // 30s ago
383            rec(2, Some("v1"), now_ms - 59 * 60_000, None), // 59min ago
384        ];
385        let kept = thin_versions(&versions, now_ms, &policy());
386        assert!(kept.contains("v0"), "v0 should be kept");
387        assert!(kept.contains("v1"), "v1 should be kept");
388        assert!(kept.contains("v2"), "v2 should be kept (latest + recent)");
389    }
390
391    #[test]
392    fn latest_always_kept() {
393        // A single very old version must be kept because it is the latest.
394        let now_ms: u128 = 100 * MS_PER_DAY;
395        let versions = vec![rec(0, None, 0, None)];
396        let kept = thin_versions(&versions, now_ms, &policy());
397        assert!(kept.contains("v0"), "latest must always be kept");
398    }
399
400    #[test]
401    fn named_always_kept() {
402        // Two versions in the same weekly bucket: one named, one not.
403        // The named one must be kept; the unnamed one may be thinned.
404        let now_ms: u128 = 100 * MS_PER_DAY;
405        // Both in week 0 relative to our weekly bucket computation.
406        let base_ts = now_ms - 60 * MS_PER_DAY; // 60 days ago → weekly bucket
407        let versions = vec![
408            rec(0, None, base_ts, Some("release-1.0")), // named
409            rec(1, Some("v0"), base_ts + 1_000, None),  // unnamed, same bucket
410            rec(2, Some("v1"), now_ms - 1_000, None),   // latest
411        ];
412        let kept = thin_versions(&versions, now_ms, &policy());
413        assert!(kept.contains("v0"), "named version must survive thinning");
414        // v2 is the latest and must be kept
415        assert!(kept.contains("v2"), "latest must be kept");
416        // v1 is an old unnamed in the same weekly bucket as v0; it may be dropped
417        // (bucket keeps highest seq, which could be v1, but v0 is named so it's
418        // kept independently — v1 may or may not survive depending on buckets)
419        // The important invariant is: named v0 is kept regardless.
420    }
421
422    #[test]
423    fn hourly_bucket_keeps_newest() {
424        // Two versions in the same hourly bucket; only the higher-seq one is kept
425        // (plus the latest).
426        let now_ms: u128 = 10 * MS_PER_HOUR;
427        // Age ~2h and ~2h+5min → both fall in hour-bucket 2 (age/MS_PER_HOUR == 2)
428        let v0_ts = now_ms - 2 * MS_PER_HOUR - 5 * 60_000; // older (lower seq)
429        let v1_ts = now_ms - 2 * MS_PER_HOUR; // newer (higher seq)
430        let v2_ts = now_ms - 60_000; // latest (recent, kept)
431        let versions = vec![
432            rec(0, None, v0_ts, None),
433            rec(1, Some("v0"), v1_ts, None),
434            rec(2, Some("v1"), v2_ts, None),
435        ];
436        let kept = thin_versions(&versions, now_ms, &policy());
437        // v1 has higher seq in bucket hour-2, so it wins; v0 should be dropped
438        assert!(
439            !kept.contains("v0"),
440            "lower-seq in same hourly bucket should be dropped"
441        );
442        assert!(
443            kept.contains("v1"),
444            "higher-seq in hourly bucket must be kept"
445        );
446        assert!(kept.contains("v2"), "latest must be kept");
447    }
448
449    #[test]
450    fn daily_and_weekly_buckets() {
451        // Construct versions spanning multiple days and weeks; assert one per bucket.
452        let now_ms: u128 = 60 * MS_PER_DAY; // "now" = 60 days since epoch
453
454        // Day-1 ago: two versions in the same daily bucket → only higher seq kept.
455        let day1_early = now_ms - MS_PER_DAY - 3 * MS_PER_HOUR;
456        let day1_late = now_ms - MS_PER_DAY - MS_PER_HOUR;
457
458        // Day-2 ago: one version.
459        let day2 = now_ms - 2 * MS_PER_DAY - MS_PER_HOUR;
460
461        // 35 days ago: two versions in the same weekly bucket → only higher seq kept.
462        let week5_early = now_ms - 35 * MS_PER_DAY - 2 * MS_PER_HOUR;
463        let week5_late = now_ms - 35 * MS_PER_DAY - MS_PER_HOUR;
464
465        let versions = vec![
466            rec(0, None, week5_early, None),
467            rec(1, Some("v0"), week5_late, None),
468            rec(2, Some("v1"), day2, None),
469            rec(3, Some("v2"), day1_early, None),
470            rec(4, Some("v3"), day1_late, None),
471            rec(5, Some("v4"), now_ms - 30_000, None), // latest (recent)
472        ];
473        let kept = thin_versions(&versions, now_ms, &policy());
474
475        // Latest always kept.
476        assert!(kept.contains("v5"));
477
478        // Daily bucket for day-1: v4 (higher seq) kept, v3 dropped.
479        assert!(kept.contains("v4"), "v4 is higher-seq in day-1 bucket");
480        assert!(!kept.contains("v3"), "v3 is lower-seq in day-1 bucket");
481
482        // Daily bucket for day-2: only v2.
483        assert!(kept.contains("v2"), "sole version in day-2 bucket");
484
485        // Weekly bucket for week containing 35d ago: v1 (higher seq) kept, v0 dropped.
486        assert!(kept.contains("v1"), "v1 is higher-seq in weekly bucket");
487        assert!(!kept.contains("v0"), "v0 is lower-seq in weekly bucket");
488    }
489
490    #[test]
491    fn missing_timestamp_kept() {
492        let now_ms: u128 = 10 * MS_PER_HOUR;
493        let mut no_ts = HistoryRecord::new("no_ts", 0, None, "hashX");
494        no_ts.timestamp_ms = None; // explicitly absent
495        let versions = vec![no_ts];
496        let kept = thin_versions(&versions, now_ms, &policy());
497        assert!(
498            kept.contains("no_ts"),
499            "version with no timestamp must always be kept"
500        );
501    }
502
503    // ── apply_thinning tests ──────────────────────────────────────────────────
504
505    fn seed_versions(fs: &MemFs, paths: &StorePaths, doc_id: &str, records: &[HistoryRecord]) {
506        for r in records {
507            append_record(fs, &paths.versions_file(doc_id), r).unwrap();
508        }
509    }
510
511    #[test]
512    fn apply_drops_and_relinks() {
513        let fs = MemFs::new();
514        let paths = StorePaths::new("/store");
515        let doc_id = "doc1";
516
517        // now = 5 hours in ms
518        let now_ms: u128 = 5 * MS_PER_HOUR;
519        let clock = FakeClock(UNIX_EPOCH + Duration::from_millis(now_ms as u64));
520
521        // v0: 3h ago (hourly bucket 3) — will be the only one in that bucket
522        // v1: 2h 30min ago (hourly bucket 2) — lower seq in bucket 2
523        // v2: 2h ago (hourly bucket 2) — higher seq in bucket 2; v1 should be dropped
524        // v3: 30s ago (< 1h — recent, always kept); also the latest
525        let v0_ts = now_ms - 3 * MS_PER_HOUR;
526        let v1_ts = now_ms - 2 * MS_PER_HOUR - 30 * 60_000;
527        let v2_ts = now_ms - 2 * MS_PER_HOUR;
528        let v3_ts = now_ms - 30_000;
529
530        let records = vec![
531            rec(0, None, v0_ts, None),
532            rec(1, Some("v0"), v1_ts, None),
533            rec(2, Some("v1"), v2_ts, None),
534            rec(3, Some("v2"), v3_ts, None),
535        ];
536        seed_versions(&fs, &paths, doc_id, &records);
537
538        let report = apply_thinning(&fs, &paths, &clock, doc_id, &policy()).unwrap();
539
540        // v1 (lower seq in bucket 2) is dropped; v0, v2, v3 are kept.
541        assert_eq!(report.dropped, 1, "v1 should be dropped");
542        assert_eq!(report.kept, 3, "v0, v2, v3 should be kept");
543
544        // Read back and verify parent chain.
545        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
546        assert_eq!(kept_back.len(), 3);
547
548        // Order must be preserved: v0, v2, v3
549        assert_eq!(kept_back[0].id, "v0");
550        assert_eq!(kept_back[1].id, "v2");
551        assert_eq!(kept_back[2].id, "v3");
552
553        // Parent chain: first kept has None, then each points to previous kept id.
554        assert_eq!(
555            kept_back[0].parent, None,
556            "first kept record must have parent None"
557        );
558        assert_eq!(
559            kept_back[1].parent,
560            Some("v0".to_string()),
561            "v2 must re-link to v0"
562        );
563        assert_eq!(
564            kept_back[2].parent,
565            Some("v2".to_string()),
566            "v3 must re-link to v2"
567        );
568
569        // Dropped id is absent.
570        assert!(
571            kept_back.iter().all(|r| r.id != "v1"),
572            "v1 must not appear in rewritten manifest"
573        );
574    }
575
576    #[test]
577    fn apply_empty_is_noop() {
578        let fs = MemFs::new();
579        let paths = StorePaths::new("/store");
580        let doc_id = "empty_doc";
581        let clock = FakeClock(UNIX_EPOCH + Duration::from_secs(1_000_000));
582
583        let report = apply_thinning(&fs, &paths, &clock, doc_id, &policy()).unwrap();
584        assert_eq!(
585            report,
586            ThinReport {
587                kept: 0,
588                dropped: 0
589            }
590        );
591    }
592
593    #[test]
594    fn apply_preserves_named() {
595        let fs = MemFs::new();
596        let paths = StorePaths::new("/store");
597        let doc_id = "doc2";
598
599        // now = 90 days in ms
600        let now_ms: u128 = 90 * MS_PER_DAY;
601        let clock = FakeClock(UNIX_EPOCH + Duration::from_millis(now_ms as u64));
602
603        // Named version 60 days ago (weekly bucket).
604        // Unnamed neighbour in the same weekly bucket with higher seq.
605        // Latest version (recent).
606        let named_ts = now_ms - 60 * MS_PER_DAY;
607        let unnamed_ts = named_ts + MS_PER_HOUR; // slightly newer, same weekly bucket
608        let latest_ts = now_ms - 5_000;
609
610        let records = vec![
611            rec(0, None, named_ts, Some("v1.0")), // named — must survive
612            rec(1, Some("v0"), unnamed_ts, None), // unnamed, same bucket; may be dropped
613            rec(2, Some("v1"), latest_ts, None),  // latest
614        ];
615        seed_versions(&fs, &paths, doc_id, &records);
616
617        let report = apply_thinning(&fs, &paths, &clock, doc_id, &policy()).unwrap();
618
619        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
620        let ids: Vec<&str> = kept_back.iter().map(|r| r.id.as_str()).collect();
621
622        assert!(ids.contains(&"v0"), "named version v0 must be preserved");
623        assert!(ids.contains(&"v2"), "latest v2 must be preserved");
624        assert_eq!(report.kept + report.dropped, 3);
625    }
626
627    // ── apply_caps tests ──────────────────────────────────────────────────────
628
629    #[test]
630    fn apply_caps_count_drops_oldest_unnamed() {
631        let fs = MemFs::new();
632        let paths = StorePaths::new("/store");
633        let doc_id = "caps_count";
634
635        // Seed 5 unnamed versions with distinct content so objects differ.
636        seed_via_tier2(
637            &fs,
638            &paths,
639            doc_id,
640            &[b"c0", b"c1", b"c2", b"c3", b"c4"],
641            &[None, None, None, None, None],
642            1_000,
643        );
644
645        let policy = RetentionPolicy {
646            max_versions: 3,
647            max_object_bytes: u64::MAX,
648            ..Default::default()
649        };
650        let report = apply_caps(&fs, &paths, doc_id, &policy).unwrap();
651        assert_eq!(report.kept, 3, "should keep exactly 3");
652        assert_eq!(report.dropped, 2, "should drop the 2 oldest");
653
654        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
655        assert_eq!(kept_back.len(), 3);
656
657        // Latest (v4) must survive.
658        assert_eq!(kept_back.last().map(|r| r.id.as_str()), Some("v4"));
659        // v0 and v1 (oldest unnamed) must be gone.
660        assert!(kept_back.iter().all(|r| r.id != "v0"), "v0 must be dropped");
661        assert!(kept_back.iter().all(|r| r.id != "v1"), "v1 must be dropped");
662        // v2 is the new first kept; its parent must be None.
663        assert_eq!(kept_back[0].id, "v2");
664        assert_eq!(
665            kept_back[0].parent, None,
666            "first kept must have parent None"
667        );
668    }
669
670    #[test]
671    fn apply_caps_keeps_named() {
672        let fs = MemFs::new();
673        let paths = StorePaths::new("/store");
674        let doc_id = "caps_named";
675
676        // v0 named, v1–v3 unnamed.
677        seed_via_tier2(
678            &fs,
679            &paths,
680            doc_id,
681            &[b"n0", b"u1", b"u2", b"u3"],
682            &[Some("keep-me"), None, None, None],
683            1_000,
684        );
685
686        let policy = RetentionPolicy {
687            max_versions: 2,
688            max_object_bytes: u64::MAX,
689            ..Default::default()
690        };
691        let report = apply_caps(&fs, &paths, doc_id, &policy).unwrap();
692
693        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
694        let ids: Vec<&str> = kept_back.iter().map(|r| r.id.as_str()).collect();
695
696        // Named v0 must survive even though it is oldest.
697        assert!(ids.contains(&"v0"), "named version must survive caps");
698        // Latest (v3) must survive.
699        assert!(ids.contains(&"v3"), "latest must survive caps");
700        assert_eq!(report.kept, kept_back.len());
701    }
702
703    #[test]
704    fn apply_caps_size_cap() {
705        let fs = MemFs::new();
706        let paths = StorePaths::new("/store");
707        let doc_id = "caps_size";
708
709        // Three versions with sizeable, distinct content.
710        let big0 = vec![b'A'; 2_000];
711        let big1 = vec![b'B'; 2_000];
712        let big2 = vec![b'C'; 2_000];
713        seed_via_tier2(
714            &fs,
715            &paths,
716            doc_id,
717            &[&big0, &big1, &big2],
718            &[None, None, None],
719            1_000,
720        );
721
722        // Get the compressed size of one object so we can set the cap tight.
723        let versions = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
724        let one_obj_size =
725            crate::store::object_size(&fs, &paths, doc_id, &versions[0].snapshot).unwrap_or(1);
726        // Cap to just above one object's size — forces dropping at least one old unnamed.
727        let policy = RetentionPolicy {
728            max_versions: usize::MAX,
729            max_object_bytes: one_obj_size + 1,
730            ..Default::default()
731        };
732
733        let report = apply_caps(&fs, &paths, doc_id, &policy).unwrap();
734        assert!(
735            report.dropped >= 1,
736            "at least one old unnamed must be dropped by size cap"
737        );
738
739        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
740        assert!(
741            kept_back.iter().any(|r| r.id == "v2"),
742            "latest (v2) must always survive"
743        );
744    }
745
746    #[test]
747    fn apply_caps_empty_noop() {
748        let fs = MemFs::new();
749        let paths = StorePaths::new("/store");
750        let report = apply_caps(&fs, &paths, "empty_doc", &RetentionPolicy::default()).unwrap();
751        assert_eq!(
752            report,
753            CapReport {
754                kept: 0,
755                dropped: 0
756            }
757        );
758    }
759
760    // ── maintain test ─────────────────────────────────────────────────────────
761
762    #[test]
763    fn maintain_runs_all_passes() {
764        let fs = MemFs::new();
765        let paths = StorePaths::new("/store");
766        let doc_id = "maintain_doc";
767
768        // Seed several versions via record_version so objects are real.
769        let base_ms: u64 = 0;
770        seed_via_tier2(
771            &fs,
772            &paths,
773            doc_id,
774            &[b"m0", b"m1", b"m2", b"m3", b"m4"],
775            &[None, None, Some("tagged"), None, None],
776            base_ms,
777        );
778
779        // Use a clock well into the future so age-based thinning can act.
780        let future_ms = 90 * 24 * 3_600_000_u64; // 90 days
781        let clock = FakeClock(UNIX_EPOCH + Duration::from_millis(future_ms));
782
783        let report = maintain(&fs, &paths, &clock, doc_id, &RetentionPolicy::default()).unwrap();
784
785        // All three sub-reports must exist without error; the file must be readable.
786        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
787        assert!(
788            !kept_back.is_empty(),
789            "at least the latest must survive maintain"
790        );
791
792        // Latest content must be intact.
793        let last_id = kept_back.last().map(|r| r.id.clone()).unwrap();
794        let content = crate::tier2::version_content(&fs, &paths, doc_id, &last_id).unwrap();
795        assert_eq!(
796            content, b"m4",
797            "latest version content must be intact after maintain"
798        );
799
800        // GC kept at least as many objects as surviving distinct snapshots.
801        let distinct_snapshots: std::collections::BTreeSet<&str> =
802            kept_back.iter().map(|r| r.snapshot.as_str()).collect();
803        assert!(
804            report.collected.kept >= distinct_snapshots.len(),
805            "gc must have kept at least the surviving snapshots' objects"
806        );
807    }
808}