use std::{
    hash::{Hash, Hasher},
    mem,
    sync::{Arc, Mutex},
};

use crate::numeric_id::{IdVec, NumericId, define_id};
use egglog_concurrency::Notification;
use hashbrown::HashTable;
use once_cell::sync::Lazy;
use rayon::iter::ParallelIterator;
use rustc_hash::FxHasher;

use crate::{
    OffsetRange, Subset,
    common::{HashMap, IndexMap, ShardData, ShardId, Value},
    offsets::{RowId, SortedOffsetSlice, SubsetRef},
    parallel_heuristics::parallelize_index_construction,
    pool::{Pooled, with_pool_set},
    row_buffer::{RowBuffer, TaggedRowBuffer},
    table_spec::{ColumnId, Generation, Offset, TableVersion, WrappedTableRef},
};

#[cfg(test)]
mod tests;

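/// An entry in a `SubsetTable`: the key itself lives in a side `RowBuffer`,
/// so entries only store a cached hash and a pointer to the key's row.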
#[derive(Clone)]
pub(crate) struct TableEntry<T> {
    /// The cached hash of the key.
    hash: u64,
    /// The row in the key buffer where the key is stored.
    key: RowId,
    /// The data associated with the key (a subset of rows, for a `SubsetTable`).
    vals: T,
}

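/// An index wrapping an `IndexBase` implementation along with the key columns
/// it covers and the table version it was last updated to, allowing it to be
/// refreshed incrementally as the underlying table grows.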
#[derive(Clone)]
pub(crate) struct Index<TI> {
    /// The columns of the underlying table covered by this index.
    key: Vec<ColumnId>,
    /// The version of the table contents currently reflected in `table`.
    updated_to: TableVersion,
    /// The underlying index data structure.
    table: TI,
}

impl<TI: IndexBase> Index<TI> {
    pub(crate) fn new(key: Vec<ColumnId>, table: TI) -> Self {
        Index {
            key,
            updated_to: TableVersion {
                major: Generation::new(0),
                minor: Offset::new(0),
            },
            table,
        }
    }

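    /// Return the subset of rows currently associated with `key`, if any.
    /// The result reflects the table as of the last call to `refresh`.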
    pub(crate) fn get_subset<'a>(&'a self, key: &'a TI::Key) -> Option<SubsetRef<'a>> {
        self.table.get_subset(key)
    }

    pub(crate) fn needs_refresh(&self, table: WrappedTableRef) -> bool {
        table.version() != self.updated_to
    }

    pub(crate) fn refresh(&mut self, table: WrappedTableRef) {
        let cur_version = table.version();
        if cur_version == self.updated_to {
            return;
        }
        // A major version change means existing row offsets are no longer
        // valid: rebuild the index from scratch. Otherwise, only merge the
        // rows added since the last refresh.
        let subset = if cur_version.major != self.updated_to.major {
            self.table.clear();
            table.all()
        } else {
            table.updates_since(self.updated_to.minor)
        };
        if parallelize_index_construction(subset.size()) {
            self.table.merge_parallel(&self.key, table, subset.as_ref());
        } else {
            self.refresh_serial(table, subset);
        }

        self.updated_to = cur_version;
    }

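    /// Update the index with the rows in `subset`, without any parallelism.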
    pub(crate) fn refresh_serial(&mut self, table: WrappedTableRef, subset: Subset) {
        // Scan `subset` in fixed-size batches, projecting out the key
        // columns, and merge each batch into the index.
        let mut buf = TaggedRowBuffer::new(self.key.len());
        let mut cur = Offset::new(0);
        loop {
            buf.clear();
            if let Some(next) =
                table.scan_project(subset.as_ref(), &self.key, cur, 1024, &[], &mut buf)
            {
                cur = next;
                self.table.merge_rows(&buf);
            } else {
                // Merge the final (partial) batch.
                self.table.merge_rows(&buf);
                break;
            }
        }
    }

    pub(crate) fn for_each(&self, f: impl FnMut(&TI::Key, SubsetRef)) {
        self.table.for_each(f);
    }

    pub(crate) fn len(&self) -> usize {
        self.table.len()
    }
}

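/// A hash table mapping tuples of `Value`s to subsets of rows. The key tuples
/// themselves are stored out of line in `keys`; table entries point back into
/// that buffer.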
pub(crate) struct SubsetTable {
    keys: RowBuffer,
    hash: Pooled<HashTable<TableEntry<BufferedSubset>>>,
}

impl Clone for SubsetTable {
    fn clone(&self) -> Self {
        SubsetTable {
            keys: self.keys.clone(),
            hash: Pooled::cloned(&self.hash),
        }
    }
}

impl SubsetTable {
    fn new(key_arity: usize) -> SubsetTable {
        SubsetTable {
            keys: RowBuffer::new(key_arity),
            hash: with_pool_set(|ps| ps.get()),
        }
    }
}

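/// The core operations shared by the index implementations in this module:
/// a mapping from keys to subsets of the rows of a table.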
pub(crate) trait IndexBase {
    /// The key type used to look up subsets in the index.
    type Key: ?Sized;

    /// The key type used when inserting rows into the index. For
    /// `ColumnIndex` this is a whole row, each of whose values is indexed
    /// individually; for `TupleIndex` it is the same as `Key`.
    type WriteKey: ?Sized;

    /// Remove all entries from the index.
    fn clear(&mut self);
    /// Get the subset of rows associated with `key`, if there is one.
    fn get_subset(&self, key: &Self::Key) -> Option<SubsetRef<'_>>;
    /// Add `row` to the subset associated with `key`.
    fn add_row(&mut self, key: &Self::WriteKey, row: RowId);
    /// Add the rows in `buf` to the index.
    fn merge_rows(&mut self, buf: &TaggedRowBuffer);
    /// Call `f` on each key in the index along with its subset of rows.
    fn for_each(&self, f: impl FnMut(&Self::Key, SubsetRef));
    /// The number of keys in the index.
    fn len(&self) -> usize;

    /// Merge the rows of `subset` (projected onto `cols`) into the index,
    /// using the module's dedicated thread pool.
    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef);
}

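/// A single shard of a `ColumnIndex`: a map from value to subset, along with
/// the buffer backing the shard's sparse subsets.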
struct ColumnIndexShard {
    table: Pooled<IndexMap<Value, BufferedSubset>>,
    subsets: SubsetBuffer,
}

impl Clone for ColumnIndexShard {
    fn clone(&self) -> Self {
        ColumnIndexShard {
            table: Pooled::cloned(&self.table),
            subsets: self.subsets.clone(),
        }
    }
}

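/// An index mapping individual `Value`s to the subset of rows in which they
/// appear in the indexed columns. Entries are sharded by value, allowing the
/// index to be built in parallel.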
#[derive(Clone)]
pub struct ColumnIndex {
    shard_data: ShardData,
    /// The shards themselves: a value's entry lives in the shard that
    /// `shard_data` assigns to it.
    shards: IdVec<ShardId, ColumnIndexShard>,
}

impl IndexBase for ColumnIndex {
    type Key = Value;
    type WriteKey = [Value];
    fn clear(&mut self) {
        for (_, shard) in self.shards.iter_mut() {
            for (_, subset) in shard.table.drain(..) {
                match subset {
                    BufferedSubset::Dense(_) => {}
                    BufferedSubset::Sparse(buffered_vec) => {
                        // Recycle the subset's slot in the shard's buffer.
                        shard.subsets.return_vec(buffered_vec);
                    }
                }
            }
        }
    }

    fn get_subset<'a>(&'a self, key: &Value) -> Option<SubsetRef<'a>> {
        let shard = self.shard_data.get_shard(key, &self.shards);
        shard.table.get(key).map(|x| x.as_ref(&shard.subsets))
    }
    fn add_row(&mut self, vals: &[Value], row: RowId) {
        // Index the row under each of its column values.
        for key in vals {
            let shard = self.shard_data.get_shard_mut(key, &mut self.shards);
            // SAFETY: rows are added in ascending order, keeping each subset
            // sorted (`push_vec` asserts this).
            unsafe {
                shard
                    .table
                    .entry(*key)
                    .or_insert_with(BufferedSubset::empty)
                    .add_row_sorted(row, &mut shard.subsets);
            }
        }
    }
    fn merge_rows(&mut self, buf: &TaggedRowBuffer) {
        for (src_id, key) in buf.iter() {
            self.add_row(key, src_id);
        }
    }
    fn for_each(&self, mut f: impl FnMut(&Self::Key, SubsetRef)) {
        for (subsets, (k, v)) in self
            .shards
            .iter()
            .flat_map(|(_, shard)| shard.table.iter().map(|x| (&shard.subsets, x)))
        {
            f(k, v.as_ref(subsets));
        }
    }
    fn len(&self) -> usize {
        self.shards.iter().map(|(_, shard)| shard.table.len()).sum()
    }

    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
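        // Scan the table in batches, routing each projected row to the shard
        // owning its value; batches for each shard are queued behind a mutex
        // and then merged shard-by-shard in parallel.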
        const BATCH_SIZE: usize = 1024;
        let shard_data = self.shard_data;
        let mut queues = IdVec::<ShardId, Mutex<Vec<(RowId, TaggedRowBuffer)>>>::with_capacity(
            shard_data.n_shards(),
        );
        queues.resize_with(shard_data.n_shards(), || {
            Mutex::new(Vec::with_capacity((subset.size() / BATCH_SIZE) + 1))
        });
        let split_buf = |buf: TaggedRowBuffer| {
            let mut split = IdVec::<ShardId, TaggedRowBuffer>::default();
            split.resize_with(shard_data.n_shards(), || TaggedRowBuffer::new(1));
            for (row_id, keys) in buf.non_stale() {
                for key in keys {
                    shard_data
                        .get_shard_mut(*key, &mut split)
                        .add_row(row_id, &[*key]);
                }
            }
            for (shard_id, buf) in split.drain() {
                if buf.is_empty() {
                    continue;
                }
                let first = buf.get_row(RowId::new(0)).0;
                queues[shard_id].lock().unwrap().push((first, buf));
            }
        };

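        // Phase 1: scan `subset` and split each batch of rows by shard.
        // Phase 2: let each shard merge its queued batches, with the shards
        // themselves processed in parallel.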
        run_in_thread_pool_and_block(&THREAD_POOL, || {
            rayon::in_place_scope(|inner| {
                let mut cur = Offset::new(0);
                loop {
                    let mut buf = TaggedRowBuffer::new(cols.len());
                    if let Some(next) =
                        table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
                    {
                        cur = next;
                        inner.spawn(move |_| split_buf(buf));
                    } else {
                        // Hand off the final (partial) batch before exiting.
                        inner.spawn(move |_| split_buf(buf));
                        break;
                    }
                }
            });

            self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
                use indexmap::map::Entry;
                let mut vec = queues[shard_id].lock().unwrap();
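                // Sort batches by starting RowId so rows are inserted in
                // ascending order.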
                vec.sort_by_key(|(start, _)| *start);
                for (_, buf) in vec.drain(..) {
                    for (row_id, key) in buf.non_stale() {
                        debug_assert_eq!(key.len(), 1);
                        match shard.table.entry(key[0]) {
                            Entry::Occupied(mut occ) => {
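                                // SAFETY: batches are processed in ascending
                                // RowId order, so `row_id` is >= any row
                                // already in the subset.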
                                unsafe {
                                    occ.get_mut().add_row_sorted(row_id, &mut shard.subsets);
                                }
                            }
                            Entry::Vacant(v) => {
                                v.insert(BufferedSubset::singleton(row_id));
                            }
                        }
                    }
                }
            });
        });
    }
}

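/// Run `f` on a thread in `pool`, blocking the current thread until it
/// finishes.
///
/// This is how borrowing closures get spawned onto a pool whose `spawn`
/// method requires `'static`: because this function does not return until `f`
/// has run and been dropped, the borrows cannot be outlived.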
fn run_in_thread_pool_and_block<'a>(pool: &rayon::ThreadPool, f: impl FnMut() + Send + 'a) {
    // A named trait so that "some `FnMut() + Send` living for `'a`" can be
    // used as a trait object.
    trait LifetimeWork<'a>: FnMut() + Send + 'a {}

    impl<'a, F: FnMut() + Send + 'a> LifetimeWork<'a> for F {}
    let as_lifetime: Box<dyn LifetimeWork<'a>> = Box::new(f);
    // SAFETY: erasing the lifetime is sound because we wait on the
    // notification below, so the closure is run and dropped before this
    // function (and hence the lifetime 'a) can end.
    let mut casted_away = unsafe {
        mem::transmute::<Box<dyn LifetimeWork<'a>>, Box<dyn LifetimeWork<'static>>>(as_lifetime)
    };
    let n = Arc::new(Notification::new());
    let inner = n.clone();
    pool.spawn(move || {
        casted_away();
        mem::drop(casted_away);
        inner.notify();
    });
    n.wait()
}

impl ColumnIndex {
    pub(crate) fn new() -> ColumnIndex {
        with_pool_set(|ps| {
            let shard_data = ShardData::new(num_shards());
            let mut shards = IdVec::with_capacity(shard_data.n_shards());
            shards.resize_with(shard_data.n_shards(), || ColumnIndexShard {
                table: ps.get(),
                subsets: SubsetBuffer::default(),
            });
            ColumnIndex { shard_data, shards }
        })
    }
}

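/// A single shard of a `TupleIndex`.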
#[derive(Clone)]
struct TupleIndexShard {
    table: SubsetTable,
    subsets: SubsetBuffer,
}

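/// An index mapping tuples of `Value`s to subsets of the rows in which they
/// appear, sharded by key hash.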
#[derive(Clone)]
pub struct TupleIndex {
    shard_data: ShardData,
    /// The shards: a key's entry lives in the shard derived from its hash.
    shards: IdVec<ShardId, TupleIndexShard>,
}

impl TupleIndex {
    pub(crate) fn new(key_arity: usize) -> TupleIndex {
        let shard_data = ShardData::new(num_shards());
        let mut shards = IdVec::with_capacity(shard_data.n_shards());
        shards.resize_with(shard_data.n_shards(), || TupleIndexShard {
            table: SubsetTable::new(key_arity),
            subsets: SubsetBuffer::default(),
        });
        TupleIndex { shard_data, shards }
    }
}

impl IndexBase for TupleIndex {
    type Key = [Value];
    type WriteKey = Self::Key;

    fn clear(&mut self) {
        for (_, shard) in self.shards.iter_mut() {
            shard.table.keys.clear();
            for entry in shard.table.hash.drain() {
                match entry.vals {
                    BufferedSubset::Dense(_) => {}
                    BufferedSubset::Sparse(v) => {
                        // Recycle the subset's slot in the shard's buffer.
                        shard.subsets.return_vec(v);
                    }
                }
            }
        }
    }

    fn get_subset<'a>(&'a self, key: &[Value]) -> Option<SubsetRef<'a>> {
        let hash = hash_key(key);
        let shard = &self.shards[self.shard_data.shard_id(hash)];
        let entry = shard.table.hash.find(hash, |entry| {
            entry.hash == hash && shard.table.keys.get_row(entry.key) == key
        })?;
        Some(entry.vals.as_ref(&shard.subsets))
    }

    fn add_row(&mut self, key: &[Value], row: RowId) {
        use hashbrown::hash_table::Entry;
        let hash = hash_key(key);
        let shard = &mut self.shards[self.shard_data.shard_id(hash)];
        let table_entry = shard.table.hash.entry(
            hash,
            |entry| entry.hash == hash && shard.table.keys.get_row(entry.key) == key,
            |ent| ent.hash,
        );
        match table_entry {
            Entry::Occupied(mut occ) => {
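                // SAFETY: rows are added in ascending order, keeping the
                // subset sorted (`push_vec` asserts this).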
                unsafe {
                    occ.get_mut().vals.add_row_sorted(row, &mut shard.subsets);
                }
            }
            Entry::Vacant(v) => {
                let key_id = shard.table.keys.add_row(key);
                let subset = BufferedSubset::singleton(row);
                v.insert(TableEntry {
                    hash,
                    key: key_id,
                    vals: subset,
                });
            }
        }
    }

    fn merge_rows(&mut self, buf: &TaggedRowBuffer) {
        for (src_id, key) in buf.iter() {
            self.add_row(key, src_id);
        }
    }
    fn for_each(&self, mut f: impl FnMut(&Self::Key, SubsetRef)) {
        for (_, shard) in self.shards.iter() {
            for entry in shard.table.hash.iter() {
                let key = shard.table.keys.get_row(entry.key);
                f(key, entry.vals.as_ref(&shard.subsets));
            }
        }
    }

    fn len(&self) -> usize {
        self.shards
            .iter()
            .map(|(_, shard)| shard.table.hash.len())
            .sum()
    }

    fn merge_parallel(&mut self, cols: &[ColumnId], table: WrappedTableRef, subset: SubsetRef) {
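        // Same strategy as `ColumnIndex::merge_parallel`: scan in batches,
        // split each batch by shard, then merge per-shard queues in parallel.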
        const BATCH_SIZE: usize = 1024;
        let shard_data = self.shard_data;
        let mut queues = IdVec::<ShardId, Mutex<Vec<(RowId, TaggedRowBuffer)>>>::with_capacity(
            shard_data.n_shards(),
        );
        queues.resize_with(shard_data.n_shards(), || {
            Mutex::new(Vec::with_capacity((subset.size() / BATCH_SIZE) + 1))
        });
        let split_buf = |buf: TaggedRowBuffer| {
            let mut split = IdVec::<ShardId, TaggedRowBuffer>::default();
            split.resize_with(shard_data.n_shards(), || TaggedRowBuffer::new(cols.len()));
            for (row_id, key) in buf.non_stale() {
                shard_data
                    .get_shard_mut(key, &mut split)
                    .add_row(row_id, key);
            }
            for (shard_id, buf) in split.drain() {
                if buf.is_empty() {
                    continue;
                }
                let first = buf.get_row(RowId::new(0)).0;
                queues[shard_id].lock().unwrap().push((first, buf));
            }
        };
        run_in_thread_pool_and_block(&THREAD_POOL, || {
            rayon::scope(|scope| {
                let mut cur = Offset::new(0);
                loop {
                    let mut buf = TaggedRowBuffer::new(cols.len());
                    if let Some(next) =
                        table.scan_project(subset, cols, cur, BATCH_SIZE, &[], &mut buf)
                    {
                        cur = next;
                        scope.spawn(move |_| split_buf(buf));
                    } else {
                        // Hand off the final (partial) batch before exiting.
                        scope.spawn(move |_| split_buf(buf));
                        break;
                    }
                }
            });
            self.shards.par_iter_mut().for_each(|(shard_id, shard)| {
                use hashbrown::hash_table::Entry;
                let mut vec = queues[shard_id].lock().unwrap();
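                // Sort batches by starting RowId so rows are inserted in
                // ascending order.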
                vec.sort_by_key(|(start, _)| *start);
                for (_, buf) in vec.drain(..) {
                    for (row_id, key) in buf.non_stale() {
                        let hash = hash_key(key);
                        let table_entry = shard.table.hash.entry(
                            hash,
                            |entry| {
                                entry.hash == hash && shard.table.keys.get_row(entry.key) == key
                            },
                            |ent| ent.hash,
                        );
                        match table_entry {
                            Entry::Occupied(mut occ) => {
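                                // SAFETY: batches are processed in ascending
                                // RowId order, so `row_id` is >= any row
                                // already in the subset.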
                                unsafe {
                                    occ.get_mut()
                                        .vals
                                        .add_row_sorted(row_id, &mut shard.subsets);
                                }
                            }
                            Entry::Vacant(v) => {
                                let key_id = shard.table.keys.add_row(key);
                                let subset = BufferedSubset::singleton(row_id);
                                v.insert(TableEntry {
                                    hash,
                                    key: key_id,
                                    vals: subset,
                                });
                            }
                        }
                    }
                }
            });
        });
    }
}

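/// Hash a key tuple with `FxHasher`; the hash both picks a shard and probes
/// the shard's table.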
fn hash_key(key: &[Value]) -> u64 {
    let mut hasher = FxHasher::default();
    key.hash(&mut hasher);
    hasher.finish()
}

define_id!(BufferIndex, u32, "an index into a subset buffer");

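/// A buffer that stores many small sorted vectors of `RowId`s contiguously.
///
/// Vectors occupy power-of-two-sized slots within the buffer; when a vector
/// outgrows its slot it moves to a larger one and the old slot is recycled
/// through a `FreeList`, keyed by size class.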
struct SubsetBuffer {
    /// The flat storage underlying every vector in this buffer. Unused slots
    /// are filled with `u32::MAX` as a sentinel.
    buf: Pooled<Vec<RowId>>,
    free_list: FreeList,
}

impl Clone for SubsetBuffer {
    fn clone(&self) -> Self {
        SubsetBuffer {
            buf: Pooled::cloned(&self.buf),
            free_list: self.free_list.clone(),
        }
    }
}

impl Default for SubsetBuffer {
    fn default() -> SubsetBuffer {
        with_pool_set(|ps| SubsetBuffer {
            buf: ps.get(),
            free_list: Default::default(),
        })
    }
}

impl SubsetBuffer {
    /// Allocate a new vector containing `rows`, reusing a free slot of the
    /// right size class if one is available.
    fn new_vec(&mut self, rows: impl ExactSizeIterator<Item = RowId>) -> BufferedVec {
        let len = rows.len();
        if let Some(v) = self.free_list.get_size_class(len).pop() {
            return self.fill_at(v, rows);
        }
        let start = BufferIndex::from_usize(self.buf.len());
        self.buf.resize(
            start.index() + len.next_power_of_two(),
            RowId::new(u32::MAX),
        );
        self.fill_at(start, rows)
    }

    fn fill_at(
        &mut self,
        start: BufferIndex,
        rows: impl ExactSizeIterator<Item = RowId>,
    ) -> BufferedVec {
        let mut cur = start;
        for i in rows {
            self.buf[cur.index()] = i;
            cur = cur.inc();
        }
        BufferedVec(start, cur)
    }

    /// Return `vec`'s slot to the free list for reuse.
    fn return_vec(&mut self, vec: BufferedVec) {
        self.free_list.get_size_class(vec.len()).push(vec.0);
    }

    /// Append `row` to `vec`, returning the (possibly relocated) vector.
    ///
    /// A vector of length `n` occupies a slot of size
    /// `n.next_power_of_two()`, so a vector whose length is a power of two
    /// has filled its slot and must move to a larger one before growing.
    fn push_vec(&mut self, vec: BufferedVec, row: RowId) -> BufferedVec {
        assert!(
            vec.is_empty() || self.buf[vec.1.index() - 1] <= row,
            "vec={vec:?}, row={row:?}, last_elt={:?}",
            self.buf[vec.1.index() - 1]
        );
        if !vec.len().is_power_of_two() {
            // There is still room in the current slot.
            self.buf[vec.1.index()] = row;
            return BufferedVec(vec.0, vec.1.inc());
        }

        let res = if let Some(v) = self.free_list.get_size_class(vec.len() + 1).pop() {
            self.buf
                .copy_within(vec.0.index()..vec.1.index(), v.index());
            self.buf[v.index() + vec.len()] = row;
            BufferedVec(v, BufferIndex::from_usize(v.index() + vec.len() + 1))
        } else {
            let start = self.buf.len();
            self.buf.resize(
                start + (vec.len() + 1).next_power_of_two(),
                RowId::new(u32::MAX),
            );
            self.buf.copy_within(vec.0.index()..vec.1.index(), start);
            self.buf[start + vec.len()] = row;
            let end = start + vec.len() + 1;
            BufferedVec(BufferIndex::from_usize(start), BufferIndex::from_usize(end))
        };
        self.return_vec(vec);
        res
    }

    fn make_ref<'a>(&'a self, vec: &BufferedVec) -> SubsetRef<'a> {
        let res = SubsetRef::Sparse(unsafe {
            // SAFETY: vectors are only built by sorted insertions, so the
            // underlying slice is sorted.
            SortedOffsetSlice::new_unchecked(&self.buf[vec.0.index()..vec.1.index()])
        });
        #[cfg(debug_assertions)]
        {
            use crate::offsets::Offsets;
            res.offsets(|x| assert_ne!(x.rep(), u32::MAX))
        }
        res
    }
}

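/// A handle to a sorted vector of `RowId`s stored in a `SubsetBuffer`: the
/// start (inclusive) and end (exclusive) of the vector's slot. A
/// `BufferedVec` is only meaningful alongside the buffer that created it.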
#[derive(Debug, Clone)]
pub(crate) struct BufferedVec(BufferIndex, BufferIndex);

impl Default for BufferedVec {
    fn default() -> Self {
        BufferedVec(BufferIndex::new(0), BufferIndex::new(0))
    }
}

impl BufferedVec {
    fn is_empty(&self) -> bool {
        self.0 == self.1
    }
    fn len(&self) -> usize {
        self.1.index() - self.0.index()
    }
}

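/// A subset of rows that is either a dense range or a sparse sorted vector
/// stored out of line in a `SubsetBuffer`.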
#[derive(Clone)]
pub(crate) enum BufferedSubset {
    Dense(OffsetRange),
    Sparse(BufferedVec),
}

impl BufferedSubset {
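    /// Add `row` to the subset.
    ///
    /// # Safety
    /// `self` must have been built against `buf`, and rows must be added in
    /// ascending order: `row` must compare `>=` every row already present.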
    unsafe fn add_row_sorted(&mut self, row: RowId, buf: &mut SubsetBuffer) {
        match self {
            BufferedSubset::Dense(range) => {
                if range.end == range.start {
                    range.start = row;
                    range.end = row.inc();
                    return;
                }
                if range.end == row {
                    // The new row extends the dense range.
                    range.end = row.inc();
                    return;
                }
                // The subset is no longer contiguous: spill the range into a
                // sparse vector and append the new row.
                let mut v = buf.new_vec((range.start.rep()..range.end.rep()).map(RowId::new));
                v = buf.push_vec(v, row);
                *self = BufferedSubset::Sparse(v);
            }
            BufferedSubset::Sparse(vec) => *vec = buf.push_vec(mem::take(vec), row),
        }
    }

    fn empty() -> Self {
        BufferedSubset::Dense(OffsetRange::new(RowId::new(0), RowId::new(0)))
    }

    fn singleton(row: RowId) -> Self {
        BufferedSubset::Dense(OffsetRange::new(row, row.inc()))
    }

    fn as_ref<'a>(&self, buf: &'a SubsetBuffer) -> SubsetRef<'a> {
        match self {
            BufferedSubset::Dense(range) => SubsetRef::Dense(*range),
            BufferedSubset::Sparse(vec) => buf.make_ref(vec),
        }
    }
}

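/// The number of shards to use for sharded indexes: two per thread to reduce
/// contention, or a single shard when running single-threaded.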
fn num_shards() -> usize {
    let n_threads = rayon::current_num_threads();
    if n_threads == 1 { 1 } else { n_threads * 2 }
}

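/// A dedicated thread pool for parallel index merges.
///
/// `run_in_thread_pool_and_block` blocks the calling thread until the spawned
/// work completes; running that work on the caller's own rayon pool could
/// deadlock if every worker blocked this way, so a separate pool is used.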
static THREAD_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
    rayon::ThreadPoolBuilder::new()
        .num_threads(rayon::current_num_threads())
        .build()
        .unwrap()
});

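/// A free list of slots in a `SubsetBuffer`, keyed by power-of-two size
/// class: a request for a vector of length 5, for example, is served from
/// (and later returned to) the size-8 class.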
#[derive(Default, Clone)]
pub(super) struct FreeList {
    data: HashMap<usize, Vec<BufferIndex>>,
}
impl FreeList {
    /// The stack of free slots able to hold `size` elements.
    fn get_size_class(&mut self, size: usize) -> &mut Vec<BufferIndex> {
        let size_class = size.next_power_of_two();
        self.data.entry(size_class).or_default()
    }
}