#[cfg(test)]
pub mod tests;
#[cfg(test)]
use crate::test_util::journal::TestTransaction as Transaction;
#[cfg(test)]
use crate::test_util::journal::TestWriteJournal as WriteJournal;
use crate::util::div_mod_pow2;
use crate::util::div_pow2;
use crate::util::floor_pow2;
use crate::util::mod_pow2;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use off64::int::create_u64_be;
use off64::int::Off64ReadInt;
use off64::int::Off64WriteMutInt;
use off64::usz;
use std::sync::Arc;
use tracing::info;
#[cfg(not(test))]
use write_journal::Transaction;
#[cfg(not(test))]
use write_journal::WriteJournal;
pub(crate) const MIN_PAGE_SIZE_POW2: u8 = 8;
pub(crate) const MAX_PAGE_SIZE_POW2: u8 = 32;
const PAGE_HEADER_CAP_POW2: u8 = 4;
pub(crate) const PAGE_HEADER_CAP: u64 = 1 << PAGE_HEADER_CAP_POW2;
const FREE_PAGE_OFFSETOF_PREV: u64 = 0;
const FREE_PAGE_OFFSETOF_NEXT: u64 = FREE_PAGE_OFFSETOF_PREV + 5;
const OBJECT_OFFSETOF_PREV: u64 = 0;
const OBJECT_OFFSETOF_NEXT: u64 = OBJECT_OFFSETOF_PREV + 5;
const OBJECT_OFFSETOF_DELETED_SEC: u64 = OBJECT_OFFSETOF_NEXT + 5;
const OBJECT_OFFSETOF_META_SIZE_AND_STATE: u64 = OBJECT_OFFSETOF_DELETED_SEC + 5;
pub(crate) trait PageHeader {
fn serialize(&self, out: &mut [u8]);
fn deserialize(raw: &[u8]) -> Self;
}
pub(crate) struct FreePagePageHeader {
pub prev: u64,
pub next: u64,
}
impl PageHeader for FreePagePageHeader {
fn serialize(&self, out: &mut [u8]) {
out.write_u40_be_at(FREE_PAGE_OFFSETOF_PREV, self.prev >> MIN_PAGE_SIZE_POW2);
out.write_u40_be_at(FREE_PAGE_OFFSETOF_NEXT, self.next >> MIN_PAGE_SIZE_POW2);
}
fn deserialize(raw: &[u8]) -> Self {
Self {
prev: raw.read_u40_be_at(FREE_PAGE_OFFSETOF_PREV) << MIN_PAGE_SIZE_POW2,
next: raw.read_u40_be_at(FREE_PAGE_OFFSETOF_NEXT) << MIN_PAGE_SIZE_POW2,
}
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug, FromPrimitive)]
#[repr(u8)]
pub(crate) enum ObjectState {
Incomplete = 1,
Committed,
Deleted,
}
pub(crate) struct ObjectPageHeader {
pub prev: u64,
pub next: u64,
pub deleted_sec: Option<u64>,
pub state: ObjectState,
pub metadata_size_pow2: u8,
}
impl PageHeader for ObjectPageHeader {
fn serialize(&self, out: &mut [u8]) {
out.write_u40_be_at(OBJECT_OFFSETOF_PREV, self.prev >> MIN_PAGE_SIZE_POW2);
out.write_u40_be_at(OBJECT_OFFSETOF_NEXT, self.next >> MIN_PAGE_SIZE_POW2);
out.write_u40_be_at(OBJECT_OFFSETOF_DELETED_SEC, self.deleted_sec.unwrap_or(0));
out[usz!(OBJECT_OFFSETOF_META_SIZE_AND_STATE)] =
(self.metadata_size_pow2 << 3) | (self.state as u8);
}
fn deserialize(raw: &[u8]) -> Self {
let b = raw[usz!(OBJECT_OFFSETOF_META_SIZE_AND_STATE)];
Self {
prev: raw.read_u40_be_at(OBJECT_OFFSETOF_PREV) << MIN_PAGE_SIZE_POW2,
next: raw.read_u40_be_at(OBJECT_OFFSETOF_NEXT) << MIN_PAGE_SIZE_POW2,
deleted_sec: Some(raw.read_u40_be_at(OBJECT_OFFSETOF_DELETED_SEC)).filter(|ts| *ts != 0),
state: ObjectState::from_u8(b & 0b111).unwrap(),
metadata_size_pow2: b >> 3,
}
}
}
pub(crate) struct Pages {
journal: Arc<WriteJournal>,
heap_dev_offset: u64,
block_size_pow2: u8,
pages_per_lpage_pow2: u8,
pub block_size: u64,
pub lpage_size_pow2: u8,
pub spage_size_pow2: u8,
}
impl Pages {
pub fn new(
journal: Arc<WriteJournal>,
heap_dev_offset: u64,
spage_size_pow2: u8,
lpage_size_pow2: u8,
) -> Pages {
assert!(spage_size_pow2 >= MIN_PAGE_SIZE_POW2);
assert!(lpage_size_pow2 >= spage_size_pow2 && lpage_size_pow2 <= MAX_PAGE_SIZE_POW2);
assert_eq!(mod_pow2(heap_dev_offset, lpage_size_pow2), 0);
let pages_per_lpage_pow2 = 1 + lpage_size_pow2 - spage_size_pow2;
let lpages_per_block_pow2 = lpage_size_pow2 + 3 - pages_per_lpage_pow2;
let block_size_pow2 = lpages_per_block_pow2 + lpage_size_pow2;
let block_size = 1 << block_size_pow2;
info!(block_size, "page config");
Pages {
block_size_pow2,
block_size,
heap_dev_offset,
journal,
lpage_size_pow2,
pages_per_lpage_pow2,
spage_size_pow2,
}
}
#[allow(unused)]
pub fn spage_size(&self) -> u64 {
1 << self.spage_size_pow2
}
#[allow(unused)]
pub fn lpage_size(&self) -> u64 {
1 << self.lpage_size_pow2
}
fn assert_valid_page_dev_offset(&self, page_dev_offset: u64) {
assert!(
page_dev_offset >= self.heap_dev_offset,
"page dev offset {page_dev_offset} is not in the heap"
);
assert!(
mod_pow2(page_dev_offset - self.heap_dev_offset, self.block_size_pow2) >= self.lpage_size(),
"page dev offset {page_dev_offset} is in a metadata lpage"
);
assert_eq!(
mod_pow2(page_dev_offset, self.spage_size_pow2),
0,
"page dev offset {page_dev_offset} is not a multiple of spage"
);
}
fn get_page_free_bit_offset(&self, page_dev_offset: u64, page_size_pow2: u8) -> (u64, u64) {
self.assert_valid_page_dev_offset(page_dev_offset);
let block_dev_offset = self.heap_dev_offset
+ floor_pow2(page_dev_offset - self.heap_dev_offset, self.block_size_pow2);
let (lpage_n, offset_within_lpage) =
div_mod_pow2(page_dev_offset - block_dev_offset, self.lpage_size_pow2);
let lpage_dev_offset = page_dev_offset - offset_within_lpage;
let n = div_pow2(page_dev_offset - lpage_dev_offset, page_size_pow2);
let i = (lpage_n * (1 << self.pages_per_lpage_pow2))
| (1 << (self.lpage_size_pow2 - page_size_pow2))
| n;
let (elem, bit) = div_mod_pow2(i - 1, 6);
let elem_dev_offset = block_dev_offset + (elem * 8);
(elem_dev_offset, bit)
}
pub async fn is_page_free(&self, page_dev_offset: u64, page_size_pow2: u8) -> bool {
let (elem_dev_offset, bit) = self.get_page_free_bit_offset(page_dev_offset, page_size_pow2);
let bitmap = self
.journal
.read_with_overlay(elem_dev_offset, 8)
.await
.read_u64_be_at(0);
(bitmap & (1 << bit)) != 0
}
pub async fn mark_page_as_free(
&self,
txn: &mut Transaction,
page_dev_offset: u64,
page_size_pow2: u8,
) {
let (elem_dev_offset, bit) = self.get_page_free_bit_offset(page_dev_offset, page_size_pow2);
let bitmap = self
.journal
.read_with_overlay(elem_dev_offset, 8)
.await
.read_u64_be_at(0);
txn.write_with_overlay(elem_dev_offset, create_u64_be(bitmap | (1 << bit)));
}
pub async fn mark_page_as_not_free(
&self,
txn: &mut Transaction,
page_dev_offset: u64,
page_size_pow2: u8,
) {
let (elem_dev_offset, bit) = self.get_page_free_bit_offset(page_dev_offset, page_size_pow2);
let bitmap = self
.journal
.read_with_overlay(elem_dev_offset, 8)
.await
.read_u64_be_at(0);
txn.write_with_overlay(elem_dev_offset, create_u64_be(bitmap & !(1 << bit)));
}
pub async fn read_page_header<H: PageHeader>(&self, page_dev_offset: u64) -> H {
self.assert_valid_page_dev_offset(page_dev_offset);
let raw = self
.journal
.read_with_overlay(page_dev_offset, PAGE_HEADER_CAP)
.await;
H::deserialize(&raw)
}
pub fn write_page_header<H: PageHeader>(
&self,
txn: &mut Transaction,
page_dev_offset: u64,
h: H,
) {
self.assert_valid_page_dev_offset(page_dev_offset);
let mut out = vec![0u8; usz!(PAGE_HEADER_CAP)];
h.serialize(&mut out);
txn.write_with_overlay(page_dev_offset, out);
}
pub async fn update_page_header<H: PageHeader>(
&self,
txn: &mut Transaction,
page_dev_offset: u64,
f: impl FnOnce(&mut H) -> (),
) {
let mut hdr = self.read_page_header(page_dev_offset).await;
f(&mut hdr);
self.write_page_header(txn, page_dev_offset, hdr);
}
}