1use std::collections::{BTreeMap, BTreeSet};
10use std::time::UNIX_EPOCH;
11
12use crate::adapter::{Clock, Fs};
13use crate::error::SessionError;
14use crate::gc::{GcReport, gc};
15use crate::layout::StorePaths;
16use crate::manifest::{HistoryRecord, read_records};
17use crate::store::object_size;
18
/// One hour in milliseconds; bucket width of the hourly thinning tier.
const MS_PER_HOUR: u128 = 60 * 60 * 1_000;
/// One day in milliseconds; bucket width of the daily thinning tier.
const MS_PER_DAY: u128 = 24 * MS_PER_HOUR;
/// One week in milliseconds; bucket width of the oldest thinning tier.
const MS_PER_WEEK: u128 = 7 * MS_PER_DAY;

/// Retention settings consumed by [`thin_versions`] and [`apply_caps`].
///
/// Ages are measured in milliseconds relative to "now". Unlabelled,
/// timestamped versions younger than `keep_all_below` are all kept. Those
/// younger than `hourly_below` are thinned to one per hour of age. Those
/// younger than `daily_below` are thinned to one per day of age. Older ones
/// are thinned to one per week of age.
///
/// `max_versions` and `max_object_bytes` are hard caps applied afterwards.
/// Labelled versions and the latest version are exempt from both thinning and
/// the caps.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetentionPolicy {
    /// Age (ms) below which every version is kept.
    pub keep_all_below: u128,
    /// Age (ms) below which versions are thinned to one per hour of age.
    pub hourly_below: u128,
    /// Age (ms) below which versions are thinned to one per day of age;
    /// anything at least this old is thinned to one per week of age.
    pub daily_below: u128,
    /// Maximum number of versions [`apply_caps`] tries to keep.
    pub max_versions: usize,
    /// Maximum total size in bytes of the distinct snapshot objects referenced
    /// by kept versions, enforced by [`apply_caps`].
    pub max_object_bytes: u64,
}

