egglog_core_relations/
common.rs1use std::{
2 hash::{BuildHasherDefault, Hash, Hasher},
3 mem,
4 ops::Deref,
5 sync::{Arc, Mutex},
6};
7
8use crate::numeric_id::{DenseIdMap, IdVec, NumericId, define_id};
9use egglog_concurrency::ConcurrentVec;
10use hashbrown::HashTable;
11use rustc_hash::FxHasher;
12
13use crate::{Subset, TableId, TableVersion, WrappedTable, pool::Clear};
14
15pub(crate) type HashMap<K, V> = hashbrown::HashMap<K, V, BuildHasherDefault<FxHasher>>;
16pub(crate) type HashSet<T> = hashbrown::HashSet<T, BuildHasherDefault<FxHasher>>;
17pub(crate) type IndexSet<T> = indexmap::IndexSet<T, BuildHasherDefault<FxHasher>>;
18pub(crate) type IndexMap<K, V> = indexmap::IndexMap<K, V, BuildHasherDefault<FxHasher>>;
19pub(crate) type DashMap<K, V> = dashmap::DashMap<K, V, BuildHasherDefault<FxHasher>>;
20
/// An intern table handing out an id of type `V` for each distinct key of
/// type `K`.
///
/// All fields that hold data are behind `Arc`s, so the derived `Clone`
/// copies handles: a clone shares its keys and hash tables with the original.
#[derive(Clone)]
pub struct InternTable<K, V> {
    /// The interned keys. An id is converted to an index into this vector
    /// (via `NumericId::index`) to recover its key.
    vals: Arc<ConcurrentVec<K>>,
    /// One hash table per shard, each behind its own mutex. The tables store
    /// ids only; key comparison and rehashing look the key up in `vals`.
    data: Vec<Arc<Mutex<HashTable<V>>>>,
    /// Base-2 logarithm of the number of shards in `data`.
    shards_log2: u32,
}
31
32impl<K, V> Default for InternTable<K, V> {
33 fn default() -> Self {
34 Self::with_shards(4)
35 }
36}
37impl<K, V> InternTable<K, V> {
38 fn with_shards(shards_log2: u32) -> InternTable<K, V> {
43 let mut data = Vec::new();
44 data.resize_with(1 << shards_log2, Default::default);
45 InternTable {
46 vals: Arc::new(ConcurrentVec::with_capacity(512)),
47 data,
48 shards_log2,
49 }
50 }
51}
52
53impl<K: Eq + Hash + Clone, V: NumericId> InternTable<K, V> {
54 pub fn intern(&self, k: &K) -> V {
55 let hash = hash_value(k);
56 let shard = ((hash >> (64 - self.shards_log2)) & ((1 << self.shards_log2) - 1)) as usize;
59 let mut table = self.data[shard].lock().unwrap();
60 let read_guard = self.vals.read();
61 if let Some(v) = table.find(hash, |v| k == &read_guard[v.index()]) {
62 *v
63 } else {
64 mem::drop(read_guard);
65 let res = V::from_usize(self.vals.push(k.clone()));
66 let read_guard = self.vals.read();
67 *table
68 .insert_unique(hash, res, |v| hash_value(&read_guard[v.index()]))
69 .get()
70 }
71 }
72
73 pub fn get(&self, v: V) -> impl Deref<Target = K> + '_ {
74 MapDeref {
75 base: self.vals.read(),
76 index: v.index(),
77 }
78 }
79
80 pub fn get_cloned(&self, v: V) -> K {
81 self.vals.read()[v.index()].clone()
82 }
83}
84
85fn hash_value(v: &impl Hash) -> u64 {
86 let mut hasher = FxHasher::default();
87 v.hash(&mut hasher);
88 hasher.finish()
89}
90
91impl<K: NumericId, V> Clear for DenseIdMap<K, V> {
92 fn reuse(&self) -> bool {
93 self.capacity() > 0
94 }
95 fn clear(&mut self) {
96 self.clear();
97 }
98 fn bytes(&self) -> usize {
99 self.capacity() * mem::size_of::<Option<V>>()
100 }
101}
102
103define_id!(pub Value, u32, "A generic identifier representing an egglog value");
104
105impl Value {
106 pub(crate) fn stale() -> Self {
107 Value::new(u32::MAX)
108 }
109 pub(crate) fn set_stale(&mut self) {
112 self.rep = u32::MAX;
113 }
114
115 pub(crate) fn is_stale(&self) -> bool {
117 self.rep == u32::MAX
118 }
119}
120
/// Wraps a slice-like handle together with an index so that the pair
/// dereferences to the single element at that index.
struct MapDeref<T> {
    /// Handle that dereferences to a slice (for example a read guard).
    base: T,
    /// Position within the slice of the element to expose.
    index: usize,
}

