spacetimedb_table/
page_pool.rs1use super::{
2 indexes::max_rows_in_page,
3 page::{Page, PageHeader},
4};
5use derive_more::Deref;
6use spacetimedb_data_structures::object_pool::{Pool, PooledObject};
7use spacetimedb_sats::bsatn::{self, DecodeError};
8use spacetimedb_sats::de::{
9 DeserializeSeed, Deserializer, Error, NamedProductAccess, ProductVisitor, SeqProductAccess,
10};
11use spacetimedb_sats::layout::Size;
12use spacetimedb_sats::memory_usage::MemoryUsage;
13
14impl PooledObject for Box<Page> {
15 type ResidentBytesStorage = ();
16 fn resident_object_bytes(_: &Self::ResidentBytesStorage, num_objects: usize) -> usize {
17 num_objects * size_of::<Page>()
19 }
20 fn add_to_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
21 fn sub_from_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
22}
23
24#[derive(Clone, Deref)]
26pub struct PagePool {
27 pool: Pool<Box<Page>>,
28}
29
30impl MemoryUsage for PagePool {
31 fn heap_usage(&self) -> usize {
32 self.pool.heap_usage()
33 }
34}
35
36impl PagePool {
37 pub fn new_for_test() -> Self {
38 Self::new(Some(100 * size_of::<Page>()))
39 }
40
41 pub fn new(max_size: Option<usize>) -> Self {
45 const PAGE_SIZE: usize = size_of::<Page>();
46 const DEFAULT_MAX_SIZE: usize = 128 * PAGE_SIZE; let queue_size = max_size.unwrap_or(DEFAULT_MAX_SIZE) / PAGE_SIZE;
57 let pool = Pool::new(queue_size);
58 Self { pool }
59 }
60
61 pub fn take_with_fixed_row_size(&self, fixed_row_size: Size) -> Box<Page> {
65 self.take_with_max_row_count(max_rows_in_page(fixed_row_size))
66 }
67
68 fn take_with_max_row_count(&self, max_rows_in_page: usize) -> Box<Page> {
72 self.pool.take(
73 |page| page.reset_for(max_rows_in_page),
74 || Page::new_with_max_row_count(max_rows_in_page),
75 )
76 }
77
78 pub fn take_deserialize_from(&self, buf: &[u8]) -> Result<Box<Page>, DecodeError> {
80 self.deserialize(bsatn::Deserializer::new(&mut &*buf))
81 }
82}
83
84impl<'de> DeserializeSeed<'de> for &PagePool {
85 type Output = Box<Page>;
86
87 fn deserialize<D: Deserializer<'de>>(self, de: D) -> Result<Self::Output, D::Error> {
88 de.deserialize_product(self)
89 }
90}
91
92impl<'de> ProductVisitor<'de> for &PagePool {
93 type Output = Box<Page>;
94
95 fn product_name(&self) -> Option<&str> {
96 Some("Page")
97 }
98
99 fn product_len(&self) -> usize {
100 2
101 }
102
103 fn visit_seq_product<A: SeqProductAccess<'de>>(self, mut prod: A) -> Result<Self::Output, A::Error> {
104 let header = prod
105 .next_element::<PageHeader>()?
106 .ok_or_else(|| A::Error::invalid_product_length(2, &self))?;
107 let row_data = prod
108 .next_element()?
109 .ok_or_else(|| A::Error::invalid_product_length(2, &self))?;
110
111 let mut page = self.take_with_max_row_count(header.max_rows_in_page());
113 unsafe { page.set_raw(header, row_data) };
115
116 Ok(page)
117 }
118
119 fn visit_named_product<A: NamedProductAccess<'de>>(self, _: A) -> Result<Self::Output, A::Error> {
120 unreachable!()
121 }
122}
123
#[cfg(test)]
mod tests {
    use super::*;
    use core::ptr::addr_eq;

    /// Returns the address of the storage behind `page`'s present-rows set,
    /// as exposed by the header's test-only accessor.
    fn present_rows_ptr(page: &Page) -> *const () {
        let header = page.page_header_for_test();
        header.present_rows_storage_ptr_for_test()
    }

    #[test]
    fn page_pool_bitset_reuse() {
        let pool = PagePool::new_for_test();

        // Take a page for 10 rows, remember where its present-rows storage lives,
        // and hand the page back to the pool.
        let first = pool.take_with_max_row_count(10);
        let first_storage = present_rows_ptr(&first);
        pool.put(first);

        // A page for 64 rows must come back with the very same storage address.
        let second = pool.take_with_max_row_count(64);
        let second_storage = present_rows_ptr(&second);
        assert!(addr_eq(first_storage, second_storage));
        pool.put(second);

        // A page for 65 rows must come back with a different storage address.
        let third = pool.take_with_max_row_count(64 + 1);
        let third_storage = present_rows_ptr(&third);
        assert!(!addr_eq(first_storage, third_storage));
    }
}