impl Default for RetentionPolicy {
    /// Sets up the default retention schedule and caps:
    ///
    /// - keep every version younger than one hour;
    /// - keep one version per hour up to one day;
    /// - keep one version per day up to 30 days;
    /// - keep one version per week beyond 30 days;
    /// - cap the history at 500 versions and 10 MiB of referenced objects.
    fn default() -> Self {
        Self {
            keep_all_below: MS_PER_HOUR,
            hourly_below: MS_PER_DAY,
            daily_below: 30 * MS_PER_DAY,
            max_versions: 500,
            max_object_bytes: 10 * 1024 * 1024,
        }
    }
}
59
60pub fn thin_versions(
68 versions: &[HistoryRecord],
69 now_ms: u128,
70 policy: &RetentionPolicy,
71) -> BTreeSet<String> {
72 let mut keep: BTreeSet<String> = BTreeSet::new();
73 if versions.is_empty() {
74 return keep;
75 }
76
77 if let Some(last) = versions.last() {
79 keep.insert(last.id.clone());
80 }
81
82 let mut buckets: BTreeMap<(u8, u128), (u64, String)> = BTreeMap::new();
85
86 for v in versions {
87 if v.label.is_some() {
89 keep.insert(v.id.clone());
90 continue;
91 }
92
93 let ts = match v.timestamp_ms {
94 Some(t) => t,
95 None => {
97 keep.insert(v.id.clone());
98 continue;
99 }
100 };
101
102 let age = now_ms.saturating_sub(ts);
103 let key = if age < policy.keep_all_below {
104 (0u8, u128::from(v.seq))
106 } else if age < policy.hourly_below {
107 (1u8, age / MS_PER_HOUR)
108 } else if age < policy.daily_below {
109 (2u8, age / MS_PER_DAY)
110 } else {
111 (3u8, age / MS_PER_WEEK)
112 };
113
114 buckets
116 .entry(key)
117 .and_modify(|e| {
118 if v.seq > e.0 {
119 *e = (v.seq, v.id.clone());
120 }
121 })
122 .or_insert((v.seq, v.id.clone()));
123 }
124
125 for (_, (_, id)) in buckets {
126 keep.insert(id);
127 }
128 keep
129}
130
/// Outcome of [`apply_thinning`]: how many version records were written back
/// to the manifest and how many were removed from it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThinReport {
    /// Number of version records kept in the rewritten manifest.
    pub kept: usize,
    /// Number of version records removed by thinning.
    pub dropped: usize,
}
139
140pub(crate) fn rewrite_versions_keeping(
145 fs: &impl Fs,
146 paths: &StorePaths,
147 doc_id: &str,
148 versions: &[HistoryRecord],
149 keep: &BTreeSet<String>,
150) -> Result<(usize, usize), SessionError> {
151 let mut kept_records: Vec<HistoryRecord> = Vec::new();
152 let mut prev_kept_id: Option<String> = None;
153 let mut dropped = 0usize;
154 for v in versions {
155 if keep.contains(&v.id) {
156 let mut r = v.clone();
157 r.parent = prev_kept_id.clone();
158 prev_kept_id = Some(v.id.clone());
159 kept_records.push(r);
160 } else {
161 dropped += 1;
162 }
163 }
164 let mut bytes: Vec<u8> = Vec::new();
165 for r in &kept_records {
166 let mut line = serde_json::to_vec(r)
167 .map_err(|e| SessionError::new(format!("serialize version: {e}")))?;
168 line.push(b'\n');
169 bytes.extend_from_slice(&line);
170 }
171 let vpath = paths.versions_file(doc_id);
172 if let Some(parent) = vpath.parent() {
173 fs.create_dir_all(parent)?;
174 }
175 fs.write(&vpath, &bytes)?;
176 Ok((kept_records.len(), dropped))
177}
178
179pub fn apply_thinning(
186 fs: &impl Fs,
187 paths: &StorePaths,
188 clock: &impl Clock,
189 doc_id: &str,
190 policy: &RetentionPolicy,
191) -> Result<ThinReport, SessionError> {
192 let versions = read_records(fs, &paths.versions_file(doc_id))?;
193 if versions.is_empty() {
194 return Ok(ThinReport {
195 kept: 0,
196 dropped: 0,
197 });
198 }
199
200 let now_ms = clock
201 .now()
202 .duration_since(UNIX_EPOCH)
203 .ok()
204 .map(|d| d.as_millis())
205 .unwrap_or(0);
206
207 let keep = thin_versions(&versions, now_ms, policy);
208 let (kept, dropped) = rewrite_versions_keeping(fs, paths, doc_id, &versions, &keep)?;
209 Ok(ThinReport { kept, dropped })
210}
211
/// Outcome of [`apply_caps`]: how many version records were written back to
/// the manifest and how many were removed to satisfy the caps.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CapReport {
    /// Number of version records kept in the rewritten manifest.
    pub kept: usize,
    /// Number of version records removed by the caps.
    pub dropped: usize,
}
220
221pub fn apply_caps(
227 fs: &impl Fs,
228 paths: &StorePaths,
229 doc_id: &str,
230 policy: &RetentionPolicy,
231) -> Result<CapReport, SessionError> {
232 let versions = read_records(fs, &paths.versions_file(doc_id))?;
233 if versions.is_empty() {
234 return Ok(CapReport {
235 kept: 0,
236 dropped: 0,
237 });
238 }
239
240 let mut sizes: BTreeMap<String, u64> = BTreeMap::new();
242 for v in &versions {
243 if !sizes.contains_key(&v.snapshot) {
244 let sz = object_size(fs, paths, doc_id, &v.snapshot).unwrap_or(0);
245 sizes.insert(v.snapshot.clone(), sz);
246 }
247 }
248
249 let latest_id = versions.last().map(|v| v.id.clone());
250 let mut keep: BTreeSet<String> = versions.iter().map(|v| v.id.clone()).collect();
251
252 let referenced_bytes = |keep: &BTreeSet<String>| -> u64 {
254 let mut seen: BTreeSet<&str> = BTreeSet::new();
255 let mut total: u64 = 0;
256 for v in &versions {
257 if keep.contains(&v.id) && seen.insert(v.snapshot.as_str()) {
258 total = total.saturating_add(*sizes.get(&v.snapshot).unwrap_or(&0));
259 }
260 }
261 total
262 };
263
264 for v in &versions {
266 let over =
267 keep.len() > policy.max_versions || referenced_bytes(&keep) > policy.max_object_bytes;
268 if !over {
269 break;
270 }
271 let is_latest = latest_id.as_deref() == Some(v.id.as_str());
272 if v.label.is_none() && !is_latest && keep.contains(&v.id) {
273 keep.remove(&v.id);
274 }
275 }
276
277 let (kept, dropped) = rewrite_versions_keeping(fs, paths, doc_id, &versions, &keep)?;
278 Ok(CapReport { kept, dropped })
279}
280
/// Combined result of [`maintain`]: one report per pass, listed in the order
/// the passes run.
#[derive(Debug, Clone, PartialEq)]
pub struct MaintainReport {
    /// Result of the age-based thinning pass ([`apply_thinning`]).
    pub thinned: ThinReport,
    /// Result of the count/size cap pass ([`apply_caps`]).
    pub capped: CapReport,
    /// Result of running `gc` for the document after the manifest rewrites.
    pub collected: GcReport,
}
290
291pub fn maintain(
295 fs: &impl Fs,
296 paths: &StorePaths,
297 clock: &impl Clock,
298 doc_id: &str,
299 policy: &RetentionPolicy,
300) -> Result<MaintainReport, SessionError> {
301 let thinned = apply_thinning(fs, paths, clock, doc_id, policy)?;
302 let capped = apply_caps(fs, paths, doc_id, policy)?;
303 let collected = gc(fs, paths, doc_id)?;
304 Ok(MaintainReport {
305 thinned,
306 capped,
307 collected,
308 })
309}
310
#[cfg(test)]
// Unit tests for thinning, caps and the combined maintenance pipeline.
mod tests {
    use std::time::{Duration, UNIX_EPOCH};

