1use std::{
3 mem,
4 sync::{
5 Arc,
6 atomic::{AtomicUsize, Ordering},
7 },
8};
9
10use crate::numeric_id::{DenseIdMap, DenseIdMapWithReuse, NumericId, define_id};
11use egglog_concurrency::ResettableOnceLock;
12use rayon::prelude::*;
13use smallvec::SmallVec;
14use web_time::Duration;
15
16use crate::{
17 BaseValues, ContainerValues, PoolSet, QueryEntry, TupleIndex, Value,
18 action::{
19 Bindings, DbView,
20 mask::{Mask, MaskIter, ValueSource},
21 },
22 common::{DashMap, iter_dashmap_bulk},
23 dependency_graph::DependencyGraph,
24 hash_index::{ColumnIndex, Index, IndexBase},
25 offsets::Subset,
26 parallel_heuristics::parallelize_db_level_op,
27 pool::{Pool, Pooled, with_pool_set},
28 query::{Query, RuleSetBuilder},
29 table_spec::{
30 ColumnId, Constraint, MutationBuffer, Table, TableSpec, WrappedTable, WrappedTableRef,
31 },
32};
33
34use self::plan::Plan;
35use crate::action::{ExecutionState, PredictedVals};
36
37pub(crate) mod execute;
38pub(crate) mod frame_update;
39pub(crate) mod plan;
40
// Identifier for one atom of a query: "a function and a list of variables or constants".
define_id!(
    pub AtomId,
    u32,
    "A component of a query consisting of a function and a list of variables or constants"
);
// Identifier for a query variable. `Variable::placeholder` reserves the all-ones value (`!0`).
define_id!(pub Variable, u32, "a variable in a query");
47
48impl Variable {
49 pub fn placeholder() -> Variable {
50 Variable::new(!0)
51 }
52}
53
// Identifies a table registered in a `Database` (returned by `Database::add_table`).
define_id!(pub TableId, u32, "a table in the database");
// Identifies the right-hand side (actions) of a rule.
define_id!(pub(crate) ActionId, u32, "an identifier picking out the RHS of a rule");
56
/// The result of `Database::process_constraints` for a single table.
///
/// `subset` already reflects the constraints in `fast`; the constraints in `slow` have
/// not been applied to `subset` and still need to be checked by the consumer.
#[derive(Debug)]
pub(crate) struct ProcessedConstraints {
    /// Candidate rows after applying the `fast` constraints.
    pub(crate) subset: Subset,
    /// Constraints whose effect is already folded into `subset`. This includes the ones
    /// returned as fast by `split_fast_slow` (presumably) and equality constraints that
    /// were resolved through a cached column index.
    pub(crate) fast: Pooled<Vec<Constraint>>,
    /// Constraints that are NOT reflected in `subset` and must still be evaluated.
    pub(crate) slow: Pooled<Vec<Constraint>>,
}
67
68impl Clone for ProcessedConstraints {
69 fn clone(&self) -> Self {
70 ProcessedConstraints {
71 subset: self.subset.clone(),
72 fast: Pooled::cloned(&self.fast),
73 slow: Pooled::cloned(&self.slow),
74 }
75 }
76}
77
78impl ProcessedConstraints {
79 fn approx_size(&self) -> usize {
81 self.subset.size()
82 }
83}
84
85#[derive(Clone, Debug, PartialEq, Eq)]
86pub(crate) struct SubAtom {
87 pub(crate) atom: AtomId,
88 pub(crate) vars: SmallVec<[ColumnId; 2]>,
89}
90
91impl SubAtom {
92 pub(crate) fn new(atom: AtomId) -> SubAtom {
93 SubAtom {
94 atom,
95 vars: Default::default(),
96 }
97 }
98}
99
/// Per-variable bookkeeping for a query/rule.
#[derive(Debug)]
pub(crate) struct VarInfo {
    /// The atoms (and columns within them) where this variable occurs.
    pub(crate) occurrences: Vec<SubAtom>,
    /// Presumably: whether the rule's right-hand side (actions) reads this variable.
    pub(crate) used_in_rhs: bool,
    /// Presumably: whether the variable is bound by the right-hand side rather than the
    /// query. TODO confirm against the rule builder.
    pub(crate) defined_in_rhs: bool,
}
108
// Lazily built, resettable multi-column index. It is shared via `Arc` so callers can hold
// it while the owning `TableInfo` keeps it cached. `Database::merge_all` resets it.
pub(crate) type HashIndex = Arc<ResettableOnceLock<Index<TupleIndex>>>;
// Single-column counterpart of `HashIndex`.
pub(crate) type HashColumnIndex = Arc<ResettableOnceLock<Index<ColumnIndex>>>;
111
/// A table registered with a `Database`, plus its spec and cached indexes.
pub struct TableInfo {
    /// The spec reported by the table when it was added (`Table::spec`).
    pub(crate) spec: TableSpec,
    /// The table itself.
    pub(crate) table: WrappedTable,
    /// Multi-column indexes, keyed by the indexed columns. Entries are created on demand.
    pub(crate) indexes: DashMap<SmallVec<[ColumnId; 4]>, HashIndex>,
    /// Single-column indexes, keyed by column. Entries are created on demand.
    pub(crate) column_indexes: DashMap<ColumnId, HashColumnIndex>,
}
118
119impl Clone for TableInfo {
120 fn clone(&self) -> Self {
121 fn deep_clone_map<K: Clone + std::hash::Hash + Eq, TI: IndexBase + Clone>(
122 map: &DashMap<K, Arc<ResettableOnceLock<Index<TI>>>>,
123 table: WrappedTableRef,
124 ) -> DashMap<K, Arc<ResettableOnceLock<Index<TI>>>> {
125 map.iter()
126 .map(|table_ref| {
127 let (k, v) = table_ref.pair();
128 let v: Index<TI> = v
129 .get_or_update(|index| {
130 index.refresh(table);
131 })
132 .clone();
133 (k.clone(), Arc::new(ResettableOnceLock::new(v)))
134 })
135 .collect()
136 }
137 TableInfo {
138 spec: self.spec.clone(),
139 table: self.table.dyn_clone(),
140 indexes: deep_clone_map(&self.indexes, self.table.as_ref()),
141 column_indexes: deep_clone_map(&self.column_indexes, self.table.as_ref()),
142 }
143 }
144}
145
// Returned by `Database::add_counter`; used with `inc_counter` / `read_counter`.
define_id!(pub CounterId, u32, "A counter accessible to actions, useful for generating unique Ids.");
// Returned by `Database::add_external_function`; released via `free_external_function`.
define_id!(pub ExternalFunctionId, u32, "A user-defined operation that can be invoked from a query");
148
/// A user-defined operation that can be invoked from a query.
///
/// Implementations must be clonable through `dyn_clone` (so boxed instances can be copied)
/// and `Send + Sync`. Plain closures can be adapted with `make_external_func`.
pub trait ExternalFunction: dyn_clone::DynClone + Send + Sync {
    /// Calls the function on `args`, with mutable access to the execution state.
    ///
    /// May return `None`. NOTE(review): the batch helpers in `ExternalFunctionExt` hand
    /// results to `fill_vec` / `assign_vec_and_retain`, which presumably substitute or
    /// filter out rows yielding `None` — confirm there.
    fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value>;
}
159
160pub fn make_external_func<
162 F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
163>(
164 f: F,
165) -> impl ExternalFunction {
166 #[derive(Clone)]
167 struct Wrapped<F>(F);
168 impl<F> ExternalFunction for Wrapped<F>
169 where
170 F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
171 {
172 fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value> {
173 (self.0)(state, args)
174 }
175 }
176 Wrapped(f)
177}
178
/// Batched helpers layered on top of `ExternalFunction::invoke`.
///
/// Every `ExternalFunction` gets these through a blanket impl.
pub(crate) trait ExternalFunctionExt: ExternalFunction {
    #[doc(hidden)]
    /// Calls the function once per binding selected by `mask`, collects the results into a
    /// pooled `Vec<Value>`, and binds that vector to `out_var`.
    ///
    /// `Value::stale` is passed to `fill_vec` as the filler; presumably it is used for rows
    /// where `invoke` returns `None` (TODO confirm against `fill_vec`).
    fn invoke_batch(
        &self,
        state: &mut ExecutionState,
        mask: &mut Mask,
        bindings: &mut Bindings,
        args: &[QueryEntry],
        out_var: Variable,
    ) {
        let pool: Pool<Vec<Value>> = with_pool_set(|ps| ps.get_pool().clone());
        let mut out = pool.get();
        // One output slot per mask entry at most.
        out.reserve(mask.len());
        for_each_binding_with_mask!(mask, args, bindings, |iter| {
            iter.fill_vec(&mut out, Value::stale, |_, args| {
                self.invoke(state, args.as_slice())
            });
        });
        bindings.insert(out_var, &out);
    }

