1use std::{
4 cmp, iter, mem,
5 sync::{Arc, OnceLock, atomic::AtomicUsize},
6};
7
8use crate::numeric_id::{DenseIdMap, IdVec, NumericId};
9use dashmap::mapref::one::RefMut;
10use smallvec::SmallVec;
11use web_time::Instant;
12
13use crate::{
14 Constraint, OffsetRange, Pool, SubsetRef,
15 action::{Bindings, ExecutionState, PredictedVals},
16 common::{DashMap, Value},
17 free_join::{
18 RuleReport, RuleSetReport,
19 frame_update::{FrameUpdates, UpdateInstr},
20 get_index_from_tableinfo,
21 },
22 hash_index::{ColumnIndex, IndexBase, TupleIndex},
23 offsets::{Offsets, SortedOffsetVector, Subset},
24 parallel_heuristics::parallelize_db_level_op,
25 pool::Pooled,
26 query::RuleSet,
27 row_buffer::TaggedRowBuffer,
28 table_spec::{ColumnId, Offset, WrappedTableRef},
29};
30
31use super::{
32 ActionId, AtomId, Database, HashColumnIndex, HashIndex, TableInfo, Variable,
33 get_column_index_from_tableinfo,
34 plan::{JoinHeader, JoinStage, Plan},
35 with_pool_set,
36};
37
/// The lookup structure a [`Prober`] uses for one atom.
///
/// Chosen by `JoinState::get_index`: the `Cached*` variants reuse table-level indexes
/// obtained from `get_index_from_tableinfo` / `get_column_index_from_tableinfo`, while the
/// `Dynamic*` variants index only the atom's current subset.
enum DynamicIndex {
    /// Table-level cached index, used when the key is not exactly one column.
    ///
    /// When `intersect_outer` is set, every subset read from `table` is intersected with the
    /// prober's node subset before it is handed out.
    Cached {
        intersect_outer: bool,
        table: HashIndex,
    },
    /// Single-column counterpart of `Cached`.
    CachedColumn {
        intersect_outer: bool,
        table: HashColumnIndex,
    },
    /// Index built with `group_by_key` over the current subset (key is not one column).
    Dynamic(TupleIndex),
    /// Column index over the current subset, cached on the atom's `TrieNode`.
    DynamicColumn(Arc<ColumnIndex>),
}
50
51struct Prober {
52 node: TrieNode,
53 pool: Pool<SortedOffsetVector>,
54 ix: DynamicIndex,
55}
56
57impl Prober {
58 fn get_subset(&self, key: &[Value]) -> Option<Subset> {
59 match &self.ix {
60 DynamicIndex::Cached {
61 intersect_outer,
62 table,
63 } => {
64 let mut sub = table.get().unwrap().get_subset(key)?.to_owned(&self.pool);
65 if *intersect_outer {
66 sub.intersect(self.node.subset.as_ref(), &self.pool);
67 if sub.is_empty() {
68 return None;
69 }
70 }
71 Some(sub)
72 }
73 DynamicIndex::CachedColumn {
74 intersect_outer,
75 table,
76 } => {
77 debug_assert_eq!(key.len(), 1);
78 let mut sub = table
79 .get()
80 .unwrap()
81 .get_subset(&key[0])?
82 .to_owned(&self.pool);
83 if *intersect_outer {
84 sub.intersect(self.node.subset.as_ref(), &self.pool);
85 if sub.is_empty() {
86 return None;
87 }
88 }
89 Some(sub)
90 }
91 DynamicIndex::Dynamic(tab) => tab.get_subset(key).map(|x| x.to_owned(&self.pool)),
92 DynamicIndex::DynamicColumn(tab) => {
93 tab.get_subset(&key[0]).map(|x| x.to_owned(&self.pool))
94 }
95 }
96 }
97 fn for_each(&self, mut f: impl FnMut(&[Value], SubsetRef)) {
98 match &self.ix {
99 DynamicIndex::Cached {
100 intersect_outer: true,
101 table,
102 } => table.get().unwrap().for_each(|k, v| {
103 let mut res = v.to_owned(&self.pool);
104 res.intersect(self.node.subset.as_ref(), &self.pool);
105 if !res.is_empty() {
106 f(k, res.as_ref())
107 }
108 }),
109 DynamicIndex::Cached {
110 intersect_outer: false,
111 table,
112 } => table.get().unwrap().for_each(|k, v| f(k, v)),
113 DynamicIndex::CachedColumn {
114 intersect_outer: true,
115 table,
116 } => {
117 table.get().unwrap().for_each(|k, v| {
118 let mut res = v.to_owned(&self.pool);
119 res.intersect(self.node.subset.as_ref(), &self.pool);
120 if !res.is_empty() {
121 f(&[*k], res.as_ref())
122 }
123 });
124 }
125 DynamicIndex::CachedColumn {
126 intersect_outer: false,
127 table,
128 } => {
129 table.get().unwrap().for_each(|k, v| f(&[*k], v));
130 }
131 DynamicIndex::Dynamic(tab) => {
132 tab.for_each(f);
133 }
134 DynamicIndex::DynamicColumn(tab) => tab.for_each(|k, v| {
135 f(&[*k], v);
136 }),
137 }
138 }
139
140 fn len(&self) -> usize {
141 match &self.ix {
142 DynamicIndex::Cached { table, .. } => table.get().unwrap().len(),
143 DynamicIndex::CachedColumn { table, .. } => table.get().unwrap().len(),
144 DynamicIndex::Dynamic(tab) => tab.len(),
145 DynamicIndex::DynamicColumn(tab) => tab.len(),
146 }
147 }
148}
149
impl Database {
    /// Runs every plan in `rule_set` against the current database, then merges the results.
    ///
    /// Returns `RuleSetReport::default()` without doing any work when `rule_set` has no
    /// plans. When `parallelize_db_level_op` accepts the database's size estimate, each plan
    /// runs as its own task in a rayon scope, using a [`ScopedActionBuffer`]. Otherwise all
    /// plans run one after another through a single [`InPlaceActionBuffer`], which is flushed
    /// once at the end.
    ///
    /// The report holds:
    /// - per rule description, the search-and-apply time and the match count of the plan's
    ///   action (read from a shared [`MatchCounter`] after all plans have run);
    /// - the overall search-and-apply and merge times;
    /// - the `changed` flag returned by `merge_all`.
    pub fn run_rule_set(&mut self, rule_set: &RuleSet) -> RuleSetReport {
        if rule_set.plans.is_empty() {
            return RuleSetReport::default();
        }
        let preds = with_pool_set(|ps| ps.get::<PredictedVals>());
        let match_counter = MatchCounter::new(rule_set.actions.n_ids());

        let search_and_apply_timer = Instant::now();
        let rule_reports = DashMap::default();
        if parallelize_db_level_op(self.total_size_estimate) {
            rayon::in_place_scope(|scope| {
                for (plan, desc, _action) in rule_set.plans.values() {
                    scope.spawn(|scope| {
                        let join_state = JoinState::new(self, &preds);
                        let mut action_buf =
                            ScopedActionBuffer::new(scope, rule_set, &match_counter);
                        // Every atom starts out with its table's full subset.
                        let mut binding_info = BindingInfo::default();
                        for (id, info) in plan.atoms.iter() {
                            let table = join_state.db.get_table(info.table);
                            binding_info.insert_subset(id, table.all());
                        }

                        // Per-plan timer; shadows the outer, whole-rule-set timer.
                        let search_and_apply_timer = Instant::now();
                        join_state.run_header(plan, &mut binding_info, &mut action_buf);
                        let search_and_apply_time = search_and_apply_timer.elapsed();

                        // Run any batches this task's buffer still holds.
                        if action_buf.needs_flush {
                            action_buf.flush(&mut ExecutionState::new(
                                &preds,
                                self.read_only_view(),
                                Default::default(),
                            ));
                        }
                        // Only timing is recorded here; match counts are added below once
                        // every task has finished.
                        let mut rule_report: RefMut<'_, String, RuleReport> =
                            rule_reports.entry(desc.clone()).or_default();
                        *rule_report = rule_report.union(&RuleReport {
                            search_and_apply_time,
                            num_matches: 0,
                        });
                    });
                }
            });
        } else {
            let join_state = JoinState::new(self, &preds);
            // One buffer is shared by all plans and flushed once after the loop.
            let mut action_buf = InPlaceActionBuffer {
                rule_set,
                match_counter: &match_counter,
                batches: Default::default(),
            };
            for (plan, desc, _action) in rule_set.plans.values() {
                // Every atom starts out with its table's full subset.
                let mut binding_info = BindingInfo::default();
                for (id, info) in plan.atoms.iter() {
                    let table = join_state.db.get_table(info.table);
                    binding_info.insert_subset(id, table.all());
                }

                let search_and_apply_timer = Instant::now();
                join_state.run_header(plan, &mut binding_info, &mut action_buf);
                let search_and_apply_time = search_and_apply_timer.elapsed();

                let mut rule_report = rule_reports.entry(desc.clone()).or_default();
                *rule_report = rule_report.union(&RuleReport {
                    search_and_apply_time,
                    num_matches: 0,
                });
            }
            action_buf.flush(&mut ExecutionState::new(
                &preds,
                self.read_only_view(),
                Default::default(),
            ));
        }
        // Fold each plan's action match count into its rule report. Both branches above
        // inserted an entry for every `desc`, so the lookup cannot miss.
        for (_plan, desc, action) in rule_set.plans.values() {
            let mut reservation = rule_reports.get_mut(desc).unwrap();
            let RuleReport { num_matches, .. } = reservation.value_mut();
            *num_matches += match_counter.read_matches(*action);
        }
        let search_and_apply_time = search_and_apply_timer.elapsed();

        let merge_timer = Instant::now();
        let changed = self.merge_all();
        let merge_time = merge_timer.elapsed();

        RuleSetReport {
            changed,
            rule_reports,
            search_and_apply_time,
            merge_time,
        }
    }
}
244
245struct ActionState {
246 n_runs: usize,
247 len: usize,
248 bindings: Bindings,
249}
250
251impl Default for ActionState {
252 fn default() -> Self {
253 Self {
254 n_runs: 0,
255 len: 0,
256 bindings: Bindings::new(VAR_BATCH_SIZE),
257 }
258 }
259}
260
/// Read-only context for running joins.
///
/// Holds the database whose tables are probed, and the predicted values used to build the
/// [`ExecutionState`]s that actions run with.
struct JoinState<'a> {
    db: &'a Database,
    preds: &'a PredictedVals,
}
265
/// One lazily initialized column-index slot per column of a table.
/// `TrieNode::get_cached_index` sizes this to the table's arity.
type ColumnIndexes = IdVec<ColumnId, OnceLock<Arc<ColumnIndex>>>;
267
268struct TrieNode {
276 subset: Subset,
278 cached_subsets: OnceLock<Arc<Pooled<ColumnIndexes>>>,
280}
281
282impl TrieNode {
283 fn size(&self) -> usize {
284 self.subset.size()
285 }
286 fn get_cached_index(&self, col: ColumnId, info: &TableInfo) -> Arc<ColumnIndex> {
287 self.cached_subsets.get_or_init(|| {
288 let mut vec: Pooled<ColumnIndexes> = with_pool_set(|ps| ps.get());
290 vec.resize_with(info.spec.arity(), OnceLock::new);
291 Arc::new(vec)
292 })[col]
293 .get_or_init(|| {
294 let col_index = info.table.group_by_col(self.subset.as_ref(), col);
295 Arc::new(col_index)
296 })
297 .clone()
298 }
299}
300
301impl Clone for TrieNode {
302 fn clone(&self) -> Self {
303 let cached_subsets = OnceLock::new();
304 if let Some(cached) = self.cached_subsets.get() {
305 cached_subsets.set(cached.clone()).ok().unwrap();
306 }
307 Self {
308 subset: self.subset.clone(),
309 cached_subsets,
310 }
311 }
312}
313
314#[derive(Default, Clone)]
315struct BindingInfo {
316 bindings: DenseIdMap<Variable, Value>,
317 subsets: DenseIdMap<AtomId, TrieNode>,
318}
319
320impl BindingInfo {
321 fn insert_subset(&mut self, atom: AtomId, subset: Subset) {
323 let node = TrieNode {
324 subset,
325 cached_subsets: Default::default(),
326 };
327 self.subsets.insert(atom, node);
328 }
329
330 fn move_back(&mut self, atom: AtomId, prober: Prober) {
333 self.subsets.insert(atom, prober.node);
334 }
335
336 fn move_back_node(&mut self, atom: AtomId, node: TrieNode) {
337 self.subsets.insert(atom, node);
338 }
339
340 fn has_empty_subset(&self, atom: AtomId) -> bool {
341 self.subsets[atom].subset.is_empty()
342 }
343
344 fn unwrap_val(&mut self, atom: AtomId) -> TrieNode {
345 self.subsets.unwrap_val(atom)
346 }
347}
348
349impl<'a> JoinState<'a> {
350 fn new(db: &'a Database, preds: &'a PredictedVals) -> Self {
351 Self { db, preds }
352 }
353
354 fn get_index(
355 &self,
356 plan: &Plan,
357 atom: AtomId,
358 binding_info: &mut BindingInfo,
359 cols: impl Iterator<Item = ColumnId>,
360 ) -> Prober {
361 let cols = SmallVec::<[ColumnId; 4]>::from_iter(cols);
362 let trie_node = binding_info.subsets.unwrap_val(atom);
363 let subset = &trie_node.subset;
364
365 let table_id = plan.atoms[atom].table;
366 let info = &self.db.tables[table_id];
367 let all_cacheable = cols.iter().all(|col| {
368 !info
369 .spec
370 .uncacheable_columns
371 .get(*col)
372 .copied()
373 .unwrap_or(false)
374 });
375 let whole_table = info.table.all();
376 let dyn_index =
377 if all_cacheable && subset.is_dense() && whole_table.size() / 2 < subset.size() {
378 let intersect_outer =
381 !(whole_table.is_dense() && subset.bounds() == whole_table.bounds());
382 if cols.len() != 1 {
386 DynamicIndex::Cached {
387 intersect_outer,
388 table: get_index_from_tableinfo(info, &cols).clone(),
389 }
390 } else {
391 DynamicIndex::CachedColumn {
392 intersect_outer,
393 table: get_column_index_from_tableinfo(info, cols[0]).clone(),
394 }
395 }
396 } else if cols.len() != 1 {
397 DynamicIndex::Dynamic(info.table.group_by_key(subset.as_ref(), &cols))
399 } else {
400 DynamicIndex::DynamicColumn(trie_node.get_cached_index(cols[0], info))
401 };
402 Prober {
403 node: trie_node,
404 pool: with_pool_set(|ps| ps.get_pool().clone()),
405 ix: dyn_index,
406 }
407 }
408 fn get_column_index(
409 &self,
410 plan: &Plan,
411 binding_info: &mut BindingInfo,
412 atom: AtomId,
413 col: ColumnId,
414 ) -> Prober {
415 self.get_index(plan, atom, binding_info, iter::once(col))
416 }
417
418 fn run_header<'buf, BUF: ActionBuffer<'buf>>(
428 &self,
429 plan: &'a Plan,
430 binding_info: &mut BindingInfo,
431 action_buf: &mut BUF,
432 ) where
433 'a: 'buf,
434 {
435 for JoinHeader { atom, subset, .. } in &plan.stages.header {
436 if subset.is_empty() {
437 return;
438 }
439 let mut cur = binding_info.unwrap_val(*atom);
440 debug_assert!(cur.cached_subsets.get().is_none());
441 cur.subset
442 .intersect(subset.as_ref(), &with_pool_set(|ps| ps.get_pool()));
443 binding_info.move_back_node(*atom, cur);
444 }
445 for (_, node) in binding_info.subsets.iter() {
446 if node.subset.is_empty() {
447 return;
448 }
449 }
450 let mut order = InstrOrder::from_iter(0..plan.stages.instrs.len());
451 sort_plan_by_size(&mut order, 0, &plan.stages.instrs, binding_info);
452 self.run_plan(plan, &mut order, 0, binding_info, action_buf);
453 }
454
455 fn run_plan<'buf, BUF: ActionBuffer<'buf>>(
464 &self,
465 plan: &'a Plan,
466 instr_order: &mut InstrOrder,
467 cur: usize,
468 binding_info: &mut BindingInfo,
469 action_buf: &mut BUF,
470 ) where
471 'a: 'buf,
472 {
473 if cur >= instr_order.len() {
474 action_buf.push_bindings(plan.stages.actions, &binding_info.bindings, || {
475 ExecutionState::new(self.preds, self.db.read_only_view(), Default::default())
476 });
477 return;
478 }
479 let chunk_size = action_buf.morsel_size(cur, instr_order.len());
480 let mut cur_size = estimate_size(&plan.stages.instrs[instr_order.get(cur)], binding_info);
481 if cur_size > 32 && cur % 3 == 1 && cur < instr_order.len() - 1 {
482 sort_plan_by_size(instr_order, cur, &plan.stages.instrs, binding_info);
485 cur_size = estimate_size(&plan.stages.instrs[instr_order.get(cur)], binding_info);
486 }
487
488 macro_rules! drain_updates {
490 ($updates:expr) => {
491 if cur == 0 || cur == 1 {
492 drain_updates_parallel!($updates)
493 } else {
494 $updates.drain(|update| match update {
495 UpdateInstr::PushBinding(var, val) => {
496 binding_info.bindings.insert(var, val);
497 }
498 UpdateInstr::RefineAtom(atom, subset) => {
499 binding_info.insert_subset(atom, subset);
500 }
501 UpdateInstr::EndFrame => {
502 self.run_plan(plan, instr_order, cur + 1, binding_info, action_buf);
503 }
504 })
505 }
506 };
507 }
508 macro_rules! drain_updates_parallel {
509 ($updates:expr) => {{
510 let predicted = self.preds;
511 let db = self.db;
512 action_buf.recur(
513 BorrowedLocalState {
514 binding_info,
515 instr_order,
516 updates: &mut $updates,
517 },
518 move || ExecutionState::new(predicted, db.read_only_view(), Default::default()),
519 move |BorrowedLocalState {
520 binding_info,
521 instr_order,
522 updates,
523 },
524 buf| {
525 updates.drain(|update| match update {
526 UpdateInstr::PushBinding(var, val) => {
527 binding_info.bindings.insert(var, val);
528 }
529 UpdateInstr::RefineAtom(atom, subset) => {
530 binding_info.insert_subset(atom, subset);
531 }
532 UpdateInstr::EndFrame => {
533 JoinState {
534 db,
535 preds: predicted,
536 }
537 .run_plan(
538 plan,
539 instr_order,
540 cur + 1,
541 binding_info,
542 buf,
543 );
544 }
545 })
546 },
547 );
548 $updates.clear();
549 }};
550 }
551
552 fn refine_subset(
553 sub: Subset,
554 constraints: &[Constraint],
555 table: &WrappedTableRef,
556 ) -> Subset {
557 let sub = table.refine_live(sub);
558 table.refine(sub, constraints)
559 }
560
561 match &plan.stages.instrs[instr_order.get(cur)] {
562 JoinStage::Intersect { var, scans } => match scans.as_slice() {
563 [] => {}
564 [a] if a.cs.is_empty() => {
565 if binding_info.has_empty_subset(a.atom) {
566 return;
567 }
568 let prober = self.get_column_index(plan, binding_info, a.atom, a.column);
569 let table = self.db.tables[plan.atoms[a.atom].table].table.as_ref();
570 let mut updates = FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
571 with_pool_set(|ps| {
572 prober.for_each(|val, x| {
573 updates.push_binding(*var, val[0]);
574 let sub = refine_subset(x.to_owned(&ps.get_pool()), &[], &table);
575 if sub.is_empty() {
576 updates.rollback();
577 return;
578 }
579 updates.refine_atom(a.atom, sub);
580 updates.finish_frame();
581 if updates.frames() >= chunk_size {
582 drain_updates!(updates);
583 }
584 })
585 });
586 drain_updates!(updates);
587 binding_info.move_back(a.atom, prober);
588 }
589 [a] => {
590 if binding_info.has_empty_subset(a.atom) {
591 return;
592 }
593 let prober = self.get_column_index(plan, binding_info, a.atom, a.column);
594 let table = self.db.tables[plan.atoms[a.atom].table].table.as_ref();
595 let mut updates = FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
596 with_pool_set(|ps| {
597 prober.for_each(|val, x| {
598 updates.push_binding(*var, val[0]);
599 let sub = refine_subset(x.to_owned(&ps.get_pool()), &a.cs, &table);
600 if sub.is_empty() {
601 updates.rollback();
602 return;
603 }
604 updates.refine_atom(a.atom, sub);
605 updates.finish_frame();
606 if updates.frames() >= chunk_size {
607 drain_updates!(updates);
608 }
609 })
610 });
611 drain_updates!(updates);
612 binding_info.move_back(a.atom, prober);
613 }
614 [a, b] => {
615 let a_prober = self.get_column_index(plan, binding_info, a.atom, a.column);
616 let b_prober = self.get_column_index(plan, binding_info, b.atom, b.column);
617
618 let ((smaller, smaller_scan), (larger, larger_scan)) =
619 if a_prober.len() < b_prober.len() {
620 ((&a_prober, a), (&b_prober, b))
621 } else {
622 ((&b_prober, b), (&a_prober, a))
623 };
624
625 let smaller_atom = smaller_scan.atom;
626 let larger_atom = larger_scan.atom;
627 let large_table = self.db.tables[plan.atoms[larger_atom].table].table.as_ref();
628 let small_table = self.db.tables[plan.atoms[smaller_atom].table]
629 .table
630 .as_ref();
631 let mut updates = FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
632 with_pool_set(|ps| {
633 smaller.for_each(|val, small_sub| {
634 if let Some(mut large_sub) = larger.get_subset(val) {
635 large_sub = refine_subset(large_sub, &larger_scan.cs, &large_table);
636 if large_sub.is_empty() {
637 updates.rollback();
638 return;
639 }
640 let small_sub = refine_subset(
641 small_sub.to_owned(&ps.get_pool()),
642 &smaller_scan.cs,
643 &small_table,
644 );
645 if small_sub.is_empty() {
646 updates.rollback();
647 return;
648 }
649 updates.push_binding(*var, val[0]);
650 updates.refine_atom(smaller_atom, small_sub);
651 updates.refine_atom(larger_atom, large_sub);
652 updates.finish_frame();
653 if updates.frames() >= chunk_size {
654 drain_updates_parallel!(updates);
655 }
656 }
657 });
658 });
659 drain_updates!(updates);
660
661 binding_info.move_back(a.atom, a_prober);
662 binding_info.move_back(b.atom, b_prober);
663 }
664 rest => {
665 let mut smallest = 0;
666 let mut smallest_size = usize::MAX;
667 let mut probers = Vec::with_capacity(rest.len());
668 for (i, scan) in rest.iter().enumerate() {
669 let prober =
670 self.get_column_index(plan, binding_info, scan.atom, scan.column);
671 let size = prober.len();
672 if size < smallest_size {
673 smallest = i;
674 smallest_size = size;
675 }
676 probers.push(prober);
677 }
678
679 let main_spec = &rest[smallest];
680 let main_spec_table = self.db.tables[plan.atoms[main_spec.atom].table]
681 .table
682 .as_ref();
683
684 if smallest_size != 0 {
685 let mut updates =
687 FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
688 probers[smallest].for_each(|key, sub| {
689 with_pool_set(|ps| {
690 updates.push_binding(*var, key[0]);
691 for (i, scan) in rest.iter().enumerate() {
692 if i == smallest {
693 continue;
694 }
695 if let Some(mut sub) = probers[i].get_subset(key) {
696 let table = self.db.tables[plan.atoms[rest[i].atom].table]
697 .table
698 .as_ref();
699 sub = refine_subset(sub, &rest[i].cs, &table);
700 if sub.is_empty() {
701 updates.rollback();
702 return;
703 }
704 updates.refine_atom(scan.atom, sub)
705 } else {
706 updates.rollback();
707 return;
709 }
710 }
711 let sub = sub.to_owned(&ps.get_pool());
712 let sub = refine_subset(sub, &main_spec.cs, &main_spec_table);
713 if sub.is_empty() {
714 updates.rollback();
715 return;
716 }
717 updates.refine_atom(main_spec.atom, sub);
718 updates.finish_frame();
719 if updates.frames() >= chunk_size {
720 drain_updates_parallel!(updates);
721 }
722 })
723 });
724 drain_updates!(updates);
725 }
726 for (spec, prober) in rest.iter().zip(probers.into_iter()) {
727 binding_info.move_back(spec.atom, prober);
728 }
729 }
730 },
731 JoinStage::FusedIntersect {
732 cover,
733 bind,
734 to_intersect,
735 } if to_intersect.is_empty() => {
736 let cover_atom = cover.to_index.atom;
737 if binding_info.has_empty_subset(cover_atom) {
738 return;
739 }
740 let proj = SmallVec::<[ColumnId; 4]>::from_iter(bind.iter().map(|(col, _)| *col));
741 let cover_node = binding_info.unwrap_val(cover_atom);
742 let cover_subset = cover_node.subset.as_ref();
743 let mut cur = Offset::new(0);
744 let mut buffer = TaggedRowBuffer::new(bind.len());
745 let mut updates = FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
746 loop {
747 buffer.clear();
748 let table = &self.db.tables[plan.atoms[cover_atom].table].table;
749 let next = table.scan_project(
750 cover_subset,
751 &proj,
752 cur,
753 chunk_size,
754 &cover.constraints,
755 &mut buffer,
756 );
757 for (row, key) in buffer.non_stale() {
758 updates.refine_atom(
759 cover_atom,
760 Subset::Dense(OffsetRange::new(row, row.inc())),
761 );
762 for (i, (_, var)) in bind.iter().enumerate() {
764 updates.push_binding(*var, key[i]);
765 }
766 updates.finish_frame();
767 if updates.frames() >= chunk_size {
768 drain_updates_parallel!(updates);
769 }
770 }
771 if let Some(next) = next {
772 cur = next;
773 continue;
774 }
775 break;
776 }
777 drain_updates!(updates);
778 binding_info.move_back_node(cover_atom, cover_node);
780 }
781 JoinStage::FusedIntersect {
782 cover,
783 bind,
784 to_intersect,
785 } => {
786 let cover_atom = cover.to_index.atom;
787 if binding_info.has_empty_subset(cover_atom) {
788 return;
789 }
790 let index_probers = to_intersect
791 .iter()
792 .enumerate()
793 .map(|(i, (spec, _))| {
794 (
795 i,
796 spec.to_index.atom,
797 self.get_index(
798 plan,
799 spec.to_index.atom,
800 binding_info,
801 spec.to_index.vars.iter().copied(),
802 ),
803 )
804 })
805 .collect::<SmallVec<[(usize, AtomId, Prober); 4]>>();
806 let proj = SmallVec::<[ColumnId; 4]>::from_iter(bind.iter().map(|(col, _)| *col));
807 let cover_node = binding_info.unwrap_val(cover_atom);
808 let cover_subset = cover_node.subset.as_ref();
809 let mut cur = Offset::new(0);
810 let mut buffer = TaggedRowBuffer::new(bind.len());
811 let mut updates = FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
812 loop {
813 buffer.clear();
814 let table = &self.db.tables[plan.atoms[cover_atom].table].table;
815 let next = table.scan_project(
816 cover_subset,
817 &proj,
818 cur,
819 chunk_size,
820 &cover.constraints,
821 &mut buffer,
822 );
823 'mid: for (row, key) in buffer.non_stale() {
824 updates.refine_atom(
825 cover_atom,
826 Subset::Dense(OffsetRange::new(row, row.inc())),
827 );
828 for (i, (_, var)) in bind.iter().enumerate() {
830 updates.push_binding(*var, key[i]);
831 }
832 for (i, atom, prober) in &index_probers {
834 let index_cols = &to_intersect[*i].1;
836 let index_key = index_cols
837 .iter()
838 .map(|col| key[col.index()])
839 .collect::<SmallVec<[Value; 4]>>();
840 let Some(mut subset) = prober.get_subset(&index_key) else {
841 updates.rollback();
842 continue 'mid;
844 };
845 let table_info = &self.db.tables[plan.atoms[*atom].table];
847 let cs = &to_intersect[*i].0.constraints;
848 subset = refine_subset(subset, cs, &table_info.table.as_ref());
849 if subset.is_empty() {
850 updates.rollback();
851 continue 'mid;
853 }
854 updates.refine_atom(*atom, subset);
855 }
856 updates.finish_frame();
857 if updates.frames() >= chunk_size {
858 drain_updates_parallel!(updates);
859 }
860 }
861 if let Some(next) = next {
862 cur = next;
863 continue;
864 }
865 break;
866 }
867 drain_updates!(updates);
872 binding_info.move_back_node(cover_atom, cover_node);
874 for (_, atom, prober) in index_probers {
875 binding_info.move_back(atom, prober);
876 }
877 }
878 }
879 }
880}
881
/// Number of binding rows buffered per action before the action's instructions are run on
/// the whole batch. Also the capacity passed to `Bindings::new` for each batch.
const VAR_BATCH_SIZE: usize = 128;
883
/// A sink for the variable bindings produced by a join, batched per action.
///
/// `'state` is the lifetime of the [`ExecutionState`]s the buffer runs actions with.
trait ActionBuffer<'state>: Send {
    /// The buffer type handed to the work closure of [`recur`](Self::recur).
    type AsLocal<'a>: ActionBuffer<'state>
    where
        'state: 'a;
    /// Buffers one row of `bindings` for `action`.
    ///
    /// `to_exec_state` produces the state used to run a batch. Both implementations in this
    /// module call it only when a batch reaches `VAR_BATCH_SIZE` rows.
    fn push_bindings(
        &mut self,
        action: ActionId,
        bindings: &DenseIdMap<Variable, Value>,
        to_exec_state: impl FnMut() -> ExecutionState<'state>,
    );

    /// Runs every buffered batch that has not been executed yet, using `exec_state`.
    fn flush(&mut self, exec_state: &mut ExecutionState);

    /// Continues join work described by `local` by calling `work` with a buffer.
    ///
    /// Implementations may run `work` immediately or move it to another task. In the latter
    /// case, `to_exec_state` supplies the state used to flush the task's own batches.
    fn recur<'local>(
        &mut self,
        local: BorrowedLocalState<'local>,
        to_exec_state: impl FnMut() -> ExecutionState<'state> + Send + 'state,
        work: impl for<'a> FnOnce(BorrowedLocalState<'a>, &mut Self::AsLocal<'a>) + Send + 'state,
    );

    /// Number of update frames to accumulate at join position `_level` (out of `_total`
    /// stages) before they are drained. Defaults to 256.
    fn morsel_size(&mut self, _level: usize, _total: usize) -> usize {
        256
    }
}
935
936struct InPlaceActionBuffer<'a> {
939 rule_set: &'a RuleSet,
940 match_counter: &'a MatchCounter,
941 batches: DenseIdMap<ActionId, ActionState>,
942}
943
944impl<'a, 'outer: 'a> ActionBuffer<'a> for InPlaceActionBuffer<'outer> {
945 type AsLocal<'b>
946 = Self
947 where
948 'a: 'b;
949
950 fn push_bindings(
951 &mut self,
952 action: ActionId,
953 bindings: &DenseIdMap<Variable, Value>,
954 mut to_exec_state: impl FnMut() -> ExecutionState<'a>,
955 ) {
956 let action_state = self.batches.get_or_default(action);
957 action_state.n_runs += 1;
958 action_state.len += 1;
959 let action_info = &self.rule_set.actions[action];
960 unsafe {
963 action_state.bindings.push(bindings, &action_info.used_vars);
964 }
965 if action_state.len >= VAR_BATCH_SIZE {
966 let mut state = to_exec_state();
967 let succeeded = state.run_instrs(&action_info.instrs, &mut action_state.bindings);
968 action_state.bindings.clear();
969 self.match_counter.inc_matches(action, succeeded);
970 action_state.len = 0;
971 }
972 }
973
974 fn flush(&mut self, exec_state: &mut ExecutionState) {
975 flush_action_states(
976 exec_state,
977 &mut self.batches,
978 self.rule_set,
979 self.match_counter,
980 );
981 }
982
983 fn recur<'local>(
984 &mut self,
985 local: BorrowedLocalState<'local>,
986 _to_exec_state: impl FnMut() -> ExecutionState<'a> + Send + 'a,
987 work: impl for<'b> FnOnce(BorrowedLocalState<'b>, &mut Self) + Send + 'a,
988 ) {
989 work(local, self)
990 }
991}
992
993struct ScopedActionBuffer<'inner, 'scope> {
995 scope: &'inner rayon::Scope<'scope>,
996 rule_set: &'scope RuleSet,
997 match_counter: &'scope MatchCounter,
998 batches: DenseIdMap<ActionId, ActionState>,
999 needs_flush: bool,
1000}
1001
1002impl<'inner, 'scope> ScopedActionBuffer<'inner, 'scope> {
1003 fn new(
1004 scope: &'inner rayon::Scope<'scope>,
1005 rule_set: &'scope RuleSet,
1006 match_counter: &'scope MatchCounter,
1007 ) -> Self {
1008 Self {
1009 scope,
1010 rule_set,
1011 batches: Default::default(),
1012 match_counter,
1013 needs_flush: false,
1014 }
1015 }
1016}
1017
1018impl<'scope> ActionBuffer<'scope> for ScopedActionBuffer<'_, 'scope> {
1019 type AsLocal<'a>
1020 = ScopedActionBuffer<'a, 'scope>
1021 where
1022 'scope: 'a;
1023 fn push_bindings(
1024 &mut self,
1025 action: ActionId,
1026 bindings: &DenseIdMap<Variable, Value>,
1027 mut to_exec_state: impl FnMut() -> ExecutionState<'scope>,
1028 ) {
1029 self.needs_flush = true;
1030 let action_state = self.batches.get_or_default(action);
1031 action_state.n_runs += 1;
1032 action_state.len += 1;
1033 let action_info = &self.rule_set.actions[action];
1034 unsafe {
1037 action_state.bindings.push(bindings, &action_info.used_vars);
1038 }
1039 if action_state.len >= VAR_BATCH_SIZE {
1040 let mut state = to_exec_state();
1041 let mut bindings =
1042 mem::replace(&mut action_state.bindings, Bindings::new(VAR_BATCH_SIZE));
1043 action_state.len = 0;
1044 self.scope.spawn(move |_| {
1045 state.run_instrs(&action_info.instrs, &mut bindings);
1046 });
1047 }
1048 }
1049
1050 fn flush(&mut self, exec_state: &mut ExecutionState) {
1051 flush_action_states(
1052 exec_state,
1053 &mut self.batches,
1054 self.rule_set,
1055 self.match_counter,
1056 );
1057 self.needs_flush = false;
1058 }
1059 fn recur<'local>(
1060 &mut self,
1061 mut local: BorrowedLocalState<'local>,
1062 mut to_exec_state: impl FnMut() -> ExecutionState<'scope> + Send + 'scope,
1063 work: impl for<'a> FnOnce(BorrowedLocalState<'a>, &mut ScopedActionBuffer<'a, 'scope>)
1064 + Send
1065 + 'scope,
1066 ) {
1067 let rule_set = self.rule_set;
1068 let match_counter = self.match_counter;
1069 let mut inner = local.clone_state();
1070 self.scope.spawn(move |scope| {
1071 let mut buf: ScopedActionBuffer<'_, 'scope> = ScopedActionBuffer {
1072 scope,
1073 rule_set,
1074 match_counter,
1075 needs_flush: false,
1076 batches: Default::default(),
1077 };
1078 work(inner.borrow_mut(), &mut buf);
1079 if buf.needs_flush {
1080 flush_action_states(
1081 &mut to_exec_state(),
1082 &mut buf.batches,
1083 buf.rule_set,
1084 buf.match_counter,
1085 );
1086 }
1087 });
1088 }
1089
1090 fn morsel_size(&mut self, _level: usize, _total: usize) -> usize {
1091 match _level {
1093 0 if _total > 2 => 32,
1094 _ => 256,
1095 }
1096 }
1097}
1098
1099fn flush_action_states(
1100 exec_state: &mut ExecutionState,
1101 actions: &mut DenseIdMap<ActionId, ActionState>,
1102 rule_set: &RuleSet,
1103 match_counter: &MatchCounter,
1104) {
1105 for (action, ActionState { bindings, len, .. }) in actions.iter_mut() {
1106 if *len > 0 {
1107 let succeeded = exec_state.run_instrs(&rule_set.actions[action].instrs, bindings);
1108 bindings.clear();
1109 match_counter.inc_matches(action, succeeded);
1110 *len = 0;
1111 }
1112 }
1113}
1114struct MatchCounter {
1115 matches: IdVec<ActionId, AtomicUsize>,
1116}
1117
1118impl MatchCounter {
1119 fn new(n_ids: usize) -> Self {
1120 let mut matches = IdVec::with_capacity(n_ids);
1121 matches.resize_with(n_ids, || AtomicUsize::new(0));
1122 Self { matches }
1123 }
1124
1125 fn inc_matches(&self, action: ActionId, by: usize) {
1126 self.matches[action].fetch_add(by, std::sync::atomic::Ordering::Relaxed);
1127 }
1128 fn read_matches(&self, action: ActionId) -> usize {
1129 self.matches[action].load(std::sync::atomic::Ordering::Acquire)
1130 }
1131}
1132
1133fn estimate_size(join_stage: &JoinStage, binding_info: &BindingInfo) -> usize {
1134 match join_stage {
1135 JoinStage::Intersect { scans, .. } => scans
1136 .iter()
1137 .map(|scan| binding_info.subsets[scan.atom].size())
1138 .min()
1139 .unwrap_or(0),
1140 JoinStage::FusedIntersect { cover, .. } => binding_info.subsets[cover.to_index.atom].size(),
1141 }
1142}
1143
1144fn num_intersected_rels(join_stage: &JoinStage) -> i32 {
1145 match join_stage {
1146 JoinStage::Intersect { scans, .. } => scans.len() as i32,
1147 JoinStage::FusedIntersect { to_intersect, .. } => to_intersect.len() as i32 + 1,
1148 }
1149}
1150
1151fn sort_plan_by_size(
1152 order: &mut InstrOrder,
1153 start: usize,
1154 instrs: &[JoinStage],
1155 binding_info: &mut BindingInfo,
1156) {
1157 let mut times_refined = with_pool_set(|ps| ps.get::<DenseIdMap<AtomId, i64>>());
1159
1160 for ins in instrs[..start].iter() {
1162 match ins {
1163 JoinStage::Intersect { scans, .. } => scans.iter().for_each(|scan| {
1164 *times_refined.get_or_default(scan.atom) += 1;
1165 }),
1166 JoinStage::FusedIntersect { cover, .. } => {
1167 *times_refined.get_or_default(cover.to_index.atom) +=
1168 cover.to_index.vars.len() as i64;
1169 }
1170 }
1171 }
1172
1173 let key_fn = |join_stage: &JoinStage,
1179 binding_info: &BindingInfo,
1180 times_refined: &DenseIdMap<AtomId, i64>| {
1181 let refine = match join_stage {
1182 JoinStage::Intersect { scans, .. } => scans
1183 .iter()
1184 .map(|scan| times_refined.get(scan.atom).copied().unwrap_or_default())
1185 .sum::<i64>(),
1186 JoinStage::FusedIntersect { cover, .. } => times_refined
1187 .get(cover.to_index.atom)
1188 .copied()
1189 .unwrap_or_default(),
1190 };
1191 (
1192 -refine,
1193 -num_intersected_rels(join_stage),
1194 estimate_size(join_stage, binding_info),
1195 )
1196 };
1197
1198 for i in start..order.len() {
1199 for j in i + 1..order.len() {
1200 let key_i = key_fn(&instrs[order.get(i)], binding_info, ×_refined);
1201 let key_j = key_fn(&instrs[order.get(j)], binding_info, ×_refined);
1202 if key_j < key_i {
1203 order.data.swap(i, j);
1204 }
1205 }
1206 match &instrs[order.get(i)] {
1208 JoinStage::Intersect { scans, .. } => scans.iter().for_each(|scan| {
1209 *times_refined.get_or_default(scan.atom) += 1;
1210 }),
1211 JoinStage::FusedIntersect { cover, .. } => {
1212 *times_refined.get_or_default(cover.to_index.atom) +=
1213 cover.to_index.vars.len() as i64;
1214 }
1215 }
1216 }
1217}
1218
1219#[derive(Debug, Clone, PartialEq, Eq)]
1220struct InstrOrder {
1221 data: SmallVec<[u16; 8]>,
1222}
1223
1224impl InstrOrder {
1225 fn new() -> Self {
1226 InstrOrder {
1227 data: SmallVec::new(),
1228 }
1229 }
1230
1231 fn from_iter(range: impl Iterator<Item = usize>) -> InstrOrder {
1232 let mut res = InstrOrder::new();
1233 res.data
1234 .extend(range.map(|x| u16::try_from(x).expect("too many instructions")));
1235 res
1236 }
1237
1238 fn get(&self, idx: usize) -> usize {
1239 self.data[idx] as usize
1240 }
1241 fn len(&self) -> usize {
1242 self.data.len()
1243 }
1244}
1245
1246struct BorrowedLocalState<'a> {
1247 instr_order: &'a mut InstrOrder,
1248 binding_info: &'a mut BindingInfo,
1249 updates: &'a mut FrameUpdates,
1250}
1251
1252impl BorrowedLocalState<'_> {
1253 fn clone_state(&mut self) -> LocalState {
1254 LocalState {
1255 instr_order: self.instr_order.clone(),
1256 binding_info: self.binding_info.clone(),
1257 updates: std::mem::take(self.updates),
1258 }
1259 }
1260}
1261
1262struct LocalState {
1263 instr_order: InstrOrder,
1264 binding_info: BindingInfo,
1265 updates: FrameUpdates,
1266}
1267
1268impl LocalState {
1269 fn borrow_mut<'a>(&'a mut self) -> BorrowedLocalState<'a> {
1270 BorrowedLocalState {
1271 instr_order: &mut self.instr_order,
1272 binding_info: &mut self.binding_info,
1273 updates: &mut self.updates,
1274 }
1275 }
1276}