use std::{
any::Any,
marker::PhantomData,
ops::{Deref, DerefMut},
};
use crate::numeric_id::{DenseIdMap, NumericId, define_id};
use smallvec::SmallVec;
use crate::{
QueryEntry, TableId, Variable,
action::{
Bindings, ExecutionState,
mask::{Mask, MaskIter, ValueSource},
},
common::Value,
hash_index::{ColumnIndex, IndexBase, TupleIndex},
offsets::{RowId, Subset, SubsetRef},
pool::{PoolSet, Pooled, with_pool_set},
row_buffer::{RowBuffer, TaggedRowBuffer},
};
// Identifier for a single column of a table. `u32` is the integer type handed
// to the macro; elsewhere in this file the id is built with
// `ColumnId::new_const` and turned into a slice position with `.index()`.
define_id!(pub ColumnId, u32, "a particular column in a table");
// Identifier naming a table generation; it is the type of the `major` field of
// `TableVersion`. `u64` is the integer type handed to the macro.
define_id!(
pub Generation,
u64,
"the current version of a table -- used to invalidate any existing RowIds"
);
// Resumption token used by the scan APIs and as the `minor` field of
// `TableVersion`. Scans in this file always begin from `Offset::new(0)`.
define_id!(
pub Offset,
u64,
"an opaque offset token -- used to encode iterations over a table (within a generation). These always start at 0."
);
/// The version of a table, as reported by `Table::version`.
///
/// It pairs a `Generation` with an `Offset`.
/// NOTE(review): going by the descriptions attached to those id types, a change
/// of `major` presumably invalidates outstanding `RowId`s while `minor` tracks
/// progress within one generation — confirm against the table implementations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TableVersion {
/// The generation component of the version.
pub major: Generation,
/// The offset component of the version.
pub minor: Offset,
}
/// Static description of a table's shape, as returned by `Table::spec`.
#[derive(Clone)]
pub struct TableSpec {
/// Number of key columns. Together with `n_vals` this makes up the arity.
pub n_keys: usize,
/// Number of value (non-key) columns.
pub n_vals: usize,
/// Per-column flag, indexed by `ColumnId`.
/// NOTE(review): presumably `true` marks a column whose derived data must not
/// be cached — confirm with the code that consumes this map.
pub uncacheable_columns: DenseIdMap<ColumnId, bool>,
/// NOTE(review): presumably whether the table supports row removal — confirm
/// with the table implementations.
pub allows_delete: bool,
}
impl TableSpec {
    /// Total number of columns in the table: key columns plus value columns.
    pub fn arity(&self) -> usize {
        let Self { n_keys, n_vals, .. } = self;
        n_keys + n_vals
    }
}
/// Summary of what happened to a table, as returned by `Table::merge`.
///
/// `Debug` is derived so values can be logged and used with `assert_eq!`.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub struct TableChange {
    /// Set when rows were added.
    pub added: bool,
    /// Set when rows were removed.
    pub removed: bool,
}
/// A predicate over a single row, used to narrow subsets (`Table::refine`) and
/// to filter scans (`scan_generic_bounded`, `scan_project`).
///
/// The variants only carry the operands; evaluating them is left to the table
/// implementations. The variant names follow the usual comparison
/// abbreviations (`Eq`, `Lt`, `Gt`, `Le`, `Ge`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Constraint {
/// Relates two columns of the same row: `l_col` equal to `r_col`.
Eq { l_col: ColumnId, r_col: ColumnId },
/// Column `col` equal to the constant `val`.
EqConst { col: ColumnId, val: Value },
/// Column `col` less than the constant `val`. `Table::refine_live` uses this
/// variant with `Value::stale()` on column 0.
LtConst { col: ColumnId, val: Value },
/// Column `col` greater than the constant `val`.
GtConst { col: ColumnId, val: Value },
/// Column `col` less than or equal to the constant `val`.
LeConst { col: ColumnId, val: Value },
/// Column `col` greater than or equal to the constant `val`.
GeConst { col: ColumnId, val: Value },
}
/// Object handed out by `Table::rebuilder` that rewrites values and rows.
///
/// NOTE(review): only the signatures are visible here; the descriptions below
/// are inferred from them and should be confirmed against an implementation.
pub trait Rebuilder: Send + Sync {
/// An optional column hint for callers.
/// NOTE(review): presumably the column whose values are subject to rebuilding,
/// letting callers restrict the rows they feed in — confirm.
fn hint_col(&self) -> Option<ColumnId>;
/// Maps a single value to its rebuilt form.
fn rebuild_val(&self, val: Value) -> Value;
/// Processes the rows of `buf` between `start` and `end`, writing results to
/// `out`.
/// NOTE(review): whether `end` is inclusive, and whether only changed rows are
/// written, is not visible here — confirm.
fn rebuild_buf(
&self,
buf: &RowBuffer,
start: RowId,
end: RowId,
out: &mut TaggedRowBuffer,
exec_state: &mut ExecutionState,
);
/// Processes the rows of `other` selected by `subset`, writing results to
/// `out`.
fn rebuild_subset(
&self,
other: WrappedTableRef,
subset: SubsetRef,
out: &mut TaggedRowBuffer,
exec_state: &mut ExecutionState,
);
/// Rewrites `vals` in place.
/// NOTE(review): the returned flag presumably reports whether any value was
/// changed — confirm.
fn rebuild_slice(&self, vals: &mut [Value]) -> bool;
}
/// A single row as returned by `Table::get_row`.
pub struct Row {
/// The id of the row within its table.
pub id: RowId,
/// The row's values in a pooled vector; `Table::get_row_column` indexes it
/// by `ColumnId::index`.
pub vals: Pooled<Vec<Value>>,
}
pub trait Table: Any + Send + Sync {
fn dyn_clone(&self) -> Box<dyn Table>;
fn rebuilder<'a>(&'a self, _cols: &[ColumnId]) -> Option<Box<dyn Rebuilder + 'a>> {
None
}
fn apply_rebuild(
&mut self,
_table_id: TableId,
_table: &WrappedTable,
_next_ts: Value,
_exec_state: &mut ExecutionState,
) -> bool {
false
}
fn as_any(&self) -> &dyn Any;
fn spec(&self) -> TableSpec;
fn clear(&mut self);
fn all(&self) -> Subset;
fn len(&self) -> usize;
fn is_empty(&self) -> bool {
self.len() == 0
}
fn version(&self) -> TableVersion;
fn updates_since(&self, offset: Offset) -> Subset;
fn scan_generic_bounded(
&self,
subset: SubsetRef,
start: Offset,
n: usize,
cs: &[Constraint],
f: impl FnMut(RowId, &[Value]),
) -> Option<Offset>
where
Self: Sized;
fn scan_generic(&self, subset: SubsetRef, mut f: impl FnMut(RowId, &[Value]))
where
Self: Sized,
{
let mut cur = Offset::new(0);
while let Some(next) = self.scan_generic_bounded(subset, cur, usize::MAX, &[], |id, row| {
f(id, row);
}) {
cur = next;
}
}
fn refine_live(&self, subset: Subset) -> Subset {
self.refine_one(
subset,
&Constraint::LtConst {
col: ColumnId::new_const(0),
val: Value::stale(),
},
)
}
fn refine_one(&self, subset: Subset, c: &Constraint) -> Subset {
self.refine(subset, std::slice::from_ref(c))
}
fn refine(&self, subset: Subset, cs: &[Constraint]) -> Subset {
cs.iter()
.fold(subset, |subset, c| self.refine_one(subset, c))
}
fn fast_subset(&self, _: &Constraint) -> Option<Subset> {
None
}
fn split_fast_slow(
&self,
cs: &[Constraint],
) -> (
Subset, /* the subset of the table matching all fast constraints */
Pooled<Vec<Constraint>>, /* the fast constraints */
Pooled<Vec<Constraint>>, /* the slow constraints */
) {
with_pool_set(|ps| {
let mut fast = ps.get::<Vec<Constraint>>();
let mut slow = ps.get::<Vec<Constraint>>();
let mut subset = self.all();
for c in cs {
if let Some(sub) = self.fast_subset(c) {
subset.intersect(sub.as_ref(), &ps.get_pool());
fast.push(c.clone());
} else {
slow.push(c.clone());
}
}
(subset, fast, slow)
})
}
fn get_row(&self, key: &[Value]) -> Option<Row>;
fn get_row_column(&self, key: &[Value], col: ColumnId) -> Option<Value> {
self.get_row(key).map(|row| row.vals[col.index()])
}
fn merge(&mut self, exec_state: &mut ExecutionState) -> TableChange;
fn new_buffer(&self) -> Box<dyn MutationBuffer>;
}
/// A handle for staging changes to a table, obtained from `Table::new_buffer`.
///
/// NOTE(review): the names suggest that staged changes are not visible until
/// the owning table's `merge` runs — confirm against an implementation.
pub trait MutationBuffer: Any + Send + Sync {
/// Stages the insertion of `row`.
fn stage_insert(&mut self, row: &[Value]);
/// Stages the removal of the row identified by `key`.
fn stage_remove(&mut self, key: &[Value]);
/// Returns another boxed buffer.
/// NOTE(review): presumably a fresh handle targeting the same table — confirm.
fn fresh_handle(&self) -> Box<dyn MutationBuffer>;
}
// Stateless `TableWrapper` implementation for the concrete table type `T`.
// It stores only a `PhantomData<T>`; `T` is used to downcast `&dyn Table`
// arguments back to the concrete type.
struct WrapperImpl<T>(PhantomData<T>);
pub(crate) fn wrapper<T: Table>() -> Box<dyn TableWrapper> {
Box::new(WrapperImpl::<T>(PhantomData))
}
impl<T: Table> WrapperImpl<T> {
    /// Recovers the concrete table type from a type-erased reference.
    ///
    /// # Panics
    ///
    /// Panics if `table` is not a `T`. A wrapper is only meant to be used with
    /// the table type it was created for, so a mismatch is a caller bug.
    fn concrete(table: &dyn Table) -> &T {
        table
            .as_any()
            .downcast_ref::<T>()
            .expect("TableWrapper used with a table whose concrete type differs from the wrapper's")
    }
}

