use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use dashmap::DashMap;
use crate::api::errors::Result;
use crate::concurrency::{Guard as LatchGuard, HybridLatch};
use crate::layout::BlobGuid;
use super::backend::{AlignedBlobBuf, Backend};
/// Sentinel sequence number, equal to `u64::MAX`.
///
/// NOTE(review): the name suggests it tags structural (non-transactional)
/// changes, but no use is visible in this file — confirm against callers.
pub const STRUCTURAL_SEQ: u64 = u64::MAX;
/// In-memory blob cache layered over a [`Backend`], with bookkeeping for
/// dirty blobs, blobs awaiting deletion, and cache statistics.
pub struct BufferManager {
/// Underlying store: cache misses are read from it; writes, deletes and
/// flushes are forwarded to it.
backend: Arc<dyn Backend>,
/// Entry count above which `insert_into_cache` starts evicting. `new`
/// clamps it to at least 1.
capacity: usize,
/// Cached blobs keyed by guid. Values are shared `Arc`s so callers can keep
/// a blob pinned while it is in use.
cache: DashMap<BlobGuid, Arc<CachedBlob>>,
/// Dirty set: guid -> lowest id recorded for it (merges keep the minimum).
dirty: Mutex<HashMap<BlobGuid, u64>>,
/// Blobs marked for deletion: guid -> lowest txn id recorded for it.
pending_deletes: Mutex<HashMap<BlobGuid, u64>>,
/// Counter bumped on cache touches and inserts; its values are stored into
/// `CachedBlob::last_touched` and compared to pick eviction victims.
clock: AtomicU64,
/// Number of lookups through `get_cached` that found an entry.
cache_hits: AtomicU64,
/// Number of lookups through `get_cached` that found nothing.
cache_misses: AtomicU64,
/// Incremented by `note_optimistic_restart`.
optimistic_restarts: AtomicU64,
}
/// One cached blob: the buffer itself plus the latch that coordinates access
/// to it and the clock stamp of its most recent touch.
pub struct CachedBlob {
/// Latch handed to the guard types returned by `read_optimistic`, `read`
/// and `write`.
latch: HybridLatch,
/// Blob bytes. Wrapped in `UnsafeCell` because the guards hand out
/// references (including mutable ones) through a shared `&CachedBlob`.
buf: UnsafeCell<AlignedBlobBuf>,
/// Value of the manager's clock at the last touch; 0 until first stamped.
last_touched: AtomicU64,
}
// SAFETY: NOTE(review) — `buf` is an `UnsafeCell`, so this impl asserts that
// every cross-thread access to it is coordinated through `latch` via the guard
// types built by `read`, `write` and `read_optimistic`. Confirm that
// `HybridLatch` actually upholds shared/exclusive semantics.
unsafe impl Sync for CachedBlob {}
impl CachedBlob {
    /// Wraps `buf` with a fresh latch and a `last_touched` stamp of 0.
    fn new(buf: AlignedBlobBuf) -> Self {
        let latch = HybridLatch::new();
        let buf = UnsafeCell::new(buf);
        let last_touched = AtomicU64::new(0);
        Self {
            latch,
            buf,
            last_touched,
        }
    }

    /// Returns the clock stamp recorded at the most recent touch.
    #[must_use]
    pub(crate) fn last_touched(&self) -> u64 {
        let stamp = &self.last_touched;
        stamp.load(Ordering::Relaxed)
    }

