use std::cell::UnsafeCell;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU32, Ordering};
use data_bucket::page::INNER_PAGE_SIZE;
use data_bucket::page::PageId;
use data_bucket::{DataPage, GeneralPage};
use derive_more::{Display, Error};
#[cfg(feature = "perf_measurements")]
use performance_measurement_codegen::performance_measurement;
use rkyv::{
Archive, Deserialize, Portable, Serialize,
api::high::HighDeserializer,
rancor::Strategy,
seal::Seal,
ser::{Serializer, allocator::ArenaHandle, sharing::Share},
util::AlignedVec,
with::{AtomicLoad, Relaxed, Skip, Unsafe},
};
use crate::prelude::Link;
/// Number of bytes reserved at the start of each page for header data.
pub const DATA_HEADER_LENGTH: usize = 4;
/// Usable payload capacity of a data page: the inner page size minus the header.
pub const DATA_INNER_LENGTH: usize = INNER_PAGE_SIZE - DATA_HEADER_LENGTH;
/// Fixed-size byte buffer with 16-byte alignment.
///
/// The alignment lets rkyv-archived rows stored inside the buffer be accessed
/// in place without copying to an aligned location first.
#[derive(Archive, Clone, Copy, Debug, Deserialize, Serialize)]
#[repr(C, align(16))]
pub struct AlignedBytes<const N: usize>(pub [u8; N]);
impl<const N: usize> Deref for AlignedBytes<N> {
type Target = [u8; N];
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<const N: usize> DerefMut for AlignedBytes<N> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// In-memory page of serialized rows with lock-free append.
///
/// Rows are appended by atomically reserving a byte range via `free_offset`
/// and then writing the serialized bytes into `inner_data`. The page itself
/// is (de)serializable with rkyv; `id` is skipped during archiving and must
/// be restored by the caller afterwards (see `set_page_id`).
#[derive(Archive, Deserialize, Debug, Serialize)]
pub struct Data<Row, const DATA_LENGTH: usize = DATA_INNER_LENGTH> {
    // Not persisted: rkyv skips this field, so it holds the default value
    // after deserialization until `set_page_id` is called.
    #[rkyv(with = Skip)]
    pub id: PageId,
    // Next free byte offset into `inner_data`; archived as a plain u32 read
    // with relaxed ordering.
    #[rkyv(with = AtomicLoad<Relaxed>)]
    pub free_offset: AtomicU32,
    // Raw storage for serialized rows. `UnsafeCell` permits writes through
    // `&self`; disjointness of concurrently written ranges is guaranteed by
    // the atomic offset reservation in `save_row`/`save_raw_row`.
    #[rkyv(with = Unsafe)]
    inner_data: UnsafeCell<AlignedBytes<DATA_LENGTH>>,
    // Marks the row type this page stores without owning one.
    _phantom: PhantomData<Row>,
}
// SAFETY: concurrent `save_row`/`save_raw_row` calls reserve disjoint byte
// ranges through the atomic `free_offset` before touching `inner_data`, so
// cross-thread appends never write overlapping regions. NOTE(review): readers
// are not synchronized against an in-progress write to the same link; callers
// must not read a link before the call that produced it has returned.
unsafe impl<Row, const DATA_LENGTH: usize> Sync for Data<Row, DATA_LENGTH> {}
impl<Row, const DATA_LENGTH: usize> Data<Row, DATA_LENGTH> {
    /// Creates an empty page with the given id, a zeroed data buffer, and
    /// `free_offset` at 0.
    pub fn new(id: PageId) -> Self {
        Self {
            id,
            free_offset: AtomicU32::default(),
            inner_data: UnsafeCell::new(AlignedBytes::<DATA_LENGTH>([0; DATA_LENGTH])),
            _phantom: PhantomData,
        }
    }

    /// Restores an in-memory page from a persisted [`GeneralPage`]: the header
    /// supplies the page id and the fill level (`data_length` becomes the new
    /// `free_offset`), and the inner bytes are copied verbatim.
    pub fn from_data_page(page: GeneralPage<DataPage<DATA_LENGTH>>) -> Self {
        Self {
            id: page.header.page_id,
            free_offset: AtomicU32::from(page.header.data_length),
            inner_data: UnsafeCell::new(AlignedBytes::<DATA_LENGTH>(page.inner.data)),
            _phantom: PhantomData,
        }
    }

