spacetimedb_table/
page_pool.rs

1use super::{
2    indexes::max_rows_in_page,
3    page::{Page, PageHeader},
4};
5use derive_more::Deref;
6use spacetimedb_data_structures::object_pool::{Pool, PooledObject};
7use spacetimedb_sats::bsatn::{self, DecodeError};
8use spacetimedb_sats::de::{
9    DeserializeSeed, Deserializer, Error, NamedProductAccess, ProductVisitor, SeqProductAccess,
10};
11use spacetimedb_sats::layout::Size;
12use spacetimedb_sats::memory_usage::MemoryUsage;
13
14impl PooledObject for Box<Page> {
15    type ResidentBytesStorage = ();
16    fn resident_object_bytes(_: &Self::ResidentBytesStorage, num_objects: usize) -> usize {
17        // Each page takes up a fixed amount.
18        num_objects * size_of::<Page>()
19    }
20    fn add_to_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
21    fn sub_from_resident_object_bytes(_: &Self::ResidentBytesStorage, _: usize) {}
22}
23
24/// A page pool of currently unused pages available for use in [`Pages`](super::pages::Pages).
25#[derive(Clone, Deref)]
26pub struct PagePool {
27    pool: Pool<Box<Page>>,
28}
29
30impl MemoryUsage for PagePool {
31    fn heap_usage(&self) -> usize {
32        self.pool.heap_usage()
33    }
34}
35
36impl PagePool {
37    pub fn new_for_test() -> Self {
38        Self::new(Some(100 * size_of::<Page>()))
39    }
40
41    /// Returns a new page pool with `max_size` bytes rounded down to the nearest multiple of 64 KiB.
42    ///
43    /// if no size is provided, a default of 1 page is used.
44    pub fn new(max_size: Option<usize>) -> Self {
45        const PAGE_SIZE: usize = size_of::<Page>();
46        // TODO(centril): Currently, we have a test `test_index_scans`.
47        // The test sets up a `Location` table, like in BitCraft, with a `chunk` field,
48        // and populates it with 1000 different chunks with 1200 rows each.
49        // Then it asserts that the cold latency of an index scan on `chunk` takes < 1 ms.
50        // However, for reasons currently unknown to us,
51        // a large page pool, with capacity `1 << 26` bytes, on i7-7700K, 64GB RAM,
52        // will turn the latency into 30-40 ms.
53        // As a precaution, we use a smaller page pool by default.
54        const DEFAULT_MAX_SIZE: usize = 128 * PAGE_SIZE; // 128 pages
55
56        let queue_size = max_size.unwrap_or(DEFAULT_MAX_SIZE) / PAGE_SIZE;
57        let pool = Pool::new(queue_size);
58        Self { pool }
59    }
60
61    /// Takes a [`Page`] from the pool or creates a new one.
62    ///
63    /// The returned page supports fixed rows of size `fixed_row_size`.
64    pub fn take_with_fixed_row_size(&self, fixed_row_size: Size) -> Box<Page> {
65        self.take_with_max_row_count(max_rows_in_page(fixed_row_size))
66    }
67
68    /// Takes a [`Page`] from the pool or creates a new one.
69    ///
70    /// The returned page supports a maximum of `max_rows_in_page` rows.
71    fn take_with_max_row_count(&self, max_rows_in_page: usize) -> Box<Page> {
72        self.pool.take(
73            |page| page.reset_for(max_rows_in_page),
74            || Page::new_with_max_row_count(max_rows_in_page),
75        )
76    }
77
78    /// Deserialize a page from `buf` but reuse the allocations in the pool.
79    pub fn take_deserialize_from(&self, buf: &[u8]) -> Result<Box<Page>, DecodeError> {
80        self.deserialize(bsatn::Deserializer::new(&mut &*buf))
81    }
82}
83
84impl<'de> DeserializeSeed<'de> for &PagePool {
85    type Output = Box<Page>;
86
87    fn deserialize<D: Deserializer<'de>>(self, de: D) -> Result<Self::Output, D::Error> {
88        de.deserialize_product(self)
89    }
90}
91
92impl<'de> ProductVisitor<'de> for &PagePool {
93    type Output = Box<Page>;
94
95    fn product_name(&self) -> Option<&str> {
96        Some("Page")
97    }
98
99    fn product_len(&self) -> usize {
100        2
101    }
102
103    fn visit_seq_product<A: SeqProductAccess<'de>>(self, mut prod: A) -> Result<Self::Output, A::Error> {
104        let header = prod
105            .next_element::<PageHeader>()?
106            .ok_or_else(|| A::Error::invalid_product_length(2, &self))?;
107        let row_data = prod
108            .next_element()?
109            .ok_or_else(|| A::Error::invalid_product_length(2, &self))?;
110
111        // TODO(perf, centril): reuse the allocation of `present_rows` in `page`.
112        let mut page = self.take_with_max_row_count(header.max_rows_in_page());
113        // SAFETY: `header` and `row_data` are consistent with each other.
114        unsafe { page.set_raw(header, row_data) };
115
116        Ok(page)
117    }
118
119    fn visit_named_product<A: NamedProductAccess<'de>>(self, _: A) -> Result<Self::Output, A::Error> {
120        unreachable!()
121    }
122}
123
#[cfg(test)]
mod tests {
    use super::*;
    use core::ptr::addr_eq;

    /// Returns the pointer that the page header exposes, for tests,
    /// to the storage of `page`'s present-rows bitset.
    fn present_rows_ptr(page: &Page) -> *const () {
        page.page_header_for_test().present_rows_storage_ptr_for_test()
    }

    /// Checks that a page taken from the pool keeps its bitset storage
    /// as long as the requested row count fits in the same number of 64-bit blocks,
    /// and gets different storage once more blocks are needed.
    #[test]
    fn page_pool_bitset_reuse() {
        let pool = PagePool::new_for_test();
        // Create a page and put it back.
        let page1 = pool.take_with_max_row_count(10);
        let page1_pr_ptr = present_rows_ptr(&page1);
        pool.put(page1);

        // Extract another page again, but use a different max row count (64).
        // The bitset should be the same, as both `10.div_ceil(64)` and `64.div_ceil(64)` are `1`.
        let page2 = pool.take_with_max_row_count(64);
        assert!(addr_eq(page1_pr_ptr, present_rows_ptr(&page2)));
        pool.put(page2);

        // Extract a page again, but this time, go beyond the first bitset block.
        let page3 = pool.take_with_max_row_count(64 + 1);
        // The bitset should not be the same, as `65.div_ceil(64) == 2`.
        assert!(!addr_eq(page1_pr_ptr, present_rows_ptr(&page3)));
    }
}