spacetimedb_table/pages.rs
//! Provides [`Pages`], a page manager dealing with [`Page`]s as a collection.

use super::blob_store::BlobStore;
use super::indexes::{Bytes, PageIndex, PageOffset, RowPointer};
use super::page::Page;
use super::page_pool::PagePool;
use super::table::BlobNumBytes;
use super::var_len::VarLenMembers;
use core::ops::{ControlFlow, Deref, Index, IndexMut};
use spacetimedb_sats::layout::Size;
use spacetimedb_sats::memory_usage::MemoryUsage;
use std::ops::DerefMut;
use thiserror::Error;

#[derive(Error, Debug, PartialEq, Eq)]
pub enum Error {
    #[error("Attempt to allocate more than {} pages.", PageIndex::MAX.idx())]
    TooManyPages,
    #[error(transparent)]
    Page(#[from] super::page::Error),
}

impl Index<PageIndex> for Pages {
    type Output = Page;

    fn index(&self, pi: PageIndex) -> &Self::Output {
        &self.pages[pi.idx()]
    }
}

impl IndexMut<PageIndex> for Pages {
    fn index_mut(&mut self, pi: PageIndex) -> &mut Self::Output {
        &mut self.pages[pi.idx()]
    }
}

/// A manager of [`Page`]s.
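///
/// # Example
///
/// A minimal sketch of the intended flow (`ignore`d, since it assumes a
/// [`PagePool`] and a fixed row size from the surrounding crate):
///
/// ```ignore
/// let mut pages = Pages::default();
/// // Reserve an empty page sized for `fixed_row_size`;
/// // the page starts in the non-full set.
/// let idx = pages.reserve_empty_page(&pool, fixed_row_size)?;
/// // `Pages` can then be indexed directly by the returned `PageIndex`.
/// let page: &Page = &pages[idx];
/// ```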
#[derive(Default, Debug, PartialEq, Eq)]
pub struct Pages {
    /// The collection of pages under management.
    pages: Vec<Box<Page>>,
    /// The set of pages that aren't yet full.
    non_full_pages: Vec<PageIndex>,
}

impl MemoryUsage for Pages {
    fn heap_usage(&self) -> usize {
        let Self { pages, non_full_pages } = self;
        pages.heap_usage() + non_full_pages.heap_usage()
    }
}

impl Pages {
    /// Is there space to allocate another page?
    /// On success, returns the [`PageIndex`] that the new page would receive.
    pub fn can_allocate_new_page(&self) -> Result<PageIndex, Error> {
        let new_idx = self.len();
        if new_idx <= PageIndex::MAX.idx() {
            Ok(PageIndex(new_idx as _))
        } else {
            Err(Error::TooManyPages)
        }
    }

    /// Get a mutable reference to the `Page` at `page`.
    ///
    /// Used in benchmarks. Internal operations prefer indexing directly into `self.pages`,
    /// as that allows split borrows.
    pub fn get_page_mut(&mut self, page: PageIndex) -> &mut Page {
        &mut self.pages[page.idx()]
    }

    /// Clears all pages within `self`,
    /// deleting all rows.
    #[doc(hidden)] // Used in benchmarks.
    pub fn clear(&mut self) {
        // Clear every page.
        for page in &mut self.pages {
            page.clear();
        }
        // Mark every page non-full.
        self.non_full_pages = (0..self.pages.len()).map(|idx| PageIndex(idx as u64)).collect();
    }

    /// Get a reference to fixed-len row data.
    ///
    /// Used in benchmarks.
    /// Higher-level code paths are expected to go through [`super::de::read_row_from_pages`].
    #[doc(hidden)] // Used in benchmarks.
    pub fn get_fixed_len_row(&self, row: RowPointer, fixed_row_size: Size) -> &Bytes {
        self[row.page_index()].get_row_data(row.page_offset(), fixed_row_size)
    }

    /// Allocates one additional page,
    /// returning an error if the new number of pages would exceed `PageIndex::MAX`.
    ///
    /// The new page is initially empty, but is not added to the non-full set.
    /// Callers should call [`Pages::maybe_mark_page_non_full`] after operating on the new page.
    fn allocate_new_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result<PageIndex, Error> {
        let new_idx = self.can_allocate_new_page()?;

        let page = pool.take_with_fixed_row_size(fixed_row_size);
        self.pages.push(page);

        Ok(new_idx)
    }

    /// Reserve a new, initially empty page, returning its index.
    /// The new page is added to the non-full set.
    pub fn reserve_empty_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result<PageIndex, Error> {
        let idx = self.allocate_new_page(pool, fixed_row_size)?;
        self.mark_page_non_full(idx);
        Ok(idx)
    }

    /// Mark the page at `idx` as non-full.
    pub fn mark_page_non_full(&mut self, idx: PageIndex) {
        self.non_full_pages.push(idx);
    }

    /// If the page at `page_index` is not full,
    /// add it to the non-full set so that later insertions can access it.
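    ///
    /// A sketch of the intended pairing (`ignore`d; `pages`, `idx`, and
    /// `fixed_row_size` are assumed from context):
    ///
    /// ```ignore
    /// // Operate on a page directly...
    /// let page = pages.get_page_mut(idx);
    /// // ... insert rows into `page` here ...
    /// // ...then restore it to the non-full set if it still has room.
    /// pages.maybe_mark_page_non_full(idx, fixed_row_size);
    /// ```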
    pub fn maybe_mark_page_non_full(&mut self, page_index: PageIndex, fixed_row_size: Size) {
        if !self[page_index].is_full(fixed_row_size) {
            self.non_full_pages.push(page_index);
        }
    }

    /// Calls `f` with a mutable reference to a page which satisfies
    /// `page.has_space_for_row(fixed_row_size, num_var_len_granules)`,
    /// returning the page's index alongside the result of `f`.
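    ///
    /// A sketch of a typical call (`ignore`d; `pool`, `fixed_row_size`, and
    /// `num_var_len_granules` are assumed from context):
    ///
    /// ```ignore
    /// let (page_index, res) = pages.with_page_to_insert_row(
    ///     &pool,
    ///     fixed_row_size,
    ///     num_var_len_granules,
    ///     |page| {
    ///         // `page` is guaranteed to have space for the row.
    ///         // ... write the row into `page` and return e.g. its offset ...
    ///     },
    /// )?;
    /// ```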
    pub fn with_page_to_insert_row<Res>(
        &mut self,
        pool: &PagePool,
        fixed_row_size: Size,
        num_var_len_granules: usize,
        f: impl FnOnce(&mut Page) -> Res,
    ) -> Result<(PageIndex, Res), Error> {
        let page_index = self.find_page_with_space_for_row(pool, fixed_row_size, num_var_len_granules)?;
        let res = f(&mut self[page_index]);
        self.maybe_mark_page_non_full(page_index, fixed_row_size);
        Ok((page_index, res))
    }

    /// Find a page with sufficient available space to store a row of size `fixed_row_size`
    /// containing `num_var_len_granules` granules of var-len data.
    ///
    /// Retrieving a page in this way will remove it from the non-full set.
    /// After performing an insertion, the caller should use [`Pages::maybe_mark_page_non_full`]
    /// to restore the page to the non-full set.
    fn find_page_with_space_for_row(
        &mut self,
        pool: &PagePool,
        fixed_row_size: Size,
        num_var_len_granules: usize,
    ) -> Result<PageIndex, Error> {
        if let Some((page_idx_idx, page_idx)) = self
            .non_full_pages
            .iter()
            .copied()
            .enumerate()
            .find(|(_, page_idx)| self[*page_idx].has_space_for_row(fixed_row_size, num_var_len_granules))
        {
            self.non_full_pages.swap_remove(page_idx_idx);
            return Ok(page_idx);
        }

        self.allocate_new_page(pool, fixed_row_size)
    }

    /// Superseded by `write_av_to_pages`, but exposed for benchmarking
    /// when we want to avoid the overhead of traversing `AlgebraicType`.
    ///
    /// Inserts a row with fixed parts in `fixed_len` and variable parts in `var_len`.
    /// `fixed_len.len()` must equal `fixed_row_size`.
    ///
    /// # Safety
    ///
    /// - `var_len_visitor` must be suitable for visiting var-len refs in `fixed_len`.
    /// - `fixed_len.len()` matches the row type size exactly.
    /// - `fixed_len.len()` is consistent
    ///   with what has been passed to the manager in all other ops
    ///   and must be consistent with the `var_len_visitor` the manager was made with.
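    ///
    /// A sketch upholding this contract (`ignore`d; `fixed` and `var_parts`
    /// are assumed to describe the same row type as `var_len_visitor` and
    /// `fixed_row_size`):
    ///
    /// ```ignore
    /// let (page, offset) = unsafe {
    ///     pages.insert_row(&pool, &visitor, fixed_row_size, fixed, &var_parts, &mut blob_store)
    /// }?;
    /// ```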
    // TODO(bikeshedding): rename to make purpose as bench interface clear?
    pub unsafe fn insert_row(
        &mut self,
        pool: &PagePool,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        fixed_len: &Bytes,
        var_len: &[&[u8]],
        blob_store: &mut dyn BlobStore,
    ) -> Result<(PageIndex, PageOffset), Error> {
        debug_assert!(fixed_len.len() == fixed_row_size.len());

        match self.with_page_to_insert_row(
            pool,
            fixed_row_size,
            Page::total_granules_required_for_objects(var_len),
            |page| {
                // This insertion can never fail, as we know that the page
                // has sufficient space from `find_page_with_space_for_row`.
                //
                // SAFETY:
                // - Caller promised that `var_len_visitor`
                //   is suitable for visiting var-len refs in `fixed_len`
                //   and that `fixed_len.len()` matches the row type size exactly.
                //
                // - Caller promised that `fixed_len.len()` is consistent
                //   with what has been passed to the manager in all other ops.
                //   This entails that `fixed_len.len()` is consistent with `page`.
                unsafe { page.insert_row(fixed_len, var_len, var_len_visitor, blob_store) }
            },
        )? {
            (page, Ok(offset)) => Ok((page, offset)),
            (_, Err(e)) => Err(e.into()),
        }
    }

    /// Free the row pointed to by `row_ptr`,
    /// marking its fixed-len storage
    /// and var-len storage granules as available for re-use.
    ///
    /// # Safety
    ///
    /// `row_ptr` must point to a valid row in this page manager,
    /// whose fixed part is `fixed_row_size` bytes.
    ///
    /// The `fixed_row_size` must be consistent
    /// with what has been passed to the manager in all other operations
    /// and must be consistent with the `var_len_visitor` the manager was made with.
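    ///
    /// A sketch upholding this contract (`ignore`d; `ptr` is assumed to have
    /// been returned by a prior insertion with the same `fixed_row_size` and
    /// `visitor`):
    ///
    /// ```ignore
    /// let freed: BlobNumBytes = unsafe {
    ///     pages.delete_row(&visitor, fixed_row_size, ptr, &mut blob_store)
    /// };
    /// ```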
    pub unsafe fn delete_row(
        &mut self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        row_ptr: RowPointer,
        blob_store: &mut dyn BlobStore,
    ) -> BlobNumBytes {
        let page = &mut self[row_ptr.page_index()];
        let full_before = page.is_full(fixed_row_size);
        // SAFETY:
        // - `row_ptr.page_offset()` does point to a valid row in this page,
        //   as the caller promised that `row_ptr` points to a valid row in `self`.
        //
        // - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row.
        //   The size is also consistent with `var_len_visitor`.
        let blob_store_deleted_bytes =
            unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) };

        // If the page was previously full, mark it as non-full now,
        // since we just opened a space in it.
        if full_before {
            self.mark_page_non_full(row_ptr.page_index());
        }
        blob_store_deleted_bytes
    }

    /// Materialize a view of the rows in `self` for which `filter` returns `true`.
    ///
    /// # Safety
    ///
    /// - The `var_len_visitor` must visit the same set of `VarLenRef`s in each row
    ///   as the visitor provided to all other methods on `self`.
    ///
    /// - The `fixed_row_size` must be consistent with the `var_len_visitor`
    ///   and equal to the value provided to all other methods on `self`.
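    ///
    /// A sketch of a filtered copy (`ignore`d; `keep_row` is a hypothetical
    /// predicate, and `visitor` and `blob_store` are assumed from context):
    ///
    /// ```ignore
    /// let view = unsafe {
    ///     pages.copy_filter(&visitor, fixed_row_size, &mut blob_store, |page, offset| {
    ///         keep_row(page, offset)
    ///     })
    /// };
    /// ```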
    pub unsafe fn copy_filter(
        &self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        blob_store: &mut dyn BlobStore,
        mut filter: impl FnMut(&Page, PageOffset) -> bool,
    ) -> Self {
        // Build a new container to hold the materialized view.
        // Push pages into it later.
        let mut partial_copied_pages = Self::default();

        // A destination page that was not filled entirely,
        // or `None` if it's time to allocate a new destination page.
        let mut partial_page = None;

        // Copy each page.
        for from_page in &self.pages {
            // Copying may require multiple calls to `Page::copy_filter_into`
            // if the destination page fills up;
            // the first call starts from offset 0.
            let mut copy_starting_from = Some(PageOffset(0));

            // While there are unprocessed rows in `from_page`,
            while let Some(next_offset) = copy_starting_from.take() {
                // Grab the `partial_page` or allocate a new one.
                let mut to_page = partial_page.take().unwrap_or_else(|| Page::new(fixed_row_size));

                // Copy as many rows as will fit in `to_page`.
                //
                // SAFETY:
                //
                // - The `var_len_visitor` will visit the same set of `VarLenRef`s in the row
                //   as the visitor provided to all other methods on `self`.
                //   The `to_page` uses the same visitor as the `from_page`.
                //
                // - The `fixed_row_size` is consistent with the `var_len_visitor`
                //   and is equal to the value provided to all other methods on `self`,
                //   as promised by the caller.
                //   The newly made `to_page` uses the same `fixed_row_size` as the `from_page`.
                //
                // - The `next_offset` is either 0,
                //   which is always a valid starting offset for any row size,
                //   or it came from `copy_filter_into` in a previous iteration,
                //   which, given that `fixed_row_size` was valid,
                //   always returns a valid starting offset in case of `Continue(_)`.
                let cfi_ret = unsafe {
                    from_page.copy_filter_into(
                        next_offset,
                        &mut to_page,
                        fixed_row_size,
                        var_len_visitor,
                        blob_store,
                        &mut filter,
                    )
                };
                copy_starting_from = if let ControlFlow::Continue(continue_point) = cfi_ret {
                    // If `to_page` couldn't fit all of `from_page`,
                    // repeat the `while let` loop to copy the rest.
                    Some(continue_point)
                } else {
                    // If `to_page` fit all of `from_page`, we can move on.
                    None
                };

                // If `from_page` finished copying into `to_page`, then `to_page` may have extra room.
                //
                // If `copy_filter_into` returned `Continue(_)`,
                // at least one row didn't have space in `to_page`,
                // so we must consider `to_page` full.
                //
                // Note that this is distinct from `Page::is_full`,
                // as that method considers the optimistic case of a row with no var-len members.
                if copy_starting_from.is_none() {
                    partial_page = Some(to_page);
                } else {
                    partial_copied_pages.pages.push(to_page);
                }
            }
        }

        // Keep the last partially-filled destination page, if any;
        // otherwise the rows copied into it would be lost.
        // It may still have room, so also add it to the non-full set.
        if let Some(last_page) = partial_page {
            let idx = PageIndex(partial_copied_pages.pages.len() as u64);
            partial_copied_pages.pages.push(last_page);
            partial_copied_pages.maybe_mark_page_non_full(idx, fixed_row_size);
        }

        partial_copied_pages
    }

    /// Set this [`Pages`]' contents to `pages`.
    ///
    /// Used when restoring from a snapshot.
    ///
    /// Each page in `pages` must be consistent with the schema for this [`Pages`],
    /// i.e. the schema for the [`crate::table::Table`] which contains `self`.
    ///
    /// Should only ever be called when `self.is_empty()`.
    ///
    /// Also populates `self.non_full_pages`.
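    ///
    /// A sketch of a snapshot restore (`ignore`d; `snapshot_pages` is a
    /// hypothetical `Vec<Box<Page>>` read back from a snapshot):
    ///
    /// ```ignore
    /// let mut pages = Pages::default();
    /// pages.set_contents(snapshot_pages, fixed_row_size);
    /// ```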
    pub fn set_contents(&mut self, pages: Vec<Box<Page>>, fixed_row_size: Size) {
        debug_assert!(self.is_empty());
        self.non_full_pages = pages
            .iter()
            .enumerate()
            .filter_map(|(idx, page)| (!page.is_full(fixed_row_size)).then_some(PageIndex(idx as _)))
            .collect();
        self.pages = pages;
    }

    /// Consumes the page manager, returning all the pages it held.
    pub fn into_page_iter(self) -> impl Iterator<Item = Box<Page>> {
        self.pages.into_iter()
    }
}

impl Deref for Pages {
    type Target = [Box<Page>];

    fn deref(&self) -> &Self::Target {
        &self.pages
    }
}

impl DerefMut for Pages {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.pages
    }
}