    /// Sets the page id. Needed after rkyv deserialization because `id` is a
    /// skipped field and comes back as the default value.
    pub fn set_page_id(&mut self, id: PageId) {
        self.id = id;
    }

    /// Serializes `row` with rkyv and appends its bytes to the page, returning
    /// a [`Link`] describing where the row was stored.
    ///
    /// The destination range is reserved lock-free with `fetch_add`, so
    /// concurrent callers always receive disjoint ranges.
    ///
    /// # Errors
    /// * `SerializeError` — rkyv failed to serialize the row.
    /// * `PageTooSmall` — the row is larger than the whole page.
    /// * `PageIsFull` — not enough free space remains. NOTE(review): the
    ///   offset advanced by `fetch_add` is not rolled back in this case, so
    ///   the reserved-but-unused space is lost and subsequent saves keep
    ///   failing; presumably accepted for the lock-free design — confirm.
    #[cfg_attr(
        feature = "perf_measurements",
        performance_measurement(prefix_name = "DataRow")
    )]
    pub fn save_row(&self, row: &Row) -> Result<Link, ExecutionError>
    where
        Row: Archive
            + for<'a> Serialize<
                Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
            >,
    {
        let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(row)
            .map_err(|_| ExecutionError::SerializeError)?;
        let length = bytes.len();
        // Reject rows that can never fit before touching the shared offset.
        if length > DATA_LENGTH {
            return Err(ExecutionError::PageTooSmall {
                need: length,
                allowed: DATA_LENGTH,
            });
        }
        let length = length as u32;
        // Reserve `length` bytes; `fetch_add` makes concurrent writers'
        // ranges disjoint.
        let offset = self.free_offset.fetch_add(length, Ordering::AcqRel);
        // `length <= DATA_LENGTH` was checked above, so this subtraction
        // cannot underflow.
        if offset > DATA_LENGTH as u32 - length {
            return Err(ExecutionError::PageIsFull {
                need: length,
                left: DATA_LENGTH as i64 - offset as i64,
            });
        }
        // SAFETY: the range [offset, offset + length) was exclusively reserved
        // above, so no other thread writes these bytes concurrently.
        // NOTE(review): forming `&mut` over the whole buffer while other
        // threads may hold their own `&mut` is aliasing-suspect even though
        // the written ranges are disjoint; raw-pointer writes would be
        // stricter — worth checking under Miri.
        let inner_data = unsafe { &mut *self.inner_data.get() };
        inner_data[offset as usize..][..length as usize].copy_from_slice(bytes.as_slice());
        let link = Link {
            page_id: self.id,
            offset,
            length,
        };
        Ok(link)
    }

    /// Overwrites an existing row in place at `link` with a same-size row.
    ///
    /// # Safety
    /// `link` must have been produced by this page, and no other thread may
    /// concurrently read or write the linked range.
    ///
    /// # Errors
    /// `SerializeError` on rkyv failure; `InvalidLink` if the new serialized
    /// length differs from `link.length`.
    #[allow(clippy::missing_safety_doc)]
    #[cfg_attr(
        feature = "perf_measurements",
        performance_measurement(prefix_name = "DataRow")
    )]
    pub unsafe fn save_row_by_link(&self, row: &Row, link: Link) -> Result<Link, ExecutionError>
    where
        Row: Archive
            + for<'a> Serialize<
                Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
            >,
    {
        let bytes = rkyv::to_bytes(row).map_err(|_| ExecutionError::SerializeError)?;
        let length = bytes.len() as u32;
        // An exact size match is required — this method never moves or
        // shrinks a row (see `try_save_row_by_link` for the shrinking case).
        if length != link.length {
            return Err(ExecutionError::InvalidLink);
        }
        // SAFETY: the caller guarantees exclusive access to `link`'s range.
        let inner_data = unsafe { &mut *self.inner_data.get() };
        inner_data[link.offset as usize..][..link.length as usize]
            .copy_from_slice(bytes.as_slice());
        Ok(link)
    }

    /// Overwrites the row at `link`, allowing the new row to be shorter than
    /// the old one.
    ///
    /// Returns the (possibly shrunk) link that now describes the row, plus an
    /// optional leftover link covering the freed tail bytes so the caller can
    /// recycle them.
    ///
    /// # Safety
    /// Same contract as [`Self::save_row_by_link`]: the caller must own
    /// `link`'s range for the duration of the call.
    ///
    /// # Errors
    /// `SerializeError` on rkyv failure; `InvalidLink` if the new row is
    /// larger than `link.length`.
    #[allow(clippy::missing_safety_doc)]
    pub unsafe fn try_save_row_by_link(
        &self,
        row: &Row,
        mut link: Link,
    ) -> Result<(Link, Option<Link>), ExecutionError>
    where
        Row: Archive
            + for<'a> Serialize<
                Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
            >,
    {
        let bytes = rkyv::to_bytes(row).map_err(|_| ExecutionError::SerializeError)?;
        let length = bytes.len() as u32;
        if length > link.length {
            return Err(ExecutionError::InvalidLink);
        }
        // `length <= link.length` holds here, so no underflow.
        let link_diff = link.length - length;
        let link_left = if link_diff > 0 {
            // Shrink the primary link to the new row size and carve the
            // remainder into a separate leftover link directly after it.
            link.length -= link_diff;
            Some(Link {
                page_id: link.page_id,
                offset: link.offset + link.length,
                length: link_diff,
            })
        } else {
            None
        };
        // SAFETY: the caller guarantees exclusive access to `link`'s range.
        let inner_data = unsafe { &mut *self.inner_data.get() };
        inner_data[link.offset as usize..][..link.length as usize]
            .copy_from_slice(bytes.as_slice());
        Ok((link, link_left))
    }

    /// Returns a sealed mutable view over the archived row at `link`.
    ///
    /// # Safety
    /// `link` must point at a valid archived `Row` within this page (no
    /// validation is performed), and the caller must have exclusive access to
    /// that range.
    pub unsafe fn get_mut_row_ref(
        &self,
        link: Link,
    ) -> Result<Seal<'_, <Row as Archive>::Archived>, ExecutionError>
    where
        Row: Archive,
        <Row as Archive>::Archived: Portable,
    {
        // SAFETY: exclusive access to the range is the caller's obligation.
        let inner_data = unsafe { &mut *self.inner_data.get() };
        let bytes = &mut inner_data[link.offset as usize..(link.offset + link.length) as usize];
        // SAFETY: `access_unchecked_mut` skips validation; the bytes are
        // trusted to contain a valid archived `Row` per this fn's contract.
        Ok(unsafe { rkyv::access_unchecked_mut::<<Row as Archive>::Archived>(&mut bytes[..]) })
    }

    /// Returns a shared reference to the archived row at `link` without
    /// deserializing it.
    #[cfg_attr(
        feature = "perf_measurements",
        performance_measurement(prefix_name = "DataRow")
    )]
    pub fn get_row_ref(&self, link: Link) -> Result<&<Row as Archive>::Archived, ExecutionError>
    where
        Row: Archive,
    {
        // SAFETY: shared read of the buffer; assumes no concurrent writer is
        // touching `link`'s range (see the `Sync` impl note).
        let inner_data = unsafe { &*self.inner_data.get() };
        let bytes = &inner_data[link.offset as usize..(link.offset + link.length) as usize];
        // SAFETY: no validation is done — `link` is trusted to reference a
        // valid archived `Row` previously written by `save_row`.
        Ok(unsafe { rkyv::access_unchecked::<<Row as Archive>::Archived>(bytes) })
    }

    /// Deserializes and returns an owned copy of the row at `link`.
    ///
    /// # Errors
    /// `DeserializeError` if rkyv deserialization fails; anything
    /// `get_row_ref` returns.
    pub fn get_row(&self, link: Link) -> Result<Row, ExecutionError>
    where
        Row: Archive,
        <Row as Archive>::Archived: Deserialize<Row, HighDeserializer<rkyv::rancor::Error>>,
    {
        let row = self.get_row_ref(link)?;
        rkyv::deserialize::<_, rkyv::rancor::Error>(row)
            .map_err(|_| ExecutionError::DeserializeError)
    }

    /// Copies the raw serialized bytes of the row at `link` into a new `Vec`.
    pub fn get_raw_row(&self, link: Link) -> Result<Vec<u8>, ExecutionError> {
        // SAFETY: shared read of the buffer; assumes no concurrent writer on
        // `link`'s range. NOTE(review): a `&mut` borrow is taken for a pure
        // read — `&*self.inner_data.get()` would suffice; confirm and relax.
        let inner_data = unsafe { &mut *self.inner_data.get() };
        let bytes = &mut inner_data[link.offset as usize..(link.offset + link.length) as usize];
        Ok(bytes.to_vec())
    }

    /// Copies `from.length` bytes from `from`'s range to `to`'s range within
    /// this page. The ranges may overlap (`ptr::copy` is memmove-like).
    ///
    /// # Safety
    /// Both links must lie within this page and no other thread may access
    /// either range during the call.
    ///
    /// # Errors
    /// `InvalidLink` if the two links have different lengths.
    pub unsafe fn move_from_to(&self, from: Link, to: Link) -> Result<(), ExecutionError> {
        if from.length != to.length {
            return Err(ExecutionError::InvalidLink);
        }
        // SAFETY: exclusive access is the caller's obligation.
        let inner_data = unsafe { &mut *self.inner_data.get() };
        let src_offset = from.offset as usize;
        let dst_offset = to.offset as usize;
        let length = from.length as usize;
        // SAFETY: `ptr::copy` handles overlapping source/destination;
        // NOTE(review): offsets are not bounds-checked against DATA_LENGTH
        // here — out-of-page links would read/write out of bounds; confirm
        // callers only pass links produced by this page.
        unsafe {
            std::ptr::copy(
                inner_data.as_ptr().add(src_offset),
                inner_data.as_mut_ptr().add(dst_offset),
                length,
            );
        }
        Ok(())
    }

    /// Appends pre-serialized bytes to the page, mirroring `save_row` but
    /// skipping serialization. Useful for moving rows between pages.
    ///
    /// # Errors
    /// Same space errors as `save_row`: `PageTooSmall` / `PageIsFull` (with
    /// the same non-rolled-back offset caveat).
    pub fn save_raw_row(&self, data: &[u8]) -> Result<Link, ExecutionError> {
        let length = data.len();
        if length > DATA_LENGTH {
            return Err(ExecutionError::PageTooSmall {
                need: length,
                allowed: DATA_LENGTH,
            });
        }
        let length = length as u32;
        // Same lock-free reservation scheme as `save_row`.
        let offset = self.free_offset.fetch_add(length, Ordering::AcqRel);
        if offset > DATA_LENGTH as u32 - length {
            return Err(ExecutionError::PageIsFull {
                need: length,
                left: DATA_LENGTH as i64 - offset as i64,
            });
        }
        // SAFETY: the range [offset, offset + length) was exclusively
        // reserved above.
        let inner_data = unsafe { &mut *self.inner_data.get() };
        inner_data[offset as usize..][..length as usize].copy_from_slice(data);
        Ok(Link {
            page_id: self.id,
            offset,
            length,
        })
    }

    /// Returns a copy of the page's entire inner byte buffer.
    pub fn get_bytes(&self) -> [u8; DATA_LENGTH] {
        // SAFETY: shared read; assumes no concurrent writer mid-copy.
        let data = unsafe { &*self.inner_data.get() };
        data.0
    }

    /// Remaining capacity in bytes. Saturating, because a failed reservation
    /// can leave `free_offset` past DATA_LENGTH.
    pub fn free_space(&self) -> usize {
        DATA_LENGTH.saturating_sub(self.free_offset.load(Ordering::Acquire) as usize)
    }

    /// Logically empties the page by resetting the append cursor. The data
    /// bytes themselves are left untouched and will be overwritten by
    /// subsequent saves; outstanding links become dangling.
    pub fn reset(&self) {
        self.free_offset.store(0, Ordering::Release);
    }
}
/// Errors produced by page-level row operations.
#[derive(Copy, Clone, Debug, Display, Error, PartialEq)]
pub enum ExecutionError {
    /// The page's remaining free space cannot hold the row.
    #[display("need {}, but {} left", need, left)]
    PageIsFull { need: u32, left: i64 },
    /// The row is larger than the page's total capacity.
    #[display("need {}, but {} allowed", need, allowed)]
    PageTooSmall { need: usize, allowed: usize },
    /// rkyv failed to serialize the row.
    SerializeError,
    /// rkyv failed to deserialize the archived row.
    DeserializeError,
    /// The provided `Link` does not satisfy the operation's length
    /// requirements (or does not belong to this page).
    InvalidLink,
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;
    use std::sync::{Arc, mpsc};
    use std::thread;
    use rkyv::{Archive, Deserialize, Serialize};
    use crate::in_memory::DATA_INNER_LENGTH;
    use crate::in_memory::data::{Data, ExecutionError, INNER_PAGE_SIZE};
    use crate::prelude::Link;

