spacetimedb_table/
pages.rs

//! Provides [`Pages`], a page manager dealing with [`Page`]s as a collection.

use super::blob_store::BlobStore;
use super::indexes::{Bytes, PageIndex, PageOffset, RowPointer};
use super::page::Page;
use super::page_pool::PagePool;
use super::table::BlobNumBytes;
use super::var_len::VarLenMembers;
use core::ops::{ControlFlow, Deref, DerefMut, Index, IndexMut};
use spacetimedb_sats::layout::Size;
use spacetimedb_sats::memory_usage::MemoryUsage;
use thiserror::Error;

#[derive(Error, Debug, PartialEq, Eq)]
pub enum Error {
    #[error("Attempt to allocate more than {} pages.", PageIndex::MAX.idx())]
    TooManyPages,
    #[error(transparent)]
    Page(#[from] super::page::Error),
}

impl Index<PageIndex> for Pages {
    type Output = Page;

    fn index(&self, pi: PageIndex) -> &Self::Output {
        &self.pages[pi.idx()]
    }
}

impl IndexMut<PageIndex> for Pages {
    fn index_mut(&mut self, pi: PageIndex) -> &mut Self::Output {
        &mut self.pages[pi.idx()]
    }
}

/// A manager of [`Page`]s.
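///
/// # Example
///
/// A minimal usage sketch (`ignore`d since it needs a live [`PagePool`];
/// `pool` and `fixed_row_size` are assumed to come from the enclosing table):
///
/// ```ignore
/// let mut pages = Pages::default();
/// // Reserve a fresh, empty page and get exclusive access to it.
/// let idx = pages.reserve_empty_page(pool, fixed_row_size)?;
/// let page = pages.get_page_mut(idx);
/// ```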
#[derive(Default, Debug, PartialEq, Eq)]
pub struct Pages {
    /// The collection of pages under management.
    pages: Vec<Box<Page>>,
    /// The set of pages that aren't yet full.
    non_full_pages: Vec<PageIndex>,
}

impl MemoryUsage for Pages {
    fn heap_usage(&self) -> usize {
        let Self { pages, non_full_pages } = self;
        pages.heap_usage() + non_full_pages.heap_usage()
    }
}

impl Pages {
    /// Checks whether there is space to allocate another page,
    /// returning the [`PageIndex`] the new page would receive,
    /// or [`Error::TooManyPages`] if the page limit has been reached.
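    ///
    /// A sketch of the contract, assuming `pages` is below the page limit:
    ///
    /// ```ignore
    /// // Returns the index the next page would receive, or `Error::TooManyPages`.
    /// let next_idx = pages.can_allocate_new_page()?;
    /// assert_eq!(next_idx.idx(), pages.len());
    /// ```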
    pub fn can_allocate_new_page(&self) -> Result<PageIndex, Error> {
        let new_idx = self.len();
        if new_idx <= PageIndex::MAX.idx() {
            Ok(PageIndex(new_idx as _))
        } else {
            Err(Error::TooManyPages)
        }
    }

    /// Get a mutable reference to a `Page`.
    ///
    /// Used in benchmarks. Internal operations prefer to index directly into `self.pages`,
    /// as that allows split borrows.
    pub fn get_page_mut(&mut self, page: PageIndex) -> &mut Page {
        &mut self.pages[page.idx()]
    }

    /// Clears every page within `self`,
    /// deleting all rows.
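    ///
    /// A sketch of the observable effect, assuming `pages` already holds some pages:
    ///
    /// ```ignore
    /// let n = pages.len();
    /// pages.clear();
    /// // The pages themselves are retained, just emptied and marked non-full.
    /// assert_eq!(pages.len(), n);
    /// ```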
    #[doc(hidden)] // Used in benchmarks.
    pub fn clear(&mut self) {
        // Clear every page.
        for page in &mut self.pages {
            page.clear();
        }
        // Mark every page non-full.
        self.non_full_pages = (0..self.pages.len()).map(|idx| PageIndex(idx as u64)).collect();
    }

    /// Get a reference to fixed-len row data.
    ///
    /// Used in benchmarks.
    /// Higher-level code paths are expected to go through [`super::de::read_row_from_pages`].
    #[doc(hidden)] // Used in benchmarks.
    pub fn get_fixed_len_row(&self, row: RowPointer, fixed_row_size: Size) -> &Bytes {
        self[row.page_index()].get_row_data(row.page_offset(), fixed_row_size)
    }

    /// Allocates one additional page,
    /// returning an error if the new number of pages would overflow `PageIndex::MAX`.
    ///
    /// The new page is initially empty, but is not added to the non-full set.
    /// Callers should call [`Pages::maybe_mark_page_non_full`] after operating on the new page.
    fn allocate_new_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result<PageIndex, Error> {
        let new_idx = self.can_allocate_new_page()?;

        let page = pool.take_with_fixed_row_size(fixed_row_size);
        self.pages.push(page);

        Ok(new_idx)
    }

    /// Reserve a new, initially empty page.
    pub fn reserve_empty_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result<PageIndex, Error> {
        let idx = self.allocate_new_page(pool, fixed_row_size)?;
        self.mark_page_non_full(idx);
        Ok(idx)
    }

    /// Mark the page at `idx` as non-full.
    pub fn mark_page_non_full(&mut self, idx: PageIndex) {
        self.non_full_pages.push(idx);
    }

    /// If the page at `page_index` is not full,
    /// add it to the non-full set so that later insertions can access it.
    pub fn maybe_mark_page_non_full(&mut self, page_index: PageIndex, fixed_row_size: Size) {
        if !self[page_index].is_full(fixed_row_size) {
            self.non_full_pages.push(page_index);
        }
    }

    /// Call `f` with a mutable reference to a page which satisfies
    /// `page.has_space_for_row(fixed_row_size, num_var_len_granules)`.
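    ///
    /// A sketch of the calling pattern (the closure body is elided;
    /// `pool` and `fixed_row_size` are assumed to come from the enclosing table):
    ///
    /// ```ignore
    /// let (page_index, res) = pages.with_page_to_insert_row(pool, fixed_row_size, 0, |page| {
    ///     // `page` is guaranteed to fit a fixed-size row with no var-len granules.
    ///     // ... perform the insertion here and return its result ...
    /// })?;
    /// ```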
    pub fn with_page_to_insert_row<Res>(
        &mut self,
        pool: &PagePool,
        fixed_row_size: Size,
        num_var_len_granules: usize,
        f: impl FnOnce(&mut Page) -> Res,
    ) -> Result<(PageIndex, Res), Error> {
        let page_index = self.find_page_with_space_for_row(pool, fixed_row_size, num_var_len_granules)?;
        let res = f(&mut self[page_index]);
        self.maybe_mark_page_non_full(page_index, fixed_row_size);
        Ok((page_index, res))
    }

    /// Find a page with sufficient available space to store a row of size `fixed_row_size`
    /// containing `num_var_len_granules` granules of var-len data.
    ///
    /// Retrieving a page in this way will remove it from the non-full set.
    /// After performing an insertion, the caller should use [`Pages::maybe_mark_page_non_full`]
    /// to restore the page to the non-full set.
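    ///
    /// A sketch of the protocol callers are expected to follow;
    /// this is exactly what [`Pages::with_page_to_insert_row`] does:
    ///
    /// ```ignore
    /// let idx = self.find_page_with_space_for_row(pool, fixed_row_size, num_var_len_granules)?;
    /// // ... insert the row into `self[idx]` ...
    /// self.maybe_mark_page_non_full(idx, fixed_row_size);
    /// ```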
    fn find_page_with_space_for_row(
        &mut self,
        pool: &PagePool,
        fixed_row_size: Size,
        num_var_len_granules: usize,
    ) -> Result<PageIndex, Error> {
        if let Some((page_idx_idx, page_idx)) = self
            .non_full_pages
            .iter()
            .copied()
            .enumerate()
            .find(|(_, page_idx)| self[*page_idx].has_space_for_row(fixed_row_size, num_var_len_granules))
        {
            self.non_full_pages.swap_remove(page_idx_idx);
            return Ok(page_idx);
        }

        self.allocate_new_page(pool, fixed_row_size)
    }

    /// Superseded by `write_av_to_pages`, but exposed for benchmarking
    /// when we want to avoid the overhead of traversing `AlgebraicType`.
    ///
    /// Inserts a row with fixed parts in `fixed_len` and variable parts in `var_len`.
    /// `fixed_len.len()` must equal `fixed_row_size`.
    ///
    /// # Safety
    ///
    /// - `var_len_visitor` must be suitable for visiting var-len refs in `fixed_len`.
    /// - `fixed_len.len()` must match the row type size exactly.
    /// - `fixed_len.len()` must be consistent
    ///   with what has been passed to the manager in all other ops
    ///   and must be consistent with the `var_len_visitor` the manager was made with.
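    ///
    /// # Example
    ///
    /// A sketch of the bench-style call; `visitor`, `fixed_bytes`, and `blob_store`
    /// are assumed to be consistent with the table's row type:
    ///
    /// ```ignore
    /// // SAFETY: `visitor` and `fixed_row_size` match the row type
    /// // used for all other operations on `pages`.
    /// let (page, offset) =
    ///     unsafe { pages.insert_row(pool, &visitor, fixed_row_size, &fixed_bytes, &[], blob_store) }?;
    /// ```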
    // TODO(bikeshedding): rename to make purpose as bench interface clear?
    pub unsafe fn insert_row(
        &mut self,
        pool: &PagePool,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        fixed_len: &Bytes,
        var_len: &[&[u8]],
        blob_store: &mut dyn BlobStore,
    ) -> Result<(PageIndex, PageOffset), Error> {
        debug_assert!(fixed_len.len() == fixed_row_size.len());

        match self.with_page_to_insert_row(
            pool,
            fixed_row_size,
            Page::total_granules_required_for_objects(var_len),
            |page| {
                // This insertion can never fail, as we know that the page has
                // sufficient space thanks to `find_page_with_space_for_row`.
                //
                // SAFETY:
                // - Caller promised that `var_len_visitor`
                //   is suitable for visiting var-len refs in `fixed_len`
                //   and that `fixed_len.len()` matches the row type size exactly.
                //
                // - Caller promised that `fixed_len.len()` is consistent
                //   with what has been passed to the manager in all other ops.
                //   This entails that `fixed_len.len()` is consistent with `page`.
                unsafe { page.insert_row(fixed_len, var_len, var_len_visitor, blob_store) }
            },
        )? {
            (page, Ok(offset)) => Ok((page, offset)),
            (_, Err(e)) => Err(e.into()),
        }
    }

    /// Free the row that is pointed to by `row_ptr`,
    /// marking its fixed-len storage
    /// and var-len storage granules as available for re-use.
    ///
    /// # Safety
    ///
    /// The `row_ptr` must point to a valid row in this page manager,
    /// of `fixed_row_size` bytes for the fixed part.
    ///
    /// The `fixed_row_size` must be consistent
    /// with what has been passed to the manager in all other operations
    /// and must be consistent with the `var_len_visitor` the manager was made with.
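    ///
    /// # Example
    ///
    /// A sketch, assuming `ptr` was returned by a prior insertion into `pages`
    /// using the same `visitor` and `fixed_row_size`:
    ///
    /// ```ignore
    /// // SAFETY: `ptr` points to a valid row in `pages`,
    /// // and `visitor`/`fixed_row_size` are the ones used in all other ops.
    /// let freed: BlobNumBytes = unsafe { pages.delete_row(&visitor, fixed_row_size, ptr, blob_store) };
    /// ```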
    pub unsafe fn delete_row(
        &mut self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        row_ptr: RowPointer,
        blob_store: &mut dyn BlobStore,
    ) -> BlobNumBytes {
        let page = &mut self[row_ptr.page_index()];
        let full_before = page.is_full(fixed_row_size);
        // SAFETY:
        // - `row_ptr.page_offset()` does point to a valid row in this page,
        //   as the caller promised that `row_ptr` points to a valid row in `self`.
        //
        // - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row.
        //   The size is also consistent with `var_len_visitor`.
        let blob_store_deleted_bytes =
            unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) };

        // If the page was previously full, mark it as non-full now,
        // since we just opened a space in it.
        if full_before {
            self.mark_page_non_full(row_ptr.page_index());
        }
        blob_store_deleted_bytes
    }

    /// Materialize a view of the rows in `self` for which `filter` returns `true`.
    ///
    /// # Safety
    ///
    /// - The `var_len_visitor` will visit the same set of `VarLenRef`s in the row
    ///   as the visitor provided to all other methods on `self`.
    ///
    /// - The `fixed_row_size` is consistent with the `var_len_visitor`
    ///   and is equal to the value provided to all other methods on `self`.
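    ///
    /// # Example
    ///
    /// A sketch that materializes every row; `visitor`, `fixed_row_size`,
    /// and `blob_store` are assumed to match the table's row type:
    ///
    /// ```ignore
    /// // SAFETY: `visitor` and `fixed_row_size` are the ones used in all other ops on `pages`.
    /// let all_rows: Pages = unsafe {
    ///     pages.copy_filter(&visitor, fixed_row_size, blob_store, |_page, _offset| true)
    /// };
    /// ```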
    pub unsafe fn copy_filter(
        &self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        blob_store: &mut dyn BlobStore,
        mut filter: impl FnMut(&Page, PageOffset) -> bool,
    ) -> Self {
        // Build a new container to hold the materialized view.
        // Push pages into it later.
        let mut partial_copied_pages = Self::default();

        // A destination page that was not filled entirely,
        // or `None` if it's time to allocate a new destination page.
        let mut partial_page = None;

        // Copy each page.
        for from_page in &self.pages {
            // We may need multiple calls to `Page::copy_filter_into`
            // if `partial_page` fills up;
            // the first call starts from offset 0.
            let mut copy_starting_from = Some(PageOffset(0));

            // While there are unprocessed rows in `from_page`,
            while let Some(next_offset) = copy_starting_from.take() {
                // Grab the `partial_page` or allocate a new one.
                let mut to_page = partial_page.take().unwrap_or_else(|| Page::new(fixed_row_size));

                // Copy as many rows as will fit in `to_page`.
                //
                // SAFETY:
                //
                // - The `var_len_visitor` will visit the same set of `VarLenRef`s in the row
                //   as the visitor provided to all other methods on `self`.
                //   The `to_page` uses the same visitor as the `from_page`.
                //
                // - The `fixed_row_size` is consistent with the `var_len_visitor`
                //   and is equal to the value provided to all other methods on `self`,
                //   as promised by the caller.
                //   The newly made `to_page` uses the same `fixed_row_size` as the `from_page`.
                //
                // - The `next_offset` is either 0,
                //   which is always a valid starting offset for any row size,
                //   or it came from `copy_filter_into` in a previous iteration,
                //   which, given that `fixed_row_size` was valid,
                //   always returns a valid starting offset in case of `Continue(_)`.
                let cfi_ret = unsafe {
                    from_page.copy_filter_into(
                        next_offset,
                        &mut to_page,
                        fixed_row_size,
                        var_len_visitor,
                        blob_store,
                        &mut filter,
                    )
                };
                copy_starting_from = if let ControlFlow::Continue(continue_point) = cfi_ret {
                    // If `to_page` couldn't fit all of `from_page`,
                    // repeat the `while let` loop to copy the rest.
                    Some(continue_point)
                } else {
                    // If `to_page` fit all of `from_page`, we can move on.
                    None
                };

                // If `from_page` finished copying into `to_page`, then `to_page` may have extra room,
                // so keep it around as the destination for the next source page.
                //
                // If `copy_filter_into` returned `Continue(_)`,
                // at least one row didn't have space in `to_page`,
                // so we must consider `to_page` full and push it into the result.
                //
                // Note that this is distinct from `Page::is_full`,
                // as that method considers the optimistic case of a row with no var-len members.
                if copy_starting_from.is_none() {
                    partial_page = Some(to_page);
                } else {
                    partial_copied_pages.pages.push(to_page);
                }
            }
        }

        // Don't lose the trailing partially-filled destination page, if any.
        if let Some(last_page) = partial_page {
            partial_copied_pages.pages.push(last_page);
        }

        partial_copied_pages
    }

    /// Set the contents of this [`Pages`] to be `pages`.
    ///
    /// Used when restoring from a snapshot.
    ///
    /// Each page in `pages` must be consistent with the schema for this [`Pages`],
    /// i.e. the schema for the [`crate::table::Table`] which contains `self`.
    ///
    /// Should only ever be called when `self.is_empty()`.
    ///
    /// Also populates `self.non_full_pages`.
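    ///
    /// A sketch of restoring from a snapshot; `snapshot_pages` is assumed to have been
    /// deserialized from a snapshot of a table with fixed row size `fixed_row_size`:
    ///
    /// ```ignore
    /// let mut pages = Pages::default();
    /// pages.set_contents(snapshot_pages, fixed_row_size);
    /// // All non-full restored pages are now available for insertion.
    /// ```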
    pub fn set_contents(&mut self, pages: Vec<Box<Page>>, fixed_row_size: Size) {
        debug_assert!(self.is_empty());
        self.non_full_pages = pages
            .iter()
            .enumerate()
            .filter_map(|(idx, page)| (!page.is_full(fixed_row_size)).then_some(PageIndex(idx as _)))
            .collect();
        self.pages = pages;
    }

    /// Consumes the page manager, returning all the pages it held.
    pub fn into_page_iter(self) -> impl Iterator<Item = Box<Page>> {
        self.pages.into_iter()
    }
}

impl Deref for Pages {
    type Target = [Box<Page>];

    fn deref(&self) -> &Self::Target {
        &self.pages
    }
}

impl DerefMut for Pages {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.pages
    }
}