spacetimedb_table/pages.rs

//! Provides [`Pages`], a page manager dealing with [`Page`]s as a collection.

use super::blob_store::BlobStore;
use super::indexes::{Bytes, PageIndex, PageOffset, RowPointer, Size};
use super::page::Page;
use super::page_pool::PagePool;
use super::table::BlobNumBytes;
use super::var_len::VarLenMembers;
use crate::MemoryUsage;
use core::ops::{ControlFlow, Deref, DerefMut, Index, IndexMut};
use thiserror::Error;

#[derive(Error, Debug, PartialEq, Eq)]
pub enum Error {
    #[error("Attempt to allocate more than {} pages.", PageIndex::MAX.idx())]
    TooManyPages,
    #[error(transparent)]
    Page(#[from] super::page::Error),
}

impl Index<PageIndex> for Pages {
    type Output = Page;

    fn index(&self, pi: PageIndex) -> &Self::Output {
        &self.pages[pi.idx()]
    }
}

impl IndexMut<PageIndex> for Pages {
    fn index_mut(&mut self, pi: PageIndex) -> &mut Self::Output {
        &mut self.pages[pi.idx()]
    }
}

/// A manager of [`Page`]s.
#[derive(Default, Debug, PartialEq, Eq)]
pub struct Pages {
    /// The collection of pages under management.
    pages: Vec<Box<Page>>,
    /// The set of pages that aren't yet full.
    ///
    /// A page in this set may still lack space for a particular row,
    /// depending on how many var-len granules the row requires;
    /// candidates are re-checked with `Page::has_space_for_row` on insertion.
    non_full_pages: Vec<PageIndex>,
}

impl MemoryUsage for Pages {
    fn heap_usage(&self) -> usize {
        let Self { pages, non_full_pages } = self;
        pages.heap_usage() + non_full_pages.heap_usage()
    }
}

impl Pages {
    /// Is there space to allocate another page?
    /// If so, returns the index that the new page would receive.
    pub fn can_allocate_new_page(&self) -> Result<PageIndex, Error> {
        let new_idx = self.len();
        if new_idx <= PageIndex::MAX.idx() {
            Ok(PageIndex(new_idx as _))
        } else {
            Err(Error::TooManyPages)
        }
    }

    /// Get a mutable reference to a `Page`.
    ///
    /// Used in benchmarks. Internal operations prefer to index directly into `self.pages`,
    /// as that allows split borrows.
    pub fn get_page_mut(&mut self, page: PageIndex) -> &mut Page {
        &mut self.pages[page.idx()]
    }

    /// Clear all pages within `self`,
    /// deleting all rows.
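    ///
    /// # Example
    ///
    /// A minimal sketch; `pool` and `fixed_row_size` are assumed to come
    /// from the enclosing table, so the example is not compiled:
    ///
    /// ```ignore
    /// let mut pages = Pages::default();
    /// let _idx = pages.reserve_empty_page(&pool, fixed_row_size)?;
    /// pages.clear();
    /// // Every page is still allocated, but holds no rows,
    /// // and all pages are back in the non-full set.
    /// ```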
    #[doc(hidden)] // Used in benchmarks.
    pub fn clear(&mut self) {
        // Clear every page.
        for page in &mut self.pages {
            page.clear();
        }
        // Mark every page non-full.
        self.non_full_pages = (0..self.pages.len()).map(|idx| PageIndex(idx as u64)).collect();
    }

    /// Get a reference to fixed-len row data.
    ///
    /// Used in benchmarks.
    /// Higher-level code paths are expected to go through [`super::de::read_row_from_pages`].
    #[doc(hidden)] // Used in benchmarks.
    pub fn get_fixed_len_row(&self, row: RowPointer, fixed_row_size: Size) -> &Bytes {
        self[row.page_index()].get_row_data(row.page_offset(), fixed_row_size)
    }

    /// Allocates one additional page,
    /// returning an error if the new number of pages would exceed `PageIndex::MAX`.
    ///
    /// The new page is initially empty, but is not added to the non-full set.
    /// Callers should call [`Pages::maybe_mark_page_non_full`] after operating on the new page.
    fn allocate_new_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result<PageIndex, Error> {
        let new_idx = self.can_allocate_new_page()?;

        let page = pool.take_with_fixed_row_size(fixed_row_size);
        self.pages.push(page);

        Ok(new_idx)
    }

    /// Reserve a new, initially empty page.
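    ///
    /// # Example
    ///
    /// A sketch of the intended flow, assuming a `pool` and `fixed_row_size`
    /// obtained from the enclosing table (hence not compiled):
    ///
    /// ```ignore
    /// let mut pages = Pages::default();
    /// let idx = pages.reserve_empty_page(&pool, fixed_row_size)?;
    /// // The new page is empty and already in the non-full set,
    /// // so later insertions may find it.
    /// let page: &mut Page = pages.get_page_mut(idx);
    /// ```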
    pub fn reserve_empty_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result<PageIndex, Error> {
        let idx = self.allocate_new_page(pool, fixed_row_size)?;
        self.mark_page_non_full(idx);
        Ok(idx)
    }

    /// Mark the page at `idx` as non-full.
    pub fn mark_page_non_full(&mut self, idx: PageIndex) {
        self.non_full_pages.push(idx);
    }

    /// If the page at `page_index` is not full,
    /// add it to the non-full set so that later insertions can find it.
    pub fn maybe_mark_page_non_full(&mut self, page_index: PageIndex, fixed_row_size: Size) {
        if !self[page_index].is_full(fixed_row_size) {
            self.non_full_pages.push(page_index);
        }
    }

    /// Call `f` with a mutable reference to a page which satisfies
    /// `page.has_space_for_row(fixed_row_size, num_var_len_granules)`,
    /// allocating a new page if no existing one has space.
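    ///
    /// # Example
    ///
    /// A sketch; `pool`, `fixed_row_size`, and `num_granules` are assumed
    /// to come from the surrounding table machinery:
    ///
    /// ```ignore
    /// let (page_index, res) = pages.with_page_to_insert_row(
    ///     &pool,
    ///     fixed_row_size,
    ///     num_granules,
    ///     |page| {
    ///         // `page` is guaranteed to satisfy
    ///         // `page.has_space_for_row(fixed_row_size, num_granules)`,
    ///         // so an insertion here cannot fail for lack of space.
    ///     },
    /// )?;
    /// ```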
    pub fn with_page_to_insert_row<Res>(
        &mut self,
        pool: &PagePool,
        fixed_row_size: Size,
        num_var_len_granules: usize,
        f: impl FnOnce(&mut Page) -> Res,
    ) -> Result<(PageIndex, Res), Error> {
        let page_index = self.find_page_with_space_for_row(pool, fixed_row_size, num_var_len_granules)?;
        let res = f(&mut self[page_index]);
        self.maybe_mark_page_non_full(page_index, fixed_row_size);
        Ok((page_index, res))
    }

    /// Find a page with sufficient available space to store a row of size `fixed_row_size`
    /// containing `num_var_len_granules` granules of var-len data.
    ///
    /// Retrieving a page in this way will remove it from the non-full set.
    /// After performing an insertion, the caller should use [`Pages::maybe_mark_page_non_full`]
    /// to restore the page to the non-full set.
    fn find_page_with_space_for_row(
        &mut self,
        pool: &PagePool,
        fixed_row_size: Size,
        num_var_len_granules: usize,
    ) -> Result<PageIndex, Error> {
        if let Some((page_idx_idx, page_idx)) = self
            .non_full_pages
            .iter()
            .copied()
            .enumerate()
            .find(|(_, page_idx)| self[*page_idx].has_space_for_row(fixed_row_size, num_var_len_granules))
        {
            self.non_full_pages.swap_remove(page_idx_idx);
            return Ok(page_idx);
        }

        self.allocate_new_page(pool, fixed_row_size)
    }

    /// Superseded by `write_av_to_pages`, but exposed for benchmarking
    /// when we want to avoid the overhead of traversing `AlgebraicType`.
    ///
    /// Inserts a row with fixed parts in `fixed_len` and variable parts in `var_len`.
    /// `fixed_len.len()` must equal `fixed_row_size.len()`.
    ///
    /// # Safety
    ///
    /// - `var_len_visitor` must be suitable for visiting var-len refs in `fixed_len`.
    /// - `fixed_len.len()` matches the row type size exactly.
    /// - `fixed_len.len()` is consistent
    ///   with what has been passed to the manager in all other ops
    ///   and must be consistent with the `var_len_visitor` the manager was made with.
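    ///
    /// # Example
    ///
    /// A sketch of a call upholding the contract above; `visitor`, `fixed`,
    /// and `var_slices` are assumed to agree with the table's row type:
    ///
    /// ```ignore
    /// // SAFETY: `visitor` matches the row type whose fixed part is
    /// // `fixed_row_size` bytes, and `fixed` has exactly that length.
    /// let (page_index, page_offset) = unsafe {
    ///     pages.insert_row(&pool, &visitor, fixed_row_size, fixed, var_slices, blob_store)
    /// }?;
    /// ```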
    // TODO(bikeshedding): rename to make purpose as bench interface clear?
    pub unsafe fn insert_row(
        &mut self,
        pool: &PagePool,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        fixed_len: &Bytes,
        var_len: &[&[u8]],
        blob_store: &mut dyn BlobStore,
    ) -> Result<(PageIndex, PageOffset), Error> {
        debug_assert!(fixed_len.len() == fixed_row_size.len());

        match self.with_page_to_insert_row(
            pool,
            fixed_row_size,
            Page::total_granules_required_for_objects(var_len),
            |page| {
                // This insertion can never fail, as we know that the page has sufficient space
                // thanks to `find_page_with_space_for_row`.
                //
                // SAFETY:
                // - Caller promised that `var_len_visitor`
                //   is suitable for visiting var-len refs in `fixed_len`
                //   and that `fixed_len.len()` matches the row type size exactly.
                //
                // - Caller promised that `fixed_len.len()` is consistent
                //   with what has been passed to the manager in all other ops.
                //   This entails that `fixed_len.len()` is consistent with `page`.
                unsafe { page.insert_row(fixed_len, var_len, var_len_visitor, blob_store) }
            },
        )? {
            (page, Ok(offset)) => Ok((page, offset)),
            (_, Err(e)) => Err(e.into()),
        }
    }

    /// Free the row that is pointed to by `row_ptr`,
    /// marking its fixed-len storage
    /// and var-len storage granules as available for re-use.
    ///
    /// # Safety
    ///
    /// The `row_ptr` must point to a valid row in this page manager,
    /// with `fixed_row_size` bytes for the fixed part.
    ///
    /// The `fixed_row_size` must be consistent
    /// with what has been passed to the manager in all other operations
    /// and must be consistent with the `var_len_visitor` the manager was made with.
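    ///
    /// # Example
    ///
    /// A sketch, assuming `ptr` was returned by a prior insertion through
    /// this manager using the same `fixed_row_size` and `visitor`:
    ///
    /// ```ignore
    /// // SAFETY: `ptr` points to a live row inserted via `pages`,
    /// // and `fixed_row_size` / `visitor` match all prior operations.
    /// let freed: BlobNumBytes = unsafe {
    ///     pages.delete_row(&visitor, fixed_row_size, ptr, blob_store)
    /// };
    /// ```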
    pub unsafe fn delete_row(
        &mut self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        row_ptr: RowPointer,
        blob_store: &mut dyn BlobStore,
    ) -> BlobNumBytes {
        let page = &mut self[row_ptr.page_index()];
        let full_before = page.is_full(fixed_row_size);
        // SAFETY:
        // - `row_ptr.page_offset()` does point to a valid row in this page,
        //   as the caller promised that `row_ptr` points to a valid row in `self`.
        //
        // - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row.
        //   The size is also consistent with `var_len_visitor`.
        let blob_store_deleted_bytes =
            unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) };

        // If the page was previously full, mark it as non-full now,
        // since we just opened a space in it.
        if full_before {
            self.mark_page_non_full(row_ptr.page_index());
        }
        blob_store_deleted_bytes
    }

    /// Materialize a view of the rows in `self` for which `filter` returns `true`.
    ///
    /// # Safety
    ///
    /// - The `var_len_visitor` will visit the same set of `VarLenRef`s in the row
    ///   as the visitor provided to all other methods on `self`.
    ///
    /// - The `fixed_row_size` is consistent with the `var_len_visitor`
    ///   and is equal to the value provided to all other methods on `self`.
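    ///
    /// # Example
    ///
    /// A sketch that retains every row; a real filter would inspect the row
    /// at the given offset. The `visitor` and sizes are assumed to come from
    /// the enclosing table:
    ///
    /// ```ignore
    /// // SAFETY: `visitor` and `fixed_row_size` match all other uses of `pages`.
    /// let view = unsafe {
    ///     pages.copy_filter(&visitor, fixed_row_size, blob_store, |_page, _offset| true)
    /// };
    /// ```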
    pub unsafe fn copy_filter(
        &self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        blob_store: &mut dyn BlobStore,
        mut filter: impl FnMut(&Page, PageOffset) -> bool,
    ) -> Self {
        // Build a new container to hold the materialized view.
        // Pages are pushed into it below.
        let mut partial_copied_pages = Self::default();

        // A destination page that was not filled entirely,
        // or `None` if it's time to allocate a new destination page.
        let mut partial_page = None;

        // Copy each page.
        for from_page in &self.pages {
            // Multiple calls to `Page::copy_filter_into` may be required
            // if `partial_page` fills up;
            // the first call starts from offset 0.
            let mut copy_starting_from = Some(PageOffset(0));

            // While there are unprocessed rows in `from_page`,
            while let Some(next_offset) = copy_starting_from.take() {
                // Grab the `partial_page` or allocate a new one.
                let mut to_page = partial_page.take().unwrap_or_else(|| Page::new(fixed_row_size));

                // Copy as many rows as will fit in `to_page`.
                //
                // SAFETY:
                //
                // - The `var_len_visitor` will visit the same set of `VarLenRef`s in the row
                //   as the visitor provided to all other methods on `self`.
                //   The `to_page` uses the same visitor as the `from_page`.
                //
                // - The `fixed_row_size` is consistent with the `var_len_visitor`
                //   and is equal to the value provided to all other methods on `self`,
                //   as promised by the caller.
                //   The newly made `to_page` uses the same `fixed_row_size` as the `from_page`.
                //
                // - The `next_offset` is either 0,
                //   which is always a valid starting offset for any row size,
                //   or it came from `copy_filter_into` in a previous iteration,
                //   which, given that `fixed_row_size` was valid,
                //   always returns a valid starting offset in case of `Continue(_)`.
                let cfi_ret = unsafe {
                    from_page.copy_filter_into(
                        next_offset,
                        &mut to_page,
                        fixed_row_size,
                        var_len_visitor,
                        blob_store,
                        &mut filter,
                    )
                };
                copy_starting_from = if let ControlFlow::Continue(continue_point) = cfi_ret {
                    // If `to_page` couldn't fit all of `from_page`,
                    // repeat the `while let` loop to copy the rest.
                    Some(continue_point)
                } else {
                    // If `to_page` fit all of `from_page`, we can move on.
                    None
                };

                // If `from_page` finished copying into `to_page`, then `to_page` may have extra room,
                // so keep it around as the destination for the next source page.
                //
                // If `copy_filter_into` returned `Continue(_)`,
                // at least one row didn't have space in `to_page`,
                // so we must consider `to_page` full.
                //
                // Note that this is distinct from `Page::is_full`,
                // as that method considers the optimistic case of a row with no var-len members.
                if copy_starting_from.is_none() {
                    partial_page = Some(to_page);
                } else {
                    partial_copied_pages.pages.push(to_page);
                }
            }
        }

        // Don't drop the last partially-filled destination page, if any;
        // it may hold copied rows.
        if let Some(last_page) = partial_page {
            partial_copied_pages.pages.push(last_page);
        }

        partial_copied_pages
    }

    /// Set this [`Pages`]' contents to be `pages`.
    ///
    /// Used when restoring from a snapshot.
    ///
    /// Each page in `pages` must be consistent with the schema for this [`Pages`],
    /// i.e., the schema for the [`crate::table::Table`] which contains `self`.
    ///
    /// Should only ever be called when `self.is_empty()`.
    ///
    /// Also populates `self.non_full_pages`.
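    ///
    /// # Example
    ///
    /// A sketch of snapshot restoration, assuming `snapshot_pages` were
    /// produced by a table with the same schema:
    ///
    /// ```ignore
    /// let mut pages = Pages::default();
    /// pages.set_contents(snapshot_pages, fixed_row_size);
    /// // Pages with remaining fixed-row space are now in the non-full set.
    /// ```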
    pub fn set_contents(&mut self, pages: Vec<Box<Page>>, fixed_row_size: Size) {
        debug_assert!(self.is_empty());
        self.non_full_pages = pages
            .iter()
            .enumerate()
            .filter_map(|(idx, page)| (!page.is_full(fixed_row_size)).then_some(PageIndex(idx as _)))
            .collect();
        self.pages = pages;
    }

    /// Consumes the page manager, returning all the pages it held.
    pub fn into_page_iter(self) -> impl Iterator<Item = Box<Page>> {
        self.pages.into_iter()
    }
}

impl Deref for Pages {
    type Target = [Box<Page>];

    fn deref(&self) -> &Self::Target {
        &self.pages
    }
}

impl DerefMut for Pages {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.pages
    }
}