spacetimedb_table/
page_pool.rs

1use super::{indexes::Size, page::Page};
2use crate::indexes::max_rows_in_page;
3use crate::{page::PageHeader, MemoryUsage};
4use core::sync::atomic::{AtomicUsize, Ordering};
5use crossbeam_queue::ArrayQueue;
6use spacetimedb_sats::bsatn::{self, DecodeError};
7use spacetimedb_sats::de::{
8    DeserializeSeed, Deserializer, Error, NamedProductAccess, ProductVisitor, SeqProductAccess,
9};
10use std::sync::Arc;
11
/// A page pool of currently unused pages available for use in [`Pages`](super::pages::Pages).
///
/// Cloning is cheap: all clones share the same underlying pool state.
#[derive(Clone)]
pub struct PagePool {
    // Shared handle to the actual pool; `Clone` only bumps the `Arc` refcount.
    inner: Arc<PagePoolInner>,
}
17
18impl MemoryUsage for PagePool {
19    fn heap_usage(&self) -> usize {
20        let Self { inner } = self;
21        inner.heap_usage()
22    }
23}
24
25impl PagePool {
26    pub fn new_for_test() -> Self {
27        Self::new(Some(100 * size_of::<Page>()))
28    }
29
30    /// Returns a new page pool with `max_size` bytes rounded down to the nearest multiple of 64 KiB.
31    ///
32    /// if no size is provided, a default of 1 page is used.
33    pub fn new(max_size: Option<usize>) -> Self {
34        const PAGE_SIZE: usize = size_of::<Page>();
35        // TODO(centril): Currently, we have a test `test_index_scans`.
36        // The test sets up a `Location` table, like in BitCraft, with a `chunk` field,
37        // and populates it with 1000 different chunks with 1200 rows each.
38        // Then it asserts that the cold latency of an index scan on `chunk` takes < 1 ms.
39        // However, for reasons currently unknown to us,
40        // a large page pool, with capacity `1 << 26` bytes, on i7-7700K, 64GB RAM,
41        // will turn the latency into 30-40 ms.
42        // As a precaution, we use a smaller page pool by default.
43        const DEFAULT_MAX_SIZE: usize = 128 * PAGE_SIZE; // 128 pages
44
45        let queue_size = max_size.unwrap_or(DEFAULT_MAX_SIZE) / PAGE_SIZE;
46        let inner = Arc::new(PagePoolInner::new(queue_size));
47        Self { inner }
48    }
49
50    /// Puts back a [`Page`] into the pool.
51    pub fn put(&self, page: Box<Page>) {
52        self.inner.put(page);
53    }
54
55    /// Puts back a [`Page`] into the pool.
56    pub fn put_many(&self, pages: impl Iterator<Item = Box<Page>>) {
57        for page in pages {
58            self.put(page);
59        }
60    }
61
62    /// Takes a [`Page`] from the pool or creates a new one.
63    ///
64    /// The returned page supports fixed rows of size `fixed_row_size`.
65    pub fn take_with_fixed_row_size(&self, fixed_row_size: Size) -> Box<Page> {
66        self.inner.take_with_fixed_row_size(fixed_row_size)
67    }
68
69    /// Takes a [`Page`] from the pool or creates a new one.
70    ///
71    /// The returned page supports a maximum of `max_rows_in_page` rows.
72    fn take_with_max_row_count(&self, max_rows_in_page: usize) -> Box<Page> {
73        self.inner.take_with_max_row_count(max_rows_in_page)
74    }
75
76    /// Deserialize a page from `buf` but reuse the allocations in the pool.
77    pub fn take_deserialize_from(&self, buf: &[u8]) -> Result<Box<Page>, DecodeError> {
78        self.deserialize(bsatn::Deserializer::new(&mut &*buf))
79    }
80
81    /// Returns the number of pages dropped by the pool because the pool was at capacity.
82    pub fn dropped_pages_count(&self) -> usize {
83        self.inner.dropped_pages_count.load(Ordering::Relaxed)
84    }
85
86    /// Returns the number of fresh pages allocated through the pool.
87    pub fn new_pages_allocated_count(&self) -> usize {
88        self.inner.new_pages_allocated_count.load(Ordering::Relaxed)
89    }
90
91    /// Returns the number of pages reused from the pool.
92    pub fn pages_reused_count(&self) -> usize {
93        self.inner.pages_reused_count.load(Ordering::Relaxed)
94    }
95
96    /// Returns the number of pages returned to the pool.
97    pub fn pages_returned_count(&self) -> usize {
98        self.inner.pages_returned_count.load(Ordering::Relaxed)
99    }
100}
101
impl<'de> DeserializeSeed<'de> for &PagePool {
    type Output = Box<Page>;

