use crate::buffer::eviction::{ClockProPolicy, EvictionPolicy, FrameId};
use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
/// Identifier of a backing file managed by the pool.
pub type FileId = u64;
/// Offset of a block within a file — NOTE(review): unit (block index vs. byte
/// offset) is not established in this file; confirm against the I/O layer.
pub type BlockId = u64;
/// Globally unique identifier of a disk page: a file plus an offset within it.
/// Used as the key of each shard's page table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageId {
    // File containing the page.
    pub file_id: FileId,
    // Offset of the page inside the file (see [`BlockId`]).
    pub offset: BlockId,
}
/// Configuration for constructing a `BufferPool`.
#[derive(Debug, Clone)]
pub struct BufferPoolOptions {
    // Total memory budget in bytes; the pool holds
    // `capacity_bytes / frame_size` frames.
    pub capacity_bytes: usize,
    // Size in bytes of each frame (one cached page).
    pub frame_size: usize,
    // Number of independent shards used to reduce lock contention.
    pub num_shards: usize, }
impl Default for BufferPoolOptions {
    /// Defaults: 128 MiB total capacity, 16 KiB frames, 16 shards.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        Self {
            capacity_bytes: 128 * MIB,
            frame_size: 16 * KIB,
            num_shards: 16,
        }
    }
}
/// A pinned handle to a page resident in the buffer pool.
///
/// While a `FrameRef` (or any clone of it) is alive, the frame's pin count
/// stays above zero, which keeps the eviction path from reclaiming the frame.
/// Dropping the handle releases its pin.
pub struct FrameRef {
    // Owning pool, kept alive so `frame_id` remains a valid index.
    pool: Arc<BufferPool>,
    // Page this handle refers to.
    page_id: PageId,
    // Global frame index inside the pool.
    frame_id: FrameId,
    // Raw pointer into the frame's data buffer, captured at creation time.
    // Only dereferenced by the unsafe `data_unchecked`; see its `# Safety`.
    data_ptr: *const u8,
    // Length of the buffer behind `data_ptr`.
    data_len: usize,
}
impl FrameRef {
    /// Unusable accessor kept only so the name stays on the interface.
    ///
    /// # Panics
    ///
    /// Always panics. Call [`FrameRef::get_data`] instead, which returns a
    /// read guard tied to the frame's data lock.
    #[must_use]
    pub fn data(&self) -> &[u8] {
        panic!("Use get_data() which returns a guard");
    }
    /// Returns a read guard over the frame's bytes.
    ///
    /// Blocks while a writer (e.g. a concurrent page load into this frame)
    /// holds the data lock.
    pub fn get_data(&self) -> parking_lot::RwLockReadGuard<'_, Vec<u8>> {
        self.pool.get_frame_data(self.frame_id)
    }
    /// Returns the frame's bytes without taking the data lock.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that no concurrent writer mutates or
    /// reallocates the frame's `Vec<u8>` while the returned slice is alive:
    /// `data_ptr`/`data_len` were captured when this handle was created and
    /// are not re-validated here.
    #[must_use]
    pub const unsafe fn data_unchecked(&self) -> &[u8] {
        std::slice::from_raw_parts(self.data_ptr, self.data_len)
    }
}
// SAFETY: `FrameRef` is not auto-Send/Sync only because of the raw `data_ptr`.
// That pointer targets frame storage owned by the `Arc<BufferPool>` held in
// the same handle, so the storage outlives the handle on any thread; actual
// dereferences go through `data_unchecked`, whose contract forbids concurrent
// mutation. NOTE(review): soundness also relies on pinned frames never being
// reloaded — confirm against the eviction/pin protocol.
unsafe impl Send for FrameRef {}
unsafe impl Sync for FrameRef {}
impl Clone for FrameRef {
fn clone(&self) -> Self {
self.pool.pin_frame(self.frame_id);
Self {
pool: self.pool.clone(),
page_id: self.page_id,
frame_id: self.frame_id,
data_ptr: self.data_ptr,
data_len: self.data_len,
}
}
}
impl Drop for FrameRef {
    // Releases this handle's pin, making the frame evictable again once no
    // other handles to it remain.
    fn drop(&mut self) {
        self.pool.unpin(self.frame_id);
    }
}
impl fmt::Debug for FrameRef {
    /// Formats as `FrameRef(page=<PageId>, frame=<id>)` without exposing the
    /// raw pointer fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Equivalent to `write!`, spelled out explicitly.
        f.write_fmt(format_args!(
            "FrameRef(page={:?}, frame={})",
            self.page_id, self.frame_id
        ))
    }
}
/// Per-frame bookkeeping, updated concurrently by lookups and the allocator.
struct FrameHeader {
    // Number of live `FrameRef` handles; a frame with a nonzero pin count is
    // skipped by the eviction path.
    pin_count: AtomicUsize,
    // Set when frame contents diverge from disk. NOTE(review): this flag is
    // written (cleared on load) but never consulted in this file — eviction
    // drops victims without writeback; confirm flushing happens elsewhere.
    is_dirty: AtomicBool,
    // Page currently cached in this frame, or `None` when the frame is free
    // or mid-reclaim. The write lock is held while a victim is reclaimed so
    // lookups cannot validate against a half-reclaimed frame.
    page_id: RwLock<Option<PageId>>,
}
/// One frame: its metadata plus the page bytes it caches.
struct FrameSlot {
    header: FrameHeader,
    // Page contents; a writer holds this lock while loading data in.
    data: RwLock<Vec<u8>>,
}
/// One shard of the pool. Each shard owns a contiguous range of *global*
/// frame ids (`frame_offset .. frame_offset + frames_per_shard`) and its own
/// page table, free list, and eviction policy, so shards contend
/// independently.
struct BufferShard {
    frames: Vec<FrameSlot>,
    // Maps resident pages to global frame ids.
    page_table: DashMap<PageId, FrameId>,
    // Global ids of frames not currently holding a page.
    free_list: Mutex<Vec<FrameId>>,
    // Replacement policy; operates on shard-local frame indices.
    eviction: Box<dyn EvictionPolicy>,
    // First global frame id belonging to this shard.
    frame_offset: usize,
    // Number of frames in this shard.
    frames_per_shard: usize,
}
impl BufferShard {
    /// Builds shard `shard_idx` with `frames_per_shard` zero-filled frames of
    /// `frame_size` bytes each; every frame starts on the free list.
    fn new(shard_idx: usize, frames_per_shard: usize, frame_size: usize) -> Self {
        let frame_offset = shard_idx * frames_per_shard;
        let mut frames = Vec::with_capacity(frames_per_shard);
        let mut free_list = Vec::with_capacity(frames_per_shard);
        for i in 0..frames_per_shard {
            frames.push(FrameSlot {
                header: FrameHeader {
                    pin_count: AtomicUsize::new(0),
                    is_dirty: AtomicBool::new(false),
                    page_id: RwLock::new(None),
                },
                data: RwLock::new(vec![0u8; frame_size]),
            });
            // The free list stores *global* frame ids.
            free_list.push(frame_offset + i); }
        Self {
            frames,
            page_table: DashMap::new(),
            free_list: Mutex::new(free_list),
            eviction: Box::new(ClockProPolicy::new(frames_per_shard)),
            frame_offset,
            frames_per_shard,
        }
    }
    /// Converts a global frame id into an index into `self.frames`.
    /// Underflows (panicking in debug builds) if `global_id` belongs to a
    /// different shard — callers must route ids to the owning shard first.
    #[inline]
    const fn local_frame_id(&self, global_id: FrameId) -> usize {
        global_id - self.frame_offset
    }
    /// Hands out a frame, already pinned once on behalf of the caller.
    ///
    /// Fast path pops the free list; otherwise the eviction policy is asked
    /// for victims (skipping pinned frames) for at most
    /// `2 * frames_per_shard` attempts. Returns `None` if no victim could be
    /// reclaimed.
    fn allocate_frame(&self) -> Option<FrameId> {
        {
            let mut free = self.free_list.lock();
            if let Some(id) = free.pop() {
                let local_id = id - self.frame_offset;
                // Pin before returning so the frame cannot be reclaimed
                // between allocation and first use.
                self.frames[local_id]
                    .header
                    .pin_count
                    .fetch_add(1, Ordering::SeqCst);
                return Some(id);
            }
        }
        let max_attempts = self.frames_per_shard * 2;
        let mut attempts = 0;
        while attempts < max_attempts {
            attempts += 1;
            if let Some(local_victim_id) = self.eviction.evict() {
                let slot = &self.frames[local_victim_id];
                // Cheap lock-free pre-check: pinned victims are skipped.
                if slot.header.pin_count.load(Ordering::SeqCst) > 0 {
                    continue;
                }
                // Take the page-id write lock, then re-check the pin count —
                // a lookup may have pinned the frame between the pre-check
                // and lock acquisition.
                let mut pid_guard = slot.header.page_id.write();
                if slot.header.pin_count.load(Ordering::SeqCst) > 0 {
                    continue;
                }
                slot.header.pin_count.fetch_add(1, Ordering::SeqCst);
                // Unmap the evicted page while still holding the write lock
                // so lookups cannot validate against a half-reclaimed frame.
                // NOTE(review): `is_dirty` is not consulted here — a dirty
                // victim is dropped without writeback; confirm flushing is
                // handled before frames become evictable.
                if let Some(old_pid) = *pid_guard {
                    self.page_table.remove(&old_pid);
                }
                *pid_guard = None;
                return Some(self.frame_offset + local_victim_id);
            }
        }
        // Every candidate offered by the policy stayed pinned.
        None }
    /// Returns a frame id to the free list. Pin-count bookkeeping is the
    /// caller's responsibility; this only makes the id poppable again.
    fn free_frame(&self, global_frame_id: FrameId) {
        let mut free = self.free_list.lock();
        free.push(global_frame_id);
    }
}
/// Sharded page cache. Each page id is hashed to one shard, so most
/// operations contend only within that shard's locks.
pub struct BufferPool {
    shards: Vec<BufferShard>,
    num_shards: usize,
    // Retained construction options (currently unused after `new`).
    #[allow(dead_code)] options: BufferPoolOptions,
    // Randomized hasher used to spread `PageId`s across shards.
    hasher: std::collections::hash_map::RandomState,
}
impl BufferPool {
    /// Creates a pool with `capacity_bytes / frame_size` frames spread across
    /// `num_shards` shards. Frames-per-shard is rounded up, so the total
    /// frame count may slightly exceed the byte budget.
    ///
    /// # Panics
    ///
    /// Divides by `frame_size` and `num_shards`; zero values panic.
    #[must_use]
    pub fn new(options: BufferPoolOptions) -> Arc<Self> {
        let num_frames = options.capacity_bytes / options.frame_size;
        let frames_per_shard = num_frames.div_ceil(options.num_shards);
        let mut shards = Vec::with_capacity(options.num_shards);
        for shard_idx in 0..options.num_shards {
            shards.push(BufferShard::new(
                shard_idx,
                frames_per_shard,
                options.frame_size,
            ));
        }
        Arc::new(Self {
            shards,
            num_shards: options.num_shards,
            options,
            hasher: std::collections::hash_map::RandomState::new(),
        })
    }
    /// Maps a page id to the index of the shard that caches it.
    #[inline]
    fn hash_page_id(&self, page_id: PageId) -> usize {
        use std::hash::BuildHasher;
        let hash = self.hasher.hash_one(page_id);
        (hash as usize) % self.num_shards
    }
    #[inline]
    fn get_shard(&self, page_id: PageId) -> &BufferShard {
        let shard_idx = self.hash_page_id(page_id);
        &self.shards[shard_idx]
    }
    /// Maps a global frame id back to its owning shard. All shards hold the
    /// same number of frames, so integer division suffices; the `min` clamp
    /// is defensive.
    #[inline]
    fn get_shard_by_frame(&self, frame_id: FrameId) -> &BufferShard {
        let shard_idx = frame_id / self.shards[0].frames_per_shard;
        &self.shards[shard_idx.min(self.num_shards - 1)]
    }
    /// Returns a pinned handle to `page_id`, invoking `loader` to fill the
    /// frame on a cache miss. `loader` receives the frame's buffer and runs
    /// while the frame's data write lock is held.
    ///
    /// # Errors
    ///
    /// Propagates `loader`'s error, or `BufferPoolError::PoolFull` (converted
    /// into `E`) when no frame could be allocated.
    pub fn get_page<F, E>(self: &Arc<Self>, page_id: PageId, loader: F) -> Result<FrameRef, E>
    where
        F: FnOnce(&mut Vec<u8>) -> Result<(), E>,
        E: From<crate::buffer::BufferPoolError>,
    {
        let shard = self.get_shard(page_id);
        // Fast path: page already resident.
        if let Some(frame_ref) = self.lookup(shard, page_id) {
            return Ok(frame_ref);
        }
        let Some(frame_id) = shard.allocate_frame() else {
            return Err(self.make_capacity_error::<E>());
        };
        // Re-check after allocating: another thread may have loaded the page
        // meanwhile. NOTE(review): this narrows but does not close the race —
        // two threads can still both pass this check, both run `loader`, and
        // the later `page_table.insert` silently replaces the earlier
        // mapping; verify this is acceptable for the workload.
        if let Some(frame_ref) = self.lookup(shard, page_id) {
            // Return the unused frame we allocated.
            self.unpin(frame_id); shard.free_frame(frame_id);
            return Ok(frame_ref);
        }
        let local_frame_id = shard.local_frame_id(frame_id);
        {
            let slot = &shard.frames[local_frame_id];
            // Hold the data write lock for the whole load so readers never
            // observe a partially filled frame.
            let mut data_guard = slot.data.write();
            match loader(&mut data_guard) {
                Ok(()) => {}
                Err(e) => {
                    // Load failed: release the frame and surface the error.
                    drop(data_guard);
                    shard.free_frame(frame_id);
                    self.unpin(frame_id);
                    return Err(e);
                }
            }
            let mut pid_guard = slot.header.page_id.write();
            *pid_guard = Some(page_id);
            // Freshly loaded contents match disk.
            slot.header.is_dirty.store(false, Ordering::SeqCst);
        }
        // Publish the mapping only after the frame holds valid data.
        shard.page_table.insert(page_id, frame_id);
        shard.eviction.access(local_frame_id);
        // Capture a raw view of the buffer for `data_unchecked`; the guard is
        // dropped immediately, so the pointer is only usable under that
        // method's documented `# Safety` contract.
        let (data_ptr, data_len) = {
            let data_guard = shard.frames[local_frame_id].data.read();
            (data_guard.as_ptr(), data_guard.len())
        };
        Ok(FrameRef {
            pool: self.clone(),
            page_id,
            frame_id,
            data_ptr,
            data_len,
        })
    }
    /// Looks `page_id` up in `shard`'s page table and returns a pinned handle
    /// if the mapping is still valid.
    ///
    /// The frame is pinned *before* validating `header.page_id`, so an
    /// eviction that raced ahead of us is detected afterwards and the pin is
    /// rolled back.
    fn lookup(self: &Arc<Self>, shard: &BufferShard, page_id: PageId) -> Option<FrameRef> {
        if let Some(entry) = shard.page_table.get(&page_id) {
            let frame_id = *entry.value();
            let local_frame_id = shard.local_frame_id(frame_id);
            self.pin_frame(frame_id);
            let slot = &shard.frames[local_frame_id];
            let current_pid = slot.header.page_id.read();
            if *current_pid == Some(page_id) {
                shard.eviction.access(local_frame_id);
                let (data_ptr, data_len) = {
                    let data_guard = slot.data.read();
                    (data_guard.as_ptr(), data_guard.len())
                };
                return Some(FrameRef {
                    pool: self.clone(),
                    page_id,
                    frame_id,
                    data_ptr,
                    data_len,
                });
            }
            // The frame was reclaimed between the table lookup and our pin;
            // undo the pin and report a miss.
            self.unpin(frame_id);
        }
        None
    }
    /// Increments `frame_id`'s pin count, preventing its eviction.
    fn pin_frame(&self, frame_id: FrameId) {
        let shard = self.get_shard_by_frame(frame_id);
        let local_id = shard.local_frame_id(frame_id);
        shard.frames[local_id]
            .header
            .pin_count
            .fetch_add(1, Ordering::SeqCst);
    }
    /// Decrements `frame_id`'s pin count; must balance a prior pin.
    fn unpin(&self, frame_id: FrameId) {
        let shard = self.get_shard_by_frame(frame_id);
        let local_id = shard.local_frame_id(frame_id);
        shard.frames[local_id]
            .header
            .pin_count
            .fetch_sub(1, Ordering::SeqCst);
    }
    /// Returns a read guard over `frame_id`'s data buffer.
    fn get_frame_data(&self, frame_id: FrameId) -> parking_lot::RwLockReadGuard<'_, Vec<u8>> {
        let shard = self.get_shard_by_frame(frame_id);
        let local_id = shard.local_frame_id(frame_id);
        shard.frames[local_id].data.read()
    }
    /// Builds the caller's error type from `BufferPoolError::PoolFull`.
    fn make_capacity_error<E>(&self) -> E
    where
        E: From<crate::buffer::BufferPoolError>,
    {
        E::from(crate::buffer::BufferPoolError::PoolFull)
    }
}