use std::{
mem::MaybeUninit,
ptr::NonNull,
sync::atomic::{AtomicU64, Ordering},
};
use bitflags::bitflags;
use log::info;
use super::Result;
use crate::{
page::{PageBuf, PageRef},
page_store::Error,
util::latch::Latch,
};
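/// An in-memory buffer that accumulates page records until it is sealed and
/// flushed. The sealed flag, the number of in-flight writers, and the
/// allocated size are packed into a single atomic word (see [`BufferState`]),
/// so concurrent writers reserve disjoint regions without locking.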
pub(crate) struct WriteBuffer
where
Self: Send,
{
group_id: u32,
buf: NonNull<u8>,
buf_size: usize,
buffer_state: AtomicU64,
flush_latch: Latch,
}
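/// A decoded view of the state word stored in `WriteBuffer::buffer_state`:
/// bit 63 is the sealed flag, bits 62..=32 hold the number of in-flight
/// writers, and bits 31..=0 hold the allocated size in bytes.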
#[derive(Default, Debug, Clone)]
struct BufferState {
sealed: bool,
num_writer: u32,
allocated: u32,
}
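/// The header that precedes each record in the buffer. For `NORMAL_PAGE`
/// records, `data` holds the page id and the body holds the page content;
/// for `DELETED_PAGES` records, the body is a slice of deallocated page
/// addresses. `page_size` is the body length in bytes.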
#[repr(C)]
pub(crate) struct RecordHeader {
data: u64,
flags: u32,
page_size: u32,
}
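/// An iterator over the records of a flushable [`WriteBuffer`].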
pub(crate) struct RecordIterator<'a> {
write_buffer: &'a WriteBuffer,
offset: u32,
}
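/// A typed view of a record body.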
pub(crate) enum RecordRef<'a> {
Page(PageRef<'a>),
DeallocPages(DeallocPagesRecordRef<'a>),
}
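/// A reference to the body of a `DELETED_PAGES` record that also iterates
/// over the contained page addresses.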
pub(crate) struct DeallocPagesRecordRef<'a> {
dealloc_pages: &'a [u64],
access_index: usize,
}
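/// The action a caller should take after sealing or releasing a writer:
/// `Flush` indicates the buffer has become flushable.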
#[derive(Debug)]
pub(crate) enum ReleaseState {
None,
Flush,
}
impl WriteBuffer {
pub(crate) fn with_capacity(group_id: u32, size: u32) -> Self {
use std::alloc::{alloc, Layout};
let buf_size = size as usize;
if buf_size <= core::mem::size_of::<usize>() {
panic!("The capacity of WriteBuffer is too small");
}
if !buf_size.is_power_of_two() {
panic!("The capacity of WriteBuffer is not pow of two");
}
let layout = Layout::from_size_align(buf_size, core::mem::size_of::<usize>())
.expect("Invalid layout");
let buf = unsafe {
NonNull::new(alloc(layout)).expect("The memory is exhausted")
};
let default_state = BufferState::default();
WriteBuffer {
group_id,
buf,
buf_size,
buffer_state: AtomicU64::new(default_state.apply()),
flush_latch: Latch::new(1),
}
}
#[inline]
pub(crate) fn group_id(&self) -> u32 {
self.group_id
}
#[inline]
pub(crate) fn is_flushable(&self) -> bool {
self.buffer_state().is_flushable()
}
#[inline]
pub(crate) fn is_sealed(&self) -> bool {
self.buffer_state().sealed
}
#[inline]
pub(crate) fn is_empty(&self) -> bool {
self.buffer_state().allocated == 0
}
#[inline]
pub(crate) async fn wait_flushed(&self) {
self.flush_latch.wait().await;
}
#[inline]
pub(crate) fn on_flushed(&self) {
self.flush_latch.count_down();
}
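/// Allocates a record for each `(page id, page size)` entry of
/// `new_page_list`, plus a single dealloc-pages record when `dealloc_pages`
/// is non-empty, acquiring one writer for the whole batch.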
#[cfg(test)]
#[allow(clippy::type_complexity)]
pub(crate) fn batch(
&self,
new_page_list: &[(u64 /* page id */, u32 /* page size */)],
dealloc_pages: &[u64],
) -> Result<(
Vec<(u64, &mut RecordHeader, PageBuf)>,
Option<&mut RecordHeader>,
)> {
const ALIGN: u32 = core::mem::size_of::<usize>() as u32;
let dealloc_pages_size = (dealloc_pages.len() * core::mem::size_of::<u64>()) as u32;
let need = new_page_list
.iter()
.map(|(_, v)| record_size(*v))
.sum::<u32>()
+ record_size(dealloc_pages_size);
debug_assert_eq!(need % ALIGN, 0);
let mut offset = self.alloc_size(need, true)?;
let mut records = Vec::with_capacity(new_page_list.len());
for (page_id, page_size) in new_page_list {
let (page_id, page_size) = (*page_id, *page_size);
let (page_addr, header, page_buf) =
unsafe { self.new_page_at(offset, page_id, page_size) };
offset += header.record_size();
records.push((page_addr, header, page_buf));
}
let dealloc_pages_header = if !dealloc_pages.is_empty() {
let (header, body) =
unsafe { self.new_dealloc_pages_record_at(offset, dealloc_pages.len()) };
body.copy_from_slice(dealloc_pages);
Some(header)
} else {
None
};
Ok((records, dealloc_pages_header))
}
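/// Allocates `page_size` bytes for page `page_id` and returns the page
/// address, the record header, and a writable page buffer.
///
/// # Safety
///
/// The returned references are detached from `self`'s lifetime, so the
/// caller must not let them outlive this buffer; if `acquire_writer` is
/// set, the writer must later be released via [`WriteBuffer::release_writer`].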
pub(crate) unsafe fn alloc_page<'a>(
&self,
page_id: u64,
page_size: u32,
acquire_writer: bool,
) -> Result<(u64, &'a mut RecordHeader, PageBuf<'a>)> {
let acquire_size = record_size(page_size);
let offset = self.alloc_size(acquire_size, acquire_writer)?;
Ok(unsafe { self.new_page_at(offset, page_id, page_size) })
}
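/// Writes a record holding the deallocated page addresses.
///
/// # Safety
///
/// Same contract as [`WriteBuffer::alloc_page`]: the returned header must
/// not outlive this buffer, and an acquired writer must later be released.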
pub(crate) unsafe fn dealloc_pages<'a>(
&self,
page_addrs: &[u64],
acquire_writer: bool,
) -> Result<&'a mut RecordHeader> {
let dealloc_pages_size = (page_addrs.len() * core::mem::size_of::<u64>()) as u32;
let acquire_size = record_size(dealloc_pages_size);
let offset = self.alloc_size(acquire_size, acquire_writer)?;
let (header, body) = unsafe { self.new_dealloc_pages_record_at(offset, page_addrs.len()) };
body.copy_from_slice(page_addrs);
Ok(header)
}
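/// Releases a writer previously acquired by an allocation.
///
/// # Safety
///
/// Calling this without a matching writer acquisition underflows the
/// writer count and panics.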
pub(crate) unsafe fn release_writer(&self) -> ReleaseState {
let mut current = self.buffer_state.load(Ordering::Acquire);
loop {
let mut buffer_state = BufferState::load(current);
buffer_state.dec_writer();
let new = buffer_state.apply();
match self.buffer_state.compare_exchange(
current,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
if buffer_state.is_flushable() {
return ReleaseState::Flush;
} else {
return ReleaseState::None;
}
}
Err(v) => {
current = v;
}
}
}
}
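/// Seals the buffer so that no further allocations can succeed. Returns
/// [`Error::Again`] if the buffer was already sealed, and
/// [`ReleaseState::Flush`] if no writer is in flight.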
pub(crate) fn seal(&self) -> Result<ReleaseState> {
let mut current = self.buffer_state.load(Ordering::Acquire);
loop {
let mut buffer_state = BufferState::load(current);
if buffer_state.sealed {
return Err(Error::Again);
}
buffer_state.set_sealed();
let new = buffer_state.apply();
match self.buffer_state.compare_exchange(
current,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
info!(
"Seal write buffer {}, allocated {} bytes, usage {:.4}",
self.group_id,
buffer_state.allocated,
buffer_state.allocated as f64 / self.buf_size as f64
);
if buffer_state.has_writer() {
return Ok(ReleaseState::None);
} else {
return Ok(ReleaseState::Flush);
}
}
Err(v) => {
current = v;
}
}
}
}
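/// Returns an iterator over all records in the buffer.
///
/// The iterator asserts that the buffer is flushable (sealed with no writer
/// in flight), so it must only be used after sealing. A sketch of
/// flush-time usage:
///
/// ```ignore
/// for (page_addr, header, record_ref) in buf.iter() {
///     if header.is_tombstone() {
///         continue; // the record was invalidated before the flush
///     }
///     match record_ref {
///         RecordRef::Page(page) => {
///             // persist `page` at `page_addr`
///         }
///         RecordRef::DeallocPages(pages) => {
///             // record the freed page addresses
///         }
///     }
/// }
/// ```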
pub(crate) fn iter(&self) -> RecordIterator {
RecordIterator {
write_buffer: self,
offset: 0,
}
}
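/// Resolves a page address previously returned by an allocation on this
/// buffer into a [`PageRef`].
///
/// # Safety
///
/// `page_addr` must point at a live, initialized page record in this
/// buffer, and the returned reference must not outlive it.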
pub(crate) unsafe fn page<'a>(&self, page_addr: u64) -> PageRef<'a> {
const ALIGN: u32 = core::mem::size_of::<usize>() as u32;
let file_id = (page_addr >> 32) as u32;
let offset = (page_addr & ((1 << 32) - 1)) as u32;
if file_id != self.group_id {
panic!("The specified addr is not belongs to the buffer");
}
if offset % ALIGN != 0 {
panic!("The specified addr is not satisfied the align requirement");
}
let offset = offset
.checked_sub(core::mem::size_of::<RecordHeader>() as u32)
.expect("The specified addr is not a valid page");
let header = self.record(offset);
if let Some(RecordRef::Page(page_ref)) = header.record_ref() {
return page_ref;
}
panic!("The specified addr is not a valid page");
}
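/// # Safety
///
/// The record header at `offset` must have been fully initialized.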
#[inline]
unsafe fn record(&self, offset: u32) -> &RecordHeader {
self.record_uninit(offset).assume_init_ref()
}
#[inline]
fn record_uninit(&self, offset: u32) -> &MaybeUninit<RecordHeader> {
let offset = offset as usize;
if offset % core::mem::size_of::<usize>() != 0 {
panic!("The specified offset is not aligned with pointer size");
}
assert!(offset + core::mem::size_of::<RecordHeader>() <= self.buf_size);
unsafe {
&*(self
.buf
.as_ptr()
.add(offset)
.cast::<MaybeUninit<RecordHeader>>())
}
}
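/// # Safety
///
/// The caller must have exclusive access to the header at `offset`; the
/// returned reference is detached from `self`'s lifetime.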
#[inline]
unsafe fn record_uninit_mut<'a>(&self, offset: u32) -> &'a mut MaybeUninit<RecordHeader> {
let offset = offset as usize;
if offset % core::mem::size_of::<usize>() != 0 {
panic!("The specified offset is not aligned with pointer size");
}
assert!(offset + core::mem::size_of::<RecordHeader>() <= self.buf_size);
unsafe {
&mut *(self
.buf
.as_ptr()
.add(offset)
.cast::<MaybeUninit<RecordHeader>>())
}
}
#[inline]
fn buffer_state(&self) -> BufferState {
BufferState::load(self.buffer_state.load(Ordering::Acquire))
}
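/// Reserves `need` bytes (rounded up to pointer alignment) via a CAS loop
/// on the packed state word, optionally acquiring a writer in the same
/// atomic step, and returns the offset of the reservation.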
fn alloc_size(&self, need: u32, acquire_writer: bool) -> Result<u32> {
let mut current = self.buffer_state.load(Ordering::Acquire);
loop {
let mut state = BufferState::load(current);
if state.sealed {
return Err(Error::Again);
}
if acquire_writer {
state.inc_writer();
}
let offset = state.alloc_size(need, self.buf_size as u32)?;
let new = state.apply();
match self.buffer_state.compare_exchange(
current,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
return Ok(offset);
}
Err(e) => {
current = e;
}
}
}
}
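/// # Safety
///
/// `offset` must come from a successful `alloc_size` reservation large
/// enough to hold a record header plus `page_size` bytes.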
unsafe fn new_page_at<'a>(
&self,
offset: u32,
page_id: u64,
page_size: u32,
) -> (u64, &'a mut RecordHeader, PageBuf<'a>) {
let header = unsafe { self.record_uninit_mut(offset) };
header.write(RecordHeader {
data: page_id,
flags: RecordFlags::NORMAL_PAGE.bits(),
page_size,
});
let header = unsafe { header.assume_init_mut() };
let page_offset = offset + core::mem::size_of::<RecordHeader>() as u32;
let page_addr = ((self.group_id as u64) << 32) | (page_offset as u64);
let buf = unsafe {
let ptr = (header as *mut RecordHeader).offset(1).cast::<u8>();
std::slice::from_raw_parts_mut(ptr, page_size as usize)
};
let page_buf = PageBuf::new(buf);
(page_addr, header, page_buf)
}
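/// # Safety
///
/// `offset` must come from a successful `alloc_size` reservation large
/// enough to hold a record header plus `num_dealloc_pages` u64 values.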
unsafe fn new_dealloc_pages_record_at<'a>(
&self,
offset: u32,
num_dealloc_pages: usize,
) -> (&'a mut RecordHeader, &'a mut [u64]) {
let page_size = (num_dealloc_pages * core::mem::size_of::<u64>()) as u32;
let header = unsafe { self.record_uninit_mut(offset) };
header.write(RecordHeader {
data: 0,
flags: RecordFlags::DELETED_PAGES.bits(),
page_size,
});
let header = unsafe { header.assume_init_mut() };
let body = unsafe {
let ptr = (header as *mut RecordHeader).offset(1).cast::<u64>();
std::slice::from_raw_parts_mut(ptr, num_dealloc_pages)
};
(header, body)
}
}
impl std::fmt::Debug for WriteBuffer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let buffer_state = BufferState::load(self.buffer_state.load(Ordering::Relaxed));
f.debug_struct("WriteBuffer")
.field("file_id", &self.group_id)
.field("buf_size", &self.buf_size)
.field("buffer_state", &buffer_state)
.finish()
}
}
impl Drop for WriteBuffer {
fn drop(&mut self) {
use std::alloc::{dealloc, Layout};
let state = BufferState::load(self.buffer_state.load(Ordering::SeqCst));
if state.has_writer() {
panic!("Try drop a write buffer that is still in use");
}
let layout = Layout::from_size_align(self.buf_size, core::mem::size_of::<usize>())
.expect("Invalid layout");
unsafe {
dealloc(self.buf.as_ptr(), layout);
}
}
}
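// SAFETY: the buffer owns its allocation, and all mutation goes through
// regions reserved atomically via `buffer_state`, so the type can be sent
// to and shared between threads.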
unsafe impl Send for WriteBuffer {}
unsafe impl Sync for WriteBuffer {}
impl BufferState {
#[inline]
fn load(val: u64) -> Self {
let allocated = (val & ((1 << 32) - 1)) as u32;
let num_writer = ((val >> 32) & ((1 << 31) - 1)) as u32;
let sealed = val & (1 << 63) != 0;
BufferState {
sealed,
num_writer,
allocated,
}
}
#[inline]
fn has_writer(&self) -> bool {
self.num_writer > 0
}
#[inline]
fn is_flushable(&self) -> bool {
self.sealed && !self.has_writer()
}
#[inline]
fn set_sealed(&mut self) {
self.sealed = true;
}
#[inline]
fn inc_writer(&mut self) {
self.num_writer = self
.num_writer
.checked_add(1)
.expect("inc writer out of range");
}
#[inline]
fn dec_writer(&mut self) {
self.num_writer = self
.num_writer
.checked_sub(1)
.expect("dec writer out of range");
}
#[inline]
fn alloc_size(&mut self, required: u32, buf_size: u32) -> Result<u32> {
const ALIGN: u32 = core::mem::size_of::<usize>() as u32;
debug_assert_eq!(self.allocated % ALIGN, 0);
let required = next_multiple_of_u32(required, ALIGN);
if required > buf_size {
return Err(Error::TooLargeSize);
}
if self.allocated + required > buf_size {
return Err(Error::Again);
}
let offset = self.allocated;
self.allocated = offset + required;
Ok(offset)
}
#[inline]
fn apply(&self) -> u64 {
assert!(self.num_writer < (1 << 31));
(if self.sealed { 1 << 63 } else { 0 })
| ((self.num_writer as u64) << 32)
| (self.allocated as u64)
}
}
impl RecordHeader {
#[inline]
fn record_size(&self) -> u32 {
record_size(self.page_size)
}
#[inline]
pub(crate) fn is_tombstone(&self) -> bool {
self.flags == RecordFlags::TOMBSTONE.bits()
}
#[inline]
pub(crate) fn set_tombstone(&mut self) {
self.flags = RecordFlags::TOMBSTONE.bits();
}
#[inline]
pub(crate) fn page_id(&self) -> u64 {
debug_assert_eq!(self.flags, RecordFlags::NORMAL_PAGE.bits());
self.data
}
#[inline]
pub(crate) fn page_size(&self) -> usize {
self.page_size as usize
}
#[inline]
pub(crate) fn set_page_id(&mut self, page_id: u64) {
debug_assert_eq!(self.flags, RecordFlags::NORMAL_PAGE.bits());
self.data = page_id;
}
fn record_ref<'a>(&self) -> Option<RecordRef<'a>> {
match RecordFlags::from_bits_truncate(self.flags) {
RecordFlags::NORMAL_PAGE => {
let buf = unsafe {
let ptr = (self as *const RecordHeader).offset(1).cast::<u8>();
std::slice::from_raw_parts(ptr, self.page_size as usize)
};
Some(RecordRef::Page(PageRef::new(buf)))
}
RecordFlags::DELETED_PAGES => {
let size = self.page_size as usize / core::mem::size_of::<u64>();
assert_eq!(size * core::mem::size_of::<u64>(), self.page_size as usize);
let record = unsafe {
let addr = (self as *const RecordHeader).offset(1).cast::<u64>();
std::slice::from_raw_parts(addr, size)
};
let val = DeallocPagesRecordRef {
dealloc_pages: record,
access_index: 0,
};
Some(RecordRef::DeallocPages(val))
}
_ => None,
}
}
}
impl<'a> Iterator for RecordIterator<'a> {
type Item = (u64 , &'a RecordHeader, RecordRef<'a>);
fn next(&mut self) -> Option<Self::Item> {
let buffer_state =
BufferState::load(self.write_buffer.buffer_state.load(Ordering::Acquire));
assert!(buffer_state.is_flushable());
loop {
if self.offset >= buffer_state.allocated {
return None;
}
let record_offset = self.offset;
let record_header = unsafe { self.write_buffer.record(record_offset) };
self.offset += record_header.record_size();
let page_offset = record_offset + core::mem::size_of::<RecordHeader>() as u32;
if let Some(record_ref) = record_header.record_ref() {
let page_addr = ((self.write_buffer.group_id as u64) << 32) | (page_offset as u64);
return Some((page_addr, record_header, record_ref));
}
}
}
}
impl<'a> DeallocPagesRecordRef<'a> {
pub(crate) fn as_slice(&self) -> &[u64] {
self.dealloc_pages
}
}
impl<'a> Iterator for DeallocPagesRecordRef<'a> {
type Item = u64;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.access_index < self.dealloc_pages.len() {
let item = self.dealloc_pages[self.access_index];
self.access_index += 1;
Some(item)
} else {
None
}
}
}
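/// Rounds `val` up to the nearest multiple of `multiple`,
/// e.g. `next_multiple_of_u32(3, 8) == 8`.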
#[inline]
fn next_multiple_of_u32(val: u32, multiple: u32) -> u32 {
((val + multiple - 1) / multiple) * multiple
}
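/// The total on-buffer size of a record with a body of `x` bytes: one
/// header plus the body rounded up to pointer alignment.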
#[inline]
fn record_size(x: u32) -> u32 {
const ALIGN: u32 = core::mem::size_of::<usize>() as u32;
core::mem::size_of::<RecordHeader>() as u32 + next_multiple_of_u32(x, ALIGN)
}
bitflags! {
struct RecordFlags: u32 {
const EMPTY = 0b0000_0000;
const NORMAL_PAGE = 0b0000_0001;
const DELETED_PAGES = 0b0000_0010;
const TOMBSTONE = 0b1000_0000;
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::*;
use crate::page_store::Error;
#[test]
fn buffer_state_load_and_apply() {
let mut state = BufferState::default();
assert!(!state.sealed);
assert_eq!(state.num_writer, 0);
assert_eq!(state.allocated, 0);
state.set_sealed();
state.inc_writer();
state.alloc_size(3, 1024).unwrap();
let raw = state.apply();
let state = BufferState::load(raw);
assert!(state.sealed);
assert_eq!(state.num_writer, 1);
assert_eq!(state.allocated, 8);
}
#[test]
fn record_header_record_size() {
struct Test {
page_size: u32,
record_size: u32,
}
let tests = vec![
Test {
page_size: 1,
record_size: 8,
},
Test {
page_size: 8,
record_size: 8,
},
Test {
page_size: 15,
record_size: 16,
},
Test {
page_size: 16,
record_size: 16,
},
];
for Test {
page_size,
record_size,
} in tests
{
let header = RecordHeader {
data: 0,
flags: RecordFlags::NORMAL_PAGE.bits(),
page_size,
};
assert_eq!(
header.record_size(),
core::mem::size_of::<RecordHeader>() as u32 + record_size
);
}
}
#[test]
fn write_buffer_construct_and_drop() {
let buf = WriteBuffer::with_capacity(1, 512);
drop(buf);
}
#[test]
#[should_panic]
fn write_buffer_capacity_is_power_of_two() {
WriteBuffer::with_capacity(1, 513);
}
#[test]
fn write_buffer_seal() {
let buf = WriteBuffer::with_capacity(1, 512);
assert!(matches!(buf.seal(), Ok(ReleaseState::Flush)));
}
#[test]
fn write_buffer_sealed_seal() {
let buf = WriteBuffer::with_capacity(1, 512);
buf.seal().unwrap();
assert!(matches!(buf.seal(), Err(Error::Again)));
}
#[test]
fn write_buffer_sealed_but_write_inflights_seal() {
let buf = WriteBuffer::with_capacity(1, 1024);
buf.batch(&[], &[1]).unwrap();
buf.seal().unwrap();
assert!(matches!(
unsafe { buf.release_writer() },
ReleaseState::Flush
));
}
#[test]
#[should_panic]
fn write_buffer_empty_writer_release_seal() {
let buf = WriteBuffer::with_capacity(1, 512);
buf.seal().unwrap();
unsafe { buf.release_writer() };
}
#[test]
fn write_buffer_iterate() {
let buf = WriteBuffer::with_capacity(1, 1024);
buf.batch(
&[(1, 20), (3, 40), (5, 60), (7, 80), (9, 100)],
&[11, 12, 13, 14, 15],
)
.unwrap();
unsafe { buf.release_writer() };
let (records_header, dealloc_pages_header) = buf.batch(&[(16, 17)], &[1, 2]).unwrap();
records_header
.into_iter()
.for_each(|(_, h, _)| h.set_tombstone());
if let Some(h) = dealloc_pages_header {
h.set_tombstone()
}
unsafe { buf.release_writer() };
buf.seal().unwrap();
let expect_dealloc_pages = vec![11, 12, 13, 14, 15];
let mut active_pages: HashSet<u64> = vec![1, 3, 5, 7, 9].into_iter().collect();
for (_, header, record_ref) in buf.iter() {
match record_ref {
RecordRef::Page(_page) => {
let page_id = header.page_id();
assert!(active_pages.remove(&page_id));
}
RecordRef::DeallocPages(dealloc_pages) => {
let dealloc_pages: Vec<u64> = dealloc_pages.collect();
assert_eq!(dealloc_pages, expect_dealloc_pages);
}
}
}
assert!(active_pages.is_empty());
}
#[test]
fn write_buffer_iterate_with_correct_offset() {
let mut page_addrs = Vec::new();
let buf = WriteBuffer::with_capacity(1, 1 << 16);
for i in 4..12 {
let (addr, _, _) = unsafe { buf.alloc_page(0, 1 << i, false).unwrap() };
page_addrs.push(addr);
}
buf.seal().unwrap();
let new_page_addrs = buf.iter().map(|(addr, _, _)| addr).collect::<Vec<_>>();
assert_eq!(page_addrs, new_page_addrs);
}
#[test]
fn write_buffer_pages_alloc() {
let buf = WriteBuffer::with_capacity(1, 1 << 20);
unsafe { buf.alloc_page(1, 123, true) }.unwrap();
unsafe { buf.dealloc_pages(&[5, 6, 7], false) }.unwrap();
{
let (_, header, _) = unsafe { buf.alloc_page(2, 222, false) }.unwrap();
header.set_tombstone();
}
{
let header = unsafe { buf.dealloc_pages(&[1, 3, 4], false) }.unwrap();
header.set_tombstone();
}
unsafe { buf.release_writer() };
}
#[test]
fn write_buffer_alloc_out_of_range_try_next() {
let buf = WriteBuffer::with_capacity(1, 8 << 10);
unsafe { buf.alloc_page(1, 4 << 10, true) }.unwrap();
unsafe { buf.alloc_page(2, 2 << 10, false) }.unwrap();
assert!(matches!(
unsafe { buf.alloc_page(3, 4 << 10, false) },
Err(Error::Again)
));
unsafe { buf.release_writer() };
}
#[test]
fn write_buffer_alloc_out_of_range_too_large_size() {
let buf = WriteBuffer::with_capacity(1, 2 << 10);
assert!(matches!(
unsafe { buf.alloc_page(1, 4 << 10, true) },
Err(Error::TooLargeSize)
));
}
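// A minimal check that the rounding helper behaves as documented.
#[test]
fn next_multiple_of_u32_rounds_up() {
assert_eq!(next_multiple_of_u32(0, 8), 0);
assert_eq!(next_multiple_of_u32(1, 8), 8);
assert_eq!(next_multiple_of_u32(8, 8), 8);
assert_eq!(next_multiple_of_u32(9, 8), 16);
}
// A sketch of the address round trip: `page` should resolve any address
// returned by `alloc_page` on the same buffer.
#[test]
fn write_buffer_page_roundtrip() {
let buf = WriteBuffer::with_capacity(1, 1 << 10);
let (addr, _, _) = unsafe { buf.alloc_page(42, 64, true) }.unwrap();
let _page = unsafe { buf.page(addr) };
unsafe { buf.release_writer() };
}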
}