use std::collections::HashMap;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use parking_lot::{Mutex, RwLock};
use super::block_storage::{AlignedBlock, BlockStorage};
use super::disk_manager::{MmapDiskManager, BLOCK_SIZE};
use super::error::{PersistentARTrieError, Result};
/// Index of a frame in the buffer pool (a position in `frames` / `buffer_pool`).
pub type FrameId = usize;
/// Per-frame bookkeeping shared between the buffer manager and outstanding page guards.
#[derive(Debug)]
pub struct FrameMetadata {
    /// Cached block id, or `u32::MAX` (`NONE_BLOCK`) while the frame is empty.
    block_id: AtomicU32,
    /// Lease word: high bit = exclusive write lease, low 31 bits = read-lease count.
    lease_state: AtomicU32,
    /// Set when the frame's bytes must be written back to storage before reuse.
    dirty: AtomicBool,
    /// Clock "second chance" bit: set when the frame is pinned, cleared by the eviction sweep.
    reference_bit: AtomicBool,
}
impl FrameMetadata {
    /// Sentinel stored in `block_id` while the frame caches no block.
    const NONE_BLOCK: u32 = u32::MAX;
    /// High bit of `lease_state`: set while the single exclusive (write) lease is held.
    const WRITE_LEASE: u32 = 1 << 31;
    /// Low 31 bits of `lease_state`: number of outstanding read leases.
    const READERS_MASK: u32 = Self::WRITE_LEASE - 1;

    /// An empty, unpinned, clean, unreferenced frame.
    fn new() -> Self {
        Self {
            block_id: AtomicU32::new(Self::NONE_BLOCK),
            lease_state: AtomicU32::new(0),
            dirty: AtomicBool::new(false),
            reference_bit: AtomicBool::new(false),
        }
    }

    /// `true` when no block is assigned to this frame.
    fn is_free(&self) -> bool {
        self.get_block_id().is_none()
    }

    /// `true` while any read or write lease is outstanding.
    fn is_pinned(&self) -> bool {
        self.lease_state.load(Ordering::Acquire) != 0
    }

    /// `true` while the exclusive write lease is held.
    fn has_write_lease(&self) -> bool {
        (self.lease_state.load(Ordering::Acquire) & Self::WRITE_LEASE) != 0
    }

    /// Take a shared read lease and mark the frame as recently used.
    ///
    /// # Errors
    /// Fails if a write lease is active or the reader count is saturated.
    fn pin_read(&self) -> Result<()> {
        let outcome = self
            .lease_state
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |state| {
                let writer_active = state & Self::WRITE_LEASE != 0;
                if writer_active || state == Self::READERS_MASK {
                    None
                } else {
                    Some(state + 1)
                }
            });
        match outcome {
            Ok(_) => {
                self.reference_bit.store(true, Ordering::Release);
                Ok(())
            }
            Err(state) if state & Self::WRITE_LEASE != 0 => Err(PersistentARTrieError::internal(
                "cannot read-pin frame while an exclusive write lease is active",
            )),
            Err(_) => Err(PersistentARTrieError::internal(
                "buffer frame read lease count overflow",
            )),
        }
    }

    /// Release one read lease.
    ///
    /// Calling this without a matching read lease trips a debug assertion; in release
    /// builds the call is ignored.
    fn unpin_read(&self) {
        let _ = self
            .lease_state
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |state| {
                let holds_read_lease = state > 0 && state & Self::WRITE_LEASE == 0;
                debug_assert!(
                    holds_read_lease,
                    "read unpin called without an active read lease"
                );
                if holds_read_lease {
                    Some(state - 1)
                } else {
                    None
                }
            });
    }

    /// Take the exclusive write lease; only succeeds when no lease of any kind is held.
    ///
    /// # Errors
    /// Fails if any read or write lease is currently active.
    fn pin_write(&self) -> Result<()> {
        match self.lease_state.compare_exchange(
            0,
            Self::WRITE_LEASE,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => {
                self.reference_bit.store(true, Ordering::Release);
                Ok(())
            }
            Err(_) => Err(PersistentARTrieError::internal(
                "cannot mutably pin frame while another page lease is active",
            )),
        }
    }

    /// Release the exclusive write lease (debug-asserts that it was held).
    fn unpin_write(&self) {
        let released = self
            .lease_state
            .compare_exchange(Self::WRITE_LEASE, 0, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        debug_assert!(released, "write unpin called without an active write lease");
    }

    /// Flag the frame as needing write-back.
    fn mark_dirty(&self) {
        self.dirty.store(true, Ordering::Release);
    }

    /// Whether the frame needs write-back.
    fn is_dirty(&self) -> bool {
        self.dirty.load(Ordering::Acquire)
    }

    /// Mark the frame as in sync with storage.
    fn clear_dirty(&self) {
        self.dirty.store(false, Ordering::Release);
    }

    /// Block cached in this frame, or `None` when the frame is empty.
    fn get_block_id(&self) -> Option<u32> {
        let raw = self.block_id.load(Ordering::Acquire);
        if raw == Self::NONE_BLOCK {
            None
        } else {
            Some(raw)
        }
    }

    /// Assign (`Some`) or clear (`None`) the cached block id.
    fn set_block_id(&self, block_id: Option<u32>) {
        self.block_id
            .store(block_id.unwrap_or(Self::NONE_BLOCK), Ordering::Release);
    }
}
/// RAII read lease on a cached page.
///
/// While the guard lives, the frame stays pinned: eviction skips it and write leases are
/// refused. Dropping the guard releases the read lease.
pub struct PageReadGuard<'a, S: BlockStorage = MmapDiskManager> {
    /// Manager that owns the leased frame.
    buffer_manager: &'a BufferManager<S>,
    /// Frame holding the page.
    frame_id: FrameId,
}
impl<'a, S: BlockStorage> PageReadGuard<'a, S> {
    /// Borrow the cached block contents for the lifetime of this read lease.
    pub fn data(&self) -> &[u8; BLOCK_SIZE] {
        let block = &self.buffer_manager.buffer_pool[self.frame_id];
        &block.data
    }