    // Simple fixture row; archives to 16 bytes (two u64 fields), which the
    // size assertions below rely on.
    #[derive(
        Archive, Copy, Clone, Deserialize, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
    )]
    #[rkyv(compare(PartialEq), derive(Debug))]
    struct TestRow {
        a: u64,
        b: u64,
    }

    // A default-size page must archive to exactly INNER_PAGE_SIZE bytes
    // (header length + inner data length).
    #[test]
    fn data_page_length_valid() {
        let data = Data::<()>::new(1.into());
        let bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&data).unwrap();
        assert_eq!(bytes.len(), INNER_PAGE_SIZE)
    }

    // Saving a single row: link fields, offset bookkeeping, and the archived
    // bytes written into the buffer must all be consistent.
    #[test]
    fn data_page_save_row() {
        let page = Data::<TestRow>::new(1.into());
        let row = TestRow { a: 10, b: 20 };
        let initial_free = page.free_space();
        assert!(initial_free > 0);
        let link = page.save_row(&row).unwrap();
        assert_eq!(link.page_id, page.id);
        assert_eq!(link.length, 16);
        assert_eq!(link.offset, 0);
        assert_eq!(page.free_offset.load(Ordering::Relaxed), link.length);
        assert_eq!(page.free_space(), initial_free - link.length as usize);
        let inner_data = unsafe { &mut *page.inner_data.get() };
        let bytes = &inner_data[link.offset as usize..link.length as usize];
        let archived = unsafe { rkyv::access_unchecked::<ArchivedTestRow>(bytes) };
        assert_eq!(archived, &row)
    }

    // Overwriting via `save_row_by_link` keeps the same link and replaces the
    // archived bytes in place.
    #[test]
    fn data_page_overwrite_row() {
        let page = Data::<TestRow>::new(1.into());
        let row = TestRow { a: 10, b: 20 };
        let link = page.save_row(&row).unwrap();
        let new_row = TestRow { a: 20, b: 20 };
        let res = unsafe { page.save_row_by_link(&new_row, link) }.unwrap();
        assert_eq!(res, link);
        let inner_data = unsafe { &mut *page.inner_data.get() };
        let bytes = &inner_data[link.offset as usize..link.length as usize];
        let archived = unsafe { rkyv::access_unchecked::<ArchivedTestRow>(bytes) };
        assert_eq!(archived, &new_row)
    }

    // A 16-byte page fits exactly one TestRow; the second save must report
    // PageIsFull.
    #[test]
    fn data_page_full() {
        let page = Data::<TestRow, 16>::new(1.into());
        let row = TestRow { a: 10, b: 20 };
        let _ = page.save_row(&row).unwrap();
        let new_row = TestRow { a: 20, b: 20 };
        let res = page.save_row(&new_row);
        assert!(matches!(res, Err(ExecutionError::PageIsFull { .. })));
    }

    // A 1-byte page cannot hold the row at all: PageTooSmall, not PageIsFull.
    #[test]
    fn data_page_too_small() {
        let page = Data::<TestRow, 1>::new(1.into());
        let row = TestRow { a: 10, b: 20 };
        let res = page.save_row(&row);
        assert!(matches!(res, Err(ExecutionError::PageTooSmall { .. })));
    }

    // Two threads racing to fill a 128-byte page (capacity 8 rows, 18 rows
    // attempted): must not panic; some saves succeed, some fail with
    // PageIsFull. Results are intentionally not asserted beyond completion.
    #[test]
    fn data_page_full_multithread() {
        let page = Data::<TestRow, 128>::new(1.into());
        let shared = Arc::new(page);
        let (tx, rx) = mpsc::channel();
        let second_shared = shared.clone();
        thread::spawn(move || {
            let mut links = Vec::new();
            for i in 1..10 {
                let row = TestRow {
                    a: 10 + i,
                    b: 20 + i,
                };
                let link = second_shared.save_row(&row);
                links.push(link)
            }
            tx.send(links).unwrap();
        });
        let mut links = Vec::new();
        for i in 1..10 {
            let row = TestRow {
                a: 30 + i,
                b: 40 + i,
            };
            let link = shared.save_row(&row);
            links.push(link)
        }
        let _other_links = rx.recv().unwrap();
    }

    // Sequential saves of several rows: free-space accounting is cumulative
    // and every row can be read back from its own link.
    #[test]
    fn data_page_save_many_rows() {
        let page = Data::<TestRow>::new(1.into());
        let initial_free = page.free_space();
        let mut total_used = 0;
        let mut rows = Vec::new();
        let mut links = Vec::new();
        for i in 1..10 {
            let row = TestRow {
                a: 10 + i,
                b: 20 + i,
            };
            rows.push(row);
            let link = page.save_row(&row);
            total_used += link.as_ref().unwrap().length as usize;
            links.push(link)
        }
        assert_eq!(page.free_space(), initial_free - total_used);
        let inner_data = unsafe { &mut *page.inner_data.get() };
        for (i, link) in links.into_iter().enumerate() {
            let link = link.unwrap();
            let bytes = &inner_data[link.offset as usize..(link.offset + link.length) as usize];
            let archived = unsafe { rkyv::access_unchecked::<ArchivedTestRow>(bytes) };
            let row = rows.get(i).unwrap();
            assert_eq!(row, archived)
        }
    }

    // `get_row_ref` returns an archived view equal to the saved row.
    #[test]
    fn data_page_get_row_ref() {
        let page = Data::<TestRow>::new(1.into());
        let row = TestRow { a: 10, b: 20 };
        let link = page.save_row(&row).unwrap();
        let archived = page.get_row_ref(link).unwrap();
        assert_eq!(archived, &row)
    }

    // `get_row` round-trips through full deserialization.
    #[test]
    fn data_page_get_row() {
        let page = Data::<TestRow>::new(1.into());
        let row = TestRow { a: 10, b: 20 };
        let link = page.save_row(&row).unwrap();
        let deserialized = page.get_row(link).unwrap();
        assert_eq!(deserialized, row)
    }

    // Two threads saving into a large page: all 18 saves succeed and every
    // resulting link is readable afterwards (disjoint-range guarantee).
    #[test]
    fn multithread() {
        let page = Data::<TestRow>::new(1.into());
        let shared = Arc::new(page);
        let (tx, rx) = mpsc::channel();
        let second_shared = shared.clone();
        thread::spawn(move || {
            let mut links = Vec::new();
            for i in 1..10 {
                let row = TestRow {
                    a: 10 + i,
                    b: 20 + i,
                };
                let link = second_shared.save_row(&row);
                links.push(link)
            }
            tx.send(links).unwrap();
        });
        let mut links = Vec::new();
        for i in 1..10 {
            let row = TestRow {
                a: 30 + i,
                b: 40 + i,
            };
            let link = shared.save_row(&row);
            links.push(link)
        }
        let other_links = rx.recv().unwrap();
        let links = other_links
            .into_iter()
            .chain(links)
            .map(|v| v.unwrap())
            .collect::<Vec<_>>();
        for link in links {
            let _ = shared.get_row(link).unwrap();
        }
    }

    // Moving the second row over the first (overlap-free here) leaves the
    // row readable at its new location.
    #[test]
    fn move_from_to() {
        let page = Data::<TestRow>::new(1.into());
        let row1 = TestRow { a: 100, b: 200 };
        let link1 = page.save_row(&row1).unwrap();
        assert_eq!(link1.offset, 0);
        let row2 = TestRow { a: 300, b: 400 };
        let link2 = page.save_row(&row2).unwrap();
        assert_eq!(link2.offset, 16);
        let new_link = Link {
            page_id: link2.page_id,
            offset: 0,
            length: link2.length,
        };
        unsafe { page.move_from_to(link2, new_link).unwrap() };
        let moved_row = page.get_row(new_link).unwrap();
        assert_eq!(moved_row, row2);
    }

    // `move_from_to` rejects mismatched lengths with InvalidLink before
    // touching any bytes.
    #[test]
    fn move_from_to_different_lengths() {
        let page = Data::<TestRow>::new(1.into());
        let from = Link {
            page_id: 1.into(),
            offset: 0,
            length: 16,
        };
        let to = Link {
            page_id: 1.into(),
            offset: 32,
            length: 8,
        };
        let result = unsafe { page.move_from_to(from, to) };
        assert!(matches!(result, Err(ExecutionError::InvalidLink)));
    }

    // `save_raw_row` of bytes obtained via `get_raw_row` appends a
    // byte-identical copy that deserializes to the original row.
    #[test]
    fn save_raw_row_appends_to_page() {
        let page = Data::<TestRow>::new(1.into());
        let row = TestRow { a: 42, b: 99 };
        let link = page.save_row(&row).unwrap();
        let raw_data = page.get_raw_row(link).unwrap();
        let new_link = page.save_raw_row(&raw_data).unwrap();
        assert_eq!(new_link.page_id, page.id);
        assert_eq!(new_link.length, link.length);
        assert_eq!(new_link.offset, link.length);
        let retrieved = page.get_row(new_link).unwrap();
        assert_eq!(retrieved, row);
    }

    // Raw payload larger than the page capacity: PageTooSmall.
    #[test]
    fn save_raw_row_page_too_small() {
        let page = Data::<TestRow, 16>::new(1.into());
        let data = vec![0u8; 32];
        let result = page.save_raw_row(&data);
        assert!(matches!(result, Err(ExecutionError::PageTooSmall { .. })));
    }

    // Raw payload that fits the capacity but not the remaining space:
    // PageIsFull.
    #[test]
    fn save_raw_row_page_full() {
        let page = Data::<TestRow, 16>::new(1.into());
        let row = TestRow { a: 1, b: 2 };
        let _ = page.save_row(&row).unwrap();
        let data = vec![0u8; 16];
        let result = page.save_raw_row(&data);
        assert!(matches!(result, Err(ExecutionError::PageIsFull { .. })));
    }

    // get_raw_row + save_raw_row moves a row between two pages intact.
    #[test]
    fn save_raw_row_move_between_pages() {
        let page1 = Data::<TestRow>::new(1.into());
        let page2 = Data::<TestRow>::new(2.into());
        let original = TestRow { a: 123, b: 456 };
        let link1 = page1.save_row(&original).unwrap();
        let raw = page1.get_raw_row(link1).unwrap();
        let link2 = page2.save_raw_row(&raw).unwrap();
        let retrieved = page2.get_row(link2).unwrap();
        assert_eq!(retrieved, original);
    }

    // Repeated raw saves consume free space linearly, and every stored copy
    // remains independently readable.
    #[test]
    fn save_raw_row_multiple_entries() {
        let page = Data::<TestRow>::new(1.into());
        let row = TestRow { a: 77, b: 88 };
        let link = page.save_row(&row).unwrap();
        let raw_data = page.get_raw_row(link).unwrap();
        let row_size = link.length as usize;
        let initial_free = page.free_space();
        let mut links = vec![link];
        for i in 0..5 {
            let new_link = page.save_raw_row(&raw_data).unwrap();
            links.push(new_link);
            let expected_free = initial_free - ((i + 1) as usize * row_size);
            assert_eq!(page.free_space(), expected_free);
        }
        for link in links {
            let retrieved = page.get_row(link).unwrap();
            assert_eq!(retrieved, row);
        }
    }

    // `reset` zeroes the append cursor so the next save starts at offset 0;
    // old links dangle but the page is reusable.
    #[test]
    fn reset_clears_free_offset() {
        let page = Data::<TestRow>::new(1.into());
        let row1 = TestRow { a: 10, b: 20 };
        let row2 = TestRow { a: 30, b: 40 };
        let link1 = page.save_row(&row1).unwrap();
        let link2 = page.save_row(&row2).unwrap();
        assert!(page.free_offset.load(Ordering::Relaxed) > 0);
        assert_eq!(link1.offset, 0);
        assert_eq!(link2.offset, 16);
        page.reset();
        assert_eq!(page.free_offset.load(Ordering::Relaxed), 0);
        assert_eq!(page.free_space(), DATA_INNER_LENGTH);
        let row3 = TestRow { a: 99, b: 88 };
        let link3 = page.save_row(&row3).unwrap();
        assert_eq!(link3.offset, 0);
        let retrieved = page.get_row(link3).unwrap();
        assert_eq!(retrieved, row3);
    }
}