use std::ops::Deref;

use crate::numeric_id::{DenseIdMap, NumericId};
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
use smallvec::SmallVec;

use crate::{
    BaseValues, ContainerValues, ExternalFunctionId, WrappedTable,
    common::{DashMap, Value},
    free_join::{CounterId, Counters, ExternalFunctions, TableId, TableInfo, Variable},
    pool::{Clear, Pooled, with_pool_set},
    table_spec::{ColumnId, MutationBuffer},
};

use self::mask::{Mask, MaskIter, ValueSource};

#[macro_use]
pub(crate) mod mask;

#[cfg(test)]
mod tests;

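/// An operand in a compiled query or action: either a variable bound by the
/// query, or a constant value.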
#[derive(Copy, Clone, Debug)]
pub enum QueryEntry {
    Var(Variable),
    Const(Value),
}

impl From<Variable> for QueryEntry {
    fn from(var: Variable) -> Self {
        QueryEntry::Var(var)
    }
}

impl From<Value> for QueryEntry {
    fn from(val: Value) -> Self {
        QueryEntry::Const(val)
    }
}

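/// A value to write when filling in a default row (e.g. in
/// `Instr::LookupOrInsertDefault`): a [`QueryEntry`], the next value of a
/// counter, or a copy of a value already written earlier in the same row.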
#[derive(Debug, Clone, Copy)]
pub enum WriteVal {
    QueryEntry(QueryEntry),
    IncCounter(CounterId),
    CurrentVal(usize),
}

impl<T> From<T> for WriteVal
where
    T: Into<QueryEntry>,
{
    fn from(val: T) -> Self {
        WriteVal::QueryEntry(val.into())
    }
}

impl From<CounterId> for WriteVal {
    fn from(ctr: CounterId) -> Self {
        WriteVal::IncCounter(ctr)
    }
}

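/// A value used when constructing a row for `predict_val` / `predict_col`:
/// either the next value of a counter or a constant.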
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MergeVal {
    Counter(CounterId),
    Constant(Value),
}

impl From<CounterId> for MergeVal {
    fn from(ctr: CounterId) -> Self {
        MergeVal::Counter(ctr)
    }
}

impl From<Value> for MergeVal {
    fn from(val: Value) -> Self {
        MergeVal::Constant(val)
    }
}

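/// A batch of variable bindings stored column-wise.
///
/// Each bound variable owns a contiguous run of `max_batch_size` slots in
/// `data`, starting at the offset recorded in `var_offsets`; only the first
/// `matches` slots of each run are live, the rest are padding. An
/// illustrative layout (not tied to any particular query) with
/// `max_batch_size = 4` and `matches = 3`:
///
/// ```text
/// data:        [ x0 x1 x2 __ | y0 y1 y2 __ ]
/// var_offsets:   x -> 0        y -> 4
/// ```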
#[derive(Debug)]
pub(crate) struct Bindings {
    matches: usize,
    max_batch_size: usize,
    data: Pooled<Vec<Value>>,
    var_offsets: DenseIdMap<Variable, usize>,
}

impl std::ops::Index<Variable> for Bindings {
    type Output = [Value];
    fn index(&self, var: Variable) -> &[Value] {
        self.get(var).unwrap()
    }
}

impl Bindings {
    pub(crate) fn new(max_batch_size: usize) -> Self {
        Bindings {
            matches: 0,
            max_batch_size,
            data: Default::default(),
            var_offsets: DenseIdMap::new(),
        }
    }
    fn assert_invariant(&self) {
        #[cfg(debug_assertions)]
        {
            assert!(self.matches <= self.max_batch_size);
            for (var, start) in self.var_offsets.iter() {
                assert!(
                    start + self.matches <= self.data.len(),
                    "Variable {:?} starts at {}, but data only has {} elements",
                    var,
                    start,
                    self.data.len()
                );
            }
        }
    }

    pub(crate) fn clear(&mut self) {
        self.matches = 0;
        self.var_offsets.clear();
        self.data.clear();
        self.assert_invariant();
    }

    fn get(&self, var: Variable) -> Option<&[Value]> {
        let start = self.var_offsets.get(var)?;
        Some(&self.data[*start..*start + self.matches])
    }

    fn add_mapping(&mut self, var: Variable, vals: &[Value]) {
        let start = self.data.len();
        self.data.extend_from_slice(vals);
        debug_assert!(vals.len() <= self.max_batch_size);
        if vals.len() < self.max_batch_size {
            let target_len = self.data.len() + self.max_batch_size - vals.len();
            self.data.resize(target_len, Value::stale());
        }
        self.var_offsets.insert(var, start);
    }

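    /// Bind `var` to the column of values `vals`, one value per match. The
    /// first insertion fixes the number of matches in the batch; later
    /// insertions must supply the same number of values.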
    pub(crate) fn insert(&mut self, var: Variable, vals: &[Value]) {
        if self.var_offsets.n_ids() == 0 {
            self.matches = vals.len();
        } else {
            assert_eq!(self.matches, vals.len());
        }
        self.add_mapping(var, vals);
        self.assert_invariant();
    }

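    /// Append a single match to the batch, taking the value of each variable
    /// in `used_vars` from `map`.
    ///
    /// # Safety
    /// Every variable in `used_vars` must be bound in `map`, and (once the
    /// batch is non-empty) must already have a column in these bindings; the
    /// unchecked accesses below rely on this.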
    pub(crate) unsafe fn push(
        &mut self,
        map: &DenseIdMap<Variable, Value>,
        used_vars: &[Variable],
    ) {
        if self.matches != 0 {
            assert!(self.matches < self.max_batch_size);
            #[cfg(debug_assertions)]
            {
                for var in used_vars {
                    assert!(
                        self.var_offsets.get(*var).is_some(),
                        "Variable {:?} not found in bindings {:?}",
                        var,
                        self.var_offsets
                    );
                }
            }
            for var in used_vars {
                let var = var.index();
                unsafe {
                    let start = self.var_offsets.raw().get_unchecked(var).unwrap_unchecked();
                    *self.data.get_unchecked_mut(start + self.matches) =
                        map.raw().get_unchecked(var).unwrap_unchecked();
                }
            }
        } else {
            for (var, val) in map.iter() {
                self.add_mapping(var, &[*val]);
            }
        }

        self.matches += 1;
        self.assert_invariant();
    }

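    /// Detach the column for `var` from these bindings, returning its values
    /// and offset so it can later be restored with [`Bindings::replace`].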
    pub(crate) fn take(&mut self, var: Variable) -> Option<ExtractedBinding> {
        let mut vals: Pooled<Vec<Value>> = with_pool_set(|ps| ps.get());
        vals.extend_from_slice(self.get(var)?);
        let start = self.var_offsets.take(var)?;
        Some(ExtractedBinding {
            var,
            offset: start,
            vals,
        })
    }

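    /// Write the (possibly modified) values from a previous [`Bindings::take`]
    /// back into the column they came from and re-register the variable. The
    /// extracted binding must contain exactly one value per match.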
    pub(crate) fn replace(&mut self, bdg: ExtractedBinding) {
        let ExtractedBinding {
            var,
            offset,
            mut vals,
        } = bdg;
        assert_eq!(vals.len(), self.matches);
        self.data
            .splice(offset..offset + self.matches, vals.drain(..));
        self.var_offsets.insert(var, offset);
    }
}

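/// A column of values detached from a [`Bindings`] batch by
/// [`Bindings::take`], along with the variable and offset it belongs to.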
pub(crate) struct ExtractedBinding {
    var: Variable,
    offset: usize,
    pub(crate) vals: Pooled<Vec<Value>>,
}

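/// A cache of rows that have been staged for insertion but are not yet
/// visible in their tables, keyed by `(table, key)`. It lets callers that
/// look up the same missing row agree on the values it will hold once the
/// pending mutations are flushed.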
#[derive(Default)]
pub(crate) struct PredictedVals {
    #[allow(clippy::type_complexity)]
    data: DashMap<(TableId, SmallVec<[Value; 3]>), Pooled<Vec<Value>>>,
}

impl Clear for PredictedVals {
    fn reuse(&self) -> bool {
        self.data.capacity() > 0
    }
    fn clear(&mut self) {
        if self.data.len() > 500 && rayon::current_num_threads() > 1 {
            self.data
                .shards_mut()
                .par_iter_mut()
                .for_each(|shard| shard.get_mut().clear());
        }
        self.data.clear()
    }
    fn bytes(&self) -> usize {
        self.data.capacity()
            * (std::mem::size_of::<(TableId, SmallVec<[Value; 3]>)>()
                + std::mem::size_of::<Pooled<Vec<Value>>>())
    }
}

impl PredictedVals {
    pub(crate) fn get_val(
        &self,
        table: TableId,
        key: &[Value],
        default: impl FnOnce() -> Pooled<Vec<Value>>,
    ) -> impl Deref<Target = Pooled<Vec<Value>>> + '_ {
        self.data
            .entry((table, SmallVec::from_slice(key)))
            .or_insert_with(default)
    }
}

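/// A read-only view of the pieces of the database needed while running rules:
/// table metadata, counters, external functions, and base/container values.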
#[derive(Copy, Clone)]
pub(crate) struct DbView<'a> {
    pub(crate) table_info: &'a DenseIdMap<TableId, TableInfo>,
    pub(crate) counters: &'a Counters,
    pub(crate) external_funcs: &'a ExternalFunctions,
    pub(crate) bases: &'a BaseValues,
    pub(crate) containers: &'a ContainerValues,
}

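/// The context handed to rule actions and external functions while rules run.
/// It exposes read access to the database through a `DbView`, stages
/// insertions and removals in per-table `MutationBuffer`s rather than
/// applying them immediately, and records in `changed` whether any mutation
/// was staged.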
pub struct ExecutionState<'a> {
    pub(crate) predicted: &'a PredictedVals,
    pub(crate) db: DbView<'a>,
    pub(crate) buffers: DenseIdMap<TableId, Box<dyn MutationBuffer>>,
    pub(crate) changed: bool,
}

impl Clone for ExecutionState<'_> {
    fn clone(&self) -> Self {
        let mut res = ExecutionState {
            predicted: self.predicted,
            db: self.db,
            buffers: DenseIdMap::new(),
            changed: false,
        };
        for (id, buf) in self.buffers.iter() {
            res.buffers.insert(id, buf.fresh_handle());
        }
        res
    }
}

impl<'a> ExecutionState<'a> {
    pub(crate) fn new(
        predicted: &'a PredictedVals,
        db: DbView<'a>,
        buffers: DenseIdMap<TableId, Box<dyn MutationBuffer>>,
    ) -> Self {
        ExecutionState {
            predicted,
            db,
            buffers,
            changed: false,
        }
    }

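    /// Stage the insertion of `row` into `table`. The row is buffered rather
    /// than written immediately.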
    pub fn stage_insert(&mut self, table: TableId, row: &[Value]) {
        self.buffers
            .get_or_insert(table, || self.db.table_info[table].table.new_buffer())
            .stage_insert(row);
        self.changed = true;
    }

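    /// Stage the removal of the row keyed by `key` from `table`. Like
    /// [`ExecutionState::stage_insert`], the removal is buffered.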
    pub fn stage_remove(&mut self, table: TableId, key: &[Value]) {
        self.buffers
            .get_or_insert(table, || self.db.table_info[table].table.new_buffer())
            .stage_remove(key);
        self.changed = true;
    }

    pub fn call_external_func(
        &mut self,
        func: ExternalFunctionId,
        args: &[Value],
    ) -> Option<Value> {
        self.db.external_funcs[func].invoke(self, args)
    }

    pub fn inc_counter(&self, ctr: CounterId) -> usize {
        self.db.counters.inc(ctr)
    }

    pub fn read_counter(&self, ctr: CounterId) -> usize {
        self.db.counters.read(ctr)
    }

    pub fn get_table(&self, table: TableId) -> &'a WrappedTable {
        &self.db.table_info[table].table
    }

    pub fn base_values(&self) -> &BaseValues {
        self.db.bases
    }

    pub fn container_values(&self) -> &'a ContainerValues {
        self.db.containers
    }

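    /// Return the full row that `table` maps `key` to, predicting one if it
    /// is not there yet: when no row exists, a new one is built by appending
    /// the given `vals` (counters or constants) to `key`, staged for
    /// insertion, and cached in `PredictedVals` so that repeated predictions
    /// for the same key agree.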
    pub fn predict_val(
        &mut self,
        table: TableId,
        key: &[Value],
        vals: impl ExactSizeIterator<Item = MergeVal>,
    ) -> Pooled<Vec<Value>> {
        if let Some(row) = self.db.table_info[table].table.get_row(key) {
            return row.vals;
        }
        Pooled::cloned(
            self.predicted
                .get_val(table, key, || {
                    Self::construct_new_row(
                        &self.db,
                        &mut self.buffers,
                        &mut self.changed,
                        table,
                        key,
                        vals,
                    )
                })
                .deref(),
        )
    }

    fn construct_new_row(
        db: &DbView,
        buffers: &mut DenseIdMap<TableId, Box<dyn MutationBuffer>>,
        changed: &mut bool,
        table: TableId,
        key: &[Value],
        vals: impl ExactSizeIterator<Item = MergeVal>,
    ) -> Pooled<Vec<Value>> {
        with_pool_set(|ps| {
            let mut new = ps.get::<Vec<Value>>();
            new.reserve(key.len() + vals.len());
            new.extend_from_slice(key);
            for val in vals {
                new.push(match val {
                    MergeVal::Counter(ctr) => Value::from_usize(db.counters.inc(ctr)),
                    MergeVal::Constant(c) => c,
                })
            }
            buffers
                .get_or_insert(table, || db.table_info[table].table.new_buffer())
                .stage_insert(&new);
            *changed = true;
            new
        })
    }

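    /// Like [`ExecutionState::predict_val`], but return only the value of
    /// column `col` of the (existing or predicted) row.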
    pub fn predict_col(
        &mut self,
        table: TableId,
        key: &[Value],
        vals: impl ExactSizeIterator<Item = MergeVal>,
        col: ColumnId,
    ) -> Value {
        if let Some(val) = self.db.table_info[table].table.get_row_column(key, col) {
            return val;
        }
        self.predicted.get_val(table, key, || {
            Self::construct_new_row(
                &self.db,
                &mut self.buffers,
                &mut self.changed,
                table,
                key,
                vals,
            )
        })[col.index()]
    }
}

impl ExecutionState<'_> {
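    /// Run `instrs` over the matches currently stored in `bindings`,
    /// returning the number of matches that survive every instruction. If no
    /// variables are bound, the instructions run once over a single empty
    /// match.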
    pub(crate) fn run_instrs(&mut self, instrs: &[Instr], bindings: &mut Bindings) -> usize {
        if bindings.var_offsets.next_id().rep() == 0 {
            bindings.matches = 1;
        }

        let mut mask = with_pool_set(|ps| Mask::new(0..bindings.matches, ps));
        for instr in instrs {
            if mask.is_empty() {
                return 0;
            }
            self.run_instr(&mut mask, instr, bindings);
        }
        mask.count_ones()
    }
    fn run_instr(&mut self, mask: &mut Mask, inst: &Instr, bindings: &mut Bindings) {
        fn assert_impl(
            bindings: &mut Bindings,
            mask: &mut Mask,
            l: &QueryEntry,
            r: &QueryEntry,
            op: impl Fn(Value, Value) -> bool,
        ) {
            match (l, r) {
                (QueryEntry::Var(v1), QueryEntry::Var(v2)) => {
                    mask.iter(&bindings[*v1])
                        .zip(&bindings[*v2])
                        .retain(|(v1, v2)| op(*v1, *v2));
                }
                (QueryEntry::Var(v), QueryEntry::Const(c))
                | (QueryEntry::Const(c), QueryEntry::Var(v)) => {
                    mask.iter(&bindings[*v]).retain(|v| op(*v, *c));
                }
                (QueryEntry::Const(c1), QueryEntry::Const(c2)) => {
                    if !op(*c1, *c2) {
                        mask.clear();
                    }
                }
            }
        }

        match inst {
            Instr::LookupOrInsertDefault {
                table: table_id,
                args,
                default,
                dst_col,
                dst_var,
            } => {
                let pool = with_pool_set(|ps| ps.get_pool::<Vec<Value>>().clone());
                self.buffers.get_or_insert(*table_id, || {
                    self.db.table_info[*table_id].table.new_buffer()
                });
                let table = &self.db.table_info[*table_id].table;
                let mut mask_copy = mask.clone();
                table.lookup_row_vectorized(&mut mask_copy, bindings, args, *dst_col, *dst_var);
                mask_copy.symmetric_difference(mask);
                if mask_copy.is_empty() {
                    return;
                }
                let mut out = bindings.take(*dst_var).unwrap();
                for_each_binding_with_mask!(mask_copy, args.as_slice(), bindings, |iter| {
                    iter.assign_vec(&mut out.vals, |offset, key| {
                        let prediction_key = (
                            *table_id,
                            SmallVec::<[Value; 3]>::from_slice(key.as_slice()),
                        );
                        let buffers = &mut self.buffers;
                        let ctrs = &self.db.counters;
                        let bindings = &bindings;
                        let pool = pool.clone();
                        let row = self
                            .predicted
                            .data
                            .entry(prediction_key)
                            .or_insert_with(move || {
                                let mut row = pool.get();
                                row.extend_from_slice(key.as_slice());
                                row.reserve(default.len());
                                for val in default {
                                    let val = match val {
                                        WriteVal::QueryEntry(QueryEntry::Const(c)) => *c,
                                        WriteVal::QueryEntry(QueryEntry::Var(v)) => {
                                            bindings[*v][offset]
                                        }
                                        WriteVal::IncCounter(ctr) => {
                                            Value::from_usize(ctrs.inc(*ctr))
                                        }
                                        WriteVal::CurrentVal(ix) => row[*ix],
                                    };
                                    row.push(val)
                                }
                                buffers.get_mut(*table_id).unwrap().stage_insert(&row);
                                row
                            });
                        row[dst_col.index()]
                    });
                });
                bindings.replace(out);
            }
            Instr::LookupWithDefault {
                table,
                args,
                dst_col,
                dst_var,
                default,
            } => {
                let table = &self.db.table_info[*table].table;
                table.lookup_with_default_vectorized(
                    mask, bindings, args, *dst_col, *default, *dst_var,
                );
            }
            Instr::Lookup {
                table,
                args,
                dst_col,
                dst_var,
            } => {
                let table = &self.db.table_info[*table].table;
                table.lookup_row_vectorized(mask, bindings, args, *dst_col, *dst_var);
            }

            Instr::LookupWithFallback {
                table: table_id,
                table_key,
                func,
                func_args,
                dst_col,
                dst_var,
            } => {
                let table = &self.db.table_info[*table_id].table;
                let mut lookup_result = mask.clone();
                table.lookup_row_vectorized(
                    &mut lookup_result,
                    bindings,
                    table_key,
                    *dst_col,
                    *dst_var,
                );
                let mut to_call_func = lookup_result.clone();
                to_call_func.symmetric_difference(mask);
                if to_call_func.is_empty() {
                    return;
                }

                self.db.external_funcs[*func].invoke_batch_assign(
                    self,
                    &mut to_call_func,
                    bindings,
                    func_args,
                    *dst_var,
                );
                lookup_result.union(&to_call_func);
                *mask = lookup_result;
            }
            Instr::Insert { table, vals } => {
                for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| {
                    iter.for_each(|vals| {
                        self.stage_insert(*table, vals.as_slice());
                    })
                });
            }
            Instr::InsertIfEq { table, l, r, vals } => match (l, r) {
                (QueryEntry::Var(v1), QueryEntry::Var(v2)) => {
                    for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| {
                        iter.zip(&bindings[*v1])
                            .zip(&bindings[*v2])
                            .for_each(|((vals, v1), v2)| {
                                if v1 == v2 {
                                    self.stage_insert(*table, &vals);
                                }
                            })
                    })
                }
                (QueryEntry::Var(v), QueryEntry::Const(c))
                | (QueryEntry::Const(c), QueryEntry::Var(v)) => {
                    for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| {
                        iter.zip(&bindings[*v]).for_each(|(vals, cond)| {
                            if cond == c {
                                self.stage_insert(*table, &vals);
                            }
                        })
                    })
                }
                (QueryEntry::Const(c1), QueryEntry::Const(c2)) => {
                    if c1 == c2 {
                        for_each_binding_with_mask!(mask, vals.as_slice(), bindings, |iter| iter
                            .for_each(|vals| {
                                self.stage_insert(*table, &vals);
                            }))
                    }
                }
            },
            Instr::Remove { table, args } => {
                for_each_binding_with_mask!(mask, args.as_slice(), bindings, |iter| {
                    iter.for_each(|args| {
                        self.stage_remove(*table, args.as_slice());
                    })
                });
            }
            Instr::External { func, args, dst } => {
                self.db.external_funcs[*func].invoke_batch(self, mask, bindings, args, *dst);
            }
            Instr::ExternalWithFallback {
                f1,
                args1,
                f2,
                args2,
                dst,
            } => {
                let mut f1_result = mask.clone();
                self.db.external_funcs[*f1].invoke_batch(
                    self,
                    &mut f1_result,
                    bindings,
                    args1,
                    *dst,
                );
                let mut to_call_f2 = f1_result.clone();
                to_call_f2.symmetric_difference(mask);
                if to_call_f2.is_empty() {
                    return;
                }
                self.db.external_funcs[*f2].invoke_batch_assign(
                    self,
                    &mut to_call_f2,
                    bindings,
                    args2,
                    *dst,
                );
                f1_result.union(&to_call_f2);
                *mask = f1_result;
            }
            Instr::AssertAnyNe { ops, divider } => {
                for_each_binding_with_mask!(mask, ops.as_slice(), bindings, |iter| {
                    iter.retain(|vals| {
                        vals[0..*divider]
                            .iter()
                            .zip(&vals[*divider..])
                            .any(|(l, r)| l != r)
                    })
                })
            }
            Instr::AssertEq(l, r) => assert_impl(bindings, mask, l, r, |l, r| l == r),
            Instr::AssertNe(l, r) => assert_impl(bindings, mask, l, r, |l, r| l != r),
            Instr::ReadCounter { counter, dst } => {
                let mut vals = with_pool_set(|ps| ps.get::<Vec<Value>>());
                let ctr_val = Value::from_usize(self.read_counter(*counter));
                vals.resize(bindings.matches, ctr_val);
                bindings.insert(*dst, &vals);
            }
        }
    }
}

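/// A single instruction executed by [`Instr`]-interpreting code in
/// `ExecutionState::run_instrs`.
///
/// Instructions operate on a whole batch of matches at once: variables refer
/// to columns in a [`Bindings`] batch, and a mask tracks which matches are
/// still live after lookups and assertions.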
#[derive(Debug, Clone)]
pub(crate) enum Instr {
    /// Look up `args` in `table` and bind column `dst_col` of the resulting
    /// row to `dst_var`. For matches where no row is present, construct a
    /// default row from `default`, stage it for insertion, and bind the
    /// column of that row instead.
    LookupOrInsertDefault {
        table: TableId,
        args: Vec<QueryEntry>,
        default: Vec<WriteVal>,
        dst_col: ColumnId,
        dst_var: Variable,
    },

    /// Look up `args` in `table` and bind column `dst_col` of the resulting
    /// row to `dst_var`, binding `default` for matches where no row is
    /// present.
    LookupWithDefault {
        table: TableId,
        args: Vec<QueryEntry>,
        dst_col: ColumnId,
        dst_var: Variable,
        default: QueryEntry,
    },

    /// Look up `args` in `table` and bind column `dst_col` of the resulting
    /// row to `dst_var`, dropping matches where no row is present.
    Lookup {
        table: TableId,
        args: Vec<QueryEntry>,
        dst_col: ColumnId,
        dst_var: Variable,
    },

    /// Like `Lookup`, but for matches where no row is present, call the
    /// external function `func` on `func_args` and bind its result to
    /// `dst_var` instead.
    LookupWithFallback {
        table: TableId,
        table_key: Vec<QueryEntry>,
        func: ExternalFunctionId,
        func_args: Vec<QueryEntry>,
        dst_col: ColumnId,
        dst_var: Variable,
    },

    /// Stage the insertion of the row `vals` into `table` for each live
    /// match.
    Insert {
        table: TableId,
        vals: Vec<QueryEntry>,
    },

    /// Stage the insertion of `vals` into `table`, but only for matches where
    /// `l` and `r` are equal.
    InsertIfEq {
        table: TableId,
        l: QueryEntry,
        r: QueryEntry,
        vals: Vec<QueryEntry>,
    },

    /// Stage the removal of the row keyed by `args` from `table` for each
    /// live match.
    Remove {
        table: TableId,
        args: Vec<QueryEntry>,
    },

    /// Call the external function `func` on `args` and bind its result to
    /// `dst`, dropping matches where the function returns no value.
    External {
        func: ExternalFunctionId,
        args: Vec<QueryEntry>,
        dst: Variable,
    },

    /// Call `f1` on `args1` and bind its result to `dst`; for matches where
    /// `f1` returns no value, call `f2` on `args2` instead, dropping matches
    /// where both functions fail.
    ExternalWithFallback {
        f1: ExternalFunctionId,
        args1: Vec<QueryEntry>,
        f2: ExternalFunctionId,
        args2: Vec<QueryEntry>,
        dst: Variable,
    },

    /// Drop matches where the two entries are not equal.
    AssertEq(QueryEntry, QueryEntry),

    /// Drop matches where the two entries are equal.
    AssertNe(QueryEntry, QueryEntry),

    /// Drop matches unless at least one of the first `divider` entries of
    /// `ops` differs from the corresponding entry after the divider.
    AssertAnyNe {
        ops: Vec<QueryEntry>,
        divider: usize,
    },

    /// Read the current value of `counter` and bind it to `dst` for every
    /// match.
    ReadCounter {
        counter: CounterId,
        dst: Variable,
    },
}