    /// Block id of the leased page.
    ///
    /// # Panics
    /// Panics if the pinned frame has no block id, which would be a broken invariant.
    pub fn block_id(&self) -> u32 {
        let frame = &self.buffer_manager.frames[self.frame_id];
        frame
            .get_block_id()
            .expect("pinned frame must have block_id")
    }
}
/// Dropping a read guard releases its read lease on the frame.
impl<'a, S: BlockStorage> Drop for PageReadGuard<'a, S> {
    fn drop(&mut self) {
        self.buffer_manager.frames[self.frame_id].unpin_read();
    }
}
/// RAII exclusive (write) lease on a cached page.
///
/// While the guard lives, no other read or write lease on the frame can be taken. On drop
/// the frame is marked dirty and the lease is released.
pub struct PageWriteGuard<'a, S: BlockStorage = MmapDiskManager> {
    /// Manager that owns the leased frame.
    buffer_manager: &'a BufferManager<S>,
    /// Frame holding the page.
    frame_id: FrameId,
}
impl<'a, S: BlockStorage> PageWriteGuard<'a, S> {
    /// Mutable access to the cached block contents.
    ///
    /// Writes made here are persisted on a later flush or eviction, because dropping the
    /// guard marks the frame dirty.
    pub fn data_mut(&mut self) -> &mut [u8; BLOCK_SIZE] {
        // SAFETY: `frame_id` is a frame index handed out by the buffer manager, and this guard
        // holds the exclusive write lease (`pin_write` only succeeds from an unleased state),
        // so no other guard for this frame can exist concurrently.
        // NOTE(review): the pointer is derived from `Vec::as_ptr`, whose contract forbids
        // writing through it except inside an `UnsafeCell`. The frame data is not wrapped in
        // one, so this may violate Rust's aliasing rules even though leases prevent data
        // races. Consider storing frame data in `UnsafeCell` — confirm.
        unsafe {
            let ptr = self.buffer_manager.buffer_pool.as_ptr() as *mut AlignedBlock;
            &mut (*ptr.add(self.frame_id)).data
        }
    }
    /// Shared view of the cached block contents.
    pub fn data(&self) -> &[u8; BLOCK_SIZE] {
        &self.buffer_manager.buffer_pool[self.frame_id].data
    }
    /// Block id of the leased page.
    ///
    /// # Panics
    /// Panics if the pinned frame has no block id, which would be a broken invariant.
    pub fn block_id(&self) -> u32 {
        self.buffer_manager.frames[self.frame_id]
            .get_block_id()
            .expect("pinned frame must have block_id")
    }
}
/// Dropping a write guard marks the frame dirty, then releases the write lease.
impl<'a, S: BlockStorage> Drop for PageWriteGuard<'a, S> {
    fn drop(&mut self) {
        // Dirty is set unconditionally, even if `data_mut` was never called. It is set before
        // the lease is released, so an Acquire load that observes the frame unpinned also
        // observes the dirty flag.
        self.buffer_manager.frames[self.frame_id].mark_dirty();
        self.buffer_manager.frames[self.frame_id].unpin_write();
    }
}
/// Page cache that maps storage blocks onto a fixed array of in-memory frames.
///
/// Pages are handed out through RAII leases (`PageReadGuard`, `PageWriteGuard`). When no
/// empty frame is available, a victim is chosen with a clock (second-chance) sweep over the
/// active frames.
///
/// NOTE(review): fixed-buffer I/O receives `frame_id as u16` as the buffer index, which
/// truncates for pools larger than 65536 frames — confirm pool sizes are bounded.
pub struct BufferManager<S: BlockStorage = MmapDiskManager> {
    /// Backing block storage used for reads, write-back, allocation and sync.
    storage: S,
    /// Block id -> frame currently caching that block.
    page_table: RwLock<HashMap<u32, FrameId>>,
    /// Serializes lease acquisition, page loading/eviction, flushing, deletion and resizing.
    /// Releasing a lease (dropping a guard) does not take this lock.
    lifecycle_lock: Mutex<()>,
    /// Per-frame metadata; same length as `buffer_pool`.
    frames: Vec<FrameMetadata>,
    /// Frame data, indexed by `FrameId`.
    buffer_pool: Vec<AlignedBlock>,
    /// Ever-increasing counter; taken modulo the active size to pick eviction candidates.
    clock_hand: AtomicUsize,
    /// Allocated capacity (maximum number of frames).
    pool_size: usize,
    /// Number of frames currently used by the cache (`<= pool_size`).
    active_pool_size: AtomicUsize,
    /// Registration with `storage` succeeded and the storage reports fixed-buffer support.
    /// Only `flush_all` consults this flag; single-page paths always call the `*_fixed`
    /// storage methods.
    fixed_buffers_registered: bool,
}
impl<S: BlockStorage> BufferManager<S> {
pub fn new(storage: S, pool_size: usize) -> Self {
let frames: Vec<FrameMetadata> = (0..pool_size).map(|_| FrameMetadata::new()).collect();
let buffer_pool: Vec<AlignedBlock> = (0..pool_size).map(|_| AlignedBlock::new()).collect();
let buffers: Vec<(*mut u8, usize)> = buffer_pool
.iter()
.map(|block| (block.data.as_ptr() as *mut u8, BLOCK_SIZE))
.collect();
let fixed_buffers_registered = unsafe { storage.register_buffer_pool(&buffers).is_ok() }
&& storage.supports_fixed_buffers();
Self {
storage,
page_table: RwLock::new(HashMap::with_capacity(pool_size)),
lifecycle_lock: Mutex::new(()),
frames,
buffer_pool,
clock_hand: AtomicUsize::new(0),
pool_size,
active_pool_size: AtomicUsize::new(pool_size),
fixed_buffers_registered,
}
}
/// Create a fully-active buffer manager without offering the frame buffers to storage.
///
/// Intended for tests and benchmarks; `flush_all` always takes the non-fixed batch path.
#[cfg(any(test, feature = "bench-internals"))]
pub fn new_without_registration(storage: S, pool_size: usize) -> Self {
    let frames: Vec<FrameMetadata> = std::iter::repeat_with(FrameMetadata::new)
        .take(pool_size)
        .collect();
    let buffer_pool: Vec<AlignedBlock> = std::iter::repeat_with(AlignedBlock::new)
        .take(pool_size)
        .collect();
    let page_table = RwLock::new(HashMap::with_capacity(pool_size));
    Self {
        storage,
        page_table,
        lifecycle_lock: Mutex::new(()),
        frames,
        buffer_pool,
        clock_hand: AtomicUsize::new(0),
        pool_size,
        active_pool_size: AtomicUsize::new(pool_size),
        fixed_buffers_registered: false,
    }
}
/// Create a buffer manager with room for `max_pool_size` frames.
///
/// Only `initial_size` frames (clamped to `max_pool_size`) are active at first; call
/// `grow_pool` to enable more. Every frame buffer, active or not, is offered to
/// `storage.register_buffer_pool`.
pub fn new_with_max_capacity(storage: S, initial_size: usize, max_pool_size: usize) -> Self {
    let frames: Vec<FrameMetadata> = std::iter::repeat_with(FrameMetadata::new)
        .take(max_pool_size)
        .collect();
    let buffer_pool: Vec<AlignedBlock> = std::iter::repeat_with(AlignedBlock::new)
        .take(max_pool_size)
        .collect();
    let buffers: Vec<(*mut u8, usize)> = buffer_pool
        .iter()
        .map(|block| (block.data.as_ptr() as *mut u8, BLOCK_SIZE))
        .collect();
    // SAFETY: the pointers address the heap buffer owned by `buffer_pool`. That buffer is
    // moved (not reallocated) into the returned manager and never resized, and
    // `Drop for BufferManager` unregisters before the buffer is freed.
    let registration_ok = unsafe { storage.register_buffer_pool(&buffers) }.is_ok();
    // Support is only queried after registration succeeds.
    let fixed_buffers_registered = registration_ok && storage.supports_fixed_buffers();
    let active = initial_size.min(max_pool_size);
    Self {
        storage,
        page_table: RwLock::new(HashMap::with_capacity(max_pool_size)),
        lifecycle_lock: Mutex::new(()),
        frames,
        buffer_pool,
        clock_hand: AtomicUsize::new(0),
        pool_size: max_pool_size,
        active_pool_size: AtomicUsize::new(active),
        fixed_buffers_registered,
    }
}
/// Read-lease `block_id`, loading it from storage if it is not cached.
///
/// # Errors
/// Fails if the page is write-leased, the read count overflows, no frame can be freed,
/// or storage I/O fails.
pub fn fetch_page(&self, block_id: u32) -> Result<PageReadGuard<'_, S>> {
    let _lifecycle = self.lifecycle_lock.lock();
    let frame_id = match self.lookup_frame(block_id) {
        Some(cached) => {
            let frame = &self.frames[cached];
            frame.pin_read()?;
            frame.reference_bit.store(true, Ordering::Release);
            cached
        }
        None => self.load_page(block_id)?,
    };
    Ok(PageReadGuard {
        buffer_manager: self,
        frame_id,
    })
}
/// Take the exclusive write lease on `block_id`, loading it from storage if not cached.
///
/// # Errors
/// Fails if any lease on the page is active, no frame can be freed, or storage I/O fails.
pub fn fetch_page_mut(&self, block_id: u32) -> Result<PageWriteGuard<'_, S>> {
    let _lifecycle = self.lifecycle_lock.lock();
    let frame_id = match self.lookup_frame(block_id) {
        Some(cached) => {
            let frame = &self.frames[cached];
            frame.pin_write()?;
            frame.reference_bit.store(true, Ordering::Release);
            cached
        }
        None => self.load_page_mut(block_id)?,
    };
    Ok(PageWriteGuard {
        buffer_manager: self,
        frame_id,
    })
}
/// Allocate a fresh block and return it write-leased, zero-filled and marked dirty.
///
/// If no frame can be obtained, or the lease cannot be taken, the freshly allocated block
/// is released again (best effort: the release result is ignored).
pub fn new_page(&self) -> Result<PageWriteGuard<'_, S>> {
    // Allocation happens before taking the lifecycle lock.
    let block_id = self.storage.allocate_block()?;
    let _lifecycle = self.lifecycle_lock.lock();
    let frame_id = self.get_free_frame().map_err(|err| {
        let _ = self.storage.free_block(block_id);
        err
    })?;
    let frame = &self.frames[frame_id];
    frame.set_block_id(Some(block_id));
    if let Err(err) = frame.pin_write() {
        frame.set_block_id(None);
        let _ = self.storage.free_block(block_id);
        return Err(err);
    }
    frame.mark_dirty();
    // SAFETY: `frame_id` came from `get_free_frame` and this thread now holds the frame's
    // exclusive write lease under `lifecycle_lock`, so no other guard can observe it.
    // NOTE(review): writing through a `Vec::as_ptr`-derived pointer outside `UnsafeCell`
    // may violate the aliasing rules — confirm.
    unsafe {
        let base = self.buffer_pool.as_ptr() as *mut AlignedBlock;
        (*base.add(frame_id)).data.fill(0);
    }
    self.page_table.write().insert(block_id, frame_id);
    Ok(PageWriteGuard {
        buffer_manager: self,
        frame_id,
    })
}
/// Drop `block_id` from the cache (discarding unflushed changes) and free it in storage.
///
/// # Errors
/// Fails if the page is currently leased, or if storage refuses to free the block.
pub fn delete_page(&self, block_id: u32) -> Result<()> {
    let _lifecycle = self.lifecycle_lock.lock();
    let cached_frame = self
        .lookup_frame(block_id)
        .map(|frame_id| &self.frames[frame_id]);
    if let Some(frame) = cached_frame {
        if frame.is_pinned() {
            return Err(PersistentARTrieError::InternalError {
                message: format!("Cannot delete pinned page (block {})", block_id),
            });
        }
        frame.set_block_id(None);
        frame.clear_dirty();
        frame.reference_bit.store(false, Ordering::Release);
        self.page_table.write().remove(&block_id);
    }
    self.storage.free_block(block_id)
}
/// Write `block_id` back to storage if it is cached and dirty; otherwise do nothing.
///
/// Does not call `storage.sync()`.
///
/// # Errors
/// Fails if the dirty page is under an active write lease, or if the write fails.
pub fn flush_page(&self, block_id: u32) -> Result<()> {
    let _lifecycle = self.lifecycle_lock.lock();
    let frame_id = match self.lookup_frame(block_id) {
        Some(frame_id) => frame_id,
        None => return Ok(()),
    };
    let frame = &self.frames[frame_id];
    if !frame.is_dirty() {
        return Ok(());
    }
    if frame.has_write_lease() {
        return Err(PersistentARTrieError::internal(
            "cannot flush page during an active mutable page lease",
        ));
    }
    let data = &self.buffer_pool[frame_id].data;
    self.storage
        .write_block_fixed(block_id, data, frame_id as u16)?;
    frame.clear_dirty();
    Ok(())
}
/// Write every dirty active frame back in a single batch, then sync storage.
///
/// The batch uses fixed-buffer writes when the pool was registered with the storage.
/// `storage.sync()` is called even when nothing was dirty.
///
/// # Errors
/// Fails, without writing anything, if any dirty frame is under an active write lease.
/// Also propagates batch-write and sync errors.
pub fn flush_all(&self) -> Result<()> {
    let _lifecycle = self.lifecycle_lock.lock();
    let active_size = self.active_pool_size.load(Ordering::Acquire);

    let mut dirty_frames: Vec<(u32, usize)> = Vec::new();
    for (frame_id, frame) in self.frames[..active_size].iter().enumerate() {
        if !frame.is_dirty() {
            continue;
        }
        if let Some(block_id) = frame.get_block_id() {
            dirty_frames.push((block_id, frame_id));
        }
    }

    let lease_conflict = dirty_frames
        .iter()
        .any(|&(_, frame_id)| self.frames[frame_id].has_write_lease());
    if lease_conflict {
        return Err(PersistentARTrieError::internal(
            "cannot flush dirty pages during an active mutable page lease",
        ));
    }

    // Storage is only asked to write a batch when there is something in it.
    if !dirty_frames.is_empty() {
        if self.fixed_buffers_registered {
            let mut requests: Vec<(u32, &[u8; BLOCK_SIZE], u16)> =
                Vec::with_capacity(dirty_frames.len());
            for &(block_id, frame_id) in &dirty_frames {
                requests.push((block_id, &self.buffer_pool[frame_id].data, frame_id as u16));
            }
            self.storage.write_blocks_batch_fixed(&requests)?;
        } else {
            let mut requests: Vec<(u32, &[u8; BLOCK_SIZE])> =
                Vec::with_capacity(dirty_frames.len());
            for &(block_id, frame_id) in &dirty_frames {
                requests.push((block_id, &self.buffer_pool[frame_id].data));
            }
            self.storage.write_blocks_batch(&requests)?;
        }
        dirty_frames
            .iter()
            .for_each(|&(_, frame_id)| self.frames[frame_id].clear_dirty());
    }
    self.storage.sync()
}
/// Frame currently caching `block_id`, if any (takes the page-table read lock briefly).
fn lookup_frame(&self, block_id: u32) -> Option<FrameId> {
    let table = self.page_table.read();
    table.get(&block_id).copied()
}
fn load_page(&self, block_id: u32) -> Result<FrameId> {
let frame_id = self.get_free_frame()?;
unsafe {
let ptr = self.buffer_pool.as_ptr() as *mut AlignedBlock;
self.storage.read_block_fixed(
block_id,
&mut (*ptr.add(frame_id)).data,
frame_id as u16,
)?;
}
self.frames[frame_id].set_block_id(Some(block_id));
self.frames[frame_id].pin_read()?;
self.frames[frame_id].clear_dirty();
self.frames[frame_id]
.reference_bit
.store(true, Ordering::Release);
self.page_table.write().insert(block_id, frame_id);
Ok(frame_id)
}
fn load_page_mut(&self, block_id: u32) -> Result<FrameId> {
let frame_id = self.get_free_frame()?;
unsafe {
let ptr = self.buffer_pool.as_ptr() as *mut AlignedBlock;
self.storage.read_block_fixed(
block_id,
&mut (*ptr.add(frame_id)).data,
frame_id as u16,
)?;
}
self.frames[frame_id].set_block_id(Some(block_id));
self.frames[frame_id].pin_write()?;
self.frames[frame_id].clear_dirty();
self.frames[frame_id]
.reference_bit
.store(true, Ordering::Release);
self.page_table.write().insert(block_id, frame_id);
Ok(frame_id)
}
/// Read-lease `block_id` (loading it if necessary) and return its frame id plus a raw
/// pointer to the frame data.
///
/// The caller must release the lease with `unpin_read_frame(frame_id)`. The lease keeps the
/// frame from being evicted, so the pointer is only meaningful while it is held.
pub(crate) fn pin_page_data(
    &self,
    block_id: u32,
) -> Result<(FrameId, NonNull<[u8; BLOCK_SIZE]>)> {
    let _lifecycle = self.lifecycle_lock.lock();
    let frame_id = match self.lookup_frame(block_id) {
        None => self.load_page(block_id)?,
        Some(cached) => {
            let frame = &self.frames[cached];
            frame.pin_read()?;
            frame.reference_bit.store(true, Ordering::Release);
            cached
        }
    };
    let data = NonNull::from(&self.buffer_pool[frame_id].data);
    Ok((frame_id, data))
}
/// Release a read lease previously taken by `pin_page_data`.
pub(crate) fn unpin_read_frame(&self, frame_id: FrameId) {
    let frame = &self.frames[frame_id];
    frame.unpin_read();
}
/// Find a frame to hold a new page. Caller must hold `lifecycle_lock`.
///
/// First choice is any empty, unleased active frame. Otherwise the clock hand sweeps the
/// active frames for at most two full rotations. Leased frames are skipped, and frames with
/// their reference bit set get a second chance (the bit is cleared). The first remaining
/// frame is evicted: written back if dirty, then unmapped.
///
/// # Errors
/// `BufferPoolExhausted` if no victim is found; write-back errors are propagated.
fn get_free_frame(&self) -> Result<FrameId> {
    let active_size = self.active_pool_size.load(Ordering::Acquire);
    let empty_frame = (0..active_size).find(|&frame_id| {
        let frame = &self.frames[frame_id];
        frame.is_free() && !frame.is_pinned()
    });
    if let Some(frame_id) = empty_frame {
        return Ok(frame_id);
    }

    for _ in 0..active_size * 2 {
        let victim = self.clock_hand.fetch_add(1, Ordering::Relaxed) % active_size;
        let frame = &self.frames[victim];
        // The reference bit is only consumed for frames that are not leased.
        if frame.is_pinned() || frame.reference_bit.swap(false, Ordering::AcqRel) {
            continue;
        }
        if let Some(evicted_block) = frame.get_block_id() {
            if frame.is_dirty() {
                self.storage.write_block_fixed(
                    evicted_block,
                    &self.buffer_pool[victim].data,
                    victim as u16,
                )?;
                frame.clear_dirty();
            }
            self.page_table.write().remove(&evicted_block);
            frame.set_block_id(None);
        }
        return Ok(victim);
    }

    Err(PersistentARTrieError::BufferPoolExhausted {
        pinned_pages: self.count_pinned(),
        total_pages: active_size,
    })
}
/// Number of leased frames across the whole allocation (active or not).
fn count_pinned(&self) -> usize {
    self.frames
        .iter()
        .map(|frame| usize::from(frame.is_pinned()))
        .sum()
}