    /// Starts an optimistic read: the returned guard exposes the bytes and a
    /// `validate` check backed by an optimistic latch guard.
    pub fn read_optimistic(&self) -> OptimisticGuard<'_> {
        let latch = LatchGuard::optimistic(&self.latch);
        OptimisticGuard {
            latch,
            buf: &self.buf,
        }
    }

    /// Acquires the latch in shared mode and returns a read-only guard.
    pub fn read(&self) -> BlobReadGuard<'_> {
        let held = LatchGuard::shared(&self.latch);
        BlobReadGuard {
            _latch: held,
            buf: &self.buf,
        }
    }

    /// Acquires the latch in exclusive mode and returns a mutable guard.
    pub fn write(&self) -> BlobWriteGuard<'_> {
        let held = LatchGuard::exclusive(&self.latch);
        BlobWriteGuard {
            _latch: held,
            buf: &self.buf,
        }
    }
}
/// Guard returned by `CachedBlob::read_optimistic`: gives access to the blob
/// bytes together with a `validate` check on the optimistic latch guard.
pub struct OptimisticGuard<'a> {
/// Optimistic latch guard consulted by `validate`.
latch: LatchGuard<'a>,
/// The blob buffer being observed.
buf: &'a UnsafeCell<AlignedBlobBuf>,
}
impl<'a> OptimisticGuard<'a> {
#[must_use]
pub fn as_slice(&self) -> &'a [u8] {
unsafe { (&*self.buf.get()).as_slice() }
}
#[must_use]
pub fn validate(&self) -> bool {
self.latch.validate()
}
}
/// Read-only guard returned by `CachedBlob::read`; dereferences to the blob
/// buffer.
pub struct BlobReadGuard<'a> {
/// Shared-mode latch guard, kept only so it lives as long as this guard.
_latch: LatchGuard<'a>,
/// The blob buffer being read.
buf: &'a UnsafeCell<AlignedBlobBuf>,
}
impl Deref for BlobReadGuard<'_> {
    type Target = AlignedBlobBuf;

    /// Borrows the guarded buffer immutably.
    fn deref(&self) -> &Self::Target {
        let raw = self.buf.get();
        // SAFETY: `raw` points into a live `UnsafeCell`; this guard was built
        // together with a shared-mode latch guard that it still owns.
        unsafe { &*raw }
    }
}
/// Mutable guard returned by `CachedBlob::write`; dereferences (mutably) to the
/// blob buffer.
pub struct BlobWriteGuard<'a> {
/// Exclusive-mode latch guard, kept only so it lives as long as this guard.
_latch: LatchGuard<'a>,
/// The blob buffer being written.
buf: &'a UnsafeCell<AlignedBlobBuf>,
}
impl Deref for BlobWriteGuard<'_> {
    type Target = AlignedBlobBuf;

    /// Borrows the guarded buffer immutably.
    fn deref(&self) -> &Self::Target {
        let raw = self.buf.get();
        // SAFETY: `raw` points into a live `UnsafeCell`; this guard was built
        // together with an exclusive-mode latch guard that it still owns.
        unsafe { &*raw }
    }
}
impl DerefMut for BlobWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut AlignedBlobBuf {
unsafe { &mut *self.buf.get() }
}
}
impl BufferManager {
/// Builds an empty buffer manager on top of `backend`.
///
/// `capacity` is the entry count above which inserts start evicting; it is
/// clamped to at least 1. The touch clock starts at 1 and every statistics
/// counter at 0.
#[must_use]
pub fn new(backend: Arc<dyn Backend>, capacity: usize) -> Self {
    let capacity = capacity.max(1);
    let cache = DashMap::new();
    let dirty = Mutex::new(HashMap::new());
    let pending_deletes = Mutex::new(HashMap::new());
    Self {
        backend,
        capacity,
        cache,
        dirty,
        pending_deletes,
        clock: AtomicU64::new(1),
        cache_hits: AtomicU64::new(0),
        cache_misses: AtomicU64::new(0),
        optimistic_restarts: AtomicU64::new(0),
    }
}
/// Returns the current value of the touch clock without advancing it.
pub(crate) fn clock_tick(&self) -> u64 {
self.clock.load(Ordering::Relaxed)
}
/// Copies out every `(guid, blob)` pair currently in the cache.
///
/// Each returned `Arc` is a new strong reference, so the blobs count as
/// pinned for as long as the caller keeps the vector.
pub(crate) fn snapshot_entries(&self) -> Vec<(BlobGuid, Arc<CachedBlob>)> {
    let mut entries = Vec::new();
    for slot in self.cache.iter() {
        let guid = *slot.key();
        let blob = Arc::clone(slot.value());
        entries.push((guid, blob));
    }
    entries
}
/// Tries to drop `guid` from the cache; returns `true` if an entry was removed.
///
/// Refuses when the blob is in the dirty set or when anyone besides the
/// cache holds a reference to it.
pub(crate) fn try_evict_cold(&self, guid: BlobGuid) -> bool {
    // The mutex guard is a temporary of this statement, so the dirty lock is
    // released before the cache is touched.
    let is_dirty = self.dirty.lock().unwrap().contains_key(&guid);
    if is_dirty {
        return false;
    }
    let removed = self
        .cache
        .remove_if(&guid, |_, blob| Arc::strong_count(blob) == 1);
    removed.is_some()
}
/// Returns the number of entries currently held in the cache.
#[allow(dead_code)]
#[must_use]
pub fn cached_count(&self) -> usize {
self.cache.len()
}
/// Returns how many `get_cached` lookups found an entry so far.
#[must_use]
pub fn cache_hits(&self) -> u64 {
self.cache_hits.load(Ordering::Relaxed)
}
/// Returns how many `get_cached` lookups found nothing so far.
#[must_use]
pub fn cache_misses(&self) -> u64 {
self.cache_misses.load(Ordering::Relaxed)
}
/// Returns how many times `note_optimistic_restart` has been called.
#[must_use]
pub fn optimistic_restarts(&self) -> u64 {
self.optimistic_restarts.load(Ordering::Relaxed)
}
/// Adds one to the optimistic-restart counter.
pub(crate) fn note_optimistic_restart(&self) {
self.optimistic_restarts.fetch_add(1, Ordering::Relaxed);
}
fn get_cached(&self, guid: BlobGuid) -> Option<Arc<CachedBlob>> {
let Some(entry) = self.cache.get(&guid) else {
self.cache_misses.fetch_add(1, Ordering::Relaxed);
return None;
};
let arc = Arc::clone(entry.value());
drop(entry);
let tick = self.clock.fetch_add(1, Ordering::Relaxed);
arc.last_touched.store(tick, Ordering::Relaxed);
self.cache_hits.fetch_add(1, Ordering::Relaxed);
Some(arc)
}
/// Looks up `guid` in the cache without touching the hit/miss counters, the
/// clock, or the blob's `last_touched` stamp.
fn get_cached_silent(&self, guid: BlobGuid) -> Option<Arc<CachedBlob>> {
    let found = self.cache.get(&guid).map(|slot| Arc::clone(slot.value()));
    found
}
/// Ensures `guid` has a cache entry (cloning `contents` only if none exists),
/// stamps it with a fresh clock value, then evicts while the cache holds more
/// than `capacity` entries.
///
/// An existing entry keeps its bytes; only its `last_touched` is refreshed.
/// Eviction is best effort: when nothing can be evicted the loop yields and
/// retries a bounded number of times, then gives up and leaves the cache
/// over capacity.
fn insert_into_cache(&self, guid: BlobGuid, contents: &AlignedBlobBuf) {
    const RETRY_BUDGET: u32 = 8;

    let stamp = self.clock.fetch_add(1, Ordering::Relaxed);
    {
        let slot = self.cache.entry(guid).or_insert_with(|| {
            let fresh = CachedBlob::new(contents.clone());
            fresh.last_touched.store(stamp, Ordering::Relaxed);
            Arc::new(fresh)
        });
        slot.value().last_touched.store(stamp, Ordering::Relaxed);
        // `slot` (and the map lock it holds) is released at the end of this
        // scope, before the eviction scan below walks the map.
    }

    let mut budget = RETRY_BUDGET;
    let mut spins = self.cache.len();
    loop {
        if self.cache.len() <= self.capacity {
            break;
        }
        if self.try_evict_lru() {
            // Progress was made: reset the per-entry spin allowance.
            spins = self.cache.len();
            continue;
        }
        if budget == 0 || spins == 0 {
            break;
        }
        std::thread::yield_now();
        budget -= 1;
        spins = spins.saturating_sub(1);
    }
}
/// Evicts the least-recently-touched entry that is neither pinned nor dirty.
///
/// Returns `true` if an entry was removed. An entry is a candidate only when
/// the cache holds the sole reference to it (`Arc::strong_count == 1`) and
/// its guid is absent from the dirty set.
///
/// Dirty blobs are skipped because the cached copy is the only place their
/// unflushed bytes live: `snapshot_bytes` and `commit` both read from the
/// cache, and a blob added through `install_new_blob` has never reached the
/// backend at all. This matches the rule `try_evict_cold` already applies.
fn try_evict_lru(&self) -> bool {
    // Copy the dirty keys up front so the dirty mutex is never held while
    // the scan below is walking the map.
    let dirty_keys: std::collections::HashSet<BlobGuid> =
        self.dirty.lock().unwrap().keys().copied().collect();

    // `min_by_key` returns the first minimum, matching a strict `<` scan.
    let victim = self
        .cache
        .iter()
        .filter(|kv| Arc::strong_count(kv.value()) == 1 && !dirty_keys.contains(kv.key()))
        .map(|kv| (*kv.key(), kv.value().last_touched.load(Ordering::Relaxed)))
        .min_by_key(|&(_, tick)| tick);

    let Some((guid, _)) = victim else {
        return false;
    };

    // `try_evict_cold` re-checks the dirty set (the blob may have been marked
    // dirty since the snapshot) and removes only if the entry is still
    // unpinned.
    self.try_evict_cold(guid)
}
/// Unconditionally removes `guid` from the cache and then from the dirty set.
fn evict_from_cache(&self, guid: BlobGuid) {
    let _ = self.cache.remove(&guid);
    let mut dirty = self.dirty.lock().unwrap();
    dirty.remove(&guid);
}
/// Returns a shared handle to the cached blob for `guid`, loading it from the
/// backend on a miss.
///
/// Lookups go through `get_cached`, so statistics and recency are updated.
///
/// # Errors
/// Propagates the backend's error if the blob has to be read and the read
/// fails.
pub fn pin(&self, guid: BlobGuid) -> Result<Arc<CachedBlob>> {
    if let Some(hit) = self.get_cached(guid) {
        return Ok(hit);
    }

    let mut loaded = AlignedBlobBuf::zeroed();
    self.backend.read_blob(guid, &mut loaded)?;
    self.insert_into_cache(guid, &loaded);

    match self.get_cached(guid) {
        Some(blob) => Ok(blob),
        None => {
            // The entry is no longer in the cache after the insert (for
            // example it was evicted again); install a fresh one directly.
            let blob = Arc::new(CachedBlob::new(loaded));
            let stamp = self.clock.fetch_add(1, Ordering::Relaxed);
            blob.last_touched.store(stamp, Ordering::Relaxed);
            self.cache.insert(guid, Arc::clone(&blob));
            Ok(blob)
        }
    }
}
/// Like `pin`, but its lookups go through `get_cached_silent`, so they do not
/// touch the hit/miss counters or refresh the blob's recency.
///
/// # Errors
/// Propagates the backend's error if the blob has to be read and the read
/// fails.
pub fn pin_silent(&self, guid: BlobGuid) -> Result<Arc<CachedBlob>> {
    if let Some(hit) = self.get_cached_silent(guid) {
        return Ok(hit);
    }

    let mut loaded = AlignedBlobBuf::zeroed();
    self.backend.read_blob(guid, &mut loaded)?;
    self.insert_into_cache(guid, &loaded);

    match self.get_cached_silent(guid) {
        Some(blob) => Ok(blob),
        None => {
            // The entry is no longer in the cache after the insert (for
            // example it was evicted again); install a fresh one directly.
            let blob = Arc::new(CachedBlob::new(loaded));
            let stamp = self.clock.fetch_add(1, Ordering::Relaxed);
            blob.last_touched.store(stamp, Ordering::Relaxed);
            self.cache.insert(guid, Arc::clone(&blob));
            Ok(blob)
        }
    }
}
/// Writes the cached copy of `guid` to the backend and clears its dirty mark.
///
/// The dirty entry is removed first. If the blob is not cached there is
/// nothing to write and the call succeeds. If the backend write fails, the
/// removed dirty id is merged back (keeping the minimum).
///
/// # Errors
/// Returns the backend's error when the write fails.
#[allow(dead_code)]
pub fn commit(&self, guid: BlobGuid) -> Result<()> {
    let drained = self.dirty.lock().unwrap().remove(&guid);

    let Some(entry) = self.get_cached(guid) else {
        return Ok(());
    };
    let buf = entry.read();
    let outcome = self.backend.write_blob(guid, &buf);
    if outcome.is_err() {
        if let Some(seq) = drained {
            // Same min-merge as the dirty map uses everywhere else.
            self.mark_dirty(guid, seq);
        }
    }
    outcome
}
/// Records `guid` as dirty under `txn_id`, keeping the smaller id if the blob
/// is already marked.
pub fn mark_dirty(&self, guid: BlobGuid, txn_id: u64) {
    let mut dirty = self.dirty.lock().unwrap();
    let slot = dirty.entry(guid).or_insert(txn_id);
    *slot = (*slot).min(txn_id);
}
/// Takes the whole dirty map, leaving an empty one behind, under one lock
/// acquisition.
#[must_use]
pub fn snapshot_dirty(&self) -> HashMap<BlobGuid, u64> {
    std::mem::take(&mut *self.dirty.lock().unwrap())
}
/// Merges `entries` back into the dirty map, keeping the smaller id for any
/// guid present on both sides. An empty `entries` returns without locking.
pub fn restore_dirty(&self, entries: HashMap<BlobGuid, u64>) {
    if entries.is_empty() {
        return;
    }
    let mut dirty = self.dirty.lock().unwrap();
    for (guid, restored) in entries {
        match dirty.entry(guid) {
            std::collections::hash_map::Entry::Occupied(mut slot) => {
                let current = slot.get_mut();
                *current = (*current).min(restored);
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(restored);
            }
        }
    }
}
/// Returns the smallest id in the dirty map, or `None` when nothing is dirty.
#[allow(dead_code)]
#[must_use]
pub fn min_unflushed_txn(&self) -> Option<u64> {
    let dirty = self.dirty.lock().unwrap();
    let lowest = dirty.values().min().copied();
    lowest
}
/// Returns the number of blobs currently in the dirty map.
#[must_use]
pub fn dirty_count(&self) -> usize {
self.dirty.lock().unwrap().len()
}
/// Queues `guid` for deletion under `txn_id`.
///
/// The blob is dropped from the cache and the dirty map first, then recorded
/// in the pending-delete map, keeping the smaller id if already queued.
pub fn mark_for_delete(&self, guid: BlobGuid, txn_id: u64) {
    // Removes the cache entry, then the dirty entry.
    self.evict_from_cache(guid);

    let mut pending = self.pending_deletes.lock().unwrap();
    let slot = pending.entry(guid).or_insert(txn_id);
    *slot = (*slot).min(txn_id);
}
/// Takes the whole pending-delete map, leaving an empty one behind, under one
/// lock acquisition.
#[must_use]
pub fn snapshot_pending_deletes(&self) -> HashMap<BlobGuid, u64> {
    std::mem::take(&mut *self.pending_deletes.lock().unwrap())
}
/// Merges `entries` back into the pending-delete map, keeping the smaller id
/// for any guid present on both sides. An empty `entries` returns without
/// locking.
pub fn restore_pending_deletes(&self, entries: HashMap<BlobGuid, u64>) {
    if entries.is_empty() {
        return;
    }
    let mut pending = self.pending_deletes.lock().unwrap();
    for (guid, restored) in entries {
        match pending.entry(guid) {
            std::collections::hash_map::Entry::Occupied(mut slot) => {
                let current = slot.get_mut();
                *current = (*current).min(restored);
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(restored);
            }
        }
    }
}
/// Returns the number of blobs currently queued for deletion.
#[must_use]
pub fn pending_delete_count(&self) -> usize {
self.pending_deletes.lock().unwrap().len()
}
/// Deletes `guid` directly on the wrapped backend.
///
/// Does not touch the cache, the dirty map, or the pending-delete map.
///
/// # Errors
/// Returns whatever error the backend's `delete_blob` reports.
pub(crate) fn execute_pending_delete(&self, guid: BlobGuid) -> Result<()> {
self.backend.delete_blob(guid)
}
/// Returns a copy of the cached bytes for `guid`, taken under the blob's
/// shared latch, or `None` if the blob is not cached.
///
/// The lookup goes through `get_cached`, so it updates statistics and recency.
pub(crate) fn snapshot_bytes(&self, guid: BlobGuid) -> Option<AlignedBlobBuf> {
    let entry = self.get_cached(guid)?;
    let copy = entry.read().clone();
    Some(copy)
}
/// Writes `bytes` for `guid` to the backend, then clears the blob's dirty
/// entry only if it still carries `expected_seq`.
///
/// A dirty entry with any other id is left in place, so a mark recorded after
/// the caller took its snapshot is not lost.
///
/// # Errors
/// Returns the backend's error if the write fails; the dirty map is untouched
/// in that case.
pub(crate) fn write_through(
    &self,
    guid: BlobGuid,
    bytes: &AlignedBlobBuf,
    expected_seq: u64,
) -> Result<()> {
    self.backend.write_blob(guid, bytes)?;

    let mut dirty = self.dirty.lock().unwrap();
    if dirty.get(&guid) == Some(&expected_seq) {
        dirty.remove(&guid);
    }
    Ok(())
}
/// Forwards to the wrapped backend's `flush`.
///
/// # Errors
/// Returns whatever error the backend reports.
pub(crate) fn backend_flush(&self) -> Result<()> {
self.backend.flush()
}
/// Places `bytes` in the cache under `guid` and marks the blob dirty with
/// `seq`, without writing anything to the backend.
///
/// Any existing cache entry for `guid` is replaced. The dirty id is merged
/// with an existing one by keeping the minimum.
pub(crate) fn install_new_blob(&self, guid: BlobGuid, bytes: AlignedBlobBuf, seq: u64) {
    let stamp = self.clock.fetch_add(1, Ordering::Relaxed);
    let fresh = CachedBlob::new(bytes);
    fresh.last_touched.store(stamp, Ordering::Relaxed);
    self.cache.insert(guid, Arc::new(fresh));
    self.mark_dirty(guid, seq);
}
}
impl Backend for BufferManager {
    /// Serves `guid` from the cache when present; otherwise reads it from the
    /// wrapped backend into `dst` and caches a copy.
    fn read_blob(&self, guid: BlobGuid, dst: &mut AlignedBlobBuf) -> Result<()> {
        match self.get_cached(guid) {
            Some(entry) => {
                let cached = entry.read();
                dst.as_mut_slice().copy_from_slice(cached.as_slice());
            }
            None => {
                self.backend.read_blob(guid, dst)?;
                self.insert_into_cache(guid, dst);
            }
        }
        Ok(())
    }