/// Every method downcasts `table` to `T` and then calls the generic
/// (`Self: Sized`) scanning and lookup methods of [`Table`] on it.
impl<T: Table> TableWrapper for WrapperImpl<T> {
    /// Returns a new boxed wrapper for the same table type.
    fn dyn_clone(&self) -> Box<dyn TableWrapper> {
        Box::new(Self(PhantomData))
    }

    /// Runs an unconstrained bounded scan and appends every visited row, with
    /// its id, to `out`. Returns whatever `scan_generic_bounded` returns.
    fn scan_bounded(
        &self,
        table: &dyn Table,
        subset: SubsetRef,
        start: Offset,
        n: usize,
        out: &mut TaggedRowBuffer,
    ) -> Option<Offset> {
        let table = Self::concrete(table);
        table.scan_generic_bounded(subset, start, n, &[], |row_id, row| {
            out.add_row(row_id, row);
        })
    }

    /// Builds an index over `subset` keyed by the value in column `col`.
    fn group_by_col(&self, table: &dyn Table, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
        let table = Self::concrete(table);
        let mut index = ColumnIndex::new();
        table.scan_generic(subset, |row_id, row| {
            index.add_row(&[row[col.index()]], row_id);
        });
        index
    }

    /// Builds an index over `subset` keyed by the values in `cols`, in that
    /// order. With no columns, the index is returned empty and nothing is
    /// scanned.
    fn group_by_key(&self, table: &dyn Table, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
        let table = Self::concrete(table);
        let mut res = TupleIndex::new(cols.len());
        // Keys of up to three columns are built as fixed-size arrays; wider
        // keys go through a reusable scratch buffer.
        match cols {
            [] => {}
            [col] => table.scan_generic(subset, |row_id, row| {
                res.add_row(&[row[col.index()]], row_id);
            }),
            [x, y] => table.scan_generic(subset, |row_id, row| {
                res.add_row(&[row[x.index()], row[y.index()]], row_id);
            }),
            [x, y, z] => table.scan_generic(subset, |row_id, row| {
                res.add_row(&[row[x.index()], row[y.index()], row[z.index()]], row_id);
            }),
            _ => {
                // Sized up front, as `scan_project` does, so the buffer never
                // has to grow while rows are being processed.
                let mut scratch = SmallVec::<[Value; 8]>::with_capacity(cols.len());
                table.scan_generic(subset, |row_id, row| {
                    scratch.extend(cols.iter().map(|c| row[c.index()]));
                    res.add_row(&scratch, row_id);
                    scratch.clear();
                });
            }
        }
        res
    }

