spacetimedb_table/pages.rs
//! Provides [`Pages`], a page manager dealing with [`Page`]s as a collection.

use super::blob_store::BlobStore;
use super::indexes::{Bytes, PageIndex, PageOffset, RowPointer, Size};
use super::page::Page;
use super::page_pool::PagePool;
use super::table::BlobNumBytes;
use super::var_len::VarLenMembers;
use crate::MemoryUsage;
use core::ops::{ControlFlow, Deref, DerefMut, Index, IndexMut};
use thiserror::Error;

#[derive(Error, Debug, PartialEq, Eq)]
pub enum Error {
    #[error("Attempt to allocate more than {} pages.", PageIndex::MAX.idx())]
    TooManyPages,
    #[error(transparent)]
    Page(#[from] super::page::Error),
}

impl Index<PageIndex> for Pages {
    type Output = Page;

    fn index(&self, pi: PageIndex) -> &Self::Output {
        &self.pages[pi.idx()]
    }
}

impl IndexMut<PageIndex> for Pages {
    fn index_mut(&mut self, pi: PageIndex) -> &mut Self::Output {
        &mut self.pages[pi.idx()]
    }
}

/// A manager of [`Page`]s.
#[derive(Default, Debug, PartialEq, Eq)]
pub struct Pages {
    /// The collection of pages under management.
    pages: Vec<Box<Page>>,
    /// The set of pages that aren't yet full.
    non_full_pages: Vec<PageIndex>,
}

impl MemoryUsage for Pages {
    fn heap_usage(&self) -> usize {
        let Self { pages, non_full_pages } = self;
        pages.heap_usage() + non_full_pages.heap_usage()
    }
}

impl Pages {
    /// Is there space to allocate another page?
    /// If so, returns the `PageIndex` that the new page would have;
    /// otherwise, returns [`Error::TooManyPages`].
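    ///
    /// # Example
    ///
    /// A minimal sketch (`ignore`d, as it exercises crate internals):
    ///
    /// ```ignore
    /// let pages = Pages::default();
    /// // With no pages allocated yet, the next page would get index 0.
    /// assert_eq!(pages.can_allocate_new_page(), Ok(PageIndex(0)));
    /// ```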
    pub fn can_allocate_new_page(&self) -> Result<PageIndex, Error> {
        let new_idx = self.len();
        if new_idx <= PageIndex::MAX.idx() {
            Ok(PageIndex(new_idx as _))
        } else {
            Err(Error::TooManyPages)
        }
    }

    /// Get a mutable reference to the `Page` at `page`.
    ///
    /// Used in benchmarks. Internal operations will prefer directly indexing into `self.pages`,
    /// as that allows split borrows.
    pub fn get_page_mut(&mut self, page: PageIndex) -> &mut Page {
        &mut self.pages[page.idx()]
    }

    /// Clears every page within `self`,
    /// deleting all rows.
    #[doc(hidden)] // Used in benchmarks.
    pub fn clear(&mut self) {
        // Clear every page.
        for page in &mut self.pages {
            page.clear();
        }
        // Mark every page as non-full.
        self.non_full_pages = (0..self.pages.len()).map(|idx| PageIndex(idx as u64)).collect();
    }

    /// Get a reference to the fixed-len data of the row at `row`.
    ///
    /// Higher-level code paths are expected to go through [`super::de::read_row_from_pages`].
    #[doc(hidden)] // Used in benchmarks.
    pub fn get_fixed_len_row(&self, row: RowPointer, fixed_row_size: Size) -> &Bytes {
        self[row.page_index()].get_row_data(row.page_offset(), fixed_row_size)
    }

    /// Allocates one additional page,
    /// returning an error if the new number of pages would exceed `PageIndex::MAX`.
    ///
    /// The new page is initially empty and is not added to the non-full set.
    /// Callers should call [`Pages::maybe_mark_page_non_full`] after operating on the new page.
    fn allocate_new_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result<PageIndex, Error> {
        let new_idx = self.can_allocate_new_page()?;

        let page = pool.take_with_fixed_row_size(fixed_row_size);
        self.pages.push(page);

        Ok(new_idx)
    }

    /// Reserve a new, initially empty page, and mark it as non-full.
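    ///
    /// # Example
    ///
    /// A minimal sketch (`ignore`d; assumes a `PagePool` and a fixed row size
    /// derived from the table's schema, which this module does not construct):
    ///
    /// ```ignore
    /// let mut pages = Pages::default();
    /// // Reserve one empty page sized for 16-byte fixed rows.
    /// let idx = pages.reserve_empty_page(&pool, Size(16))?;
    /// // The new page exists and is tracked in the non-full set.
    /// assert!(idx.idx() < pages.len());
    /// ```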
    pub fn reserve_empty_page(&mut self, pool: &PagePool, fixed_row_size: Size) -> Result<PageIndex, Error> {
        let idx = self.allocate_new_page(pool, fixed_row_size)?;
        self.mark_page_non_full(idx);
        Ok(idx)
    }

    /// Mark the page at `idx` as non-full.
    pub fn mark_page_non_full(&mut self, idx: PageIndex) {
        self.non_full_pages.push(idx);
    }

    /// If the page at `page_index` is not full,
    /// add it to the non-full set so that later insertions can use it.
    pub fn maybe_mark_page_non_full(&mut self, page_index: PageIndex, fixed_row_size: Size) {
        if !self[page_index].is_full(fixed_row_size) {
            self.non_full_pages.push(page_index);
        }
    }

    /// Calls `f` with a mutable reference to a page which satisfies
    /// `page.has_space_for_row(fixed_row_size, num_var_len_granules)`,
    /// allocating a new page if no existing page has enough space.
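    ///
    /// # Example
    ///
    /// A hedged sketch (`ignore`d; the closure body and its safety obligations
    /// are elided, and `pool` and `fixed_row_size` are assumed to come from the caller):
    ///
    /// ```ignore
    /// // Ask for a page with room for one fixed-size row and no var-len granules.
    /// let (page_index, res) = pages.with_page_to_insert_row(&pool, fixed_row_size, 0, |page| {
    ///     // Here, `page.has_space_for_row(fixed_row_size, 0)` is guaranteed to hold.
    ///     // ... perform the insertion and return its result ...
    /// })?;
    /// ```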
    pub fn with_page_to_insert_row<Res>(
        &mut self,
        pool: &PagePool,
        fixed_row_size: Size,
        num_var_len_granules: usize,
        f: impl FnOnce(&mut Page) -> Res,
    ) -> Result<(PageIndex, Res), Error> {
        let page_index = self.find_page_with_space_for_row(pool, fixed_row_size, num_var_len_granules)?;
        let res = f(&mut self[page_index]);
        self.maybe_mark_page_non_full(page_index, fixed_row_size);
        Ok((page_index, res))
    }

    /// Find a page with sufficient available space to store a row of size `fixed_row_size`
    /// containing `num_var_len_granules` granules of var-len data.
    ///
    /// Retrieving a page in this way removes it from the non-full set.
    /// After performing an insertion, the caller should use [`Pages::maybe_mark_page_non_full`]
    /// to restore the page to the non-full set.
    fn find_page_with_space_for_row(
        &mut self,
        pool: &PagePool,
        fixed_row_size: Size,
        num_var_len_granules: usize,
    ) -> Result<PageIndex, Error> {
        if let Some((page_idx_idx, page_idx)) = self
            .non_full_pages
            .iter()
            .copied()
            .enumerate()
            .find(|(_, page_idx)| self[*page_idx].has_space_for_row(fixed_row_size, num_var_len_granules))
        {
            self.non_full_pages.swap_remove(page_idx_idx);
            return Ok(page_idx);
        }

        self.allocate_new_page(pool, fixed_row_size)
    }

    /// Superseded by `write_av_to_pages`, but exposed for benchmarking
    /// when we want to avoid the overhead of traversing `AlgebraicType`.
    ///
    /// Inserts a row with fixed parts in `fixed_len` and variable parts in `var_len`.
    /// `fixed_len.len()` must equal `fixed_row_size.len()`.
    ///
    /// # Safety
    ///
    /// - `var_len_visitor` must be suitable for visiting var-len refs in `fixed_len`.
    /// - `fixed_len.len()` must match the row type size exactly.
    /// - `fixed_len.len()` must be consistent
    ///   with what has been passed to the manager in all other ops
    ///   and must be consistent with the `var_len_visitor` the manager was made with.
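    ///
    /// # Example
    ///
    /// A bench-style sketch (`ignore`d; assumes a row type with no var-len members,
    /// so a no-op visitor such as `NullVarLenVisitor` is suitable, and assumes
    /// `pool: &PagePool` and `blob_store: &mut dyn BlobStore` are provided by the caller):
    ///
    /// ```ignore
    /// let mut pages = Pages::default();
    /// let row = [0u8; 16];
    /// // SAFETY: the visitor matches a row type with no var-len refs,
    /// // and `row.len() == 16` is used consistently in every call.
    /// let (pi, po) = unsafe {
    ///     pages.insert_row(&pool, &NullVarLenVisitor, Size(16), &row, &[], blob_store)
    /// }?;
    /// ```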
    // TODO(bikeshedding): rename to make its purpose as a bench interface clear?
    pub unsafe fn insert_row(
        &mut self,
        pool: &PagePool,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        fixed_len: &Bytes,
        var_len: &[&[u8]],
        blob_store: &mut dyn BlobStore,
    ) -> Result<(PageIndex, PageOffset), Error> {
        debug_assert!(fixed_len.len() == fixed_row_size.len());

        match self.with_page_to_insert_row(
            pool,
            fixed_row_size,
            Page::total_granules_required_for_objects(var_len),
            |page| {
                // This insertion should never fail,
                // as `find_page_with_space_for_row` ensured that the page has sufficient space.
                //
                // SAFETY:
                // - Caller promised that `var_len_visitor`
                //   is suitable for visiting var-len refs in `fixed_len`
                //   and that `fixed_len.len()` matches the row type size exactly.
                //
                // - Caller promised that `fixed_len.len()` is consistent
                //   with what has been passed to the manager in all other ops.
                //   This entails that `fixed_len.len()` is consistent with `page`.
                unsafe { page.insert_row(fixed_len, var_len, var_len_visitor, blob_store) }
            },
        )? {
            (page, Ok(offset)) => Ok((page, offset)),
            (_, Err(e)) => Err(e.into()),
        }
    }

    /// Free the row pointed to by `row_ptr`,
    /// marking its fixed-len storage
    /// and var-len storage granules as available for re-use.
    ///
    /// # Safety
    ///
    /// `row_ptr` must point to a valid row in this page manager,
    /// with `fixed_row_size` bytes for the fixed part.
    ///
    /// `fixed_row_size` must be consistent
    /// with what has been passed to the manager in all other operations
    /// and must be consistent with the `var_len_visitor` the manager was made with.
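    ///
    /// # Example
    ///
    /// A sketch continuing the `insert_row` example above (`ignore`d; `ptr` is a
    /// `RowPointer` assembled from the `(pi, po)` pair that `insert_row` returned,
    /// and the visitor and `blob_store` are the same values used there):
    ///
    /// ```ignore
    /// // SAFETY: `ptr` refers to the row just inserted with the same
    /// // `fixed_row_size` and visitor, so it points to a valid row in `self`.
    /// let freed_blob_bytes = unsafe {
    ///     pages.delete_row(&NullVarLenVisitor, Size(16), ptr, blob_store)
    /// };
    /// ```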
    pub unsafe fn delete_row(
        &mut self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        row_ptr: RowPointer,
        blob_store: &mut dyn BlobStore,
    ) -> BlobNumBytes {
        let page = &mut self[row_ptr.page_index()];
        let full_before = page.is_full(fixed_row_size);
        // SAFETY:
        // - `row_ptr.page_offset()` does point to a valid row in this page,
        //   as the caller promised that `row_ptr` points to a valid row in `self`.
        //
        // - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row.
        //   The size is also consistent with `var_len_visitor`.
        let blob_store_deleted_bytes =
            unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) };

        // If the page was previously full, mark it as non-full now,
        // since we just opened a space in it.
        if full_before {
            self.mark_page_non_full(row_ptr.page_index());
        }
        blob_store_deleted_bytes
    }

    /// Materialize a view of rows in `self` for which the `filter` returns `true`.
    ///
    /// # Safety
    ///
    /// - The `var_len_visitor` will visit the same set of `VarLenRef`s in the row
    ///   as the visitor provided to all other methods on `self`.
    ///
    /// - The `fixed_row_size` is consistent with the `var_len_visitor`
    ///   and is equal to the value provided to all other methods on `self`.
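    ///
    /// # Example
    ///
    /// A hedged sketch (`ignore`d) that keeps only rows whose first byte is non-zero;
    /// `visitor`, `fixed_row_size`, and `blob_store` are assumed to be the same
    /// values used for every other operation on `pages`:
    ///
    /// ```ignore
    /// // SAFETY: `visitor` and `fixed_row_size` match all prior calls on `pages`.
    /// let view = unsafe {
    ///     pages.copy_filter(&visitor, fixed_row_size, blob_store, |page, offset| {
    ///         page.get_row_data(offset, fixed_row_size)[0] != 0
    ///     })
    /// };
    /// ```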
    pub unsafe fn copy_filter(
        &self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        blob_store: &mut dyn BlobStore,
        mut filter: impl FnMut(&Page, PageOffset) -> bool,
    ) -> Self {
        // Build a new container to hold the materialized view.
        // Push pages into it later.
        let mut partial_copied_pages = Self::default();

        // A destination page that was not filled entirely,
        // or `None` if it's time to allocate a new destination page.
        let mut partial_page = None;

        // Copy each page.
        for from_page in &self.pages {
            // Multiple calls to `Page::copy_filter_into` may be required
            // if the destination page fills up;
            // the first call starts from offset 0.
            let mut copy_starting_from = Some(PageOffset(0));

            // While there are unprocessed rows in `from_page`,
            while let Some(next_offset) = copy_starting_from.take() {
                // Grab the `partial_page` or allocate a new one.
                let mut to_page = partial_page.take().unwrap_or_else(|| Page::new(fixed_row_size));

                // Copy as many rows as will fit into `to_page`.
                //
                // SAFETY:
                //
                // - The `var_len_visitor` will visit the same set of `VarLenRef`s in the row
                //   as the visitor provided to all other methods on `self`.
                //   The `to_page` uses the same visitor as the `from_page`.
                //
                // - The `fixed_row_size` is consistent with the `var_len_visitor`
                //   and is equal to the value provided to all other methods on `self`,
                //   as promised by the caller.
                //   The newly made `to_page` uses the same `fixed_row_size` as the `from_page`.
                //
                // - The `next_offset` is either 0,
                //   which is always a valid starting offset for any row size,
                //   or it came from `copy_filter_into` in a previous iteration,
                //   which, given that `fixed_row_size` was valid,
                //   always returns a valid starting offset in case of `Continue(_)`.
                let cfi_ret = unsafe {
                    from_page.copy_filter_into(
                        next_offset,
                        &mut to_page,
                        fixed_row_size,
                        var_len_visitor,
                        blob_store,
                        &mut filter,
                    )
                };
                copy_starting_from = if let ControlFlow::Continue(continue_point) = cfi_ret {
                    // If `to_page` couldn't fit all of `from_page`,
                    // repeat the `while let` loop to copy the rest.
                    Some(continue_point)
                } else {
                    // If `to_page` fit all of `from_page`, we can move on.
                    None
                };

                // If `from_page` finished copying into `to_page`,
                // then `to_page` may have extra room,
                // so save it to be reused in the next iteration.
                //
                // If `copy_filter_into` returned `Continue(_)`,
                // at least one row didn't have space in `to_page`,
                // so we must consider `to_page` full.
                //
                // Note that this is distinct from `Page::is_full`,
                // as that method considers the optimistic case of a row with no var-len members.
                if copy_starting_from.is_none() {
                    partial_page = Some(to_page);
                } else {
                    partial_copied_pages.pages.push(to_page);
                }
            }
        }

        // Don't forget the last partially-filled destination page, if any.
        if let Some(partial_page) = partial_page {
            partial_copied_pages.pages.push(partial_page);
        }

        partial_copied_pages
    }

    /// Set the contents of this [`Pages`] to be `pages`.
    ///
    /// Used when restoring from a snapshot.
    ///
    /// Each page in the `pages` must be consistent with the schema for this [`Pages`],
    /// i.e. the schema for the [`crate::table::Table`] which contains `self`.
    ///
    /// Should only ever be called when `self.is_empty()`.
    ///
    /// Also populates `self.non_full_pages`.
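    ///
    /// # Example
    ///
    /// A snapshot-restore sketch (`ignore`d; `snapshotted_pages` is a hypothetical
    /// `Vec<Box<Page>>` recovered from a snapshot elsewhere):
    ///
    /// ```ignore
    /// let mut pages = Pages::default();
    /// debug_assert!(pages.is_empty());
    /// pages.set_contents(snapshotted_pages, fixed_row_size);
    /// // Any restored page with remaining space is now tracked as non-full.
    /// ```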
    pub fn set_contents(&mut self, pages: Vec<Box<Page>>, fixed_row_size: Size) {
        debug_assert!(self.is_empty());
        self.non_full_pages = pages
            .iter()
            .enumerate()
            .filter_map(|(idx, page)| (!page.is_full(fixed_row_size)).then_some(PageIndex(idx as _)))
            .collect();
        self.pages = pages;
    }

    /// Consumes the page manager, returning all the pages it held.
    pub fn into_page_iter(self) -> impl Iterator<Item = Box<Page>> {
        self.pages.into_iter()
    }
}

impl Deref for Pages {
    type Target = [Box<Page>];

    fn deref(&self) -> &Self::Target {
        &self.pages
    }
}

impl DerefMut for Pages {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.pages
    }
}