    /// Deserializes a [`Page`] as a 2-field product,
    /// delegating to the `ProductVisitor` impl so pooled allocations can be reused.
    fn deserialize<D: Deserializer<'de>>(self, de: D) -> Result<Self::Output, D::Error> {
        de.deserialize_product(self)
    }
}
109
impl<'de> ProductVisitor<'de> for &PagePool {
    type Output = Box<Page>;

    fn product_name(&self) -> Option<&str> {
        Some("Page")
    }

    // A serialized page is a 2-element product: `(header, row_data)`.
    fn product_len(&self) -> usize {
        2
    }

    fn visit_seq_product<A: SeqProductAccess<'de>>(self, mut prod: A) -> Result<Self::Output, A::Error> {
        // Field 0: the page header, which records e.g. the page's max row count.
        let header = prod
            .next_element::<PageHeader>()?
            .ok_or_else(|| A::Error::invalid_product_length(2, &self))?;
        // Field 1: the raw row data.
        let row_data = prod
            .next_element()?
            .ok_or_else(|| A::Error::invalid_product_length(2, &self))?;

        // TODO(perf, centril): reuse the allocation of `present_rows` in `page`.
        // Take a pooled (or fresh) page sized for the header's row count.
        let mut page = self.take_with_max_row_count(header.max_rows_in_page());
        // SAFETY: `header` and `row_data` are consistent with each other,
        // as both were deserialized from the same serialized page.
        unsafe { page.set_raw(header, row_data) };