/// Number of frames currently active.
pub fn pool_size(&self) -> usize {
    let active = &self.active_pool_size;
    active.load(Ordering::Relaxed)
}

/// Allocated capacity: the upper bound for `grow_pool`.
pub fn max_pool_size(&self) -> usize {
    let capacity = self.pool_size;
    capacity
}
/// Activate `additional_frames` more frames and return the new active size.
///
/// # Errors
/// Fails, leaving the size unchanged, if the result would exceed `max_pool_size()`.
pub fn grow_pool(&self, additional_frames: usize) -> Result<usize> {
    let _lifecycle = self.lifecycle_lock.lock();
    let capacity = self.pool_size;
    self.active_pool_size
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
            let grown = current.saturating_add(additional_frames);
            if grown > capacity {
                None
            } else {
                Some(grown)
            }
        })
        .map(|previous| previous.saturating_add(additional_frames))
        .map_err(|current| PersistentARTrieError::InternalError {
            message: format!(
                "Cannot grow pool beyond max capacity {} (current: {}, requested: +{})",
                capacity, current, additional_frames
            ),
        })
}
/// Deactivate up to `frames_to_remove` frames from the top of the active range and return
/// the resulting active size.
///
/// At least one frame stays active, and the pool is never grown: shrinking an empty pool
/// is a no-op. Each retired frame is written back if dirty and then unmapped.
///
/// # Errors
/// Fails, with nothing modified, if any frame in the retired range is leased. Write-back
/// errors are propagated. In that case, frames already processed are left empty but remain
/// active.
pub fn shrink_pool(&self, frames_to_remove: usize) -> Result<usize> {
    let _lifecycle = self.lifecycle_lock.lock();
    loop {
        let current = self.active_pool_size.load(Ordering::Acquire);
        let new_size = current.saturating_sub(frames_to_remove).max(1);
        // `.max(1)` lifts `new_size` above `current` when the pool is empty (e.g. built via
        // `new_with_max_capacity(storage, 0, n)`). Shrinking must never grow the pool.
        if new_size >= current {
            return Ok(current);
        }
        // Check the whole retired range up front. A leased frame then aborts the shrink
        // before any frame has been written back or unmapped.
        if let Some(pinned) =
            (new_size..current).find(|&frame_id| self.frames[frame_id].is_pinned())
        {
            return Err(PersistentARTrieError::InternalError {
                message: format!("Cannot shrink pool: frame {} is pinned", pinned),
            });
        }
        for frame_id in new_size..current {
            let frame = &self.frames[frame_id];
            if let Some(block_id) = frame.get_block_id() {
                if frame.is_dirty() {
                    self.storage.write_block_fixed(
                        block_id,
                        &self.buffer_pool[frame_id].data,
                        frame_id as u16,
                    )?;
                    frame.clear_dirty();
                }
                self.page_table.write().remove(&block_id);
                frame.set_block_id(None);
            }
        }
        match self.active_pool_size.compare_exchange(
            current,
            new_size,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => {
                if self.clock_hand.load(Ordering::Relaxed) >= new_size {
                    self.clock_hand.store(0, Ordering::Relaxed);
                }
                return Ok(new_size);
            }
            // Every visible writer of `active_pool_size` holds `lifecycle_lock`, so this
            // retry should not trigger; it is kept as a defensive re-plan.
            Err(_) => continue,
        }
    }
}
/// Usage counters over the currently active frames.
///
/// This does not take `lifecycle_lock`, so under concurrent use the counters are not an
/// atomic snapshot. Pinned and dirty counts only include frames that hold a block.
pub fn stats(&self) -> BufferPoolStats {
    let active_size = self.active_pool_size.load(Ordering::Relaxed);
    let (free, pinned, dirty) = (0..active_size)
        .map(|frame_id| &self.frames[frame_id])
        .fold((0usize, 0usize, 0usize), |(free, pinned, dirty), frame| {
            if frame.is_free() {
                (free + 1, pinned, dirty)
            } else {
                (
                    free,
                    pinned + usize::from(frame.is_pinned()),
                    dirty + usize::from(frame.is_dirty()),
                )
            }
        });
    BufferPoolStats {
        total_frames: active_size,
        max_frames: self.pool_size,
        free_frames: free,
        pinned_frames: pinned,
        dirty_frames: dirty,
        used_frames: active_size - free,
    }
}
/// The backing block storage.
pub fn storage(&self) -> &S {
    let backing = &self.storage;
    backing
}
/// Legacy alias for [`Self::storage`].
#[deprecated(since = "0.9.0", note = "Use storage() instead")]
pub fn disk_manager(&self) -> &S {
    self.storage()
}
}
/// Unregisters the frame buffers from storage when the manager is dropped.
impl<S: BlockStorage> Drop for BufferManager<S> {
    fn drop(&mut self) {
        // Runs before the fields (including `buffer_pool`) are dropped, so storage stops
        // referencing the buffers before they are freed. The result is ignored because drop
        // cannot report errors.
        // NOTE(review): this runs unconditionally, including for managers built with
        // `new_without_registration` or whose registration failed. If one storage instance is
        // shared by several managers, this may unregister another manager's buffers —
        // confirm against the `BlockStorage` contract.
        let _ = self.storage.unregister_buffer_pool();
    }
}
/// Frame usage counters returned by `BufferManager::stats`.
#[derive(Debug, Clone, Copy)]
pub struct BufferPoolStats {
    /// Number of active frames.
    pub total_frames: usize,
    /// Allocated capacity (upper bound for growth).
    pub max_frames: usize,
    /// Active frames that hold no block.
    pub free_frames: usize,
    /// Active frames that hold a block and have at least one lease.
    pub pinned_frames: usize,
    /// Active frames that hold a block and need write-back.
    pub dirty_frames: usize,
    /// Active frames that hold a block (`total_frames - free_frames`).
    pub used_frames: usize,
}
#[cfg(test)]
mod tests {
use super::super::disk_manager::{DiskManager, FileHeader};
use super::*;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tempfile::tempdir;
/// Buffer manager over a fresh on-disk file inside a leaked temporary directory.
fn create_buffer_manager(pool_size: usize) -> BufferManager {
    let temp = tempdir().expect("Failed to create temp dir");
    let disk_manager = DiskManager::create(&temp.path().join("test.part"))
        .expect("Failed to create disk manager");
    // Leak the handle on purpose: dropping a `TempDir` deletes the directory, and with it
    // the file the buffer manager is still using.
    std::mem::forget(temp);
    BufferManager::new(disk_manager, pool_size)
}
/// In-memory `BlockStorage` test double that counts fixed-buffer and regular I/O calls.
/// Clones share the same state through the `Arc`.
#[derive(Clone)]
struct TrackingFixedStorage {
    state: Arc<TrackingFixedStorageState>,
}
/// Shared state behind `TrackingFixedStorage`.
struct TrackingFixedStorageState {
    /// Block contents indexed by block id; block 0 holds the header bytes.
    blocks: RwLock<Vec<Box<AlignedBlock>>>,
    /// Whether a buffer pool is currently registered.
    registered: AtomicBool,
    /// Calls to `register_buffer_pool`.
    register_calls: AtomicUsize,
    /// Calls to `unregister_buffer_pool`.
    unregister_calls: AtomicUsize,
    /// Number of buffers passed to the most recent registration.
    registered_buffers: AtomicUsize,
    /// Calls to `write_block` (non-fixed path).
    regular_writes: AtomicUsize,
    /// Calls to `write_block_fixed`, including those made by batch writes.
    fixed_writes: AtomicUsize,
    /// Calls to `write_blocks_batch_fixed`.
    fixed_batch_writes: AtomicUsize,
    /// Calls to `read_block_fixed`.
    fixed_reads: AtomicUsize,
}
impl TrackingFixedStorage {
    /// Storage holding one block (block 0, used for header bytes), with every counter at
    /// zero and no buffers registered.
    fn new() -> Self {
        let zero = || AtomicUsize::new(0);
        let state = TrackingFixedStorageState {
            blocks: RwLock::new(vec![AlignedBlock::new_boxed()]),
            registered: AtomicBool::new(false),
            register_calls: zero(),
            unregister_calls: zero(),
            registered_buffers: zero(),
            regular_writes: zero(),
            fixed_writes: zero(),
            fixed_batch_writes: zero(),
            fixed_reads: zero(),
        };
        Self {
            state: Arc::new(state),
        }
    }
}
impl TrackingFixedStorageState {
    /// Error for a block id past the end of `blocks`.
    fn invalid_block(block_id: u32) -> PersistentARTrieError {
        PersistentARTrieError::InvalidBlockId {
            block_id,
            reason: "tracking storage block does not exist".to_string(),
        }
    }

