mod cached_blob;
mod mutation;
use std::collections::{hash_map::Entry, HashMap};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use dashmap::DashMap;
use crate::api::errors::{Error, Result};
use crate::layout::BlobGuid;
use super::blob_store::{AlignedBlobBuf, BlobStore};
pub use cached_blob::{BlobWriteGuard, CachedBlob};
use mutation::{
bookkeeping_shard_idx, pop_candidate_batch, CandidateKind, MutationState, BOOKKEEPING_SHARDS,
};
pub const STRUCTURAL_SEQ: u64 = u64::MAX;
pub(crate) struct WriteThroughEntry {
pub(crate) guid: BlobGuid,
pub(crate) bytes: AlignedBlobBuf,
pub(crate) expected_seq: u64,
}
pub struct BufferManager {
store: Arc<dyn BlobStore>,
capacity: usize,
cache: DashMap<BlobGuid, Arc<CachedBlob>>,
mutation: [Mutex<MutationState>; BOOKKEEPING_SHARDS],
pending_delete_total: AtomicUsize,
compact_candidate_cursor: AtomicUsize,
merge_candidate_cursor: AtomicUsize,
compact_candidate_total: AtomicUsize,
merge_candidate_total: AtomicUsize,
clock: AtomicU64,
cache_hits: AtomicU64,
cache_misses: AtomicU64,
optimistic_restarts: AtomicU64,
range_restarts: AtomicU64,
walker_ops: AtomicU64,
walker_blob_hops: AtomicU64,
max_blob_hops: AtomicU64,
max_cross_blob_depth: AtomicU64,
spillover_count: AtomicU64,
merge_count: AtomicU64,
}
#[inline]
fn fetch_max_relaxed(atom: &AtomicU64, value: u64) {
let mut cur = atom.load(Ordering::Relaxed);
while value > cur {
match atom.compare_exchange_weak(cur, value, Ordering::Relaxed, Ordering::Relaxed) {
Ok(_) => break,
Err(actual) => cur = actual,
}
}
}
impl BufferManager {
#[must_use]
pub fn new(store: Arc<dyn BlobStore>, capacity: usize) -> Self {
Self {
store,
capacity: capacity.max(1),
cache: DashMap::new(),
mutation: std::array::from_fn(|_| Mutex::new(MutationState::default())),
pending_delete_total: AtomicUsize::new(0),
compact_candidate_cursor: AtomicUsize::new(0),
merge_candidate_cursor: AtomicUsize::new(0),
compact_candidate_total: AtomicUsize::new(0),
merge_candidate_total: AtomicUsize::new(0),
clock: AtomicU64::new(1),
cache_hits: AtomicU64::new(0),
cache_misses: AtomicU64::new(0),
optimistic_restarts: AtomicU64::new(0),
range_restarts: AtomicU64::new(0),
walker_ops: AtomicU64::new(0),
walker_blob_hops: AtomicU64::new(0),
max_blob_hops: AtomicU64::new(0),
max_cross_blob_depth: AtomicU64::new(0),
spillover_count: AtomicU64::new(0),
merge_count: AtomicU64::new(0),
}
}
pub(crate) fn clock_tick(&self) -> u64 {
self.clock.load(Ordering::Relaxed)
}
pub(crate) fn cache_excess(&self) -> usize {
self.cache.len().saturating_sub(self.capacity)
}
pub(crate) fn snapshot_entries(&self) -> Vec<(BlobGuid, Arc<CachedBlob>)> {
self.cache
.iter()
.map(|kv| (*kv.key(), Arc::clone(kv.value())))
.collect()
}
fn decrement_candidate_totals(&self, removed: (bool, bool)) {
if removed.0 {
self.compact_candidate_total.fetch_sub(1, Ordering::Relaxed);
}
if removed.1 {
self.merge_candidate_total.fetch_sub(1, Ordering::Relaxed);
}
}
pub(crate) fn try_evict_cold(&self, guid: BlobGuid) -> bool {
{
let state = self.mutation_shard(guid).lock().unwrap();
if state.is_protected_or_pending(&guid) {
return false;
}
}
self.cache
.remove_if(&guid, |_, entry| {
if Arc::strong_count(entry) > 1 {
return false;
}
let state = self.mutation_shard(guid).lock().unwrap();
!state.is_protected_or_pending(&guid)
})
.is_some()
}
#[cfg(test)]
#[must_use]
pub fn cached_count(&self) -> usize {
self.cache.len()
}
#[must_use]
pub fn cache_hits(&self) -> u64 {
self.cache_hits.load(Ordering::Relaxed)
}
#[must_use]
pub fn cache_misses(&self) -> u64 {
self.cache_misses.load(Ordering::Relaxed)
}
#[must_use]
pub fn optimistic_restarts(&self) -> u64 {
self.optimistic_restarts.load(Ordering::Relaxed)
}
pub(crate) fn note_optimistic_restart(&self) {
self.optimistic_restarts.fetch_add(1, Ordering::Relaxed);
}
#[must_use]
pub fn range_restarts(&self) -> u64 {
self.range_restarts.load(Ordering::Relaxed)
}
pub(crate) fn note_range_restart(&self) {
self.range_restarts.fetch_add(1, Ordering::Relaxed);
}
#[must_use]
pub fn walker_ops(&self) -> u64 {
self.walker_ops.load(Ordering::Relaxed)
}
#[must_use]
pub fn walker_blob_hops(&self) -> u64 {
self.walker_blob_hops.load(Ordering::Relaxed)
}
#[must_use]
pub fn max_blob_hops(&self) -> u64 {
self.max_blob_hops.load(Ordering::Relaxed)
}
#[must_use]
pub fn max_cross_blob_depth(&self) -> u64 {
self.max_cross_blob_depth.load(Ordering::Relaxed)
}
#[must_use]
pub fn spillover_count(&self) -> u64 {
self.spillover_count.load(Ordering::Relaxed)
}
#[must_use]
pub fn merge_count(&self) -> u64 {
self.merge_count.load(Ordering::Relaxed)
}
pub(crate) fn note_walker_blob_hops(&self, hops: u64, max_cross_blob_depth: usize) {
self.walker_ops.fetch_add(1, Ordering::Relaxed);
self.walker_blob_hops.fetch_add(hops, Ordering::Relaxed);
fetch_max_relaxed(&self.max_blob_hops, hops);
fetch_max_relaxed(&self.max_cross_blob_depth, max_cross_blob_depth as u64);
}
pub(crate) fn note_spillover(&self) {
self.spillover_count.fetch_add(1, Ordering::Relaxed);
}
pub(crate) fn note_merges(&self, merged: u64) {
if merged != 0 {
self.merge_count.fetch_add(merged, Ordering::Relaxed);
}
}
fn get_cached(&self, guid: BlobGuid) -> Option<Arc<CachedBlob>> {
let Some(entry) = self.cache.get(&guid) else {
self.cache_misses.fetch_add(1, Ordering::Relaxed);
return None;
};
let arc = Arc::clone(entry.value());
drop(entry);
let tick = self.clock.fetch_add(1, Ordering::Relaxed);
arc.last_touched.store(tick, Ordering::Relaxed);
self.cache_hits.fetch_add(1, Ordering::Relaxed);
Some(arc)
}
fn get_cached_silent(&self, guid: BlobGuid) -> Option<Arc<CachedBlob>> {
let entry = self.cache.get(&guid)?;
let arc = Arc::clone(entry.value());
drop(entry);
Some(arc)
}
fn mutation_shard(&self, guid: BlobGuid) -> &Mutex<MutationState> {
&self.mutation[bookkeeping_shard_idx(&guid)]
}
fn is_pending_delete(&self, guid: BlobGuid) -> bool {
if self.pending_delete_total.load(Ordering::Acquire) == 0 {
return false;
}
self.mutation_shard(guid)
.lock()
.unwrap()
.pending_deletes
.contains_key(&guid)
}
fn pending_delete_not_found(guid: BlobGuid) -> Error {
Error::BlobStoreIo(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("blob {:02x?} is pending delete", &guid[..4]),
))
}
fn insert_into_cache(&self, guid: BlobGuid, contents: &AlignedBlobBuf) {
self.insert_owned_into_cache(guid, contents.clone());
}
fn insert_owned_into_cache(&self, guid: BlobGuid, contents: AlignedBlobBuf) {
let tick = self.clock.fetch_add(1, Ordering::Relaxed);
let inserted = self.cache.entry(guid).or_insert_with(|| {
let entry = Arc::new(CachedBlob::new(contents));
entry.last_touched.store(tick, Ordering::Relaxed);
entry
});
inserted.value().last_touched.store(tick, Ordering::Relaxed);
drop(inserted);
const RETRY_BUDGET: u32 = 8;
let mut retries_left = RETRY_BUDGET;
let mut entry_spins = self.cache.len();
while self.cache.len() > self.capacity {
if self.try_evict_lru() {
entry_spins = self.cache.len();
continue;
}
if retries_left == 0 || entry_spins == 0 {
break;
}
std::thread::yield_now();
retries_left -= 1;
entry_spins = entry_spins.saturating_sub(1);
}
}
fn try_evict_lru(&self) -> bool {
let protected_snap: std::collections::HashSet<BlobGuid> = {
let mut out = std::collections::HashSet::new();
for shard in &self.mutation {
let state = shard.lock().unwrap();
out.extend(state.dirty.keys().copied());
out.extend(state.flushing.keys().copied());
out.extend(state.pending_deletes.keys().copied());
}
out
};
let mut victim: Option<(BlobGuid, u64)> = None;
for kv in &self.cache {
if Arc::strong_count(kv.value()) > 1 {
continue;
}
let guid = *kv.key();
if protected_snap.contains(&guid) {
continue;
}
let tick = kv.value().last_touched.load(Ordering::Relaxed);
match victim {
None => victim = Some((guid, tick)),
Some((_, vmin)) if tick < vmin => {
victim = Some((guid, tick));
}
_ => {}
}
}
if let Some((guid, _)) = victim {
return self
.cache
.remove_if(&guid, |_, e| {
if Arc::strong_count(e) > 1 {
return false;
}
let state = self.mutation_shard(guid).lock().unwrap();
!state.is_protected_or_pending(&guid)
})
.is_some();
}
false
}
fn evict_from_cache(&self, guid: BlobGuid) {
if let Some((_, entry)) = self.cache.remove(&guid) {
entry.clear_dirty_hint();
}
let mut state = self.mutation_shard(guid).lock().unwrap();
state.remove_dirty(&guid);
let removed = state.remove_maintenance_candidates(&guid);
drop(state);
self.decrement_candidate_totals(removed);
}
pub fn pin(&self, guid: BlobGuid) -> Result<Arc<CachedBlob>> {
if self.is_pending_delete(guid) {
return Err(Self::pending_delete_not_found(guid));
}
if let Some(entry) = self.get_cached(guid) {
return Ok(entry);
}
let mut scratch = self.store.alloc_blob_buf_uninit();
self.store.read_blob(guid, &mut scratch)?;
if self.is_pending_delete(guid) {
return Err(Self::pending_delete_not_found(guid));
}
self.insert_owned_into_cache(guid, scratch);
if let Some(entry) = self.get_cached(guid) {
return Ok(entry);
}
let mut scratch = self.store.alloc_blob_buf_uninit();
self.store.read_blob(guid, &mut scratch)?;
let entry = Arc::new(CachedBlob::new(scratch));
let tick = self.clock.fetch_add(1, Ordering::Relaxed);
entry.last_touched.store(tick, Ordering::Relaxed);
self.cache.insert(guid, Arc::clone(&entry));
Ok(entry)
}
pub fn pin_silent(&self, guid: BlobGuid) -> Result<Arc<CachedBlob>> {
if self.is_pending_delete(guid) {
return Err(Self::pending_delete_not_found(guid));
}
if let Some(entry) = self.get_cached_silent(guid) {
return Ok(entry);
}
let mut scratch = self.store.alloc_blob_buf_uninit();
self.store.read_blob(guid, &mut scratch)?;
if self.is_pending_delete(guid) {
return Err(Self::pending_delete_not_found(guid));
}
self.insert_owned_into_cache(guid, scratch);
if let Some(entry) = self.get_cached_silent(guid) {
return Ok(entry);
}
let mut scratch = self.store.alloc_blob_buf_uninit();
self.store.read_blob(guid, &mut scratch)?;
let entry = Arc::new(CachedBlob::new(scratch));
let tick = self.clock.fetch_add(1, Ordering::Relaxed);
entry.last_touched.store(tick, Ordering::Relaxed);
self.cache.insert(guid, Arc::clone(&entry));
Ok(entry)
}
pub fn mark_dirty(&self, guid: BlobGuid, seq: u64) {
let cached = self.get_cached_silent(guid);
self.mark_dirty_with_hint(guid, seq, cached.as_deref());
}
pub(crate) fn mark_dirty_cached(&self, guid: BlobGuid, seq: u64, entry: &CachedBlob) {
self.mark_dirty_with_hint(guid, seq, Some(entry));
}
fn mark_dirty_with_hint(&self, guid: BlobGuid, seq: u64, cached: Option<&CachedBlob>) {
let Some(cached) = cached else {
return;
};
let hint_covers_seq = !cached.dirty_hint_needs_map_publish(seq);
let mut state = self.mutation_shard(guid).lock().unwrap();
if state.pending_deletes.contains_key(&guid) {
cached.clear_dirty_hint();
return;
}
if hint_covers_seq && matches!(state.dirty.get(&guid), Some(cur) if *cur <= seq) {
return;
}
if hint_covers_seq {
cached.clear_dirty_hint();
let _ = cached.dirty_hint_needs_map_publish(seq);
}
state
.dirty
.entry(guid)
.and_modify(|cur| *cur = (*cur).min(seq))
.or_insert(seq);
}
#[must_use]
pub fn snapshot_dirty(&self) -> HashMap<BlobGuid, u64> {
let mut out = HashMap::new();
for shard in &self.mutation {
let mut state = shard.lock().unwrap();
for (guid, seq) in &mut state.dirty {
if let Some(hinted_seq) = self
.get_cached_silent(*guid)
.and_then(|entry| entry.take_dirty_hint())
{
*seq = (*seq).min(hinted_seq);
}
}
let snap = std::mem::take(&mut state.dirty);
for (guid, seq) in snap {
if state.pending_deletes.contains_key(&guid) {
if let Some(entry) = self.get_cached_silent(guid) {
entry.clear_dirty_hint();
}
continue;
}
state
.flushing
.entry(guid)
.and_modify(|cur| *cur = (*cur).min(seq))
.or_insert(seq);
out.insert(guid, seq);
}
}
out
}
pub fn restore_dirty(&self, entries: HashMap<BlobGuid, u64>) {
if entries.is_empty() {
return;
}
for (guid, t) in entries {
let cached = self.get_cached_silent(guid);
if let Some(entry) = &cached {
let _ = entry.dirty_hint_needs_map_publish(t);
}
let mut state = self.mutation_shard(guid).lock().unwrap();
if state.pending_deletes.contains_key(&guid) {
if let Some(entry) = cached {
entry.clear_dirty_hint();
}
state.flushing.remove(&guid);
continue;
}
if matches!(state.flushing.get(&guid), Some(cur) if *cur == t) {
state.flushing.remove(&guid);
}
state
.dirty
.entry(guid)
.and_modify(|cur| *cur = (*cur).min(t))
.or_insert(t);
}
}
#[must_use]
pub fn dirty_count(&self) -> usize {
self.mutation
.iter()
.map(|shard| shard.lock().unwrap().dirty.len())
.sum()
}
pub fn mark_for_delete(&self, guid: BlobGuid, seq: u64) {
let mut state = self.mutation_shard(guid).lock().unwrap();
match state.pending_deletes.entry(guid) {
Entry::Occupied(mut entry) => {
let cur = entry.get_mut();
*cur = (*cur).min(seq);
}
Entry::Vacant(entry) => {
entry.insert(seq);
self.pending_delete_total.fetch_add(1, Ordering::AcqRel);
}
}
state.remove_dirty(&guid);
let removed = state.remove_maintenance_candidates(&guid);
drop(state);
self.decrement_candidate_totals(removed);
if let Some((_, entry)) = self.cache.remove(&guid) {
entry.clear_dirty_hint();
}
}
#[must_use]
pub fn snapshot_pending_deletes(&self) -> HashMap<BlobGuid, u64> {
let mut out = HashMap::new();
for shard in &self.mutation {
let mut state = shard.lock().unwrap();
let pending = std::mem::take(&mut state.pending_deletes);
let count = pending.len();
if count != 0 {
self.pending_delete_total.fetch_sub(count, Ordering::AcqRel);
}
out.extend(pending);
}
out
}
pub fn restore_pending_deletes(&self, entries: HashMap<BlobGuid, u64>) {
if entries.is_empty() {
return;
}
for (g, t) in entries {
let mut state = self.mutation_shard(g).lock().unwrap();
match state.pending_deletes.entry(g) {
Entry::Occupied(mut entry) => {
let cur = entry.get_mut();
*cur = (*cur).min(t);
}
Entry::Vacant(entry) => {
entry.insert(t);
self.pending_delete_total.fetch_add(1, Ordering::AcqRel);
}
}
}
}
#[must_use]
pub fn pending_delete_count(&self) -> usize {
self.pending_delete_total.load(Ordering::Acquire)
}
pub(crate) fn note_compaction_candidate(&self, guid: BlobGuid) {
let mut state = self.mutation_shard(guid).lock().unwrap();
if !state.pending_deletes.contains_key(&guid) && state.compact_candidates.insert(guid) {
self.compact_candidate_total.fetch_add(1, Ordering::Relaxed);
}
}
pub(crate) fn note_merge_candidate(&self, guid: BlobGuid) {
let mut state = self.mutation_shard(guid).lock().unwrap();
if !state.pending_deletes.contains_key(&guid) && state.merge_candidates.insert(guid) {
self.merge_candidate_total.fetch_add(1, Ordering::Relaxed);
}
}
#[must_use]
pub(crate) fn pop_compaction_candidates(&self, limit: usize) -> Vec<BlobGuid> {
pop_candidate_batch(
&self.mutation,
&self.compact_candidate_cursor,
&self.compact_candidate_total,
CandidateKind::Compact,
limit,
)
}
#[must_use]
pub(crate) fn pop_merge_candidates(&self, limit: usize) -> Vec<BlobGuid> {
pop_candidate_batch(
&self.mutation,
&self.merge_candidate_cursor,
&self.merge_candidate_total,
CandidateKind::Merge,
limit,
)
}
#[must_use]
pub(crate) fn compaction_candidate_count(&self) -> usize {
self.compact_candidate_total.load(Ordering::Relaxed)
}
#[must_use]
pub(crate) fn merge_candidate_count(&self) -> usize {
self.merge_candidate_total.load(Ordering::Relaxed)
}
pub(crate) fn execute_pending_delete(&self, guid: BlobGuid) -> Result<()> {
self.store.delete_blob(guid)
}
pub(crate) fn snapshot_bytes(&self, guid: BlobGuid) -> Option<AlignedBlobBuf> {
let entry = self.get_cached(guid)?;
let buf = entry.read();
let mut out = self.store.alloc_blob_buf_uninit();
out.as_mut_slice().copy_from_slice(buf.as_slice());
Some(out)
}
pub(crate) fn snapshot_blob_image(&self, guid: BlobGuid) -> Result<AlignedBlobBuf> {
if let Some(entry) = self.get_cached_silent(guid) {
let buf = entry.read();
let mut out = self.store.alloc_blob_buf_uninit();
out.as_mut_slice().copy_from_slice(buf.as_slice());
return Ok(out);
}
let mut out = self.store.alloc_blob_buf_uninit();
self.store.read_blob(guid, &mut out)?;
Ok(out)
}
#[must_use]
pub(crate) fn alloc_blob_buf_zeroed(&self) -> AlignedBlobBuf {
self.store.alloc_blob_buf_zeroed()
}
pub(crate) fn write_through_batch(&self, entries: &[WriteThroughEntry]) -> Result<()> {
if entries.is_empty() {
return Ok(());
}
let writes: Vec<_> = entries
.iter()
.map(|entry| (entry.guid, &entry.bytes))
.collect();
self.store.write_blobs_with_data_sync(&writes)?;
for entry in entries {
self.retire_write_through(entry.guid, entry.expected_seq);
}
Ok(())
}
fn retire_write_through(&self, guid: BlobGuid, expected_seq: u64) {
let mut state = self.mutation_shard(guid).lock().unwrap();
if expected_seq != STRUCTURAL_SEQ {
if let std::collections::hash_map::Entry::Occupied(e) = state.dirty.entry(guid) {
if *e.get() == expected_seq {
e.remove();
}
}
}
if matches!(state.flushing.get(&guid), Some(seq) if *seq == expected_seq) {
state.flushing.remove(&guid);
}
let still_dirty = state.dirty.contains_key(&guid) || state.flushing.contains_key(&guid);
drop(state);
if !still_dirty {
if let Some(entry) = self.get_cached_silent(guid) {
entry.clear_dirty_hint();
}
}
}
pub(crate) fn flush_inner(&self) -> Result<()> {
self.store.flush()
}
pub(crate) fn install_new_blob(&self, guid: BlobGuid, bytes: AlignedBlobBuf, seq: u64) {
let tick = self.clock.fetch_add(1, Ordering::Relaxed);
let entry = Arc::new(CachedBlob::new(bytes));
entry.last_touched.store(tick, Ordering::Relaxed);
self.cache.insert(guid, Arc::clone(&entry));
let _ = entry.dirty_hint_needs_map_publish(seq);
let mut state = self.mutation_shard(guid).lock().unwrap();
state
.dirty
.entry(guid)
.and_modify(|cur| *cur = (*cur).min(seq))
.or_insert(seq);
drop(entry);
}
}
impl BlobStore for BufferManager {
fn read_blob(&self, guid: BlobGuid, dst: &mut AlignedBlobBuf) -> Result<()> {
if let Some(entry) = self.get_cached(guid) {
let buf = entry.read();
dst.as_mut_slice().copy_from_slice(buf.as_slice());
return Ok(());
}
self.store.read_blob(guid, dst)?;
self.insert_into_cache(guid, dst);
Ok(())
}
fn write_blob(&self, guid: BlobGuid, src: &AlignedBlobBuf) -> Result<()> {
if let Some(entry) = self.get_cached(guid) {
let mut buf = entry.write();
buf.as_mut_slice().copy_from_slice(src.as_slice());
entry.clear_dirty_hint();
}
self.store.write_blob(guid, src)?;
let mut state = self.mutation_shard(guid).lock().unwrap();
state.remove_dirty(&guid);
let removed = state.remove_maintenance_candidates(&guid);
drop(state);
self.decrement_candidate_totals(removed);
Ok(())
}
fn write_blobs(&self, writes: &[(BlobGuid, &AlignedBlobBuf)]) -> Result<()> {
for (guid, src) in writes {
if let Some(entry) = self.get_cached(*guid) {
let mut buf = entry.write();
buf.as_mut_slice().copy_from_slice(src.as_slice());
entry.clear_dirty_hint();
}
}
self.store.write_blobs(writes)?;
for (guid, _) in writes {
let mut state = self.mutation_shard(*guid).lock().unwrap();
state.remove_dirty(guid);
let removed = state.remove_maintenance_candidates(guid);
drop(state);
self.decrement_candidate_totals(removed);
}
Ok(())
}
fn delete_blob(&self, guid: BlobGuid) -> Result<()> {
self.evict_from_cache(guid);
self.store.delete_blob(guid)
}
fn list_blobs(&self) -> Result<Vec<BlobGuid>> {
self.store.list_blobs()
}
fn flush(&self) -> Result<()> {
self.store.flush()
}
fn needs_flush(&self) -> bool {
self.store.needs_flush()
}
fn has_blob(&self, guid: BlobGuid) -> Result<bool> {
if self.is_pending_delete(guid) {
return Ok(false);
}
if self.cache.contains_key(&guid) {
return Ok(true);
}
self.store.has_blob(guid)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::blob_store::MemoryBlobStore;
fn make_buf(byte_at_100: u8) -> AlignedBlobBuf {
let mut b = AlignedBlobBuf::zeroed();
b.as_mut_slice()[100] = byte_at_100;
b
}
#[test]
fn read_caches_after_first_load() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
inner.write_blob([0xAB; 16], &make_buf(7)).unwrap();
let bm = BufferManager::new(inner.clone(), 4);
assert_eq!(bm.cached_count(), 0);
let mut dst = AlignedBlobBuf::zeroed();
bm.read_blob([0xAB; 16], &mut dst).unwrap();
assert_eq!(dst.as_slice()[100], 7);
assert_eq!(bm.cached_count(), 1);
bm.read_blob([0xAB; 16], &mut dst).unwrap();
assert_eq!(bm.cached_count(), 1);
}
#[test]
fn lru_eviction_at_capacity() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
for i in 0..10u8 {
let mut g = [0u8; 16];
g[0] = i;
inner.write_blob(g, &make_buf(i)).unwrap();
}
let bm = BufferManager::new(inner, 4);
for i in 0..10u8 {
let mut g = [0u8; 16];
g[0] = i;
let mut dst = AlignedBlobBuf::zeroed();
bm.read_blob(g, &mut dst).unwrap();
}
assert_eq!(
bm.cached_count(),
4,
"cache must shrink to capacity after over-fill",
);
let mut g_last = [0u8; 16];
g_last[0] = 9;
let mut g_first = [0u8; 16];
g_first[0] = 0;
assert!(bm.cache.contains_key(&g_last));
assert!(!bm.cache.contains_key(&g_first));
}
#[test]
fn lru_eviction_skips_dirty_entries() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
for i in 0..3u8 {
let mut g = [0u8; 16];
g[0] = i;
inner.write_blob(g, &make_buf(i)).unwrap();
}
let bm = BufferManager::new(inner, 2);
let g_a = {
let mut g = [0u8; 16];
g[0] = 0;
g
};
let g_b = {
let mut g = [0u8; 16];
g[0] = 1;
g
};
let g_c = {
let mut g = [0u8; 16];
g[0] = 2;
g
};
{
let _pin = bm.pin(g_a).unwrap();
}
bm.mark_dirty(g_a, 10);
assert_eq!(bm.dirty_count(), 1);
assert!(bm.cache.contains_key(&g_a));
{
let _pin = bm.pin(g_b).unwrap();
}
assert!(bm.cache.contains_key(&g_a));
assert!(bm.cache.contains_key(&g_b));
{
let _pin = bm.pin(g_c).unwrap();
}
assert!(
bm.cache.contains_key(&g_a),
"dirty entry A's cache image must survive inline LRU eviction",
);
assert!(
bm.cache.contains_key(&g_c),
"newly-pinned C must be in cache",
);
assert!(
!bm.cache.contains_key(&g_b),
"B (clean, no pin) should have been evicted in A's stead",
);
assert_eq!(
bm.dirty_count(),
1,
"dirty bookkeeping must not be touched by eviction",
);
assert!(
bm.snapshot_bytes(g_a).is_some(),
"dirty entry's cache image must be snapshottable",
);
}
#[test]
fn maintenance_candidates_are_unique_and_fifo_budgeted() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let bm = BufferManager::new(inner, 4);
let mut buckets = vec![Vec::<BlobGuid>::new(); BOOKKEEPING_SHARDS];
for i in 0..=u8::MAX {
let mut g = [0u8; 16];
g[0] = i;
buckets[bookkeeping_shard_idx(&g)].push(g);
}
let same_shard = buckets.into_iter().find(|b| b.len() >= 3).unwrap();
let a = same_shard[0];
let b = same_shard[1];
let c = same_shard[2];
bm.note_compaction_candidate(a);
bm.note_compaction_candidate(b);
bm.note_compaction_candidate(a);
bm.note_compaction_candidate(c);
assert_eq!(bm.compaction_candidate_count(), 3);
assert_eq!(bm.pop_compaction_candidates(2), vec![a, b]);
assert_eq!(bm.compaction_candidate_count(), 1);
bm.note_compaction_candidate(a);
assert_eq!(bm.pop_compaction_candidates(8), vec![c, a]);
assert_eq!(bm.compaction_candidate_count(), 0);
}
#[test]
fn maintenance_candidate_drain_rotates_across_shards() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let bm = BufferManager::new(inner, 4);
let mut by_shard = [None::<BlobGuid>; BOOKKEEPING_SHARDS];
let mut counter = 0u32;
while by_shard.iter().any(Option::is_none) {
assert!(
counter < 100_000,
"test helper could not cover every bookkeeping shard"
);
let mut guid = [0u8; 16];
guid[0..4].copy_from_slice(&counter.to_le_bytes());
let shard = bookkeeping_shard_idx(&guid);
by_shard[shard].get_or_insert(guid);
counter += 1;
}
for guid in by_shard.iter().flatten() {
bm.note_compaction_candidate(*guid);
}
for expected_shard in 0..4 {
let batch = bm.pop_compaction_candidates(1);
assert_eq!(batch.len(), 1);
assert_eq!(bookkeeping_shard_idx(&batch[0]), expected_shard);
}
}
#[test]
fn write_through_propagates_to_inner_store() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let bm = BufferManager::new(inner.clone(), 4);
bm.write_blob([0xCD; 16], &make_buf(0x42)).unwrap();
assert!(inner.has_blob([0xCD; 16]).unwrap());
let mut dst = AlignedBlobBuf::zeroed();
inner.read_blob([0xCD; 16], &mut dst).unwrap();
assert_eq!(dst.as_slice()[100], 0x42);
}
#[test]
fn write_through_updates_cache_if_present() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
inner.write_blob([0xEF; 16], &make_buf(1)).unwrap();
let bm = BufferManager::new(inner.clone(), 4);
let mut dst = AlignedBlobBuf::zeroed();
bm.read_blob([0xEF; 16], &mut dst).unwrap();
assert_eq!(dst.as_slice()[100], 1);
bm.write_blob([0xEF; 16], &make_buf(99)).unwrap();
bm.read_blob([0xEF; 16], &mut dst).unwrap();
assert_eq!(dst.as_slice()[100], 99);
}
#[test]
fn delete_evicts_from_cache_and_inner() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
inner.write_blob([0x33; 16], &make_buf(5)).unwrap();
let bm = BufferManager::new(inner.clone(), 4);
let mut dst = AlignedBlobBuf::zeroed();
bm.read_blob([0x33; 16], &mut dst).unwrap();
assert_eq!(bm.cached_count(), 1);
bm.delete_blob([0x33; 16]).unwrap();
assert_eq!(bm.cached_count(), 0);
assert!(!inner.has_blob([0x33; 16]).unwrap());
assert!(!bm.has_blob([0x33; 16]).unwrap());
}
#[test]
fn pending_delete_hides_blob_until_checkpoint_delete_applies() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
inner.write_blob([0x44; 16], &make_buf(7)).unwrap();
let bm = BufferManager::new(inner.clone(), 4);
let _pin = bm.pin([0x44; 16]).unwrap();
assert!(bm.has_blob([0x44; 16]).unwrap());
bm.mark_dirty([0x44; 16], 10);
bm.mark_for_delete([0x44; 16], 11);
assert!(inner.has_blob([0x44; 16]).unwrap());
assert!(!bm.has_blob([0x44; 16]).unwrap());
assert!(
bm.pin([0x44; 16]).is_err(),
"pending-delete child must not be reloaded from store"
);
bm.mark_dirty([0x44; 16], 12);
let mut restore = HashMap::new();
restore.insert([0x44; 16], 13);
bm.restore_dirty(restore);
assert_eq!(bm.dirty_count(), 0);
assert_eq!(bm.pending_delete_count(), 1);
}
#[test]
fn pending_delete_count_tracks_snapshot_and_restore() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let bm = BufferManager::new(inner, 4);
let guid = [0x55; 16];
bm.mark_for_delete(guid, 20);
bm.mark_for_delete(guid, 10);
assert_eq!(bm.pending_delete_count(), 1);
let pending = bm.snapshot_pending_deletes();
assert_eq!(pending.get(&guid), Some(&10));
assert_eq!(bm.pending_delete_count(), 0);
bm.restore_pending_deletes(pending);
assert_eq!(bm.pending_delete_count(), 1);
let pending = bm.snapshot_pending_deletes();
assert_eq!(pending.get(&guid), Some(&10));
assert_eq!(bm.pending_delete_count(), 0);
}
#[test]
fn has_blob_fast_path_avoids_inner_when_cached() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
inner.write_blob([0x77; 16], &make_buf(11)).unwrap();
let bm = BufferManager::new(inner.clone(), 4);
let mut dst = AlignedBlobBuf::zeroed();
bm.read_blob([0x77; 16], &mut dst).unwrap();
assert!(bm.has_blob([0x77; 16]).unwrap());
assert!(!bm.has_blob([0x88; 16]).unwrap());
}
#[test]
fn mark_dirty_keeps_lowest_seq() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0x01; 16];
inner.write_blob(guid, &make_buf(1)).unwrap();
let bm = BufferManager::new(inner, 4);
let _pin = bm.pin(guid).unwrap();
bm.mark_dirty(guid, 50);
bm.mark_dirty(guid, 30);
bm.mark_dirty(guid, 99);
assert_eq!(bm.dirty_count(), 1);
let snap = bm.snapshot_dirty();
assert_eq!(snap[&guid], 30);
}
#[test]
fn mark_dirty_without_cache_image_does_not_publish_orphan() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0xAB; 16];
inner.write_blob(guid, &make_buf(1)).unwrap();
let bm = BufferManager::new(inner, 4);
bm.mark_dirty(guid, 10);
assert!(
bm.snapshot_dirty().is_empty(),
"dirty map must not contain an entry without a cache image",
);
}
#[test]
fn cached_dirty_hint_resets_after_snapshot() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0xD1; 16];
inner.write_blob(guid, &make_buf(1)).unwrap();
let bm = BufferManager::new(inner, 4);
let _pin = bm.pin(guid).unwrap();
bm.mark_dirty(guid, 10);
bm.mark_dirty(guid, 20);
let snap = bm.snapshot_dirty();
assert_eq!(snap[&guid], 10);
assert_eq!(bm.dirty_count(), 0);
bm.mark_dirty(guid, 30);
let next = bm.snapshot_dirty();
assert_eq!(
next[&guid], 30,
"mark_dirty after snapshot must publish a fresh dirty entry",
);
}
#[test]
fn stale_dirty_hint_cannot_skip_dirty_map_publish() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0xD3; 16];
inner.write_blob(guid, &make_buf(1)).unwrap();
let bm = BufferManager::new(inner, 4);
let pin = bm.pin(guid).unwrap();
assert!(pin.dirty_hint_needs_map_publish(10));
bm.mark_dirty(guid, 20);
let snap = bm.snapshot_dirty();
assert_eq!(
snap[&guid], 20,
"a stale hint without a dirty-map entry must not hide a fresh write",
);
}
#[test]
fn cached_dirty_hint_preserves_lower_restored_seq() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0xD2; 16];
inner.write_blob(guid, &make_buf(1)).unwrap();
let bm = BufferManager::new(inner, 4);
let _pin = bm.pin(guid).unwrap();
let mut restored = HashMap::new();
restored.insert(guid, 40);
bm.restore_dirty(restored);
bm.mark_dirty(guid, 90);
let snap = bm.snapshot_dirty();
assert_eq!(
snap[&guid], 40,
"duplicate higher seq must be covered by restored low-watermark",
);
bm.restore_dirty(snap);
bm.mark_dirty(guid, 20);
let lowered = bm.snapshot_dirty();
assert_eq!(
lowered[&guid], 20,
"lower seq must still update the dirty low-watermark",
);
}
#[test]
fn snapshot_dirty_drains_atomically() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
for guid in [[0x01; 16], [0x02; 16], [0x03; 16]] {
inner.write_blob(guid, &make_buf(1)).unwrap();
}
let bm = BufferManager::new(inner, 4);
let _p1 = bm.pin([0x01; 16]).unwrap();
let _p2 = bm.pin([0x02; 16]).unwrap();
bm.mark_dirty([0x01; 16], 10);
bm.mark_dirty([0x02; 16], 20);
let snap = bm.snapshot_dirty();
assert_eq!(snap.len(), 2);
assert_eq!(snap[&[0x01; 16]], 10);
assert_eq!(snap[&[0x02; 16]], 20);
assert_eq!(bm.dirty_count(), 0);
let _p3 = bm.pin([0x03; 16]).unwrap();
bm.mark_dirty([0x03; 16], 99);
assert_eq!(bm.dirty_count(), 1);
let next = bm.snapshot_dirty();
assert_eq!(next[&[0x03; 16]], 99);
}
#[test]
fn snapshot_dirty_drains_every_bookkeeping_shard() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let bm = BufferManager::new(Arc::clone(&inner), BOOKKEEPING_SHARDS);
let mut guids: [Option<BlobGuid>; BOOKKEEPING_SHARDS] = [None; BOOKKEEPING_SHARDS];
for i in 0..20_000u64 {
let mut guid = [0u8; 16];
guid[0..8].copy_from_slice(&i.to_le_bytes());
guid[8..16].copy_from_slice(&i.wrapping_mul(0x9E37_79B9_7F4A_7C15).to_le_bytes());
let shard = bookkeeping_shard_idx(&guid);
guids[shard].get_or_insert(guid);
if guids.iter().all(Option::is_some) {
break;
}
}
assert!(
guids.iter().all(Option::is_some),
"test generator should hit every bookkeeping shard"
);
for (shard, guid) in guids.iter().enumerate() {
let guid = guid.expect("filled");
inner.write_blob(guid, &make_buf(1)).unwrap();
let _pin = bm.pin(guid).unwrap();
bm.mark_dirty(guid, shard as u64 + 1);
}
let snap = bm.snapshot_dirty();
assert_eq!(snap.len(), BOOKKEEPING_SHARDS);
assert_eq!(bm.dirty_count(), 0);
for (shard, guid) in guids.iter().enumerate() {
assert_eq!(snap[&guid.expect("filled")], shard as u64 + 1);
}
}
#[test]
fn snapshot_dirty_protects_flushing_entry_from_eviction() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let guid = [0x55; 16];
inner.write_blob(guid, &make_buf(1)).unwrap();
let bm = BufferManager::new(inner, 1);
{
let pin = bm.pin(guid).unwrap();
let mut guard = pin.write();
guard.as_mut_slice()[123] = 0xAB;
}
bm.mark_dirty(guid, 42);
let snap = bm.snapshot_dirty();
assert_eq!(snap[&guid], 42);
assert_eq!(
bm.dirty_count(),
0,
"snapshot drains the live dirty map for racing writers",
);
assert!(
!bm.try_evict_cold(guid),
"checkpoint-owned flushing entries must stay cached until write-through",
);
let bytes = bm
.snapshot_bytes(guid)
.expect("flushing protection must preserve cached bytes");
assert_eq!(bytes.as_slice()[123], 0xAB);
bm.write_through_batch(&[WriteThroughEntry {
guid,
bytes,
expected_seq: 42,
}])
.unwrap();
assert!(
bm.try_evict_cold(guid),
"successful write-through releases flushing protection",
);
}
#[test]
fn restore_dirty_merges_keeping_min() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
for guid in [[0x01; 16], [0x02; 16], [0x03; 16]] {
inner.write_blob(guid, &make_buf(1)).unwrap();
}
let bm = BufferManager::new(inner, 4);
let _p1 = bm.pin([0x01; 16]).unwrap();
let _p2 = bm.pin([0x02; 16]).unwrap();
let _p3 = bm.pin([0x03; 16]).unwrap();
let mut snap = HashMap::new();
snap.insert([0x01; 16], 10);
snap.insert([0x02; 16], 20);
bm.mark_dirty([0x01; 16], 50);
bm.mark_dirty([0x03; 16], 5);
bm.restore_dirty(snap);
assert_eq!(bm.dirty_count(), 3);
let live = bm.snapshot_dirty();
assert_eq!(live[&[0x01; 16]], 10);
assert_eq!(live[&[0x02; 16]], 20);
assert_eq!(live[&[0x03; 16]], 5);
}
#[test]
fn write_through_keeps_racing_writer_dirty_entry() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
inner.write_blob([0xAA; 16], &make_buf(0)).unwrap();
let bm = BufferManager::new(inner, 4);
let _pin = bm.pin([0xAA; 16]).unwrap();
bm.mark_dirty([0xAA; 16], 200);
let snap_bytes = bm.snapshot_bytes([0xAA; 16]).unwrap();
bm.write_through_batch(&[WriteThroughEntry {
guid: [0xAA; 16],
bytes: snap_bytes,
expected_seq: 50,
}])
.unwrap();
assert_eq!(
bm.dirty_count(),
1,
"write-through must not stomp a racing newer-seq entry",
);
let live = bm.snapshot_dirty();
assert_eq!(live[&[0xAA; 16]], 200, "racing writer's seq survives");
}
#[test]
fn write_through_keeps_racing_structural_dirty_entry() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
inner.write_blob([0xA5; 16], &make_buf(0)).unwrap();
let bm = BufferManager::new(inner, 4);
let _pin = bm.pin([0xA5; 16]).unwrap();
bm.mark_dirty([0xA5; 16], STRUCTURAL_SEQ);
let snap_bytes = bm.snapshot_bytes([0xA5; 16]).unwrap();
bm.write_through_batch(&[WriteThroughEntry {
guid: [0xA5; 16],
bytes: snap_bytes,
expected_seq: STRUCTURAL_SEQ,
}])
.unwrap();
assert_eq!(
bm.dirty_count(),
1,
"structural sentinel equality is not enough to retire a racing entry",
);
let live = bm.snapshot_dirty();
assert_eq!(live[&[0xA5; 16]], STRUCTURAL_SEQ);
}
#[test]
fn write_through_retires_clean_snapshot() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
inner.write_blob([0xBB; 16], &make_buf(0)).unwrap();
let bm = BufferManager::new(inner, 4);
let _pin = bm.pin([0xBB; 16]).unwrap();
bm.mark_dirty([0xBB; 16], 42);
let snap_bytes = bm.snapshot_bytes([0xBB; 16]).unwrap();
bm.write_through_batch(&[WriteThroughEntry {
guid: [0xBB; 16],
bytes: snap_bytes,
expected_seq: 42,
}])
.unwrap();
assert_eq!(bm.dirty_count(), 0);
}
#[test]
fn write_through_batch_retires_clean_snapshots() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let g1 = [0xB1; 16];
let g2 = [0xB2; 16];
inner.write_blob(g1, &make_buf(0)).unwrap();
inner.write_blob(g2, &make_buf(0)).unwrap();
let bm = BufferManager::new(inner.clone(), 4);
for (guid, byte) in [(g1, 11), (g2, 22)] {
let pin = bm.pin(guid).unwrap();
let mut guard = pin.write();
guard.as_mut_slice()[100] = byte;
bm.mark_dirty(guid, u64::from(byte));
}
let snap = bm.snapshot_dirty();
let entries: Vec<_> = snap
.iter()
.map(|(guid, expected_seq)| WriteThroughEntry {
guid: *guid,
bytes: bm.snapshot_bytes(*guid).unwrap(),
expected_seq: *expected_seq,
})
.collect();
bm.write_through_batch(&entries).unwrap();
assert_eq!(bm.dirty_count(), 0);
let mut dst = AlignedBlobBuf::zeroed();
inner.read_blob(g1, &mut dst).unwrap();
assert_eq!(dst.as_slice()[100], 11);
inner.read_blob(g2, &mut dst).unwrap();
assert_eq!(dst.as_slice()[100], 22);
}
#[test]
fn write_through_batch_keeps_racing_writer_dirty_entry() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let g1 = [0xC1; 16];
let g2 = [0xC2; 16];
inner.write_blob(g1, &make_buf(0)).unwrap();
inner.write_blob(g2, &make_buf(0)).unwrap();
let bm = BufferManager::new(inner, 4);
let _ = bm.pin(g1).unwrap();
let _ = bm.pin(g2).unwrap();
bm.mark_dirty(g1, 50);
bm.mark_dirty(g2, 60);
let snap = bm.snapshot_dirty();
bm.mark_dirty(g1, 200);
let entries: Vec<_> = snap
.iter()
.map(|(guid, expected_seq)| WriteThroughEntry {
guid: *guid,
bytes: bm.snapshot_bytes(*guid).unwrap(),
expected_seq: *expected_seq,
})
.collect();
bm.write_through_batch(&entries).unwrap();
let live = bm.snapshot_dirty();
assert_eq!(live.len(), 1);
assert_eq!(live[&g1], 200);
}
#[test]
fn write_blob_through_trait_clears_dirty() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
inner.write_blob([0x88; 16], &make_buf(1)).unwrap();
let bm = BufferManager::new(inner, 4);
let _pin = bm.pin([0x88; 16]).unwrap();
bm.mark_dirty([0x88; 16], 100);
assert_eq!(bm.dirty_count(), 1);
BlobStore::write_blob(&bm, [0x88; 16], &make_buf(9)).unwrap();
assert_eq!(bm.dirty_count(), 0);
}
#[test]
fn delete_blob_drops_dirty_entry() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
inner.write_blob([0x99; 16], &make_buf(1)).unwrap();
let bm = BufferManager::new(inner, 4);
let _ = bm.pin([0x99; 16]).unwrap();
bm.mark_dirty([0x99; 16], 7);
assert_eq!(bm.dirty_count(), 1);
BlobStore::delete_blob(&bm, [0x99; 16]).unwrap();
assert_eq!(
bm.dirty_count(),
0,
"deleted blobs must not linger as flush candidates"
);
}
#[test]
fn install_new_blob_caches_and_marks_dirty_without_store_write() {
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
let bm = BufferManager::new(Arc::clone(&inner), 4);
let new_guid = [0xCC; 16];
let mut bytes = AlignedBlobBuf::zeroed();
bytes.as_mut_slice()[200] = 0x77;
bm.install_new_blob(new_guid, bytes, 42);
assert_eq!(bm.cached_count(), 1);
assert_eq!(bm.dirty_count(), 1);
let snap = bm.snapshot_dirty();
assert_eq!(snap[&new_guid], 42);
bm.restore_dirty(snap);
assert!(
!inner.has_blob(new_guid).unwrap(),
"install_new_blob must defer the store write to the checkpoint round",
);
let pin = bm.pin(new_guid).unwrap();
let guard = pin.read();
assert_eq!(guard.as_slice()[200], 0x77);
drop(guard);
drop(pin);
let snap = bm.snapshot_dirty();
let bytes = bm.snapshot_bytes(new_guid).unwrap();
bm.write_through_batch(&[WriteThroughEntry {
guid: new_guid,
bytes,
expected_seq: snap[&new_guid],
}])
.unwrap();
bm.flush_inner().unwrap();
assert_eq!(bm.dirty_count(), 0);
assert!(inner.has_blob(new_guid).unwrap());
let mut dst = AlignedBlobBuf::zeroed();
inner.read_blob(new_guid, &mut dst).unwrap();
assert_eq!(dst.as_slice()[200], 0x77);
}
#[test]
fn concurrent_reads_on_different_blobs_progress() {
use std::thread;
let inner: Arc<dyn BlobStore> = Arc::new(MemoryBlobStore::new());
for i in 0..16u8 {
let mut g = [0u8; 16];
g[0] = i;
inner.write_blob(g, &make_buf(i)).unwrap();
}
let bm = Arc::new(BufferManager::new(inner, 16));
let handles: Vec<_> = (0..8u8)
.map(|t| {
let bm = bm.clone();
thread::spawn(move || {
for _ in 0..50 {
let mut g = [0u8; 16];
g[0] = t * 2; let mut dst = AlignedBlobBuf::zeroed();
bm.read_blob(g, &mut dst).unwrap();
assert_eq!(dst.as_slice()[100], t * 2);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(bm.cached_count(), 8);
}
}