use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use bitflags::bitflags;
use super::{
cache::CacheToken,
stats::AtomicWritebufStats,
version::Version,
write_buffer::{RecordHeader, ReleaseState},
CacheEntry, Error, LRUCache, PageFiles, PageTable, Result, WriteBuffer, NAN_ID,
};
use crate::{
env::Env,
page::{PageBuf, PageInfo, PageRef},
};
bitflags! {
    /// Hints passed along with a page read (see `Guard::read_page`) that
    /// describe how the cache should treat the page.
    ///
    /// The cache priority is encoded by `LOW_PRI` / `BOTTOM_PRI`; see
    /// `CacheOption::priority` for how the two bits are interpreted.
    pub struct CacheOption: u8 {
        /// No flags set; the priority is `CachePriority::High`.
        const DEFAULT = 0b00000000;
        /// Queried through `CacheOption::refill_cold_when_not_full`; its exact
        /// effect is defined by the cache implementation (not visible here).
        const REFILL_COLD_WHEN_NOT_FULL = 0b00000001;
        /// Selects `CachePriority::Low`; takes precedence over `BOTTOM_PRI`.
        const LOW_PRI = 0b00000010;
        /// Selects `CachePriority::Bottom` when `LOW_PRI` is not set.
        const BOTTOM_PRI = 0b00000100;
    }
}
impl Default for CacheOption {
fn default() -> Self {
CacheOption::DEFAULT
}
}
impl CacheOption {
    /// Decodes the cache priority stored in these options.
    ///
    /// `LOW_PRI` wins if both priority bits are set; with neither bit set the
    /// priority is [`CachePriority::High`].
    pub(crate) fn priority(&self) -> CachePriority {
        let low = self.contains(Self::LOW_PRI);
        let bottom = self.contains(Self::BOTTOM_PRI);
        match (low, bottom) {
            (true, _) => CachePriority::Low,
            (false, true) => CachePriority::Bottom,
            (false, false) => CachePriority::High,
        }
    }

    /// Returns these options with the priority bits rewritten to encode `pri`.
    ///
    /// Afterwards at most one of `LOW_PRI` / `BOTTOM_PRI` is set; all other
    /// flags are left untouched.
    pub(crate) fn set_priority(mut self, pri: CachePriority) -> Self {
        let (low, bottom) = match pri {
            CachePriority::High => (false, false),
            CachePriority::Low => (true, false),
            CachePriority::Bottom => (false, true),
        };
        self.set(Self::LOW_PRI, low);
        self.set(Self::BOTTOM_PRI, bottom);
        self
    }

    /// Whether `REFILL_COLD_WHEN_NOT_FULL` is set.
    pub(crate) fn refill_cold_when_not_full(&self) -> bool {
        self.contains(Self::REFILL_COLD_WHEN_NOT_FULL)
    }

    /// Returns these options with `REFILL_COLD_WHEN_NOT_FULL` set to `v`.
    pub(crate) fn set_refill_cold_when_not_full(mut self, v: bool) -> Self {
        self.set(Self::REFILL_COLD_WHEN_NOT_FULL, v);
        self
    }
}
/// Cache priority carried by a `CacheOption` through its `LOW_PRI` /
/// `BOTTOM_PRI` bits (see `CacheOption::priority` and
/// `CacheOption::set_priority`).
///
/// Derives `Copy`/`PartialEq` so a priority can be compared, logged and reused
/// after being passed by value (e.g. to `set_priority`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CachePriority {
    /// Neither priority bit set.
    High,
    /// Encoded by `CacheOption::LOW_PRI`.
    Low,
    /// Encoded by `CacheOption::BOTTOM_PRI`.
    Bottom,
}
/// Cache entry holding a page's bytes (`Vec<u8>`) from an `LRUCache`.
/// `Guard::read_page` keeps these in `Guard::cache_guards` so the page bytes it
/// hands out stay readable while the guard is alive.
type CacheEntryGuard = CacheEntry<Vec<u8>, LRUCache<Vec<u8>>>;
/// Handle used to read pages (from write buffers or page files) and to start
/// [`PageTxn`]s against the current write buffer.
///
/// Page references returned by [`Guard::read_page`] borrow from the guard: cache
/// entries for file-resident pages are stored in `cache_guards`.
// NOTE(review): field order is also drop order; keep `cache_guards` where it is
// unless it is confirmed that dropping cache entries after `page_files` is fine.
pub(crate) struct Guard<E: Env>
where
    Self: Send,
{
    /// Resolves write buffers, page groups, file infos and the buffer set.
    version: Arc<Version>,
    /// Maps page ids to page addresses.
    page_table: PageTable,
    /// Reader for pages that are no longer in a write buffer.
    page_files: Arc<PageFiles<E>>,
    /// Cache entries for pages returned by `read_page`. They are only ever pushed
    /// in this file, so the returned bytes remain valid for the guard's lifetime.
    cache_guards: Mutex<Vec<CacheEntryGuard>>,
    /// Read counters (buffer hits, file reads, bytes read, inner-page misses).
    writebuf_stats: Arc<AtomicWritebufStats>,
}
impl<E: Env> Guard<E> {
    /// Creates a guard from the given version, page table, page files and
    /// write-buffer statistics. No cache entries are held initially.
    pub(crate) fn new(
        version: Arc<Version>,
        page_table: PageTable,
        page_files: Arc<PageFiles<E>>,
        writebuf_stats: Arc<AtomicWritebufStats>,
    ) -> Self {
        Guard {
            version,
            page_table,
            page_files,
            cache_guards: Mutex::default(),
            writebuf_stats,
        }
    }

