use std::{
mem,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use crate::{
common::IndexSet,
hash_index::IndexCatalog,
numeric_id::{DenseIdMap, DenseIdMapWithReuse, NumericId, define_id},
};
use egglog_concurrency::{NotificationList, ResettableOnceLock};
use rayon::prelude::*;
use smallvec::SmallVec;
use crate::{
BaseValues, ContainerValues, PoolSet, QueryEntry, TupleIndex, Value,
action::{
Bindings, DbView,
mask::{Mask, MaskIter, ValueSource},
},
dependency_graph::DependencyGraph,
hash_index::{ColumnIndex, Index, IndexBase},
offsets::Subset,
parallel_heuristics::parallelize_db_level_op,
pool::{Pool, Pooled, with_pool_set},
query::{Query, RuleSetBuilder},
table_spec::{
ColumnId, Constraint, MutationBuffer, Table, TableSpec, WrappedTable, WrappedTableRef,
},
};
use self::plan::Plan;
use crate::action::ExecutionState;
// Submodules: rule execution, incremental frame updates, and query planning.
// NOTE(review): purposes inferred from names and the `plan::plan_query` call
// below — confirm against the submodules themselves.
pub(crate) mod execute;
pub(crate) mod frame_update;
pub(crate) mod plan;
// Numeric-id newtypes for query components (see `numeric_id::define_id`).
define_id!(
    pub AtomId,
    u32,
    "A component of a query consisting of a function and a list of variables or constants"
);
define_id!(pub Variable, u32, "a variable in a query");
impl Variable {
pub fn placeholder() -> Variable {
Variable::new(!0)
}
}
// Ids for tables and for rule right-hand sides.
define_id!(pub TableId, u32, "a table in the database");
define_id!(pub(crate) ActionId, u32, "an identifier picking out the RHS of a rule");
/// The result of splitting a constraint list against a table
/// (see `Database::process_constraints`).
#[derive(Debug)]
pub(crate) struct ProcessedConstraints {
    /// The subset of rows already known to satisfy the cheap constraints.
    pub(crate) subset: Subset,
    /// Constraints the table can evaluate cheaply.
    pub(crate) fast: Pooled<Vec<Constraint>>,
    /// Constraints that require a scan to evaluate.
    pub(crate) slow: Pooled<Vec<Constraint>>,
}
impl Clone for ProcessedConstraints {
    fn clone(&self) -> Self {
        let subset = self.subset.clone();
        // `Pooled` vectors are duplicated through the pool rather than via
        // `Vec::clone`, so the copies come from pooled storage too.
        let fast = Pooled::cloned(&self.fast);
        let slow = Pooled::cloned(&self.slow);
        Self { subset, fast, slow }
    }
}
impl ProcessedConstraints {
    /// A cheap estimate of how many rows survive the processed constraints:
    /// the size of the precomputed subset (slow constraints are not counted).
    fn approx_size(&self) -> usize {
        self.subset.size()
    }
}
/// A reference to a subset of an atom's columns: an atom id plus the columns
/// of that atom under consideration.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct SubAtom {
    pub(crate) atom: AtomId,
    // Inline storage for the common case of at most two columns.
    pub(crate) vars: SmallVec<[ColumnId; 2]>,
}
impl SubAtom {
pub(crate) fn new(atom: AtomId) -> SubAtom {
SubAtom {
atom,
vars: Default::default(),
}
}
}
/// Per-variable metadata collected while building a query.
#[derive(Debug)]
pub(crate) struct VarInfo {
    /// Every (atom, columns) location where this variable appears.
    pub(crate) occurrences: Vec<SubAtom>,
    /// Whether the RHS of the rule reads this variable.
    pub(crate) used_in_rhs: bool,
    /// Whether the RHS of the rule (re)binds this variable.
    pub(crate) defined_in_rhs: bool,
    /// Optional human-readable name for diagnostics.
    pub(crate) name: Option<Arc<str>>,
}
// Shared, lazily-(re)computed indexes: `ResettableOnceLock` lets `merge_all`
// invalidate them (see the `reset` calls there) and readers rebuild on demand.
pub(crate) type HashIndex = Arc<ResettableOnceLock<Index<TupleIndex>>>;
pub(crate) type HashColumnIndex = Arc<ResettableOnceLock<Index<ColumnIndex>>>;
/// A table registered with a [`Database`], together with its spec and the
/// caches of indexes built over it.
pub struct TableInfo {
    /// Optional name supplied at registration (see `add_table_named`).
    pub(crate) name: Option<Arc<str>>,
    pub(crate) spec: TableSpec,
    pub(crate) table: WrappedTable,
    /// Multi-column (tuple) indexes, keyed by the column list.
    pub(crate) indexes: IndexCatalog<SmallVec<[ColumnId; 4]>, HashIndex>,
    /// Single-column indexes, keyed by column.
    pub(crate) column_indexes: IndexCatalog<ColumnId, HashColumnIndex>,
}
impl TableInfo {
    /// Borrow the underlying wrapped table.
    pub fn table(&self) -> &WrappedTable {
        &self.table
    }