    /// Error for a byte range that overflows or extends past the block end.
    fn invalid_range(block_id: u32) -> PersistentARTrieError {
        PersistentARTrieError::InvalidBlockId {
            block_id,
            reason: "tracking storage byte range is outside the block".to_string(),
        }
    }

    /// End offset of `offset..offset + len`, provided it fits inside a single block.
    fn byte_range_end(block_id: u32, offset: usize, len: usize) -> Result<usize> {
        match offset.checked_add(len) {
            Some(end) if end <= BLOCK_SIZE => Ok(end),
            _ => Err(Self::invalid_range(block_id)),
        }
    }

    /// Copy a whole block into `buffer`.
    fn read_block_into(&self, block_id: u32, buffer: &mut [u8; BLOCK_SIZE]) -> Result<()> {
        let blocks = self.blocks.read();
        match blocks.get(block_id as usize) {
            Some(block) => {
                buffer.copy_from_slice(&block.data);
                Ok(())
            }
            None => Err(Self::invalid_block(block_id)),
        }
    }

    /// Overwrite a whole block with `buffer`.
    fn write_block_from(&self, block_id: u32, buffer: &[u8; BLOCK_SIZE]) -> Result<()> {
        let mut blocks = self.blocks.write();
        match blocks.get_mut(block_id as usize) {
            Some(block) => {
                block.data.copy_from_slice(buffer);
                Ok(())
            }
            None => Err(Self::invalid_block(block_id)),
        }
    }