    /// Starts a new page transaction bound to the currently active write buffer.
    ///
    /// The buffer id is obtained (and awaited) from the version's buffer set; the
    /// transaction starts without holding the writer guard and with no records.
    pub(crate) async fn begin(&self) -> PageTxn<E> {
        let buffer_id = self.version.buffer_set.acquire_active_buffer_id().await;
        PageTxn {
            guard: self,
            buffer_id,
            hold_write_guard: false,
            records: HashMap::default(),
            page_ids: Vec::default(),
        }
    }

    /// Returns the address currently mapped to page `id` in the page table.
    #[inline]
    pub(crate) fn page_addr(&self, id: u64) -> u64 {
        self.page_table.get(id)
    }

    /// Returns the [`PageInfo`] of the page at `addr`.
    ///
    /// The upper 32 bits of `addr` select either a write buffer in the version
    /// (the info is read from the in-memory page) or a page group (the info is
    /// read from the group's metadata).
    ///
    /// # Panics
    ///
    /// Panics if no write buffer or page group exists for that id, or if the
    /// page group does not contain `addr`.
    pub(crate) fn read_page_info(&self, addr: u64) -> Result<PageInfo> {
        let logical_id = (addr >> 32) as u32;
        if let Some(buf) = self.version.get(logical_id) {
            let page = unsafe { buf.page(addr) };
            return Ok(page.info());
        }
        let Some(file_info) = self.version.page_groups().get(&logical_id) else {
            panic!("File {logical_id} (addr {addr}) is not exists");
        };
        let Some(page_info) = file_info.get_page_info(addr) else {
            panic!("The addr {addr} is not belongs to the target file");
        };
        Ok(page_info)
    }