    /// Overwrites the cached copy of `guid` (if any), writes `src` to the
    /// wrapped backend, and on success clears the blob's dirty entry.
    ///
    /// A blob that is not cached is not inserted into the cache.
    ///
    /// NOTE(review): the cached copy is updated before the backend write is
    /// known to succeed, so a failed write leaves the cache holding bytes the
    /// backend does not have — confirm this is intended.
    fn write_blob(&self, guid: BlobGuid, src: &AlignedBlobBuf) -> Result<()> {
        if let Some(entry) = self.get_cached(guid) {
            entry.write().as_mut_slice().copy_from_slice(src.as_slice());
        }
        self.backend.write_blob(guid, src)?;
        let mut dirty = self.dirty.lock().unwrap();
        dirty.remove(&guid);
        Ok(())
    }

    /// Drops `guid` from the cache and dirty map, then deletes it on the
    /// wrapped backend.
    fn delete_blob(&self, guid: BlobGuid) -> Result<()> {
        self.evict_from_cache(guid);
        let outcome = self.backend.delete_blob(guid);
        outcome
    }

    /// Lists blobs as reported by the wrapped backend; the cache is not
    /// consulted.
    fn list_blobs(&self) -> Result<Vec<BlobGuid>> {
        let backend = &self.backend;
        backend.list_blobs()
    }

