use crate::adapter::Fs;
use crate::error::SessionError;
use crate::identity::read_meta;
use crate::layout::StorePaths;
#[derive(Debug, Clone, PartialEq)]
pub struct GlobalCapReport {
pub evicted: Vec<String>,
pub bytes_before: u64,
pub bytes_after: u64,
}
fn doc_object_bytes(fs: &impl Fs, paths: &StorePaths, doc_id: &str) -> Result<u64, SessionError> {
let odir = paths.objects_dir(doc_id);
if !fs.exists(&odir) {
return Ok(0);
}
let mut total: u64 = 0;
for shard in fs.read_dir(&odir)? {
for obj in fs.read_dir(&shard)? {
let bytes = fs.read(&obj)?;
total = total.saturating_add(u64::try_from(bytes.len()).unwrap_or(u64::MAX));
}
}
Ok(total)
}
pub fn enforce_global_cap(
fs: &impl Fs,
paths: &StorePaths,
max_total_bytes: u64,
) -> Result<GlobalCapReport, SessionError> {
let droot = paths.docs_root();
if !fs.exists(&droot) {
return Ok(GlobalCapReport {
evicted: Vec::new(),
bytes_before: 0,
bytes_after: 0,
});
}
struct DocEntry {
id: String,
bytes: u64,
updated_ms: u128,
}
let mut docs: Vec<DocEntry> = Vec::new();
for dir in fs.read_dir(&droot)? {
let id = match dir.file_name().and_then(|n| n.to_str()) {
Some(s) => s.to_owned(),
None => continue,
};
let bytes = doc_object_bytes(fs, paths, &id)?;
let updated_ms = read_meta(fs, paths, &id)?
.map(|m| m.updated_ms)
.unwrap_or(0);
docs.push(DocEntry {
id,
bytes,
updated_ms,
});
}
let bytes_before: u64 = docs.iter().fold(0u64, |a, d| a.saturating_add(d.bytes));
let mut total = bytes_before;
let mut evicted: Vec<String> = Vec::new();
if total <= max_total_bytes {
return Ok(GlobalCapReport {
evicted,
bytes_before,
bytes_after: total,
});
}
docs.sort_by(|a, b| {
a.updated_ms
.cmp(&b.updated_ms)
.then_with(|| a.id.cmp(&b.id))
});
let protected_index = docs.len().saturating_sub(1);
for (i, d) in docs.iter().enumerate() {
if total <= max_total_bytes {
break;
}
if i == protected_index {
continue; }
let dir = paths.doc_dir(&d.id);
if fs.exists(&dir) {
fs.remove(&dir)?;
}
total = total.saturating_sub(d.bytes);
evicted.push(d.id.clone());
}
Ok(GlobalCapReport {
evicted,
bytes_before,
bytes_after: total,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::{FakeClock, FakeRng, MemFs};
use crate::{identity, store};
use std::path::Path;
use std::time::{Duration, UNIX_EPOCH};
fn make_paths() -> StorePaths {
StorePaths::new("/data")
}
fn seed_doc(
fs: &MemFs,
paths: &StorePaths,
doc_id: &str,
content: &[u8],
updated_ms: u64,
) -> String {
let clock = FakeClock(UNIX_EPOCH + Duration::from_millis(updated_ms));
let rng = FakeRng(0x01);
let doc_path = Path::new("/fake/doc.zen");
identity::reconcile(fs, paths, &clock, &rng, Some(doc_id), doc_path).unwrap();
store::put_object(fs, paths, doc_id, content).unwrap();
doc_id.to_owned()
}
#[test]
fn under_cap_evicts_nothing() {
let fs = MemFs::new();
let paths = make_paths();
seed_doc(&fs, &paths, "doc-a", &[42u8; 200], 1000);
seed_doc(&fs, &paths, "doc-b", &[99u8; 200], 2000);
let report = enforce_global_cap(&fs, &paths, 1_000_000).unwrap();
assert!(report.evicted.is_empty(), "nothing should be evicted");
assert_eq!(
report.bytes_after, report.bytes_before,
"bytes unchanged when under cap"
);
}
#[test]
fn evicts_lru_first() {
let fs = MemFs::new();
let paths = make_paths();
seed_doc(&fs, &paths, "old", &vec![0xAAu8; 2000], 100);
let new_hash = store::put_object(&fs, &paths, "new", &[0xBBu8; 50]).unwrap();
{
let clock = FakeClock(UNIX_EPOCH + Duration::from_millis(5000));
let rng = FakeRng(0x02);
identity::reconcile(&fs, &paths, &clock, &rng, Some("new"), Path::new("/n.zen"))
.unwrap();
}
let old_bytes = doc_object_bytes(&fs, &paths, "old").unwrap();
let new_bytes = doc_object_bytes(&fs, &paths, "new").unwrap();
let combined = old_bytes.saturating_add(new_bytes);
let cap = new_bytes.saturating_add(1);
assert!(
combined > cap,
"test requires combined > cap (combined={combined}, cap={cap})"
);
let report = enforce_global_cap(&fs, &paths, cap).unwrap();
assert_eq!(report.evicted, vec!["old"], "older doc should be evicted");
assert!(
report.bytes_after <= cap,
"bytes_after ({}) should be <= cap ({})",
report.bytes_after,
cap
);
let old_hash = store::object_hash(&vec![0xAAu8; 2000]);
assert!(
store::get_object(&fs, &paths, "old", &old_hash).is_err(),
"old doc's object must be gone"
);
let got = store::get_object(&fs, &paths, "new", &new_hash).unwrap();
assert_eq!(got, vec![0xBBu8; 50], "new doc's object must survive");
}
#[test]
fn never_evicts_most_recent() {
let fs = MemFs::new();
let paths = make_paths();
seed_doc(&fs, &paths, "solo", &vec![0xCCu8; 2000], 9999);
let solo_bytes = doc_object_bytes(&fs, &paths, "solo").unwrap();
assert!(solo_bytes > 0, "solo doc must have some bytes");
let cap = solo_bytes.saturating_sub(1);
let report = enforce_global_cap(&fs, &paths, cap).unwrap();
assert!(
report.evicted.is_empty(),
"most-recent (and only) doc must never be evicted"
);
assert_eq!(
report.bytes_after, report.bytes_before,
"bytes unchanged when protected"
);
}
#[test]
fn empty_store_noop() {
let fs = MemFs::new();
let paths = make_paths();
let report = enforce_global_cap(&fs, &paths, 0).unwrap();
assert!(report.evicted.is_empty());
assert_eq!(report.bytes_before, 0);
assert_eq!(report.bytes_after, 0);
}
#[test]
fn missing_meta_treated_as_oldest() {
let fs = MemFs::new();
let paths = make_paths();
store::put_object(&fs, &paths, "no-meta", &vec![0xDDu8; 2000]).unwrap();
seed_doc(&fs, &paths, "recent", &[0xEEu8; 50], 9000);
let no_meta_bytes = doc_object_bytes(&fs, &paths, "no-meta").unwrap();
let recent_bytes = doc_object_bytes(&fs, &paths, "recent").unwrap();
let combined = no_meta_bytes.saturating_add(recent_bytes);
let cap = recent_bytes.saturating_add(1);
assert!(
combined > cap,
"test requires combined > cap (combined={combined}, cap={cap})"
);
let report = enforce_global_cap(&fs, &paths, cap).unwrap();
assert_eq!(
report.evicted,
vec!["no-meta"],
"meta-less doc (updated_ms=0) must be evicted first"
);
assert!(
report.bytes_after <= cap,
"bytes_after ({}) should be <= cap ({})",
report.bytes_after,
cap
);
}
}