        Ok(page)
    }

    // Pages are deserialized via the seq-product path only
    // (see `deserialize` above, which calls `deserialize_product`),
    // so the named-product form should never be reached.
    fn visit_named_product<A: NamedProductAccess<'de>>(self, _: A) -> Result<Self::Output, A::Error> {
        unreachable!()
    }
}
141
/// The inner actual page pool containing all the logic.
struct PagePoolInner {
    /// The bounded queue of unused pages; when full, returned pages are dropped.
    pages: ArrayQueue<Box<Page>>,
    /// Number of pages dropped because the pool was at capacity when they were returned.
    dropped_pages_count: AtomicUsize,
    /// Number of fresh pages allocated because the pool was empty on take.
    new_pages_allocated_count: AtomicUsize,
    /// Number of pages served from the pool instead of freshly allocated.
    pages_reused_count: AtomicUsize,
    /// Number of pages successfully put back into the pool.
    pages_returned_count: AtomicUsize,
}
150
151impl MemoryUsage for PagePoolInner {
152    fn heap_usage(&self) -> usize {
153        let Self {
154            pages,
155            dropped_pages_count,
156            new_pages_allocated_count,
157            pages_reused_count,
158            pages_returned_count,
159        } = self;
160        dropped_pages_count.heap_usage() +
161        new_pages_allocated_count.heap_usage() +
162        pages_reused_count.heap_usage() +
163        pages_returned_count.heap_usage() +
164        // This is the amount the queue itself takes up on the heap.
165        pages.capacity() * size_of::<(AtomicUsize, Box<Page>)>() +
166        // Each page takes up a fixed amount.
167        pages.len() * size_of::<Page>()
168    }
169}
170
/// Bumps `counter` by one; relaxed ordering suffices for these statistics counters.
#[inline]
fn inc(counter: &AtomicUsize) {
    counter.fetch_add(1, Ordering::Relaxed);
}
175
176impl PagePoolInner {
177    /// Creates a new page pool capable of holding `cap` pages.
178    fn new(cap: usize) -> Self {
179        let pages = ArrayQueue::new(cap);
180        Self {
181            pages,
182            dropped_pages_count: <_>::default(),
183            new_pages_allocated_count: <_>::default(),
184            pages_reused_count: <_>::default(),
185            pages_returned_count: <_>::default(),
186        }
187    }
188
189    /// Puts back a [`Page`] into the pool.
190    fn put(&self, page: Box<Page>) {
191        // Add it to the pool if there's room, or just drop it.
192        if self.pages.push(page).is_ok() {
193            inc(&self.pages_returned_count);
194        } else {
195            inc(&self.dropped_pages_count);
196        }
197    }
198
199    /// Takes a [`Page`] from the pool or creates a new one.
200    ///
201    /// The returned page supports a maximum of `max_rows_in_page` rows.
202    fn take_with_max_row_count(&self, max_rows_in_page: usize) -> Box<Page> {
203        self.pages
204            .pop()
205            .map(|mut page| {
206                inc(&self.pages_reused_count);
207                page.reset_for(max_rows_in_page);
208                page
209            })
210            .unwrap_or_else(|| {
211                inc(&self.new_pages_allocated_count);
212                Page::new_with_max_row_count(max_rows_in_page)
213            })
214    }
215
216    /// Takes a [`Page`] from the pool or creates a new one.
217    ///
218    /// The returned page supports fixed rows of size `fixed_row_size`.
219    fn take_with_fixed_row_size(&self, fixed_row_size: Size) -> Box<Page> {
220        self.take_with_max_row_count(max_rows_in_page(fixed_row_size))
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use core::{iter, ptr::addr_eq};

    // Returns the address of `page`'s `present_rows` bitset storage,
    // used below to check whether the bitset allocation was reused.
    fn present_rows_ptr(page: &Page) -> *const () {
        page.page_header_for_test().present_rows_storage_ptr_for_test()
    }

    // Asserts the pool's four statistics counters, in declaration order.
    fn assert_metrics(pool: &PagePool, dropped: usize, new: usize, reused: usize, returned: usize) {
        assert_eq!(pool.dropped_pages_count(), dropped);
        assert_eq!(pool.new_pages_allocated_count(), new);
        assert_eq!(pool.pages_reused_count(), reused);
        assert_eq!(pool.pages_returned_count(), returned);
    }

    #[test]
    fn page_pool_returns_same_page() {
        let pool = PagePool::new_for_test();
        assert_metrics(&pool, 0, 0, 0, 0);

        // Create a page and put it back.
        let page1 = pool.take_with_max_row_count(10);
        assert_metrics(&pool, 0, 1, 0, 0);
        let page1_ptr = &*page1 as *const _;
        let page1_pr_ptr = present_rows_ptr(&page1);
        pool.put(page1);
        assert_metrics(&pool, 0, 1, 0, 1);

        // Extract a page again.
        let page2 = pool.take_with_max_row_count(64);
        assert_metrics(&pool, 0, 1, 1, 1);
        let page2_ptr = &*page2 as *const _;
        let page2_pr_ptr = present_rows_ptr(&page2);
        // It should be the same as the previous one.
        assert!(addr_eq(page1_ptr, page2_ptr));
        // And the bitset should also be the same, as `10.div_ceil(64) == 64.div_ceil(64) == 1`.
        assert!(addr_eq(page1_pr_ptr, page2_pr_ptr));
        pool.put(page2);
        assert_metrics(&pool, 0, 1, 1, 2);

        // Extract a page again, but this time, go beyond the first block.
        let page3 = pool.take_with_max_row_count(64 + 1);
        assert_metrics(&pool, 0, 1, 2, 2);
        let page3_ptr = &*page3 as *const _;
        let page3_pr_ptr = present_rows_ptr(&page3);
        // It should be the same as the previous one.
        assert!(addr_eq(page1_ptr, page3_ptr));
        // But the bitset should not be the same, as `65.div_ceil(64) == 2`.
        assert!(!addr_eq(page1_pr_ptr, page3_pr_ptr));

        // Manually create a page and put it in.
        let page4 = Page::new_with_max_row_count(10);
        let page4_ptr = &*page4 as *const _;
        pool.put(page4);
        pool.put(page3);
        assert_metrics(&pool, 0, 1, 2, 4);
        // When we take out a page, it should be the same as `page4` and not `page1`,
        // since the queue is FIFO and `page4` was pushed first.
        let page5 = pool.take_with_max_row_count(10);
        assert_metrics(&pool, 0, 1, 3, 4);
        let page5_ptr = &*page5 as *const _;
        // Same as page4.
        assert!(!addr_eq(page5_ptr, page1_ptr));
        assert!(addr_eq(page5_ptr, page4_ptr));
    }

    #[test]
    fn page_pool_drops_past_max_size() {
        // A pool sized for exactly `N` pages...
        const N: usize = 3;
        let pool = PagePool::new(Some(size_of::<Page>() * N));

        // ...receives `N + 1` pages back, so exactly one must be dropped.
        let pages = iter::repeat_with(|| pool.take_with_max_row_count(42))
            .take(N + 1)
            .collect::<Vec<_>>();
        assert_metrics(&pool, 0, N + 1, 0, 0);

        pool.put_many(pages.into_iter());
        assert_metrics(&pool, 1, N + 1, 0, N);
        assert_eq!(pool.inner.pages.len(), N);
    }
}