    /// Forwards to the wrapped backend's `flush`.
    fn flush(&self) -> Result<()> {
        let backend = &self.backend;
        backend.flush()
    }

    /// Reports `true` immediately when `guid` is cached; otherwise asks the
    /// wrapped backend.
    fn has_blob(&self, guid: BlobGuid) -> Result<bool> {
        if self.cache.contains_key(&guid) {
            Ok(true)
        } else {
            self.backend.has_blob(guid)
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::backend::MemoryBackend;
/// Builds a zeroed blob buffer whose byte at offset 100 is `byte_at_100`.
fn make_buf(byte_at_100: u8) -> AlignedBlobBuf {
    let mut blob = AlignedBlobBuf::zeroed();
    let bytes = blob.as_mut_slice();
    bytes[100] = byte_at_100;
    blob
}
/// The first read populates the cache; a repeat read does not add entries.
#[test]
fn read_caches_after_first_load() {
    let guid = [0xAB; 16];
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    backing.write_blob(guid, &make_buf(7)).unwrap();

    let manager = BufferManager::new(backing.clone(), 4);
    assert_eq!(manager.cached_count(), 0);

    let mut out = AlignedBlobBuf::zeroed();
    manager.read_blob(guid, &mut out).unwrap();
    assert_eq!(out.as_slice()[100], 7);
    assert_eq!(manager.cached_count(), 1);

    manager.read_blob(guid, &mut out).unwrap();
    assert_eq!(manager.cached_count(), 1);
}
/// Reading ten blobs through a capacity-4 cache leaves four entries, keeping
/// the newest and dropping the oldest.
#[test]
fn lru_eviction_at_capacity() {
    let guid_for = |tag: u8| {
        let mut guid = [0u8; 16];
        guid[0] = tag;
        guid
    };

    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    for tag in 0..10u8 {
        backing.write_blob(guid_for(tag), &make_buf(tag)).unwrap();
    }

    let manager = BufferManager::new(backing, 4);
    for tag in 0..10u8 {
        let mut out = AlignedBlobBuf::zeroed();
        manager.read_blob(guid_for(tag), &mut out).unwrap();
    }

    assert_eq!(
        manager.cached_count(),
        4,
        "cache must shrink to capacity after over-fill",
    );
    let newest = guid_for(9);
    let oldest = guid_for(0);
    assert!(manager.cache.contains_key(&newest));
    assert!(!manager.cache.contains_key(&oldest));
}
/// A write through the manager is visible on the wrapped backend.
#[test]
fn write_through_propagates_to_inner_backend() {
    let guid = [0xCD; 16];
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    let manager = BufferManager::new(backing.clone(), 4);

    manager.write_blob(guid, &make_buf(0x42)).unwrap();
    assert!(backing.has_blob(guid).unwrap());

    let mut out = AlignedBlobBuf::zeroed();
    backing.read_blob(guid, &mut out).unwrap();
    assert_eq!(out.as_slice()[100], 0x42);
}
/// Writing a blob that is already cached refreshes the cached bytes.
#[test]
fn write_through_updates_cache_if_present() {
    let guid = [0xEF; 16];
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    backing.write_blob(guid, &make_buf(1)).unwrap();
    let manager = BufferManager::new(backing.clone(), 4);

    let mut out = AlignedBlobBuf::zeroed();
    manager.read_blob(guid, &mut out).unwrap();
    assert_eq!(out.as_slice()[100], 1);

    manager.write_blob(guid, &make_buf(99)).unwrap();
    manager.read_blob(guid, &mut out).unwrap();
    assert_eq!(out.as_slice()[100], 99);
}
/// Deleting through the manager removes the blob from the cache and from the
/// wrapped backend.
#[test]
fn delete_evicts_from_cache_and_inner() {
    let guid = [0x33; 16];
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    backing.write_blob(guid, &make_buf(5)).unwrap();
    let manager = BufferManager::new(backing.clone(), 4);

    let mut out = AlignedBlobBuf::zeroed();
    manager.read_blob(guid, &mut out).unwrap();
    assert_eq!(manager.cached_count(), 1);

    manager.delete_blob(guid).unwrap();
    assert_eq!(manager.cached_count(), 0);
    assert!(!backing.has_blob(guid).unwrap());
    assert!(!manager.has_blob(guid).unwrap());
}
/// `has_blob` is true for a cached blob and false for one unknown to both the
/// cache and the backend.
#[test]
fn has_blob_fast_path_avoids_inner_when_cached() {
    let present = [0x77; 16];
    let absent = [0x88; 16];
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    backing.write_blob(present, &make_buf(11)).unwrap();
    let manager = BufferManager::new(backing.clone(), 4);

    let mut out = AlignedBlobBuf::zeroed();
    manager.read_blob(present, &mut out).unwrap();

    assert!(manager.has_blob(present).unwrap());
    assert!(!manager.has_blob(absent).unwrap());
}
/// Repeated `mark_dirty` calls on one blob retain the smallest id.
#[test]
fn mark_dirty_keeps_lowest_txn_id() {
    let guid = [0x01; 16];
    let manager = BufferManager::new(Arc::new(MemoryBackend::new()), 4);
    for txn in [50, 30, 99] {
        manager.mark_dirty(guid, txn);
    }
    assert_eq!(manager.min_unflushed_txn(), Some(30));
    assert_eq!(manager.dirty_count(), 1);
}
/// A fresh manager has no dirty blobs and therefore no minimum id.
#[test]
fn min_unflushed_txn_returns_none_when_clean() {
    let backing = Arc::new(MemoryBackend::new());
    let manager = BufferManager::new(backing, 4);
    assert_eq!(manager.min_unflushed_txn(), None);
    assert_eq!(manager.dirty_count(), 0);
}
/// `snapshot_dirty` hands back every entry and leaves the live map empty;
/// later marks start a new set.
#[test]
fn snapshot_dirty_drains_atomically() {
    let first = [0x01; 16];
    let second = [0x02; 16];
    let manager = BufferManager::new(Arc::new(MemoryBackend::new()), 4);
    manager.mark_dirty(first, 10);
    manager.mark_dirty(second, 20);

    let drained = manager.snapshot_dirty();
    assert_eq!(drained.len(), 2);
    assert_eq!(drained[&first], 10);
    assert_eq!(drained[&second], 20);
    assert_eq!(manager.dirty_count(), 0);
    assert_eq!(manager.min_unflushed_txn(), None);

    manager.mark_dirty([0x03; 16], 99);
    assert_eq!(manager.dirty_count(), 1);
    assert_eq!(manager.min_unflushed_txn(), Some(99));
}
/// `restore_dirty` merges with live entries, keeping the smaller id per blob.
#[test]
fn restore_dirty_merges_keeping_min() {
    let a = [0x01; 16];
    let b = [0x02; 16];
    let c = [0x03; 16];
    let manager = BufferManager::new(Arc::new(MemoryBackend::new()), 4);

    let mut restored = HashMap::new();
    restored.insert(a, 10);
    restored.insert(b, 20);

    manager.mark_dirty(a, 50);
    manager.mark_dirty(c, 5);
    manager.restore_dirty(restored);
    assert_eq!(manager.dirty_count(), 3);

    let merged = manager.snapshot_dirty();
    assert_eq!(merged[&a], 10);
    assert_eq!(merged[&b], 20);
    assert_eq!(merged[&c], 5);
}
/// A successful `commit` removes the blob's dirty entry.
#[test]
fn commit_clears_dirty_on_success() {
    let guid = [0x77; 16];
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    backing.write_blob(guid, &make_buf(0)).unwrap();
    let manager = BufferManager::new(backing, 4);

    let pinned = manager.pin(guid).unwrap();
    // The write guard is a temporary and is released at the end of this line.
    pinned.write().as_mut_slice()[200] = 0xCD;

    manager.mark_dirty(guid, 42);
    assert_eq!(manager.dirty_count(), 1);
    manager.commit(guid).unwrap();
    assert_eq!(manager.dirty_count(), 0, "successful commit must clear dirty");
}
/// `write_through` with a non-matching id leaves the dirty entry alone.
#[test]
fn write_through_keeps_racing_writer_dirty_entry() {
    let guid = [0xAA; 16];
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    backing.write_blob(guid, &make_buf(0)).unwrap();
    let manager = BufferManager::new(backing, 4);
    let _pinned = manager.pin(guid).unwrap();

    manager.mark_dirty(guid, 200);
    let bytes = manager.snapshot_bytes(guid).unwrap();
    manager.write_through(guid, &bytes, 50).unwrap();

    assert_eq!(
        manager.dirty_count(),
        1,
        "write_through must not stomp a racing newer-seq entry",
    );
    let remaining = manager.snapshot_dirty();
    assert_eq!(remaining[&guid], 200, "racing writer's seq survives");
}
/// `write_through` with the matching id clears the dirty entry.
#[test]
fn write_through_retires_clean_snapshot() {
    let guid = [0xBB; 16];
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    backing.write_blob(guid, &make_buf(0)).unwrap();
    let manager = BufferManager::new(backing, 4);
    let _pinned = manager.pin(guid).unwrap();

    manager.mark_dirty(guid, 42);
    let bytes = manager.snapshot_bytes(guid).unwrap();
    manager.write_through(guid, &bytes, 42).unwrap();
    assert_eq!(manager.dirty_count(), 0);
}
/// The `Backend::write_blob` implementation clears the blob's dirty entry.
#[test]
fn write_blob_through_trait_clears_dirty() {
    let guid = [0x88; 16];
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    let manager = BufferManager::new(backing, 4);

    manager.mark_dirty(guid, 100);
    assert_eq!(manager.dirty_count(), 1);

    Backend::write_blob(&manager, guid, &make_buf(9)).unwrap();
    assert_eq!(manager.dirty_count(), 0);
}
/// Deleting a dirty blob also removes it from the dirty map.
#[test]
fn delete_blob_drops_dirty_entry() {
    let guid = [0x99; 16];
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    backing.write_blob(guid, &make_buf(1)).unwrap();
    let manager = BufferManager::new(backing, 4);

    // Load the blob into the cache; the pin itself is released right away.
    drop(manager.pin(guid).unwrap());
    manager.mark_dirty(guid, 7);
    assert_eq!(manager.dirty_count(), 1);

    Backend::delete_blob(&manager, guid).unwrap();
    assert_eq!(
        manager.dirty_count(),
        0,
        "deleted blobs must not linger as flush candidates"
    );
}
/// `install_new_blob` caches the bytes and marks them dirty without touching
/// the backend; a later `commit` persists them.
#[test]
fn install_new_blob_caches_and_marks_dirty_without_backend_write() {
    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    let manager = BufferManager::new(Arc::clone(&backing), 4);
    let guid = [0xCC; 16];

    let mut payload = AlignedBlobBuf::zeroed();
    payload.as_mut_slice()[200] = 0x77;
    manager.install_new_blob(guid, payload, 42);

    assert_eq!(manager.cached_count(), 1);
    assert_eq!(manager.dirty_count(), 1);
    assert_eq!(manager.min_unflushed_txn(), Some(42));
    assert!(
        !backing.has_blob(guid).unwrap(),
        "install_new_blob must defer the backend write to the checkpoint round",
    );

    {
        let pinned = manager.pin(guid).unwrap();
        let view = pinned.read();
        assert_eq!(view.as_slice()[200], 0x77);
        // `view` and then `pinned` are released here, before the commit.
    }

    manager.commit(guid).unwrap();
    assert_eq!(manager.dirty_count(), 0);
    assert!(backing.has_blob(guid).unwrap());

    let mut out = AlignedBlobBuf::zeroed();
    backing.read_blob(guid, &mut out).unwrap();
    assert_eq!(out.as_slice()[200], 0x77);
}
/// Eight threads each repeatedly read their own blob; all succeed and exactly
/// those eight blobs end up cached.
#[test]
fn concurrent_reads_on_different_blobs_progress() {
    use std::thread;

    let backing: Arc<dyn Backend> = Arc::new(MemoryBackend::new());
    for tag in 0..16u8 {
        let mut guid = [0u8; 16];
        guid[0] = tag;
        backing.write_blob(guid, &make_buf(tag)).unwrap();
    }
    let manager = Arc::new(BufferManager::new(backing, 16));

    let mut workers = Vec::new();
    for t in 0..8u8 {
        let manager = manager.clone();
        workers.push(thread::spawn(move || {
            for _ in 0..50 {
                let mut guid = [0u8; 16];
                guid[0] = t * 2;
                let mut out = AlignedBlobBuf::zeroed();
                manager.read_blob(guid, &mut out).unwrap();
                assert_eq!(out.as_slice()[100], t * 2);
            }
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }

    assert_eq!(manager.cached_count(), 8);
}
}