    /// The table's human-readable name, if one was supplied at registration.
    pub fn name(&self) -> Option<&str> {
        self.name.as_ref().map(|name| &**name)
    }

    /// Borrow the table's declared specification.
    pub fn spec(&self) -> &TableSpec {
        &self.spec
    }
}
impl Clone for TableInfo {
    fn clone(&self) -> Self {
        /// Deep-clone an index catalog: each shared index is refreshed against
        /// the (original) table and then copied into a fresh, independent
        /// `ResettableOnceLock`, so the clone does not share cache state with
        /// the original.
        fn deep_clone_map<K: Clone + std::hash::Hash + Eq, TI: IndexBase + Clone>(
            map: &IndexCatalog<K, Arc<ResettableOnceLock<Index<TI>>>>,
            table: WrappedTableRef,
        ) -> IndexCatalog<K, Arc<ResettableOnceLock<Index<TI>>>> {
            map.map(|table_ref| {
                let (k, v) = table_ref;
                // Refresh before cloning so the copied index starts warm.
                let v: Index<TI> = v
                    .get_or_update(|index| {
                        index.refresh(table);
                    })
                    .clone();
                (k.clone(), Arc::new(ResettableOnceLock::new(v)))
            })
        }
        TableInfo {
            name: self.name.clone(),
            spec: self.spec.clone(),
            table: self.table.dyn_clone(),
            // NOTE: indexes are refreshed against `self.table` (the original),
            // which should match the freshly-cloned table's contents.
            indexes: deep_clone_map(&self.indexes, self.table.as_ref()),
            column_indexes: deep_clone_map(&self.column_indexes, self.table.as_ref()),
        }
    }
}
define_id!(pub CounterId, u32, "A counter accessible to actions, useful for generating unique Ids.");
define_id!(pub ExternalFunctionId, u32, "A user-defined operation that can be invoked from a query");

/// A user-supplied operation callable from rules. Implementations must be
/// cloneable (via `dyn_clone`) and usable across threads.
pub trait ExternalFunction: dyn_clone::DynClone + Send + Sync {
    /// Invoke the function on `args`; `None` signals failure / no result.
    fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value>;
}
/// Adapt a plain closure into an [`ExternalFunction`] trait object factory.
///
/// The closure receives the current [`ExecutionState`] and the argument
/// values, and returns `Some(result)` on success.
pub fn make_external_func<
    F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