impl<S, T> Deref for MapDeref<T>
where
    T: Deref<Target = [S]>,
{
    type Target = S;

    /// Returns the element at `index`; panics if `index` is out of bounds.
    fn deref(&self) -> &S {
        let slice: &[S] = &self.base;
        &slice[self.index]
    }
}
133
134define_id!(pub(crate) ShardId, u32, "an identifier pointing to a shard in a sharded hash table");
135
136#[derive(Copy, Clone)]
141pub(crate) struct ShardData {
142 log2_shard_count: u32,
143}
144
145impl ShardData {
146 pub(crate) fn new(n_shards: usize) -> Self {
147 Self {
148 log2_shard_count: n_shards.next_power_of_two().trailing_zeros(),
149 }
150 }
151 pub(crate) fn n_shards(&self) -> usize {
152 1 << self.log2_shard_count
153 }
154 pub(crate) fn shard_id(&self, hash: u64) -> ShardId {
155 let high_bits = (hash.wrapping_shr(64 - (self.log2_shard_count + 7)))
156 & ((1 << self.log2_shard_count) - 1);
157 ShardId::from_usize(high_bits as usize)
158 }
159 pub(crate) fn get_shard<'a, K: ?Sized, V>(&self, val: &K, table: &'a IdVec<ShardId, V>) -> &'a V
160 where
161 for<'b> &'b K: Hash,
162 {
163 let hc = {
164 let mut hasher = FxHasher::default();
165 val.hash(&mut hasher);
166 hasher.finish()
167 };
168 &table[self.shard_id(hc)]
169 }
170
171 pub(crate) fn get_shard_mut<'a, V>(
172 &self,
173 val: impl Hash,
174 table: &'a mut IdVec<ShardId, V>,
175 ) -> &'a mut V {
176 let hc = {
177 let mut hasher = FxHasher::default();
178 val.hash(&mut hasher);
179 hasher.finish()
180 };
181 &mut table[self.shard_id(hc)]
182 }
183}
184
185#[derive(Clone, Default)]
188pub(crate) struct SubsetTracker {
189 last_rebuilt_at: DenseIdMap<TableId, TableVersion>,
190}
191
192impl SubsetTracker {
193 pub(crate) fn recent_updates(&mut self, table_id: TableId, table: &WrappedTable) -> Subset {
200 let current_version = table.version();
201 let res = if let Some(last_version) = self.last_rebuilt_at.get(table_id) {
202 if current_version.major == last_version.major {
203 table.updates_since(last_version.minor)
204 } else {
205 table.all()
206 }
207 } else {
208 table.all()
209 };
210 self.last_rebuilt_at.insert(table_id, current_version);
211 res
212 }
213}
214
215pub(crate) fn iter_dashmap_bulk<K: Hash + Eq, V>(
218 map: &mut DashMap<K, V>,
219 mut f: impl FnMut(&K, &mut V),
220) {
221 let shards = map.shards_mut();
222 for shard in shards {
223 let mut_shard = shard.get_mut();
224 for entry in unsafe { mut_shard.iter() } {
227 let (k, v) = unsafe { entry.as_mut() };
229 f(k, v.get_mut());
230 }
231 }
232}