spacetimedb_table/table_index/unique_direct_index.rs
use crate::indexes::{PageIndex, PageOffset, RowPointer, SquashedOffset};
use crate::MemoryUsage;
use core::mem;
use core::ops::{Bound, RangeBounds};
use core::option::IntoIter;

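/// A unique index mapping `usize` keys directly to `RowPointer`s.
///
/// The index is a two-level array: an outer `Vec` of lazily allocated,
/// fixed-size inner pages, so point lookups are O(1) and sparse key
/// ranges only pay for the inner pages they actually touch.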
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct UniqueDirectIndex {
    /// The lazily allocated inner pages, indexed by `key / KEYS_PER_INNER`.
    outer: Vec<Option<InnerIndex>>,
    /// The number of keys present in the index.
    len: usize,
}

impl MemoryUsage for UniqueDirectIndex {
    fn heap_usage(&self) -> usize {
        let Self { outer, len } = self;
        outer.heap_usage() + len.heap_usage()
    }
}

/// The target byte size of an inner index: one 4 KiB page.
const PAGE_SIZE: usize = 4_096;
/// The number of `RowPointer` slots in each inner index.
const KEYS_PER_INNER: usize = PAGE_SIZE / size_of::<RowPointer>();
type InnerIndexArray = [RowPointer; KEYS_PER_INNER];

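/// A lazily allocated page of `KEYS_PER_INNER` row pointer slots.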
#[derive(Debug, Clone, PartialEq, Eq)]
struct InnerIndex {
    inner: Box<InnerIndexArray>,
}

impl MemoryUsage for InnerIndex {
    fn heap_usage(&self) -> usize {
        self.inner.heap_usage()
    }
}

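/// The sentinel stored in slots that hold no key.
///
/// Slots written by `insert` always have the reserved bit set,
/// so any stored pointer, even one pointing at page 0, offset 0,
/// can be told apart from this sentinel.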
pub(super) const NONE_PTR: RowPointer = RowPointer::new(false, PageIndex(0), PageOffset(0), SquashedOffset::TX_STATE);

/// A key within an `InnerIndex`; always less than `KEYS_PER_INNER`.
struct InnerIndexKey(usize);

/// Splits `key` into the index of the inner index in `outer`
/// and the slot within that inner index.
fn split_key(key: usize) -> (usize, InnerIndexKey) {
    (key / KEYS_PER_INNER, InnerIndexKey(key % KEYS_PER_INNER))
}

impl InnerIndex {
    /// Allocates a new inner index with every slot zero-initialized,
    /// which the index treats as `NONE_PTR`, i.e., empty.
    fn new() -> Self {
        use std::alloc::{alloc_zeroed, handle_alloc_error, Layout};

        let layout = Layout::new::<InnerIndexArray>();

        // Allocate the array directly on the heap, zero-initialized,
        // rather than building it on the stack and boxing it.
        // SAFETY: `layout` has a non-zero size.
        let raw: *mut InnerIndexArray = unsafe { alloc_zeroed(layout) }.cast();

        if raw.is_null() {
            handle_alloc_error(layout);
        }

        // SAFETY: `raw` is non-null and was allocated by the global allocator
        // with the layout of `InnerIndexArray`, and the all-zeroes bit pattern
        // is a valid `InnerIndexArray`.
        let inner = unsafe { Box::from_raw(raw) };

        Self { inner }
    }

    /// Returns the pointer stored in the slot for `key`.
    fn get(&self, key: InnerIndexKey) -> RowPointer {
        // SAFETY: `InnerIndexKey` is only constructed by `split_key`,
        // so `key.0 < KEYS_PER_INNER`, the length of the array.
        *unsafe { self.inner.get_unchecked(key.0) }
    }

    /// Returns a mutable reference to the slot for `key`.
    fn get_mut(&mut self, key: InnerIndexKey) -> &mut RowPointer {
        // SAFETY: `InnerIndexKey` is only constructed by `split_key`,
        // so `key.0 < KEYS_PER_INNER`, the length of the array.
        unsafe { self.inner.get_unchecked_mut(key.0) }
    }
}

impl UniqueDirectIndex {
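    /// Inserts the relation `key -> val` into this index.
    ///
    /// If `key` is already associated with a pointer, nothing is inserted
    /// and that existing pointer is returned as the error.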
    pub fn insert(&mut self, key: usize, val: RowPointer) -> Result<(), RowPointer> {
        let (key_outer, key_inner) = split_key(key);

        // Ensure the outer vector is long enough to contain `key_outer`.
        let outer = &mut self.outer;
        outer.resize(outer.len().max(key_outer + 1), None);

        // SAFETY: the `resize` above ensures `key_outer < outer.len()`.
        let inner = unsafe { outer.get_unchecked_mut(key_outer) };
        let inner = inner.get_or_insert_with(InnerIndex::new);

        let slot = inner.get_mut(key_inner);
        let in_slot = *slot;
        if in_slot == NONE_PTR {
            // Set the reserved bit so the stored pointer
            // can never compare equal to `NONE_PTR`.
            *slot = val.with_reserved_bit(true);
            self.len += 1;
            Ok(())
        } else {
            Err(in_slot.with_reserved_bit(false))
        }
    }

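    /// Deletes `key` from this index, returning whether it was present.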
    pub fn delete(&mut self, key: usize) -> bool {
        let (key_outer, key_inner) = split_key(key);
        let outer = &mut self.outer;
        if let Some(Some(inner)) = outer.get_mut(key_outer) {
            let slot = inner.get_mut(key_inner);
            let old_val = mem::replace(slot, NONE_PTR);
            let deleted = old_val != NONE_PTR;
            self.len -= deleted as usize;
            return deleted;
        }
        false
    }

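    /// Returns an iterator yielding the `RowPointer` associated with `key`, if any.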
    pub fn seek_point(&self, key: usize) -> UniqueDirectIndexPointIter {
        let (outer_key, inner_key) = split_key(key);
        let point = self
            .outer
            .get(outer_key)
            .and_then(|x| x.as_ref())
            .map(|inner| inner.get(inner_key))
            .filter(|slot| *slot != NONE_PTR);
        UniqueDirectIndexPointIter::new(point)
    }

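    /// Returns an iterator yielding the `RowPointer`s
    /// for all keys present within `range`.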
    pub fn seek_range(&self, range: &impl RangeBounds<usize>) -> UniqueDirectIndexRangeIter {
        // The largest key, exclusive, that the index could currently hold.
        let max_key = self.outer.len() * KEYS_PER_INNER;

        let start = match range.start_bound() {
            Bound::Included(&s) => s,
            Bound::Excluded(&s) => s + 1,
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&e) => e + 1,
            Bound::Excluded(&e) => e,
            Bound::Unbounded => max_key,
        };

        // Clamp `end` to the keys the index can actually hold,
        // then clamp `start` so the range is never inverted.
        let end = end.min(max_key);
        let start = start.min(end);

        UniqueDirectIndexRangeIter {
            outer: &self.outer,
            start,
            end,
        }
    }

    /// Returns the number of keys indexed.
    pub fn num_keys(&self) -> usize {
        self.len
    }

    /// Returns the number of entries in the index,
    /// which equals the number of keys, as the index is unique.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns whether the index is empty.
    #[allow(unused)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Removes all entries, and their inner indices, from the index.
    pub fn clear(&mut self) {
        self.outer.clear();
        self.len = 0;
    }

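    /// Checks whether `other` can be merged into `self` without
    /// violating uniqueness.
    ///
    /// Returns `Err` with the pointer stored in `self` for the first key
    /// that is occupied in both indices and whose pointer is not `ignore`d.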
    pub(crate) fn can_merge(&self, other: &Self, ignore: impl Fn(&RowPointer) -> bool) -> Result<(), RowPointer> {
        for (inner_s, inner_o) in self.outer.iter().zip(&other.outer) {
            let (Some(inner_s), Some(inner_o)) = (inner_s, inner_o) else {
                continue;
            };

            for (slot_s, slot_o) in inner_s.inner.iter().zip(inner_o.inner.iter()) {
                let ptr_s = slot_s.with_reserved_bit(false);
                if *slot_s != NONE_PTR && *slot_o != NONE_PTR && !ignore(&ptr_s) {
                    return Err(ptr_s);
                }
            }
        }

        Ok(())
    }
}

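/// An iterator over the result of a point query: at most one `RowPointer`.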
pub struct UniqueDirectIndexPointIter {
    iter: IntoIter<RowPointer>,
}

impl UniqueDirectIndexPointIter {
    pub(super) fn new(point: Option<RowPointer>) -> Self {
        // Clear the reserved bit so callers see the pointer as it was inserted.
        let iter = point.map(|ptr| ptr.with_reserved_bit(false)).into_iter();
        Self { iter }
    }
}

impl Iterator for UniqueDirectIndexPointIter {
    type Item = RowPointer;
    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next()
    }
}

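/// An iterator over the `RowPointer`s stored for all present keys in `start..end`.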
#[derive(Debug)]
pub struct UniqueDirectIndexRangeIter<'a> {
    outer: &'a [Option<InnerIndex>],
    start: usize,
    end: usize,
}

impl Iterator for UniqueDirectIndexRangeIter<'_> {
    type Item = RowPointer;
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.start >= self.end {
                return None;
            }

            let (outer_key, inner_key) = split_key(self.start);
            // SAFETY: `self.start < self.end <= outer.len() * KEYS_PER_INNER`,
            // so `outer_key = self.start / KEYS_PER_INNER < outer.len()`.
            let inner = unsafe { self.outer.get_unchecked(outer_key) };
            let Some(inner) = inner else {
                // This inner index was never allocated, so none of its keys are
                // present; skip ahead to the first key of the next inner index.
                self.start = (outer_key + 1) * KEYS_PER_INNER;
                continue;
            };
            let ptr = inner.get(inner_key);

            self.start += 1;

            if ptr != NONE_PTR {
                return Some(ptr.with_reserved_bit(false));
            }
        }
    }
}


#[cfg(test)]
pub(super) mod test {
    use core::iter::repeat_with;

    use super::*;
    use crate::indexes::Size;

    const FIXED_ROW_SIZE: Size = Size(4 * 4);

    /// Generates an endless sequence of valid `RowPointer`s,
    /// advancing by `FIXED_ROW_SIZE` within a page
    /// and moving to a fresh page when the current one fills up.
    pub(crate) fn gen_row_pointers() -> impl Iterator<Item = RowPointer> {
        let mut page_index = PageIndex(0);
        let mut page_offset = PageOffset(0);
        repeat_with(move || {
            if page_offset.0 as usize + FIXED_ROW_SIZE.0 as usize >= PageOffset::PAGE_END.0 as usize {
                // The next row would not fit in the current page; start a new one.
                page_index.0 += 1;
                page_offset = PageOffset(0);
            } else {
                page_offset += FIXED_ROW_SIZE;
            }

            RowPointer::new(false, page_index, page_offset, SquashedOffset::COMMITTED_STATE)
        })
    }

    #[test]
    fn seek_range_gives_back_inserted() {
        let range = (KEYS_PER_INNER - 2)..(KEYS_PER_INNER + 2);
        let (keys, ptrs): (Vec<_>, Vec<_>) = range.clone().zip(gen_row_pointers()).unzip();

        let mut index = UniqueDirectIndex::default();
        for (key, ptr) in keys.iter().zip(&ptrs) {
            index.insert(*key, *ptr).unwrap();
        }
        assert_eq!(index.len(), 4);

        let ptrs_found = index.seek_range(&range).collect::<Vec<_>>();
        assert_eq!(ptrs, ptrs_found);
    }

    #[test]
    fn inserting_again_errors() {
        let range = (KEYS_PER_INNER - 2)..(KEYS_PER_INNER + 2);
        let (keys, ptrs): (Vec<_>, Vec<_>) = range.zip(gen_row_pointers()).unzip();

        let mut index = UniqueDirectIndex::default();
        for (key, ptr) in keys.iter().zip(&ptrs) {
            index.insert(*key, *ptr).unwrap();
        }

        for (key, ptr) in keys.iter().zip(&ptrs) {
            assert_eq!(index.insert(*key, *ptr).unwrap_err(), *ptr);
        }
    }

    #[test]
    fn deleting_allows_reinsertion() {
        let range = (KEYS_PER_INNER - 2)..(KEYS_PER_INNER + 2);
        let (keys, ptrs): (Vec<_>, Vec<_>) = range.zip(gen_row_pointers()).unzip();

        let mut index = UniqueDirectIndex::default();
        for (key, ptr) in keys.iter().zip(&ptrs) {
            index.insert(*key, *ptr).unwrap();
        }
        assert_eq!(index.len(), 4);

        let key = KEYS_PER_INNER + 1;
        let ptr = index.seek_point(key).next().unwrap();
        assert!(index.delete(key));
        assert!(!index.delete(key));
        assert_eq!(index.len(), 3);

        index.insert(key, ptr).unwrap();
        assert_eq!(index.len(), 4);
    }
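
    // An extra illustrative sketch, not part of the original suite, using only
    // the APIs defined above: point queries miss keys that were never inserted,
    // and `clear` empties the index.
    #[test]
    fn point_queries_and_clear() {
        let mut index = UniqueDirectIndex::default();
        let ptr = gen_row_pointers().next().unwrap();

        index.insert(0, ptr).unwrap();
        assert_eq!(index.seek_point(0).next(), Some(ptr));
        // A key that was never inserted yields nothing.
        assert_eq!(index.seek_point(1).next(), None);

        index.clear();
        assert!(index.is_empty());
        assert_eq!(index.seek_point(0).next(), None);
    }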
}