    /// Runs a bounded scan filtered by `cs` and appends, for every visited
    /// row, its id and the values of `cols` (in that order) to `out`.
    ///
    /// With no columns, nothing is scanned or written and `None` is returned.
    fn scan_project(
        &self,
        table: &dyn Table,
        subset: SubsetRef,
        cols: &[ColumnId],
        start: Offset,
        n: usize,
        cs: &[Constraint],
        out: &mut TaggedRowBuffer,
    ) -> Option<Offset> {
        let table = Self::concrete(table);
        // Same arity specialisation as `group_by_key`.
        match cols {
            [] => None,
            [col] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
                out.add_row(id, &[row[col.index()]]);
            }),
            [x, y] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
                out.add_row(id, &[row[x.index()], row[y.index()]]);
            }),
            [x, y, z] => table.scan_generic_bounded(subset, start, n, cs, |id, row| {
                out.add_row(id, &[row[x.index()], row[y.index()], row[z.index()]]);
            }),
            _ => {
                let mut scratch = SmallVec::<[Value; 8]>::with_capacity(cols.len());
                table.scan_generic_bounded(subset, start, n, cs, |id, row| {
                    scratch.extend(cols.iter().map(|c| row[c.index()]));
                    out.add_row(id, &scratch);
                    scratch.clear();
                })
            }
        }
    }

    /// For each binding produced from `args` under `mask`, looks up column
    /// `col` of the matching row and stores the results under `out_var`.
    ///
    /// NOTE(review): `Value::stale` is passed to `fill_vec` as the filler and a
    /// `None` lookup presumably drops that entry from `mask` — confirm against
    /// `MaskIter::fill_vec`.
    fn lookup_row_vectorized(
        &self,
        table: &dyn Table,
        mask: &mut Mask,
        bindings: &mut Bindings,
        args: &[QueryEntry],
        col: ColumnId,
        out_var: Variable,
    ) {
        let table = Self::concrete(table);
        let mut out = with_pool_set(PoolSet::get::<Vec<Value>>);
        for_each_binding_with_mask!(mask, args, bindings, |iter| {
            iter.fill_vec(&mut out, Value::stale, |_, args| {
                table.get_row_column(args.as_slice(), col)
            })
        });
        bindings.insert(out_var, &out);
    }

    /// Like `lookup_row_vectorized`, except that a failed lookup yields
    /// `default` instead of `None`, so every lookup produces a value.
    ///
    /// `default` is either a constant or a variable whose bound values are
    /// zipped with the argument bindings.
    fn lookup_with_default_vectorized(
        &self,
        table: &dyn Table,
        mask: &mut Mask,
        bindings: &mut Bindings,
        args: &[QueryEntry],
        col: ColumnId,
        default: QueryEntry,
        out_var: Variable,
    ) {
        let table = Self::concrete(table);
        let mut out = with_pool_set(PoolSet::get::<Vec<Value>>);
        for_each_binding_with_mask!(mask, args, bindings, |iter| {
            match default {
                QueryEntry::Var(default) => iter.zip(&bindings[default]).fill_vec(
                    &mut out,
                    Value::stale,
                    |_, (args, default)| {
                        Some(
                            table
                                .get_row_column(args.as_slice(), col)
                                .unwrap_or(*default),
                        )
                    },
                ),
                QueryEntry::Const(default) => iter.fill_vec(&mut out, Value::stale, |_, args| {
                    Some(
                        table
                            .get_row_column(args.as_slice(), col)
                            .unwrap_or(default),
                    )
                }),
            }
        });
        bindings.insert(out_var, &out);
    }
}
/// An owned, type-erased table together with the `TableWrapper` created for
/// its concrete type (see `WrappedTable::new`).
pub struct WrappedTable {
// The table itself, behind dynamic dispatch.
inner: Box<dyn Table>,
// The wrapper used to reach the table's `Self: Sized` methods.
wrapper: Box<dyn TableWrapper>,
}
impl WrappedTable {
pub(crate) fn new<T: Table>(inner: T) -> Self {
let wrapper = wrapper::<T>();
let inner = Box::new(inner);
Self { inner, wrapper }
}
pub fn dyn_clone(&self) -> Self {
WrappedTable {
inner: self.inner.dyn_clone(),
wrapper: self.wrapper.dyn_clone(),
}
}
pub(crate) fn as_ref(&self) -> WrappedTableRef<'_> {
WrappedTableRef {
inner: &*self.inner,
wrapper: &*self.wrapper,
}
}
pub fn scan_bounded(
&self,
subset: SubsetRef,
start: Offset,
n: usize,
out: &mut TaggedRowBuffer,
) -> Option<Offset> {
self.as_ref().scan_bounded(subset, start, n, out)
}
pub(crate) fn group_by_col(&self, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
self.as_ref().group_by_col(subset, col)
}
pub(crate) fn group_by_key(&self, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
self.as_ref().group_by_key(subset, cols)
}
pub fn scan_project(
&self,
subset: SubsetRef,
cols: &[ColumnId],
start: Offset,
n: usize,
cs: &[Constraint],
out: &mut TaggedRowBuffer,
) -> Option<Offset> {
self.as_ref().scan_project(subset, cols, start, n, cs, out)
}
pub fn scan(&self, subset: SubsetRef) -> TaggedRowBuffer {
self.as_ref().scan(subset)
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub(crate) fn lookup_row_vectorized(
&self,
mask: &mut Mask,
bindings: &mut Bindings,
args: &[QueryEntry],
col: ColumnId,
out_var: Variable,
) {
self.as_ref()
.lookup_row_vectorized(mask, bindings, args, col, out_var)
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn lookup_with_default_vectorized(
&self,
mask: &mut Mask,
bindings: &mut Bindings,
args: &[QueryEntry],
col: ColumnId,
default: QueryEntry,
out_var: Variable,
) {
self.as_ref()
.lookup_with_default_vectorized(mask, bindings, args, col, default, out_var)
}
}
/// Lets a `WrappedTable` be used wherever a `&dyn Table` is expected.
impl Deref for WrappedTable {
    type Target = dyn Table;

    fn deref(&self) -> &Self::Target {
        self.inner.as_ref()
    }
}
/// Gives mutable access to the underlying `dyn Table`.
impl DerefMut for WrappedTable {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.inner.as_mut()
    }
}
/// Object-safe access to the parts of [`Table`] that need the concrete table
/// type.
///
/// Every method takes the table as `&dyn Table`; the implementation in this
/// file downcasts it to the concrete type it was created for.
pub(crate) trait TableWrapper: Send + Sync {
    /// Returns a boxed copy of this wrapper.
    fn dyn_clone(&self) -> Box<dyn TableWrapper>;

