mod admission;
mod cached_blob;
mod guid_hash;
mod mutation;
mod residency;
mod telemetry;
use std::collections::{hash_map::Entry, BTreeMap, HashMap, HashSet};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use dashmap::DashMap;
use guid_hash::GuidBuildHasher;
use crate::api::errors::{Error, Result};
use crate::layout::{BlobGuid, PAGE_SIZE};
use super::blob_store::{AlignedBlobBuf, BlobStore};
use super::routing_cache::RoutingCache;
use admission::TinyLFU;
pub use cached_blob::{BlobWriteGuard, CachedBlob};
use mutation::{
bookkeeping_shard_idx, pop_candidate_batch, CandidateKind, MutationState, BOOKKEEPING_SHARDS,
};
use residency::RouteResidency;
use telemetry::Telemetry;
/// Sentinel sequence number. `retire_write_through` skips the dirty-map removal step when a
/// write-through entry's `expected_seq` equals this value.
pub const STRUCTURAL_SEQ: u64 = u64::MAX;
/// Snapshot bookkeeping kept behind the `BufferManager::snapshots` mutex.
#[derive(Default)]
struct SnapshotState {
/// Live snapshots: epoch returned by `register_snapshot` -> snapshot root guid.
live: BTreeMap<u64, BlobGuid>,
/// `(guid, created_epoch)` pairs pushed by `record_orphan`; `retire_snapshot` frees the ones
/// whose `created_epoch` is above the newest remaining live snapshot epoch.
orphans: Vec<(BlobGuid, u64)>,
}
/// One blob image handed to `write_through_batch`.
pub(crate) struct WriteThroughEntry {
/// Blob the image belongs to.
pub(crate) guid: BlobGuid,
/// Image bytes passed to the store.
pub(crate) bytes: AlignedBlobBuf,
/// Sequence compared against the dirty-map entry when the write is retired
/// (`STRUCTURAL_SEQ` skips that comparison).
pub(crate) expected_seq: u64,
/// When `Some`, the entry is only written if the cached blob still validates against this
/// version; `None` skips the check.
pub(crate) content_version: Option<u64>,
}
/// Per-entry outcome reported by `write_through_batch`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum WriteThroughStatus {
/// The entry was included in the store write and retired.
Written,
/// The entry was skipped because its content-version check did not pass.
Stale,
}
/// Result of `write_through_batch`.
pub(crate) struct WriteThroughBatchReport {
/// One status per input entry, at the same index as the entry.
pub(crate) statuses: Vec<WriteThroughStatus>,
}
/// A dirty-snapshot entry paired with the cached blob's content version, as produced by
/// `snapshot_dirty_versions`.
#[derive(Clone, Copy)]
pub(crate) struct DirtySnapshotEntry {
/// Blob the entry refers to.
pub(crate) guid: BlobGuid,
/// Sequence taken from the dirty snapshot map.
pub(crate) expected_seq: u64,
/// `content_version()` of the cached blob at the time the entry was built.
pub(crate) content_version: u64,
}
/// How a cache lookup or insert should affect admission, recency and telemetry.
#[derive(Clone, Copy)]
enum PinAccess {
/// Records the guid with the admission filter, advances the clock / `last_touched`, and
/// counts cache hits and misses.
Point,
/// Counts cache hits and misses but does not touch admission or `last_touched`.
Scan,
/// Records neither hit nor miss and leaves admission and `last_touched` alone.
Silent,
}
/// Budget (32 MiB) passed to `RoutingCache::new` when a `BufferManager` is constructed.
const ROUTING_CACHE_BUDGET_BYTES: usize = 32 * 1024 * 1024;
/// In-memory blob cache layered over a `BlobStore`, with dirty/delete bookkeeping, eviction,
/// and snapshot-epoch tracking.
pub struct BufferManager {
/// Backing store that cache misses read from and writes go through to.
store: Arc<dyn BlobStore>,
/// Allocator used for scratch buffers that are filled before being read.
alloc_uninit: Arc<dyn Fn() -> AlignedBlobBuf + Send + Sync>,
/// Entry count above which inserts attempt eviction (clamped to at least 1 at construction).
capacity: usize,
/// Cache used by `routing_region_cached` / `routing_region_store`.
routing_cache: RoutingCache,
/// Resident blobs keyed by guid.
cache: DashMap<BlobGuid, Arc<CachedBlob>, GuidBuildHasher>,
/// Frequency estimator consulted when evicting on behalf of a point insert.
admission: TinyLFU,
/// Guids that eviction skips while they are marked route-resident.
route_resident: RouteResidency,
/// Sharded mutation bookkeeping (dirty, flushing, pending deletes, maintenance candidates).
mutation: [Mutex<MutationState>; BOOKKEEPING_SHARDS],
/// Count of guids that currently have a delete fence; lets `is_pending_delete` skip the lock
/// when it is zero.
delete_fence_total: AtomicUsize,
/// Cursor handed to `pop_candidate_batch` for compaction candidates.
compact_candidate_cursor: AtomicUsize,
/// Cursor handed to `pop_candidate_batch` for merge candidates.
merge_candidate_cursor: AtomicUsize,
/// Running count of compaction candidates across shards.
compact_candidate_total: AtomicUsize,
/// Running count of merge candidates across shards.
merge_candidate_total: AtomicUsize,
/// Logical clock used to stamp `last_touched` on cached entries.
clock: AtomicU64,
/// Counters exposed through the accessor methods on this type.
telemetry: Telemetry,
/// Epoch stamped into newly installed blobs; bumped by `register_snapshot`.
current_epoch: AtomicU64,
/// Epoch of the newest live snapshot, or 0 when none is live.
fork_barrier: AtomicU64,
/// Live snapshot roots and orphaned blobs awaiting reclamation.
snapshots: Mutex<SnapshotState>,
}
impl BufferManager {
/// Returns the current epoch (acquire load).
#[must_use]
pub(crate) fn current_epoch(&self) -> u64 {
self.current_epoch.load(Ordering::Acquire)
}
/// Overwrites the current epoch with `epoch`, clamped so it is never below 1.
pub(crate) fn set_current_epoch(&self, epoch: u64) {
self.current_epoch.store(epoch.max(1), Ordering::Release);
}
/// Returns the fork barrier last published by `register_snapshot` or `retire_snapshot`
/// (0 until a snapshot is registered, and again once no snapshot is live).
#[must_use]
pub(crate) fn fork_barrier(&self) -> u64 {
self.fork_barrier.load(Ordering::Acquire)
}
pub(crate) fn fork_frame(
&self,
src_bytes: &[u8],
new_guid: BlobGuid,
seq: u64,
) -> Result<Arc<CachedBlob>> {
let mut buf = self.alloc_blob_buf_zeroed();
buf.as_mut_slice().copy_from_slice(src_bytes);
crate::layout::set_frame_blob_guid(buf.as_mut_slice(), new_guid);
self.install_new_blob(new_guid, buf, seq);
self.pin(new_guid)
}
/// Forks the contents of `src` into a new blob under `new_guid` via `fork_frame`.
///
/// The read guard on `src` is held until `fork_frame` returns, so the copy sees one consistent
/// image of the source blob. Errors from `fork_frame` are returned unchanged.
pub(crate) fn install_snapshot_root(
&self,
new_guid: BlobGuid,
src: &CachedBlob,
seq: u64,
) -> Result<Arc<CachedBlob>> {
let guard = src.read();
self.fork_frame(guard.as_slice(), new_guid, seq)
}
/// Registers `root_guid` as a live snapshot and returns the snapshot's epoch.
///
/// The snapshot takes the pre-increment value of `current_epoch`; the current epoch is then
/// one higher. The snapshot epoch is also published as the new fork barrier.
///
/// # Panics
///
/// Panics if the snapshot registry mutex is poisoned.
pub(crate) fn register_snapshot(&self, root_guid: BlobGuid) -> u64 {
// The registry lock is taken first so the epoch bump and the `live` insert happen together.
let mut snaps = self.snapshots.lock().expect("snapshot registry poisoned");
// `fetch_add` returns the previous value: that is the epoch the snapshot is keyed under.
let epoch = self.current_epoch.fetch_add(1, Ordering::AcqRel);
snaps.live.insert(epoch, root_guid);
self.fork_barrier.store(epoch, Ordering::Release);
epoch
}
/// Appends `(guid, created_epoch)` to the orphan list so a later `retire_snapshot` can free it.
///
/// # Panics
///
/// Panics if the snapshot registry mutex is poisoned.
pub(crate) fn record_orphan(&self, guid: BlobGuid, created_epoch: u64) {
    let mut registry = self.snapshots.lock().expect("snapshot registry poisoned");
    registry.orphans.push((guid, created_epoch));
}
/// Retires the snapshot registered under `epoch` and reclaims what it alone kept alive.
///
/// The fork barrier is republished as the newest remaining live snapshot epoch (0 when none
/// remain). Orphans created after that barrier are removed from the registry and reclaimed,
/// along with the retired snapshot's root. Reclamation runs after the registry lock is
/// released.
///
/// # Panics
///
/// Panics if the snapshot registry mutex is poisoned.
pub(crate) fn retire_snapshot(&self, epoch: u64) {
    let mut registry = self.snapshots.lock().expect("snapshot registry poisoned");
    let retired_root = registry.live.remove(&epoch);
    let barrier = match registry.live.keys().next_back() {
        Some(&newest) => newest,
        None => 0,
    };
    self.fork_barrier.store(barrier, Ordering::Release);
    // Split orphans into those a live snapshot may still see and those nobody can reach.
    let (kept, released): (Vec<_>, Vec<_>) = std::mem::take(&mut registry.orphans)
        .into_iter()
        .partition(|&(_, created_epoch)| created_epoch <= barrier);
    registry.orphans = kept;
    drop(registry);

    if let Some(root) = retired_root {
        self.reclaim_blob(root);
    }
    for (guid, _) in released {
        self.reclaim_blob(guid);
    }
}
/// Removes `guid` from the cache, its bookkeeping, and the backing store when nothing
/// protects it.
///
/// Returns `true` only when the blob was actually reclaimed: it was not protected or pending,
/// no other handle pinned its cache entry, and the store delete succeeded. Returns `false`
/// when the blob was left in place or the store delete failed (the store error itself is
/// intentionally not propagated; reclamation is best-effort).
fn reclaim_blob(&self, guid: BlobGuid) -> bool {
    {
        let state = self.mutation_shard(guid).lock().unwrap();
        if state.is_protected_or_pending(&guid) {
            return false;
        }
    }
    if let Some((_, entry)) = self.cache.remove_if(&guid, |_, entry| {
        // Another handle means someone still uses the image; leave it cached.
        if Arc::strong_count(entry) > 1 {
            return false;
        }
        let state = self.mutation_shard(guid).lock().unwrap();
        !state.is_protected_or_pending(&guid)
    }) {
        entry.clear_dirty_hint();
    } else if self.cache.contains_key(&guid) {
        // Still cached: the removal predicate refused, so the blob must stay.
        return false;
    }
    self.route_resident.remove(guid);
    let mut state = self.mutation_shard(guid).lock().unwrap();
    state.remove_unclaimed_dirty(&guid);
    let removed = state.remove_maintenance_candidates(&guid);
    drop(state);
    self.decrement_candidate_totals(removed);
    self.store.delete_blob(guid).is_ok()
}
/// Returns the root guids of all live snapshots, ordered by snapshot epoch.
///
/// # Panics
///
/// Panics if the snapshot registry mutex is poisoned.
pub(crate) fn snapshot_roots(&self) -> Vec<BlobGuid> {
    self.snapshots
        .lock()
        .expect("snapshot registry poisoned")
        .live
        .values()
        .copied()
        .collect()
}
/// Reclaims every blob listed by the store that is not in `reachable`.
///
/// Returns the number of blobs that were actually reclaimed; blobs that were skipped because
/// they are protected, pending, pinned, or whose store delete failed are not counted.
///
/// # Errors
///
/// Propagates the error from listing the store's blobs.
pub(crate) fn gc_sweep_unreachable(&self, reachable: &HashSet<BlobGuid>) -> Result<usize> {
    let mut freed = 0;
    for guid in self.list_blobs()? {
        if !reachable.contains(&guid) && self.reclaim_blob(guid) {
            freed += 1;
        }
    }
    Ok(freed)
}
}
impl BufferManager {
/// Creates a buffer manager over `store` that aims to keep at most `capacity` blobs cached,
/// using `AlignedBlobBuf::uninit` for scratch buffers.
#[must_use]
pub fn new(store: Arc<dyn BlobStore>, capacity: usize) -> Self {
Self::new_with_uninit_allocator(store, capacity, || {
// SAFETY: NOTE(review): the contract of `AlignedBlobBuf::uninit` is not visible here.
// Callers of this allocator in this file fill the buffer (store read or `copy_from_slice`)
// before reading it; this presumes a successful store read overwrites the whole buffer —
// confirm against `AlignedBlobBuf::uninit` and the `BlobStore` read contract.
unsafe { AlignedBlobBuf::uninit() }
})
}
/// Creates a buffer manager over `store` with a caller-supplied scratch-buffer allocator.
///
/// `capacity` is clamped to at least 1. The logical clock and the current epoch start at 1,
/// the fork barrier at 0, and all counters and bookkeeping start empty.
#[must_use]
pub(crate) fn new_with_uninit_allocator<F>(
store: Arc<dyn BlobStore>,
capacity: usize,
alloc_uninit: F,
) -> Self
where
F: Fn() -> AlignedBlobBuf + Send + Sync + 'static,
{
let capacity = capacity.max(1);
Self {
store,
alloc_uninit: Arc::new(alloc_uninit),
capacity,
routing_cache: RoutingCache::new(ROUTING_CACHE_BUDGET_BYTES),
cache: DashMap::with_hasher(GuidBuildHasher),
admission: TinyLFU::new(),
route_resident: RouteResidency::new(capacity),
mutation: std::array::from_fn(|_| Mutex::new(MutationState::default())),
delete_fence_total: AtomicUsize::new(0),
compact_candidate_cursor: AtomicUsize::new(0),
merge_candidate_cursor: AtomicUsize::new(0),
compact_candidate_total: AtomicUsize::new(0),
merge_candidate_total: AtomicUsize::new(0),
clock: AtomicU64::new(1),
telemetry: Telemetry::default(),
current_epoch: AtomicU64::new(1),
fork_barrier: AtomicU64::new(0),
snapshots: Mutex::new(SnapshotState::default()),
}
}
/// Allocates a scratch blob buffer through the injected allocator.
///
/// With the allocator installed by `new` the contents are uninitialized, so callers fill the
/// buffer before reading from it.
fn alloc_blob_buf_uninit(&self) -> AlignedBlobBuf {
(self.alloc_uninit)()
}
/// Returns the current value of the logical clock without advancing it.
pub(crate) fn clock_tick(&self) -> u64 {
self.clock.load(Ordering::Relaxed)
}
/// Returns how many cached entries exceed `capacity` (0 when at or below capacity).
pub(crate) fn cache_excess(&self) -> usize {
self.cache.len().saturating_sub(self.capacity)
}
/// Returns the number of guids currently tracked as route-resident.
pub(crate) fn route_resident_count(&self) -> usize {
self.route_resident.len()
}
/// Returns telemetry's route-resident demotion counter (noted by `mark_route_resident`).
pub(crate) fn route_resident_demotions(&self) -> u64 {
self.telemetry.route_resident_demotions()
}
/// Returns telemetry's cache-eviction counter (noted when an eviction removes an entry).
pub(crate) fn cache_evictions(&self) -> u64 {
self.telemetry.cache_evictions()
}
/// Returns telemetry's count of eviction attempts skipped because the blob was protected.
pub(crate) fn eviction_skips_protected(&self) -> u64 {
self.telemetry.eviction_skips_protected()
}
/// Returns telemetry's count of eviction attempts skipped because the blob was route-resident.
pub(crate) fn eviction_skips_route_resident(&self) -> u64 {
self.telemetry.eviction_skips_route_resident()
}
/// Returns telemetry's count of evictions declined by the admission comparison in
/// `try_evict_until`.
pub(crate) fn admission_protects(&self) -> u64 {
self.telemetry.admission_protects()
}
/// Marks `guid` as route-resident at a fresh clock tick.
///
/// `RouteResidency::mark` reports a count; one route-resident demotion is noted in telemetry
/// per unit of that count.
pub(crate) fn mark_route_resident(&self, guid: BlobGuid) {
    let now = self.clock.fetch_add(1, Ordering::Relaxed);
    let demoted = self.route_resident.mark(guid, now);
    for _ in 0..demoted {
        self.telemetry.note_route_resident_demotion();
    }
}
/// Returns whether `guid` is currently marked route-resident.
fn is_route_resident(&self, guid: BlobGuid) -> bool {
self.route_resident.contains(guid)
}
/// Returns a `(guid, handle)` pair for every entry currently in the cache.
///
/// Each handle is a fresh `Arc` clone, so the returned entries count as extra references
/// until the caller drops them.
pub(crate) fn snapshot_entries(&self) -> Vec<(BlobGuid, Arc<CachedBlob>)> {
    let mut entries = Vec::new();
    for slot in self.cache.iter() {
        entries.push((*slot.key(), Arc::clone(slot.value())));
    }
    entries
}
/// Lowers the candidate totals after a guid's maintenance candidates were removed.
///
/// `removed.0` decrements the compaction total, `removed.1` the merge total.
fn decrement_candidate_totals(&self, removed: (bool, bool)) {
    let (compact_removed, merge_removed) = removed;
    if compact_removed {
        self.compact_candidate_total.fetch_sub(1, Ordering::Relaxed);
    }
    if merge_removed {
        self.merge_candidate_total.fetch_sub(1, Ordering::Relaxed);
    }
}
/// Tries to drop `guid` from the cache, returning whether an entry was removed.
///
/// Eviction is refused when the guid is route-resident, when its mutation shard reports it
/// protected or pending, or when another `Arc` handle to the entry exists. The route-resident
/// and protection checks run once up front and again inside the `remove_if` predicate. Skips
/// and the eventual eviction are noted in telemetry.
pub(crate) fn try_evict_cold(&self, guid: BlobGuid) -> bool {
if self.is_route_resident(guid) {
self.telemetry.note_eviction_skip_route_resident();
return false;
}
{
// Scoped so the shard lock is released before touching the cache map.
let state = self.mutation_shard(guid).lock().unwrap();
if state.is_protected_or_pending(&guid) {
self.telemetry.note_eviction_skip_protected();
return false;
}
}
let removed = self
.cache
.remove_if(&guid, |_, entry| {
if self.is_route_resident(guid) {
self.telemetry.note_eviction_skip_route_resident();
return false;
}
// A strong count above 1 means a handle other than the cache's own is outstanding.
if Arc::strong_count(entry) > 1 {
return false;
}
let state = self.mutation_shard(guid).lock().unwrap();
let removable = !state.is_protected_or_pending(&guid);
if !removable {
self.telemetry.note_eviction_skip_protected();
}
removable
})
.is_some();
if removed {
self.telemetry.note_cache_eviction();
}
removed
}
/// Test-only: returns the number of entries currently in the cache.
#[cfg(test)]
#[must_use]
pub fn cached_count(&self) -> usize {
self.cache.len()
}
/// Returns telemetry's cache-hit counter (noted for `Point` and `Scan` lookups that find an
/// entry).
#[must_use]
pub fn cache_hits(&self) -> u64 {
self.telemetry.cache_hits()
}
/// Returns telemetry's cache-miss counter (noted for non-`Silent` lookups that find nothing).
#[must_use]
pub fn cache_misses(&self) -> u64 {
self.telemetry.cache_misses()
}
/// Returns telemetry's full-blob-read counter.
#[must_use]
pub fn full_blob_reads(&self) -> u64 {
self.telemetry.full_blob_reads()
}
/// Returns `full_blob_reads()` multiplied by `PAGE_SIZE`.
///
/// NOTE(review): this presumes each full blob read transfers exactly `PAGE_SIZE` bytes —
/// confirm against the blob buffer size.
#[must_use]
pub fn full_blob_read_bytes(&self) -> u64 {
self.full_blob_reads() * PAGE_SIZE as u64
}
/// Returns telemetry's count of full blob reads noted for `Point` access.
#[must_use]
pub fn point_full_blob_reads(&self) -> u64 {
self.telemetry.point_full_blob_reads()
}
/// Returns telemetry's count of full blob reads noted for `Scan` access.
#[must_use]
pub fn scan_full_blob_reads(&self) -> u64 {
self.telemetry.scan_full_blob_reads()
}
/// Returns telemetry's count of full blob reads noted for `Silent` access.
#[must_use]
pub fn silent_full_blob_reads(&self) -> u64 {
self.telemetry.silent_full_blob_reads()
}
/// Returns telemetry's optimistic-restart counter (see `note_optimistic_restart`).
#[must_use]
pub fn optimistic_restarts(&self) -> u64 {
self.telemetry.optimistic_restarts()
}
/// Records one optimistic restart in telemetry.
pub(crate) fn note_optimistic_restart(&self) {
self.telemetry.note_optimistic_restart();
}
/// Returns telemetry's range-restart counter (see `note_range_restart`).
#[must_use]
pub fn range_restarts(&self) -> u64 {
self.telemetry.range_restarts()
}
/// Records one range restart in telemetry.
pub(crate) fn note_range_restart(&self) {
self.telemetry.note_range_restart();
}
/// Returns the value reported by `Telemetry::walker_ops`.
#[must_use]
pub fn walker_ops(&self) -> u64 {
self.telemetry.walker_ops()
}
/// Returns the value reported by `Telemetry::walker_blob_hops` (fed by
/// `note_walker_blob_hops`).
#[must_use]
pub fn walker_blob_hops(&self) -> u64 {
self.telemetry.walker_blob_hops()
}
/// Returns the value reported by `Telemetry::max_blob_hops`.
#[must_use]
pub fn max_blob_hops(&self) -> u64 {
self.telemetry.max_blob_hops()
}
/// Returns the value reported by `Telemetry::max_cross_blob_depth`.
#[must_use]
pub fn max_cross_blob_depth(&self) -> u64 {
self.telemetry.max_cross_blob_depth()
}
/// Returns telemetry's spillover counter (see `note_spillover`).
#[must_use]
pub fn spillover_count(&self) -> u64 {
self.telemetry.spillover_count()
}
/// Returns telemetry's merge counter (see `note_merges`).
#[must_use]
pub fn merge_count(&self) -> u64 {
self.telemetry.merge_count()
}
/// Forwards a walker's blob-hop count and its maximum cross-blob depth to telemetry.
pub(crate) fn note_walker_blob_hops(&self, hops: u64, max_cross_blob_depth: usize) {
self.telemetry
.note_walker_blob_hops(hops, max_cross_blob_depth);
}
/// Records one spillover in telemetry.
pub(crate) fn note_spillover(&self) {
self.telemetry.note_spillover();
}
/// Forwards `merged` to telemetry's merge accounting.
pub(crate) fn note_merges(&self, merged: u64) {
self.telemetry.note_merges(merged);
}
/// Looks up `guid` in the cache and applies the side effects that `access` calls for.
///
/// * `Point`: records the guid with the admission filter on hit and miss; on a hit also
///   advances the clock, stamps `last_touched`, and notes a cache hit; on a miss notes a
///   cache miss.
/// * `Scan`: notes a cache hit or miss, nothing else.
/// * `Silent`: no side effects.
///
/// Returns a cloned handle to the entry, or `None` when the guid is not cached.
fn get_cached_with_access(&self, guid: BlobGuid, access: PinAccess) -> Option<Arc<CachedBlob>> {
    // Clone the handle inside the closure so the map guard is released before any bookkeeping.
    let found = self.cache.get(&guid).map(|slot| Arc::clone(slot.value()));
    let Some(blob) = found else {
        match access {
            PinAccess::Point => {
                self.telemetry.note_cache_miss();
                self.admission.record(guid);
            }
            PinAccess::Scan => self.telemetry.note_cache_miss(),
            PinAccess::Silent => {}
        }
        return None;
    };
    match access {
        PinAccess::Point => {
            self.admission.record(guid);
            let now = self.clock.fetch_add(1, Ordering::Relaxed);
            blob.last_touched.store(now, Ordering::Relaxed);
            self.telemetry.note_cache_hit();
        }
        PinAccess::Scan => self.telemetry.note_cache_hit(),
        PinAccess::Silent => {}
    }
    Some(blob)
}
/// Returns the mutation-bookkeeping shard that `bookkeeping_shard_idx` assigns to `guid`.
fn mutation_shard(&self, guid: BlobGuid) -> &Mutex<MutationState> {
&self.mutation[bookkeeping_shard_idx(&guid)]
}
/// Returns whether `guid` currently has a delete fence.
///
/// When the global fence count is zero the answer is `false` without taking the shard lock.
fn is_pending_delete(&self, guid: BlobGuid) -> bool {
    self.delete_fence_total.load(Ordering::Acquire) != 0
        && self
            .mutation_shard(guid)
            .lock()
            .unwrap()
            .has_delete_fence(&guid)
}
fn pending_delete_not_found(guid: BlobGuid) -> Error {
Error::BlobStoreIo(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("blob {:02x?} is pending delete", &guid[..4]),
))
}
/// Inserts a clone of `contents` into the cache for `guid` with `Point` access semantics.
/// The returned handle from `insert_owned_into_cache` is dropped.
fn insert_into_cache(&self, guid: BlobGuid, contents: &AlignedBlobBuf) {
self.insert_owned_into_cache(guid, contents.clone(), PinAccess::Point);
}
/// Inserts `contents` as the cached image of `guid` (unless one already exists) and returns a
/// handle to whichever entry ends up in the cache.
///
/// If the guid is already cached, `contents` is discarded and the existing entry is returned.
/// `Point` access stamps the entry with a fresh clock tick; entries created by `Scan` or
/// `Silent` access get `last_touched == 0`. Afterwards, while the cache is over `capacity`,
/// eviction is attempted; failed attempts are retried a bounded number of times.
fn insert_owned_into_cache(
&self,
guid: BlobGuid,
contents: AlignedBlobBuf,
access: PinAccess,
) -> Arc<CachedBlob> {
// Only point accesses advance the clock.
let hot_tick =
matches!(access, PinAccess::Point).then(|| self.clock.fetch_add(1, Ordering::Relaxed));
let inserted = self.cache.entry(guid).or_insert_with(|| {
let entry = Arc::new(CachedBlob::new(contents));
entry
.last_touched
.store(hot_tick.unwrap_or(0), Ordering::Relaxed);
entry
});
let entry = Arc::clone(inserted.value());
// Also refreshes the tick when the entry already existed.
if let Some(tick) = hot_tick {
entry.last_touched.store(tick, Ordering::Relaxed);
}
// Release the map guard before eviction touches the cache again.
drop(inserted);
const RETRY_BUDGET: u32 = 8;
let mut retries_left = RETRY_BUDGET;
let mut entry_spins = self.cache.len();
while self.cache.len() > self.capacity {
let evicted = match access {
PinAccess::Point => self.try_evict_for_point_insert(guid),
PinAccess::Scan | PinAccess::Silent => self.try_evict_scan_cold(),
};
if evicted {
entry_spins = self.cache.len();
continue;
}
// Nothing could be evicted: give up once either retry bound is exhausted.
if retries_left == 0 || entry_spins == 0 {
break;
}
std::thread::yield_now();
retries_left -= 1;
entry_spins = entry_spins.saturating_sub(1);
}
entry
}
/// Attempts one eviction on behalf of a point insert of `candidate`.
///
/// Any `last_touched` value is eligible (`u64::MAX` bound), and the candidate's admission
/// estimate is passed along so `try_evict_until` can compare it with the victim's.
fn try_evict_for_point_insert(&self, candidate: BlobGuid) -> bool {
self.try_evict_until(
u64::MAX,
Some((candidate, self.admission.estimate(candidate))),
)
}
/// Attempts one eviction limited to entries whose `last_touched` is 0, with no admission
/// comparison.
fn try_evict_scan_cold(&self) -> bool {
self.try_evict_until(0, None)
}
/// Picks one eviction victim and tries to remove it, returning whether an entry was evicted.
///
/// Only entries with `last_touched <= max_last_touched` are considered. Entries with an extra
/// `Arc` handle, entries that appear in any shard's dirty / flushing / pending-delete maps,
/// and route-resident entries are skipped. The victim is the entry with the smallest
/// `(frequency, last_touched)` pair; frequency comes from the admission filter when
/// `candidate` is given and is 0 otherwise.
///
/// When `candidate` is given and the victim is a different guid with a higher frequency than
/// the candidate, nothing is evicted and an admission protect is noted.
fn try_evict_until(&self, max_last_touched: u64, candidate: Option<(BlobGuid, u8)>) -> bool {
// Snapshot protected guids up front so the cache scan does not take shard locks per entry.
let protected_snap: std::collections::HashSet<BlobGuid> = {
let mut out = std::collections::HashSet::new();
for shard in &self.mutation {
let state = shard.lock().unwrap();
out.extend(state.dirty.keys().copied());
out.extend(state.flushing.keys().copied());
out.extend(state.pending_deletes.keys().copied());
}
out
};
let mut victim: Option<(BlobGuid, u8, u64)> = None;
for kv in &self.cache {
// Skip entries that something other than the cache still holds.
if Arc::strong_count(kv.value()) > 1 {
continue;
}
let guid = *kv.key();
if protected_snap.contains(&guid) {
self.telemetry.note_eviction_skip_protected();
continue;
}
if self.is_route_resident(guid) {
self.telemetry.note_eviction_skip_route_resident();
continue;
}
let tick = kv.value().last_touched.load(Ordering::Relaxed);
if tick > max_last_touched {
continue;
}
let freq = if candidate.is_some() {
self.admission.estimate(guid)
} else {
0
};
// Keep the entry with the lowest (frequency, tick) seen so far.
match victim {
None => victim = Some((guid, freq, tick)),
Some((_, vfreq, vtick)) if (freq, tick) < (vfreq, vtick) => {
victim = Some((guid, freq, tick));
}
_ => {}
}
}
if let (Some((candidate_guid, candidate_freq)), Some((victim_guid, victim_freq, _))) =
(candidate, victim)
{
// Admission gate: do not evict something hotter than the blob being admitted.
if victim_guid != candidate_guid && victim_freq > candidate_freq {
self.telemetry.note_admission_protect();
return false;
}
}
if let Some((guid, _, _)) = victim {
// The snapshot above may be stale by now, so every condition is re-checked in the
// removal predicate.
let removed = self
.cache
.remove_if(&guid, |_, e| {
if self.is_route_resident(guid) {
self.telemetry.note_eviction_skip_route_resident();
return false;
}
if Arc::strong_count(e) > 1 {
return false;
}
let state = self.mutation_shard(guid).lock().unwrap();
let removable = !state.is_protected_or_pending(&guid);
if !removable {
self.telemetry.note_eviction_skip_protected();
}
removable
})
.is_some();
if removed {
self.telemetry.note_cache_eviction();
}
return removed;
}
false
}
/// Drops `guid` from the cache and clears its bookkeeping ahead of a direct delete.
///
/// Returns `false` (leaving everything in place) when the shard reports the guid as
/// checkpoint-owned or pending, or when the entry is still cached after the removal attempt
/// (for example because another handle holds it). Returns `true` otherwise, including when
/// the guid was not cached at all.
fn evict_from_cache(&self, guid: BlobGuid) -> bool {
{
let state = self.mutation_shard(guid).lock().unwrap();
if state.checkpoint_owned_or_pending(&guid) {
return false;
}
}
if let Some((_, entry)) = self.cache.remove_if(&guid, |_, entry| {
if Arc::strong_count(entry) > 1 {
return false;
}
let state = self.mutation_shard(guid).lock().unwrap();
!state.checkpoint_owned_or_pending(&guid)
}) {
entry.clear_dirty_hint();
} else if self.cache.contains_key(&guid) {
// Removal was refused and the entry is still there.
return false;
}
self.route_resident.remove(guid);
let mut state = self.mutation_shard(guid).lock().unwrap();
state.remove_unclaimed_dirty(&guid);
let removed = state.remove_maintenance_candidates(&guid);
drop(state);
self.decrement_candidate_totals(removed);
true
}
/// Returns a handle to the cached blob for `guid`, loading it from the store on a miss, using
/// `Point` access semantics.
///
/// # Errors
///
/// Returns a `NotFound` I/O error when the guid has a delete fence, or the store's read error.
pub fn pin(&self, guid: BlobGuid) -> Result<Arc<CachedBlob>> {
self.pin_with_access(guid, PinAccess::Point)
}
/// Same as `pin`, but with `Scan` access semantics (no admission record, no recency bump).
pub(crate) fn pin_scan(&self, guid: BlobGuid) -> Result<Arc<CachedBlob>> {
self.pin_with_access(guid, PinAccess::Scan)
}
/// Pins many blobs with `Scan` access, batching the store reads for cache misses.
///
/// The result has one slot per input guid, in input order. A slot is `None` when the guid has
/// a delete fence (before or after the read) or when its store read failed.
pub(crate) fn pin_scan_many(&self, guids: &[BlobGuid]) -> Vec<Option<Arc<CachedBlob>>> {
let mut out: Vec<Option<Arc<CachedBlob>>> = Vec::with_capacity(guids.len());
// Parallel vectors: `miss_guids[i]` belongs in `out[miss_slots[i]]`.
let mut miss_guids: Vec<BlobGuid> = Vec::new();
let mut miss_slots: Vec<usize> = Vec::new();
for &guid in guids {
if self.is_pending_delete(guid) {
out.push(None);
continue;
}
if let Some(entry) = self.get_cached_with_access(guid, PinAccess::Scan) {
out.push(Some(entry));
continue;
}
// Placeholder that is filled in after the batched read.
out.push(None);
miss_slots.push(out.len() - 1);
miss_guids.push(guid);
}
if miss_guids.is_empty() {
return out;
}
let mut bufs: Vec<AlignedBlobBuf> = (0..miss_guids.len())
.map(|_| self.alloc_blob_buf_uninit())
.collect();
let results = self.store.read_blobs(&miss_guids, &mut bufs);
for (i, (buf, res)) in bufs.into_iter().zip(results).enumerate() {
// Failed reads leave the slot as `None`; the error itself is dropped.
if res.is_err() {
continue;
}
self.note_full_blob_read(PinAccess::Scan);
let guid = miss_guids[i];
// Re-check the fence: a delete may have been marked while the read was in flight.
if self.is_pending_delete(guid) {
continue;
}
out[miss_slots[i]] = Some(self.insert_owned_into_cache(guid, buf, PinAccess::Scan));
}
out
}
/// Same as `pin`, but with `Silent` access semantics (no hit/miss telemetry, no admission
/// record, no recency bump).
pub fn pin_silent(&self, guid: BlobGuid) -> Result<Arc<CachedBlob>> {
self.pin_with_access(guid, PinAccess::Silent)
}
/// Returns a handle to the cached blob for `guid`, reading it from the store on a miss.
///
/// The delete fence is checked before the lookup and again after the store read, so a blob
/// that was marked for delete while the read was in flight is not inserted.
///
/// # Errors
///
/// Returns a `NotFound` I/O error when the guid has a delete fence, or the store's read error.
fn pin_with_access(&self, guid: BlobGuid, access: PinAccess) -> Result<Arc<CachedBlob>> {
    if self.is_pending_delete(guid) {
        return Err(Self::pending_delete_not_found(guid));
    }
    match self.get_cached_with_access(guid, access) {
        Some(resident) => Ok(resident),
        None => {
            let mut image = self.alloc_blob_buf_uninit();
            self.store.read_blob(guid, &mut image)?;
            self.note_full_blob_read(access);
            if self.is_pending_delete(guid) {
                Err(Self::pending_delete_not_found(guid))
            } else {
                Ok(self.insert_owned_into_cache(guid, image, access))
            }
        }
    }
}
/// Returns whether `guid` may be read directly from the store: it has no delete fence, is not
/// cached, and its mutation shard does not report it protected or pending.
pub(crate) fn cold_read_eligible(&self, guid: BlobGuid) -> bool {
    if self.is_pending_delete(guid) {
        return false;
    }
    if self.cache.contains_key(&guid) {
        return false;
    }
    let shard = self.mutation_shard(guid).lock().unwrap();
    let blocked = shard.is_protected_or_pending(&guid);
    !blocked
}
/// Reads a byte range of `guid` straight from the backing store into `dst`, bypassing the
/// cache and the delete-fence check.
pub(crate) fn read_blob_range(
&self,
guid: BlobGuid,
byte_offset: u64,
dst: &mut [u8],
) -> Result<()> {
self.store.read_blob_range(guid, byte_offset, dst)
}
/// Asks the routing cache to fill `dst` for `(guid, compact_times)` and returns its result.
pub(crate) fn routing_region_cached(
&self,
guid: BlobGuid,
compact_times: u32,
dst: &mut [u8],
) -> bool {
self.routing_cache.fill(guid, compact_times, dst)
}
/// Stores `region` in the routing cache under `(guid, compact_times)`.
pub(crate) fn routing_region_store(&self, guid: BlobGuid, compact_times: u32, region: &[u8]) {
self.routing_cache.put(guid, compact_times, region);
}
/// Records one full blob read in the telemetry counter that matches `access`.
fn note_full_blob_read(&self, access: PinAccess) {
match access {
PinAccess::Point => self.telemetry.note_point_full_blob_read(),
PinAccess::Scan => self.telemetry.note_scan_full_blob_read(),
PinAccess::Silent => self.telemetry.note_silent_full_blob_read(),
}
}
/// Marks `guid` dirty at sequence `seq`.
///
/// The cached entry is looked up silently; if the blob is not cached this is a no-op.
pub fn mark_dirty(&self, guid: BlobGuid, seq: u64) {
let cached = self.get_cached_with_access(guid, PinAccess::Silent);
self.mark_dirty_with_hint(guid, seq, cached.as_deref());
}
/// Marks `guid` dirty at sequence `seq` using an entry the caller already holds, avoiding a
/// cache lookup.
pub(crate) fn mark_dirty_cached(&self, guid: BlobGuid, seq: u64, entry: &CachedBlob) {
self.mark_dirty_with_hint(guid, seq, Some(entry));
}
/// Publishes `guid` as dirty at `seq`, using the cached entry's dirty hint to skip redundant
/// map updates.
///
/// Does nothing when `cached` is `None`. When the guid has a delete fence the hint is cleared
/// and nothing is published. Otherwise the dirty map ends up holding the minimum of its
/// existing sequence and `seq`.
fn mark_dirty_with_hint(&self, guid: BlobGuid, seq: u64, cached: Option<&CachedBlob>) {
let Some(cached) = cached else {
return;
};
// `dirty_hint_needs_map_publish` is called before the shard lock is taken.
// NOTE(review): it appears to update the hint as a side effect — confirm in `cached_blob`.
let hint_covers_seq = !cached.dirty_hint_needs_map_publish(seq);
let mut state = self.mutation_shard(guid).lock().unwrap();
if state.has_delete_fence(&guid) {
cached.clear_dirty_hint();
return;
}
// Fast path: the hint says no publish is needed and the map already has a sequence <= seq.
if hint_covers_seq && matches!(state.dirty.get(&guid), Some(cur) if *cur <= seq) {
return;
}
// The hint claimed coverage but the map disagrees: reset the hint and re-arm it for `seq`.
if hint_covers_seq {
cached.clear_dirty_hint();
let _ = cached.dirty_hint_needs_map_publish(seq);
}
state
.dirty
.entry(guid)
.and_modify(|cur| *cur = (*cur).min(seq))
.or_insert(seq);
}
/// Takes the dirty set out of every shard and returns it as `guid -> sequence`.
///
/// Before a shard's dirty map is taken, any dirty hint held by a cached entry is consumed and
/// min-merged into that entry's sequence. Each returned guid is registered as flushing in its
/// shard. Guids with a delete fence are dropped from the snapshot (their hint is cleared) and
/// are neither returned nor registered as flushing.
#[must_use]
pub fn snapshot_dirty(&self) -> HashMap<BlobGuid, u64> {
let mut out = HashMap::new();
for shard in &self.mutation {
let mut state = shard.lock().unwrap();
// Fold per-entry hints into the map so the snapshot carries the lowest known sequence.
for (guid, seq) in &mut state.dirty {
if let Some(hinted_seq) = self
.get_cached_with_access(*guid, PinAccess::Silent)
.and_then(|entry| entry.take_dirty_hint())
{
*seq = (*seq).min(hinted_seq);
}
}
let snap = std::mem::take(&mut state.dirty);
for (guid, seq) in snap {
if state.has_delete_fence(&guid) {
if let Some(entry) = self.get_cached_with_access(guid, PinAccess::Silent) {
entry.clear_dirty_hint();
}
continue;
}
state.add_flushing(guid);
out.insert(guid, seq);
}
}
out
}
pub(crate) fn snapshot_dirty_versions(
&self,
snap: &HashMap<BlobGuid, u64>,
) -> Result<Vec<DirtySnapshotEntry>> {
let mut out = Vec::with_capacity(snap.len());
for (&guid, &seq) in snap {
let Some(entry) = self.get_cached_with_access(guid, PinAccess::Silent) else {
return Err(Error::Internal(
"snapshot_dirty_versions: dirty entry lost cache image",
));
};
out.push(DirtySnapshotEntry {
guid,
expected_seq: seq,
content_version: entry.content_version(),
});
}
Ok(out)
}
/// Returns entries taken by `snapshot_dirty` to the dirty set.
///
/// For each entry one flushing registration is released and the sequence is min-merged back
/// into the dirty map. Guids that acquired a delete fence in the meantime are not restored;
/// their dirty hint is cleared instead.
pub fn restore_dirty(&self, entries: HashMap<BlobGuid, u64>) {
if entries.is_empty() {
return;
}
for (guid, t) in entries {
let cached = self.get_cached_with_access(guid, PinAccess::Silent);
// Re-arm the cached entry's hint for the restored sequence; the result is not needed.
if let Some(entry) = &cached {
let _ = entry.dirty_hint_needs_map_publish(t);
}
let mut state = self.mutation_shard(guid).lock().unwrap();
if state.has_delete_fence(&guid) {
if let Some(entry) = cached {
entry.clear_dirty_hint();
}
state.remove_one_flushing(&guid);
continue;
}
state.remove_one_flushing(&guid);
state
.dirty
.entry(guid)
.and_modify(|cur| *cur = (*cur).min(t))
.or_insert(t);
}
}
/// Returns the number of guids in the dirty maps, summed over all shards.
///
/// Shards are locked one at a time, so the total is not an atomic snapshot.
#[must_use]
pub fn dirty_count(&self) -> usize {
    let mut total = 0;
    for shard in &self.mutation {
        let state = shard.lock().unwrap();
        total += state.dirty.len();
    }
    total
}
/// Returns the sum of all flushing registration counts across all shards.
///
/// Shards are locked one at a time, so the total is not an atomic snapshot.
#[must_use]
pub(crate) fn flushing_count(&self) -> usize {
    let mut total = 0;
    for shard in &self.mutation {
        let state = shard.lock().unwrap();
        let in_shard: usize = state.flushing.values().sum();
        total += in_shard;
    }
    total
}
/// Places a delete fence on `guid` at sequence `seq` and drops its pending bookkeeping.
///
/// If a delete of the guid is already claimed (`deleting`), only that claim's sequence is
/// lowered. Otherwise the guid is added to (or min-merged into) `pending_deletes`, and the
/// global fence count is bumped for a new fence. In both cases unclaimed dirty state,
/// maintenance candidates and route residency are removed. The cached image is kept while the
/// guid is flushing or while another handle holds it; otherwise it is removed. The entry's
/// dirty hint is cleared whenever the entry is found.
pub fn mark_for_delete(&self, guid: BlobGuid, seq: u64) {
let mut state = self.mutation_shard(guid).lock().unwrap();
// Already claimed by an in-progress delete: the fence exists, so only the sequence moves.
if let Some(seq_ref) = state.deleting.get_mut(&guid) {
*seq_ref = (*seq_ref).min(seq);
state.remove_unclaimed_dirty(&guid);
let removed = state.remove_maintenance_candidates(&guid);
drop(state);
self.route_resident.remove(guid);
self.decrement_candidate_totals(removed);
return;
}
match state.pending_deletes.entry(guid) {
Entry::Occupied(mut entry) => {
let cur = entry.get_mut();
*cur = (*cur).min(seq);
}
Entry::Vacant(entry) => {
entry.insert(seq);
// First fence for this guid: count it.
self.delete_fence_total.fetch_add(1, Ordering::AcqRel);
}
}
let keep_cached_for_flushing = state.flushing.contains_key(&guid);
state.remove_unclaimed_dirty(&guid);
let removed = state.remove_maintenance_candidates(&guid);
drop(state);
self.route_resident.remove(guid);
self.decrement_candidate_totals(removed);
if keep_cached_for_flushing {
if let Some(entry) = self.get_cached_with_access(guid, PinAccess::Silent) {
entry.clear_dirty_hint();
}
} else if let Some((_, entry)) = self
.cache
.remove_if(&guid, |_, entry| Arc::strong_count(entry) == 1)
{
entry.clear_dirty_hint();
} else if let Some(entry) = self.get_cached_with_access(guid, PinAccess::Silent) {
// Still held elsewhere: leave it cached but drop the hint.
entry.clear_dirty_hint();
}
}
/// Claims every pending delete and returns the claimed set as `guid -> sequence`.
///
/// Each shard's `pending_deletes` map is emptied and its entries are min-merged into the
/// shard's `deleting` map. The global fence count is left unchanged.
#[must_use]
pub fn snapshot_pending_deletes(&self) -> HashMap<BlobGuid, u64> {
    let mut claimed = HashMap::new();
    for shard in &self.mutation {
        let mut state = shard.lock().unwrap();
        for (guid, seq) in std::mem::take(&mut state.pending_deletes) {
            let slot = state.deleting.entry(guid).or_insert(seq);
            *slot = (*slot).min(seq);
            claimed.insert(guid, seq);
        }
    }
    claimed
}
/// Returns delete claims taken by `snapshot_pending_deletes` to the pending set.
///
/// For each entry any claim in `deleting` is removed and its sequence min-merged with the
/// given one, then the result is inserted into (or min-merged with) `pending_deletes`. The
/// global fence count is only incremented when the guid had no fence at all beforehand.
pub fn restore_pending_deletes(&self, entries: HashMap<BlobGuid, u64>) {
if entries.is_empty() {
return;
}
for (g, t) in entries {
let mut state = self.mutation_shard(g).lock().unwrap();
let mut seq = t;
// Sampled before `deleting` is modified so a moved claim is not counted twice.
let had_fence = state.has_delete_fence(&g);
if let Some(claimed) = state.deleting.remove(&g) {
seq = seq.min(claimed);
}
match state.pending_deletes.entry(g) {
Entry::Occupied(mut entry) => {
let cur = entry.get_mut();
*cur = (*cur).min(seq);
}
Entry::Vacant(entry) => {
entry.insert(seq);
if !had_fence {
self.delete_fence_total.fetch_add(1, Ordering::AcqRel);
}
}
}
}
}
/// Returns the global delete-fence count.
#[must_use]
pub fn pending_delete_count(&self) -> usize {
self.delete_fence_total.load(Ordering::Acquire)
}
/// Registers `guid` as a compaction candidate unless it has a delete fence.
///
/// The compaction total is bumped only when the guid was not already a candidate.
pub(crate) fn note_compaction_candidate(&self, guid: BlobGuid) {
    let mut state = self.mutation_shard(guid).lock().unwrap();
    if state.has_delete_fence(&guid) {
        return;
    }
    let newly_added = state.compact_candidates.insert(guid);
    if newly_added {
        self.compact_candidate_total.fetch_add(1, Ordering::Relaxed);
    }
}
/// Registers `guid` as a merge candidate unless it has a delete fence.
///
/// The merge total is bumped only when the guid was not already a candidate.
pub(crate) fn note_merge_candidate(&self, guid: BlobGuid) {
    let mut state = self.mutation_shard(guid).lock().unwrap();
    if state.has_delete_fence(&guid) {
        return;
    }
    let newly_added = state.merge_candidates.insert(guid);
    if newly_added {
        self.merge_candidate_total.fetch_add(1, Ordering::Relaxed);
    }
}
/// Pops up to `limit` compaction candidates via `pop_candidate_batch`, passing the compaction
/// cursor and total.
#[must_use]
pub(crate) fn pop_compaction_candidates(&self, limit: usize) -> Vec<BlobGuid> {
pop_candidate_batch(
&self.mutation,
&self.compact_candidate_cursor,
&self.compact_candidate_total,
CandidateKind::Compact,
limit,
)
}
/// Pops up to `limit` merge candidates via `pop_candidate_batch`, passing the merge cursor
/// and total.
#[must_use]
pub(crate) fn pop_merge_candidates(&self, limit: usize) -> Vec<BlobGuid> {
pop_candidate_batch(
&self.mutation,
&self.merge_candidate_cursor,
&self.merge_candidate_total,
CandidateKind::Merge,
limit,
)
}
/// Returns the running compaction-candidate total (relaxed load).
#[must_use]
pub(crate) fn compaction_candidate_count(&self) -> usize {
self.compact_candidate_total.load(Ordering::Relaxed)
}
/// Returns the running merge-candidate total (relaxed load).
#[must_use]
pub(crate) fn merge_candidate_count(&self) -> usize {
self.merge_candidate_total.load(Ordering::Relaxed)
}
/// Carries out a fenced delete of `guid`: drops the cache entry, deletes the blob from the
/// store, and releases the delete claim.
///
/// Returns `Ok(false)` without deleting when the shard reports the guid protected or when the
/// cache entry could not be removed (for example because another handle holds it).
///
/// # Errors
///
/// Propagates the store's delete error. At that point the cache entry has already been
/// removed and the delete claim is left in place.
pub(crate) fn execute_pending_delete(&self, guid: BlobGuid) -> Result<bool> {
{
let state = self.mutation_shard(guid).lock().unwrap();
if state.is_protected(&guid) {
return Ok(false);
}
}
if let Some((_, entry)) = self.cache.remove_if(&guid, |_, entry| {
if Arc::strong_count(entry) > 1 {
return false;
}
let state = self.mutation_shard(guid).lock().unwrap();
!state.is_protected(&guid)
}) {
entry.clear_dirty_hint();
} else if self.cache.contains_key(&guid) {
// Removal was refused and the entry is still cached; try again later.
return Ok(false);
}
self.store.delete_blob(guid)?;
self.route_resident.remove(guid);
self.finish_pending_delete(guid);
Ok(true)
}
/// Releases the delete claim on `guid` after the blob has been deleted.
///
/// The global fence count is decremented only when a claim existed and no new pending delete
/// for the same guid has been recorded in the meantime.
fn finish_pending_delete(&self, guid: BlobGuid) {
    let mut state = self.mutation_shard(guid).lock().unwrap();
    if state.deleting.remove(&guid).is_none() {
        return;
    }
    if state.pending_deletes.contains_key(&guid) {
        return;
    }
    self.delete_fence_total.fetch_sub(1, Ordering::AcqRel);
}
/// Asks the backing store directly whether it has `guid`, ignoring the cache and delete
/// fences.
pub(crate) fn store_has_blob(&self, guid: BlobGuid) -> Result<bool> {
self.store.has_blob(guid)
}
/// Returns whether `guid` is present in its shard's dirty map or flushing map.
pub(crate) fn has_unflushed_blob(&self, guid: BlobGuid) -> bool {
    let shard = self.mutation_shard(guid).lock().unwrap();
    if shard.dirty.contains_key(&guid) {
        return true;
    }
    shard.flushing.contains_key(&guid)
}
/// Returns a copy of the cached image of `guid`, or `None` when the blob is not cached.
///
/// The copy is taken while holding the entry's read guard. The lookup is silent.
pub(crate) fn snapshot_bytes(&self, guid: BlobGuid) -> Option<AlignedBlobBuf> {
    self.get_cached_with_access(guid, PinAccess::Silent)
        .map(|cached| {
            let image = cached.read();
            let mut copy = self.alloc_blob_buf_uninit();
            copy.as_mut_slice().copy_from_slice(image.as_slice());
            copy
        })
}
pub(crate) fn snapshot_bytes_if_version(
&self,
guid: BlobGuid,
content_version: u64,
) -> Result<Option<AlignedBlobBuf>> {
let Some(entry) = self.get_cached_with_access(guid, PinAccess::Silent) else {
return Err(Error::Internal(
"snapshot_bytes_if_version: dirty entry lost cache image",
));
};
let buf = entry.read();
if entry.content_version() != content_version {
return Ok(None);
}
let mut out = self.alloc_blob_buf_uninit();
out.as_mut_slice().copy_from_slice(buf.as_slice());
Ok(Some(out))
}
/// Allocates a zeroed blob buffer through the backing store.
#[must_use]
pub(crate) fn alloc_blob_buf_zeroed(&self) -> AlignedBlobBuf {
self.store.alloc_blob_buf_zeroed()
}
/// Writes the still-current entries of `entries` to the store in one batch and retires them.
///
/// An entry is current when `write_snapshot_is_current` accepts it. Current entries are
/// written with `write_blobs_with_data_sync`, retired from the dirty/flushing bookkeeping, and
/// reported as `Written`; all others are reported as `Stale`. The report has one status per
/// input entry, in input order.
///
/// # Errors
///
/// Returns the first currency-check error or the store's write error. In both cases nothing
/// has been retired.
pub(crate) fn write_through_batch(
    &self,
    entries: &[WriteThroughEntry],
) -> Result<WriteThroughBatchReport> {
    if entries.is_empty() {
        return Ok(WriteThroughBatchReport {
            statuses: Vec::new(),
        });
    }

    // Indices of the entries whose snapshot is still current.
    let mut current = Vec::new();
    for (idx, entry) in entries.iter().enumerate() {
        if self.write_snapshot_is_current(entry)? {
            current.push(idx);
        }
    }

    if !current.is_empty() {
        let batch: Vec<_> = current
            .iter()
            .map(|&idx| {
                let entry = &entries[idx];
                (entry.guid, &entry.bytes)
            })
            .collect();
        self.store.write_blobs_with_data_sync(&batch)?;
    }

    let mut statuses = vec![WriteThroughStatus::Stale; entries.len()];
    for idx in current {
        let written = &entries[idx];
        self.retire_write_through(written.guid, written.expected_seq);
        statuses[idx] = WriteThroughStatus::Written;
    }
    Ok(WriteThroughBatchReport { statuses })
}
fn write_snapshot_is_current(&self, entry: &WriteThroughEntry) -> Result<bool> {
let Some(version) = entry.content_version else {
return Ok(true);
};
let Some(cached) = self.get_cached_with_access(entry.guid, PinAccess::Silent) else {
return Err(Error::Internal(
"write_through_batch: flushing entry lost cache image",
));
};
Ok(cached.validate_content_version(version))
}
/// Updates bookkeeping after `guid` has been written through to the store.
///
/// Unless `expected_seq` is `STRUCTURAL_SEQ`, a dirty-map entry whose sequence is at most
/// `expected_seq` is removed. One flushing registration is released. If the guid is then
/// neither dirty nor flushing, the cached entry's dirty hint is cleared.
fn retire_write_through(&self, guid: BlobGuid, expected_seq: u64) {
    let mut state = self.mutation_shard(guid).lock().unwrap();
    let write_covers_dirty = expected_seq != STRUCTURAL_SEQ
        && matches!(state.dirty.get(&guid), Some(cur) if *cur <= expected_seq);
    if write_covers_dirty {
        state.dirty.remove(&guid);
    }
    state.remove_one_flushing(&guid);
    let fully_clean = !state.dirty.contains_key(&guid) && !state.flushing.contains_key(&guid);
    drop(state);

    if fully_clean {
        if let Some(cached) = self.get_cached_with_access(guid, PinAccess::Silent) {
            cached.clear_dirty_hint();
        }
    }
}
/// Flushes the backing store.
pub(crate) fn flush_inner(&self) -> Result<()> {
self.store.flush()
}
/// Installs `bytes` as the cached image of a newly created blob `guid` and marks it dirty at
/// `seq`.
///
/// The current epoch is stamped into the frame as its created epoch. The cache entry is
/// inserted unconditionally, replacing any existing entry for the guid, and `seq` is
/// min-merged into the dirty map. No delete-fence check and no capacity-driven eviction
/// happen here.
pub(crate) fn install_new_blob(&self, guid: BlobGuid, mut bytes: AlignedBlobBuf, seq: u64) {
crate::layout::set_frame_created_epoch(
bytes.as_mut_slice(),
self.current_epoch.load(Ordering::Acquire),
);
let tick = self.clock.fetch_add(1, Ordering::Relaxed);
let entry = Arc::new(CachedBlob::new(bytes));
entry.last_touched.store(tick, Ordering::Relaxed);
self.cache.insert(guid, Arc::clone(&entry));
// Arm the entry's dirty hint for `seq`; the return value is not needed because the map
// entry is published right below.
let _ = entry.dirty_hint_needs_map_publish(seq);
let mut state = self.mutation_shard(guid).lock().unwrap();
state
.dirty
.entry(guid)
.and_modify(|cur| *cur = (*cur).min(seq))
.or_insert(seq);
drop(entry);
}
}
impl BlobStore for BufferManager {
/// Reads `guid` into `dst`, serving from the cache when possible.
///
/// A cache hit copies the cached image (with `Point` access side effects). On a miss the blob
/// is read from the store into `dst` and a clone of it is inserted into the cache. Blobs with
/// a delete fence, checked before and after the store read, yield a `NotFound` I/O error.
///
/// NOTE(review): unlike `pin_with_access`, the miss path here does not call
/// `note_full_blob_read` — confirm whether that is intended.
fn read_blob(&self, guid: BlobGuid, dst: &mut AlignedBlobBuf) -> Result<()> {
if self.is_pending_delete(guid) {
return Err(Self::pending_delete_not_found(guid));
}
if let Some(entry) = self.get_cached_with_access(guid, PinAccess::Point) {
let buf = entry.read();
dst.as_mut_slice().copy_from_slice(buf.as_slice());
return Ok(());
}
self.store.read_blob(guid, dst)?;
if self.is_pending_delete(guid) {
return Err(Self::pending_delete_not_found(guid));
}
self.insert_into_cache(guid, dst);
Ok(())
}
/// Writes `src` for `guid` to the store, updating the cached image if one exists.
///
/// After a successful store write the guid's unclaimed dirty state and maintenance candidates
/// are removed. Blobs with a delete fence yield a `NotFound` I/O error.
///
/// NOTE(review): the cached image is overwritten and its dirty hint cleared before the store
/// write; if the store write fails the error is returned with the cache already updated —
/// confirm that is intended.
fn write_blob(&self, guid: BlobGuid, src: &AlignedBlobBuf) -> Result<()> {
if self.is_pending_delete(guid) {
return Err(Self::pending_delete_not_found(guid));
}
if let Some(entry) = self.get_cached_with_access(guid, PinAccess::Silent) {
let mut buf = entry.write();
buf.as_mut_slice().copy_from_slice(src.as_slice());
entry.clear_dirty_hint();
}
self.store.write_blob(guid, src)?;
let mut state = self.mutation_shard(guid).lock().unwrap();
state.remove_unclaimed_dirty(&guid);
let removed = state.remove_maintenance_candidates(&guid);
drop(state);
self.decrement_candidate_totals(removed);
Ok(())
}
/// Batched form of `write_blob`.
///
/// All guids are checked for a delete fence first; if any has one, a `NotFound` I/O error is
/// returned before anything is modified. Cached images are then updated, the batch is written
/// to the store, and on success each guid's unclaimed dirty state and maintenance candidates
/// are removed.
fn write_blobs(&self, writes: &[(BlobGuid, &AlignedBlobBuf)]) -> Result<()> {
for (guid, _) in writes {
if self.is_pending_delete(*guid) {
return Err(Self::pending_delete_not_found(*guid));
}
}
for (guid, src) in writes {
if let Some(entry) = self.get_cached_with_access(*guid, PinAccess::Silent) {
let mut buf = entry.write();
buf.as_mut_slice().copy_from_slice(src.as_slice());
entry.clear_dirty_hint();
}
}
self.store.write_blobs(writes)?;
for (guid, _) in writes {
let mut state = self.mutation_shard(*guid).lock().unwrap();
state.remove_unclaimed_dirty(guid);
let removed = state.remove_maintenance_candidates(guid);
drop(state);
self.decrement_candidate_totals(removed);
}
Ok(())
}
/// Deletes `guid` immediately: evicts it from the cache and deletes it from the store.
///
/// # Errors
///
/// Returns a `NotFound` I/O error when the guid already has a delete fence, `Error::Internal`
/// when `evict_from_cache` refuses to drop the cached image, or the store's delete error.
fn delete_blob(&self, guid: BlobGuid) -> Result<()> {
if self.is_pending_delete(guid) {
return Err(Self::pending_delete_not_found(guid));
}
if !self.evict_from_cache(guid) {
return Err(Error::Internal(
"delete_blob: protected cache image cannot be evicted",
));
}
self.store.delete_blob(guid)
}
/// Lists the blobs known to the backing store. Cache-only blobs and delete fences are not
/// taken into account.
fn list_blobs(&self) -> Result<Vec<BlobGuid>> {
self.store.list_blobs()
}
/// Flushes the backing store.
fn flush(&self) -> Result<()> {
self.store.flush()
}
/// Reports whether the backing store says it needs a flush.
fn needs_flush(&self) -> bool {
self.store.needs_flush()
}
/// Reports whether `guid` is visible through this buffer manager.
///
/// A delete-fenced guid is reported as absent; a cached guid is reported as
/// present without asking the backing store; anything else is delegated to the
/// backing store.
fn has_blob(&self, guid: BlobGuid) -> Result<bool> {
    if self.is_pending_delete(guid) {
        Ok(false)
    } else if self.cache.contains_key(&guid) {
        Ok(true)
    } else {
        self.store.has_blob(guid)
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::blob_store::MemoryBlobStore;
/// Builds a zeroed blob buffer whose byte at offset 100 is `byte_at_100`.
fn make_buf(byte_at_100: u8) -> AlignedBlobBuf {
    let mut buf = AlignedBlobBuf::zeroed();
    buf.as_mut_slice()[100] = byte_at_100;
    buf
}
/// A first `read_blob` populates the cache; a repeat read does not add an entry.
#[test]
fn read_caches_after_first_load() {
    let guid: BlobGuid = [0xAB; 16];
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    backing.write_blob(guid, &make_buf(7)).unwrap();
    let mgr = BufferManager::new(backing.clone(), 4);
    assert_eq!(mgr.cached_count(), 0);

    let mut out = AlignedBlobBuf::zeroed();
    mgr.read_blob(guid, &mut out).unwrap();
    assert_eq!(out.as_slice()[100], 7);
    assert_eq!(mgr.cached_count(), 1);

    mgr.read_blob(guid, &mut out).unwrap();
    assert_eq!(mgr.cached_count(), 1);
}
/// `pin_scan_many` yields one slot per requested guid, in request order, with
/// `None` for a guid the store does not hold.
#[test]
fn pin_scan_many_returns_each_blob_in_order_and_none_for_missing() {
    let missing: BlobGuid = [0xFF; 16];
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for tag in 0..10u8 {
        backing.write_blob([tag; 16], &make_buf(tag)).unwrap();
    }
    let mgr = BufferManager::new(backing, 64);

    // Ten stored guids with one unknown guid spliced into the middle.
    let mut wanted: Vec<BlobGuid> = (0..10u8).map(|tag| [tag; 16]).collect();
    wanted.insert(5, missing);

    let pins = mgr.pin_scan_many(&wanted);
    assert_eq!(pins.len(), wanted.len());
    for (guid, slot) in wanted.iter().zip(&pins) {
        if *guid != missing {
            let pinned = slot.as_ref().expect("present guid must be pinned");
            assert_eq!(pinned.read().as_slice()[100], guid[0]);
        } else {
            assert!(slot.is_none(), "missing guid must map to None");
        }
    }
}
/// The first pin of a blob counts as one miss and one full-blob read; the
/// second pin counts as a hit only.
#[test]
fn pin_miss_is_not_counted_as_a_hit() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0xCD; 16];
    backing.write_blob(guid, &make_buf(9)).unwrap();
    let mgr = BufferManager::new(backing, 4);

    {
        let cold = mgr.pin(guid).unwrap();
        assert_eq!(cold.read().as_slice()[100], 9);
    }
    assert_eq!(mgr.cache_misses(), 1);
    assert_eq!(mgr.cache_hits(), 0);

    // This pin stays alive until the end of the test.
    let warm = mgr.pin(guid).unwrap();
    assert_eq!(warm.read().as_slice()[100], 9);
    assert_eq!(mgr.cache_misses(), 1);
    assert_eq!(mgr.cache_hits(), 1);
    assert_eq!(mgr.full_blob_reads(), 1);
    assert_eq!(mgr.full_blob_read_bytes(), PAGE_SIZE as u64);
}
/// Cold loads through `pin`, `pin_scan` and `pin_silent` are each counted under
/// their own full-blob-read counter; a later cache hit adds no store read.
#[test]
fn full_blob_reads_are_classified_by_access_path() {
    let tagged = |tag: u8| -> BlobGuid {
        let mut g = [0u8; 16];
        g[0] = tag;
        g
    };
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for tag in 0..3u8 {
        backing.write_blob(tagged(tag), &make_buf(tag)).unwrap();
    }
    let mgr = BufferManager::new(backing, 4);

    // One cold load per access path.
    drop(mgr.pin(tagged(0)).unwrap());
    drop(mgr.pin_scan(tagged(1)).unwrap());
    drop(mgr.pin_silent(tagged(2)).unwrap());

    assert_eq!(mgr.full_blob_reads(), 3);
    assert_eq!(mgr.full_blob_read_bytes(), 3 * PAGE_SIZE as u64);
    assert_eq!(mgr.point_full_blob_reads(), 1);
    assert_eq!(mgr.scan_full_blob_reads(), 1);
    assert_eq!(mgr.silent_full_blob_reads(), 1);
    assert_eq!(
        mgr.cache_misses(),
        2,
        "silent miss does not count as a public cache miss"
    );
    assert_eq!(mgr.cache_hits(), 0);

    // Re-pinning the point blob is served from cache.
    drop(mgr.pin(tagged(0)).unwrap());
    assert_eq!(
        mgr.full_blob_reads(),
        3,
        "cache hits must not count as store reads"
    );
    assert_eq!(mgr.cache_hits(), 1);
}
/// With capacity 2, a run of scan-loaded blobs must leave the point-pinned blob cached.
#[test]
fn scan_misses_do_not_evict_hot_point_blob() {
    let tagged = |tag: u8| -> BlobGuid {
        let mut g = [0u8; 16];
        g[0] = tag;
        g
    };
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for tag in 0..5u8 {
        backing.write_blob(tagged(tag), &make_buf(tag)).unwrap();
    }
    let mgr = BufferManager::new(backing, 2);

    let hot = tagged(0);
    drop(mgr.pin(hot).unwrap());
    for tag in 1..5u8 {
        drop(mgr.pin_scan(tagged(tag)).unwrap());
    }

    assert_eq!(mgr.cached_count(), 2);
    assert!(
        mgr.cache.contains_key(&hot),
        "scan-loaded blobs must stay colder than point-read blobs",
    );
}
/// With capacity 1, a scan miss keeps the point-pinned blob and the cache ends
/// up holding two entries.
#[test]
fn scan_miss_may_overshoot_instead_of_evicting_only_hot_blob() {
    let tagged = |tag: u8| -> BlobGuid {
        let mut g = [0u8; 16];
        g[0] = tag;
        g
    };
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for tag in 0..2u8 {
        backing.write_blob(tagged(tag), &make_buf(tag)).unwrap();
    }
    let mgr = BufferManager::new(backing, 1);

    let hot = tagged(0);
    let scanned = tagged(1);
    drop(mgr.pin(hot).unwrap());
    drop(mgr.pin_scan(scanned).unwrap());

    assert!(
        mgr.cache.contains_key(&hot),
        "scan miss must not evict the only point-hot blob",
    );
    assert_eq!(
        mgr.cached_count(),
        2,
        "scan access may briefly exceed capacity to avoid hot-cache pollution",
    );
}
/// A scan hit on the oldest point blob must not save it from eviction when a
/// third blob is point-pinned into a capacity-2 cache.
#[test]
fn scan_hits_do_not_refresh_recency() {
    let tagged = |tag: u8| -> BlobGuid {
        let mut g = [0u8; 16];
        g[0] = tag;
        g
    };
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for tag in 0..3u8 {
        backing.write_blob(tagged(tag), &make_buf(tag)).unwrap();
    }
    let mgr = BufferManager::new(backing, 2);
    let (first, second, third) = (tagged(0), tagged(1), tagged(2));

    drop(mgr.pin(first).unwrap());
    drop(mgr.pin(second).unwrap());
    // Scan hit on `first`, then a point miss on `third`.
    drop(mgr.pin_scan(first).unwrap());
    drop(mgr.pin(third).unwrap());

    assert!(
        !mgr.cache.contains_key(&first),
        "a scan hit must not make the oldest point blob look hot",
    );
    assert!(mgr.cache.contains_key(&second));
    assert!(mgr.cache.contains_key(&third));
}
/// Reading ten blobs through a capacity-4 cache leaves exactly four cached,
/// including the last one read and excluding the first.
#[test]
fn frequency_aware_eviction_stays_at_capacity() {
    let tagged = |tag: u8| -> BlobGuid {
        let mut g = [0u8; 16];
        g[0] = tag;
        g
    };
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for tag in 0..10u8 {
        backing.write_blob(tagged(tag), &make_buf(tag)).unwrap();
    }
    let mgr = BufferManager::new(backing, 4);

    for tag in 0..10u8 {
        let mut out = AlignedBlobBuf::zeroed();
        mgr.read_blob(tagged(tag), &mut out).unwrap();
    }

    assert_eq!(
        mgr.cached_count(),
        4,
        "cache must shrink to capacity after over-fill",
    );
    let newest = tagged(9);
    let oldest = tagged(0);
    assert!(mgr.cache.contains_key(&newest));
    assert!(!mgr.cache.contains_key(&oldest));
}
/// A blob pinned eight times must stay cached while eleven one-shot blobs
/// stream through a capacity-2 cache.
#[test]
fn tinylfu_keeps_frequent_point_blob_against_one_hit_stream() {
    let tagged = |tag: u8| -> BlobGuid {
        let mut g = [0u8; 16];
        g[0] = tag;
        g
    };
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for tag in 0..12u8 {
        backing.write_blob(tagged(tag), &make_buf(tag)).unwrap();
    }
    let mgr = BufferManager::new(backing, 2);

    let hot = tagged(0);
    for _ in 0..8 {
        drop(mgr.pin(hot).unwrap());
    }

    // Both invariants are checked after every single cold pin.
    for tag in 1..12u8 {
        drop(mgr.pin(tagged(tag)).unwrap());
        assert!(
            mgr.cache.contains_key(&hot),
            "frequent point blob should survive one-hit stream pressure",
        );
        assert!(
            mgr.cached_count() <= 2,
            "unprotected one-hit blobs should be reclaimed immediately",
        );
    }
}
/// A blob marked route-resident stays cached when eight further reads push a
/// capacity-8 cache over its limit; the first non-route blob is evicted instead.
#[test]
fn route_resident_anchor_survives_inline_eviction() {
    let tagged = |tag: u8| -> BlobGuid {
        let mut g = [0u8; 16];
        g[0] = tag;
        g
    };
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for tag in 0..9u8 {
        backing.write_blob(tagged(tag), &make_buf(tag)).unwrap();
    }
    let mgr = BufferManager::new(backing, 8);

    let anchor = tagged(0);
    let mut out = AlignedBlobBuf::zeroed();
    mgr.read_blob(anchor, &mut out).unwrap();
    mgr.mark_route_resident(anchor);
    for tag in 1..9u8 {
        mgr.read_blob(tagged(tag), &mut out).unwrap();
    }

    assert_eq!(mgr.cached_count(), 8);
    assert!(mgr.cache.contains_key(&anchor));
    assert!(mgr.is_route_resident(anchor));
    let first_non_route = tagged(1);
    assert!(
        !mgr.cache.contains_key(&first_non_route),
        "oldest non-route blob should be evicted first",
    );
}
/// With capacity 4, marking a second route-resident guid demotes the first.
#[test]
fn route_resident_tier_demotes_old_anchors_at_budget() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let mgr = BufferManager::new(backing, 4);
    let (older, newer): (BlobGuid, BlobGuid) = ([1; 16], [2; 16]);