    /// Reads the page at `addr`.
    ///
    /// Pages still in a write buffer are returned directly, with no cache token.
    /// Otherwise the page is read through `page_files` (which reports whether the
    /// cache was hit), its cache entry is stored in `cache_guards`, and the page
    /// is returned together with that entry's cache token.
    ///
    /// # Panics
    ///
    /// Panics if the page group, its physical file info, or the page handle for
    /// `addr` cannot be found.
    pub(crate) async fn read_page(
        &self,
        addr: u64,
        hint: CacheOption,
    ) -> Result<(PageRef, Option<CacheToken>)> {
        let logical_id = (addr >> 32) as u32;
        if let Some(buf) = self.version.get(logical_id) {
            self.writebuf_stats.read_in_buf.inc();
            return Ok((unsafe { buf.page(addr) }, None));
        }
        self.writebuf_stats.read_in_file.inc();
        let Some(page_group) = self.version.page_groups().get(&logical_id) else {
            panic!("File {logical_id} (addr {addr}) is not exists");
        };
        // A logical page group lives inside a physical file identified here.
        let physical_id = page_group.meta().file_id;
        let Some(file_info) = self.version.file_infos().get(&physical_id) else {
            panic!("Target file {physical_id} is missing");
        };
        let Some(handle) = page_group.get_page_handle(addr) else {
            panic!("The addr {addr} is not belongs to the target file {physical_id:?}");
        };
        let (entry, hit) = self
            .page_files
            .read_page(physical_id, file_info.meta(), addr, handle, hint)
            .await?;
        // The lock is taken only after the await above, so it is never held
        // across a suspension point.
        let mut owned_pages = self.cache_guards.lock().expect("Poisoned");
        owned_pages.push(entry);
        let last_guard = owned_pages.last().unwrap();
        let page = last_guard.value();
        // Only bytes actually fetched from the file (cache misses) are counted.
        if !hit {
            self.writebuf_stats.read_file_bytes.add(page.len() as u64);
        }
        let cache_token = last_guard.cache_token();
        // The slice points into the `Vec<u8>` owned by the entry just stored in
        // `cache_guards`, which is only ever appended to in this file; that is
        // what lets the returned page outlive the mutex guard.
        // NOTE(review): relies on the entry keeping its value's heap buffer alive
        // and unmoved while held — confirm against `CacheEntry`.
        let page = PageRef::new(unsafe {
            std::slice::from_raw_parts(page.as_ptr(), page.len())
        });
        if !hit && !page.tier().is_leaf() {
            self.writebuf_stats.miss_inner.inc();
        }
        Ok((page, Some(cache_token)))
    }
}
/// A write transaction that allocates, inserts, updates and deallocates pages
/// in a single write buffer.
///
/// Uncommitted work is rolled back when the transaction is dropped.
pub(crate) struct PageTxn<'a, E>
where
    E: Env,
    Self: Send,
{
    /// Guard the transaction was started from (version and page table access).
    guard: &'a Guard<E>,
    /// Id of the write buffer every allocation of this transaction goes to.
    buffer_id: u32,
    /// Set after the first successful write-buffer operation; while set, the
    /// writer guard must be released by `commit` or by `Drop`.
    hold_write_guard: bool,
    /// Record headers of pages allocated by this transaction, keyed by address.
    records: HashMap<u64, &'a mut RecordHeader>,
    /// Page ids handed out by `insert_page` that are not yet committed.
    page_ids: Vec<u64>,
}
impl<'a, E: Env> PageTxn<'a, E> {
pub(crate) async fn alloc_page(&mut self, size: usize) -> Result<(u64, PageBuf<'a>)> {
let page_size = size as u32;
let (addr, header, buf) = self.alloc_page_impl(page_size).await?;
self.records.insert(addr, header);
Ok((addr, buf))
}
pub(crate) fn insert_page(&mut self, addr: u64) -> u64 {
let header = self.records.get_mut(&addr).expect("no such pages");
if header.is_tombstone() {
panic!("insert page with tombstone");
}
let page_id = unsafe { self.guard.page_table.alloc() }.expect("page id is exhausted");
self.guard.page_table.set(page_id, addr);
header.set_page_id(page_id);
self.page_ids.push(page_id);
page_id
}
pub(crate) fn update_page(
mut self,
id: u64,
old_addr: u64,
new_addr: u64,
) -> Result<(), Option<(Self, u64)>> {
if new_addr <= old_addr {
return Err(None);
}
if let Err(addr) = self.guard.page_table.cas(id, old_addr, new_addr) {
return Err(Some((self, addr)));
}
let record_header = self
.records
.get_mut(&new_addr)
.expect("No such page exists");
record_header.set_page_id(id);
self.commit();
Ok(())
}
pub(crate) async fn replace_page(
mut self,
id: u64,
old_addr: u64,
new_addr: u64,
dealloc_addrs: &[u64],
) -> Result<()> {
if new_addr <= old_addr {
return Err(Error::Again);
}
let dealloc_pages = self.dealloc_pages_impl(dealloc_addrs).await?;
self.update_page(id, old_addr, new_addr).map_err(|_| {
dealloc_pages.set_tombstone();
Error::Again
})?;
Ok(())
}
#[inline]
async fn alloc_page_impl(
&mut self,
page_size: u32,
) -> Result<(u64, &'a mut RecordHeader, PageBuf<'a>)> {
self.with_write_guard(|buf, is_first_op| unsafe {
buf.alloc_page(NAN_ID, page_size, is_first_op)
})
.await
}
#[inline]
async fn dealloc_pages_impl(&mut self, page_addrs: &[u64]) -> Result<&'a mut RecordHeader> {
self.with_write_guard(|buf, is_first_op| unsafe {
buf.dealloc_pages(page_addrs, is_first_op)
})
.await
}
#[inline]
async fn with_write_guard<F, O>(&mut self, f: F) -> Result<O>
where
F: FnOnce(&WriteBuffer, bool) -> Result<O>,
{
let is_first_op = !self.hold_write_guard;
let result = {
let buffer = self
.guard
.version
.get(self.buffer_id)
.expect("The target buffer must exists");
f(&buffer, is_first_op)
};
match result {
Ok(val) => {
self.hold_write_guard = true;
Ok(val)
}
Err(Error::TooLargeSize) => Err(Error::TooLargeSize),
Err(Error::Again) => {
self.guard
.version
.buffer_set
.switch_buffer(self.buffer_id)
.await;
Err(Error::Again)
}
_ => unreachable!(),
}
}
pub(crate) fn commit(mut self) {
self.page_ids.clear();
if self.hold_write_guard {
self.records.clear();
self.drop_writer_guard();
self.hold_write_guard = false;
}
}
#[inline]
fn drop_writer_guard(&mut self) {
assert!(self.hold_write_guard);
let buf = self
.guard
.version
.get(self.buffer_id)
.expect("The target write buffer must exists");
let release_state = unsafe { buf.release_writer() };
if matches!(release_state, ReleaseState::Flush) {
self.guard.version.buffer_set.notify_flush_job();
}
}
}
impl<E: Env> Drop for PageTxn<'_, E> {
    /// Rolls back whatever the transaction did not commit.
    ///
    /// Page ids handed out by `insert_page` are deallocated. If the writer guard
    /// is still held, every page allocated by the transaction is tombstoned
    /// before the guard is released. After `commit`, both lists are empty and the
    /// guard is no longer held, so this does nothing.
    fn drop(&mut self) {
        let page_table = &self.guard.page_table;
        for &id in self.page_ids.iter() {
            unsafe { page_table.dealloc(id) };
        }
        if !self.hold_write_guard {
            return;
        }
        self.records.values_mut().for_each(|header| {
            header.set_tombstone();
        });
        self.records.clear();
        self.drop_writer_guard();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        page_store::{
            page_table::PageTable,
            version::{DeltaVersion, Version},
        },
        PageStoreOptions,
    };

    // Builds a fresh version; `size` is forwarded as the first argument of
    // `Version::new` (presumably the write buffer size — TODO confirm).
    fn new_version(size: u32) -> Arc<Version> {
        Arc::new(Version::new(size, 1, 8, DeltaVersion::default()))
    }

    // A page allocated and inserted in one txn can be remapped to a second page
    // allocated in the same txn; the update commits and releases the buffer.
    #[photonio::test]
    async fn page_txn_update_page() {
        let env = crate::env::Photon;
        let base = tempdir::TempDir::new("test_page_txn_update_page").unwrap();
        let files = Arc::new(
            PageFiles::new(env, base.path(), &test_option())
                .await
                .unwrap(),
        );
        let version = new_version(512);
        let page_table = PageTable::default();
        let guard = Guard::new(version.clone(), page_table, files, Default::default());
        let mut page_txn = guard.begin().await;
        let (addr, _) = page_txn.alloc_page(123).await.unwrap();
        let id = page_txn.insert_page(addr);
        let (new, _) = page_txn.alloc_page(123).await.unwrap();
        assert!(page_txn.update_page(id, addr, new).is_ok());
        assert_current_buffer_is_flushable(version);
    }

    // A second txn that passes a stale old address (1) for `id` must fail, and
    // the failed txn must still release the buffer when dropped.
    #[photonio::test]
    async fn page_txn_failed_update_page() {
        let env = crate::env::Photon;
        let base = tempdir::TempDir::new("test_page_txn_failed_update_page").unwrap();
        let files = Arc::new(
            PageFiles::new(env, base.path(), &test_option())
                .await
                .unwrap(),
        );
        let version = new_version(1 << 10);
        let page_table = PageTable::default();
        let guard = Guard::new(version.clone(), page_table, files, Default::default());
        let mut page_txn = guard.begin().await;
        let (addr, _) = page_txn.alloc_page(123).await.unwrap();
        let id = page_txn.insert_page(addr);
        let (new, _) = page_txn.alloc_page(123).await.unwrap();
        assert!(page_txn.update_page(id, addr, new).is_ok());
        let mut page_txn = guard.begin().await;
        let (addr, _) = page_txn.alloc_page(123).await.unwrap();
        assert!(page_txn.update_page(id, 1, addr).is_err());
        assert_current_buffer_is_flushable(version);
    }

    // `update_page` rejects a new address that is not greater than the old one.
    #[photonio::test]
    async fn page_txn_increment_page_addr_update() {
        let env = crate::env::Photon;
        let base = tempdir::TempDir::new("test_page_increment_page_addr_update").unwrap();
        let files = Arc::new(
            PageFiles::new(env, base.path(), &test_option())
                .await
                .unwrap(),
        );
        let version = new_version(512);
        let page_table = PageTable::default();
        let guard = Guard::new(version, page_table, files, Default::default());
        let page_txn = guard.begin().await;
        assert!(matches!(page_txn.update_page(1, 3, 2), Err(None)));
    }

    // `replace_page` records the deallocation and remaps the page in one commit.
    #[photonio::test]
    async fn page_txn_replace_page() {
        let env = crate::env::Photon;
        let base = tempdir::TempDir::new("test_page_txn_replace_page").unwrap();
        let files = Arc::new(
            PageFiles::new(env, base.path(), &test_option())
                .await
                .unwrap(),
        );
        let version = new_version(1 << 10);
        let page_table = PageTable::default();
        let guard = Guard::new(version.clone(), page_table, files, Default::default());
        let mut page_txn = guard.begin().await;
        let (addr, _) = page_txn.alloc_page(123).await.unwrap();
        let id = page_txn.insert_page(addr);
        let (new, _) = page_txn.alloc_page(123).await.unwrap();
        assert!(page_txn
            .replace_page(id, addr, new, &[1, 2, 3])
            .await
            .is_ok());
        assert_current_buffer_is_flushable(version);
    }

    // Test-only helper: switches away from the txn's write buffer.
    impl<'a, E: Env> PageTxn<'a, E> {
        async fn seal_write_buffer(&mut self) {
            self.guard
                .version
                .buffer_set
                .switch_buffer(self.buffer_id)
                .await;
        }
    }

    // Switching the buffer from a txn that has not written anything.
    #[photonio::test]
    async fn page_txn_seal_write_buffer() {
        let env = crate::env::Photon;
        let base = tempdir::TempDir::new("test_page_seal_write_buffer").unwrap();
        let files = Arc::new(
            PageFiles::new(env, base.path(), &test_option())
                .await
                .unwrap(),
        );
        let version = new_version(512);
        let page_table = PageTable::default();
        let guard = Guard::new(version, page_table, files, Default::default());
        let mut page_txn = guard.begin().await;
        page_txn.seal_write_buffer().await;
    }

    // Two txns bound to the same buffer both switching it must not misbehave.
    #[photonio::test]
    async fn page_txn_seal_write_buffer_twice() {
        let env = crate::env::Photon;
        let base = tempdir::TempDir::new("test_page_seal_write_buffer_twice").unwrap();
        let files = Arc::new(
            PageFiles::new(env, base.path(), &test_option())
                .await
                .unwrap(),
        );
        let version = new_version(512);
        let page_table = PageTable::default();
        let guard = Guard::new(version, page_table, files, Default::default());
        let mut page_txn_1 = guard.begin().await;
        let mut page_txn_2 = guard.begin().await;
        page_txn_1.seal_write_buffer().await;
        page_txn_2.seal_write_buffer().await;
    }

    // After commit, the page table maps the inserted id to the allocated address.
    #[photonio::test]
    async fn page_txn_insert_page() {
        let env = crate::env::Photon;
        let base = tempdir::TempDir::new("test_page_insert_page").unwrap();
        let files = Arc::new(
            PageFiles::new(env, base.path(), &test_option())
                .await
                .unwrap(),
        );
        let version = new_version(512);
        let page_table = PageTable::default();
        let guard = Guard::new(
            version.clone(),
            page_table.clone(),
            files,
            Default::default(),
        );
        let mut page_txn = guard.begin().await;
        let (addr, _) = page_txn.alloc_page(123).await.unwrap();
        let id = page_txn.insert_page(addr);
        page_txn.commit();
        assert_eq!(page_table.get(id), addr);
        assert_current_buffer_is_flushable(version);
    }

    // Seals the last writer buffer of the current buffer set and asserts it is
    // flushable (presumably meaning no txn still holds it as a writer — confirm).
    fn assert_current_buffer_is_flushable(version: Arc<Version>) {
        let current = version.buffer_set.current();
        let buf = current.last_writer_buffer();
        buf.seal().unwrap();
        assert!(buf.is_flushable());
    }

    // Store options for tests: `cache_capacity` of `2 << 10` (2048).
    fn test_option() -> PageStoreOptions {
        PageStoreOptions {
            cache_capacity: 2 << 10,
            ..Default::default()
        }
    }
}