    use super::*;
    use crate::adapter::{FakeClock, MemFs};
    use crate::layout::StorePaths;
    use crate::manifest::append_record;
    use crate::tier2::VersionMeta;

    /// Builds an in-memory record with id `v{seq}`, the given parent,
    /// timestamp and optional label. The fourth constructor argument is
    /// `hash{seq}` (presumably the snapshot key; confirm against
    /// `HistoryRecord::new`).
    fn rec(seq: u64, parent: Option<&str>, ts_ms: u128, label: Option<&str>) -> HistoryRecord {
        let mut r = HistoryRecord::new(
            format!("v{seq}"),
            seq,
            parent.map(str::to_owned),
            format!("hash{seq}"),
        );
        r.timestamp_ms = Some(ts_ms);
        r.label = label.map(str::to_owned);
        r
    }

    /// Shorthand for the default retention policy.
    fn policy() -> RetentionPolicy {
        RetentionPolicy::default()
    }

    /// Records each entry of `contents` as a new version via
    /// `tier2::record_version`. The fake clock starts at `base_ms` and
    /// advances 100 ms per version. `labels[i]` is applied when present.
    fn seed_via_tier2(
        fs: &MemFs,
        paths: &StorePaths,
        doc_id: &str,
        contents: &[&[u8]],
        labels: &[Option<&str>],
        base_ms: u64,
    ) {
        for (i, content) in contents.iter().enumerate() {
            let clock = FakeClock(UNIX_EPOCH + Duration::from_millis(base_ms + i as u64 * 100));
            // Missing entries in `labels` mean "unlabelled".
            let label = labels.get(i).copied().flatten();
            crate::tier2::record_version(
                fs,
                paths,
                &clock,
                doc_id,
                content,
                VersionMeta {
                    label,
                    ..Default::default()
                },
            )
            .unwrap();
        }
    }

