egglog_core_relations/action/
mod.rs

1//! Instructions that are executed on the results of a query.
2//!
3//! This allows us to execute the "right-hand-side" of a rule. The
4//! implementation here is optimized to execute on a batch of rows at a time.
5use std::ops::Deref;
6
7use crate::numeric_id::{DenseIdMap, NumericId};
8use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
9use smallvec::SmallVec;
10
11use crate::{
12    BaseValues, ContainerValues, ExternalFunctionId, WrappedTable,
13    common::{DashMap, Value},
14    free_join::{CounterId, Counters, ExternalFunctions, TableId, TableInfo, Variable},
15    pool::{Clear, Pooled, with_pool_set},
16    table_spec::{ColumnId, MutationBuffer},
17};
18
19use self::mask::{Mask, MaskIter, ValueSource};
20
21#[macro_use]
22pub(crate) mod mask;
23
24#[cfg(test)]
25mod tests;
26
27/// A representation of a value within a query or rule.
28///
29/// A QueryEntry is either a variable bound in a join, or an untyped constant.
30#[derive(Copy, Clone, Debug)]
31pub enum QueryEntry {
32    Var(Variable),
33    Const(Value),
34}
35
36impl From<Variable> for QueryEntry {
37    fn from(var: Variable) -> Self {
38        QueryEntry::Var(var)
39    }
40}
41
42impl From<Value> for QueryEntry {
43    fn from(val: Value) -> Self {
44        QueryEntry::Const(val)
45    }
46}
47
48/// A value that can be written to a table in an action.
49#[derive(Debug, Clone, Copy)]
50pub enum WriteVal {
51    /// A variable or a constant.
52    QueryEntry(QueryEntry),
53    /// A fresh value from the given counter.
54    IncCounter(CounterId),
55    /// The value of the current row index.
56    CurrentVal(usize),
57}
58
59impl<T> From<T> for WriteVal
60where
61    T: Into<QueryEntry>,
62{
63    fn from(val: T) -> Self {
64        WriteVal::QueryEntry(val.into())
65    }
66}
67
68impl From<CounterId> for WriteVal {
69    fn from(ctr: CounterId) -> Self {
70        WriteVal::IncCounter(ctr)
71    }
72}
73
74/// A value that can be written to the database during a merge action.
75#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
76pub enum MergeVal {
77    /// A fresh value from the given counter.
78    Counter(CounterId),
79    /// A standard constant value.
80    Constant(Value),
81}
82
83impl From<CounterId> for MergeVal {
84    fn from(ctr: CounterId) -> Self {
85        MergeVal::Counter(ctr)
86    }
87}
88
89impl From<Value> for MergeVal {
90    fn from(val: Value) -> Self {
91        MergeVal::Constant(val)
92    }
93}
94
95/// Bindings store a sequence of values for a given set of variables.
96///
97/// The intent of bindings is to store a sequence of mappings from [`Variable`] to [`Value`], in a
98/// struct-of-arrays style that is better laid out for processing bindings in batches.
99#[derive(Debug)]
100pub(crate) struct Bindings {
101    matches: usize,
102    /// The maximum number of calls to `push` that we can receive before we clear the
103    /// [`Bindings`].
104    // This is used to preallocate chunks of the flat `data` vector.
105    max_batch_size: usize,
106    data: Pooled<Vec<Value>>,
107    /// Points into `data`. data[vars[var].. vars[var]+matches]` contains the values for `data`.
108    var_offsets: DenseIdMap<Variable, usize>,
109}
110
111impl std::ops::Index<Variable> for Bindings {
112    type Output = [Value];
113    fn index(&self, var: Variable) -> &[Value] {
114        self.get(var).unwrap()
115    }
116}
117
118impl Bindings {
119    pub(crate) fn new(max_batch_size: usize) -> Self {
120        Bindings {
121            matches: 0,
122            max_batch_size,
123            data: Default::default(),
124            var_offsets: DenseIdMap::new(),
125        }
126    }
127    fn assert_invariant(&self) {
128        #[cfg(debug_assertions)]
129        {
130            assert!(self.matches <= self.max_batch_size);
131            for (var, start) in self.var_offsets.iter() {
132                assert!(
133                    start + self.matches <= self.data.len(),
134                    "Variable {:?} starts at {}, but data only has {} elements",
135                    var,
136                    start,
137                    self.data.len()
138                );
139            }
140        }
141    }
142
143    pub(crate) fn clear(&mut self) {
144        self.matches = 0;
145        self.var_offsets.clear();
146        self.data.clear();
147        self.assert_invariant();
148    }
149
150    fn get(&self, var: Variable) -> Option<&[Value]> {
151        let start = self.var_offsets.get(var)?;
152        Some(&self.data[*start..*start + self.matches])
153    }
154
155    fn add_mapping(&mut self, var: Variable, vals: &[Value]) {
156        let start = self.data.len();
157        self.data.extend_from_slice(vals);
158        // We have a flat representation of the data, meaning that writing more than
159        // `max_batch_size` values to `var` could overwrite values for a different variable, which
160        // would produce some mysterious results that are hard to debug.
161        debug_assert!(vals.len() <= self.max_batch_size);
162        if vals.len() < self.max_batch_size {
163            let target_len = self.data.len() + self.max_batch_size - vals.len();
164            self.data.resize(target_len, Value::stale());
165        }
166        self.var_offsets.insert(var, start);
167    }
168
169    pub(crate) fn insert(&mut self, var: Variable, vals: &[Value]) {
170        if self.var_offsets.n_ids() == 0 {
171            self.matches = vals.len();
172        } else {
173            assert_eq!(self.matches, vals.len());
174        }
175        self.add_mapping(var, vals);
176        self.assert_invariant();
177    }
178
179    /// Push a new set of bindings for the given variables.
180    ///
181    /// # Safety:
182    /// This method assumes that all calls to `push`:
183    /// * Have a mapping for every member of `used_vars`.
184    /// * Are passed the same `used_vars`.
185    ///
186    /// It is unsafe to avoid bounds-checking. This method is called extremely frequently and the
187    /// overhead of boundschecking is noticeable.
188    pub(crate) unsafe fn push(
189        &mut self,
190        map: &DenseIdMap<Variable, Value>,
191        used_vars: &[Variable],
192    ) {
193        if self.matches != 0 {
194            assert!(self.matches < self.max_batch_size);
195            #[cfg(debug_assertions)]
196            {
197                for var in used_vars {
198                    assert!(
199                        self.var_offsets.get(*var).is_some(),
200                        "Variable {:?} not found in bindings {:?}",
201                        var,
202                        self.var_offsets
203                    );
204                }
205            }
206            for var in used_vars {
207                let var = var.index();
208                // Safe version: this degrades some benchmarks by ~6%
209                // let start = self.var_offsets.raw()[var].unwrap();
210                // self.data[start + self.matches] = map.raw()[var].unwrap();
211                unsafe {
212                    let start = self.var_offsets.raw().get_unchecked(var).unwrap_unchecked();
213                    *self.data.get_unchecked_mut(start + self.matches) =
214                        map.raw().get_unchecked(var).unwrap_unchecked();
215                }
216            }
217        } else {
218            for (var, val) in map.iter() {
219                self.add_mapping(var, &[*val]);
220            }
221        }
222
223        self.matches += 1;
224        self.assert_invariant();
225    }
226
227    /// A method that removes the bindings for the given variable and allows for its values to be
228    /// used independently from the [`Bindings`] struct. This is helpful when an operation needs to
229    /// mutably borrow the values for one value while reading the values for another.
230    ///
231    /// To add the values back, use [`Bindings::replace`].
232    pub(crate) fn take(&mut self, var: Variable) -> Option<ExtractedBinding> {
233        let mut vals: Pooled<Vec<Value>> = with_pool_set(|ps| ps.get());
234        vals.extend_from_slice(self.get(var)?);
235        let start = self.var_offsets.take(var)?;
236        Some(ExtractedBinding {
237            var,
238            offset: start,
239            vals,
240        })
241    }
242
243    /// Replace a binding extracted with [`Bindings::take`].
244    ///
245    /// # Panics
246    /// This method will panic if the length of the values in `bdg` does not match the current
247    /// number of matches in `Bindings`. It may panic if `bdg` was extracted from a different
248    /// [`Bindings`] than the one it is being replaced in.
249    pub(crate) fn replace(&mut self, bdg: ExtractedBinding) {
250        // Replace the binding with the new values.
251        let ExtractedBinding {
252            var,
253            offset,
254            mut vals,
255        } = bdg;
256        assert_eq!(vals.len(), self.matches);
257        self.data
258            .splice(offset..offset + self.matches, vals.drain(..));
259        self.var_offsets.insert(var, offset);
260    }
261}
262
263/// A binding that has been extracted from a [`Bindings`] struct via the [`Bindings::take`] method.
264///
265/// This allows for a variable's contents to be read while the [`Bindings`] struct has been
266/// borrowed mutably. The contents will not be readable until [`Bindings::replace`] is called.
267pub(crate) struct ExtractedBinding {
268    var: Variable,
269    offset: usize,
270    pub(crate) vals: Pooled<Vec<Value>>,
271}
272
273#[derive(Default)]
274pub(crate) struct PredictedVals {
275    #[allow(clippy::type_complexity)]
276    data: DashMap<(TableId, SmallVec<[Value; 3]>), Pooled<Vec<Value>>>,
277}
278
279impl Clear for PredictedVals {
280    fn reuse(&self) -> bool {
281        self.data.capacity() > 0
282    }
283    fn clear(&mut self) {
284        if self.data.len() > 500 && rayon::current_num_threads() > 1 {
285            self.data
286                .shards_mut()
287                .par_iter_mut()
288                .for_each(|shard| shard.get_mut().clear());
289        }
290        self.data.clear()
291    }
292    fn bytes(&self) -> usize {
293        self.data.capacity()
294            * (std::mem::size_of::<(TableId, SmallVec<[Value; 3]>)>()
295                + std::mem::size_of::<Pooled<Vec<Value>>>())
296    }
297}
298
299impl PredictedVals {
300    pub(crate) fn get_val(
301        &self,
302        table: TableId,
303        key: &[Value],
304        default: impl FnOnce() -> Pooled<Vec<Value>>,
305    ) -> impl Deref<Target = Pooled<Vec<Value>>> + '_ {
306        self.data
307            .entry((table, SmallVec::from_slice(key)))
308            .or_insert_with(default)
309    }
310}
311
312#[derive(Copy, Clone)]
313pub(crate) struct DbView<'a> {
314    pub(crate) table_info: &'a DenseIdMap<TableId, TableInfo>,
315    pub(crate) counters: &'a Counters,
316    pub(crate) external_funcs: &'a ExternalFunctions,
317    pub(crate) bases: &'a BaseValues,
318    pub(crate) containers: &'a ContainerValues,
319}
320
321/// A handle on a database that may be in the process of running a rule.
322///
323/// An ExecutionState grants immutable access to the (much of) the database, and also provides a
324/// limited API to mutate database contents.
325///
326/// A few important notes:
327///
328/// ## Some tables may be missing
329/// Callers external to this crate cannot construct an `ExecutionState` directly. Depending on the
330/// context, some tables may not be available. In particular, when running [`crate::Table::merge`]
331/// operations, only a table's read-side dependencies are available for reading (sim. for writing).
332/// This allows tables that do not need access to one another to be merged in parallel.
333///
334/// When executing a rule, ExecutionState has access to all tables.
335///
336/// ## Limited Mutability
337/// Callers can only stage insertsions and deletions to tables. These changes are not applied until
338/// the next call to `merge` on the underlying table.
339///
340/// ## Predicted Values
341/// ExecutionStates provide a means of synchronizing the results of a pending write across
342/// different executions of a rule. This is particularly important in the case where the result of
343/// an operation (such as "lookup or insert new id" operatiosn) is a fresh id. A common
344/// ExecutionState ensures that future lookups will see the same id (even across calls to
345/// [`ExecutionState::clone`]).
346pub struct ExecutionState<'a> {
347    pub(crate) predicted: &'a PredictedVals,
348    pub(crate) db: DbView<'a>,
349    pub(crate) buffers: DenseIdMap<TableId, Box<dyn MutationBuffer>>,
350    /// Whether any mutations have been staged via this ExecutionState.
351    pub(crate) changed: bool,
352}
353
354impl Clone for ExecutionState<'_> {
355    fn clone(&self) -> Self {
356        let mut res = ExecutionState {
357            predicted: self.predicted,
358            db: self.db,
359            buffers: DenseIdMap::new(),
360            changed: false,
361        };
362        for (id, buf) in self.buffers.iter() {
363            res.buffers.insert(id, buf.fresh_handle());
364        }
365        res
366    }
367}
368
369impl<'a> ExecutionState<'a> {
370    pub(crate) fn new(
371        predicted: &'a PredictedVals,
372        db: DbView<'a>,
373        buffers: DenseIdMap<TableId, Box<dyn MutationBuffer>>,
374    ) -> Self {
375        ExecutionState {
376            predicted,
377            db,
378            buffers,
379            changed: false,
380        }
381    }
382
383    /// Stage an insertion of the given row into `table`.
384    ///
385    /// If you are using `egglog`, consider using `egglog_bridge::TableAction`.
386    pub fn stage_insert(&mut self, table: TableId, row: &[Value]) {
387        self.buffers
388            .get_or_insert(table, || self.db.table_info[table].table.new_buffer())
389            .stage_insert(row);
390        self.changed = true;
391    }
392
393    /// Stage a removal of the given row from `table` if it is present.
394    ///
395    /// If you are using `egglog`, consider using `egglog_bridge::TableAction`.
396    pub fn stage_remove(&mut self, table: TableId, key: &[Value]) {
397        self.buffers
398            .get_or_insert(table, || self.db.table_info[table].table.new_buffer())
399            .stage_remove(key);
400        self.changed = true;
401    }
402
403    /// Call an external function.
404    pub fn call_external_func(
405        &mut self,
406        func: ExternalFunctionId,
407        args: &[Value],
408    ) -> Option<Value> {
409        self.db.external_funcs[func].invoke(self, args)
410    }
411
412    pub fn inc_counter(&self, ctr: CounterId) -> usize {
413        self.db.counters.inc(ctr)
414    }
415
416    pub fn read_counter(&self, ctr: CounterId) -> usize {
417        self.db.counters.read(ctr)
418    }
419
420    /// Get an immutable reference to the table with id `table`.
421    /// Dangerous: Reading from a table during action execution may break the semi-naive evaluation
422    pub fn get_table(&self, table: TableId) -> &'a WrappedTable {
423        &self.db.table_info[table].table
424    }
425
426    pub fn base_values(&self) -> &BaseValues {
427        self.db.bases
428    }
429
430    pub fn container_values(&self) -> &'a ContainerValues {
431        self.db.containers
432    }
433
434    /// Get the _current_ value for a given key in `table`, or otherwise insert
435    /// the unique _next_ value.
436    ///
437    /// Insertions into tables are not performed immediately, but rules and
438    /// merge functions sometimes need to get the result of an insertion. For
439    /// such cases, executions keep a cache of "predicted" values for a given
440    /// mapping that manage the insertions, etc.
441    ///
442    /// If you are using `egglog`, consider using `egglog_bridge::TableAction`.
443    pub fn predict_val(
444        &mut self,
445        table: TableId,
446        key: &[Value],
447        vals: impl ExactSizeIterator<Item = MergeVal>,
448    ) -> Pooled<Vec<Value>> {
449        if let Some(row) = self.db.table_info[table].table.get_row(key) {
450            return row.vals;
451        }
452        Pooled::cloned(
453            self.predicted
454                .get_val(table, key, || {
455                    Self::construct_new_row(
456                        &self.db,
457                        &mut self.buffers,
458                        &mut self.changed,
459                        table,
460                        key,
461                        vals,
462                    )
463                })
464                .deref(),
465        )
466    }
467
468    fn construct_new_row(
469        db: &DbView,
470        buffers: &mut DenseIdMap<TableId, Box<dyn MutationBuffer>>,
471        changed: &mut bool,
472        table: TableId,
473        key: &[Value],
474        vals: impl ExactSizeIterator<Item = MergeVal>,
475    ) -> Pooled<Vec<Value>> {
476        with_pool_set(|ps| {
477            let mut new = ps.get::<Vec<Value>>();
478            new.reserve(key.len() + vals.len());
479            new.extend_from_slice(key);
480            for val in vals {
481                new.push(match val {
482                    MergeVal::Counter(ctr) => Value::from_usize(db.counters.inc(ctr)),
483                    MergeVal::Constant(c) => c,
484                })
485            }
486            buffers
487                .get_or_insert(table, || db.table_info[table].table.new_buffer())
488                .stage_insert(&new);
489            *changed = true;
490            new
491        })
492    }
493
494    /// A variant of [`ExecutionState::predict_val`] that avoids materializing the full row, and
495    /// instead only returns the value in the given column.
496    pub fn predict_col(
497        &mut self,
498        table: TableId,
499        key: &[Value],
500        vals: impl ExactSizeIterator<Item = MergeVal>,
501        col: ColumnId,
502    ) -> Value {
503        if let Some(val) = self.db.table_info[table].table.get_row_column(key, col) {
504            return val;
505        }
506        self.predicted.get_val(table, key, || {
507            Self::construct_new_row(
508                &self.db,
509                &mut self.buffers,
510                &mut self.changed,
511                table,
512                key,
513                vals,
514            )
515        })[col.index()]
516    }
517}
518
519impl ExecutionState<'_> {
520    /// Returns the number of matches that make it to the end of the instructions
521    pub(crate) fn run_instrs(&mut self, instrs: &[Instr], bindings: &mut Bindings) -> usize {
522        if bindings.var_offsets.next_id().rep() == 0 {
523            // If we have no variables, we want to run the rules once.
524            bindings.matches = 1;
525        }
526
527        // Vectorized execution for larger batch sizes
528        let mut mask = with_pool_set(|ps| Mask::new(0..bindings.matches, ps));
529        for instr in instrs {
530            if mask.is_empty() {
531                return 0;
532            }
533            self.run_instr(&mut mask, instr, bindings);
534        }
535        mask.count_ones()
536    }
537    fn run_instr(&mut self, mask: &mut Mask, inst: &Instr, bindings: &mut Bindings) {
538        fn assert_impl(
539            bindings: &mut Bindings,
540            mask: &mut Mask,
541            l: &QueryEntry,
542            r: &QueryEntry,
543            op: impl Fn(Value, Value) -> bool,
544        ) {
545            match (l, r) {
546                (QueryEntry::Var(v1), QueryEntry::Var(v2)) => {
547                    mask.iter(&bindings[*v1])
548                        .zip(&bindings[*v2])
549                        .retain(|(v1, v2)| op(*v1, *v2));
550                }
551                (QueryEntry::Var(v), QueryEntry::Const(c))
552                | (QueryEntry::Const(c), QueryEntry::Var(v)) => {
553                    mask.iter(&bindings[*v]).retain(|v| op(*v, *c));
554                }
555                (QueryEntry::Const(c1), QueryEntry::Const(c2)) => {
556                    if !op(*c1, *c2) {
557                        mask.clear();
558                    }
559                }
560            }
561        }
562
563        match inst {
564            Instr::LookupOrInsertDefault {
565                table: table_id,
566                args,
567                default,
568                dst_col,
569                dst_var,
570            } => {
571                let pool = with_pool_set(|ps| ps.get_pool::<Vec<Value>>().clone());
572                self.buffers.get_or_insert(*table_id, || {
573                    self.db.table_info[*table_id].table.new_buffer()
574                });
575                let table = &self.db.table_info[*table_id].table;
576                // Do two passes over the current vector. First, do a round of lookups. Then, for
577                // any offsets where the lookup failed, insert the default value.
578                let mut mask_copy = mask.clone();
579                table.lookup_row_vectorized(&mut mask_copy, bindings, args, *dst_col, *dst_var);
580                mask_copy.symmetric_difference(mask);
581                if mask_copy.is_empty() {
582                    return;
583                }
584                let mut out = bindings.take(*dst_var).unwrap();
585                for_each_binding_with_mask!(mask_copy, args.as_slice(), bindings, |iter| {
586                    iter.assign_vec(&mut out.vals, |offset, key| {
587                        // First, check if the entry is already in the table:
588                        // if let Some(row) = table.get_row_column(&key, *dst_col) {
589                        //     return row;
590                        // }
591                        // If not, insert the default value.
592                        //
593                        // We avoid doing this more than once by using the
594                        // `predicted` map.
595                        let prediction_key = (
596                            *table_id,
597                            SmallVec::<[Value; 3]>::from_slice(key.as_slice()),
598                        );
599                        let buffers = &mut self.buffers;
600                        // Bind some mutable references because the closure passed
601                        // to or_insert_with is `move`.
602                        let ctrs = &self.db.counters;
603                        let bindings = &bindings;
604                        let pool = pool.clone();
605                        let row =
606                            self.predicted
607                                .data
608                                .entry(prediction_key)
609                                .or_insert_with(move || {
610                                    let mut row = pool.get();
611                                    row.extend_from_slice(key.as_slice());
612                                    // Extend the key with the default values.
613                                    row.reserve(default.len());
614                                    for val in default {
615                                        let val = match val {
616                                            WriteVal::QueryEntry(QueryEntry::Const(c)) => *c,
617                                            WriteVal::QueryEntry(QueryEntry::Var(v)) => {
618                                                bindings[*v][offset]
619                                            }
620                                            WriteVal::IncCounter(ctr) => {
621                                                Value::from_usize(ctrs.inc(*ctr))
622                                            }
623                                            WriteVal::CurrentVal(ix) => row[*ix],
624                                        };
625                                        row.push(val)
626                                    }
627                                    // Insert it into the table.
628                                    buffers.get_mut(*table_id).unwrap().stage_insert(&row);
629                                    row
630                                });
631                        row[dst_col.index()]
632                    });
633                });
634                bindings.replace(out);
635            }
636            Instr::LookupWithDefault {
637                table,
638                args,
639                dst_col,
640                dst_var,
641                default,
642            } => {
643                let table = &self.db.table_info[*table].table;
644                table.lookup_with_default_vectorized(
645                    mask, bindings, args, *dst_col, *default, *dst_var,
646                );
647            }
648            Instr::Lookup {
649                table,
650                args,
651                dst_col,
652                dst_var,
653            } => {
654                let table = &self.db.table_info[*table].table;
655                table.lookup_row_vectorized(mask, bindings, args, *dst_col, *dst_var);
656            }
657
658            Instr::LookupWithFallback {
659                table: table_id,
660                table_key,
661                func,
662                func_args,
663                dst_col,
664                dst_var,
665            } => {
666                let table = &self.db.table_info[*table_id].table;
667                let mut lookup_result = mask.clone();
668                table.lookup_row_vectorized(
669                    &mut lookup_result,
670                    bindings,
671                    table_key,
672                    *dst_col,
673                    *dst_var,
674                );
675                let mut to_call_func = lookup_result.clone();
676                to_call_func.symmetric_difference(mask);
677                if to_call_func.is_empty() {
678                    return;
679                }
680
681                // Call the given external function on all entries where the lookup failed.
682                self.db.external_funcs[*func].invoke_batch_assign(
683                    self,
684                    &mut to_call_func,
685                    bindings,
686                    func_args,
687                    *dst_var,
688                );
689                // The new mask should be the lanes where the lookup succeeded or where `func`
690                // succeeded.
691                lookup_result.union(&to_call_func);
692                *mask = lookup_result;
693            }
694            Instr::Insert { table, vals } => {
695                for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| {
696                    iter.for_each(|vals| {
697                        self.stage_insert(*table, vals.as_slice());
698                    })
699                });
700            }
701            Instr::InsertIfEq { table, l, r, vals } => match (l, r) {
702                (QueryEntry::Var(v1), QueryEntry::Var(v2)) => {
703                    for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| {
704                        iter.zip(&bindings[*v1])
705                            .zip(&bindings[*v2])
706                            .for_each(|((vals, v1), v2)| {
707                                if v1 == v2 {
708                                    self.stage_insert(*table, &vals);
709                                }
710                            })
711                    })
712                }
713                (QueryEntry::Var(v), QueryEntry::Const(c))
714                | (QueryEntry::Const(c), QueryEntry::Var(v)) => {
715                    for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| {
716                        iter.zip(&bindings[*v]).for_each(|(vals, cond)| {
717                            if cond == c {
718                                self.stage_insert(*table, &vals);
719                            }
720                        })
721                    })
722                }
723                (QueryEntry::Const(c1), QueryEntry::Const(c2)) => {
724                    if c1 == c2 {
725                        for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| iter
726                            .for_each(|vals| {
727                                self.stage_insert(*table, &vals);
728                            }))
729                    }
730                }
731            },
732            Instr::Remove { table, args } => {
733                for_each_binding_with_mask!(mask, args.as_slice(), bindings, |iter| {
734                    iter.for_each(|args| {
735                        self.stage_remove(*table, args.as_slice());
736                    })
737                });
738            }
739            Instr::External { func, args, dst } => {
740                self.db.external_funcs[*func].invoke_batch(self, mask, bindings, args, *dst);
741            }
742            Instr::ExternalWithFallback {
743                f1,
744                args1,
745                f2,
746                args2,
747                dst,
748            } => {
749                let mut f1_result = mask.clone();
750                self.db.external_funcs[*f1].invoke_batch(
751                    self,
752                    &mut f1_result,
753                    bindings,
754                    args1,
755                    *dst,
756                );
757                let mut to_call_f2 = f1_result.clone();
758                to_call_f2.symmetric_difference(mask);
759                if to_call_f2.is_empty() {
760                    return;
761                }
762                // Call the given external function on all entries where the first call failed.
763                self.db.external_funcs[*f2].invoke_batch_assign(
764                    self,
765                    &mut to_call_f2,
766                    bindings,
767                    args2,
768                    *dst,
769                );
770                // The new mask should be the lanes where either `f1` or `f2` succeeded.
771                f1_result.union(&to_call_f2);
772                *mask = f1_result;
773            }
774            Instr::AssertAnyNe { ops, divider } => {
775                for_each_binding_with_mask!(mask, ops.as_slice(), bindings, |iter| {
776                    iter.retain(|vals| {
777                        vals[0..*divider]
778                            .iter()
779                            .zip(&vals[*divider..])
780                            .any(|(l, r)| l != r)
781                    })
782                })
783            }
784            Instr::AssertEq(l, r) => assert_impl(bindings, mask, l, r, |l, r| l == r),
785            Instr::AssertNe(l, r) => assert_impl(bindings, mask, l, r, |l, r| l != r),
786            Instr::ReadCounter { counter, dst } => {
787                let mut vals = with_pool_set(|ps| ps.get::<Vec<Value>>());
788                let ctr_val = Value::from_usize(self.read_counter(*counter));
789                vals.resize(bindings.matches, ctr_val);
790                bindings.insert(*dst, &vals);
791            }
792        }
793    }
794}
795
796#[derive(Debug, Clone)]
797pub(crate) enum Instr {
798    /// Look up the value of the given table, inserting a new entry with a
799    /// default value if it is not there.
800    LookupOrInsertDefault {
801        table: TableId,
802        args: Vec<QueryEntry>,
803        default: Vec<WriteVal>,
804        dst_col: ColumnId,
805        dst_var: Variable,
806    },
807
808    /// Look up the value of the given table; if the value is not there, use the
809    /// given default.
810    LookupWithDefault {
811        table: TableId,
812        args: Vec<QueryEntry>,
813        dst_col: ColumnId,
814        dst_var: Variable,
815        default: QueryEntry,
816    },
817
818    /// Look up a value of the given table, halting execution if it is not
819    /// there.
820    Lookup {
821        table: TableId,
822        args: Vec<QueryEntry>,
823        dst_col: ColumnId,
824        dst_var: Variable,
825    },
826
827    /// Look up the given key in the table: if the value is not present in the given table, then
828    /// call the given external function with the given arguments. If the external function returns
829    /// a value, that value is returned in the given `dst_var`. If the lookup fails and the
830    /// external function does not return a value, then execution is halted.
831    LookupWithFallback {
832        table: TableId,
833        table_key: Vec<QueryEntry>,
834        func: ExternalFunctionId,
835        func_args: Vec<QueryEntry>,
836        dst_col: ColumnId,
837        dst_var: Variable,
838    },
839
840    /// Insert the given return value value with the provided arguments into the
841    /// table.
842    Insert {
843        table: TableId,
844        vals: Vec<QueryEntry>,
845    },
846
847    /// Insert `vals` into `table` if `l` and `r` are equal.
848    InsertIfEq {
849        table: TableId,
850        l: QueryEntry,
851        r: QueryEntry,
852        vals: Vec<QueryEntry>,
853    },
854
855    /// Remove the entry corresponding to `args` in `func`.
856    Remove {
857        table: TableId,
858        args: Vec<QueryEntry>,
859    },
860
861    /// Bind the result of the external function to a variable.
862    External {
863        func: ExternalFunctionId,
864        args: Vec<QueryEntry>,
865        dst: Variable,
866    },
867
868    /// Bind the result of the external function to a variable. If the first external function
869    /// fails, then use the second external function. If both fail, execution is haulted, (as in a
870    /// single failure of `External`).
871    ExternalWithFallback {
872        f1: ExternalFunctionId,
873        args1: Vec<QueryEntry>,
874        f2: ExternalFunctionId,
875        args2: Vec<QueryEntry>,
876        dst: Variable,
877    },
878
879    /// Continue execution iff the two variables are equal.
880    AssertEq(QueryEntry, QueryEntry),
881
882    /// Continue execution iff the two variables are not equal.
883    AssertNe(QueryEntry, QueryEntry),
884
885    /// For the two slices: ops[0..divider] and ops[divider..], continue
886    /// execution iff there is one pair of values at the same offset that are
887    /// not equal.
888    AssertAnyNe {
889        ops: Vec<QueryEntry>,
890        divider: usize,
891    },
892
893    /// Read the value of a counter and write it to the given variable.
894    ReadCounter {
895        /// The counter to broadcast.
896        counter: CounterId,
897        /// The variable to write the value to.
898        dst: Variable,
899    },
900}