>(
    f: F,
) -> impl ExternalFunction {
    // Private newtype so we can hang the trait impl off an owned closure.
    #[derive(Clone)]
    struct ClosureFn<F>(F);

    impl<F> ExternalFunction for ClosureFn<F>
    where
        F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
    {
        fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value> {
            (self.0)(state, args)
        }
    }

    ClosureFn(f)
}
/// Invoke `this` once per set binding in `mask`, collecting the results into a
/// fresh column bound to `out_var`.
///
/// Bindings where `invoke` returns `None` are handled by the
/// `for_each_binding_with_mask!` machinery (via `fill_vec`); unset slots are
/// filled with `Value::stale`.
pub(crate) fn invoke_batch(
    this: &dyn ExternalFunction,
    state: &mut ExecutionState,
    mask: &mut Mask,
    bindings: &mut Bindings,
    args: &[QueryEntry],
    out_var: Variable,
) {
    // Pull the output vector from the pool and size it up front.
    let pool: Pool<Vec<Value>> = with_pool_set(|ps| ps.get_pool().clone());
    let mut out = pool.get();
    out.reserve(mask.len());
    for_each_binding_with_mask!(mask, args, bindings, |iter| {
        iter.fill_vec(&mut out, Value::stale, |_, args| {
            this.invoke(state, args.as_slice())
        });
    });
    bindings.insert(out_var, &out);
}
/// Like [`invoke_batch`], but overwrites the existing binding for `out_var`
/// in place rather than creating a new column.
///
/// Panics if `out_var` is not already bound.
pub(crate) fn invoke_batch_assign(
    this: &dyn ExternalFunction,
    state: &mut ExecutionState,
    mask: &mut Mask,
    bindings: &mut Bindings,
    args: &[QueryEntry],
    out_var: Variable,
) {
    // Temporarily remove the output column so we can mutate it while the
    // remaining bindings are borrowed by the macro below.
    let mut out = bindings.take(out_var).expect("out_var must be bound");
    for_each_binding_with_mask!(mask, args, bindings, |iter| {
        iter.assign_vec_and_retain(&mut out.vals, |_, args| this.invoke(state, &args))
    });
    bindings.replace(out);
}
// Make `Box<dyn ExternalFunction>` itself `Clone` via `dyn_clone`.
dyn_clone::clone_trait_object!(ExternalFunction);

/// Registry of external functions; ids freed by `take` can be reused.
pub(crate) type ExternalFunctions =
    DenseIdMapWithReuse<ExternalFunctionId, Box<dyn ExternalFunction>>;

/// Atomic counters indexed by `CounterId` (see `Database::add_counter`).
#[derive(Default)]
pub(crate) struct Counters(DenseIdMap<CounterId, AtomicUsize>);
impl Clone for Counters {
    /// Snapshot every counter's current value into a fresh map of atomics.
    fn clone(&self) -> Counters {
        let mut copied = DenseIdMap::new();
        for (id, counter) in self.0.iter() {
            // SeqCst load: take the strongest-ordered snapshot available.
            let snapshot = counter.load(Ordering::SeqCst);
            copied.insert(id, AtomicUsize::new(snapshot));
        }
        Counters(copied)
    }
}
impl Counters {
    /// Read the current value of `counter`.
    ///
    /// Acquire pairs with the Release in [`Counters::inc`].
    pub(crate) fn read(&self, counter: CounterId) -> usize {
        self.0[counter].load(Ordering::Acquire)
    }

    /// Atomically bump `counter`, returning its previous value.
    pub(crate) fn inc(&self, counter: CounterId) -> usize {
        self.0[counter].fetch_add(1, Ordering::Release)
    }
}
/// The top-level container tying together tables, counters, external
/// functions, base/container values, and the bookkeeping used to plan and
/// run queries and merges.
#[derive(Clone, Default)]
pub struct Database {
    pub(crate) tables: DenseIdMap<TableId, TableInfo>,
    pub(crate) counters: Counters,
    pub(crate) external_functions: ExternalFunctions,
    container_values: ContainerValues,
    // Tables with pending writes that still need to be merged (drained via
    // `reset` in `merge_all` / `merge_simple`).
    notification_list: NotificationList<TableId>,
    // Read/write dependencies between tables; its strata order merges.
    deps: DependencyGraph,
    base_values: BaseValues,
    // Sum of table lengths as of the last `merge_all`; consulted to decide
    // whether database-level operations should run in parallel.
    total_size_estimate: usize,
}
impl Database {
    /// Create an empty database.
    pub fn new() -> Database {
        Database::default()
    }