    #[doc(hidden)]
    /// Like `invoke_batch`, but overwrites the values already bound to `out_var` rather
    /// than creating a new binding.
    ///
    /// The binding is taken out of `bindings`, updated via `assign_vec_and_retain`
    /// (presumably also narrowing `mask` for rows where `invoke` returns `None` — confirm),
    /// and then put back.
    ///
    /// # Panics
    /// Panics if `out_var` is not currently bound.
    fn invoke_batch_assign(
        &self,
        state: &mut ExecutionState,
        mask: &mut Mask,
        bindings: &mut Bindings,
        args: &[QueryEntry],
        out_var: Variable,
    ) {
        let mut out = bindings.take(out_var).expect("out_var must be bound");
        for_each_binding_with_mask!(mask, args, bindings, |iter| {
            iter.assign_vec_and_retain(&mut out.vals, |_, args| self.invoke(state, &args))
        });
        bindings.replace(out);
    }
}
226
// Every `ExternalFunction` automatically gets the batched helpers.
impl<T: ExternalFunction> ExternalFunctionExt for T {}

// Implements `Clone` for `Box<dyn ExternalFunctionExt>` via `dyn_clone`. Presumably this is
// what lets `ExternalFunctions`, and thus the derived `Clone` on `Database`, be cloned.
dyn_clone::clone_trait_object!(ExternalFunctionExt);