    mgr.mark_route_resident(older);
    mgr.mark_route_resident(newer);

    assert_eq!(mgr.route_resident_count(), 1);
    assert_eq!(mgr.route_resident_demotions(), 1);
    assert!(!mgr.is_route_resident(older));
    assert!(mgr.is_route_resident(newer));
}
/// In a capacity-2 cache, pinning a third blob must evict the clean entry and
/// leave the dirty one (and its bookkeeping) in place.
#[test]
fn inline_eviction_skips_dirty_entries() {
    let tagged = |tag: u8| -> BlobGuid {
        let mut g = [0u8; 16];
        g[0] = tag;
        g
    };
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for tag in 0..3u8 {
        backing.write_blob(tagged(tag), &make_buf(tag)).unwrap();
    }
    let mgr = BufferManager::new(backing, 2);
    let (g_a, g_b, g_c) = (tagged(0), tagged(1), tagged(2));

    // Load A, release the pin, then mark it dirty.
    drop(mgr.pin(g_a).unwrap());
    mgr.mark_dirty(g_a, 10);
    assert_eq!(mgr.dirty_count(), 1);
    assert!(mgr.cache.contains_key(&g_a));

    // Load B: the cache is now at capacity with A dirty and B clean.
    drop(mgr.pin(g_b).unwrap());
    assert!(mgr.cache.contains_key(&g_a));
    assert!(mgr.cache.contains_key(&g_b));

    // Load C: something has to go.
    drop(mgr.pin(g_c).unwrap());
    assert!(
        mgr.cache.contains_key(&g_a),
        "dirty entry A's cache image must survive inline eviction",
    );
    assert!(
        mgr.cache.contains_key(&g_c),
        "newly-pinned C must be in cache",
    );
    assert!(
        !mgr.cache.contains_key(&g_b),
        "B (clean, no pin) should have been evicted in A's stead",
    );
    assert_eq!(
        mgr.dirty_count(),
        1,
        "dirty bookkeeping must not be touched by eviction",
    );
    assert!(
        mgr.snapshot_bytes(g_a).is_some(),
        "dirty entry's cache image must be snapshottable",
    );
}
/// Compaction candidates in one bookkeeping shard are de-duplicated, popped in
/// insertion order, and capped by the requested batch size.
#[test]
fn maintenance_candidates_are_unique_and_fifo_budgeted() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let mgr = BufferManager::new(backing, 4);

    // Bucket 256 single-byte-tagged guids by shard and take three that share one.
    let mut per_shard = vec![Vec::<BlobGuid>::new(); BOOKKEEPING_SHARDS];
    for tag in 0..=u8::MAX {
        let mut guid = [0u8; 16];
        guid[0] = tag;
        per_shard[bookkeeping_shard_idx(&guid)].push(guid);
    }
    let colocated = per_shard
        .into_iter()
        .find(|bucket| bucket.len() >= 3)
        .unwrap();
    let (a, b, c) = (colocated[0], colocated[1], colocated[2]);

    // `a` is noted twice but must be counted once.
    for guid in [a, b, a, c] {
        mgr.note_compaction_candidate(guid);
    }
    assert_eq!(mgr.compaction_candidate_count(), 3);
    assert_eq!(mgr.pop_compaction_candidates(2), vec![a, b]);
    assert_eq!(mgr.compaction_candidate_count(), 1);

    // Once popped, `a` can be queued again and lands behind `c`.
    mgr.note_compaction_candidate(a);
    assert_eq!(mgr.pop_compaction_candidates(8), vec![c, a]);
    assert_eq!(mgr.compaction_candidate_count(), 0);
}
/// With one compaction candidate queued per bookkeeping shard, successive
/// single-item pops come from shards 0, 1, 2, 3 in that order.
#[test]
fn maintenance_candidate_drain_rotates_across_shards() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let mgr = BufferManager::new(backing, 4);

    // Search counter-derived guids until every shard has a representative.
    let mut representative = [None::<BlobGuid>; BOOKKEEPING_SHARDS];
    let mut seed = 0u32;
    while representative.iter().any(Option::is_none) {
        assert!(
            seed < 100_000,
            "test helper could not cover every bookkeeping shard"
        );
        let mut guid = [0u8; 16];
        guid[..4].copy_from_slice(&seed.to_le_bytes());
        let shard = bookkeeping_shard_idx(&guid);
        representative[shard].get_or_insert(guid);
        seed += 1;
    }

    for guid in representative.iter().copied().flatten() {
        mgr.note_compaction_candidate(guid);
    }

    // NOTE(review): only the first four shards are checked; confirm this bound
    // is intentional if BOOKKEEPING_SHARDS is not 4.
    for expected_shard in 0..4 {
        let batch = mgr.pop_compaction_candidates(1);
        assert_eq!(batch.len(), 1);
        assert_eq!(bookkeeping_shard_idx(&batch[0]), expected_shard);
    }
}
/// `write_blob` on the buffer manager must land in the wrapped store.
#[test]
fn write_through_propagates_to_inner_store() {
    let guid: BlobGuid = [0xCD; 16];
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let mgr = BufferManager::new(backing.clone(), 4);

    mgr.write_blob(guid, &make_buf(0x42)).unwrap();

    assert!(backing.has_blob(guid).unwrap());
    let mut stored = AlignedBlobBuf::zeroed();
    backing.read_blob(guid, &mut stored).unwrap();
    assert_eq!(stored.as_slice()[100], 0x42);
}
/// After a blob has been read (and therefore cached), `write_blob` must make
/// the next read return the new bytes.
#[test]
fn write_through_updates_cache_if_present() {
    let guid: BlobGuid = [0xEF; 16];
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    backing.write_blob(guid, &make_buf(1)).unwrap();
    let mgr = BufferManager::new(backing.clone(), 4);

    let mut out = AlignedBlobBuf::zeroed();
    mgr.read_blob(guid, &mut out).unwrap();
    assert_eq!(out.as_slice()[100], 1);

    mgr.write_blob(guid, &make_buf(99)).unwrap();
    mgr.read_blob(guid, &mut out).unwrap();
    assert_eq!(out.as_slice()[100], 99);
}
/// `delete_blob` removes the blob from both the cache and the wrapped store.
#[test]
fn delete_evicts_from_cache_and_inner() {
    let guid: BlobGuid = [0x33; 16];
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    backing.write_blob(guid, &make_buf(5)).unwrap();
    let mgr = BufferManager::new(backing.clone(), 4);

    let mut out = AlignedBlobBuf::zeroed();
    mgr.read_blob(guid, &mut out).unwrap();
    assert_eq!(mgr.cached_count(), 1);

    mgr.delete_blob(guid).unwrap();
    assert_eq!(mgr.cached_count(), 0);
    assert!(!backing.has_blob(guid).unwrap());
    assert!(!mgr.has_blob(guid).unwrap());
}
/// Once a blob is marked for delete it disappears from `has_blob`/`pin` even
/// though the wrapped store still holds it, and later dirty marks or restores
/// leave no dirty entries behind.
#[test]
fn pending_delete_hides_blob_until_checkpoint_delete_applies() {
    let guid: BlobGuid = [0x44; 16];
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    backing.write_blob(guid, &make_buf(7)).unwrap();
    let mgr = BufferManager::new(backing.clone(), 4);

    // This pin is held for the rest of the test.
    let _held = mgr.pin(guid).unwrap();
    assert!(mgr.has_blob(guid).unwrap());

    mgr.mark_dirty(guid, 10);
    mgr.mark_for_delete(guid, 11);
    assert!(backing.has_blob(guid).unwrap());
    assert!(!mgr.has_blob(guid).unwrap());
    assert!(
        mgr.pin(guid).is_err(),
        "pending-delete child must not be reloaded from store"
    );

    // Neither a fresh dirty mark nor a restore may leave a dirty entry.
    mgr.mark_dirty(guid, 12);
    let mut restored = HashMap::new();
    restored.insert(guid, 13);
    mgr.restore_dirty(restored);
    assert_eq!(mgr.dirty_count(), 0);
    assert_eq!(mgr.pending_delete_count(), 1);
}
/// The pending-delete count stays at one across duplicate marks, snapshots and
/// restores, and the snapshot reports the lower of the two sequence numbers.
#[test]
fn pending_delete_count_tracks_snapshot_and_restore() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let mgr = BufferManager::new(backing, 4);
    let guid = [0x55; 16];

    mgr.mark_for_delete(guid, 20);
    mgr.mark_for_delete(guid, 10);
    assert_eq!(mgr.pending_delete_count(), 1);

    let claimed = mgr.snapshot_pending_deletes();
    assert_eq!(claimed.get(&guid), Some(&10));
    assert_eq!(
        mgr.pending_delete_count(),
        1,
        "claimed deletes remain fenced while the I/O worker owns them",
    );

    mgr.restore_pending_deletes(claimed);
    assert_eq!(mgr.pending_delete_count(), 1);

    let reclaimed = mgr.snapshot_pending_deletes();
    assert_eq!(reclaimed.get(&guid), Some(&10));
    assert_eq!(mgr.pending_delete_count(), 1);
}
/// After the pending-delete set has been snapshotted, the guid must still be
/// hidden from `has_blob`, `pin`, `read_blob`, `mark_dirty`, `write_blob` and
/// `delete_blob` until `execute_pending_delete` runs.
#[test]
fn claimed_pending_delete_still_hides_blob_from_stale_pins() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0x5A; 16];
    backing.write_blob(guid, &make_buf(7)).unwrap();
    let mgr = BufferManager::new(backing.clone(), 4);

    mgr.mark_for_delete(guid, 10);
    let claimed = mgr.snapshot_pending_deletes();
    assert_eq!(claimed.get(&guid), Some(&10));

    // The wrapped store still has the blob; the manager must not expose it.
    assert!(backing.has_blob(guid).unwrap());
    assert!(!mgr.has_blob(guid).unwrap());
    assert!(
        mgr.pin(guid).is_err(),
        "a claimed delete must keep stale walkers from reloading the blob",
    );
    let mut out = AlignedBlobBuf::zeroed();
    assert!(
        mgr.read_blob(guid, &mut out).is_err(),
        "BlobStore reads must obey the same delete fence as pin()",
    );

    mgr.mark_dirty(guid, 11);
    assert_eq!(mgr.dirty_count(), 0);
    assert!(mgr.write_blob(guid, &make_buf(9)).is_err());
    assert!(mgr.delete_blob(guid).is_err());

    assert!(mgr.execute_pending_delete(guid).unwrap());
    assert_eq!(mgr.pending_delete_count(), 0);
    assert!(!backing.has_blob(guid).unwrap());
}
/// Checks that `execute_pending_delete` reports `false` while an earlier pin is
/// still held, that a dirty mark made through that pin is not recorded, and
/// that the delete goes through once the pin has been dropped.
#[test]
fn pending_delete_defers_until_existing_pin_drops() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0x5C; 16];
inner.write_blob(guid, &make_buf(7)).unwrap();
let bm = BufferManager::new(inner.clone(), 4);
// The pin is taken before the delete is requested and outlives the first attempt.
let pin = bm.pin(guid).unwrap();
bm.mark_for_delete(guid, 10);
let pending = bm.snapshot_pending_deletes();
assert!(
!bm.execute_pending_delete(guid).unwrap(),
"delete must wait while an old walker still holds a cached blob pin",
);
// Hand the claimed delete back after the deferred attempt.
bm.restore_pending_deletes(pending);
// Mutate through the still-held pin and try to publish it as dirty.
{
let mut guard = pin.write();
guard.as_mut_slice()[123] = 0x66;
}
bm.mark_dirty_cached(guid, 11, pin.as_ref());
assert_eq!(
bm.dirty_count(),
0,
"existing pins must not publish orphan dirty state while delete-fenced",
);
// With the pin gone, the same pending delete (still at seq 10) can execute.
drop(pin);
let pending = bm.snapshot_pending_deletes();
assert_eq!(pending.get(&guid), Some(&10));
assert!(bm.execute_pending_delete(guid).unwrap());
assert_eq!(bm.pending_delete_count(), 0);
assert!(!inner.has_blob(guid).unwrap());
}
/// `has_blob` is true for a blob that was read (and so cached) and false for a
/// guid that was never written.
#[test]
fn has_blob_fast_path_avoids_inner_when_cached() {
    let present: BlobGuid = [0x77; 16];
    let absent: BlobGuid = [0x88; 16];
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    backing.write_blob(present, &make_buf(11)).unwrap();
    let mgr = BufferManager::new(backing.clone(), 4);

    let mut out = AlignedBlobBuf::zeroed();
    mgr.read_blob(present, &mut out).unwrap();

    assert!(mgr.has_blob(present).unwrap());
    assert!(!mgr.has_blob(absent).unwrap());
}
/// Repeated `mark_dirty` calls on one guid collapse to a single entry carrying
/// the smallest sequence number.
#[test]
fn mark_dirty_keeps_lowest_seq() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0x01; 16];
    backing.write_blob(guid, &make_buf(1)).unwrap();
    let mgr = BufferManager::new(backing, 4);
    let _held = mgr.pin(guid).unwrap();

    for seq in [50, 30, 99] {
        mgr.mark_dirty(guid, seq);
    }

    assert_eq!(mgr.dirty_count(), 1);
    let dirty = mgr.snapshot_dirty();
    assert_eq!(dirty[&guid], 30);
}
/// `mark_dirty` on a guid that was never pinned or read must leave the dirty
/// snapshot empty.
#[test]
fn mark_dirty_without_cache_image_does_not_publish_orphan() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0xAB; 16];
    backing.write_blob(guid, &make_buf(1)).unwrap();
    let mgr = BufferManager::new(backing, 4);

    mgr.mark_dirty(guid, 10);

    let dirty = mgr.snapshot_dirty();
    assert!(
        dirty.is_empty(),
        "dirty map must not contain an entry without a cache image",
    );
}
/// After `snapshot_dirty` drains the map, a new `mark_dirty` on the same pinned
/// blob must show up in the next snapshot with its own sequence number.
#[test]
fn cached_dirty_hint_resets_after_snapshot() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0xD1; 16];
    backing.write_blob(guid, &make_buf(1)).unwrap();
    let mgr = BufferManager::new(backing, 4);
    let _held = mgr.pin(guid).unwrap();

    mgr.mark_dirty(guid, 10);
    mgr.mark_dirty(guid, 20);
    let first = mgr.snapshot_dirty();
    assert_eq!(first[&guid], 10);
    assert_eq!(mgr.dirty_count(), 0);

    mgr.mark_dirty(guid, 30);
    let second = mgr.snapshot_dirty();
    assert_eq!(
        second[&guid], 30,
        "mark_dirty after snapshot must publish a fresh dirty entry",
    );
}
/// After `dirty_hint_needs_map_publish(10)` has been called on the pin, a
/// `mark_dirty` at seq 20 must still appear in the dirty snapshot.
#[test]
fn stale_dirty_hint_cannot_skip_dirty_map_publish() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0xD3; 16];
    backing.write_blob(guid, &make_buf(1)).unwrap();
    let mgr = BufferManager::new(backing, 4);
    let held = mgr.pin(guid).unwrap();

    // Touch the hint directly on the cached blob, bypassing `mark_dirty`.
    assert!(held.dirty_hint_needs_map_publish(10));

    mgr.mark_dirty(guid, 20);
    let dirty = mgr.snapshot_dirty();
    assert_eq!(
        dirty[&guid], 20,
        "a stale hint without a dirty-map entry must not hide a fresh write",
    );
}
/// A restored dirty entry keeps its sequence number against a later, higher
/// `mark_dirty`, and is lowered by a later, lower one.
#[test]
fn cached_dirty_hint_preserves_lower_restored_seq() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0xD2; 16];
    backing.write_blob(guid, &make_buf(1)).unwrap();
    let mgr = BufferManager::new(backing, 4);
    let _held = mgr.pin(guid).unwrap();

    // Restore at 40, then mark at 90: the entry must stay at 40.
    let mut seed = HashMap::new();
    seed.insert(guid, 40);
    mgr.restore_dirty(seed);
    mgr.mark_dirty(guid, 90);
    let after_higher = mgr.snapshot_dirty();
    assert_eq!(
        after_higher[&guid], 40,
        "duplicate higher seq must be covered by restored low-watermark",
    );

    // Put it back, then mark at 20: the entry must drop to 20.
    mgr.restore_dirty(after_higher);
    mgr.mark_dirty(guid, 20);
    let after_lower = mgr.snapshot_dirty();
    assert_eq!(
        after_lower[&guid], 20,
        "lower seq must still update the dirty low-watermark",
    );
}
/// `snapshot_dirty` returns every dirty entry and leaves the live map empty;
/// entries marked afterwards appear only in the next snapshot.
#[test]
fn snapshot_dirty_drains_atomically() {
    let (g1, g2, g3): (BlobGuid, BlobGuid, BlobGuid) = ([0x01; 16], [0x02; 16], [0x03; 16]);
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for guid in [g1, g2, g3] {
        backing.write_blob(guid, &make_buf(1)).unwrap();
    }
    let mgr = BufferManager::new(backing, 4);

    // All pins below stay alive until the end of the test.
    let _held1 = mgr.pin(g1).unwrap();
    let _held2 = mgr.pin(g2).unwrap();
    mgr.mark_dirty(g1, 10);
    mgr.mark_dirty(g2, 20);

    let first = mgr.snapshot_dirty();
    assert_eq!(first.len(), 2);
    assert_eq!(first[&g1], 10);
    assert_eq!(first[&g2], 20);
    assert_eq!(mgr.dirty_count(), 0);

    let _held3 = mgr.pin(g3).unwrap();
    mgr.mark_dirty(g3, 99);
    assert_eq!(mgr.dirty_count(), 1);
    let second = mgr.snapshot_dirty();
    assert_eq!(second[&g3], 99);
}
/// One dirty blob is placed in every bookkeeping shard; a single
/// `snapshot_dirty` must return all of them and empty the live map.
#[test]
fn snapshot_dirty_drains_every_bookkeeping_shard() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let mgr = BufferManager::new(Arc::clone(&backing), BOOKKEEPING_SHARDS);

    // Generate guids until each shard index has one representative.
    let mut per_shard: [Option<BlobGuid>; BOOKKEEPING_SHARDS] = [None; BOOKKEEPING_SHARDS];
    for n in 0..20_000u64 {
        let mut guid = [0u8; 16];
        guid[..8].copy_from_slice(&n.to_le_bytes());
        guid[8..].copy_from_slice(&n.wrapping_mul(0x9E37_79B9_7F4A_7C15).to_le_bytes());
        let shard = bookkeeping_shard_idx(&guid);
        per_shard[shard].get_or_insert(guid);
        if per_shard.iter().all(Option::is_some) {
            break;
        }
    }
    assert!(
        per_shard.iter().all(Option::is_some),
        "test generator should hit every bookkeeping shard"
    );

    // Dirty each representative with seq = shard index + 1; the pin is dropped
    // at the end of each iteration.
    for (shard, slot) in per_shard.iter().enumerate() {
        let guid = slot.expect("filled");
        backing.write_blob(guid, &make_buf(1)).unwrap();
        let _held = mgr.pin(guid).unwrap();
        mgr.mark_dirty(guid, shard as u64 + 1);
    }

    let drained = mgr.snapshot_dirty();
    assert_eq!(drained.len(), BOOKKEEPING_SHARDS);
    assert_eq!(mgr.dirty_count(), 0);
    for (shard, slot) in per_shard.iter().enumerate() {
        assert_eq!(drained[&slot.expect("filled")], shard as u64 + 1);
    }
}
/// Checks that a blob drained by `snapshot_dirty` cannot be evicted with
/// `try_evict_cold` until `write_through_batch` has been run for it.
#[test]
fn snapshot_dirty_protects_flushing_entry_from_eviction() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0x55; 16];
inner.write_blob(guid, &make_buf(1)).unwrap();
let bm = BufferManager::new(inner, 1);
// Mutate byte 123 through a short-lived pin; both guard and pin drop at the
// end of this block.
{
let pin = bm.pin(guid).unwrap();
let mut guard = pin.write();
guard.as_mut_slice()[123] = 0xAB;
}
bm.mark_dirty(guid, 42);
let snap = bm.snapshot_dirty();
assert_eq!(snap[&guid], 42);
assert_eq!(
bm.dirty_count(),
0,
"snapshot drains the live dirty map for racing writers",
);
// No pin is held here, yet eviction must still be refused.
assert!(
!bm.try_evict_cold(guid),
"checkpoint-owned flushing entries must stay cached until write-through",
);
let bytes = bm
.snapshot_bytes(guid)
.expect("flushing protection must preserve cached bytes");
assert_eq!(bytes.as_slice()[123], 0xAB);
// Write the snapshot through with the snapshotted seq and no version check.
bm.write_through_batch(&[WriteThroughEntry {
guid,
bytes,
expected_seq: 42,
content_version: None,
}])
.unwrap();
assert!(
bm.try_evict_cold(guid),
"successful write-through releases flushing protection",
);
}
/// `reclaim_blob` on a blob whose dirty entry has been snapshotted must leave
/// the cached bytes readable at the snapshotted version and the stored blob intact.
#[test]
fn cow_reclaim_does_not_drop_flushing_cache_image() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0x56; 16];
    backing.write_blob(guid, &make_buf(0)).unwrap();
    let mgr = BufferManager::new(backing.clone(), 1);

    let held = mgr.pin(guid).unwrap();
    {
        let mut image = held.write();
        image.as_mut_slice()[123] = 0xAA;
    }
    mgr.mark_dirty_cached(guid, 10, held.as_ref());
    let drained = mgr.snapshot_dirty();
    assert_eq!(drained[&guid], 10);

    // Release the pin before reclaiming.
    drop(held);
    mgr.reclaim_blob(guid);

    let version = mgr.snapshot_dirty_versions(&drained).unwrap()[0].content_version;
    let bytes = mgr
        .snapshot_bytes_if_version(guid, version)
        .unwrap()
        .expect("COW reclaim must not drop checkpoint-owned bytes");
    assert_eq!(bytes.as_slice()[123], 0xAA);
    assert!(backing.has_blob(guid).unwrap());
}
/// `reclaim_blob` while a pin is held must leave the image reachable: a later
/// write through that pin is visible via `snapshot_bytes_if_version`.
#[test]
fn cow_reclaim_does_not_drop_pinned_cache_image() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0x56; 16];
    backing.write_blob(guid, &make_buf(0)).unwrap();
    let mgr = BufferManager::new(backing.clone(), 1);

    // Reclaim first, while the pin is still alive.
    let held = mgr.pin(guid).unwrap();
    mgr.reclaim_blob(guid);

    {
        let mut image = held.write();
        image.as_mut_slice()[123] = 0xBB;
    }
    mgr.mark_dirty_cached(guid, 10, held.as_ref());

    let drained = mgr.snapshot_dirty();
    let version = mgr.snapshot_dirty_versions(&drained).unwrap()[0].content_version;
    let bytes = mgr
        .snapshot_bytes_if_version(guid, version)
        .unwrap()
        .expect("pinned dirty image must stay reachable through cache");
    assert_eq!(bytes.as_slice()[123], 0xBB);
}
/// `snapshot_bytes_if_version` returns `None` for a version captured before a
/// later write, and the bytes for the pin's current version.
#[test]
fn snapshot_bytes_if_version_rejects_stale_blob_image() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0x56; 16];
    backing.write_blob(guid, &make_buf(1)).unwrap();
    let mgr = BufferManager::new(backing, 1);

    let held = mgr.pin(guid).unwrap();
    mgr.mark_dirty_cached(guid, 10, held.as_ref());
    let drained = mgr.snapshot_dirty();
    let captured = mgr.snapshot_dirty_versions(&drained).unwrap();
    assert_eq!(captured.len(), 1);

    // Write through the pin after the version was captured.
    {
        let mut image = held.write();
        image.as_mut_slice()[123] = 0xEE;
    }

    let stale = mgr
        .snapshot_bytes_if_version(guid, captured[0].content_version)
        .unwrap();
    assert!(
        stale.is_none(),
        "checkpoint clone must reject bytes after a newer blob mutation"
    );

    let current = mgr
        .snapshot_bytes_if_version(guid, held.content_version())
        .unwrap()
        .expect("current version should clone");
    assert_eq!(current.as_slice()[123], 0xEE);
}
/// Checks that `write_through_batch` reports `Stale` for bytes captured at an
/// older content version, keeps the newer dirty entry, and leaves the store
/// contents untouched.
#[test]
fn write_through_rejects_stale_snapshot_bytes() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0x57; 16];
inner.write_blob(guid, &make_buf(0)).unwrap();
let bm = BufferManager::new(inner.clone(), 1);
let pin = bm.pin(guid).unwrap();
// First mutation (0x11), marked dirty at seq 10 and snapshotted.
{
let mut guard = pin.write();
guard.as_mut_slice()[123] = 0x11;
}
bm.mark_dirty_cached(guid, 10, pin.as_ref());
let snap = bm.snapshot_dirty();
let versioned = bm.snapshot_dirty_versions(&snap).unwrap();
let stale_version = versioned[0].content_version;
let stale_bytes = bm
.snapshot_bytes_if_version(guid, stale_version)
.unwrap()
.expect("snapshot bytes should clone while version still matches");
// Second mutation (0xEE) at seq 20 happens after the bytes were cloned.
{
let mut guard = pin.write();
guard.as_mut_slice()[123] = 0xEE;
}
bm.mark_dirty_cached(guid, 20, pin.as_ref());
// Submit the older bytes together with the older version and seq.
let report = bm
.write_through_batch(&[WriteThroughEntry {
guid,
bytes: stale_bytes,
expected_seq: 10,
content_version: Some(stale_version),
}])
.unwrap();
assert_eq!(report.statuses, vec![WriteThroughStatus::Stale]);
assert_eq!(
bm.snapshot_dirty()[&guid],
20,
"newer writer entry must survive stale write-through retirement",
);
// The store must still hold the byte from the initial `make_buf(0)` image.
let mut stored = AlignedBlobBuf::zeroed();
inner.read_blob(guid, &mut stored).unwrap();
assert_eq!(
stored.as_slice()[123],
0,
"stale checkpoint bytes must not overwrite the store"
);
}
/// Takes two dirty snapshots of the same blob back to back and checks that the
/// blob stays unevictable after the first is retired, and becomes evictable
/// only after the second has been written through.
#[test]
fn overlapping_checkpoint_epochs_keep_cache_image_protected() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0x58; 16];
inner.write_blob(guid, &make_buf(0)).unwrap();
let bm = BufferManager::new(inner, 1);
let pin = bm.pin(guid).unwrap();
// First epoch: byte 0x11 at seq 10.
{
let mut guard = pin.write();
guard.as_mut_slice()[123] = 0x11;
}
bm.mark_dirty_cached(guid, 10, pin.as_ref());
let first = bm.snapshot_dirty();
assert_eq!(bm.flushing_count(), 1);
let first_version = bm.snapshot_dirty_versions(&first).unwrap()[0].content_version;
let first_bytes = bm
.snapshot_bytes_if_version(guid, first_version)
.unwrap()
.unwrap();
// Second epoch: byte 0x22 at seq 20, snapshotted while the first is still in flight.
{
let mut guard = pin.write();
guard.as_mut_slice()[123] = 0x22;
}
bm.mark_dirty_cached(guid, 20, pin.as_ref());
let second = bm.snapshot_dirty();
assert_eq!(bm.flushing_count(), 2);
let second_version = bm.snapshot_dirty_versions(&second).unwrap()[0].content_version;
let second_bytes = bm
.snapshot_bytes_if_version(guid, second_version)
.unwrap()
.unwrap();
drop(pin);
// The first epoch's bytes predate the second write, so they come back `Stale`.
let first_report = bm
.write_through_batch(&[WriteThroughEntry {
guid,
bytes: first_bytes,
expected_seq: 10,
content_version: Some(first_version),
}])
.unwrap();
assert_eq!(first_report.statuses, vec![WriteThroughStatus::Stale]);
bm.restore_dirty(first);
assert!(
!bm.try_evict_cold(guid),
"second in-flight epoch must keep the blob cached after first retire",
);
assert_eq!(bm.flushing_count(), 1);
// The second epoch matches the current version and is written.
let second_report = bm
.write_through_batch(&[WriteThroughEntry {
guid,
bytes: second_bytes,
expected_seq: 20,
content_version: Some(second_version),
}])
.unwrap();
assert_eq!(second_report.statuses, vec![WriteThroughStatus::Written]);
assert!(
bm.try_evict_cold(guid),
"last in-flight epoch can release eviction protection",
);
assert_eq!(bm.flushing_count(), 0);
}
/// Checks that `mark_for_delete` issued while a dirty snapshot is in flight
/// keeps the cached image and flushing count, lets the write-through complete,
/// and still hides the blob from `pin`.
#[test]
fn pending_delete_preserves_in_flight_checkpoint_image() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0x59; 16];
inner.write_blob(guid, &make_buf(0)).unwrap();
let bm = BufferManager::new(inner.clone(), 1);
let pin = bm.pin(guid).unwrap();
{
let mut guard = pin.write();
guard.as_mut_slice()[123] = 0x33;
}
bm.mark_dirty_cached(guid, 10, pin.as_ref());
// Snapshot the dirty entry and clone its bytes before the delete is requested.
let snap = bm.snapshot_dirty();
let version = bm.snapshot_dirty_versions(&snap).unwrap()[0].content_version;
let bytes = bm
.snapshot_bytes_if_version(guid, version)
.unwrap()
.unwrap();
drop(pin);
bm.mark_for_delete(guid, 20);
assert_eq!(
bm.flushing_count(),
1,
"a pending delete must not retire an in-flight checkpoint epoch",
);
assert!(
bm.cache.contains_key(&guid),
"a pending delete must keep the cache image needed by write-through validation",
);
let report = bm
.write_through_batch(&[WriteThroughEntry {
guid,
bytes,
expected_seq: 10,
content_version: Some(version),
}])
.unwrap();
assert_eq!(report.statuses, vec![WriteThroughStatus::Written]);
assert_eq!(bm.flushing_count(), 0);
assert_eq!(bm.pending_delete_count(), 1);
assert!(
bm.pin(guid).is_err(),
"pending delete must still hide the blob"
);
// Read the wrapped store directly: it must hold the written-through byte.
let mut stored = AlignedBlobBuf::zeroed();
inner.read_blob(guid, &mut stored).unwrap();
assert_eq!(
stored.as_slice()[123],
0x33,
"checkpoint write-through must preserve the durable image until delete applies",
);
}
/// Checks that `execute_pending_delete` reports `false` while a dirty snapshot
/// of the blob is still in flight, and succeeds after that snapshot has been
/// written through.
#[test]
fn execute_pending_delete_defers_while_blob_is_flushing() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0x5B; 16];
inner.write_blob(guid, &make_buf(0)).unwrap();
let bm = BufferManager::new(inner.clone(), 1);
let pin = bm.pin(guid).unwrap();
{
let mut guard = pin.write();
guard.as_mut_slice()[123] = 0x44;
}
bm.mark_dirty_cached(guid, 10, pin.as_ref());
// Put the blob in flight: snapshot, capture version, clone bytes.
let snap = bm.snapshot_dirty();
let version = bm.snapshot_dirty_versions(&snap).unwrap()[0].content_version;
let bytes = bm
.snapshot_bytes_if_version(guid, version)
.unwrap()
.unwrap();
drop(pin);
bm.mark_for_delete(guid, 20);
let pending = bm.snapshot_pending_deletes();
assert_eq!(pending.get(&guid), Some(&20));
// First attempt: no pin is held, but the snapshot has not been written yet.
assert!(
!bm.execute_pending_delete(guid).unwrap(),
"delete must wait for the in-flight checkpoint image to retire",
);
assert!(inner.has_blob(guid).unwrap());
bm.restore_pending_deletes(pending);
assert_eq!(bm.pending_delete_count(), 1);
let report = bm
.write_through_batch(&[WriteThroughEntry {
guid,
bytes,
expected_seq: 10,
content_version: Some(version),
}])
.unwrap();
assert_eq!(report.statuses, vec![WriteThroughStatus::Written]);
// Second attempt, after the write-through: the delete is applied.
let pending = bm.snapshot_pending_deletes();
assert!(bm.execute_pending_delete(guid).unwrap());
assert!(!inner.has_blob(guid).unwrap());
assert_eq!(bm.pending_delete_count(), 0);
// The re-claimed snapshot still carries the original delete seq.
assert_eq!(pending.get(&guid), Some(&20));
}
/// A direct `write_blob` while a dirty snapshot is in flight must keep the
/// flushing count and cached image; the older snapshot then comes back `Stale`.
#[test]
fn write_through_does_not_clear_in_flight_checkpoint_owner() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0x5C; 16];
    backing.write_blob(guid, &make_buf(0)).unwrap();
    let mgr = BufferManager::new(backing.clone(), 1);

    // Dirty the blob, snapshot it, and clone the bytes at that version.
    let held = mgr.pin(guid).unwrap();
    {
        let mut image = held.write();
        image.as_mut_slice()[123] = 0x55;
    }
    mgr.mark_dirty_cached(guid, 10, held.as_ref());
    let drained = mgr.snapshot_dirty();
    let version = mgr.snapshot_dirty_versions(&drained).unwrap()[0].content_version;
    let bytes = mgr
        .snapshot_bytes_if_version(guid, version)
        .unwrap()
        .unwrap();
    drop(held);

    // Direct write while the snapshot above is still outstanding.
    mgr.write_blob(guid, &make_buf(0x66)).unwrap();
    assert_eq!(
        mgr.flushing_count(),
        1,
        "direct write-through must not retire another checkpoint epoch",
    );
    assert!(
        mgr.cache.contains_key(&guid),
        "direct write-through must keep the image required by version validation",
    );

    let outcome = mgr
        .write_through_batch(&[WriteThroughEntry {
            guid,
            bytes,
            expected_seq: 10,
            content_version: Some(version),
        }])
        .unwrap();
    assert_eq!(outcome.statuses, vec![WriteThroughStatus::Stale]);
}
/// `delete_blob` must fail while a dirty snapshot of the blob is in flight,
/// leaving cache, store and flushing count untouched so the snapshot can still
/// be written.
#[test]
fn delete_blob_rejects_in_flight_checkpoint_owner() {
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let guid = [0x5D; 16];
    backing.write_blob(guid, &make_buf(0)).unwrap();
    let mgr = BufferManager::new(backing.clone(), 1);

    // Dirty the blob, snapshot it, and clone the bytes at that version.
    let held = mgr.pin(guid).unwrap();
    {
        let mut image = held.write();
        image.as_mut_slice()[123] = 0x77;
    }
    mgr.mark_dirty_cached(guid, 10, held.as_ref());
    let drained = mgr.snapshot_dirty();
    let version = mgr.snapshot_dirty_versions(&drained).unwrap()[0].content_version;
    let bytes = mgr
        .snapshot_bytes_if_version(guid, version)
        .unwrap()
        .unwrap();
    drop(held);

    assert!(mgr.delete_blob(guid).is_err());
    assert_eq!(mgr.flushing_count(), 1);
    assert!(mgr.cache.contains_key(&guid));
    assert!(backing.has_blob(guid).unwrap());

    let outcome = mgr
        .write_through_batch(&[WriteThroughEntry {
            guid,
            bytes,
            expected_seq: 10,
            content_version: Some(version),
        }])
        .unwrap();
    assert_eq!(outcome.statuses, vec![WriteThroughStatus::Written]);
}
/// `restore_dirty` merges into the live dirty map, keeping the smaller sequence
/// number per guid and leaving unrelated live entries alone.
#[test]
fn restore_dirty_merges_keeping_min() {
    let (g1, g2, g3): (BlobGuid, BlobGuid, BlobGuid) = ([0x01; 16], [0x02; 16], [0x03; 16]);
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for guid in [g1, g2, g3] {
        backing.write_blob(guid, &make_buf(1)).unwrap();
    }
    let mgr = BufferManager::new(backing, 4);
    let _held1 = mgr.pin(g1).unwrap();
    let _held2 = mgr.pin(g2).unwrap();
    let _held3 = mgr.pin(g3).unwrap();

    // Snapshot to restore: g1 at 10, g2 at 20.
    let mut earlier = HashMap::new();
    earlier.insert(g1, 10);
    earlier.insert(g2, 20);

    // Live entries: g1 at 50 (higher than restored), g3 at 5 (not in snapshot).
    mgr.mark_dirty(g1, 50);
    mgr.mark_dirty(g3, 5);
    mgr.restore_dirty(earlier);

    assert_eq!(mgr.dirty_count(), 3);
    let merged = mgr.snapshot_dirty();
    assert_eq!(merged[&g1], 10);
    assert_eq!(merged[&g2], 20);
    assert_eq!(merged[&g3], 5);
}
/// A write-through whose `expected_seq` (50) differs from the live dirty seq
/// (200) must leave that dirty entry in place.
#[test]
fn write_through_keeps_racing_writer_dirty_entry() {
    let guid: BlobGuid = [0xAA; 16];
    let backing: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    backing.write_blob(guid, &make_buf(0)).unwrap();
    let mgr = BufferManager::new(backing, 4);
    let _held = mgr.pin(guid).unwrap();

    mgr.mark_dirty(guid, 200);
    let image = mgr.snapshot_bytes(guid).unwrap();
    mgr.write_through_batch(&[WriteThroughEntry {
        guid,
        bytes: image,
        expected_seq: 50,
        content_version: None,
    }])
    .unwrap();

    assert_eq!(
        mgr.dirty_count(),
        1,
        "write-through must not stomp a racing newer-seq entry",
    );
    let remaining = mgr.snapshot_dirty();
    assert_eq!(remaining[&guid], 200, "racing writer's seq survives");
}
/// With no content version supplied, a write-through whose `expected_seq`
/// equals `STRUCTURAL_SEQ` must not retire a dirty entry that is also marked
/// with `STRUCTURAL_SEQ`.
#[test]
fn write_through_keeps_racing_structural_dirty_entry() {
    let guid = [0xA5; 16];
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    inner.write_blob(guid, &make_buf(0)).unwrap();
    let bm = BufferManager::new(inner, 4);
    let _pin = bm.pin(guid).unwrap();

    bm.mark_dirty(guid, STRUCTURAL_SEQ);

    let entry = WriteThroughEntry {
        guid,
        bytes: bm.snapshot_bytes(guid).unwrap(),
        expected_seq: STRUCTURAL_SEQ,
        content_version: None,
    };
    bm.write_through_batch(&[entry]).unwrap();

    assert_eq!(
        bm.dirty_count(),
        1,
        "structural sentinel equality is not enough to retire a racing entry",
    );
    let live = bm.snapshot_dirty();
    assert_eq!(live[&guid], STRUCTURAL_SEQ);
}
/// A write-through whose `expected_seq` matches the live dirty sequence (42)
/// leaves no dirty entries behind.
#[test]
fn write_through_retires_clean_snapshot() {
    let guid = [0xBB; 16];
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    inner.write_blob(guid, &make_buf(0)).unwrap();
    let bm = BufferManager::new(inner, 4);
    let _pin = bm.pin(guid).unwrap();

    bm.mark_dirty(guid, 42);

    let entry = WriteThroughEntry {
        guid,
        bytes: bm.snapshot_bytes(guid).unwrap(),
        expected_seq: 42,
        content_version: None,
    };
    bm.write_through_batch(&[entry]).unwrap();

    assert_eq!(bm.dirty_count(), 0);
}
/// Writing through a batch built from an unmodified dirty snapshot clears the
/// dirty set, and the bytes written into the cache reach the backing store.
#[test]
fn write_through_batch_retires_clean_snapshots() {
    let g1 = [0xB1; 16];
    let g2 = [0xB2; 16];
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for guid in [g1, g2] {
        inner.write_blob(guid, &make_buf(0)).unwrap();
    }
    let bm = BufferManager::new(Arc::clone(&inner), 4);

    // Stamp a distinct marker byte into each cached blob and mark it dirty
    // while the write guard is still held; the marker doubles as the sequence.
    for (guid, marker) in [(g1, 11), (g2, 22)] {
        let pin = bm.pin(guid).unwrap();
        let mut writer = pin.write();
        writer.as_mut_slice()[100] = marker;
        bm.mark_dirty(guid, u64::from(marker));
        drop(writer);
        drop(pin);
    }

    // Build one write-through entry per snapshot member.
    let snap = bm.snapshot_dirty();
    let mut batch = Vec::with_capacity(snap.len());
    for (&guid, &expected_seq) in &snap {
        batch.push(WriteThroughEntry {
            guid,
            bytes: bm.snapshot_bytes(guid).unwrap(),
            expected_seq,
            content_version: None,
        });
    }
    bm.write_through_batch(&batch).unwrap();
    assert_eq!(bm.dirty_count(), 0);

    // Read both blobs back from the backing store and check the markers.
    let mut dst = AlignedBlobBuf::zeroed();
    for (guid, marker) in [(g1, 11), (g2, 22)] {
        inner.read_blob(guid, &mut dst).unwrap();
        assert_eq!(dst.as_slice()[100], marker);
    }
}
/// When one guid is re-marked dirty (seq 200) after the snapshot was taken,
/// the batched write-through leaves exactly that entry dirty.
#[test]
fn write_through_batch_keeps_racing_writer_dirty_entry() {
    let g1 = [0xC1; 16];
    let g2 = [0xC2; 16];
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for guid in [g1, g2] {
        inner.write_blob(guid, &make_buf(0)).unwrap();
    }
    let bm = BufferManager::new(inner, 4);

    // Pin each blob once and release the pin immediately.
    drop(bm.pin(g1).unwrap());
    drop(bm.pin(g2).unwrap());

    bm.mark_dirty(g1, 50);
    bm.mark_dirty(g2, 60);
    let snap = bm.snapshot_dirty();
    // Mark `g1` dirty again after the snapshot, with a newer sequence.
    bm.mark_dirty(g1, 200);

    let mut batch = Vec::with_capacity(snap.len());
    for (&guid, &expected_seq) in &snap {
        batch.push(WriteThroughEntry {
            guid,
            bytes: bm.snapshot_bytes(guid).unwrap(),
            expected_seq,
            content_version: None,
        });
    }
    bm.write_through_batch(&batch).unwrap();

    let live = bm.snapshot_dirty();
    assert_eq!(live.len(), 1);
    assert_eq!(live[&g1], 200);
}
/// Calling `write_blob` on the buffer manager through the `BlobStore` trait
/// clears the blob's dirty entry.
#[test]
fn write_blob_through_trait_clears_dirty() {
    let guid = [0x88; 16];
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    inner.write_blob(guid, &make_buf(1)).unwrap();
    let bm = BufferManager::new(inner, 4);
    let _pin = bm.pin(guid).unwrap();

    bm.mark_dirty(guid, 100);
    assert_eq!(bm.dirty_count(), 1);

    let replacement = make_buf(9);
    BlobStore::write_blob(&bm, guid, &replacement).unwrap();
    assert_eq!(bm.dirty_count(), 0);
}
/// Calling `delete_blob` on the buffer manager through the `BlobStore` trait
/// removes the blob's dirty entry as well.
#[test]
fn delete_blob_drops_dirty_entry() {
    let guid = [0x99; 16];
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    inner.write_blob(guid, &make_buf(1)).unwrap();
    let bm = BufferManager::new(inner, 4);

    // Pin once and release immediately; no pin is held during the delete.
    drop(bm.pin(guid).unwrap());

    bm.mark_dirty(guid, 7);
    assert_eq!(bm.dirty_count(), 1);

    BlobStore::delete_blob(&bm, guid).unwrap();
    assert_eq!(
        bm.dirty_count(),
        0,
        "deleted blobs must not linger as flush candidates"
    );
}
/// `install_new_blob` puts the blob in the cache and the dirty set without
/// touching the backing store; the store only sees it after a write-through
/// followed by `flush_inner`.
#[test]
fn install_new_blob_caches_and_marks_dirty_without_store_write() {
    let new_guid = [0xCC; 16];
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    let bm = BufferManager::new(Arc::clone(&inner), 4);

    let mut fresh = AlignedBlobBuf::zeroed();
    fresh.as_mut_slice()[200] = 0x77;
    bm.install_new_blob(new_guid, fresh, 42);

    // Cached and dirty at the installed sequence.
    assert_eq!(bm.cached_count(), 1);
    assert_eq!(bm.dirty_count(), 1);
    let first_snap = bm.snapshot_dirty();
    assert_eq!(first_snap[&new_guid], 42);
    bm.restore_dirty(first_snap);

    // Nothing has reached the backing store yet.
    assert!(
        !inner.has_blob(new_guid).unwrap(),
        "install_new_blob must defer the store write to the checkpoint round",
    );

    // The installed bytes are readable through a pin; the read guard and the
    // pin are both released at the end of this scope.
    {
        let pin = bm.pin(new_guid).unwrap();
        let reader = pin.read();
        assert_eq!(reader.as_slice()[200], 0x77);
    }

    // Write the blob through using a fresh snapshot, then flush.
    let second_snap = bm.snapshot_dirty();
    let captured = bm.snapshot_bytes(new_guid).unwrap();
    let expected_seq = second_snap[&new_guid];
    bm.write_through_batch(&[WriteThroughEntry {
        guid: new_guid,
        bytes: captured,
        expected_seq,
        content_version: None,
    }])
    .unwrap();
    bm.flush_inner().unwrap();

    assert_eq!(bm.dirty_count(), 0);
    assert!(inner.has_blob(new_guid).unwrap());
    let mut stored = AlignedBlobBuf::zeroed();
    inner.read_blob(new_guid, &mut stored).unwrap();
    assert_eq!(stored.as_slice()[200], 0x77);
}
/// Eight threads each read their own blob 50 times through a shared buffer
/// manager; every read must succeed and exactly eight blobs end up cached.
#[test]
fn concurrent_reads_on_different_blobs_progress() {
    use std::thread;

    // Seed 16 blobs whose guid differs only in its first byte.
    let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
    for seed in 0..16u8 {
        let mut guid = [0u8; 16];
        guid[0] = seed;
        inner.write_blob(guid, &make_buf(seed)).unwrap();
    }
    let bm = Arc::new(BufferManager::new(inner, 16));

    let mut workers = Vec::with_capacity(8);
    for t in 0..8u8 {
        let shared = Arc::clone(&bm);
        workers.push(thread::spawn(move || {
            // Thread `t` reads only the even-numbered blob `2 * t`.
            let tag = t * 2;
            let mut guid = [0u8; 16];
            guid[0] = tag;
            for _ in 0..50 {
                let mut dst = AlignedBlobBuf::zeroed();
                shared.read_blob(guid, &mut dst).unwrap();
                // Presumably `make_buf(seed)` stores `seed` at offset 100 — confirm against the helper.
                assert_eq!(dst.as_slice()[100], tag);
            }
        }));
    }
    workers.into_iter().for_each(|worker| worker.join().unwrap());

    assert_eq!(bm.cached_count(), 8);
}
}