spacetimedb_table/pages.rs
//! Provides [`Pages`], a page manager dealing with [`Page`]s as a collection.

use crate::MemoryUsage;

use super::blob_store::BlobStore;
use super::indexes::{Bytes, PageIndex, PageOffset, RowPointer, Size};
use super::page::Page;
use super::table::BlobNumBytes;
use super::var_len::VarLenMembers;
use core::ops::{ControlFlow, Deref, DerefMut, Index, IndexMut};
use thiserror::Error;

#[derive(Error, Debug, PartialEq, Eq)]
pub enum Error {
    #[error("Attempt to allocate more than {} pages.", PageIndex::MAX.idx())]
    TooManyPages,
    #[error(transparent)]
    Page(#[from] super::page::Error),
}

impl Index<PageIndex> for Pages {
    type Output = Page;

    fn index(&self, pi: PageIndex) -> &Self::Output {
        &self.pages[pi.idx()]
    }
}

impl IndexMut<PageIndex> for Pages {
    fn index_mut(&mut self, pi: PageIndex) -> &mut Self::Output {
        &mut self.pages[pi.idx()]
    }
}

/// A manager of [`Page`]s.
#[derive(Default, Debug, PartialEq, Eq)]
pub struct Pages {
    /// The collection of pages under management.
    pages: Vec<Box<Page>>,
    /// The set of pages that aren't yet full.
    non_full_pages: Vec<PageIndex>,
}

impl MemoryUsage for Pages {
    fn heap_usage(&self) -> usize {
        let Self { pages, non_full_pages } = self;
        pages.heap_usage() + non_full_pages.heap_usage()
    }
}

impl Pages {
    /// Is there space to allocate another page?
    ///
    /// On success, returns the [`PageIndex`] that a newly allocated page would receive.
    pub fn can_allocate_new_page(&self) -> Result<PageIndex, Error> {
        let new_idx = self.len();
        if new_idx <= PageIndex::MAX.idx() {
            Ok(PageIndex(new_idx as _))
        } else {
            Err(Error::TooManyPages)
        }
    }

    /// Get a mutable reference to the `Page` at `page`.
    ///
    /// Used in benchmarks.
    /// Internal operations prefer to index directly into `self.pages`,
    /// as that allows split borrows.
    pub fn get_page_mut(&mut self, page: PageIndex) -> &mut Page {
        &mut self.pages[page.idx()]
    }

    /// Clears all pages within `self`,
    /// deallocating all rows.
    #[doc(hidden)] // Used in benchmarks.
    pub fn clear(&mut self) {
        // Clear every page.
        for page in &mut self.pages {
            page.clear();
        }
        // Mark every page non-full.
        self.non_full_pages = (0..self.pages.len()).map(|idx| PageIndex(idx as u64)).collect();
    }

    /// Get a reference to the fixed-len row data at `row`.
    ///
    /// Higher-level code paths are expected to go through [`super::de::read_row_from_pages`].
    #[doc(hidden)] // Used in benchmarks.
    pub fn get_fixed_len_row(&self, row: RowPointer, fixed_row_size: Size) -> &Bytes {
        self[row.page_index()].get_row_data(row.page_offset(), fixed_row_size)
    }

    /// Allocates one additional page,
    /// returning an error if the new number of pages would overflow `PageIndex::MAX`.
    ///
    /// The new page is initially empty and is not added to the non-full set.
    /// Callers should call [`Pages::maybe_mark_page_non_full`] after operating on the new page.
    fn allocate_new_page(&mut self, fixed_row_size: Size) -> Result<PageIndex, Error> {
        let new_idx = self.can_allocate_new_page()?;

        self.pages.push(Page::new(fixed_row_size));

        Ok(new_idx)
    }

    /// Reserve a new, initially empty page, and mark it as non-full.
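    ///
    /// # Example
    ///
    /// A minimal sketch of reserving a page; `FIXED_ROW_SIZE` is a hypothetical
    /// constant standing in for a real row type's fixed row size.
    ///
    /// ```ignore
    /// let mut pages = Pages::default();
    /// // Reserve an empty page; it enters the non-full set,
    /// // so later insertions may use it.
    /// let pi = pages.reserve_empty_page(FIXED_ROW_SIZE)?;
    /// let page: &Page = &pages[pi];
    /// ```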
    pub fn reserve_empty_page(&mut self, fixed_row_size: Size) -> Result<PageIndex, Error> {
        let idx = self.allocate_new_page(fixed_row_size)?;
        self.mark_page_non_full(idx);
        Ok(idx)
    }

    /// Mark the page at `idx` as non-full.
    pub fn mark_page_non_full(&mut self, idx: PageIndex) {
        self.non_full_pages.push(idx);
    }

    /// If the page at `page_index` is not full,
    /// add it back to the non-full set so that later insertions can use it.
    pub fn maybe_mark_page_non_full(&mut self, page_index: PageIndex, fixed_row_size: Size) {
        if !self[page_index].is_full(fixed_row_size) {
            self.non_full_pages.push(page_index);
        }
    }

    /// Calls `f` with a mutable reference to a page which satisfies
    /// `page.has_space_for_row(fixed_row_size, num_var_len_granules)`,
    /// returning the page's index and the result of `f`.
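    ///
    /// # Example
    ///
    /// A sketch of the callback pattern, mirroring [`Pages::insert_row`];
    /// `visitor`, `blob_store`, and `row` are hypothetical stand-ins
    /// supplied by the caller.
    ///
    /// ```ignore
    /// // A row with no var-len members requires zero granules.
    /// let (pi, res) = pages.with_page_to_insert_row(fixed_row_size, 0, |page| {
    ///     // `page` is guaranteed to have space for the row here.
    ///     // SAFETY: as for `Pages::insert_row`.
    ///     unsafe { page.insert_row(row, &[], &visitor, blob_store) }
    /// })?;
    /// ```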
    pub fn with_page_to_insert_row<Res>(
        &mut self,
        fixed_row_size: Size,
        num_var_len_granules: usize,
        f: impl FnOnce(&mut Page) -> Res,
    ) -> Result<(PageIndex, Res), Error> {
        let page_index = self.find_page_with_space_for_row(fixed_row_size, num_var_len_granules)?;
        let res = f(&mut self[page_index]);
        self.maybe_mark_page_non_full(page_index, fixed_row_size);
        Ok((page_index, res))
    }

    /// Find a page with sufficient available space to store a row of size `fixed_row_size`
    /// containing `num_var_len_granules` granules of var-len data,
    /// allocating a new page if no existing page has enough space.
    ///
    /// Retrieving a page in this way removes it from the non-full set.
    /// After performing an insertion, the caller should use [`Pages::maybe_mark_page_non_full`]
    /// to restore the page to the non-full set.
    fn find_page_with_space_for_row(
        &mut self,
        fixed_row_size: Size,
        num_var_len_granules: usize,
    ) -> Result<PageIndex, Error> {
        // Try to find an existing non-full page with enough space first.
        if let Some((page_idx_idx, page_idx)) = self
            .non_full_pages
            .iter()
            .copied()
            .enumerate()
            .find(|(_, page_idx)| self[*page_idx].has_space_for_row(fixed_row_size, num_var_len_granules))
        {
            self.non_full_pages.swap_remove(page_idx_idx);
            return Ok(page_idx);
        }

        // No page had enough space; allocate a fresh one.
        self.allocate_new_page(fixed_row_size)
    }

    /// Superseded by `write_av_to_pages`, but exposed for benchmarking
    /// when we want to avoid the overhead of traversing `AlgebraicType`.
    ///
    /// Inserts a row with fixed-len parts in `fixed_len` and var-len parts in `var_len`.
    /// `fixed_len.len()` must equal `fixed_row_size.len()`.
    ///
    /// # Safety
    ///
    /// - `var_len_visitor` must be suitable for visiting var-len refs in `fixed_len`.
    /// - `fixed_len.len()` must match the row type size exactly.
    /// - `fixed_len.len()` must be consistent
    ///   with what has been passed to the manager in all other ops
    ///   and with the `var_len_visitor` the manager was made with.
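    ///
    /// # Example
    ///
    /// A sketch of a fixed-len-only insertion; `visitor` (a [`VarLenMembers`]
    /// impl that visits no members), `blob_store`, and `row` are hypothetical
    /// stand-ins supplied by the caller.
    ///
    /// ```ignore
    /// // SAFETY: `visitor` visits no var-len refs in `row`,
    /// // and `row.len() == fixed_row_size.len()` holds.
    /// let (page_index, page_offset) =
    ///     unsafe { pages.insert_row(&visitor, fixed_row_size, row, &[], blob_store) }?;
    /// ```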
    // TODO(bikeshedding): rename to make the purpose as a bench interface clear?
    pub unsafe fn insert_row(
        &mut self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        fixed_len: &Bytes,
        var_len: &[&[u8]],
        blob_store: &mut dyn BlobStore,
    ) -> Result<(PageIndex, PageOffset), Error> {
        debug_assert!(fixed_len.len() == fixed_row_size.len());

        match self.with_page_to_insert_row(
            fixed_row_size,
            Page::total_granules_required_for_objects(var_len),
            |page| {
                // This insertion can never fail, as we know from `find_page_with_space_for_row`
                // that the page has sufficient space.
                //
                // SAFETY:
                // - Caller promised that `var_len_visitor`
                //   is suitable for visiting var-len refs in `fixed_len`
                //   and that `fixed_len.len()` matches the row type size exactly.
                //
                // - Caller promised that `fixed_len.len()` is consistent
                //   with what has been passed to the manager in all other ops.
                //   This entails that `fixed_len.len()` is consistent with `page`.
                unsafe { page.insert_row(fixed_len, var_len, var_len_visitor, blob_store) }
            },
        )? {
            (page, Ok(offset)) => Ok((page, offset)),
            (_, Err(e)) => Err(e.into()),
        }
    }

    /// Frees the row pointed to by `row_ptr`,
    /// marking its fixed-len storage
    /// and var-len storage granules as available for re-use.
    ///
    /// Returns the number of blob-store bytes deleted.
    ///
    /// # Safety
    ///
    /// `row_ptr` must point to a valid row in this page manager,
    /// with `fixed_row_size` bytes for the fixed part.
    ///
    /// `fixed_row_size` must be consistent
    /// with what has been passed to the manager in all other operations
    /// and with the `var_len_visitor` the manager was made with.
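    ///
    /// # Example
    ///
    /// A sketch of deleting a previously inserted row; `visitor`, `blob_store`,
    /// and `ptr` (a [`RowPointer`] derived from an earlier insertion) are
    /// hypothetical stand-ins supplied by the caller.
    ///
    /// ```ignore
    /// // SAFETY: `ptr` refers to a row inserted with the same
    /// // `fixed_row_size` and `visitor`, and not deleted since.
    /// let freed = unsafe { pages.delete_row(&visitor, fixed_row_size, ptr, blob_store) };
    /// ```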
    pub unsafe fn delete_row(
        &mut self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        row_ptr: RowPointer,
        blob_store: &mut dyn BlobStore,
    ) -> BlobNumBytes {
        let page = &mut self[row_ptr.page_index()];
        let full_before = page.is_full(fixed_row_size);
        // SAFETY:
        // - `row_ptr.page_offset()` does point to a valid row in this page,
        //   as the caller promised that `row_ptr` points to a valid row in `self`.
        //
        // - `fixed_row_size` is consistent with the size in bytes of the fixed part of the row.
        //   The size is also consistent with `var_len_visitor`.
        let blob_store_deleted_bytes =
            unsafe { page.delete_row(row_ptr.page_offset(), fixed_row_size, var_len_visitor, blob_store) };

        // If the page was previously full, mark it as non-full now,
        // since we just opened up space in it.
        if full_before {
            self.mark_page_non_full(row_ptr.page_index());
        }
        blob_store_deleted_bytes
    }

    /// Materializes a view of the rows in `self` for which `filter` returns `true`.
    ///
    /// # Safety
    ///
    /// - The `var_len_visitor` must visit the same set of `VarLenRef`s in each row
    ///   as the visitor provided to all other methods on `self`.
    ///
    /// - The `fixed_row_size` must be consistent with the `var_len_visitor`
    ///   and equal to the value provided to all other methods on `self`.
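    ///
    /// # Example
    ///
    /// A sketch of filtering on a per-row predicate; `visitor`, `blob_store`,
    /// and `keep_row` (a predicate over a row's fixed-len bytes) are
    /// hypothetical stand-ins supplied by the caller.
    ///
    /// ```ignore
    /// // SAFETY: `visitor` and `fixed_row_size` match those used
    /// // for every other operation on `pages`.
    /// let view = unsafe {
    ///     pages.copy_filter(&visitor, fixed_row_size, blob_store, |page, offset| {
    ///         keep_row(page.get_row_data(offset, fixed_row_size))
    ///     })
    /// };
    /// ```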
    pub unsafe fn copy_filter(
        &self,
        var_len_visitor: &impl VarLenMembers,
        fixed_row_size: Size,
        blob_store: &mut dyn BlobStore,
        mut filter: impl FnMut(&Page, PageOffset) -> bool,
    ) -> Self {
        // Build a new container to hold the materialized view.
        // Pages are pushed into it below.
        let mut partial_copied_pages = Self::default();

        // A destination page that was not filled entirely,
        // or `None` if it's time to allocate a new destination page.
        let mut partial_page = None;

        // Copy each page.
        for from_page in &self.pages {
            // We may require multiple calls to `Page::copy_filter_into`
            // if `partial_page` fills up;
            // the first call starts from offset 0.
            let mut copy_starting_from = Some(PageOffset(0));

            // While there are unprocessed rows in `from_page`,
            while let Some(next_offset) = copy_starting_from.take() {
                // Grab the `partial_page` or allocate a new one.
                let mut to_page = partial_page.take().unwrap_or_else(|| Page::new(fixed_row_size));

                // Copy as many rows as will fit into `to_page`.
                //
                // SAFETY:
                //
                // - The `var_len_visitor` visits the same set of `VarLenRef`s in each row
                //   as the visitor provided to all other methods on `self`.
                //   The `to_page` uses the same visitor as the `from_page`.
                //
                // - The `fixed_row_size` is consistent with the `var_len_visitor`
                //   and is equal to the value provided to all other methods on `self`,
                //   as promised by the caller.
                //   The newly made `to_page` uses the same `fixed_row_size` as the `from_page`.
                //
                // - The `next_offset` is either 0,
                //   which is always a valid starting offset for any row size,
                //   or it came from `copy_filter_into` in a previous iteration,
                //   which, given that `fixed_row_size` was valid,
                //   always returns a valid starting offset in the `Continue(_)` case.
                let cfi_ret = unsafe {
                    from_page.copy_filter_into(
                        next_offset,
                        &mut to_page,
                        fixed_row_size,
                        var_len_visitor,
                        blob_store,
                        &mut filter,
                    )
                };
                copy_starting_from = if let ControlFlow::Continue(continue_point) = cfi_ret {
                    // If `to_page` couldn't fit all of `from_page`,
                    // repeat the `while let` loop to copy the rest.
                    Some(continue_point)
                } else {
                    // If `to_page` fit all of `from_page`, we can move on.
                    None
                };

                // If `from_page` finished copying into `to_page`,
                // i.e. `copy_starting_from` is `None`,
                // then `to_page` may have extra room, so keep it around as `partial_page`.
                //
                // If `copy_starting_from` is `Some`,
                // at least one row didn't have space in `to_page`,
                // so we must consider `to_page` full and push it into the result.
                //
                // Note that this is distinct from `Page::is_full`,
                // as that method considers the optimistic case of a row with no var-len members.
                if copy_starting_from.is_none() {
                    partial_page = Some(to_page);
                } else {
                    partial_copied_pages.pages.push(to_page);
                }
            }
        }

        // Don't lose the trailing partially-filled page, if any:
        // push it into the result and, if it still has room, mark it non-full.
        if let Some(partial_page) = partial_page {
            let idx = PageIndex(partial_copied_pages.pages.len() as u64);
            partial_copied_pages.pages.push(partial_page);
            partial_copied_pages.maybe_mark_page_non_full(idx, fixed_row_size);
        }

        partial_copied_pages
    }

    /// Sets this [`Pages`]' contents to be `pages`,
    /// and populates `self.non_full_pages` accordingly.
    ///
    /// Used when restoring from a snapshot.
    ///
    /// Each page in `pages` must be consistent with the schema for this [`Pages`],
    /// i.e. the schema for the [`crate::table::Table`] which contains `self`.
    ///
    /// Should only ever be called when `self.is_empty()`.
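    ///
    /// # Example
    ///
    /// A sketch of snapshot restoration; `snapshot_pages` is a hypothetical
    /// `Vec<Box<Page>>` decoded from a snapshot of a table with the same schema.
    ///
    /// ```ignore
    /// let mut pages = Pages::default();
    /// pages.set_contents(snapshot_pages, fixed_row_size);
    /// // Pages with free space are now available for insertion again.
    /// ```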
    pub fn set_contents(&mut self, pages: Vec<Box<Page>>, fixed_row_size: Size) {
        debug_assert!(self.is_empty());
        self.non_full_pages = pages
            .iter()
            .enumerate()
            .filter_map(|(idx, page)| (!page.is_full(fixed_row_size)).then_some(PageIndex(idx as _)))
            .collect();
        self.pages = pages;
    }
}

impl Deref for Pages {
    type Target = [Box<Page>];

    fn deref(&self) -> &Self::Target {
        &self.pages
    }
}

impl DerefMut for Pages {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.pages
    }
}