    /// No input versions means nothing to keep.
    #[test]
    fn empty_keeps_nothing() {
        let kept = thin_versions(&[], 1_000_000, &policy());
        assert!(kept.is_empty());
    }

    /// Every version younger than `keep_all_below` (1 h by default) is kept.
    #[test]
    fn all_recent_kept() {
        let now_ms: u128 = 10 * MS_PER_HOUR;
        // Ages: 10 s, 30 s and 59 min, all inside the keep-all tier.
        let versions = vec![
            rec(0, None, now_ms - 10_000, None),
            rec(1, Some("v0"), now_ms - 30_000, None),
            rec(2, Some("v1"), now_ms - 59 * 60_000, None),
        ];
        let kept = thin_versions(&versions, now_ms, &policy());
        assert!(kept.contains("v0"), "v0 should be kept");
        assert!(kept.contains("v1"), "v1 should be kept");
        assert!(kept.contains("v2"), "v2 should be kept (latest + recent)");
    }

    /// A lone, very old version survives because it is the latest.
    #[test]
    fn latest_always_kept() {
        let now_ms: u128 = 100 * MS_PER_DAY;
        let versions = vec![rec(0, None, 0, None)];
        let kept = thin_versions(&versions, now_ms, &policy());
        assert!(kept.contains("v0"), "latest must always be kept");
    }

    /// A labelled version 60 days old survives thinning regardless of its bucket.
    #[test]
    fn named_always_kept() {
        let now_ms: u128 = 100 * MS_PER_DAY;
        // 60 days old: past `daily_below`, i.e. in the weekly tier.
        let base_ts = now_ms - 60 * MS_PER_DAY;
        let versions = vec![
            rec(0, None, base_ts, Some("release-1.0")),
            rec(1, Some("v0"), base_ts + 1_000, None),
            rec(2, Some("v1"), now_ms - 1_000, None),
        ];
        let kept = thin_versions(&versions, now_ms, &policy());
        assert!(kept.contains("v0"), "named version must survive thinning");
        assert!(kept.contains("v2"), "latest must be kept");
    }

    /// Two versions in the same hourly bucket: only the higher seq survives.
    #[test]
    fn hourly_bucket_keeps_newest() {
        let now_ms: u128 = 10 * MS_PER_HOUR;
        // Ages 2h05m and 2h both fall into hour-bucket 2; v2 is in keep-all.
        let v0_ts = now_ms - 2 * MS_PER_HOUR - 5 * 60_000;
        let v1_ts = now_ms - 2 * MS_PER_HOUR;
        let v2_ts = now_ms - 60_000;
        let versions = vec![
            rec(0, None, v0_ts, None),
            rec(1, Some("v0"), v1_ts, None),
            rec(2, Some("v1"), v2_ts, None),
        ];
        let kept = thin_versions(&versions, now_ms, &policy());
        assert!(
            !kept.contains("v0"),
            "lower-seq in same hourly bucket should be dropped"
        );
        assert!(
            kept.contains("v1"),
            "higher-seq in hourly bucket must be kept"
        );
        assert!(kept.contains("v2"), "latest must be kept");
    }

