1use std::{
8 any::Any,
9 marker::PhantomData,
10 ops::{Deref, DerefMut},
11};
12
13use crate::numeric_id::{DenseIdMap, NumericId, define_id};
14use smallvec::SmallVec;
15
16use crate::{
17 QueryEntry, TableId, Variable,
18 action::{
19 Bindings, ExecutionState,
20 mask::{Mask, MaskIter, ValueSource},
21 },
22 common::Value,
23 hash_index::{ColumnIndex, IndexBase, TupleIndex},
24 offsets::{RowId, Subset, SubsetRef},
25 pool::{PoolSet, Pooled, with_pool_set},
26 row_buffer::{RowBuffer, TaggedRowBuffer},
27};
28
29define_id!(pub ColumnId, u32, "a particular column in a table");
30define_id!(
31 pub Generation,
32 u64,
33 "the current version of a table -- used to invalidate any existing RowIds"
34);
35define_id!(
36 pub Offset,
37 u64,
38 "an opaque offset token -- used to encode iterations over a table (within a generation). These always start at 0."
39);
40
41#[derive(Clone, Debug, PartialEq, Eq)]
43pub struct TableVersion {
44 pub major: Generation,
46 pub minor: Offset,
50 }
52
53#[derive(Clone)]
54pub struct TableSpec {
55 pub n_keys: usize,
57
58 pub n_vals: usize,
62
63 pub uncacheable_columns: DenseIdMap<ColumnId, bool>,
68
69 pub allows_delete: bool,
74}
75
76impl TableSpec {
77 pub fn arity(&self) -> usize {
79 self.n_keys + self.n_vals
80 }
81}
82
83#[derive(Eq, PartialEq, Copy, Clone)]
85pub struct TableChange {
86 pub added: bool,
88 pub removed: bool,
90}
91
92#[derive(Clone, Debug, PartialEq, Eq)]
94pub enum Constraint {
95 Eq { l_col: ColumnId, r_col: ColumnId },
96 EqConst { col: ColumnId, val: Value },
97 LtConst { col: ColumnId, val: Value },
98 GtConst { col: ColumnId, val: Value },
99 LeConst { col: ColumnId, val: Value },
100 GeConst { col: ColumnId, val: Value },
101}
102
103pub trait Rebuilder: Send + Sync {
112 fn hint_col(&self) -> Option<ColumnId>;
115 fn rebuild_val(&self, val: Value) -> Value;
116 fn rebuild_buf(
118 &self,
119 buf: &RowBuffer,
120 start: RowId,
121 end: RowId,
122 out: &mut TaggedRowBuffer,
123 exec_state: &mut ExecutionState,
124 );
125 fn rebuild_subset(
127 &self,
128 other: WrappedTableRef,
129 subset: SubsetRef,
130 out: &mut TaggedRowBuffer,
131 exec_state: &mut ExecutionState,
132 );
133 fn rebuild_slice(&self, vals: &mut [Value]) -> bool;
135}
136
137pub struct Row {
139 pub id: RowId,
141 pub vals: Pooled<Vec<Value>>,
143}
144
145pub trait Table: Any + Send + Sync {
147 fn dyn_clone(&self) -> Box<dyn Table>;
150
151 fn rebuilder<'a>(&'a self, _cols: &[ColumnId]) -> Option<Box<dyn Rebuilder + 'a>> {
153 None
154 }
155
156 fn apply_rebuild(
163 &mut self,
164 _table_id: TableId,
165 _table: &WrappedTable,
166 _next_ts: Value,
167 _exec_state: &mut ExecutionState,
168 ) {
169 }
171
172 fn as_any(&self) -> &dyn Any;
177
178 fn spec(&self) -> TableSpec;
183
184 fn clear(&mut self);
187
188 fn all(&self) -> Subset;
192
193 fn len(&self) -> usize;
199
200 fn is_empty(&self) -> bool {
202 self.len() == 0
203 }
204
205 fn version(&self) -> TableVersion;
208
209 fn updates_since(&self, offset: Offset) -> Subset;
211
212 fn scan_generic_bounded(
221 &self,
222 subset: SubsetRef,
223 start: Offset,
224 n: usize,
225 cs: &[Constraint],
226 f: impl FnMut(RowId, &[Value]),
227 ) -> Option<Offset>
228 where
229 Self: Sized;
230
231 fn scan_generic(&self, subset: SubsetRef, mut f: impl FnMut(RowId, &[Value]))
236 where
237 Self: Sized,
238 {
239 let mut cur = Offset::new(0);
240 while let Some(next) = self.scan_generic_bounded(subset, cur, usize::MAX, &[], |id, row| {
241 f(id, row);
242 }) {
243 cur = next;
244 }
245 }
246
247 fn refine_live(&self, subset: Subset) -> Subset {
249 self.refine_one(
251 subset,
252 &Constraint::LtConst {
253 col: ColumnId::new_const(0),
254 val: Value::stale(),
255 },
256 )
257 }
258
259 fn refine_one(&self, subset: Subset, c: &Constraint) -> Subset {
263 self.refine(subset, std::slice::from_ref(c))
264 }
265
266 fn refine(&self, subset: Subset, cs: &[Constraint]) -> Subset {
270 cs.iter()
271 .fold(subset, |subset, c| self.refine_one(subset, c))
272 }
273
274 fn fast_subset(&self, _: &Constraint) -> Option<Subset> {
281 None
282 }
283
284 fn split_fast_slow(
288 &self,
289 cs: &[Constraint],
290 ) -> (
291 Subset, Pooled<Vec<Constraint>>, Pooled<Vec<Constraint>>, ) {
295 with_pool_set(|ps| {
296 let mut fast = ps.get::<Vec<Constraint>>();
297 let mut slow = ps.get::<Vec<Constraint>>();
298 let mut subset = self.all();
299 for c in cs {
300 if let Some(sub) = self.fast_subset(c) {
301 subset.intersect(sub.as_ref(), &ps.get_pool());
302 fast.push(c.clone());
303 } else {
304 slow.push(c.clone());
305 }
306 }
307 (subset, fast, slow)
308 })
309 }
310
311 fn get_row(&self, key: &[Value]) -> Option<Row>;
318
319 fn get_row_column(&self, key: &[Value], col: ColumnId) -> Option<Value> {
325 self.get_row(key).map(|row| row.vals[col.index()])
326 }
327
328 fn merge(&mut self, exec_state: &mut ExecutionState) -> TableChange;
331
332 fn new_buffer(&self) -> Box<dyn MutationBuffer>;
334}
335
336pub trait MutationBuffer: Any + Send + Sync {
342 fn stage_insert(&mut self, row: &[Value]);
346
347 fn stage_remove(&mut self, key: &[Value]);
351
352 fn fresh_handle(&self) -> Box<dyn MutationBuffer>;
354}
355
356struct WrapperImpl<T>(PhantomData<T>);
357
358pub(crate) fn wrapper<T: Table>() -> Box<dyn TableWrapper> {
359 Box::new(WrapperImpl::<T>(PhantomData))
360}
361
362impl<T: Table> TableWrapper for WrapperImpl<T> {
363 fn dyn_clone(&self) -> Box<dyn TableWrapper> {
364 Box::new(Self(PhantomData))
365 }
366 fn scan_bounded(
367 &self,
368 table: &dyn Table,
369 subset: SubsetRef,
370 start: Offset,
371 n: usize,
372 out: &mut TaggedRowBuffer,
373 ) -> Option<Offset> {
374 let table = table.as_any().downcast_ref::<T>().unwrap();
375 table.scan_generic_bounded(subset, start, n, &[], |row_id, row| {
376 out.add_row(row_id, row);
377 })
378 }
379 fn group_by_col(&self, table: &dyn Table, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
380 let table = table.as_any().downcast_ref::<T>().unwrap();
381 let mut res = ColumnIndex::new();
382 table.scan_generic(subset, |row_id, row| {
383 res.add_row(&[row[col.index()]], row_id);
384 });
385 res
386 }
387 fn group_by_key(&self, table: &dyn Table, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
388 let table = table.as_any().downcast_ref::<T>().unwrap();
389 let mut res = TupleIndex::new(cols.len());
390 match cols {
391 [] => {}
392 [col] => table.scan_generic(subset, |row_id, row| {
393 res.add_row(&[row[col.index()]], row_id);
394 }),
395 [x, y] => table.scan_generic(subset, |row_id, row| {
396 res.add_row(&[row[x.index()], row[y.index()]], row_id);
397 }),
398 [x, y, z] => table.scan_generic(subset, |row_id, row| {
399 res.add_row(&[row[x.index()], row[y.index()], row[z.index()]], row_id);
400 }),
401 _ => {
402 let mut scratch = SmallVec::<[Value; 8]>::new();
403 table.scan_generic(subset, |row_id, row| {
404 for col in cols {
405 scratch.push(row[col.index()]);
406 }
407 res.add_row(&scratch, row_id);
408 scratch.clear();
409 });
410 }
411 }
412 res
413 }
414 fn scan_project(
415 &self,
416 table: &dyn Table,
417 subset: SubsetRef,
418 cols: &[ColumnId],
419 start: Offset,
420 n: usize,
421 cs: &[Constraint],
422 out: &mut TaggedRowBuffer,
423 ) -> Option<Offset> {
424 let table = table.as_any().downcast_ref::<T>().unwrap();
425 match cols {
426 [] => None,
427 [col] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
428 out.add_row(id, &[row[col.index()]]);
429 }),
430 [x, y] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
431 out.add_row(id, &[row[x.index()], row[y.index()]]);
432 }),
433 [x, y, z] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
434 out.add_row(id, &[row[x.index()], row[y.index()], row[z.index()]]);
435 }),
436 _ => {
437 let mut scratch = SmallVec::<[Value; 8]>::with_capacity(cols.len());
438 table.scan_generic_bounded(subset, start, n, cs, |id, row| {
439 for col in cols {
440 scratch.push(row[col.index()]);
441 }
442 out.add_row(id, &scratch);
443 scratch.clear();
444 })
445 }
446 }
447 }
448
449 fn lookup_row_vectorized(
450 &self,
451 table: &dyn Table,
452 mask: &mut Mask,
453 bindings: &mut Bindings,
454 args: &[QueryEntry],
455 col: ColumnId,
456 out_var: Variable,
457 ) {
458 let table = table.as_any().downcast_ref::<T>().unwrap();
459 let mut out = with_pool_set(PoolSet::get::<Vec<Value>>);
460 for_each_binding_with_mask!(mask, args, bindings, |iter| {
461 iter.fill_vec(&mut out, Value::stale, |_, args| {
462 table.get_row_column(args.as_slice(), col)
463 })
464 });
465 bindings.insert(out_var, &out);
466 }
467
468 fn lookup_with_default_vectorized(
469 &self,
470 table: &dyn Table,
471 mask: &mut Mask,
472 bindings: &mut Bindings,
473 args: &[QueryEntry],
474 col: ColumnId,
475 default: QueryEntry,
476 out_var: Variable,
477 ) {
478 let table = table.as_any().downcast_ref::<T>().unwrap();
479 let mut out = with_pool_set(|ps| ps.get::<Vec<Value>>());
480 for_each_binding_with_mask!(mask, args, bindings, |iter| {
481 match default {
482 QueryEntry::Var(default) => iter.zip(&bindings[default]).fill_vec(
483 &mut out,
484 Value::stale,
485 |_, (args, default)| {
486 Some(
487 table
488 .get_row_column(args.as_slice(), col)
489 .unwrap_or(*default),
490 )
491 },
492 ),
493 QueryEntry::Const(default) => iter.fill_vec(&mut out, Value::stale, |_, args| {
494 Some(
495 table
496 .get_row_column(args.as_slice(), col)
497 .unwrap_or(default),
498 )
499 }),
500 }
501 });
502 bindings.insert(out_var, &out);
503 }
504}
505
506pub struct WrappedTable {
514 inner: Box<dyn Table>,
515 wrapper: Box<dyn TableWrapper>,
516}
517
518impl WrappedTable {
519 pub(crate) fn new<T: Table>(inner: T) -> Self {
520 let wrapper = wrapper::<T>();
521 let inner = Box::new(inner);
522 Self { inner, wrapper }
523 }
524
525 pub fn dyn_clone(&self) -> Self {
527 WrappedTable {
528 inner: self.inner.dyn_clone(),
529 wrapper: self.wrapper.dyn_clone(),
530 }
531 }
532
533 pub(crate) fn as_ref(&self) -> WrappedTableRef<'_> {
534 WrappedTableRef {
535 inner: &*self.inner,
536 wrapper: &*self.wrapper,
537 }
538 }
539
540 pub fn scan_bounded(
544 &self,
545 subset: SubsetRef,
546 start: Offset,
547 n: usize,
548 out: &mut TaggedRowBuffer,
549 ) -> Option<Offset> {
550 self.as_ref().scan_bounded(subset, start, n, out)
551 }
552
553 pub(crate) fn group_by_col(&self, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
555 self.as_ref().group_by_col(subset, col)
556 }
557
558 pub(crate) fn group_by_key(&self, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
560 self.as_ref().group_by_key(subset, cols)
561 }
562
563 pub fn scan_project(
566 &self,
567 subset: SubsetRef,
568 cols: &[ColumnId],
569 start: Offset,
570 n: usize,
571 cs: &[Constraint],
572 out: &mut TaggedRowBuffer,
573 ) -> Option<Offset> {
574 self.as_ref().scan_project(subset, cols, start, n, cs, out)
575 }
576
577 pub fn scan(&self, subset: SubsetRef) -> TaggedRowBuffer {
579 self.as_ref().scan(subset)
580 }
581
582 pub(crate) fn lookup_row_vectorized(
583 &self,
584 mask: &mut Mask,
585 bindings: &mut Bindings,
586 args: &[QueryEntry],
587 col: ColumnId,
588 out_var: Variable,
589 ) {
590 self.as_ref()
591 .lookup_row_vectorized(mask, bindings, args, col, out_var)
592 }
593
594 #[allow(clippy::too_many_arguments)]
595 pub(crate) fn lookup_with_default_vectorized(
596 &self,
597 mask: &mut Mask,
598 bindings: &mut Bindings,
599 args: &[QueryEntry],
600 col: ColumnId,
601 default: QueryEntry,
602 out_var: Variable,
603 ) {
604 self.as_ref()
605 .lookup_with_default_vectorized(mask, bindings, args, col, default, out_var)
606 }
607}
608
609impl Deref for WrappedTable {
610 type Target = dyn Table;
611
612 fn deref(&self) -> &Self::Target {
613 &*self.inner
614 }
615}
616
617impl DerefMut for WrappedTable {
618 fn deref_mut(&mut self) -> &mut Self::Target {
619 &mut *self.inner
620 }
621}
622
623pub(crate) trait TableWrapper: Send + Sync {
624 fn dyn_clone(&self) -> Box<dyn TableWrapper>;
625 fn scan_bounded(
626 &self,
627 table: &dyn Table,
628 subset: SubsetRef,
629 start: Offset,
630 n: usize,
631 out: &mut TaggedRowBuffer,
632 ) -> Option<Offset>;
633 fn group_by_col(&self, table: &dyn Table, subset: SubsetRef, col: ColumnId) -> ColumnIndex;
634 fn group_by_key(&self, table: &dyn Table, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex;
635
636 #[allow(clippy::too_many_arguments)]
637 fn scan_project(
638 &self,
639 table: &dyn Table,
640 subset: SubsetRef,
641 cols: &[ColumnId],
642 start: Offset,
643 n: usize,
644 cs: &[Constraint],
645 out: &mut TaggedRowBuffer,
646 ) -> Option<Offset>;
647
648 fn scan(&self, table: &dyn Table, subset: SubsetRef) -> TaggedRowBuffer {
649 let arity = table.spec().arity();
650 let mut buf = TaggedRowBuffer::new(arity);
651 assert!(
652 self.scan_bounded(table, subset, Offset::new(0), usize::MAX, &mut buf)
653 .is_none()
654 );
655 buf
656 }
657
658 #[allow(clippy::too_many_arguments)]
659 fn lookup_row_vectorized(
660 &self,
661 table: &dyn Table,
662 mask: &mut Mask,
663 bindings: &mut Bindings,
664 args: &[QueryEntry],
665 col: ColumnId,
666 out_var: Variable,
667 );
668
669 #[allow(clippy::too_many_arguments)]
670 fn lookup_with_default_vectorized(
671 &self,
672 table: &dyn Table,
673 mask: &mut Mask,
674 bindings: &mut Bindings,
675 args: &[QueryEntry],
676 col: ColumnId,
677 default: QueryEntry,
678 out_var: Variable,
679 );
680}
681
682#[derive(Clone, Copy)]
686pub struct WrappedTableRef<'a> {
687 inner: &'a dyn Table,
688 wrapper: &'a dyn TableWrapper,
689}
690
691impl WrappedTableRef<'_> {
692 pub(crate) fn with_wrapper<T: Table, R>(
693 inner: &T,
694 f: impl for<'a> FnOnce(WrappedTableRef<'a>) -> R,
695 ) -> R {
696 let wrapper = WrapperImpl::<T>(PhantomData);
697 f(WrappedTableRef {
698 inner,
699 wrapper: &wrapper,
700 })
701 }
702
703 pub fn scan_bounded(
707 &self,
708 subset: SubsetRef,
709 start: Offset,
710 n: usize,
711 out: &mut TaggedRowBuffer,
712 ) -> Option<Offset> {
713 self.wrapper.scan_bounded(self.inner, subset, start, n, out)
714 }
715
716 pub(crate) fn group_by_col(&self, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
718 self.wrapper.group_by_col(self.inner, subset, col)
719 }
720
721 pub(crate) fn group_by_key(&self, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
723 self.wrapper.group_by_key(self.inner, subset, cols)
724 }
725
726 pub fn scan_project(
729 &self,
730 subset: SubsetRef,
731 cols: &[ColumnId],
732 start: Offset,
733 n: usize,
734 cs: &[Constraint],
735 out: &mut TaggedRowBuffer,
736 ) -> Option<Offset> {
737 self.wrapper
738 .scan_project(self.inner, subset, cols, start, n, cs, out)
739 }
740
741 pub fn scan(&self, subset: SubsetRef) -> TaggedRowBuffer {
743 self.wrapper.scan(self.inner, subset)
744 }
745
746 pub(crate) fn lookup_row_vectorized(
747 &self,
748 mask: &mut Mask,
749 bindings: &mut Bindings,
750 args: &[QueryEntry],
751 col: ColumnId,
752 out_var: Variable,
753 ) {
754 self.wrapper
755 .lookup_row_vectorized(self.inner, mask, bindings, args, col, out_var);
756 }
757
758 #[allow(clippy::too_many_arguments)]
759 pub(crate) fn lookup_with_default_vectorized(
760 &self,
761 mask: &mut Mask,
762 bindings: &mut Bindings,
763 args: &[QueryEntry],
764 col: ColumnId,
765 default: QueryEntry,
766 out_var: Variable,
767 ) {
768 self.wrapper.lookup_with_default_vectorized(
769 self.inner, mask, bindings, args, col, default, out_var,
770 );
771 }
772}
773
774impl Deref for WrappedTableRef<'_> {
775 type Target = dyn Table;
776
777 fn deref(&self) -> &Self::Target {
778 self.inner
779 }
780}