    /// Copy `buffer.len()` bytes starting at `offset` within a block into `buffer`.
    fn read_bytes_into(&self, block_id: u32, offset: usize, buffer: &mut [u8]) -> Result<()> {
        let end = Self::byte_range_end(block_id, offset, buffer.len())?;
        let blocks = self.blocks.read();
        let block = blocks
            .get(block_id as usize)
            .ok_or_else(|| Self::invalid_block(block_id))?;
        buffer.copy_from_slice(&block.data[offset..end]);
        Ok(())
    }

    /// Write `data` into a block starting at `offset`.
    fn write_bytes_from(&self, block_id: u32, offset: usize, data: &[u8]) -> Result<()> {
        let end = Self::byte_range_end(block_id, offset, data.len())?;
        let mut blocks = self.blocks.write();
        let block = blocks
            .get_mut(block_id as usize)
            .ok_or_else(|| Self::invalid_block(block_id))?;
        block.data[offset..end].copy_from_slice(data);
        Ok(())
    }
}
impl BlockStorage for TrackingFixedStorage {
    fn read_block(&self, block_id: u32, buffer: &mut [u8; BLOCK_SIZE]) -> Result<()> {
        self.state.read_block_into(block_id, buffer)
    }

    /// Counted as a regular (non-fixed) write.
    fn write_block(&self, block_id: u32, buffer: &[u8; BLOCK_SIZE]) -> Result<()> {
        let state = &self.state;
        state.regular_writes.fetch_add(1, Ordering::AcqRel);
        state.write_block_from(block_id, buffer)
    }