// Registry of boxed external functions, keyed by `ExternalFunctionId`. As a
// `DenseIdMapWithReuse`, freed ids may presumably be handed out again.
pub(crate) type ExternalFunctions =
    DenseIdMapWithReuse<ExternalFunctionId, Box<dyn ExternalFunctionExt>>;
234
235#[derive(Default)]
236pub(crate) struct Counters(DenseIdMap<CounterId, AtomicUsize>);
237
238impl Clone for Counters {
239 fn clone(&self) -> Counters {
240 let mut map = DenseIdMap::new();
241 for (k, v) in self.0.iter() {
242 map.insert(k, AtomicUsize::new(v.load(Ordering::SeqCst)))
244 }
245 Counters(map)
246 }
247}
248
249impl Counters {
250 pub(crate) fn read(&self, ctr: CounterId) -> usize {
251 self.0[ctr].load(Ordering::Acquire)
252 }
253 pub(crate) fn inc(&self, ctr: CounterId) -> usize {
254 self.0[ctr].fetch_add(1, Ordering::Release)
257 }
258}
259
/// Summary statistics for one run of a rule set.
#[derive(Debug, Default)]
pub struct RuleSetReport {
    /// Presumably: whether the run changed the database.
    pub changed: bool,
    /// Per-rule reports; presumably keyed by rule name.
    pub rule_reports: DashMap<String, RuleReport>,
    /// Time spent searching for matches and applying actions.
    pub search_and_apply_time: Duration,
    /// Time spent merging tables after the rules ran.
    pub merge_time: Duration,
}
267
/// Statistics for a single rule.
#[derive(Debug, Default)]
pub struct RuleReport {
    pub search_and_apply_time: Duration,
    pub num_matches: usize,
}

