use super::{
indexes::max_rows_in_page,
page::{Page, PageHeader},
};
use derive_more::Deref;
use spacetimedb_data_structures::object_pool::{Pool, PooledObject};
use spacetimedb_sats::bsatn::{self, DecodeError};
use spacetimedb_sats::de::{
DeserializeSeed, Deserializer, Error, NamedProductAccess, ProductVisitor, SeqProductAccess,
};
use spacetimedb_sats::layout::Size;
use spacetimedb_sats::memory_usage::MemoryUsage;
impl PooledObject for Box<Page> {
type ResidentBytesStorage = ();
fn resident_object_bytes(_: &Self::ResidentBytesStorage, num_objects: usize) -> usize {
num_objects * size_of::<Page>()
}
fn add_to_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
fn sub_from_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
}
#[derive(Clone, Deref)]
pub struct PagePool {
pool: Pool<Box<Page>>,
}
impl MemoryUsage for PagePool {
fn heap_usage(&self) -> usize {
self.pool.heap_usage()
}
}
impl PagePool {
pub fn new_for_test() -> Self {
Self::new(Some(100 * size_of::<Page>()))
}
pub fn new(max_size: Option<usize>) -> Self {
const PAGE_SIZE: usize = size_of::<Page>();
const DEFAULT_MAX_SIZE: usize = 128 * PAGE_SIZE;
let queue_size = max_size.unwrap_or(DEFAULT_MAX_SIZE) / PAGE_SIZE;
let pool = Pool::new(queue_size);
Self { pool }
}
pub fn take_with_fixed_row_size(&self, fixed_row_size: Size) -> Box<Page> {
self.take_with_max_row_count(max_rows_in_page(fixed_row_size))
}
fn take_with_max_row_count(&self, max_rows_in_page: usize) -> Box<Page> {
self.pool.take(
|page| page.reset_for(max_rows_in_page),
|| Page::new_with_max_row_count(max_rows_in_page),
)
}
pub fn take_deserialize_from(&self, buf: &[u8]) -> Result<Box<Page>, DecodeError> {
self.deserialize(bsatn::Deserializer::new(&mut &*buf))
}
}
impl<'de> DeserializeSeed<'de> for &PagePool {
type Output = Box<Page>;
fn deserialize<D: Deserializer<'de>>(self, de: D) -> Result<Self::Output, D::Error> {
de.deserialize_product(self)
}
}
impl<'de> ProductVisitor<'de> for &PagePool {
type Output = Box<Page>;
fn product_name(&self) -> Option<&str> {
Some("Page")
}
fn product_len(&self) -> usize {
2
}
fn visit_seq_product<A: SeqProductAccess<'de>>(self, mut prod: A) -> Result<Self::Output, A::Error> {
let header = prod
.next_element::<PageHeader>()?
.ok_or_else(|| A::Error::invalid_product_length(2, &self))?;
let row_data = prod
.next_element()?
.ok_or_else(|| A::Error::invalid_product_length(2, &self))?;
let mut page = self.take_with_max_row_count(header.max_rows_in_page());
unsafe { page.set_raw(header, row_data) };
Ok(page)
}
fn visit_named_product<A: NamedProductAccess<'de>>(self, _: A) -> Result<Self::Output, A::Error> {
unreachable!()
}
}
#[cfg(test)]
mod tests {
use super::*;
use core::ptr::addr_eq;
fn present_rows_ptr(page: &Page) -> *const () {
page.page_header_for_test().present_rows_storage_ptr_for_test()
}
#[test]
fn page_pool_bitset_reuse() {
let pool = PagePool::new_for_test();
let page1 = pool.take_with_max_row_count(10);
let page1_pr_ptr = present_rows_ptr(&page1);
pool.put(page1);
let page2 = pool.take_with_max_row_count(64);
assert!(addr_eq(page1_pr_ptr, present_rows_ptr(&page2)));
pool.put(page2);
let page3 = pool.take_with_max_row_count(64 + 1);
assert!(!addr_eq(page1_pr_ptr, present_rows_ptr(&page3)));
}
}