use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
/// Default minimum age, measured from `first_seen_unix_ms`, that an unpinned,
/// zero-refcount entry must reach before `should_sweep` reports it deletable.
/// Equal to one day (86 400 seconds).
pub const DEFAULT_RETENTION_FLOOR: Duration = Duration::from_secs(86_400);
/// Per-blob bookkeeping record stored in the refcount table.
///
/// Plain `Copy` data; `PartialEq`/`Eq` are derived so whole entries can be
/// compared (e.g. to check that a removed entry was restored unchanged).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RefcountEntry {
    /// Number of outstanding references to the blob.
    pub refcount: u32,
    /// Timestamp (Unix milliseconds) at which the entry was first admitted;
    /// the retention floor in `should_sweep` is measured from here.
    pub first_seen_unix_ms: u64,
    /// Timestamp (Unix milliseconds) of the most recent update to the entry.
    pub last_seen_unix_ms: u64,
    /// When `true`, `should_sweep` never reports the entry as deletable.
    pub pinned: bool,
    /// Size of the blob in bytes, or `None` if it has not been recorded.
    pub size_bytes: Option<u64>,
}
/// Concurrent table mapping a 32-byte blob hash to its `RefcountEntry`.
///
/// The map is held behind an `Arc`, so `clone()` yields another handle onto the
/// same table rather than an independent copy. `Default` produces an empty table.
#[derive(Debug, Default, Clone)]
pub struct BlobRefcountTable {
    inner: Arc<DashMap<[u8; 32], RefcountEntry>>,
}
impl BlobRefcountTable {
pub fn new() -> Self {
Self {
inner: Arc::new(DashMap::new()),
}
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub fn store_observed(&self, hash: [u8; 32], size_bytes: u64, now_unix_ms: u64) {
self.inner
.entry(hash)
.and_modify(|e| {
e.last_seen_unix_ms = now_unix_ms;
if e.size_bytes.is_none() {
e.size_bytes = Some(size_bytes);
}
})
.or_insert(RefcountEntry {
refcount: 0,
first_seen_unix_ms: now_unix_ms,
last_seen_unix_ms: now_unix_ms,
pinned: false,
size_bytes: Some(size_bytes),
});
}
pub fn incr(&self, hash: [u8; 32], now_unix_ms: u64) -> u32 {
let mut entry = self.inner.entry(hash).or_insert(RefcountEntry {
refcount: 0,
first_seen_unix_ms: now_unix_ms,
last_seen_unix_ms: now_unix_ms,
pinned: false,
size_bytes: None,
});
entry.refcount = entry.refcount.saturating_add(1);
entry.last_seen_unix_ms = now_unix_ms;
entry.refcount
}
pub fn decr(&self, hash: [u8; 32], now_unix_ms: u64) -> u32 {
match self.inner.get_mut(&hash) {
Some(mut entry) => {
entry.refcount = entry.refcount.saturating_sub(1);
entry.last_seen_unix_ms = now_unix_ms;
entry.refcount
}
None => 0,
}
}
pub fn pin(&self, hash: [u8; 32], now_unix_ms: u64) {
self.inner
.entry(hash)
.and_modify(|e| {
e.pinned = true;
e.last_seen_unix_ms = now_unix_ms;
})
.or_insert(RefcountEntry {
refcount: 0,
first_seen_unix_ms: now_unix_ms,
last_seen_unix_ms: now_unix_ms,
pinned: true,
size_bytes: None,
});
}
pub fn unpin(&self, hash: [u8; 32], now_unix_ms: u64) {
if let Some(mut entry) = self.inner.get_mut(&hash) {
entry.pinned = false;
entry.last_seen_unix_ms = now_unix_ms;
}
}
pub fn get(&self, hash: &[u8; 32]) -> Option<RefcountEntry> {
self.inner.get(hash).map(|r| *r)
}
pub fn pinned_count(&self) -> usize {
self.inner.iter().filter(|e| e.value().pinned).count()
}
pub fn snapshot(&self) -> Vec<([u8; 32], RefcountEntry)> {
self.inner.iter().map(|r| (*r.key(), *r.value())).collect()
}
pub fn snapshot_filter<F>(&self, mut accept: F) -> Vec<([u8; 32], RefcountEntry)>
where
F: FnMut(&[u8; 32]) -> bool,
{
self.inner
.iter()
.filter_map(|r| {
let key = *r.key();
if accept(&key) {
Some((key, *r.value()))
} else {
None
}
})
.collect()
}
pub fn zero_refcount_count(&self) -> usize {
self.inner
.iter()
.filter(|e| e.value().refcount == 0)
.count()
}
pub fn deletable_hashes(
&self,
now_unix_ms: u64,
retention_floor: Duration,
disk_pressure_critical: bool,
) -> Vec<[u8; 32]> {
self.inner
.iter()
.filter_map(|entry| {
if should_sweep(
entry.value(),
now_unix_ms,
retention_floor,
disk_pressure_critical,
) {
Some(*entry.key())
} else {
None
}
})
.collect()
}
pub fn remove(&self, hash: &[u8; 32]) {
self.inner.remove(hash);
}
pub fn remove_if_deletable(
&self,
hash: &[u8; 32],
now_unix_ms: u64,
retention_floor: Duration,
disk_pressure_critical: bool,
) -> bool {
self.take_if_deletable(hash, now_unix_ms, retention_floor, disk_pressure_critical)
.is_some()
}
pub fn take_if_deletable(
&self,
hash: &[u8; 32],
now_unix_ms: u64,
retention_floor: Duration,
disk_pressure_critical: bool,
) -> Option<RefcountEntry> {
self.inner
.remove_if(hash, |_, entry| {
should_sweep(entry, now_unix_ms, retention_floor, disk_pressure_critical)
})
.map(|(_, entry)| entry)
}
pub fn restore_if_absent(&self, hash: [u8; 32], entry: RefcountEntry) -> bool {
use dashmap::mapref::entry::Entry;
match self.inner.entry(hash) {
Entry::Occupied(_) => false,
Entry::Vacant(slot) => {
slot.insert(entry);
true
}
}
}
}
/// Decides whether a refcount-table entry may be deleted by the sweeper.
///
/// Returns `true` only when every rule passes:
/// - the entry is not pinned;
/// - its refcount is 0;
/// - `disk_pressure_critical` is `false` (critical pressure suspends sweeping);
/// - at least `retention_floor` has elapsed since `first_seen_unix_ms`
///   (the boundary is inclusive).
///
/// A `first_seen_unix_ms` later than `now_unix_ms` is treated as age 0.
pub fn should_sweep(
    entry: &RefcountEntry,
    now_unix_ms: u64,
    retention_floor: Duration,
    disk_pressure_critical: bool,
) -> bool {
    if entry.pinned || entry.refcount > 0 || disk_pressure_critical {
        return false;
    }
    let age_ms = now_unix_ms.saturating_sub(entry.first_seen_unix_ms);
    // Compare in u128: `Duration::as_millis` can exceed u64::MAX, and narrowing it
    // with `as u64` would wrap a huge floor into a tiny one and sweep far too early.
    u128::from(age_ms) >= retention_floor.as_millis()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 32-byte hash filled with `byte`.
    fn h(byte: u8) -> [u8; 32] {
        [byte; 32]
    }

    const ONE_HOUR_MS: u64 = 60 * 60 * 1000;
    const ONE_DAY_MS: u64 = 24 * ONE_HOUR_MS;

    #[test]
    fn table_starts_empty() {
        let t = BlobRefcountTable::new();
        assert!(t.is_empty());
        assert_eq!(t.len(), 0);
    }

    #[test]
    fn incr_creates_entry_with_initial_refcount() {
        let t = BlobRefcountTable::new();
        assert_eq!(t.incr(h(1), 1_000), 1);
        let e = t.get(&h(1)).unwrap();
        assert_eq!(e.refcount, 1);
        assert_eq!(e.first_seen_unix_ms, 1_000);
        assert_eq!(e.last_seen_unix_ms, 1_000);
        assert!(!e.pinned);
    }

    #[test]
    fn incr_preserves_first_seen() {
        let t = BlobRefcountTable::new();
        t.incr(h(1), 1_000);
        t.incr(h(1), 5_000);
        let e = t.get(&h(1)).unwrap();
        assert_eq!(e.refcount, 2);
        assert_eq!(e.first_seen_unix_ms, 1_000);
        assert_eq!(e.last_seen_unix_ms, 5_000);
    }

    #[test]
    fn decr_clamps_at_zero() {
        let t = BlobRefcountTable::new();
        t.incr(h(1), 1_000);
        assert_eq!(t.decr(h(1), 2_000), 0);
        assert_eq!(t.decr(h(1), 3_000), 0);
    }

    #[test]
    fn decr_on_unknown_hash_returns_zero() {
        let t = BlobRefcountTable::new();
        assert_eq!(t.decr(h(99), 0), 0);
    }

    #[test]
    fn pin_unpin_round_trip() {
        let t = BlobRefcountTable::new();
        t.pin(h(1), 1_000);
        assert!(t.get(&h(1)).unwrap().pinned);
        t.unpin(h(1), 2_000);
        assert!(!t.get(&h(1)).unwrap().pinned);
    }

    #[test]
    fn pin_admits_hash_with_no_prior_observation() {
        let t = BlobRefcountTable::new();
        t.pin(h(1), 1_000);
        assert_eq!(t.len(), 1);
        let e = t.get(&h(1)).unwrap();
        assert_eq!(e.refcount, 0);
        assert!(e.pinned);
    }

    #[test]
    fn store_observed_stamps_first_seen() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 0, 1_000);
        let e = t.get(&h(1)).unwrap();
        assert_eq!(e.refcount, 0);
        assert_eq!(e.first_seen_unix_ms, 1_000);
    }

    #[test]
    fn store_observed_is_idempotent_on_first_seen() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 0, 1_000);
        t.store_observed(h(1), 0, 5_000);
        let e = t.get(&h(1)).unwrap();
        assert_eq!(e.first_seen_unix_ms, 1_000);
        assert_eq!(e.last_seen_unix_ms, 5_000);
    }

    #[test]
    fn store_observed_keeps_first_known_size_and_fills_unknown() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 10, 0);
        t.store_observed(h(1), 20, 5);
        assert_eq!(t.get(&h(1)).unwrap().size_bytes, Some(10));

        t.incr(h(2), 0);
        assert_eq!(t.get(&h(2)).unwrap().size_bytes, None);
        t.store_observed(h(2), 7, 1);
        assert_eq!(t.get(&h(2)).unwrap().size_bytes, Some(7));
    }

    #[test]
    fn pinned_count_and_zero_refcount_count() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 0, 0);
        t.store_observed(h(2), 0, 0);
        t.incr(h(3), 0);
        t.pin(h(4), 0);
        assert_eq!(t.pinned_count(), 1);
        assert_eq!(t.zero_refcount_count(), 3);
    }

    #[test]
    fn should_sweep_admits_when_all_rules_pass() {
        let entry = RefcountEntry {
            refcount: 0,
            first_seen_unix_ms: 0,
            last_seen_unix_ms: 0,
            pinned: false,
            size_bytes: None,
        };
        let now = 25 * ONE_HOUR_MS;
        assert!(should_sweep(&entry, now, DEFAULT_RETENTION_FLOOR, false));
    }

    #[test]
    fn should_sweep_rejects_pinned() {
        let entry = RefcountEntry {
            refcount: 0,
            first_seen_unix_ms: 0,
            last_seen_unix_ms: 0,
            pinned: true,
            size_bytes: None,
        };
        let now = 25 * ONE_HOUR_MS;
        assert!(!should_sweep(&entry, now, DEFAULT_RETENTION_FLOOR, false));
    }

    #[test]
    fn should_sweep_rejects_nonzero_refcount() {
        let entry = RefcountEntry {
            refcount: 1,
            first_seen_unix_ms: 0,
            last_seen_unix_ms: 0,
            pinned: false,
            size_bytes: None,
        };
        let now = 25 * ONE_HOUR_MS;
        assert!(!should_sweep(&entry, now, DEFAULT_RETENTION_FLOOR, false));
    }

    #[test]
    fn should_sweep_rejects_under_retention_floor() {
        let entry = RefcountEntry {
            refcount: 0,
            first_seen_unix_ms: 0,
            last_seen_unix_ms: 0,
            pinned: false,
            size_bytes: None,
        };
        let now = 12 * ONE_HOUR_MS;
        assert!(!should_sweep(&entry, now, DEFAULT_RETENTION_FLOOR, false));
    }

    #[test]
    fn should_sweep_at_exact_floor_boundary_is_inclusive() {
        let entry = RefcountEntry {
            refcount: 0,
            first_seen_unix_ms: 0,
            last_seen_unix_ms: 0,
            pinned: false,
            size_bytes: None,
        };
        let now = ONE_DAY_MS;
        assert!(should_sweep(&entry, now, DEFAULT_RETENTION_FLOOR, false));
    }

    #[test]
    fn should_sweep_rejects_under_disk_pressure() {
        let entry = RefcountEntry {
            refcount: 0,
            first_seen_unix_ms: 0,
            last_seen_unix_ms: 0,
            pinned: false,
            size_bytes: None,
        };
        let now = 25 * ONE_HOUR_MS;
        assert!(!should_sweep(&entry, now, DEFAULT_RETENTION_FLOOR, true));
    }

    #[test]
    fn deletable_hashes_returns_only_sweep_eligible() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 0, 0);
        t.pin(h(2), 0);
        t.incr(h(3), 0);
        t.store_observed(h(4), 0, 24 * ONE_HOUR_MS);
        let now = 25 * ONE_HOUR_MS;
        let mut deletable = t.deletable_hashes(now, DEFAULT_RETENTION_FLOOR, false);
        deletable.sort();
        assert_eq!(deletable, vec![h(1)]);
    }

    #[test]
    fn deletable_hashes_returns_empty_under_pressure() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 0, 0);
        let now = 25 * ONE_HOUR_MS;
        let deletable = t.deletable_hashes(now, DEFAULT_RETENTION_FLOOR, true);
        assert!(deletable.is_empty());
    }

    #[test]
    fn remove_clears_entry() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 0, 0);
        assert_eq!(t.len(), 1);
        t.remove(&h(1));
        assert_eq!(t.len(), 0);
        assert!(t.get(&h(1)).is_none());
    }

    #[test]
    fn remove_if_deletable_skips_when_incr_rescues_entry() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 0, 0);
        let now = 25 * ONE_HOUR_MS;
        t.incr(h(1), now);
        let removed = t.remove_if_deletable(&h(1), now, DEFAULT_RETENTION_FLOOR, false);
        assert!(!removed, "incr-rescued entry must survive the sweep");
        assert!(t.get(&h(1)).is_some(), "refcount entry must persist");
    }

    #[test]
    fn remove_if_deletable_removes_when_still_eligible() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 0, 0);
        let now = 25 * ONE_HOUR_MS;
        let removed = t.remove_if_deletable(&h(1), now, DEFAULT_RETENTION_FLOOR, false);
        assert!(removed, "unmodified eligible entry must be removed");
        assert!(t.get(&h(1)).is_none());
    }

    #[test]
    fn remove_if_deletable_skips_under_disk_pressure() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 0, 0);
        let now = 25 * ONE_HOUR_MS;
        let removed = t.remove_if_deletable(&h(1), now, DEFAULT_RETENTION_FLOOR, true);
        assert!(!removed, "critical disk pressure aborts the sweep delete");
        assert!(t.get(&h(1)).is_some());
    }

    #[test]
    fn remove_if_deletable_idempotent_when_absent() {
        let t = BlobRefcountTable::new();
        let removed =
            t.remove_if_deletable(&h(1), 25 * ONE_HOUR_MS, DEFAULT_RETENTION_FLOOR, false);
        assert!(!removed);
    }

    #[test]
    fn take_if_deletable_returns_entry_for_retry_path() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 1234, 0);
        let now = 25 * ONE_HOUR_MS;
        let snapshot = t
            .take_if_deletable(&h(1), now, DEFAULT_RETENTION_FLOOR, false)
            .expect("eligible entry must be returned");
        assert_eq!(snapshot.size_bytes, Some(1234));
        assert_eq!(snapshot.refcount, 0);
        assert!(t.get(&h(1)).is_none(), "entry has been removed");
        let restored = t.restore_if_absent(h(1), snapshot);
        assert!(restored);
        assert_eq!(
            t.get(&h(1)).map(|e| e.size_bytes),
            Some(Some(1234)),
            "restored entry preserves the original size_bytes",
        );
    }

    #[test]
    fn restore_if_absent_does_not_clobber_concurrent_incr() {
        let t = BlobRefcountTable::new();
        t.store_observed(h(1), 0, 0);
        let now = 25 * ONE_HOUR_MS;
        let snapshot = t
            .take_if_deletable(&h(1), now, DEFAULT_RETENTION_FLOOR, false)
            .expect("eligible entry must be returned");
        assert_eq!(snapshot.refcount, 0);
        t.incr(h(1), now);
        assert_eq!(t.get(&h(1)).unwrap().refcount, 1);
        let restored = t.restore_if_absent(h(1), snapshot);
        assert!(!restored, "incr-occupied slot must not be overwritten");
        assert_eq!(
            t.get(&h(1)).unwrap().refcount,
            1,
            "concurrent incr's refcount=1 must survive the restore attempt",
        );
    }

    #[test]
    fn incr_saturates_at_u32_max() {
        let t = BlobRefcountTable::new();
        // Seed the entry just below the cap; looping 4 billion incrs is not an option.
        let near_max = RefcountEntry {
            refcount: u32::MAX - 1,
            first_seen_unix_ms: 0,
            last_seen_unix_ms: 0,
            pinned: false,
            size_bytes: None,
        };
        assert!(t.restore_if_absent(h(1), near_max));
        assert_eq!(t.incr(h(1), 1), u32::MAX);
        assert_eq!(
            t.incr(h(1), 2),
            u32::MAX,
            "incr past u32::MAX must saturate instead of wrapping"
        );
        let e = t.get(&h(1)).unwrap();
        assert_eq!(e.refcount, u32::MAX);
        assert_eq!(e.last_seen_unix_ms, 2);
    }

    #[test]
    fn concurrent_incr_decr_balanced_lands_at_zero() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let table = BlobRefcountTable::new();
        let target = h(0x11);
        table.incr(target, 0);
        let threads = 8usize;
        let ops_per_thread = 2_000u64;
        let start = Arc::new(Barrier::new(threads));
        let mut handles = Vec::with_capacity(threads);
        for _ in 0..threads {
            let table = table.clone();
            let start = start.clone();
            handles.push(thread::spawn(move || {
                start.wait();
                for _ in 0..ops_per_thread {
                    table.incr(target, 0);
                    table.decr(target, 0);
                }
            }));
        }
        for handle in handles {
            handle.join().expect("worker panicked");
        }
        let entry = table.get(&target).expect("entry must still exist");
        assert_eq!(
            entry.refcount, 1,
            "balanced incr/decr storm + seed must leave refcount at 1"
        );
    }

    #[test]
    fn concurrent_incr_accumulates_exactly_under_saturation_cap() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let table = BlobRefcountTable::new();
        let target = h(0x22);
        let threads = 8usize;
        let per_thread = 1_000u32;
        let start = Arc::new(Barrier::new(threads));
        let mut handles = Vec::with_capacity(threads);
        for _ in 0..threads {
            let table = table.clone();
            let start = start.clone();
            handles.push(thread::spawn(move || {
                start.wait();
                for _ in 0..per_thread {
                    table.incr(target, 0);
                }
            }));
        }
        for handle in handles {
            handle.join().expect("worker panicked");
        }
        let entry = table.get(&target).expect("entry must exist");
        assert_eq!(
            entry.refcount as u64,
            threads as u64 * per_thread as u64,
            "incr from {} threads × {} should sum exactly",
            threads,
            per_thread
        );
    }

    #[test]
    fn concurrent_decr_saturates_at_zero_under_overdecrement() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let table = BlobRefcountTable::new();
        let target = h(0x33);
        for _ in 0..10 {
            table.incr(target, 0);
        }
        let threads = 8usize;
        let per_thread = 100u32;
        let start = Arc::new(Barrier::new(threads));
        let mut handles = Vec::with_capacity(threads);
        for _ in 0..threads {
            let table = table.clone();
            let start = start.clone();
            handles.push(thread::spawn(move || {
                start.wait();
                for _ in 0..per_thread {
                    table.decr(target, 0);
                }
            }));
        }
        for handle in handles {
            handle.join().expect("worker panicked");
        }
        let entry = table.get(&target).expect("entry must exist");
        assert_eq!(
            entry.refcount, 0,
            "decr must saturate at 0 even when threads race past the floor"
        );
    }

    #[test]
    fn concurrent_pin_unpin_incr_decr_is_panic_free() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let table = BlobRefcountTable::new();
        let target = h(0x44);
        let threads = 4usize;
        let per_thread = 1_000u32;
        let start = Arc::new(Barrier::new(threads * 2));
        let mut handles = Vec::with_capacity(threads * 2);
        for _ in 0..threads {
            let table = table.clone();
            let start = start.clone();
            handles.push(thread::spawn(move || {
                start.wait();
                for i in 0..per_thread {
                    if i % 2 == 0 {
                        table.pin(target, i as u64);
                    } else {
                        table.unpin(target, i as u64);
                    }
                }
            }));
        }
        for _ in 0..threads {
            let table = table.clone();
            let start = start.clone();
            handles.push(thread::spawn(move || {
                start.wait();
                for i in 0..per_thread {
                    if i % 2 == 0 {
                        table.incr(target, i as u64);
                    } else {
                        table.decr(target, i as u64);
                    }
                }
            }));
        }
        for handle in handles {
            handle.join().expect("worker panicked");
        }
        let entry = table
            .get(&target)
            .expect("entry must still exist after the race");
        assert!(
            entry.refcount <= (threads as u32) * per_thread,
            "refcount must stay within the saturating envelope; got {}",
            entry.refcount
        );
    }

    #[test]
    fn deletable_hashes_concurrent_with_incr_is_panic_free() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        use std::time::Duration;
        let table = BlobRefcountTable::new();
        for i in 0..32u8 {
            table.store_observed(h(i), 0, 0);
        }
        let threads = 4usize;
        let per_thread = 2_000u32;
        let start = Arc::new(Barrier::new(threads + 1));
        let mut handles = Vec::with_capacity(threads + 1);
        for tid in 0..threads as u8 {
            let table = table.clone();
            let start = start.clone();
            handles.push(thread::spawn(move || {
                start.wait();
                for _ in 0..per_thread {
                    table.incr(h(tid), 0);
                    table.decr(h(tid), 0);
                }
            }));
        }
        let table_snap = table.clone();
        let start_snap = start.clone();
        handles.push(thread::spawn(move || {
            start_snap.wait();
            for _ in 0..200 {
                let _ = table_snap.deletable_hashes(1_000_000, Duration::from_secs(0), false);
            }
        }));
        for handle in handles {
            handle.join().expect("worker panicked");
        }
        assert!(table.len() >= 32);
    }
}