    /// Scans `subset` of `table` from `start`, appending visited rows to `out`.
    ///
    /// Returns `Some(next)` when the scan should be resumed from `next` and
    /// `None` when it is complete.
    fn scan_bounded(
        &self,
        table: &dyn Table,
        subset: SubsetRef,
        start: Offset,
        n: usize,
        out: &mut TaggedRowBuffer,
    ) -> Option<Offset>;

    /// Indexes the rows of `subset` by the value in column `col`.
    fn group_by_col(&self, table: &dyn Table, subset: SubsetRef, col: ColumnId) -> ColumnIndex;

    /// Indexes the rows of `subset` by the values in `cols`.
    fn group_by_key(&self, table: &dyn Table, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex;

    /// Like [`TableWrapper::scan_bounded`], but filtered by `cs` and writing
    /// only the columns `cols` of each row to `out`.
    #[allow(clippy::too_many_arguments)]
    fn scan_project(
        &self,
        table: &dyn Table,
        subset: SubsetRef,
        cols: &[ColumnId],
        start: Offset,
        n: usize,
        cs: &[Constraint],
        out: &mut TaggedRowBuffer,
    ) -> Option<Offset>;

    /// Collects all rows of `subset` into a fresh buffer whose arity is taken
    /// from the table's spec.
    ///
    /// # Panics
    ///
    /// Panics if a scan from offset 0 with a `usize::MAX` bound does not
    /// report completion.
    fn scan(&self, table: &dyn Table, subset: SubsetRef) -> TaggedRowBuffer {
        let mut buf = TaggedRowBuffer::new(table.spec().arity());
        assert!(
            self.scan_bounded(table, subset, Offset::new(0), usize::MAX, &mut buf)
                .is_none()
        );
        buf
    }

    /// Looks up column `col` for each binding of `args` under `mask` and binds
    /// the results to `out_var`.
    #[allow(clippy::too_many_arguments)]
    fn lookup_row_vectorized(
        &self,
        table: &dyn Table,
        mask: &mut Mask,
        bindings: &mut Bindings,
        args: &[QueryEntry],
        col: ColumnId,
        out_var: Variable,
    );

    /// Variant of [`TableWrapper::lookup_row_vectorized`] that also takes a
    /// `default` entry.
    #[allow(clippy::too_many_arguments)]
    fn lookup_with_default_vectorized(
        &self,
        table: &dyn Table,
        mask: &mut Mask,
        bindings: &mut Bindings,
        args: &[QueryEntry],
        col: ColumnId,
        default: QueryEntry,
        out_var: Variable,
    );
}
/// A borrowed, copyable view of a table plus the `TableWrapper` for its
/// concrete type; the borrowed counterpart of `WrappedTable`.
#[derive(Clone, Copy)]
pub struct WrappedTableRef<'a> {
// The table, behind dynamic dispatch.
inner: &'a dyn Table,
// The wrapper used to reach the table's `Self: Sized` methods.
wrapper: &'a dyn TableWrapper,
}
/// Borrowed-table API. Apart from `len` and `is_empty`, every method hands the
/// borrowed table to the paired `TableWrapper`.
impl WrappedTableRef<'_> {
    /// Runs `f` with a temporary view of `inner`, using a wrapper created on
    /// the spot for its concrete type `T`.
    ///
    /// The view borrows a wrapper local to this call, which is why it is
    /// passed to a closure rather than returned.
    pub(crate) fn with_wrapper<T: Table, R>(
        inner: &T,
        f: impl for<'a> FnOnce(WrappedTableRef<'a>) -> R,
    ) -> R {
        let wrapper = WrapperImpl::<T>(PhantomData);
        f(WrappedTableRef {
            inner,
            wrapper: &wrapper,
        })
    }

    /// Scans `subset` from `start`, appending visited rows to `out`.
    ///
    /// Returns `Some(next)` when the scan should be resumed from `next` and
    /// `None` when it is complete.
    pub fn scan_bounded(
        &self,
        subset: SubsetRef,
        start: Offset,
        n: usize,
        out: &mut TaggedRowBuffer,
    ) -> Option<Offset> {
        self.wrapper.scan_bounded(self.inner, subset, start, n, out)
    }

    /// Indexes the rows of `subset` by the value in column `col`.
    pub(crate) fn group_by_col(&self, subset: SubsetRef, col: ColumnId) -> ColumnIndex {
        self.wrapper.group_by_col(self.inner, subset, col)
    }

    /// Indexes the rows of `subset` by the values in `cols`.
    pub(crate) fn group_by_key(&self, subset: SubsetRef, cols: &[ColumnId]) -> TupleIndex {
        self.wrapper.group_by_key(self.inner, subset, cols)
    }

    /// Like [`WrappedTableRef::scan_bounded`], but filtered by `cs` and
    /// writing only the columns `cols` of each row to `out`.
    pub fn scan_project(
        &self,
        subset: SubsetRef,
        cols: &[ColumnId],
        start: Offset,
        n: usize,
        cs: &[Constraint],
        out: &mut TaggedRowBuffer,
    ) -> Option<Offset> {
        self.wrapper
            .scan_project(self.inner, subset, cols, start, n, cs, out)
    }

    /// Collects all rows of `subset` into a fresh buffer.
    pub fn scan(&self, subset: SubsetRef) -> TaggedRowBuffer {
        self.wrapper.scan(self.inner, subset)
    }

    /// The length reported by the underlying table.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Whether the underlying table reports itself empty.
    ///
    /// Mirrors `WrappedTable::is_empty`, so the owned and borrowed views offer
    /// the same `len`/`is_empty` pair.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Looks up column `col` for each binding of `args` under `mask` and binds
    /// the results to `out_var`.
    pub(crate) fn lookup_row_vectorized(
        &self,
        mask: &mut Mask,
        bindings: &mut Bindings,
        args: &[QueryEntry],
        col: ColumnId,
        out_var: Variable,
    ) {
        self.wrapper
            .lookup_row_vectorized(self.inner, mask, bindings, args, col, out_var);
    }

    /// Variant of [`WrappedTableRef::lookup_row_vectorized`] that also takes a
    /// `default` entry.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn lookup_with_default_vectorized(
        &self,
        mask: &mut Mask,
        bindings: &mut Bindings,
        args: &[QueryEntry],
        col: ColumnId,
        default: QueryEntry,
        out_var: Variable,
    ) {
        self.wrapper.lookup_with_default_vectorized(
            self.inner, mask, bindings, args, col, default, out_var,
        );
    }
}
/// Lets a `WrappedTableRef` be used wherever a `&dyn Table` is expected.
impl Deref for WrappedTableRef<'_> {
    type Target = dyn Table;

    fn deref(&self) -> &Self::Target {
        let Self { inner, .. } = *self;
        inner
    }
}