egglog_core_relations/free_join/
mod.rs

1//! Execute queries against a database using a variant of Free Join.
2use std::{
3    mem,
4    sync::{
5        Arc,
6        atomic::{AtomicUsize, Ordering},
7    },
8};
9
10use crate::numeric_id::{DenseIdMap, DenseIdMapWithReuse, NumericId, define_id};
11use egglog_concurrency::ResettableOnceLock;
12use rayon::prelude::*;
13use smallvec::SmallVec;
14use web_time::Duration;
15
16use crate::{
17    BaseValues, ContainerValues, PoolSet, QueryEntry, TupleIndex, Value,
18    action::{
19        Bindings, DbView,
20        mask::{Mask, MaskIter, ValueSource},
21    },
22    common::{DashMap, iter_dashmap_bulk},
23    dependency_graph::DependencyGraph,
24    hash_index::{ColumnIndex, Index, IndexBase},
25    offsets::Subset,
26    parallel_heuristics::parallelize_db_level_op,
27    pool::{Pool, Pooled, with_pool_set},
28    query::{Query, RuleSetBuilder},
29    table_spec::{
30        ColumnId, Constraint, MutationBuffer, Table, TableSpec, WrappedTable, WrappedTableRef,
31    },
32};
33
34use self::plan::Plan;
35use crate::action::{ExecutionState, PredictedVals};
36
37pub(crate) mod execute;
38pub(crate) mod frame_update;
39pub(crate) mod plan;
40
41define_id!(
42    pub AtomId,
43    u32,
44    "A component of a query consisting of a function and a list of variables or constants"
45);
46define_id!(pub Variable, u32, "a variable in a query");
47
48impl Variable {
49    pub fn placeholder() -> Variable {
50        Variable::new(!0)
51    }
52}
53
54define_id!(pub TableId, u32, "a table in the database");
55define_id!(pub(crate) ActionId, u32, "an identifier picking out the RHS of a rule");
56
57#[derive(Debug)]
58pub(crate) struct ProcessedConstraints {
59    /// The subset of the table matching the fast constraints. If there are no
60    /// fast constraints then this is the full table.
61    pub(crate) subset: Subset,
62    /// The constraints that can be evaluated quickly (O(log(n)) or O(1)).
63    pub(crate) fast: Pooled<Vec<Constraint>>,
64    /// The constraints that require an O(n) scan to evaluate.
65    pub(crate) slow: Pooled<Vec<Constraint>>,
66}
67
68impl Clone for ProcessedConstraints {
69    fn clone(&self) -> Self {
70        ProcessedConstraints {
71            subset: self.subset.clone(),
72            fast: Pooled::cloned(&self.fast),
73            slow: Pooled::cloned(&self.slow),
74        }
75    }
76}
77
78impl ProcessedConstraints {
79    /// The size of the subset of the table matching the fast constraints.
80    fn approx_size(&self) -> usize {
81        self.subset.size()
82    }
83}
84
85#[derive(Clone, Debug, PartialEq, Eq)]
86pub(crate) struct SubAtom {
87    pub(crate) atom: AtomId,
88    pub(crate) vars: SmallVec<[ColumnId; 2]>,
89}
90
91impl SubAtom {
92    pub(crate) fn new(atom: AtomId) -> SubAtom {
93        SubAtom {
94            atom,
95            vars: Default::default(),
96        }
97    }
98}
99
100#[derive(Debug)]
101pub(crate) struct VarInfo {
102    pub(crate) occurrences: Vec<SubAtom>,
103    /// Whether or not this variable shows up in the "actions" portion of a
104    /// rule.
105    pub(crate) used_in_rhs: bool,
106    pub(crate) defined_in_rhs: bool,
107}
108
109pub(crate) type HashIndex = Arc<ResettableOnceLock<Index<TupleIndex>>>;
110pub(crate) type HashColumnIndex = Arc<ResettableOnceLock<Index<ColumnIndex>>>;
111
112pub struct TableInfo {
113    pub(crate) spec: TableSpec,
114    pub(crate) table: WrappedTable,
115    pub(crate) indexes: DashMap<SmallVec<[ColumnId; 4]>, HashIndex>,
116    pub(crate) column_indexes: DashMap<ColumnId, HashColumnIndex>,
117}
118
119impl Clone for TableInfo {
120    fn clone(&self) -> Self {
121        fn deep_clone_map<K: Clone + std::hash::Hash + Eq, TI: IndexBase + Clone>(
122            map: &DashMap<K, Arc<ResettableOnceLock<Index<TI>>>>,
123            table: WrappedTableRef,
124        ) -> DashMap<K, Arc<ResettableOnceLock<Index<TI>>>> {
125            map.iter()
126                .map(|table_ref| {
127                    let (k, v) = table_ref.pair();
128                    let v: Index<TI> = v
129                        .get_or_update(|index| {
130                            index.refresh(table);
131                        })
132                        .clone();
133                    (k.clone(), Arc::new(ResettableOnceLock::new(v)))
134                })
135                .collect()
136        }
137        TableInfo {
138            spec: self.spec.clone(),
139            table: self.table.dyn_clone(),
140            indexes: deep_clone_map(&self.indexes, self.table.as_ref()),
141            column_indexes: deep_clone_map(&self.column_indexes, self.table.as_ref()),
142        }
143    }
144}
145
146define_id!(pub CounterId, u32, "A counter accessible to actions, useful for generating unique Ids.");
147define_id!(pub ExternalFunctionId, u32, "A user-defined operation that can be invoked from a query");
148
149/// External functions allow external callers to manipulate database state in
150/// near-arbitrary ways.
151///
152/// This is a useful, if low-level, interface for extending this database with
153/// functionality and state not built into the core model.
154pub trait ExternalFunction: dyn_clone::DynClone + Send + Sync {
155    /// Invoke the function with mutable access to the database. If a value is
156    /// not returned, halt the execution of the current rule.
157    fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value>;
158}
159
160/// Automatically generate an `ExternalFunction` implementation from a function.
161pub fn make_external_func<
162    F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
163>(
164    f: F,
165) -> impl ExternalFunction {
166    #[derive(Clone)]
167    struct Wrapped<F>(F);
168    impl<F> ExternalFunction for Wrapped<F>
169    where
170        F: Fn(&mut ExecutionState, &[Value]) -> Option<Value> + Clone + Send + Sync,
171    {
172        fn invoke(&self, state: &mut ExecutionState, args: &[Value]) -> Option<Value> {
173            (self.0)(state, args)
174        }
175    }
176    Wrapped(f)
177}
178
179pub(crate) trait ExternalFunctionExt: ExternalFunction {
180    /// A vectorized variant of `invoke` to avoid repeated dynamic dispatch.
181    ///
182    /// Implementors should not override this manually (in fact, they shouldn't
183    /// even be able to; some types are private); the default implementation
184    /// delegates core logic to `invoke`.
185    #[doc(hidden)]
186    fn invoke_batch(
187        &self,
188        state: &mut ExecutionState,
189        mask: &mut Mask,
190        bindings: &mut Bindings,
191        args: &[QueryEntry],
192        out_var: Variable,
193    ) {
194        let pool: Pool<Vec<Value>> = with_pool_set(|ps| ps.get_pool().clone());
195        let mut out = pool.get();
196        out.reserve(mask.len());
197        for_each_binding_with_mask!(mask, args, bindings, |iter| {
198            iter.fill_vec(&mut out, Value::stale, |_, args| {
199                self.invoke(state, args.as_slice())
200            });
201        });
202        bindings.insert(out_var, &out);
203    }
204
205    /// A variant of [`ExternalFunctionExt::invoke_batch`] that overwrites the output variable,
206    /// rather than assigning all new values.
207    ///
208    /// *Panics* This method will panic if `out_var` doesn't already have an appropriately-sized
209    /// vector bound in `bindings`.
210    #[doc(hidden)]
211    fn invoke_batch_assign(
212        &self,
213        state: &mut ExecutionState,
214        mask: &mut Mask,
215        bindings: &mut Bindings,
216        args: &[QueryEntry],
217        out_var: Variable,
218    ) {
219        let mut out = bindings.take(out_var).expect("out_var must be bound");
220        for_each_binding_with_mask!(mask, args, bindings, |iter| {
221            iter.assign_vec_and_retain(&mut out.vals, |_, args| self.invoke(state, &args))
222        });
223        bindings.replace(out);
224    }
225}
226
227impl<T: ExternalFunction> ExternalFunctionExt for T {}
228
229// Implements `Clone` for `Box<dyn ExternalFunctionExt>`.
230dyn_clone::clone_trait_object!(ExternalFunctionExt);
231
232pub(crate) type ExternalFunctions =
233    DenseIdMapWithReuse<ExternalFunctionId, Box<dyn ExternalFunctionExt>>;
234
235#[derive(Default)]
236pub(crate) struct Counters(DenseIdMap<CounterId, AtomicUsize>);
237
238impl Clone for Counters {
239    fn clone(&self) -> Counters {
240        let mut map = DenseIdMap::new();
241        for (k, v) in self.0.iter() {
242            // NB: we may want to experiment with Ordering::Relaxed here.
243            map.insert(k, AtomicUsize::new(v.load(Ordering::SeqCst)))
244        }
245        Counters(map)
246    }
247}
248
249impl Counters {
250    pub(crate) fn read(&self, ctr: CounterId) -> usize {
251        self.0[ctr].load(Ordering::Acquire)
252    }
253    pub(crate) fn inc(&self, ctr: CounterId) -> usize {
254        // We synchronize with `read_counter` but not with other increments.
255        // NB: we may want to experiment with Ordering::Relaxed here.
256        self.0[ctr].fetch_add(1, Ordering::Release)
257    }
258}
259
260#[derive(Debug, Default)]
261pub struct RuleSetReport {
262    pub changed: bool,
263    pub rule_reports: DashMap<String, RuleReport>,
264    pub search_and_apply_time: Duration,
265    pub merge_time: Duration,
266}
267
268#[derive(Debug, Default)]
269pub struct RuleReport {
270    pub search_and_apply_time: Duration,
271    pub num_matches: usize,
272}
273
274impl RuleReport {
275    pub fn union(&self, other: &RuleReport) -> RuleReport {
276        RuleReport {
277            search_and_apply_time: self.search_and_apply_time + other.search_and_apply_time,
278            num_matches: self.num_matches + other.num_matches,
279        }
280    }
281}
282
283/// A collection of tables and indexes over them.
284///
285/// A database also owns the memory pools used by its tables.
286#[derive(Clone, Default)]
287pub struct Database {
288    // NB: some fields are pub(crate) to allow some internal modules to avoid
289    // borrowing the whole table.
290    pub(crate) tables: DenseIdMap<TableId, TableInfo>,
291    // TODO: having a single AtomicUsize per counter can lead to contention. We
292    // should look into prefetching counters when creating a new ExecutionState
293    // and incrementing locally. Note that the batch size shouldn't be too big
294    // because we keep an array per id in the UF.
295    pub(crate) counters: Counters,
296    pub(crate) external_functions: ExternalFunctions,
297    container_values: ContainerValues,
298    // Tracks the relative dependencies between tables during merge operations.
299    deps: DependencyGraph,
300    base_values: BaseValues,
301    /// A rough estimate of the total size of the database.
302    ///
303    /// This is primarily used to determine whether or not to attempt to do some operations in
304    /// parallel.
305    total_size_estimate: usize,
306}
307
308impl Database {
309    /// Create an empty Database.
310    ///
311    /// Queries are executed using the current rayon thread pool, which defaults to the global
312    /// thread pool.
313    pub fn new() -> Database {
314        Database::default()
315    }
316
317    /// Initialize a new rulse set to run against this database.
318    pub fn new_rule_set(&mut self) -> RuleSetBuilder<'_> {
319        RuleSetBuilder::new(self)
320    }
321
322    /// Add a new external function to the database.
323    pub fn add_external_function(
324        &mut self,
325        f: impl ExternalFunction + 'static,
326    ) -> ExternalFunctionId {
327        self.external_functions.push(Box::new(f))
328    }
329
330    /// Free an existing external function. Make sure not to use `id` afterwards.
331    pub fn free_external_function(&mut self, id: ExternalFunctionId) {
332        self.external_functions.take(id);
333    }
334
335    pub fn base_values(&self) -> &BaseValues {
336        &self.base_values
337    }
338
339    pub fn base_values_mut(&mut self) -> &mut BaseValues {
340        &mut self.base_values
341    }
342
343    pub fn container_values(&self) -> &ContainerValues {
344        &self.container_values
345    }
346
347    pub fn container_values_mut(&mut self) -> &mut ContainerValues {
348        &mut self.container_values
349    }
350
351    pub fn rebuild_containers(&mut self, table_id: TableId) -> bool {
352        let mut containers = mem::take(&mut self.container_values);
353        let table = &self.tables[table_id].table;
354        let res = self.with_execution_state(|state| containers.rebuild_all(table_id, table, state));
355        self.container_values = containers;
356        res
357    }
358
359    /// Apply the value-level rebuild encoded by `func_id` to all the tables in `to_rebuild`.
360    ///
361    /// The native [`Table::apply_rebuild`] method takes a `next_ts` argument for filling in new
362    /// values in a table like [`crate::SortedWritesTable`] where values in a certain column need
363    /// to be inserted in sorted order; the `next_ts` argument to this method is passed to
364    /// `apply_rebuild` for this purpose.
365    pub fn apply_rebuild(
366        &mut self,
367        func_id: TableId,
368        to_rebuild: &[TableId],
369        next_ts: Value,
370    ) -> bool {
371        let func = self.tables.take(func_id).unwrap();
372        let predicted = PredictedVals::default();
373        if parallelize_db_level_op(self.total_size_estimate) {
374            let mut tables = Vec::with_capacity(to_rebuild.len());
375            for id in to_rebuild {
376                tables.push((*id, self.tables.take(*id).unwrap()));
377            }
378            tables.par_iter_mut().for_each(|(_, info)| {
379                info.table.apply_rebuild(
380                    func_id,
381                    &func.table,
382                    next_ts,
383                    &mut ExecutionState::new(&predicted, self.read_only_view(), Default::default()),
384                );
385            });
386            for (id, info) in tables {
387                self.tables.insert(id, info);
388            }
389        } else {
390            for id in to_rebuild {
391                let mut info = self.tables.take(*id).unwrap();
392                info.table.apply_rebuild(
393                    func_id,
394                    &func.table,
395                    next_ts,
396                    &mut ExecutionState::new(&predicted, self.read_only_view(), Default::default()),
397                );
398                self.tables.insert(*id, info);
399            }
400        }
401        self.tables.insert(func_id, func);
402        self.merge_all()
403    }
404
405    /// Run `f` with access to an `ExecutionState` mapped to this database.
406    pub fn with_execution_state<R>(&self, f: impl FnOnce(&mut ExecutionState) -> R) -> R {
407        let predicted = with_pool_set(|ps| ps.get::<PredictedVals>());
408        let mut state = ExecutionState::new(&predicted, self.read_only_view(), Default::default());
409        f(&mut state)
410    }
411
412    pub(crate) fn read_only_view(&self) -> DbView<'_> {
413        DbView {
414            table_info: &self.tables,
415            counters: &self.counters,
416            external_funcs: &self.external_functions,
417            bases: &self.base_values,
418            containers: &self.container_values,
419        }
420    }
421
422    /// Estimate the size of the table. If a constraint is provided, return an
423    /// estimate of the size of the subset of the table matching the constraint.
424    pub fn estimate_size(&self, table: TableId, c: Option<Constraint>) -> usize {
425        let table_info = self
426            .tables
427            .get(table)
428            .expect("table must be declared in the current database");
429        let table = &table_info.table;
430        if let Some(c) = c {
431            if let Some(sub) = table.fast_subset(&c) {
432                // In the case where a the constraint can be computed quickly,
433                // we do not filter for staleness, which may over-approximate.
434                sub.size()
435            } else {
436                table.refine_one(table.refine_live(table.all()), &c).size()
437            }
438        } else {
439            table.len()
440        }
441    }
442
443    /// Create a new counter for this database.
444    ///
445    /// These counters can be used to generate unique ids as part of an action.
446    pub fn add_counter(&mut self) -> CounterId {
447        self.counters.0.push(AtomicUsize::new(0))
448    }
449
450    /// Increment the given counter and return its previous value.
451    pub fn inc_counter(&self, counter: CounterId) -> usize {
452        self.counters.inc(counter)
453    }
454
455    /// Get the current value of the given counter.
456    pub fn read_counter(&self, counter: CounterId) -> usize {
457        self.counters.read(counter)
458    }
459
460    /// A helper for merging all pending updates. Used to write to the database after updates have
461    /// been staged. Returns true if any tuples were added.
462    ///
463    /// Exposed for testing purposes.
464    ///
465    /// Useful for out-of-band insertions into the database.
466    pub fn merge_all(&mut self) -> bool {
467        let mut ever_changed = false;
468        let do_parallel = parallelize_db_level_op(self.total_size_estimate);
469        loop {
470            let mut changed = false;
471            let predicted = with_pool_set(|ps| ps.get::<PredictedVals>());
472            let mut tables_merging = DenseIdMap::<
473                TableId,
474                (
475                    // The info needed to merge this table.
476                    Option<TableInfo>,
477                    // Pre-allocated write buffers, according to the tables declared write
478                    // dependencies.
479                    DenseIdMap<TableId, Box<dyn MutationBuffer>>,
480                ),
481            >::with_capacity(self.tables.n_ids());
482            for stratum in self.deps.strata() {
483                // Initialize the write dependencies first.
484                for table in stratum.iter().copied() {
485                    let mut bufs = DenseIdMap::default();
486                    for dep in self.deps.write_deps(table) {
487                        if let Some(info) = self.tables.get(dep) {
488                            bufs.insert(dep, info.table.new_buffer());
489                        }
490                    }
491                    tables_merging.insert(table, (None, bufs));
492                }
493                // Then initialize read dependencies (this two-phase structure is why we have an
494                // Option in the tables_merging map).
495                for table in stratum.iter().copied() {
496                    tables_merging[table].0 = Some(self.tables.unwrap_val(table));
497                }
498                let db = self.read_only_view();
499                changed |= if do_parallel {
500                    tables_merging
501                        .par_iter_mut()
502                        .map(|(_, (info, buffers))| {
503                            let mut es = ExecutionState::new(&predicted, db, mem::take(buffers));
504                            info.as_mut().unwrap().table.merge(&mut es).added || es.changed
505                        })
506                        .max()
507                        .unwrap_or(false)
508                } else {
509                    tables_merging
510                        .iter_mut()
511                        .map(|(_, (info, buffers))| {
512                            let mut es = ExecutionState::new(&predicted, db, mem::take(buffers));
513                            info.as_mut().unwrap().table.merge(&mut es).added || es.changed
514                        })
515                        .max()
516                        .unwrap_or(false)
517                };
518                for (id, (table, _)) in tables_merging.drain() {
519                    self.tables.insert(id, table.unwrap());
520                }
521            }
522            ever_changed |= changed;
523            if !changed {
524                break;
525            }
526        }
527        // Reset all indexes to force an update on the next access.
528        let mut size_estimate = 0;
529        for (_, info) in self.tables.iter_mut() {
530            iter_dashmap_bulk(&mut info.column_indexes, |_, ci| {
531                Arc::get_mut(ci).unwrap().reset();
532            });
533            iter_dashmap_bulk(&mut info.indexes, |_, ti| {
534                Arc::get_mut(ti).unwrap().reset();
535            });
536            size_estimate += info.table.len();
537        }
538        self.total_size_estimate = size_estimate;
539        ever_changed
540    }
541
542    /// A low-level helper for merging pending updates to a particular function.
543    ///
544    /// Callers should prefer `merge_all`, as the process of merging the data
545    /// for a particular table may cause other updates to be buffered
546    /// elesewhere. The `merge_all` method runs merges to a fixed point to avoid
547    /// surprises here.
548    pub fn merge_table(&mut self, table: TableId) {
549        let mut info = self.tables.unwrap_val(table);
550        let predicted = with_pool_set(|ps| ps.get::<PredictedVals>());
551        self.total_size_estimate = self.total_size_estimate.wrapping_sub(info.table.len());
552        let _table_changed = info.table.merge(&mut ExecutionState::new(
553            &predicted,
554            self.read_only_view(),
555            Default::default(),
556        ));
557        self.total_size_estimate = self.total_size_estimate.wrapping_add(info.table.len());
558        self.tables.insert(table, info);
559    }
560
561    /// Get id of the next table to be added to the database.
562    ///
563    /// This can be useful for "knot tying", when tables need to reference their
564    /// own id.
565    pub fn next_table_id(&self) -> TableId {
566        self.tables.next_id()
567    }
568
569    /// Add a table with the given schema to the database.
570    ///
571    /// The table must have a compatible spec with `types` (e.g. same number of
572    /// columns).
573    pub fn add_table<T: Table + Sized + 'static>(
574        &mut self,
575        table: T,
576        read_deps: impl IntoIterator<Item = TableId>,
577        write_deps: impl IntoIterator<Item = TableId>,
578    ) -> TableId {
579        let spec = table.spec();
580        let table = WrappedTable::new(table);
581        let res = self.tables.push(TableInfo {
582            spec,
583            table,
584            indexes: Default::default(),
585            column_indexes: Default::default(),
586        });
587        self.deps.add_table(res, read_deps, write_deps);
588        res
589    }
590
591    /// Get direct mutable access to the table.
592    ///
593    /// This method is useful for out-of-band access to databse state.
594    pub fn get_table(&self, id: TableId) -> &WrappedTable {
595        &self
596            .tables
597            .get(id)
598            .expect("must access a table that has been declared in this database")
599            .table
600    }
601
602    pub(crate) fn process_constraints(
603        &self,
604        table: TableId,
605        cs: &[Constraint],
606    ) -> ProcessedConstraints {
607        let table_info = &self.tables[table];
608        let (mut subset, mut fast, mut slow) = table_info.table.split_fast_slow(cs);
609        slow.retain(|c| {
610            let (col, val) = match c {
611                Constraint::EqConst { col, val } => (*col, *val),
612                Constraint::Eq { .. }
613                | Constraint::LtConst { .. }
614                | Constraint::GtConst { .. }
615                | Constraint::LeConst { .. }
616                | Constraint::GeConst { .. } => return true,
617            };
618            // We are looking up by a constant: this is something we can build
619            // an index for as long as the column is cacheable.
620            if *table_info
621                .spec
622                .uncacheable_columns
623                .get(col)
624                .unwrap_or(&false)
625            {
626                return true;
627            }
628            // We have or will build an index: upgrade this constraint to
629            // 'fast'.
630            fast.push(c.clone());
631            let index = get_column_index_from_tableinfo(table_info, col);
632            match index.get().unwrap().get_subset(&val) {
633                Some(s) => {
634                    with_pool_set(|ps| subset.intersect(s, &ps.get_pool()));
635                }
636                None => {
637                    // There are no rows matching this key! We can constrain this to nothing.
638                    subset = Subset::empty();
639                }
640            }
641            // Remove this constraint from the slow list.
642            false
643        });
644        ProcessedConstraints { subset, fast, slow }
645    }
646
647    /// Get direct mutable access to the table.
648    ///
649    /// This method is useful for out-of-band access to databse state.
650    pub fn get_table_mut(&mut self, id: TableId) -> &mut dyn Table {
651        &mut *self
652            .tables
653            .get_mut(id)
654            .expect("must access a table that has been declared in this database")
655            .table
656    }
657
658    pub(crate) fn plan_query(&mut self, query: Query) -> Plan {
659        plan::plan_query(query)
660    }
661}
662
663impl Drop for Database {
664    fn drop(&mut self) {
665        // Clean up the ambient thread pool.
666        //
667        // Calling mem::forget on the egraph can result in much faster execution times.
668        with_pool_set(PoolSet::clear);
669        rayon::broadcast(|_| with_pool_set(PoolSet::clear));
670    }
671}
672
673/// The core logic behind getting and updating a hash index.
674///
675/// This is in a separate function to allow us to reuse it while already
676/// borrowing a `TableInfo`.
677fn get_index_from_tableinfo(table_info: &TableInfo, cols: &[ColumnId]) -> HashIndex {
678    let index: Arc<_> = table_info
679        .indexes
680        .entry(cols.into())
681        .or_insert_with(|| {
682            Arc::new(ResettableOnceLock::new(Index::new(
683                cols.to_vec(),
684                TupleIndex::new(cols.len()),
685            )))
686        })
687        .clone();
688    index.get_or_update(|index| {
689        index.refresh(table_info.table.as_ref());
690    });
691    debug_assert!(
692        !index
693            .get()
694            .unwrap()
695            .needs_refresh(table_info.table.as_ref())
696    );
697    index
698}
699
700/// The core logic behind getting and updating a column index.
701///
702/// This is the single-column analog to [`get_index_from_tableinfo`].
703fn get_column_index_from_tableinfo(table_info: &TableInfo, col: ColumnId) -> HashColumnIndex {
704    let index: Arc<_> = table_info
705        .column_indexes
706        .entry(col)
707        .or_insert_with(|| {
708            Arc::new(ResettableOnceLock::new(Index::new(
709                vec![col],
710                ColumnIndex::new(),
711            )))
712        })
713        .clone();
714    index.get_or_update(|index| {
715        index.refresh(table_info.table.as_ref());
716    });
717    debug_assert!(
718        !index
719            .get()
720            .unwrap()
721            .needs_refresh(table_info.table.as_ref())
722    );
723    index
724}