egglog_core_relations/free_join/
execute.rs

1//! Core free join execution.
2
3use std::{
4    cmp, iter, mem,
5    sync::{Arc, OnceLock, atomic::AtomicUsize},
6};
7
8use crate::numeric_id::{DenseIdMap, IdVec, NumericId};
9use dashmap::mapref::one::RefMut;
10use smallvec::SmallVec;
11use web_time::Instant;
12
13use crate::{
14    Constraint, OffsetRange, Pool, SubsetRef,
15    action::{Bindings, ExecutionState, PredictedVals},
16    common::{DashMap, Value},
17    free_join::{
18        RuleReport, RuleSetReport,
19        frame_update::{FrameUpdates, UpdateInstr},
20        get_index_from_tableinfo,
21    },
22    hash_index::{ColumnIndex, IndexBase, TupleIndex},
23    offsets::{Offsets, SortedOffsetVector, Subset},
24    parallel_heuristics::parallelize_db_level_op,
25    pool::Pooled,
26    query::RuleSet,
27    row_buffer::TaggedRowBuffer,
28    table_spec::{ColumnId, Offset, WrappedTableRef},
29};
30
31use super::{
32    ActionId, AtomId, Database, HashColumnIndex, HashIndex, TableInfo, Variable,
33    get_column_index_from_tableinfo,
34    plan::{JoinHeader, JoinStage, Plan},
35    with_pool_set,
36};
37
/// The index a [`Prober`] serves lookups and scans from.
///
/// The variant is chosen in `JoinState::get_index`. The `Cached*` variants reuse an index
/// obtained through the table's `TableInfo`; the `Dynamic*` variants index only the subset of
/// rows currently under consideration.
enum DynamicIndex {
    /// Index from `get_index_from_tableinfo`, used when the key is not exactly one column.
    Cached {
        /// When set, subsets read from `table` are intersected with the prober's own subset
        /// before they are handed out, and empty results are dropped.
        intersect_outer: bool,
        table: HashIndex,
    },
    /// Index from `get_column_index_from_tableinfo`, used when the key is a single column.
    CachedColumn {
        /// Same meaning as the field of the same name in `Cached`.
        intersect_outer: bool,
        table: HashColumnIndex,
    },
    /// Index built with `group_by_key` over just the current subset.
    Dynamic(TupleIndex),
    /// Single-column index over the current subset, taken from the owning `TrieNode`'s cache.
    DynamicColumn(Arc<ColumnIndex>),
}
50
/// A handle for looking up, or iterating over, the rows of one atom grouped by key.
///
/// Building a prober (see `JoinState::get_index`) moves the atom's [`TrieNode`] out of the
/// `BindingInfo`; `BindingInfo::move_back` puts it back.
struct Prober {
    /// The atom's trie node, held for as long as the prober is in use.
    node: TrieNode,
    /// Pool passed to `to_owned` / `intersect` when producing owned subsets.
    pool: Pool<SortedOffsetVector>,
    /// The index that lookups and scans are served from.
    ix: DynamicIndex,
}
56
57impl Prober {
58    fn get_subset(&self, key: &[Value]) -> Option<Subset> {
59        match &self.ix {
60            DynamicIndex::Cached {
61                intersect_outer,
62                table,
63            } => {
64                let mut sub = table.get().unwrap().get_subset(key)?.to_owned(&self.pool);
65                if *intersect_outer {
66                    sub.intersect(self.node.subset.as_ref(), &self.pool);
67                    if sub.is_empty() {
68                        return None;
69                    }
70                }
71                Some(sub)
72            }
73            DynamicIndex::CachedColumn {
74                intersect_outer,
75                table,
76            } => {
77                debug_assert_eq!(key.len(), 1);
78                let mut sub = table
79                    .get()
80                    .unwrap()
81                    .get_subset(&key[0])?
82                    .to_owned(&self.pool);
83                if *intersect_outer {
84                    sub.intersect(self.node.subset.as_ref(), &self.pool);
85                    if sub.is_empty() {
86                        return None;
87                    }
88                }
89                Some(sub)
90            }
91            DynamicIndex::Dynamic(tab) => tab.get_subset(key).map(|x| x.to_owned(&self.pool)),
92            DynamicIndex::DynamicColumn(tab) => {
93                tab.get_subset(&key[0]).map(|x| x.to_owned(&self.pool))
94            }
95        }
96    }
97    fn for_each(&self, mut f: impl FnMut(&[Value], SubsetRef)) {
98        match &self.ix {
99            DynamicIndex::Cached {
100                intersect_outer: true,
101                table,
102            } => table.get().unwrap().for_each(|k, v| {
103                let mut res = v.to_owned(&self.pool);
104                res.intersect(self.node.subset.as_ref(), &self.pool);
105                if !res.is_empty() {
106                    f(k, res.as_ref())
107                }
108            }),
109            DynamicIndex::Cached {
110                intersect_outer: false,
111                table,
112            } => table.get().unwrap().for_each(|k, v| f(k, v)),
113            DynamicIndex::CachedColumn {
114                intersect_outer: true,
115                table,
116            } => {
117                table.get().unwrap().for_each(|k, v| {
118                    let mut res = v.to_owned(&self.pool);
119                    res.intersect(self.node.subset.as_ref(), &self.pool);
120                    if !res.is_empty() {
121                        f(&[*k], res.as_ref())
122                    }
123                });
124            }
125            DynamicIndex::CachedColumn {
126                intersect_outer: false,
127                table,
128            } => {
129                table.get().unwrap().for_each(|k, v| f(&[*k], v));
130            }
131            DynamicIndex::Dynamic(tab) => {
132                tab.for_each(f);
133            }
134            DynamicIndex::DynamicColumn(tab) => tab.for_each(|k, v| {
135                f(&[*k], v);
136            }),
137        }
138    }
139
140    fn len(&self) -> usize {
141        match &self.ix {
142            DynamicIndex::Cached { table, .. } => table.get().unwrap().len(),
143            DynamicIndex::CachedColumn { table, .. } => table.get().unwrap().len(),
144            DynamicIndex::Dynamic(tab) => tab.len(),
145            DynamicIndex::DynamicColumn(tab) => tab.len(),
146        }
147    }
148}
149
150impl Database {
151    pub fn run_rule_set(&mut self, rule_set: &RuleSet) -> RuleSetReport {
152        if rule_set.plans.is_empty() {
153            return RuleSetReport::default();
154        }
155        let preds = with_pool_set(|ps| ps.get::<PredictedVals>());
156        let match_counter = MatchCounter::new(rule_set.actions.n_ids());
157
158        let search_and_apply_timer = Instant::now();
159        let rule_reports = DashMap::default();
160        if parallelize_db_level_op(self.total_size_estimate) {
161            rayon::in_place_scope(|scope| {
162                for (plan, desc, _action) in rule_set.plans.values() {
163                    scope.spawn(|scope| {
164                        let join_state = JoinState::new(self, &preds);
165                        let mut action_buf =
166                            ScopedActionBuffer::new(scope, rule_set, &match_counter);
167                        let mut binding_info = BindingInfo::default();
168                        for (id, info) in plan.atoms.iter() {
169                            let table = join_state.db.get_table(info.table);
170                            binding_info.insert_subset(id, table.all());
171                        }
172
173                        let search_and_apply_timer = Instant::now();
174                        join_state.run_header(plan, &mut binding_info, &mut action_buf);
175                        let search_and_apply_time = search_and_apply_timer.elapsed();
176
177                        if action_buf.needs_flush {
178                            action_buf.flush(&mut ExecutionState::new(
179                                &preds,
180                                self.read_only_view(),
181                                Default::default(),
182                            ));
183                        }
184                        let mut rule_report: RefMut<'_, String, RuleReport> =
185                            rule_reports.entry(desc.clone()).or_default();
186                        *rule_report = rule_report.union(&RuleReport {
187                            search_and_apply_time,
188                            num_matches: 0,
189                        });
190                    });
191                }
192            });
193        } else {
194            let join_state = JoinState::new(self, &preds);
195            // Just run all of the plans in order with a single in-place action
196            // buffer.
197            let mut action_buf = InPlaceActionBuffer {
198                rule_set,
199                match_counter: &match_counter,
200                batches: Default::default(),
201            };
202            for (plan, desc, _action) in rule_set.plans.values() {
203                let mut binding_info = BindingInfo::default();
204                for (id, info) in plan.atoms.iter() {
205                    let table = join_state.db.get_table(info.table);
206                    binding_info.insert_subset(id, table.all());
207                }
208
209                let search_and_apply_timer = Instant::now();
210                join_state.run_header(plan, &mut binding_info, &mut action_buf);
211                let search_and_apply_time = search_and_apply_timer.elapsed();
212
213                let mut rule_report = rule_reports.entry(desc.clone()).or_default();
214                *rule_report = rule_report.union(&RuleReport {
215                    search_and_apply_time,
216                    num_matches: 0,
217                });
218            }
219            action_buf.flush(&mut ExecutionState::new(
220                &preds,
221                self.read_only_view(),
222                Default::default(),
223            ));
224        }
225        for (_plan, desc, action) in rule_set.plans.values() {
226            let mut reservation = rule_reports.get_mut(desc).unwrap();
227            let RuleReport { num_matches, .. } = reservation.value_mut();
228            *num_matches += match_counter.read_matches(*action);
229        }
230        let search_and_apply_time = search_and_apply_timer.elapsed();
231
232        let merge_timer = Instant::now();
233        let changed = self.merge_all();
234        let merge_time = merge_timer.elapsed();
235
236        RuleSetReport {
237            changed,
238            rule_reports,
239            search_and_apply_time,
240            merge_time,
241        }
242    }
243}
244
/// Bookkeeping kept for one action while variable bindings for it are staged.
///
/// NOTE(review): the code that reads these fields is outside this part of the file; the field
/// notes below are inferred from names and need confirming against the action buffers.
struct ActionState {
    /// Presumably how many times the staged bindings have been run -- confirm.
    n_runs: usize,
    /// Presumably the number of binding rows currently staged -- confirm.
    len: usize,
    /// The staged bindings; constructed as `Bindings::new(VAR_BATCH_SIZE)` by `Default`.
    bindings: Bindings,
}
250
251impl Default for ActionState {
252    fn default() -> Self {
253        Self {
254            n_runs: 0,
255            len: 0,
256            bindings: Bindings::new(VAR_BATCH_SIZE),
257        }
258    }
259}
260
/// Read-only context shared by every stage of a join.
struct JoinState<'a> {
    /// The database whose tables are being queried.
    db: &'a Database,
    /// Passed to each `ExecutionState` that is created to run actions.
    preds: &'a PredictedVals,
}
265
/// One lazily-initialized, shareable column index slot per column, indexed by [`ColumnId`].
type ColumnIndexes = IdVec<ColumnId, OnceLock<Arc<ColumnIndex>>>;
267
/// Information about the current subset of an atom's relation that is being considered, along with
/// lazily-initialized, cached indexes on that subset.
///
/// This is the standard trie-node used in lazy implementations of GJ as in the original egglog
/// implementation and the FJ paper. It currently does not handle non-column indexes, but that
/// should be a fairly straightforward extension if we start generating plans that need those.
/// (Right now, most plans iterating over more than one column just do a scan anyway).
struct TrieNode {
    /// The actual subset of the corresponding atom.
    subset: Subset,
    /// Per-column indexes on `subset`. The vector is allocated on the first call to
    /// `get_cached_index`, each column's index is built the first time it is requested, and
    /// clones of this node share the same `Arc`.
    cached_subsets: OnceLock<Arc<Pooled<ColumnIndexes>>>,
}
281
282impl TrieNode {
283    fn size(&self) -> usize {
284        self.subset.size()
285    }
286    fn get_cached_index(&self, col: ColumnId, info: &TableInfo) -> Arc<ColumnIndex> {
287        self.cached_subsets.get_or_init(|| {
288            // Pre-size the vector so we do not need to borrow it mutably to initialize the index.
289            let mut vec: Pooled<ColumnIndexes> = with_pool_set(|ps| ps.get());
290            vec.resize_with(info.spec.arity(), OnceLock::new);
291            Arc::new(vec)
292        })[col]
293            .get_or_init(|| {
294                let col_index = info.table.group_by_col(self.subset.as_ref(), col);
295                Arc::new(col_index)
296            })
297            .clone()
298    }
299}
300
301impl Clone for TrieNode {
302    fn clone(&self) -> Self {
303        let cached_subsets = OnceLock::new();
304        if let Some(cached) = self.cached_subsets.get() {
305            cached_subsets.set(cached.clone()).ok().unwrap();
306        }
307        Self {
308            subset: self.subset.clone(),
309            cached_subsets,
310        }
311    }
312}
313
/// The state threaded through plan execution: the values bound to variables so far and the
/// current trie node (row subset plus cached indexes) of every atom.
#[derive(Default, Clone)]
struct BindingInfo {
    /// Values assigned to query variables so far.
    bindings: DenseIdMap<Variable, Value>,
    /// Per-atom trie nodes. A node is moved out while a prober for its atom is in use and
    /// must be moved back afterwards.
    subsets: DenseIdMap<AtomId, TrieNode>,
}
319
320impl BindingInfo {
321    /// Initializes the atom-related metadata in the [`BindingInfo`].
322    fn insert_subset(&mut self, atom: AtomId, subset: Subset) {
323        let node = TrieNode {
324            subset,
325            cached_subsets: Default::default(),
326        };
327        self.subsets.insert(atom, node);
328    }
329
330    /// Probers returned from [`JoinState::get_index`] will move atom-related state out of the
331    /// [`BindingInfo`]. Once the caller is done using a prober, this method moves it back.
332    fn move_back(&mut self, atom: AtomId, prober: Prober) {
333        self.subsets.insert(atom, prober.node);
334    }
335
336    fn move_back_node(&mut self, atom: AtomId, node: TrieNode) {
337        self.subsets.insert(atom, node);
338    }
339
340    fn has_empty_subset(&self, atom: AtomId) -> bool {
341        self.subsets[atom].subset.is_empty()
342    }
343
344    fn unwrap_val(&mut self, atom: AtomId) -> TrieNode {
345        self.subsets.unwrap_val(atom)
346    }
347}
348
349impl<'a> JoinState<'a> {
350    fn new(db: &'a Database, preds: &'a PredictedVals) -> Self {
351        Self { db, preds }
352    }
353
354    fn get_index(
355        &self,
356        plan: &Plan,
357        atom: AtomId,
358        binding_info: &mut BindingInfo,
359        cols: impl Iterator<Item = ColumnId>,
360    ) -> Prober {
361        let cols = SmallVec::<[ColumnId; 4]>::from_iter(cols);
362        let trie_node = binding_info.subsets.unwrap_val(atom);
363        let subset = &trie_node.subset;
364
365        let table_id = plan.atoms[atom].table;
366        let info = &self.db.tables[table_id];
367        let all_cacheable = cols.iter().all(|col| {
368            !info
369                .spec
370                .uncacheable_columns
371                .get(*col)
372                .copied()
373                .unwrap_or(false)
374        });
375        let whole_table = info.table.all();
376        let dyn_index =
377            if all_cacheable && subset.is_dense() && whole_table.size() / 2 < subset.size() {
378                // Skip intersecting with the subset if we are just looking at the
379                // whole table.
380                let intersect_outer =
381                    !(whole_table.is_dense() && subset.bounds() == whole_table.bounds());
382                // heuristic: if the subset we are scanning is somewhat
383                // large _or_ it is most of the table, or we already have a cached
384                // index for it, then return it.
385                if cols.len() != 1 {
386                    DynamicIndex::Cached {
387                        intersect_outer,
388                        table: get_index_from_tableinfo(info, &cols).clone(),
389                    }
390                } else {
391                    DynamicIndex::CachedColumn {
392                        intersect_outer,
393                        table: get_column_index_from_tableinfo(info, cols[0]).clone(),
394                    }
395                }
396            } else if cols.len() != 1 {
397                // NB: we should have a caching strategy for non-column indexes.
398                DynamicIndex::Dynamic(info.table.group_by_key(subset.as_ref(), &cols))
399            } else {
400                DynamicIndex::DynamicColumn(trie_node.get_cached_index(cols[0], info))
401            };
402        Prober {
403            node: trie_node,
404            pool: with_pool_set(|ps| ps.get_pool().clone()),
405            ix: dyn_index,
406        }
407    }
408    fn get_column_index(
409        &self,
410        plan: &Plan,
411        binding_info: &mut BindingInfo,
412        atom: AtomId,
413        col: ColumnId,
414    ) -> Prober {
415        self.get_index(plan, atom, binding_info, iter::once(col))
416    }
417
418    /// Runs the free join plan, starting with the header.
419    ///
420    /// A bit about the `instr_order` parameter: This defines the order in which the [`JoinStage`]
421    /// instructions will run. We want to support cached [`Plan`]s that may be based on stale
422    /// ordering information. `instr_order` allows us to specify a new ordering of the instructions
423    /// without mutating the plan itself: `run_plan` simply executes
424    /// `plan.stages.instrs[instr_order[i]]` at stage `i`.
425    ///
426    /// This is also a stepping stone towards supporting fully dynamic variable ordering.
427    fn run_header<'buf, BUF: ActionBuffer<'buf>>(
428        &self,
429        plan: &'a Plan,
430        binding_info: &mut BindingInfo,
431        action_buf: &mut BUF,
432    ) where
433        'a: 'buf,
434    {
435        for JoinHeader { atom, subset, .. } in &plan.stages.header {
436            if subset.is_empty() {
437                return;
438            }
439            let mut cur = binding_info.unwrap_val(*atom);
440            debug_assert!(cur.cached_subsets.get().is_none());
441            cur.subset
442                .intersect(subset.as_ref(), &with_pool_set(|ps| ps.get_pool()));
443            binding_info.move_back_node(*atom, cur);
444        }
445        for (_, node) in binding_info.subsets.iter() {
446            if node.subset.is_empty() {
447                return;
448            }
449        }
450        let mut order = InstrOrder::from_iter(0..plan.stages.instrs.len());
451        sort_plan_by_size(&mut order, 0, &plan.stages.instrs, binding_info);
452        self.run_plan(plan, &mut order, 0, binding_info, action_buf);
453    }
454
455    /// The core method for executing a free join plan.
456    ///
457    /// This method takes the plan, mutable data-structures for variable binding and staging
458    /// actions, and two indexes: `cur` which is the current stage of the plan to run, and `level`
459    /// which is the current "fan-out" node we are in. The latter parameter is an experimental
460    /// index used to detect if we are at the "top" of a plan rather than the "bottom", and is
461    /// currently used as a heuristic to determine if we should increase parallelism more than the
462    /// default.
463    fn run_plan<'buf, BUF: ActionBuffer<'buf>>(
464        &self,
465        plan: &'a Plan,
466        instr_order: &mut InstrOrder,
467        cur: usize,
468        binding_info: &mut BindingInfo,
469        action_buf: &mut BUF,
470    ) where
471        'a: 'buf,
472    {
473        if cur >= instr_order.len() {
474            action_buf.push_bindings(plan.stages.actions, &binding_info.bindings, || {
475                ExecutionState::new(self.preds, self.db.read_only_view(), Default::default())
476            });
477            return;
478        }
479        let chunk_size = action_buf.morsel_size(cur, instr_order.len());
480        let mut cur_size = estimate_size(&plan.stages.instrs[instr_order.get(cur)], binding_info);
481        if cur_size > 32 && cur % 3 == 1 && cur < instr_order.len() - 1 {
482            // If we have a reasonable number of tuples to process, adjust the variable order every
483            // 3 rounds, but always make sure to readjust on the second roung.
484            sort_plan_by_size(instr_order, cur, &plan.stages.instrs, binding_info);
485            cur_size = estimate_size(&plan.stages.instrs[instr_order.get(cur)], binding_info);
486        }
487
488        // Helper macro (not its own method to appease the borrow checker).
489        macro_rules! drain_updates {
490            ($updates:expr) => {
491                if cur == 0 || cur == 1 {
492                    drain_updates_parallel!($updates)
493                } else {
494                    $updates.drain(|update| match update {
495                        UpdateInstr::PushBinding(var, val) => {
496                            binding_info.bindings.insert(var, val);
497                        }
498                        UpdateInstr::RefineAtom(atom, subset) => {
499                            binding_info.insert_subset(atom, subset);
500                        }
501                        UpdateInstr::EndFrame => {
502                            self.run_plan(plan, instr_order, cur + 1, binding_info, action_buf);
503                        }
504                    })
505                }
506            };
507        }
508        macro_rules! drain_updates_parallel {
509            ($updates:expr) => {{
510                let predicted = self.preds;
511                let db = self.db;
512                action_buf.recur(
513                    BorrowedLocalState {
514                        binding_info,
515                        instr_order,
516                        updates: &mut $updates,
517                    },
518                    move || ExecutionState::new(predicted, db.read_only_view(), Default::default()),
519                    move |BorrowedLocalState {
520                              binding_info,
521                              instr_order,
522                              updates,
523                          },
524                          buf| {
525                        updates.drain(|update| match update {
526                            UpdateInstr::PushBinding(var, val) => {
527                                binding_info.bindings.insert(var, val);
528                            }
529                            UpdateInstr::RefineAtom(atom, subset) => {
530                                binding_info.insert_subset(atom, subset);
531                            }
532                            UpdateInstr::EndFrame => {
533                                JoinState {
534                                    db,
535                                    preds: predicted,
536                                }
537                                .run_plan(
538                                    plan,
539                                    instr_order,
540                                    cur + 1,
541                                    binding_info,
542                                    buf,
543                                );
544                            }
545                        })
546                    },
547                );
548                $updates.clear();
549            }};
550        }
551
552        fn refine_subset(
553            sub: Subset,
554            constraints: &[Constraint],
555            table: &WrappedTableRef,
556        ) -> Subset {
557            let sub = table.refine_live(sub);
558            table.refine(sub, constraints)
559        }
560
561        match &plan.stages.instrs[instr_order.get(cur)] {
562            JoinStage::Intersect { var, scans } => match scans.as_slice() {
563                [] => {}
564                [a] if a.cs.is_empty() => {
565                    if binding_info.has_empty_subset(a.atom) {
566                        return;
567                    }
568                    let prober = self.get_column_index(plan, binding_info, a.atom, a.column);
569                    let table = self.db.tables[plan.atoms[a.atom].table].table.as_ref();
570                    let mut updates = FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
571                    with_pool_set(|ps| {
572                        prober.for_each(|val, x| {
573                            updates.push_binding(*var, val[0]);
574                            let sub = refine_subset(x.to_owned(&ps.get_pool()), &[], &table);
575                            if sub.is_empty() {
576                                updates.rollback();
577                                return;
578                            }
579                            updates.refine_atom(a.atom, sub);
580                            updates.finish_frame();
581                            if updates.frames() >= chunk_size {
582                                drain_updates!(updates);
583                            }
584                        })
585                    });
586                    drain_updates!(updates);
587                    binding_info.move_back(a.atom, prober);
588                }
589                [a] => {
590                    if binding_info.has_empty_subset(a.atom) {
591                        return;
592                    }
593                    let prober = self.get_column_index(plan, binding_info, a.atom, a.column);
594                    let table = self.db.tables[plan.atoms[a.atom].table].table.as_ref();
595                    let mut updates = FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
596                    with_pool_set(|ps| {
597                        prober.for_each(|val, x| {
598                            updates.push_binding(*var, val[0]);
599                            let sub = refine_subset(x.to_owned(&ps.get_pool()), &a.cs, &table);
600                            if sub.is_empty() {
601                                updates.rollback();
602                                return;
603                            }
604                            updates.refine_atom(a.atom, sub);
605                            updates.finish_frame();
606                            if updates.frames() >= chunk_size {
607                                drain_updates!(updates);
608                            }
609                        })
610                    });
611                    drain_updates!(updates);
612                    binding_info.move_back(a.atom, prober);
613                }
614                [a, b] => {
615                    let a_prober = self.get_column_index(plan, binding_info, a.atom, a.column);
616                    let b_prober = self.get_column_index(plan, binding_info, b.atom, b.column);
617
618                    let ((smaller, smaller_scan), (larger, larger_scan)) =
619                        if a_prober.len() < b_prober.len() {
620                            ((&a_prober, a), (&b_prober, b))
621                        } else {
622                            ((&b_prober, b), (&a_prober, a))
623                        };
624
625                    let smaller_atom = smaller_scan.atom;
626                    let larger_atom = larger_scan.atom;
627                    let large_table = self.db.tables[plan.atoms[larger_atom].table].table.as_ref();
628                    let small_table = self.db.tables[plan.atoms[smaller_atom].table]
629                        .table
630                        .as_ref();
631                    let mut updates = FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
632                    with_pool_set(|ps| {
633                        smaller.for_each(|val, small_sub| {
634                            if let Some(mut large_sub) = larger.get_subset(val) {
635                                large_sub = refine_subset(large_sub, &larger_scan.cs, &large_table);
636                                if large_sub.is_empty() {
637                                    updates.rollback();
638                                    return;
639                                }
640                                let small_sub = refine_subset(
641                                    small_sub.to_owned(&ps.get_pool()),
642                                    &smaller_scan.cs,
643                                    &small_table,
644                                );
645                                if small_sub.is_empty() {
646                                    updates.rollback();
647                                    return;
648                                }
649                                updates.push_binding(*var, val[0]);
650                                updates.refine_atom(smaller_atom, small_sub);
651                                updates.refine_atom(larger_atom, large_sub);
652                                updates.finish_frame();
653                                if updates.frames() >= chunk_size {
654                                    drain_updates_parallel!(updates);
655                                }
656                            }
657                        });
658                    });
659                    drain_updates!(updates);
660
661                    binding_info.move_back(a.atom, a_prober);
662                    binding_info.move_back(b.atom, b_prober);
663                }
664                rest => {
665                    let mut smallest = 0;
666                    let mut smallest_size = usize::MAX;
667                    let mut probers = Vec::with_capacity(rest.len());
668                    for (i, scan) in rest.iter().enumerate() {
669                        let prober =
670                            self.get_column_index(plan, binding_info, scan.atom, scan.column);
671                        let size = prober.len();
672                        if size < smallest_size {
673                            smallest = i;
674                            smallest_size = size;
675                        }
676                        probers.push(prober);
677                    }
678
679                    let main_spec = &rest[smallest];
680                    let main_spec_table = self.db.tables[plan.atoms[main_spec.atom].table]
681                        .table
682                        .as_ref();
683
684                    if smallest_size != 0 {
685                        // Smallest leads the scan
686                        let mut updates =
687                            FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
688                        probers[smallest].for_each(|key, sub| {
689                            with_pool_set(|ps| {
690                                updates.push_binding(*var, key[0]);
691                                for (i, scan) in rest.iter().enumerate() {
692                                    if i == smallest {
693                                        continue;
694                                    }
695                                    if let Some(mut sub) = probers[i].get_subset(key) {
696                                        let table = self.db.tables[plan.atoms[rest[i].atom].table]
697                                            .table
698                                            .as_ref();
699                                        sub = refine_subset(sub, &rest[i].cs, &table);
700                                        if sub.is_empty() {
701                                            updates.rollback();
702                                            return;
703                                        }
704                                        updates.refine_atom(scan.atom, sub)
705                                    } else {
706                                        updates.rollback();
707                                        // Empty intersection.
708                                        return;
709                                    }
710                                }
711                                let sub = sub.to_owned(&ps.get_pool());
712                                let sub = refine_subset(sub, &main_spec.cs, &main_spec_table);
713                                if sub.is_empty() {
714                                    updates.rollback();
715                                    return;
716                                }
717                                updates.refine_atom(main_spec.atom, sub);
718                                updates.finish_frame();
719                                if updates.frames() >= chunk_size {
720                                    drain_updates_parallel!(updates);
721                                }
722                            })
723                        });
724                        drain_updates!(updates);
725                    }
726                    for (spec, prober) in rest.iter().zip(probers.into_iter()) {
727                        binding_info.move_back(spec.atom, prober);
728                    }
729                }
730            },
731            JoinStage::FusedIntersect {
732                cover,
733                bind,
734                to_intersect,
735            } if to_intersect.is_empty() => {
736                let cover_atom = cover.to_index.atom;
737                if binding_info.has_empty_subset(cover_atom) {
738                    return;
739                }
740                let proj = SmallVec::<[ColumnId; 4]>::from_iter(bind.iter().map(|(col, _)| *col));
741                let cover_node = binding_info.unwrap_val(cover_atom);
742                let cover_subset = cover_node.subset.as_ref();
743                let mut cur = Offset::new(0);
744                let mut buffer = TaggedRowBuffer::new(bind.len());
745                let mut updates = FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
746                loop {
747                    buffer.clear();
748                    let table = &self.db.tables[plan.atoms[cover_atom].table].table;
749                    let next = table.scan_project(
750                        cover_subset,
751                        &proj,
752                        cur,
753                        chunk_size,
754                        &cover.constraints,
755                        &mut buffer,
756                    );
757                    for (row, key) in buffer.non_stale() {
758                        updates.refine_atom(
759                            cover_atom,
760                            Subset::Dense(OffsetRange::new(row, row.inc())),
761                        );
762                        // bind the values
763                        for (i, (_, var)) in bind.iter().enumerate() {
764                            updates.push_binding(*var, key[i]);
765                        }
766                        updates.finish_frame();
767                        if updates.frames() >= chunk_size {
768                            drain_updates_parallel!(updates);
769                        }
770                    }
771                    if let Some(next) = next {
772                        cur = next;
773                        continue;
774                    }
775                    break;
776                }
777                drain_updates!(updates);
778                // Restore the subsets we swapped out.
779                binding_info.move_back_node(cover_atom, cover_node);
780            }
781            JoinStage::FusedIntersect {
782                cover,
783                bind,
784                to_intersect,
785            } => {
786                let cover_atom = cover.to_index.atom;
787                if binding_info.has_empty_subset(cover_atom) {
788                    return;
789                }
790                let index_probers = to_intersect
791                    .iter()
792                    .enumerate()
793                    .map(|(i, (spec, _))| {
794                        (
795                            i,
796                            spec.to_index.atom,
797                            self.get_index(
798                                plan,
799                                spec.to_index.atom,
800                                binding_info,
801                                spec.to_index.vars.iter().copied(),
802                            ),
803                        )
804                    })
805                    .collect::<SmallVec<[(usize, AtomId, Prober); 4]>>();
806                let proj = SmallVec::<[ColumnId; 4]>::from_iter(bind.iter().map(|(col, _)| *col));
807                let cover_node = binding_info.unwrap_val(cover_atom);
808                let cover_subset = cover_node.subset.as_ref();
809                let mut cur = Offset::new(0);
810                let mut buffer = TaggedRowBuffer::new(bind.len());
811                let mut updates = FrameUpdates::with_capacity(cmp::min(chunk_size, cur_size));
812                loop {
813                    buffer.clear();
814                    let table = &self.db.tables[plan.atoms[cover_atom].table].table;
815                    let next = table.scan_project(
816                        cover_subset,
817                        &proj,
818                        cur,
819                        chunk_size,
820                        &cover.constraints,
821                        &mut buffer,
822                    );
823                    'mid: for (row, key) in buffer.non_stale() {
824                        updates.refine_atom(
825                            cover_atom,
826                            Subset::Dense(OffsetRange::new(row, row.inc())),
827                        );
828                        // bind the values
829                        for (i, (_, var)) in bind.iter().enumerate() {
830                            updates.push_binding(*var, key[i]);
831                        }
832                        // now probe each remaining indexes
833                        for (i, atom, prober) in &index_probers {
834                            // create a key: to_intersect indexes into the key from the cover
835                            let index_cols = &to_intersect[*i].1;
836                            let index_key = index_cols
837                                .iter()
838                                .map(|col| key[col.index()])
839                                .collect::<SmallVec<[Value; 4]>>();
840                            let Some(mut subset) = prober.get_subset(&index_key) else {
841                                updates.rollback();
842                                // There are no possible values for this subset
843                                continue 'mid;
844                            };
845                            // apply any constraints needed in this scan.
846                            let table_info = &self.db.tables[plan.atoms[*atom].table];
847                            let cs = &to_intersect[*i].0.constraints;
848                            subset = refine_subset(subset, cs, &table_info.table.as_ref());
849                            if subset.is_empty() {
850                                updates.rollback();
851                                // There are no possible values for this subset
852                                continue 'mid;
853                            }
854                            updates.refine_atom(*atom, subset);
855                        }
856                        updates.finish_frame();
857                        if updates.frames() >= chunk_size {
858                            drain_updates_parallel!(updates);
859                        }
860                    }
861                    if let Some(next) = next {
862                        cur = next;
863                        continue;
864                    }
865                    break;
866                }
867                // TODO: special-case the scenario when the cover doesn't need
868                // deduping (and hence we can do a straight scan: e.g. when the
869                // cover is binding a superset of the primary key for the
870                // table).
871                drain_updates!(updates);
872                // Restore the subsets we swapped out.
873                binding_info.move_back_node(cover_atom, cover_node);
874                for (_, atom, prober) in index_probers {
875                    binding_info.move_back(atom, prober);
876                }
877            }
878        }
879    }
880}
881
882const VAR_BATCH_SIZE: usize = 128;
883
884/// A trait used to abstract over different ways of buffering actions together
885/// before running them.
886///
887/// This trait exists as a fairly ad-hoc wrapper over its two implementations.
888/// It allows us to avoid duplicating the (somewhat monstrous) `run_plan` method
889/// for serial and parallel modes.
890trait ActionBuffer<'state>: Send {
891    type AsLocal<'a>: ActionBuffer<'state>
892    where
893        'state: 'a;
894    /// Push the given bindings to be executed for the specified action. If this
895    /// buffer has built up a sufficient batch size, it may execute
896    /// `to_exec_state` and then execute the action.
897    ///
898    /// NB: `push_bindings` makes module-specific assumptions on what values are passed to
899    /// `bindings` for a common `action`. This is not a general-purpose trait for that reason and
900    /// it should not, in general, be used outside of this module.
901    fn push_bindings(
902        &mut self,
903        action: ActionId,
904        bindings: &DenseIdMap<Variable, Value>,
905        to_exec_state: impl FnMut() -> ExecutionState<'state>,
906    );
907
908    /// Execute any remaining actions associated with this buffer.
909    fn flush(&mut self, exec_state: &mut ExecutionState);
910
911    /// Execute `work`, potentially asynchronously, with a mutable reference to
912    /// an action buffer, potentially handed off to a different thread.
913    ///
914    /// Callers [`BorrowedLocalState`] values that may be modified by work, or
915    /// cloned first and then have a separate copy modified by `work`. Callers
916    /// should assume that `local` _is_ modified synchronously.
917    // NB: Earlier versions of this method had BorrowedLocalState be a generic instead, but this
918    // ran into difficulties when we needed to pass multiple mutable references.
919    fn recur<'local>(
920        &mut self,
921        local: BorrowedLocalState<'local>,
922        to_exec_state: impl FnMut() -> ExecutionState<'state> + Send + 'state,
923        work: impl for<'a> FnOnce(BorrowedLocalState<'a>, &mut Self::AsLocal<'a>) + Send + 'state,
924    );
925
926    /// The unit at which you should batch updates passed to calls to `recur`,
927    /// potentially depending on the current level of recursion.
928    ///
929    /// As of right now this is just a hard-coded value. We may change it in the
930    /// future to fan out more at higher levels though.
931    fn morsel_size(&mut self, _level: usize, _total: usize) -> usize {
932        256
933    }
934}
935
936/// The action buffer we use if we are executing in a single-threaded
937/// environment. It builds up local batches and then flushes them inline.
938struct InPlaceActionBuffer<'a> {
939    rule_set: &'a RuleSet,
940    match_counter: &'a MatchCounter,
941    batches: DenseIdMap<ActionId, ActionState>,
942}
943
944impl<'a, 'outer: 'a> ActionBuffer<'a> for InPlaceActionBuffer<'outer> {
945    type AsLocal<'b>
946        = Self
947    where
948        'a: 'b;
949
950    fn push_bindings(
951        &mut self,
952        action: ActionId,
953        bindings: &DenseIdMap<Variable, Value>,
954        mut to_exec_state: impl FnMut() -> ExecutionState<'a>,
955    ) {
956        let action_state = self.batches.get_or_default(action);
957        action_state.n_runs += 1;
958        action_state.len += 1;
959        let action_info = &self.rule_set.actions[action];
960        // SAFETY: `used_vars` is a constant per-rule. This module only ever calls it with
961        // `bindings` produced by the same join.
962        unsafe {
963            action_state.bindings.push(bindings, &action_info.used_vars);
964        }
965        if action_state.len >= VAR_BATCH_SIZE {
966            let mut state = to_exec_state();
967            let succeeded = state.run_instrs(&action_info.instrs, &mut action_state.bindings);
968            action_state.bindings.clear();
969            self.match_counter.inc_matches(action, succeeded);
970            action_state.len = 0;
971        }
972    }
973
974    fn flush(&mut self, exec_state: &mut ExecutionState) {
975        flush_action_states(
976            exec_state,
977            &mut self.batches,
978            self.rule_set,
979            self.match_counter,
980        );
981    }
982
983    fn recur<'local>(
984        &mut self,
985        local: BorrowedLocalState<'local>,
986        _to_exec_state: impl FnMut() -> ExecutionState<'a> + Send + 'a,
987        work: impl for<'b> FnOnce(BorrowedLocalState<'b>, &mut Self) + Send + 'a,
988    ) {
989        work(local, self)
990    }
991}
992
993/// An Action buffer that hands off batches to of actions to rayon to execute.
994struct ScopedActionBuffer<'inner, 'scope> {
995    scope: &'inner rayon::Scope<'scope>,
996    rule_set: &'scope RuleSet,
997    match_counter: &'scope MatchCounter,
998    batches: DenseIdMap<ActionId, ActionState>,
999    needs_flush: bool,
1000}
1001
1002impl<'inner, 'scope> ScopedActionBuffer<'inner, 'scope> {
1003    fn new(
1004        scope: &'inner rayon::Scope<'scope>,
1005        rule_set: &'scope RuleSet,
1006        match_counter: &'scope MatchCounter,
1007    ) -> Self {
1008        Self {
1009            scope,
1010            rule_set,
1011            batches: Default::default(),
1012            match_counter,
1013            needs_flush: false,
1014        }
1015    }
1016}
1017
1018impl<'scope> ActionBuffer<'scope> for ScopedActionBuffer<'_, 'scope> {
1019    type AsLocal<'a>
1020        = ScopedActionBuffer<'a, 'scope>
1021    where
1022        'scope: 'a;
1023    fn push_bindings(
1024        &mut self,
1025        action: ActionId,
1026        bindings: &DenseIdMap<Variable, Value>,
1027        mut to_exec_state: impl FnMut() -> ExecutionState<'scope>,
1028    ) {
1029        self.needs_flush = true;
1030        let action_state = self.batches.get_or_default(action);
1031        action_state.n_runs += 1;
1032        action_state.len += 1;
1033        let action_info = &self.rule_set.actions[action];
1034        // SAFETY: `used_vars` is a constant per-rule. This module only ever calls it with
1035        // `bindings` produced by the same join.
1036        unsafe {
1037            action_state.bindings.push(bindings, &action_info.used_vars);
1038        }
1039        if action_state.len >= VAR_BATCH_SIZE {
1040            let mut state = to_exec_state();
1041            let mut bindings =
1042                mem::replace(&mut action_state.bindings, Bindings::new(VAR_BATCH_SIZE));
1043            action_state.len = 0;
1044            self.scope.spawn(move |_| {
1045                state.run_instrs(&action_info.instrs, &mut bindings);
1046            });
1047        }
1048    }
1049
1050    fn flush(&mut self, exec_state: &mut ExecutionState) {
1051        flush_action_states(
1052            exec_state,
1053            &mut self.batches,
1054            self.rule_set,
1055            self.match_counter,
1056        );
1057        self.needs_flush = false;
1058    }
1059    fn recur<'local>(
1060        &mut self,
1061        mut local: BorrowedLocalState<'local>,
1062        mut to_exec_state: impl FnMut() -> ExecutionState<'scope> + Send + 'scope,
1063        work: impl for<'a> FnOnce(BorrowedLocalState<'a>, &mut ScopedActionBuffer<'a, 'scope>)
1064        + Send
1065        + 'scope,
1066    ) {
1067        let rule_set = self.rule_set;
1068        let match_counter = self.match_counter;
1069        let mut inner = local.clone_state();
1070        self.scope.spawn(move |scope| {
1071            let mut buf: ScopedActionBuffer<'_, 'scope> = ScopedActionBuffer {
1072                scope,
1073                rule_set,
1074                match_counter,
1075                needs_flush: false,
1076                batches: Default::default(),
1077            };
1078            work(inner.borrow_mut(), &mut buf);
1079            if buf.needs_flush {
1080                flush_action_states(
1081                    &mut to_exec_state(),
1082                    &mut buf.batches,
1083                    buf.rule_set,
1084                    buf.match_counter,
1085                );
1086            }
1087        });
1088    }
1089
1090    fn morsel_size(&mut self, _level: usize, _total: usize) -> usize {
1091        // Lower morsel size to increase parallelism.
1092        match _level {
1093            0 if _total > 2 => 32,
1094            _ => 256,
1095        }
1096    }
1097}
1098
1099fn flush_action_states(
1100    exec_state: &mut ExecutionState,
1101    actions: &mut DenseIdMap<ActionId, ActionState>,
1102    rule_set: &RuleSet,
1103    match_counter: &MatchCounter,
1104) {
1105    for (action, ActionState { bindings, len, .. }) in actions.iter_mut() {
1106        if *len > 0 {
1107            let succeeded = exec_state.run_instrs(&rule_set.actions[action].instrs, bindings);
1108            bindings.clear();
1109            match_counter.inc_matches(action, succeeded);
1110            *len = 0;
1111        }
1112    }
1113}
1114struct MatchCounter {
1115    matches: IdVec<ActionId, AtomicUsize>,
1116}
1117
1118impl MatchCounter {
1119    fn new(n_ids: usize) -> Self {
1120        let mut matches = IdVec::with_capacity(n_ids);
1121        matches.resize_with(n_ids, || AtomicUsize::new(0));
1122        Self { matches }
1123    }
1124
1125    fn inc_matches(&self, action: ActionId, by: usize) {
1126        self.matches[action].fetch_add(by, std::sync::atomic::Ordering::Relaxed);
1127    }
1128    fn read_matches(&self, action: ActionId) -> usize {
1129        self.matches[action].load(std::sync::atomic::Ordering::Acquire)
1130    }
1131}
1132
1133fn estimate_size(join_stage: &JoinStage, binding_info: &BindingInfo) -> usize {
1134    match join_stage {
1135        JoinStage::Intersect { scans, .. } => scans
1136            .iter()
1137            .map(|scan| binding_info.subsets[scan.atom].size())
1138            .min()
1139            .unwrap_or(0),
1140        JoinStage::FusedIntersect { cover, .. } => binding_info.subsets[cover.to_index.atom].size(),
1141    }
1142}
1143
1144fn num_intersected_rels(join_stage: &JoinStage) -> i32 {
1145    match join_stage {
1146        JoinStage::Intersect { scans, .. } => scans.len() as i32,
1147        JoinStage::FusedIntersect { to_intersect, .. } => to_intersect.len() as i32 + 1,
1148    }
1149}
1150
1151fn sort_plan_by_size(
1152    order: &mut InstrOrder,
1153    start: usize,
1154    instrs: &[JoinStage],
1155    binding_info: &mut BindingInfo,
1156) {
1157    // How many times an atom has been intersected/joined
1158    let mut times_refined = with_pool_set(|ps| ps.get::<DenseIdMap<AtomId, i64>>());
1159
1160    // Count how many times each atom has been refined so far.
1161    for ins in instrs[..start].iter() {
1162        match ins {
1163            JoinStage::Intersect { scans, .. } => scans.iter().for_each(|scan| {
1164                *times_refined.get_or_default(scan.atom) += 1;
1165            }),
1166            JoinStage::FusedIntersect { cover, .. } => {
1167                *times_refined.get_or_default(cover.to_index.atom) +=
1168                    cover.to_index.vars.len() as i64;
1169            }
1170        }
1171    }
1172
1173    // We prioritize variables by
1174    //
1175    //   (1) how many times an atom with this variable has been refined,
1176    //   (2) then by how many relations joins on this variable
1177    //   (3) then by the cardinality of the variable to be enumerated
1178    let key_fn = |join_stage: &JoinStage,
1179                  binding_info: &BindingInfo,
1180                  times_refined: &DenseIdMap<AtomId, i64>| {
1181        let refine = match join_stage {
1182            JoinStage::Intersect { scans, .. } => scans
1183                .iter()
1184                .map(|scan| times_refined.get(scan.atom).copied().unwrap_or_default())
1185                .sum::<i64>(),
1186            JoinStage::FusedIntersect { cover, .. } => times_refined
1187                .get(cover.to_index.atom)
1188                .copied()
1189                .unwrap_or_default(),
1190        };
1191        (
1192            -refine,
1193            -num_intersected_rels(join_stage),
1194            estimate_size(join_stage, binding_info),
1195        )
1196    };
1197
1198    for i in start..order.len() {
1199        for j in i + 1..order.len() {
1200            let key_i = key_fn(&instrs[order.get(i)], binding_info, &times_refined);
1201            let key_j = key_fn(&instrs[order.get(j)], binding_info, &times_refined);
1202            if key_j < key_i {
1203                order.data.swap(i, j);
1204            }
1205        }
1206        // Update the counts after a new instruction is selected.
1207        match &instrs[order.get(i)] {
1208            JoinStage::Intersect { scans, .. } => scans.iter().for_each(|scan| {
1209                *times_refined.get_or_default(scan.atom) += 1;
1210            }),
1211            JoinStage::FusedIntersect { cover, .. } => {
1212                *times_refined.get_or_default(cover.to_index.atom) +=
1213                    cover.to_index.vars.len() as i64;
1214            }
1215        }
1216    }
1217}
1218
1219#[derive(Debug, Clone, PartialEq, Eq)]
1220struct InstrOrder {
1221    data: SmallVec<[u16; 8]>,
1222}
1223
1224impl InstrOrder {
1225    fn new() -> Self {
1226        InstrOrder {
1227            data: SmallVec::new(),
1228        }
1229    }
1230
1231    fn from_iter(range: impl Iterator<Item = usize>) -> InstrOrder {
1232        let mut res = InstrOrder::new();
1233        res.data
1234            .extend(range.map(|x| u16::try_from(x).expect("too many instructions")));
1235        res
1236    }
1237
1238    fn get(&self, idx: usize) -> usize {
1239        self.data[idx] as usize
1240    }
1241    fn len(&self) -> usize {
1242        self.data.len()
1243    }
1244}
1245
1246struct BorrowedLocalState<'a> {
1247    instr_order: &'a mut InstrOrder,
1248    binding_info: &'a mut BindingInfo,
1249    updates: &'a mut FrameUpdates,
1250}
1251
1252impl BorrowedLocalState<'_> {
1253    fn clone_state(&mut self) -> LocalState {
1254        LocalState {
1255            instr_order: self.instr_order.clone(),
1256            binding_info: self.binding_info.clone(),
1257            updates: std::mem::take(self.updates),
1258        }
1259    }
1260}
1261
1262struct LocalState {
1263    instr_order: InstrOrder,
1264    binding_info: BindingInfo,
1265    updates: FrameUpdates,
1266}
1267
1268impl LocalState {
1269    fn borrow_mut<'a>(&'a mut self) -> BorrowedLocalState<'a> {
1270        BorrowedLocalState {
1271            instr_order: &mut self.instr_order,
1272            binding_info: &mut self.binding_info,
1273            updates: &mut self.updates,
1274        }
1275    }
1276}