    fn read_bytes(&self, block_id: u32, offset: usize, buffer: &mut [u8]) -> Result<()> {
        self.state.read_bytes_into(block_id, offset, buffer)
    }

    fn write_bytes(&self, block_id: u32, offset: usize, data: &[u8]) -> Result<()> {
        self.state.write_bytes_from(block_id, offset, data)
    }

    /// Appends a new block and returns its index.
    fn allocate_block(&self) -> Result<u32> {
        let mut blocks = self.state.blocks.write();
        blocks.push(AlignedBlock::new_boxed());
        Ok((blocks.len() - 1) as u32)
    }

    /// Zero-fills an existing block; block 0 (header) cannot be freed.
    fn free_block(&self, block_id: u32) -> Result<()> {
        if block_id == 0 {
            return Err(TrackingFixedStorageState::invalid_block(block_id));
        }
        let mut blocks = self.state.blocks.write();
        match blocks.get_mut(block_id as usize) {
            Some(block) => {
                block.data.fill(0);
                Ok(())
            }
            None => Err(TrackingFixedStorageState::invalid_block(block_id)),
        }
    }

    fn read_header(&self) -> Result<FileHeader> {
        let mut raw = [0u8; 64];
        self.read_header_bytes(&mut raw)?;
        Ok(FileHeader::from_bytes(&raw))
    }