    /// Exercises the daily tier (ages 24 h .. 30 d) and weekly tier (>= 30 d).
    #[test]
    fn daily_and_weekly_buckets() {
        let now_ms: u128 = 60 * MS_PER_DAY;
        // Ages 27 h and 25 h: both day-bucket 1.
        let day1_early = now_ms - MS_PER_DAY - 3 * MS_PER_HOUR;
        let day1_late = now_ms - MS_PER_DAY - MS_PER_HOUR;

        // Age 49 h: day-bucket 2, alone.
        let day2 = now_ms - 2 * MS_PER_DAY - MS_PER_HOUR;

        // Ages 35 d + 2 h and 35 d + 1 h: both week-bucket 5.
        let week5_early = now_ms - 35 * MS_PER_DAY - 2 * MS_PER_HOUR;
        let week5_late = now_ms - 35 * MS_PER_DAY - MS_PER_HOUR;

        let versions = vec![
            rec(0, None, week5_early, None),
            rec(1, Some("v0"), week5_late, None),
            rec(2, Some("v1"), day2, None),
            rec(3, Some("v2"), day1_early, None),
            rec(4, Some("v3"), day1_late, None),
            rec(5, Some("v4"), now_ms - 30_000, None),
        ];
        let kept = thin_versions(&versions, now_ms, &policy());

        // Latest, and in the keep-all tier.
        assert!(kept.contains("v5"));

        assert!(kept.contains("v4"), "v4 is higher-seq in day-1 bucket");
        assert!(!kept.contains("v3"), "v3 is lower-seq in day-1 bucket");

        assert!(kept.contains("v2"), "sole version in day-2 bucket");

        assert!(kept.contains("v1"), "v1 is higher-seq in weekly bucket");
        assert!(!kept.contains("v0"), "v0 is lower-seq in weekly bucket");
    }

    /// Versions without a timestamp cannot be bucketed, so they are always kept.
    #[test]
    fn missing_timestamp_kept() {
        let now_ms: u128 = 10 * MS_PER_HOUR;
        let mut no_ts = HistoryRecord::new("no_ts", 0, None, "hashX");
        no_ts.timestamp_ms = None;
        let versions = vec![no_ts];
        let kept = thin_versions(&versions, now_ms, &policy());
        assert!(
            kept.contains("no_ts"),
            "version with no timestamp must always be kept"
        );
    }

    /// Appends pre-built records straight to the versions manifest,
    /// without going through tier2.
    fn seed_versions(fs: &MemFs, paths: &StorePaths, doc_id: &str, records: &[HistoryRecord]) {
        for r in records {
            append_record(fs, &paths.versions_file(doc_id), r).unwrap();
        }
    }

    /// `apply_thinning` rewrites the manifest without the dropped record and
    /// re-links parents across the gap.
    #[test]
    fn apply_drops_and_relinks() {
        let fs = MemFs::new();
        let paths = StorePaths::new("/store");
        let doc_id = "doc1";

        let now_ms: u128 = 5 * MS_PER_HOUR;
        let clock = FakeClock(UNIX_EPOCH + Duration::from_millis(now_ms as u64));

        // Ages: v0 = 3 h (hour-bucket 3), v1 = 2h30m and v2 = 2 h (both
        // hour-bucket 2, so v1 loses to v2), v3 = 30 s (keep-all tier).
        let v0_ts = now_ms - 3 * MS_PER_HOUR;
        let v1_ts = now_ms - 2 * MS_PER_HOUR - 30 * 60_000;
        let v2_ts = now_ms - 2 * MS_PER_HOUR;
        let v3_ts = now_ms - 30_000;

        let records = vec![
            rec(0, None, v0_ts, None),
            rec(1, Some("v0"), v1_ts, None),
            rec(2, Some("v1"), v2_ts, None),
            rec(3, Some("v2"), v3_ts, None),
        ];
        seed_versions(&fs, &paths, doc_id, &records);

        let report = apply_thinning(&fs, &paths, &clock, doc_id, &policy()).unwrap();

        assert_eq!(report.dropped, 1, "v1 should be dropped");
        assert_eq!(report.kept, 3, "v0, v2, v3 should be kept");

        // Re-read the manifest to check what was actually persisted.
        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
        assert_eq!(kept_back.len(), 3);

        // Original order is preserved.
        assert_eq!(kept_back[0].id, "v0");
        assert_eq!(kept_back[1].id, "v2");
        assert_eq!(kept_back[2].id, "v3");

        assert_eq!(
            kept_back[0].parent, None,
            "first kept record must have parent None"
        );
        assert_eq!(
            kept_back[1].parent,
            Some("v0".to_string()),
            "v2 must re-link to v0"
        );
        assert_eq!(
            kept_back[2].parent,
            Some("v2".to_string()),
            "v3 must re-link to v2"
        );

        assert!(
            kept_back.iter().all(|r| r.id != "v1"),
            "v1 must not appear in rewritten manifest"
        );
    }