impl RuleReport {
    /// Combines two reports by summing their times and their match counts.
    pub fn union(&self, other: &RuleReport) -> RuleReport {
        let RuleReport {
            search_and_apply_time: own_time,
            num_matches: own_matches,
        } = self;
        let total_time = *own_time + other.search_and_apply_time;
        let total_matches = *own_matches + other.num_matches;
        RuleReport {
            search_and_apply_time: total_time,
            num_matches: total_matches,
        }
    }
}
282
/// A collection of tables plus the shared state (counters, external functions, base and
/// container values) that queries and rule actions run against.
#[derive(Clone, Default)]
pub struct Database {
    /// Every declared table, along with its spec and cached indexes.
    pub(crate) tables: DenseIdMap<TableId, TableInfo>,
    /// Counters available to actions (see `add_counter` / `inc_counter`).
    pub(crate) counters: Counters,
    /// Registered external functions (see `add_external_function`).
    pub(crate) external_functions: ExternalFunctions,
    container_values: ContainerValues,
    /// Read/write dependencies between tables; `merge_all` walks its strata and uses
    /// `write_deps` to create mutation buffers.
    deps: DependencyGraph,
    base_values: BaseValues,
    /// Approximate total number of rows across all tables. It is recomputed exactly by
    /// `merge_all` and adjusted incrementally by `merge_table`. It is only fed to
    /// `parallelize_db_level_op` to decide whether to run database-level work in parallel.
    total_size_estimate: usize,
}
307
308impl Database {
309 pub fn new() -> Database {
314 Database::default()
315 }
316
317 pub fn new_rule_set(&mut self) -> RuleSetBuilder<'_> {
319 RuleSetBuilder::new(self)
320 }
321
322 pub fn add_external_function(
324 &mut self,
325 f: impl ExternalFunction + 'static,
326 ) -> ExternalFunctionId {
327 self.external_functions.push(Box::new(f))
328 }
329
330 pub fn free_external_function(&mut self, id: ExternalFunctionId) {
332 self.external_functions.take(id);
333 }
334
335 pub fn base_values(&self) -> &BaseValues {
336 &self.base_values
337 }
338
339 pub fn base_values_mut(&mut self) -> &mut BaseValues {
340 &mut self.base_values
341 }
342
343 pub fn container_values(&self) -> &ContainerValues {
344 &self.container_values
345 }
346
347 pub fn container_values_mut(&mut self) -> &mut ContainerValues {
348 &mut self.container_values
349 }
350
351 pub fn rebuild_containers(&mut self, table_id: TableId) -> bool {
352 let mut containers = mem::take(&mut self.container_values);
353 let table = &self.tables[table_id].table;
354 let res = self.with_execution_state(|state| containers.rebuild_all(table_id, table, state));
355 self.container_values = containers;
356 res
357 }
358
359 pub fn apply_rebuild(
366 &mut self,
367 func_id: TableId,
368 to_rebuild: &[TableId],
369 next_ts: Value,
370 ) -> bool {
371 let func = self.tables.take(func_id).unwrap();
372 let predicted = PredictedVals::default();
373 if parallelize_db_level_op(self.total_size_estimate) {
374 let mut tables = Vec::with_capacity(to_rebuild.len());
375 for id in to_rebuild {
376 tables.push((*id, self.tables.take(*id).unwrap()));
377 }
378 tables.par_iter_mut().for_each(|(_, info)| {
379 info.table.apply_rebuild(
380 func_id,
381 &func.table,
382 next_ts,
383 &mut ExecutionState::new(&predicted, self.read_only_view(), Default::default()),
384 );
385 });
386 for (id, info) in tables {
387 self.tables.insert(id, info);
388 }
389 } else {
390 for id in to_rebuild {
391 let mut info = self.tables.take(*id).unwrap();
392 info.table.apply_rebuild(
393 func_id,
394 &func.table,
395 next_ts,
396 &mut ExecutionState::new(&predicted, self.read_only_view(), Default::default()),
397 );
398 self.tables.insert(*id, info);
399 }
400 }
401 self.tables.insert(func_id, func);
402 self.merge_all()
403 }
404
405 pub fn with_execution_state<R>(&self, f: impl FnOnce(&mut ExecutionState) -> R) -> R {
407 let predicted = with_pool_set(|ps| ps.get::<PredictedVals>());
408 let mut state = ExecutionState::new(&predicted, self.read_only_view(), Default::default());
409 f(&mut state)
410 }
411
412 pub(crate) fn read_only_view(&self) -> DbView<'_> {
413 DbView {
414 table_info: &self.tables,
415 counters: &self.counters,
416 external_funcs: &self.external_functions,
417 bases: &self.base_values,
418 containers: &self.container_values,
419 }
420 }
421
422 pub fn estimate_size(&self, table: TableId, c: Option<Constraint>) -> usize {
425 let table_info = self
426 .tables
427 .get(table)
428 .expect("table must be declared in the current database");
429 let table = &table_info.table;
430 if let Some(c) = c {
431 if let Some(sub) = table.fast_subset(&c) {
432 sub.size()
435 } else {
436 table.refine_one(table.refine_live(table.all()), &c).size()
437 }
438 } else {
439 table.len()
440 }
441 }
442
443 pub fn add_counter(&mut self) -> CounterId {
447 self.counters.0.push(AtomicUsize::new(0))
448 }
449
450 pub fn inc_counter(&self, counter: CounterId) -> usize {
452 self.counters.inc(counter)
453 }
454
455 pub fn read_counter(&self, counter: CounterId) -> usize {
457 self.counters.read(counter)
458 }
459
460 pub fn merge_all(&mut self) -> bool {
467 let mut ever_changed = false;
468 let do_parallel = parallelize_db_level_op(self.total_size_estimate);
469 loop {
470 let mut changed = false;
471 let predicted = with_pool_set(|ps| ps.get::<PredictedVals>());
472 let mut tables_merging = DenseIdMap::<
473 TableId,
474 (
475 Option<TableInfo>,
477 DenseIdMap<TableId, Box<dyn MutationBuffer>>,
480 ),
481 >::with_capacity(self.tables.n_ids());
482 for stratum in self.deps.strata() {
483 for table in stratum.iter().copied() {
485 let mut bufs = DenseIdMap::default();
486 for dep in self.deps.write_deps(table) {
487 if let Some(info) = self.tables.get(dep) {
488 bufs.insert(dep, info.table.new_buffer());
489 }
490 }
491 tables_merging.insert(table, (None, bufs));
492 }
493 for table in stratum.iter().copied() {
496 tables_merging[table].0 = Some(self.tables.unwrap_val(table));
497 }
498 let db = self.read_only_view();
499 changed |= if do_parallel {
500 tables_merging
501 .par_iter_mut()
502 .map(|(_, (info, buffers))| {
503 let mut es = ExecutionState::new(&predicted, db, mem::take(buffers));
504 info.as_mut().unwrap().table.merge(&mut es).added || es.changed
505 })
506 .max()
507 .unwrap_or(false)
508 } else {
509 tables_merging
510 .iter_mut()
511 .map(|(_, (info, buffers))| {
512 let mut es = ExecutionState::new(&predicted, db, mem::take(buffers));
513 info.as_mut().unwrap().table.merge(&mut es).added || es.changed
514 })
515 .max()
516 .unwrap_or(false)
517 };
518 for (id, (table, _)) in tables_merging.drain() {
519 self.tables.insert(id, table.unwrap());
520 }
521 }
522 ever_changed |= changed;
523 if !changed {
524 break;
525 }
526 }
527 let mut size_estimate = 0;
529 for (_, info) in self.tables.iter_mut() {
530 iter_dashmap_bulk(&mut info.column_indexes, |_, ci| {
531 Arc::get_mut(ci).unwrap().reset();
532 });
533 iter_dashmap_bulk(&mut info.indexes, |_, ti| {
534 Arc::get_mut(ti).unwrap().reset();
535 });
536 size_estimate += info.table.len();
537 }
538 self.total_size_estimate = size_estimate;
539 ever_changed
540 }
541
542 pub fn merge_table(&mut self, table: TableId) {
549 let mut info = self.tables.unwrap_val(table);
550 let predicted = with_pool_set(|ps| ps.get::<PredictedVals>());
551 self.total_size_estimate = self.total_size_estimate.wrapping_sub(info.table.len());
552 let _table_changed = info.table.merge(&mut ExecutionState::new(
553 &predicted,
554 self.read_only_view(),
555 Default::default(),
556 ));
557 self.total_size_estimate = self.total_size_estimate.wrapping_add(info.table.len());
558 self.tables.insert(table, info);
559 }
560
561 pub fn next_table_id(&self) -> TableId {
566 self.tables.next_id()
567 }
568
569 pub fn add_table<T: Table + Sized + 'static>(
574 &mut self,
575 table: T,
576 read_deps: impl IntoIterator<Item = TableId>,
577 write_deps: impl IntoIterator<Item = TableId>,
578 ) -> TableId {
579 let spec = table.spec();
580 let table = WrappedTable::new(table);
581 let res = self.tables.push(TableInfo {
582 spec,
583 table,
584 indexes: Default::default(),
585 column_indexes: Default::default(),
586 });
587 self.deps.add_table(res, read_deps, write_deps);
588 res
589 }
590
591 pub fn get_table(&self, id: TableId) -> &WrappedTable {
595 &self
596 .tables
597 .get(id)
598 .expect("must access a table that has been declared in this database")
599 .table
600 }
601
602 pub(crate) fn process_constraints(
603 &self,
604 table: TableId,
605 cs: &[Constraint],
606 ) -> ProcessedConstraints {
607 let table_info = &self.tables[table];
608 let (mut subset, mut fast, mut slow) = table_info.table.split_fast_slow(cs);
609 slow.retain(|c| {
610 let (col, val) = match c {
611 Constraint::EqConst { col, val } => (*col, *val),
612 Constraint::Eq { .. }
613 | Constraint::LtConst { .. }
614 | Constraint::GtConst { .. }
615 | Constraint::LeConst { .. }
616 | Constraint::GeConst { .. } => return true,
617 };
618 if *table_info
621 .spec
622 .uncacheable_columns
623 .get(col)
624 .unwrap_or(&false)
625 {
626 return true;
627 }
628 fast.push(c.clone());
631 let index = get_column_index_from_tableinfo(table_info, col);
632 match index.get().unwrap().get_subset(&val) {
633 Some(s) => {
634 with_pool_set(|ps| subset.intersect(s, &ps.get_pool()));
635 }
636 None => {
637 subset = Subset::empty();
639 }
640 }
641 false
643 });
644 ProcessedConstraints { subset, fast, slow }
645 }
646
647 pub fn get_table_mut(&mut self, id: TableId) -> &mut dyn Table {
651 &mut *self
652 .tables
653 .get_mut(id)
654 .expect("must access a table that has been declared in this database")
655 .table
656 }
657
658 pub(crate) fn plan_query(&mut self, query: Query) -> Plan {
659 plan::plan_query(query)
660 }
661}
662
663impl Drop for Database {
664 fn drop(&mut self) {
665 with_pool_set(PoolSet::clear);
669 rayon::broadcast(|_| with_pool_set(PoolSet::clear));
670 }
671}
672
673fn get_index_from_tableinfo(table_info: &TableInfo, cols: &[ColumnId]) -> HashIndex {
678 let index: Arc<_> = table_info
679 .indexes
680 .entry(cols.into())
681 .or_insert_with(|| {
682 Arc::new(ResettableOnceLock::new(Index::new(
683 cols.to_vec(),
684 TupleIndex::new(cols.len()),
685 )))
686 })
687 .clone();
688 index.get_or_update(|index| {
689 index.refresh(table_info.table.as_ref());
690 });
691 debug_assert!(
692 !index
693 .get()
694 .unwrap()
695 .needs_refresh(table_info.table.as_ref())
696 );
697 index
698}
699
700fn get_column_index_from_tableinfo(table_info: &TableInfo, col: ColumnId) -> HashColumnIndex {
704 let index: Arc<_> = table_info
705 .column_indexes
706 .entry(col)
707 .or_insert_with(|| {
708 Arc::new(ResettableOnceLock::new(Index::new(
709 vec![col],
710 ColumnIndex::new(),
711 )))
712 })
713 .clone();
714 index.get_or_update(|index| {
715 index.refresh(table_info.table.as_ref());
716 });
717 debug_assert!(
718 !index
719 .get()
720 .unwrap()
721 .needs_refresh(table_info.table.as_ref())
722 );
723 index
724}