    fn write_header(&self, header: &FileHeader) -> Result<()> {
        let bytes = header.to_bytes();
        self.write_header_bytes(&bytes)
    }

    fn read_header_bytes(&self, buffer: &mut [u8]) -> Result<()> {
        self.state.read_bytes_into(0, 0, buffer)
    }

    fn write_header_bytes(&self, bytes: &[u8]) -> Result<()> {
        self.state.write_bytes_from(0, 0, bytes)
    }

    fn root_ptr(&self) -> Result<u64> {
        self.read_header()
            .map(|header| header.root_ptr.load(Ordering::SeqCst))
    }

    fn set_root_ptr(&self, ptr: u64) -> Result<()> {
        let header = self.read_header()?;
        header.root_ptr.store(ptr, Ordering::SeqCst);
        self.write_header(&header)
    }

    fn entry_count(&self) -> Result<u64> {
        self.read_header()
            .map(|header| header.entry_count.load(Ordering::SeqCst))
    }

    fn set_entry_count(&self, count: u64) -> Result<()> {
        let header = self.read_header()?;
        header.entry_count.store(count, Ordering::SeqCst);
        self.write_header(&header)
    }

    fn file_size(&self) -> u64 {
        let block_total = self.state.blocks.read().len() as u64;
        block_total * BLOCK_SIZE as u64
    }

    fn block_count(&self) -> Result<u32> {
        let block_total = self.state.blocks.read().len();
        Ok(block_total as u32)
    }

    fn path(&self) -> &str {
        "tracking-fixed-storage"
    }

    fn sync(&self) -> Result<()> {
        Ok(())
    }

    /// Records the registration. Panics if any buffer is null or not exactly one block long.
    unsafe fn register_buffer_pool(&self, buffers: &[(*mut u8, usize)]) -> Result<()> {
        let well_formed = buffers
            .iter()
            .all(|&(ptr, len)| !ptr.is_null() && len == BLOCK_SIZE);
        assert!(well_formed, "registered buffers must be non-null full blocks");
        let state = &self.state;
        state.register_calls.fetch_add(1, Ordering::AcqRel);
        state
            .registered_buffers
            .store(buffers.len(), Ordering::Release);
        state.registered.store(true, Ordering::Release);
        Ok(())
    }

    fn unregister_buffer_pool(&self) -> Result<()> {
        let state = &self.state;
        state.unregister_calls.fetch_add(1, Ordering::AcqRel);
        state.registered.store(false, Ordering::Release);
        Ok(())
    }

    fn read_block_fixed(
        &self,
        block_id: u32,
        buffer: &mut [u8; BLOCK_SIZE],
        _buf_index: u16,
    ) -> Result<()> {
        let state = &self.state;
        state.fixed_reads.fetch_add(1, Ordering::AcqRel);
        state.read_block_into(block_id, buffer)
    }

    fn write_block_fixed(
        &self,
        block_id: u32,
        buffer: &[u8; BLOCK_SIZE],
        _buf_index: u16,
    ) -> Result<()> {
        let state = &self.state;
        state.fixed_writes.fetch_add(1, Ordering::AcqRel);
        state.write_block_from(block_id, buffer)
    }

    /// Fixed buffers are reported as supported only while a pool is registered.
    fn supports_fixed_buffers(&self) -> bool {
        self.state.registered.load(Ordering::Acquire)
    }