    /// Thinning a document with no versions reports zero kept / zero dropped.
    #[test]
    fn apply_empty_is_noop() {
        let fs = MemFs::new();
        let paths = StorePaths::new("/store");
        let doc_id = "empty_doc";
        let clock = FakeClock(UNIX_EPOCH + Duration::from_secs(1_000_000));

        let report = apply_thinning(&fs, &paths, &clock, doc_id, &policy()).unwrap();
        assert_eq!(
            report,
            ThinReport {
                kept: 0,
                dropped: 0
            }
        );
    }

    /// `apply_thinning` keeps a labelled version and the latest one.
    #[test]
    fn apply_preserves_named() {
        let fs = MemFs::new();
        let paths = StorePaths::new("/store");
        let doc_id = "doc2";

        let now_ms: u128 = 90 * MS_PER_DAY;
        let clock = FakeClock(UNIX_EPOCH + Duration::from_millis(now_ms as u64));

        // The labelled version is 60 days old; the unnamed one is an hour
        // newer; the latest is 5 s old.
        let named_ts = now_ms - 60 * MS_PER_DAY;
        let unnamed_ts = named_ts + MS_PER_HOUR;
        let latest_ts = now_ms - 5_000;

        let records = vec![
            rec(0, None, named_ts, Some("v1.0")),
            rec(1, Some("v0"), unnamed_ts, None),
            rec(2, Some("v1"), latest_ts, None),
        ];
        seed_versions(&fs, &paths, doc_id, &records);

        let report = apply_thinning(&fs, &paths, &clock, doc_id, &policy()).unwrap();

        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
        let ids: Vec<&str> = kept_back.iter().map(|r| r.id.as_str()).collect();

        assert!(ids.contains(&"v0"), "named version v0 must be preserved");
        assert!(ids.contains(&"v2"), "latest v2 must be preserved");
        assert_eq!(report.kept + report.dropped, 3);
    }

    /// With a count cap of 3 and five unlabelled versions, the two oldest go.
    #[test]
    fn apply_caps_count_drops_oldest_unnamed() {
        let fs = MemFs::new();
        let paths = StorePaths::new("/store");
        let doc_id = "caps_count";

        seed_via_tier2(
            &fs,
            &paths,
            doc_id,
            &[b"c0", b"c1", b"c2", b"c3", b"c4"],
            &[None, None, None, None, None],
            1_000,
        );

        // Byte cap effectively disabled so only the count cap applies.
        let policy = RetentionPolicy {
            max_versions: 3,
            max_object_bytes: u64::MAX,
            ..Default::default()
        };
        let report = apply_caps(&fs, &paths, doc_id, &policy).unwrap();
        assert_eq!(report.kept, 3, "should keep exactly 3");
        assert_eq!(report.dropped, 2, "should drop the 2 oldest");

        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
        assert_eq!(kept_back.len(), 3);

        assert_eq!(kept_back.last().map(|r| r.id.as_str()), Some("v4"));
        assert!(kept_back.iter().all(|r| r.id != "v0"), "v0 must be dropped");
        assert!(kept_back.iter().all(|r| r.id != "v1"), "v1 must be dropped");
        // The new head of history is re-linked to have no parent.
        assert_eq!(kept_back[0].id, "v2");
        assert_eq!(
            kept_back[0].parent, None,
            "first kept must have parent None"
        );
    }

    /// The count cap never drops a labelled version or the latest version.
    #[test]
    fn apply_caps_keeps_named() {
        let fs = MemFs::new();
        let paths = StorePaths::new("/store");
        let doc_id = "caps_named";

        seed_via_tier2(
            &fs,
            &paths,
            doc_id,
            &[b"n0", b"u1", b"u2", b"u3"],
            &[Some("keep-me"), None, None, None],
            1_000,
        );

        let policy = RetentionPolicy {
            max_versions: 2,
            max_object_bytes: u64::MAX,
            ..Default::default()
        };
        let report = apply_caps(&fs, &paths, doc_id, &policy).unwrap();

        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
        let ids: Vec<&str> = kept_back.iter().map(|r| r.id.as_str()).collect();

        assert!(ids.contains(&"v0"), "named version must survive caps");
        assert!(ids.contains(&"v3"), "latest must survive caps");
        assert_eq!(report.kept, kept_back.len());
    }