    /// Begin building a new rule set against this database.
    pub fn new_rule_set(&mut self) -> RuleSetBuilder<'_> {
        RuleSetBuilder::new(self)
    }

    /// Register an external function and return its id.
    pub fn add_external_function(
        &mut self,
        f: Box<dyn ExternalFunction + 'static>,
    ) -> ExternalFunctionId {
        self.external_functions.push(f)
    }

    /// Remove an external function. Its id may be reused later
    /// (`ExternalFunctions` is a `DenseIdMapWithReuse`).
    pub fn free_external_function(&mut self, id: ExternalFunctionId) {
        self.external_functions.take(id);
    }

    pub fn base_values(&self) -> &BaseValues {
        &self.base_values
    }

    pub fn base_values_mut(&mut self) -> &mut BaseValues {
        &mut self.base_values
    }

    pub fn container_values(&self) -> &ContainerValues {
        &self.container_values
    }

    pub fn container_values_mut(&mut self) -> &mut ContainerValues {
        &mut self.container_values
    }

    /// Rebuild all container values against `table_id`, returning whether
    /// anything changed.
    pub fn rebuild_containers(&mut self, table_id: TableId) -> bool {
        // Move the container values out of `self` so the rebuild can borrow
        // `self` (through an `ExecutionState`) without aliasing them.
        let mut containers = mem::take(&mut self.container_values);
        let table = &self.tables[table_id].table;
        let res = self.with_execution_state(|state| containers.rebuild_all(table_id, table, state));
        self.container_values = containers;
        res
    }

    /// Apply a rebuild originating at `func_id` to each table in `to_rebuild`
    /// at timestamp `next_ts`, then merge any resulting writes.
    ///
    /// Returns whether the database changed.
    pub fn apply_rebuild(
        &mut self,
        func_id: TableId,
        to_rebuild: &[TableId],
        next_ts: Value,
    ) -> bool {
        // Take the source table out of the map so the rebuilt tables can
        // borrow it while they themselves are also taken out.
        let func = self.tables.take(func_id).unwrap();
        if parallelize_db_level_op(self.total_size_estimate) {
            // Parallel path: remove all target tables first, rebuild them on
            // the rayon pool, then put them back.
            let mut tables = Vec::with_capacity(to_rebuild.len());
            for id in to_rebuild {
                tables.push((*id, self.tables.take(*id).unwrap()));
            }
            tables.par_iter_mut().for_each(|(id, info)| {
                if info.table.apply_rebuild(
                    func_id,
                    &func.table,
                    next_ts,
                    &mut ExecutionState::new(self.read_only_view(), Default::default()),
                ) {
                    // Mark the table as having pending writes to merge.
                    self.notification_list.notify(*id);
                }
            });
            for (id, info) in tables {
                self.tables.insert(id, info);
            }
        } else {
            // Sequential path: same protocol, one table at a time.
            for id in to_rebuild {
                let mut info = self.tables.take(*id).unwrap();
                if info.table.apply_rebuild(
                    func_id,
                    &func.table,
                    next_ts,
                    &mut ExecutionState::new(self.read_only_view(), Default::default()),
                ) {
                    self.notification_list.notify(*id);
                }
                self.tables.insert(*id, info);
            }
        }
        self.tables.insert(func_id, func);
        self.merge_all()
    }

    /// Run `f` with a scratch `ExecutionState` backed by a read-only view of
    /// this database.
    pub fn with_execution_state<R>(&self, f: impl FnOnce(&mut ExecutionState) -> R) -> R {
        let mut state = ExecutionState::new(self.read_only_view(), Default::default());
        f(&mut state)
    }

    /// A read-only view over all database components, suitable for sharing
    /// across threads during merges and rebuilds.
    pub(crate) fn read_only_view(&self) -> DbView<'_> {
        DbView {
            table_info: &self.tables,
            counters: &self.counters,
            external_funcs: &self.external_functions,
            bases: &self.base_values,
            containers: &self.container_values,
            notification_list: &self.notification_list,
        }
    }

    /// Estimate how many rows of `table` satisfy `c` (or the table's total
    /// length when `c` is `None`).
    ///
    /// Panics if `table` has not been declared.
    pub fn estimate_size(&self, table: TableId, c: Option<Constraint>) -> usize {
        let table_info = self
            .tables
            .get(table)
            .expect("table must be declared in the current database");
        let table = &table_info.table;
        if let Some(c) = c {
            // Prefer the table's cheap subset computation; fall back to
            // refining the live rows against the constraint.
            if let Some(sub) = table.fast_subset(&c) {
                sub.size()
            } else {
                table.refine_one(table.refine_live(table.all()), &c).size()
            }
        } else {
            table.len()
        }
    }

    /// Allocate a fresh counter starting at zero.
    pub fn add_counter(&mut self) -> CounterId {
        self.counters.0.push(AtomicUsize::new(0))
    }

    /// Increment `counter`, returning its previous value.
    pub fn inc_counter(&self, counter: CounterId) -> usize {
        self.counters.inc(counter)
    }

    /// Read the current value of `counter`.
    pub fn read_counter(&self, counter: CounterId) -> usize {
        self.counters.read(counter)
    }

    /// Merge all pending writes into their tables, looping until no table
    /// reports further pending work. Returns whether anything changed.
    pub fn merge_all(&mut self) -> bool {
        let mut ever_changed = false;
        let do_parallel = parallelize_db_level_op(self.total_size_estimate);
        let mut to_merge = IndexSet::default();
        loop {
            to_merge.clear();
            // Drain the set of tables with pending writes.
            let to_merge_vec = self.notification_list.reset();
            if to_merge_vec.len() < 4 {
                // Few tables: the simple sequential loop is cheaper than
                // setting up stratified/parallel merging.
                ever_changed |= self.merge_simple(to_merge_vec);
                break;
            }
            for table in to_merge_vec {
                to_merge.insert(table);
            }
            let mut changed = false;
            let mut tables_merging = DenseIdMap::<
                TableId,
                (
                    // The info needed to merge this table.
                    Option<TableInfo>,
                    // Pre-allocated write buffers, according to the tables declared write
                    // dependencies.
                    DenseIdMap<TableId, Box<dyn MutationBuffer>>,
                ),
            >::with_capacity(self.tables.n_ids());
            // Merge stratum by stratum so a table only merges after the
            // tables it depends on have merged.
            for stratum in self.deps.strata() {
                // First pass: allocate write buffers for each merging table's
                // declared write dependencies (needs `self.tables` intact).
                for table in stratum.intersection(&to_merge).copied() {
                    let mut bufs = DenseIdMap::default();
                    for dep in self.deps.write_deps(table) {
                        if let Some(info) = self.tables.get(dep) {
                            bufs.insert(dep, info.table.new_buffer());
                        }
                    }
                    tables_merging.insert(table, (None, bufs));
                }
                // Second pass: move the merging tables out of `self.tables`
                // so they can be mutated while a read-only view is live.
                for table in stratum.intersection(&to_merge).copied() {
                    tables_merging[table].0 = Some(self.tables.unwrap_val(table));
                }
                let db = self.read_only_view();
                // A merge "changed" if the table reports additions or the
                // execution state observed a change; `max` ORs the results.
                changed |= if do_parallel {
                    tables_merging
                        .par_iter_mut()
                        .map(|(_, (info, buffers))| {
                            let mut es = ExecutionState::new(db, mem::take(buffers));
                            info.as_mut().unwrap().table.merge(&mut es).added || es.changed
                        })
                        .max()
                        .unwrap_or(false)
                } else {
                    tables_merging
                        .iter_mut()
                        .map(|(_, (info, buffers))| {
                            let mut es = ExecutionState::new(db, mem::take(buffers));
                            info.as_mut().unwrap().table.merge(&mut es).added || es.changed
                        })
                        .max()
                        .unwrap_or(false)
                };
                // Return the merged tables to the database.
                for (id, (table, _)) in tables_merging.drain() {
                    self.tables.insert(id, table.unwrap());
                }
            }
            ever_changed |= changed;
        }
        // Tables changed: invalidate every cached index and refresh the
        // database-wide size estimate.
        let mut size_estimate = 0;
        for (_, info) in self.tables.iter_mut() {
            info.column_indexes.update(|_, ti| {
                Arc::get_mut(ti).unwrap().reset();
            });
            info.indexes.update(|_, ti| {
                Arc::get_mut(ti).unwrap().reset();
            });
            size_estimate += info.table.len();
        }
        self.total_size_estimate = size_estimate;
        ever_changed
    }

    /// Sequentially merge `to_merge`, re-draining the notification list until
    /// no further tables report pending writes.
    fn merge_simple(&mut self, mut to_merge: SmallVec<[TableId; 4]>) -> bool {
        let mut changed = false;
        while !to_merge.is_empty() {
            for table_id in to_merge.iter().copied() {
                // Take the table out so it can be mutated alongside a
                // read-only view of the rest of the database.
                let mut info = self.tables.unwrap_val(table_id);
                let mut es = ExecutionState::new(self.read_only_view(), Default::default());
                changed |= info.table.merge(&mut es).added || es.changed;
                self.tables.insert(table_id, info);
            }
            // Merging may enqueue further work; pick it up.
            to_merge = self.notification_list.reset();
        }
        changed
    }

    /// Merge a single table's pending writes, keeping the size estimate in
    /// sync. Returns whether rows were added.
    pub fn merge_table(&mut self, table: TableId) -> bool {
        let mut info = self.tables.unwrap_val(table);
        // Wrapping arithmetic: the estimate is approximate and must not
        // panic if it drifts below a table's length.
        self.total_size_estimate = self.total_size_estimate.wrapping_sub(info.table.len());
        let table_changed = info.table.merge(&mut ExecutionState::new(
            self.read_only_view(),
            Default::default(),
        ));
        self.total_size_estimate = self.total_size_estimate.wrapping_add(info.table.len());
        self.tables.insert(table, info);
        table_changed.added
    }

    /// The id the next registered table will receive.
    pub fn next_table_id(&self) -> TableId {
        self.tables.next_id()
    }

    /// Register an unnamed table with the given read/write dependencies.
    pub fn add_table<T: Table + Sized + 'static>(
        &mut self,
        table: T,
        read_deps: impl IntoIterator<Item = TableId>,
        write_deps: impl IntoIterator<Item = TableId>,
    ) -> TableId {
        self.add_table_impl(table, None, read_deps, write_deps)
    }

    /// Register a named table with the given read/write dependencies.
    pub fn add_table_named<T: Table + Sized + 'static>(
        &mut self,
        table: T,
        name: Arc<str>,
        read_deps: impl IntoIterator<Item = TableId>,
        write_deps: impl IntoIterator<Item = TableId>,
    ) -> TableId {
        self.add_table_impl(table, Some(name), read_deps, write_deps)
    }

    // Shared implementation of `add_table` / `add_table_named`.
    fn add_table_impl<T: Table + Sized + 'static>(
        &mut self,
        table: T,
        name: Option<Arc<str>>,
        read_deps: impl IntoIterator<Item = TableId>,
        write_deps: impl IntoIterator<Item = TableId>,
    ) -> TableId {
        let spec = table.spec();
        let table = WrappedTable::new(table);
        let res = self.tables.push(TableInfo {
            name,
            spec,
            table,
            indexes: IndexCatalog::new(),
            column_indexes: IndexCatalog::new(),
        });
        // Record dependencies so `merge_all` can stratify correctly.
        self.deps.add_table(res, read_deps, write_deps);
        res
    }

    /// Borrow a declared table. Panics if `table` was never declared.
    pub fn get_table(&self, table: TableId) -> &WrappedTable {
        &self
            .tables
            .get(table)
            .expect("must access a table that has been declared in this database")
            .table
    }

    /// Borrow a declared table's full `TableInfo`. Panics if undeclared.
    pub fn get_table_info(&self, table: TableId) -> &TableInfo {
        self.tables
            .get(table)
            .expect("must access a table that has been declared in this database")
    }

    /// Create a mutation buffer for `id`, marking the table as having
    /// pending writes so a later merge will pick them up.
    pub fn new_buffer(&self, id: TableId) -> Box<dyn MutationBuffer> {
        self.notification_list.notify(id);
        self.get_table(id).new_buffer()
    }

    /// Split `cs` into fast/slow constraints for `table` and precompute the
    /// subset of rows satisfying the cheap ones.
    pub(crate) fn process_constraints(
        &self,
        table: TableId,
        cs: &[Constraint],
    ) -> ProcessedConstraints {
        let table_info = &self.tables[table];
        let (mut subset, mut fast, mut slow) = table_info.table.split_fast_slow(cs);
        // Promote equality-with-constant constraints from `slow` to `fast`
        // when a column index can answer them directly.
        slow.retain(|c| {
            let (col, val) = match c {
                Constraint::EqConst { col, val } => (*col, *val),
                // All other constraint shapes stay in `slow`.
                Constraint::Eq { .. }
                | Constraint::LtConst { .. }
                | Constraint::GtConst { .. }
                | Constraint::LeConst { .. }
                | Constraint::GeConst { .. } => return true,
            };
            // Columns marked uncacheable cannot rely on an index.
            if *table_info
                .spec
                .uncacheable_columns
                .get(col)
                .unwrap_or(&false)
            {
                return true;
            }
            fast.push(c.clone());
            let index = get_column_index_from_tableinfo(table_info, col);
            // Narrow the candidate subset using the column index; an absent
            // entry means no row can match.
            match index.get().unwrap().get_subset(&val) {
                Some(s) => {
                    with_pool_set(|ps| subset.intersect(s, &ps.get_pool()));
                }
                None => {
                    subset = Subset::empty();
                }
            }
            false
        });
        ProcessedConstraints { subset, fast, slow }
    }

    /// Mutably borrow a declared table. Panics if `id` was never declared.
    pub fn get_table_mut(&mut self, id: TableId) -> &mut dyn Table {
        &mut *self
            .tables
            .get_mut(id)
            .expect("must access a table that has been declared in this database")
            .table
    }

    /// Produce an execution plan for `query` (delegates to the `plan` module).
    pub(crate) fn plan_query(&mut self, query: Query) -> Plan {
        plan::plan_query(query)
    }
}
impl Drop for Database {
    fn drop(&mut self) {
        // Release pooled allocations held by the current thread...
        with_pool_set(|ps| ps.clear());
        // ...and by every rayon worker thread, which keep their own pools.
        rayon::broadcast(|_| with_pool_set(|ps| ps.clear()));
    }
}
/// Fetch (or lazily create) the tuple index over `cols` for this table,
/// bringing it up to date with the table's current contents before returning.
fn get_index_from_tableinfo(table_info: &TableInfo, cols: &[ColumnId]) -> HashIndex {
    let handle: Arc<_> = table_info.indexes.get_or_insert(cols.into(), || {
        let fresh = Index::new(cols.to_vec(), TupleIndex::new(cols.len()));
        Arc::new(ResettableOnceLock::new(fresh))
    });
    // Refresh on first access after a reset (see `merge_all`).
    handle.get_or_update(|index| {
        index.refresh(table_info.table.as_ref());
    });
    debug_assert!(
        !handle
            .get()
            .unwrap()
            .needs_refresh(table_info.table.as_ref())
    );
    handle
}
/// Fetch (or lazily create) the single-column index over `col` for this
/// table, refreshing it so it reflects the table's current contents.
fn get_column_index_from_tableinfo(table_info: &TableInfo, col: ColumnId) -> HashColumnIndex {
    let handle: Arc<_> = table_info.column_indexes.get_or_insert(col, || {
        let fresh = Index::new(vec![col], ColumnIndex::new());
        Arc::new(ResettableOnceLock::new(fresh))
    });
    // Refresh on first access after a reset (see `merge_all`).
    handle.get_or_update(|index| {
        index.refresh(table_info.table.as_ref());
    });
    debug_assert!(
        !handle
            .get()
            .unwrap()
            .needs_refresh(table_info.table.as_ref())
    );
    handle
}