    /// Counts one batch, then performs (and counts) each write via `write_block_fixed`.
    fn write_blocks_batch_fixed(
        &self,
        requests: &[(u32, &[u8; BLOCK_SIZE], u16)],
    ) -> Result<()> {
        self.state.fixed_batch_writes.fetch_add(1, Ordering::AcqRel);
        requests
            .iter()
            .try_for_each(|&(block_id, buffer, buf_index)| {
                self.write_block_fixed(block_id, buffer, buf_index)
            })
    }
}
/// A registered pool must route write-guard edits through the fixed-buffer batch path on
/// flush, and must unregister when the manager is dropped.
#[test]
fn fixed_buffer_registration_covers_write_guard_mutation_and_flush() {
    let storage = TrackingFixedStorage::new();
    let state = Arc::clone(&storage.state);
    {
        let manager = BufferManager::new(storage, 2);
        assert_eq!(state.register_calls.load(Ordering::Acquire), 1);
        assert_eq!(state.registered_buffers.load(Ordering::Acquire), 2);
        assert!(state.registered.load(Ordering::Acquire));

        let written_block = {
            let mut writer = manager.new_page().expect("new page");
            let id = writer.block_id();
            let data = writer.data_mut();
            data[7] = 0xA5;
            data[BLOCK_SIZE - 1] = 0x5A;
            id
        };

        assert_eq!(manager.stats().dirty_frames, 1);
        manager.flush_all().expect("flush fixed buffer page");
        assert_eq!(manager.stats().dirty_frames, 0);
        assert_eq!(state.regular_writes.load(Ordering::Acquire), 0);
        assert_eq!(state.fixed_batch_writes.load(Ordering::Acquire), 1);
        assert_eq!(state.fixed_writes.load(Ordering::Acquire), 1);

        let blocks = state.blocks.read();
        let persisted = &blocks[written_block as usize].data;
        assert_eq!(persisted[7], 0xA5);
        assert_eq!(persisted[BLOCK_SIZE - 1], 0x5A);
    }
    assert_eq!(state.unregister_calls.load(Ordering::Acquire), 1);
    assert!(!state.registered.load(Ordering::Acquire));
}
/// Bytes written through a new page's guard are visible to a later read lease.
#[test]
fn test_new_page() {
    let manager = create_buffer_manager(10);
    let block_id = {
        let mut writer = manager.new_page().expect("new_page");
        let id = writer.block_id();
        let data = writer.data_mut();
        data[0] = 0xDE;
        data[1] = 0xAD;
        id
    };
    let reader = manager.fetch_page(block_id).expect("fetch_page");
    assert_eq!(reader.data()[0], 0xDE);
    assert_eq!(reader.data()[1], 0xAD);
}
/// A flushed page still reads back its contents.
#[test]
fn test_fetch_page() {
    let manager = create_buffer_manager(10);
    let block_id = {
        let mut writer = manager.new_page().expect("new_page");
        let id = writer.block_id();
        writer.data_mut()[100] = 42;
        id
    };
    manager.flush_page(block_id).expect("flush");
    let reader = manager.fetch_page(block_id).expect("fetch_page");
    assert_eq!(reader.data()[100], 42);
}
/// While a write lease is held, both read and second write leases are refused.
#[test]
fn mutable_page_lease_excludes_other_leases() {
    let manager = create_buffer_manager(10);
    let block_id = {
        let mut seed = manager.new_page().expect("new_page");
        let id = seed.block_id();
        seed.data_mut()[0] = 7;
        id
    };
    {
        let mut writer = manager.fetch_page_mut(block_id).expect("fetch_page_mut");
        writer.data_mut()[0] = 9;
        assert!(
            manager.fetch_page(block_id).is_err(),
            "read lease must not overlap an active mutable page lease"
        );
        assert!(
            manager.fetch_page_mut(block_id).is_err(),
            "second mutable lease must not alias the first mutable page lease"
        );
    }
    let reader = manager
        .fetch_page(block_id)
        .expect("fetch after write lease");
    assert_eq!(reader.data()[0], 9);
}
/// A write lease is refused while a read lease is active, and granted once it is released.
#[test]
fn read_page_lease_blocks_mutable_lease() {
    let manager = create_buffer_manager(10);
    let block_id = {
        let mut seed = manager.new_page().expect("new_page");
        let id = seed.block_id();
        seed.data_mut()[0] = 11;
        id
    };
    {
        let reader = manager.fetch_page(block_id).expect("fetch_page");
        assert_eq!(reader.data()[0], 11);
        assert!(
            manager.fetch_page_mut(block_id).is_err(),
            "mutable page lease must not overlap an active read lease"
        );
    }
    let mut writer = manager
        .fetch_page_mut(block_id)
        .expect("fetch_page_mut after read");
    writer.data_mut()[0] = 12;
}
/// Flushing a dirty page that is currently write-leased is refused by both flush entry points.
#[test]
fn flush_rejects_dirty_page_with_active_mutable_lease() {
    let manager = create_buffer_manager(10);
    let block_id = {
        let mut seed = manager.new_page().expect("new_page");
        let id = seed.block_id();
        seed.data_mut()[0] = 1;
        id
    };
    {
        let mut writer = manager.fetch_page_mut(block_id).expect("fetch_page_mut");
        writer.data_mut()[0] = 2;
        assert!(
            manager.flush_page(block_id).is_err(),
            "flush_page must not read a dirty frame while a mutable lease is active"
        );
        assert!(
            manager.flush_all().is_err(),
            "flush_all must not read dirty frames while a mutable lease is active"
        );
    }
    manager
        .flush_page(block_id)
        .expect("flush succeeds after mutable lease release");
}
/// Several pages created in a roomy pool each keep their own contents.
#[test]
fn test_multiple_pages() {
    let manager = create_buffer_manager(10);
    let block_ids: Vec<u32> = (0..5u8)
        .map(|marker| {
            let mut writer = manager.new_page().expect("new_page");
            writer.data_mut()[0] = marker;
            writer.block_id()
        })
        .collect();
    for (marker, &block_id) in (0u8..).zip(block_ids.iter()) {
        let reader = manager.fetch_page(block_id).expect("fetch_page");
        assert_eq!(reader.data()[0], marker);
    }
}
/// Pages evicted from a tiny pool are written back and reload with intact contents.
#[test]
fn test_eviction() {
    let manager = create_buffer_manager(3);
    let block_ids: Vec<u32> = (0..10u8)
        .map(|marker| {
            let mut writer = manager.new_page().expect("new_page");
            writer.data_mut()[0] = marker;
            writer.block_id()
        })
        .collect();
    for (index, &block_id) in block_ids.iter().enumerate() {
        let reader = manager.fetch_page(block_id).expect("fetch_page");
        assert_eq!(reader.data()[0], index as u8, "Page {} corrupted", index);
    }
}
/// `stats()` reports exact free / used / pinned / dirty counts as write guards come and go.
#[test]
fn test_stats() {
    let bm = create_buffer_manager(10);
    let initial_stats = bm.stats();
    assert_eq!(initial_stats.total_frames, 10);
    assert_eq!(initial_stats.max_frames, 10);
    assert_eq!(initial_stats.free_frames, 10);
    assert_eq!(initial_stats.used_frames, 0);
    assert_eq!(initial_stats.pinned_frames, 0);
    assert_eq!(initial_stats.dirty_frames, 0);

    let guard1 = bm.new_page().expect("new_page");
    let _guard2 = bm.new_page().expect("new_page");
    let stats = bm.stats();
    assert_eq!(stats.used_frames, 2);
    assert_eq!(stats.free_frames, 8);
    // Both write guards are alive, and new_page marks each new frame dirty.
    assert_eq!(stats.pinned_frames, 2);
    assert_eq!(stats.dirty_frames, 2);

    drop(guard1);
    let stats = bm.stats();
    // Releasing a write lease unpins the frame but leaves it dirty until flushed.
    assert_eq!(stats.pinned_frames, 1);
    assert_eq!(stats.dirty_frames, 2);
    assert_eq!(stats.used_frames, 2);
}
/// After `flush_all`, no active frame is dirty.
#[test]
fn test_flush_all() {
    let manager = create_buffer_manager(10);
    (0..5u8).for_each(|marker| {
        let mut writer = manager.new_page().expect("new_page");
        writer.data_mut()[0] = marker;
    });
    manager.flush_all().expect("flush_all");
    assert_eq!(manager.stats().dirty_frames, 0);
}
/// Deleting an unleased cached page frees its frame.
#[test]
fn test_delete_page() {
    let manager = create_buffer_manager(10);
    // The temporary guard is dropped at the end of this statement, before the delete.
    let block_id = manager.new_page().expect("new_page").block_id();
    manager.delete_page(block_id).expect("delete_page");
    assert_eq!(manager.stats().used_frames, 0);
}
/// With a two-frame pool, allocating a third page must evict the unleased frame, never
/// the leased one.
#[test]
fn test_pinned_page_not_evicted() {
    let manager = create_buffer_manager(2);
    let pinned_guard = manager.new_page().expect("new_page");
    let pinned_block = pinned_guard.block_id();
    {
        let mut unpinned = manager.new_page().expect("new_page");
        unpinned.data_mut()[0] = 99;
    }
    let _new_guard = manager
        .new_page()
        .expect("new_page - should evict unpinned");
    assert_eq!(pinned_guard.block_id(), pinned_block);
}
}