    /// A byte cap just above one object's size forces old unlabelled
    /// versions out, but the latest always survives.
    #[test]
    fn apply_caps_size_cap() {
        let fs = MemFs::new();
        let paths = StorePaths::new("/store");
        let doc_id = "caps_size";

        // Three distinct 2 KB payloads so each version has its own object.
        let big0 = vec![b'A'; 2_000];
        let big1 = vec![b'B'; 2_000];
        let big2 = vec![b'C'; 2_000];
        seed_via_tier2(
            &fs,
            &paths,
            doc_id,
            &[&big0, &big1, &big2],
            &[None, None, None],
            1_000,
        );

        // Measure one stored object so the cap adapts to the store's
        // on-disk representation.
        let versions = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
        let one_obj_size =
            crate::store::object_size(&fs, &paths, doc_id, &versions[0].snapshot).unwrap_or(1);
        let policy = RetentionPolicy {
            max_versions: usize::MAX,
            max_object_bytes: one_obj_size + 1,
            ..Default::default()
        };

        let report = apply_caps(&fs, &paths, doc_id, &policy).unwrap();
        assert!(
            report.dropped >= 1,
            "at least one old unnamed must be dropped by size cap"
        );

        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
        assert!(
            kept_back.iter().any(|r| r.id == "v2"),
            "latest (v2) must always survive"
        );
    }

    /// Capping a document with no versions reports zero kept / zero dropped.
    #[test]
    fn apply_caps_empty_noop() {
        let fs = MemFs::new();
        let paths = StorePaths::new("/store");
        let report = apply_caps(&fs, &paths, "empty_doc", &RetentionPolicy::default()).unwrap();
        assert_eq!(
            report,
            CapReport {
                kept: 0,
                dropped: 0
            }
        );
    }

    /// End-to-end: thinning, caps and gc run together and leave the latest
    /// version's content readable.
    #[test]
    fn maintain_runs_all_passes() {
        let fs = MemFs::new();
        let paths = StorePaths::new("/store");
        let doc_id = "maintain_doc";

        // Versions are timestamped at the Unix epoch (+100 ms steps).
        let base_ms: u64 = 0;
        seed_via_tier2(
            &fs,
            &paths,
            doc_id,
            &[b"m0", b"m1", b"m2", b"m3", b"m4"],
            &[None, None, Some("tagged"), None, None],
            base_ms,
        );

        // Run maintenance 90 days later, so every version is in the weekly tier.
        let future_ms = 90 * 24 * 3_600_000_u64;
        let clock = FakeClock(UNIX_EPOCH + Duration::from_millis(future_ms));

        let report = maintain(&fs, &paths, &clock, doc_id, &RetentionPolicy::default()).unwrap();

        let kept_back = read_records(&fs, &paths.versions_file(doc_id)).unwrap();
        assert!(
            !kept_back.is_empty(),
            "at least the latest must survive maintain"
        );

        // The surviving latest version must still resolve to its content
        // after gc.
        let last_id = kept_back.last().map(|r| r.id.clone()).unwrap();
        let content = crate::tier2::version_content(&fs, &paths, doc_id, &last_id).unwrap();
        assert_eq!(
            content, b"m4",
            "latest version content must be intact after maintain"
        );

        let distinct_snapshots: std::collections::BTreeSet<&str> =
            kept_back.iter().map(|r| r.snapshot.as_str()).collect();
        assert!(
            report.collected.kept >= distinct_snapshots.len(),
            "gc must have kept at least the surviving snapshots' objects"
        );
    }
}