egglog_core_relations/hash_index/
mod.rs

//! Hash-based secondary indexes.
use std::{
    hash::{Hash, Hasher},
    mem,
    sync::{Arc, Mutex},
};

use crate::numeric_id::{IdVec, NumericId, define_id};
use egglog_concurrency::Notification;
use hashbrown::HashTable;
use once_cell::sync::Lazy;
use rayon::iter::ParallelIterator;
use rustc_hash::FxHasher;

use crate::{
    OffsetRange, Subset,
    common::{HashMap, IndexMap, ShardData, ShardId, Value},
    offsets::{RowId, SortedOffsetSlice, SubsetRef},
    parallel_heuristics::parallelize_index_construction,
    pool::{Pooled, with_pool_set},
    row_buffer::{RowBuffer, TaggedRowBuffer},
    table_spec::{ColumnId, Generation, Offset, TableVersion, WrappedTableRef},
};

#[cfg(test)]
mod tests;

#[derive(Clone)]
pub(crate) struct TableEntry<T> {
    hash: u64,
    /// Points into `keys`
    key: RowId,
    vals: T,
}

#[derive(Clone)]
pub(crate) struct Index<TI> {
    key: Vec<ColumnId>,
    updated_to: TableVersion,
    table: TI,
}

impl<TI: IndexBase> Index<TI> {
    pub(crate) fn new(key: Vec<ColumnId>, table: TI) -> Self {
        Index {
            key,
            updated_to: TableVersion {
                major: Generation::new(0),
                minor: Offset::new(0),
            },
            table,
        }
    }

    /// Get the nonempty subset of rows associated with this key, if there is
    /// one.
    pub(crate) fn get_subset<'a>(&'a self, key: &'a TI::Key) -> Option<SubsetRef<'a>> {
        self.table.get_subset(key)
    }

    pub(crate) fn needs_refresh(&self, table: WrappedTableRef) -> bool {
        table.version() != self.updated_to
    }

    pub(crate) fn refresh(&mut self, table: WrappedTableRef) {
        let cur_version = table.version();
        if cur_version == self.updated_to {
            return;
        }
        let subset = if cur_version.major != self.updated_to.major {
            self.table.clear();
            table.all()
        } else {
            table.updates_since(self.updated_to.minor)
        };
        if parallelize_index_construction(subset.size()) {
            self.table.merge_parallel(&self.key, table, subset.as_ref());
        } else {
            self.refresh_serial(table, subset);
        }

        self.updated_to = cur_version;
    }

    /// Update the contents of the index to the current version of the table.
    ///
    /// The index is guaranteed to be up to date until `merge` is called on the
    /// table again.
    pub(crate) fn refresh_serial(&mut self, table: WrappedTableRef, subset: Subset) {
        let mut buf = TaggedRowBuffer::new(self.key.len());
        let mut cur = Offset::new(0);
        loop {
            buf.clear();
            if let Some(next) =
                table.scan_project(subset.as_ref(), &self.key, cur, 1024, &[], &mut buf)
            {
                cur = next;
                self.table.merge_rows(&buf);
            } else {
                self.table.merge_rows(&buf);
                break;
            }
        }
    }

    pub(crate) fn for_each(&self, f: impl FnMut(&TI::Key, SubsetRef)) {
        self.table.for_each(f);
    }

    pub(crate) fn len(&self) -> usize {
        self.table.len()
    }
}
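
// Typical usage is roughly the following (a sketch only: `table` stands for a
// `WrappedTableRef` handed out by the database, which cannot be constructed from this
// module alone, and the column/value ids are made up for illustration):
//
//     let mut index = Index::new(vec![ColumnId::new(0)], ColumnIndex::new());
//     if index.needs_refresh(table) {
//         // A change in the major generation triggers a full rebuild; otherwise only
//         // the rows added since `updated_to.minor` are scanned into the index.
//         index.refresh(table);
//     }
//     let key = Value::new(1);
//     let rows = index.get_subset(&key);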

pub(crate) struct SubsetTable {
    keys: RowBuffer,
    hash: Pooled<HashTable<TableEntry<BufferedSubset>>>,
}

impl Clone for SubsetTable {
    fn clone(&self) -> Self {
        SubsetTable {
            keys: self.keys.clone(),
            hash: Pooled::cloned(&self.hash),
        }
    }
}

impl SubsetTable {
    fn new(key_arity: usize) -> SubsetTable {
        SubsetTable {
            keys: RowBuffer::new(key_arity),
            hash: with_pool_set(|ps| ps.get()),
        }
    }
}

pub(crate) trait IndexBase {
    /// The type of keys for this index.  Keys can have validity constraints
    /// (e.g. the arity of a slice for `Key = [Value]`). If keys are invalid,
    /// these methods can panic.
    type Key: ?Sized;

    /// The write-side keys for an index. This is generally the same as `Key`, but Column-level
    /// indexes allow for multiple values (e.g. a subset of a row) to be provided, allowing the
    /// index to effectively cover multiple columns. This is useful for rebuilding.
    type WriteKey: ?Sized;

    /// Remove any existing entries in the index.
    fn clear(&mut self);
    /// Get the subset corresponding to this key, if there is one.
    fn get_subset(&self, key: &Self::Key) -> Option<SubsetRef<'_>>;
    /// Add the given key and row id to the table.
    fn add_row(&mut self, key: &Self::WriteKey, row: RowId);
    /// Merge the contents of the [`TaggedRowBuffer`] into the table.
    fn merge_rows(&mut self, buf: &TaggedRowBuffer);
    /// Call `f` over the elements of the index.
    fn for_each(&self, f: impl FnMut(&Self::Key, SubsetRef));
    /// The number of keys in the index.
    fn len(&self) -> usize;

    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef);
}
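
#[cfg(test)]
mod write_key_example {
    use super::*;

    // A minimal sketch (not part of the original test suite; it assumes `Value`, like the
    // other id types in this module, exposes a `new` constructor) illustrating the
    // `Key` / `WriteKey` split: `ColumnIndex` uses `WriteKey = [Value]`, so a single
    // `add_row` call can register the same row under several column values, while lookups
    // still take a single `Value`.
    #[test]
    fn one_row_under_many_keys() {
        let mut index = ColumnIndex::new();
        let row = RowId::new(42);
        index.add_row(&[Value::new(1), Value::new(2)], row);
        assert!(index.get_subset(&Value::new(1)).is_some());
        assert!(index.get_subset(&Value::new(2)).is_some());
        assert!(index.get_subset(&Value::new(3)).is_none());
        assert_eq!(index.len(), 2);
    }
}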

struct ColumnIndexShard {
    table: Pooled<IndexMap<Value, BufferedSubset>>,
    subsets: SubsetBuffer,
}

impl Clone for ColumnIndexShard {
    fn clone(&self) -> Self {
        ColumnIndexShard {
            table: Pooled::cloned(&self.table),
            subsets: self.subsets.clone(),
        }
    }
}

#[derive(Clone)]
pub struct ColumnIndex {
    // A specialized index used when we are indexing on a single column.
    shard_data: ShardData,
    shards: IdVec<ShardId, ColumnIndexShard>,
}

impl IndexBase for ColumnIndex {
    type Key = Value;
    type WriteKey = [Value];
    fn clear(&mut self) {
        for (_, shard) in self.shards.iter_mut() {
            for (_, subset) in shard.table.drain(..) {
                match subset {
                    BufferedSubset::Dense(_) => {}
                    BufferedSubset::Sparse(buffered_vec) => {
                        shard.subsets.return_vec(buffered_vec);
                    }
                }
            }
        }
    }

    fn get_subset<'a>(&'a self, key: &Value) -> Option<SubsetRef<'a>> {
        let shard = self.shard_data.get_shard(key, &self.shards);
        shard.table.get(key).map(|x| x.as_ref(&shard.subsets))
    }
    fn add_row(&mut self, vals: &[Value], row: RowId) {
        // SAFETY: everything in `table` comes from `subsets`.
        for key in vals {
            let shard = self.shard_data.get_shard_mut(key, &mut self.shards);
            unsafe {
                shard
                    .table
                    .entry(*key)
                    .or_insert_with(BufferedSubset::empty)
                    .add_row_sorted(row, &mut shard.subsets);
            }
        }
    }
    fn merge_rows(&mut self, buf: &TaggedRowBuffer) {
        for (src_id, key) in buf.iter() {
            self.add_row(key, src_id);
        }
    }
    fn for_each(&self, mut f: impl FnMut(&Self::Key, SubsetRef)) {
        for (subsets, (k, v)) in self
            .shards
            .iter()
            .flat_map(|(_, shard)| shard.table.iter().map(|x| (&shard.subsets, x)))
        {
            f(k, v.as_ref(subsets));
        }
    }
    fn len(&self) -> usize {
        self.shards.iter().map(|(_, shard)| shard.table.len()).sum()
    }

    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
        const BATCH_SIZE: usize = 1024;
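        // The parallel merge proceeds in three phases on the dedicated thread pool:
        //   1. scan `subset` in `BATCH_SIZE` chunks, projecting out `cols`;
        //   2. split each batch by shard and push it onto the owning shard's queue;
        //   3. for each shard in parallel, replay its queued batches in ascending
        //      row-id order so every subset is populated in sorted order.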
        let shard_data = self.shard_data;
        let mut queues = IdVec::<ShardId, Mutex<Vec<(RowId, TaggedRowBuffer)>>>::with_capacity(
            shard_data.n_shards(),
        );
        queues.resize_with(shard_data.n_shards(), || {
            Mutex::new(Vec::with_capacity((subset.size() / BATCH_SIZE) + 1))
        });
        let split_buf = |buf: TaggedRowBuffer| {
            let mut split = IdVec::<ShardId, TaggedRowBuffer>::default();
            split.resize_with(shard_data.n_shards(), || TaggedRowBuffer::new(1));
            for (row_id, keys) in buf.non_stale() {
                for key in keys {
                    shard_data
                        .get_shard_mut(*key, &mut split)
                        .add_row(row_id, &[*key]);
                }
            }
            for (shard_id, buf) in split.drain() {
                if buf.is_empty() {
                    continue;
                }
                let first = buf.get_row(RowId::new(0)).0;
                queues[shard_id].lock().unwrap().push((first, buf));
            }
        };

        run_in_thread_pool_and_block(&THREAD_POOL, || {
            rayon::in_place_scope(|inner| {
                let mut cur = Offset::new(0);
                loop {
                    let mut buf = TaggedRowBuffer::new(cols.len());
                    if let Some(next) =
                        table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
                    {
                        cur = next;
                        inner.spawn(move |_| split_buf(buf));
                    } else {
                        inner.spawn(move |_| split_buf(buf));
                        break;
                    }
                }
            });

            self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
                use indexmap::map::Entry;
                // Sort the vector by start row id to ensure we populate subsets in sorted order.
                let mut vec = queues[shard_id].lock().unwrap();
                vec.sort_by_key(|(start, _)| *start);
                for (_, buf) in vec.drain(..) {
                    for (row_id, key) in buf.non_stale() {
                        debug_assert_eq!(key.len(), 1);
                        match shard.table.entry(key[0]) {
                            Entry::Occupied(mut occ) => {
                                // SAFETY: all of the buffered vectors in this map come from `subsets`.
                                unsafe {
                                    occ.get_mut().add_row_sorted(row_id, &mut shard.subsets);
                                }
                            }
                            Entry::Vacant(v) => {
                                v.insert(BufferedSubset::singleton(row_id));
                            }
                        }
                    }
                }
            });
        });
    }
}

/// This function is an alternative to [`rayon::ThreadPool::install`] that doesn't steal work from
/// the caller's current thread pool while waiting for `f` to finish.
///
/// We do this to avoid deadlocks. The whole purpose of using a separate thread pool in this module
/// is to allow for sufficient parallelism while holding a lock on the main thread pool. That means
/// we are not worried about an outer lock tying up a thread in the main pool.
///
/// On the other hand, it _is_ a bad idea to steal work on a rayon thread pool with some locks
/// held. In particular, if another task on the thread pool _itself_ attempts to acquire the same
/// lock, this can cause a deadlock. We saw this in the tests for this crate. The relevant locks
/// are those around individual indexes stored in the database-level index cache.
fn run_in_thread_pool_and_block<'a>(pool: &rayon::ThreadPool, f: impl FnMut() + Send + 'a) {
    // NB: We don't need the heap allocations here. But we are only calling this function if
    // we are about to do a bunch of work, so clarity is probably going to be better than (even
    // more) unsafe code.

    // Alright, here we go: pretend `f` has a `'static` lifetime because we are passing it to
    // `spawn`.
    trait LifetimeWork<'a>: FnMut() + Send + 'a {}

    impl<'a, F: FnMut() + Send + 'a> LifetimeWork<'a> for F {}
    let as_lifetime: Box<dyn LifetimeWork<'a>> = Box::new(f);
    let mut casted_away = unsafe {
        // SAFETY: `casted_away` will be dropped at the end of this method. The notification used
        // below will ensure it does not escape.
        mem::transmute::<Box<dyn LifetimeWork<'a>>, Box<dyn LifetimeWork<'static>>>(as_lifetime)
    };
    let n = Arc::new(Notification::new());
    let inner = n.clone();
    pool.spawn(move || {
        casted_away();
        mem::drop(casted_away);
        inner.notify();
    });
    n.wait()
}
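
#[cfg(test)]
mod run_in_pool_example {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // A minimal sketch (not part of the original test suite): the point of
    // `run_in_thread_pool_and_block` is that it only returns once `f` has finished on the
    // dedicated pool, so `f` may safely borrow state from the caller's stack.
    #[test]
    fn blocks_until_closure_finishes() {
        let counter = AtomicUsize::new(0);
        run_in_thread_pool_and_block(&THREAD_POOL, || {
            // Borrowing `counter` (a non-'static local) is exactly what the lifetime
            // gymnastics in the function make possible.
            counter.fetch_add(1, Ordering::SeqCst);
        });
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }
}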

impl ColumnIndex {
    pub(crate) fn new() -> ColumnIndex {
        with_pool_set(|ps| {
            let shard_data = ShardData::new(num_shards());
            let mut shards = IdVec::with_capacity(shard_data.n_shards());
            shards.resize_with(shard_data.n_shards(), || ColumnIndexShard {
                table: ps.get(),
                subsets: SubsetBuffer::default(),
            });
            ColumnIndex { shard_data, shards }
        })
    }
}

#[derive(Clone)]
struct TupleIndexShard {
    table: SubsetTable,
    subsets: SubsetBuffer,
}

/// A mapping from keys to subsets of rows.
#[derive(Clone)]
pub struct TupleIndex {
    // NB: we could store RowBuffers inline and then have indexes reference
    // (u32, RowId) instead of RowId. Trades copying off for indirections.
    shard_data: ShardData,
    shards: IdVec<ShardId, TupleIndexShard>,
}

impl TupleIndex {
    pub(crate) fn new(key_arity: usize) -> TupleIndex {
        let shard_data = ShardData::new(num_shards());
        let mut shards = IdVec::with_capacity(shard_data.n_shards());
        shards.resize_with(shard_data.n_shards(), || TupleIndexShard {
            table: SubsetTable::new(key_arity),
            subsets: SubsetBuffer::default(),
        });
        TupleIndex { shard_data, shards }
    }
}

impl IndexBase for TupleIndex {
    type Key = [Value];
    type WriteKey = Self::Key;

    fn clear(&mut self) {
        for (_, shard) in self.shards.iter_mut() {
            shard.table.keys.clear();
            for entry in shard.table.hash.drain() {
                match entry.vals {
                    BufferedSubset::Dense(_) => {}
                    BufferedSubset::Sparse(v) => {
                        shard.subsets.return_vec(v);
                    }
                }
            }
        }
    }

    fn get_subset<'a>(&'a self, key: &[Value]) -> Option<SubsetRef<'a>> {
        let hash = hash_key(key);
        let shard = &self.shards[self.shard_data.shard_id(hash)];
        let entry = shard.table.hash.find(hash, |entry| {
            entry.hash == hash && shard.table.keys.get_row(entry.key) == key
        })?;
        Some(entry.vals.as_ref(&shard.subsets))
    }

    fn add_row(&mut self, key: &[Value], row: RowId) {
        use hashbrown::hash_table::Entry;
        let hash = hash_key(key);
        let shard = &mut self.shards[self.shard_data.shard_id(hash)];
        let table_entry = shard.table.hash.entry(
            hash,
            |entry| entry.hash == hash && shard.table.keys.get_row(entry.key) == key,
            |ent| ent.hash,
        );
        match table_entry {
            Entry::Occupied(mut occ) => {
                // SAFETY: everything in `table_entry` comes from `vals`.
                unsafe {
                    occ.get_mut().vals.add_row_sorted(row, &mut shard.subsets);
                }
            }
            Entry::Vacant(v) => {
                let key_id = shard.table.keys.add_row(key);
                let subset = BufferedSubset::singleton(row);
                v.insert(TableEntry {
                    hash,
                    key: key_id,
                    vals: subset,
                });
            }
        }
    }

    fn merge_rows(&mut self, buf: &TaggedRowBuffer) {
        for (src_id, key) in buf.iter() {
            self.add_row(key, src_id);
        }
    }
    fn for_each(&self, mut f: impl FnMut(&Self::Key, SubsetRef)) {
        for (_, shard) in self.shards.iter() {
            for entry in shard.table.hash.iter() {
                let key = shard.table.keys.get_row(entry.key);
                f(key, entry.vals.as_ref(&shard.subsets));
            }
        }
    }

    fn len(&self) -> usize {
        self.shards
            .iter()
            .map(|(_, shard)| shard.table.hash.len())
            .sum()
    }

    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
        // The structure here is similar to the implementation for ColumnIndex, with
        // slightly more bookkeeping needed to handle arbitrary-arity keys.

        const BATCH_SIZE: usize = 1024;
        let shard_data = self.shard_data;
        let mut queues = IdVec::<ShardId, Mutex<Vec<(RowId, TaggedRowBuffer)>>>::with_capacity(
            shard_data.n_shards(),
        );
        queues.resize_with(shard_data.n_shards(), || {
            Mutex::new(Vec::with_capacity((subset.size() / BATCH_SIZE) + 1))
        });
        let split_buf = |buf: TaggedRowBuffer| {
            let mut split = IdVec::<ShardId, TaggedRowBuffer>::default();
            split.resize_with(shard_data.n_shards(), || TaggedRowBuffer::new(cols.len()));
            for (row_id, key) in buf.non_stale() {
                shard_data
                    .get_shard_mut(key, &mut split)
                    .add_row(row_id, key);
            }
            for (shard_id, buf) in split.drain() {
                if buf.is_empty() {
                    continue;
                }
                let first = buf.get_row(RowId::new(0)).0;
                queues[shard_id].lock().unwrap().push((first, buf));
            }
        };
        run_in_thread_pool_and_block(&THREAD_POOL, || {
            rayon::scope(|scope| {
                let mut cur = Offset::new(0);
                loop {
                    let mut buf = TaggedRowBuffer::new(cols.len());
                    if let Some(next) =
                        table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
                    {
                        cur = next;
                        scope.spawn(move |_| split_buf(buf));
                    } else {
                        scope.spawn(move |_| split_buf(buf));
                        break;
                    }
                }
            });
            self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
                use hashbrown::hash_table::Entry;
                // Sort the vector by start row id to ensure we populate subsets in sorted order.
                let mut vec = queues[shard_id].lock().unwrap();
                vec.sort_by_key(|(start, _)| *start);
                for (_, buf) in vec.drain(..) {
                    for (row_id, key) in buf.non_stale() {
                        let hash = hash_key(key);
                        let table_entry = shard.table.hash.entry(
                            hash,
                            |entry| {
                                entry.hash == hash && shard.table.keys.get_row(entry.key) == key
                            },
                            |ent| ent.hash,
                        );
                        match table_entry {
                            Entry::Occupied(mut occ) => {
                                // SAFETY: everything in `table_entry` comes from `vals`.
                                unsafe {
                                    occ.get_mut()
                                        .vals
                                        .add_row_sorted(row_id, &mut shard.subsets);
                                }
                            }
                            Entry::Vacant(v) => {
                                let key_id = shard.table.keys.add_row(key);
                                let subset = BufferedSubset::singleton(row_id);
                                v.insert(TableEntry {
                                    hash,
                                    key: key_id,
                                    vals: subset,
                                });
                            }
                        }
                    }
                }
            });
        });
    }
}
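
#[cfg(test)]
mod tuple_index_example {
    use super::*;

    // A minimal sketch (not part of the original test suite; it assumes `Value`, like the
    // other id types here, exposes a `new` constructor): a `TupleIndex` maps an entire key
    // tuple to a subset of row ids, so only an exact key match returns a subset.
    #[test]
    fn exact_key_lookup() {
        let mut index = TupleIndex::new(2);
        index.add_row(&[Value::new(1), Value::new(2)], RowId::new(0));
        index.add_row(&[Value::new(1), Value::new(2)], RowId::new(3));
        assert!(index.get_subset(&[Value::new(1), Value::new(2)]).is_some());
        // The same values in a different order hash to a different key.
        assert!(index.get_subset(&[Value::new(2), Value::new(1)]).is_none());
        assert_eq!(index.len(), 1);
    }
}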

fn hash_key(key: &[Value]) -> u64 {
    let mut hasher = FxHasher::default();
    key.hash(&mut hasher);
    hasher.finish()
}

define_id!(BufferIndex, u32, "an index into a subset buffer");

/// A shared pool of row ids used to store sorted offset vectors with a common
/// lifetime.
///
/// This is used as the backing store for subsets stored in indexes. While it
/// definitely saves some allocations, the primary use for SubsetBuffer is to
/// make deallocation faster: with a standard [`crate::offsets::Subset`]
/// structure stored in the index, dropping requires an O(n) traversal of the
/// index. SubsetBuffer allows deallocation to happen in constant time (given
/// our use of memory pools).
struct SubsetBuffer {
    buf: Pooled<Vec<RowId>>,
    free_list: FreeList,
}

impl Clone for SubsetBuffer {
    fn clone(&self) -> Self {
        SubsetBuffer {
            buf: Pooled::cloned(&self.buf),
            free_list: self.free_list.clone(),
        }
    }
}

impl Default for SubsetBuffer {
    fn default() -> SubsetBuffer {
        with_pool_set(|ps| SubsetBuffer {
            buf: ps.get(),
            free_list: Default::default(),
        })
    }
}

impl SubsetBuffer {
    fn new_vec(&mut self, rows: impl ExactSizeIterator<Item = RowId>) -> BufferedVec {
        let len = rows.len();
        if let Some(v) = self.free_list.get_size_class(len).pop() {
            return self.fill_at(v, rows);
        }
        let start = BufferIndex::from_usize(self.buf.len());
        self.buf.resize(
            start.index() + len.next_power_of_two(),
            RowId::new(u32::MAX),
        );
        self.fill_at(start, rows)
    }

    fn fill_at(
        &mut self,
        start: BufferIndex,
        rows: impl ExactSizeIterator<Item = RowId>,
    ) -> BufferedVec {
        let mut cur = start;
        for i in rows {
            self.buf[cur.index()] = i;
            cur = cur.inc();
        }
        BufferedVec(start, cur)
    }

    fn return_vec(&mut self, vec: BufferedVec) {
        self.free_list.get_size_class(vec.len()).push(vec.0);
    }

    fn push_vec(&mut self, vec: BufferedVec, row: RowId) -> BufferedVec {
        assert!(
            vec.is_empty() || self.buf[vec.1.index() - 1] <= row,
            "vec={vec:?}, row={row:?}, last_elt={:?}",
            self.buf[vec.1.index() - 1]
        );
        if !vec.len().is_power_of_two() {
            self.buf[vec.1.index()] = row;
            return BufferedVec(vec.0, vec.1.inc());
        }

        let res = if let Some(v) = self.free_list.get_size_class(vec.len() + 1).pop() {
            self.buf
                .copy_within(vec.0.index()..vec.1.index(), v.index());
            self.buf[v.index() + vec.len()] = row;
            BufferedVec(v, BufferIndex::from_usize(v.index() + vec.len() + 1))
        } else {
            let start = self.buf.len();
            self.buf.resize(
                start + (vec.len() + 1).next_power_of_two(),
                RowId::new(u32::MAX),
            );
            self.buf.copy_within(vec.0.index()..vec.1.index(), start);
            self.buf[start + vec.len()] = row;
            let end = start + vec.len() + 1;
            BufferedVec(BufferIndex::from_usize(start), BufferIndex::from_usize(end))
        };
        self.return_vec(vec);
        res
    }

    fn make_ref<'a>(&'a self, vec: &BufferedVec) -> SubsetRef<'a> {
        // SAFETY: if `vec` is a valid index into self.buf, it will be sorted.
        //
        // NB: we do not guarantee this in the type signature of BufferedVec,
        // etc. But this is indeed safe given the usage within this module.
        let res = SubsetRef::Sparse(unsafe {
            SortedOffsetSlice::new_unchecked(&self.buf[vec.0.index()..vec.1.index()])
        });
        #[cfg(debug_assertions)]
        {
            use crate::offsets::Offsets;
            res.offsets(|x| assert_ne!(x.rep(), u32::MAX))
        }
        res
    }
}
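
#[cfg(test)]
mod subset_buffer_example {
    use super::*;

    // A minimal sketch (not part of the original test suite) of the `SubsetBuffer`
    // lifecycle: `new_vec` carves a slot out of the shared buffer, `push_vec` grows a
    // vector in sorted order (recycling the outgrown slot through the free list), and
    // `make_ref` exposes the result as a sparse subset.
    #[test]
    fn grow_and_read_back() {
        let mut buf = SubsetBuffer::default();
        let mut v = buf.new_vec([RowId::new(0), RowId::new(2)].into_iter());
        // Growing past a power-of-two length moves the vector to a fresh slot and
        // returns the old one to the free list.
        v = buf.push_vec(v, RowId::new(4));
        assert_eq!(v.len(), 3);
        assert_eq!(buf.make_ref(&v).size(), 3);
    }
}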

/// A sorted vector of offsets stored in a [`SubsetBuffer`].
///
/// Note: this implements `Clone` to facilitate cloning entire indexes, but this is a _shallow_
/// clone, making the clone operation work akin to slices in Golang. In particular: code that
/// pushes to a clone of a `BufferedVec` can affect the original, and vice versa.
///
/// Business logic in this module probably shouldn't call clone explicitly. The implicit uses of
/// clone (by other generated `Clone` implementations) are fine because they clone the
/// `SubsetBuffer` that the `BufferedVec` points to at the same time that the vector is cloned.
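///
/// For example (a sketch of the hazard, not an actual call site): if one copy of a
/// `BufferedVec` is grown with `SubsetBuffer::push_vec` past a power-of-two length, its old
/// slot is handed back to the free list and may later be reused and overwritten, even though
/// the other copy still points into it.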
#[derive(Debug, Clone)]
pub(crate) struct BufferedVec(BufferIndex, BufferIndex);

impl Default for BufferedVec {
    fn default() -> Self {
        BufferedVec(BufferIndex::new(0), BufferIndex::new(0))
    }
}

impl BufferedVec {
    fn is_empty(&self) -> bool {
        self.0 == self.1
    }
    fn len(&self) -> usize {
        self.1.index() - self.0.index()
    }
}

#[derive(Clone)]
pub(crate) enum BufferedSubset {
    Dense(OffsetRange),
    Sparse(BufferedVec),
}

impl BufferedSubset {
    /// *Safety:* callers must ensure that `self` is either dense, or comes from `buf`.
    unsafe fn add_row_sorted(&mut self, row: RowId, buf: &mut SubsetBuffer) {
        match self {
            BufferedSubset::Dense(range) => {
                if range.end == range.start {
                    range.start = row;
                    range.end = row.inc();
                    return;
                }
                if range.end == row {
                    range.end = row.inc();
                    return;
                }
                let mut v = buf.new_vec((range.start.rep()..range.end.rep()).map(RowId::new));
                v = buf.push_vec(v, row);
                *self = BufferedSubset::Sparse(v);
            }
            BufferedSubset::Sparse(vec) => *vec = buf.push_vec(mem::take(vec), row),
        }
    }

    fn empty() -> Self {
        BufferedSubset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0)))
    }

    fn singleton(row: RowId) -> Self {
        BufferedSubset::Dense(OffsetRange::new(row, row.inc()))
    }

    fn as_ref<'a>(&self, buf: &'a SubsetBuffer) -> SubsetRef<'a> {
        match self {
            BufferedSubset::Dense(range) => SubsetRef::Dense(*range),
            BufferedSubset::Sparse(vec) => buf.make_ref(vec),
        }
    }
}
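
#[cfg(test)]
mod buffered_subset_example {
    use super::*;

    // A minimal sketch (not part of the original test suite): contiguous row ids stay in
    // the cheap `Dense` representation; the first gap spills the subset into the backing
    // `SubsetBuffer` as a `Sparse` vector.
    #[test]
    fn dense_until_first_gap() {
        let mut subsets = SubsetBuffer::default();
        let mut s = BufferedSubset::empty();
        // SAFETY: `s` starts out dense and is only ever grown against `subsets`.
        unsafe {
            s.add_row_sorted(RowId::new(0), &mut subsets);
            s.add_row_sorted(RowId::new(1), &mut subsets);
            assert!(matches!(s, BufferedSubset::Dense(_)));
            s.add_row_sorted(RowId::new(5), &mut subsets);
            assert!(matches!(s, BufferedSubset::Sparse(_)));
        }
        assert_eq!(s.as_ref(&subsets).size(), 3);
    }
}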

fn num_shards() -> usize {
    let n_threads = rayon::current_num_threads();
    if n_threads == 1 { 1 } else { n_threads * 2 }
}

/// A thread pool used specifically for parallel hash index construction.
///
/// We use a separate thread pool here because callers can construct an index while holding a
/// lock, and we do not want long-running, lock-holding work to tie up the global thread pool
/// with no other way to get parallelism.
///
/// Earlier solutions using rayon::yield_now() were unreliable.
static THREAD_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
    rayon::ThreadPoolBuilder::new()
        .num_threads(rayon::current_num_threads())
        .build()
        .unwrap()
});

/// A simple free list used to reuse slots in a [`SubsetBuffer`].
///
/// This free list works as a map from power-of-two size classes to a vector of offsets that point
/// to the beginning of an unused vector.
#[derive(Default, Clone)]
pub(super) struct FreeList {
    data: HashMap<usize, Vec<BufferIndex>>,
}
impl FreeList {
    fn get_size_class(&mut self, size: usize) -> &mut Vec<BufferIndex> {
        let size_class = size.next_power_of_two();
        self.data.entry(size_class).or_default()
    }
}
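
#[cfg(test)]
mod free_list_example {
    use super::*;

    // A minimal sketch (not part of the original test suite): requests are bucketed by the
    // next power of two, so a slot returned for a 3-element vector is found again when a
    // 4-element vector is requested, but not when a 5-element one is.
    #[test]
    fn size_classes_round_up() {
        let mut free = FreeList::default();
        free.get_size_class(3).push(BufferIndex::new(0));
        assert_eq!(free.get_size_class(4).len(), 1);
        assert!(free.get_size_class(5).is_empty());
    }
}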