egglog_core_relations/
common.rs

1use std::{
2    hash::{BuildHasherDefault, Hash, Hasher},
3    mem,
4    ops::Deref,
5    sync::{Arc, Mutex},
6};
7
8use crate::numeric_id::{DenseIdMap, IdVec, NumericId, define_id};
9use egglog_concurrency::ConcurrentVec;
10use hashbrown::HashTable;
11use rustc_hash::FxHasher;
12
13use crate::{Subset, TableId, TableVersion, WrappedTable, pool::Clear};
14
15pub(crate) type HashMap<K, V> = hashbrown::HashMap<K, V, BuildHasherDefault<FxHasher>>;
16pub(crate) type HashSet<T> = hashbrown::HashSet<T, BuildHasherDefault<FxHasher>>;
17pub(crate) type IndexSet<T> = indexmap::IndexSet<T, BuildHasherDefault<FxHasher>>;
18pub(crate) type IndexMap<K, V> = indexmap::IndexMap<K, V, BuildHasherDefault<FxHasher>>;
19pub(crate) type DashMap<K, V> = dashmap::DashMap<K, V, BuildHasherDefault<FxHasher>>;
20
21/// An intern table mapping a key to some numeric id type.
22///
23/// This is primarily used to manage the [`Value`]s associated with a a
24/// base value.
25#[derive(Clone)]
26pub struct InternTable<K, V> {
27    vals: Arc<ConcurrentVec<K>>,
28    data: Vec<Arc<Mutex<HashTable<V>>>>,
29    shards_log2: u32,
30}
31
32impl<K, V> Default for InternTable<K, V> {
33    fn default() -> Self {
34        Self::with_shards(4)
35    }
36}
37impl<K, V> InternTable<K, V> {
38    /// Create a new intern table with the given number of shards.
39    ///
40    /// The number of shards is passed as its base-2 log: we rely on the number
41    /// of shards being a power of two.
42    fn with_shards(shards_log2: u32) -> InternTable<K, V> {
43        let mut data = Vec::new();
44        data.resize_with(1 << shards_log2, Default::default);
45        InternTable {
46            vals: Arc::new(ConcurrentVec::with_capacity(512)),
47            data,
48            shards_log2,
49        }
50    }
51}
52
53impl<K: Eq + Hash + Clone, V: NumericId> InternTable<K, V> {
54    pub fn intern(&self, k: &K) -> V {
55        let hash = hash_value(k);
56        // Use the top bits of the hash to pick the shard. Hashbrown uses the
57        // bottom bits.
58        let shard = ((hash >> (64 - self.shards_log2)) & ((1 << self.shards_log2) - 1)) as usize;
59        let mut table = self.data[shard].lock().unwrap();
60        let read_guard = self.vals.read();
61        if let Some(v) = table.find(hash, |v| k == &read_guard[v.index()]) {
62            *v
63        } else {
64            mem::drop(read_guard);
65            let res = V::from_usize(self.vals.push(k.clone()));
66            let read_guard = self.vals.read();
67            *table
68                .insert_unique(hash, res, |v| hash_value(&read_guard[v.index()]))
69                .get()
70        }
71    }
72
73    pub fn get(&self, v: V) -> impl Deref<Target = K> + '_ {
74        MapDeref {
75            base: self.vals.read(),
76            index: v.index(),
77        }
78    }
79
80    pub fn get_cloned(&self, v: V) -> K {
81        self.vals.read()[v.index()].clone()
82    }
83}
84
85fn hash_value(v: &impl Hash) -> u64 {
86    let mut hasher = FxHasher::default();
87    v.hash(&mut hasher);
88    hasher.finish()
89}
90
91impl<K: NumericId, V> Clear for DenseIdMap<K, V> {
92    fn reuse(&self) -> bool {
93        self.capacity() > 0
94    }
95    fn clear(&mut self) {
96        self.clear();
97    }
98    fn bytes(&self) -> usize {
99        self.capacity() * mem::size_of::<Option<V>>()
100    }
101}
102
103define_id!(pub Value, u32, "A generic identifier representing an egglog value");
104
105impl Value {
106    pub(crate) fn stale() -> Self {
107        Value::new(u32::MAX)
108    }
109    /// Values have a special "Stale" value that is used to indicate that the
110    /// value isn't intended to be read.
111    pub(crate) fn set_stale(&mut self) {
112        self.rep = u32::MAX;
113    }
114
115    /// Whether or not the given value is stale. See [`Value::set_stale`].
116    pub(crate) fn is_stale(&self) -> bool {
117        self.rep == u32::MAX
118    }
119}
120
/// Wraps something that dereferences to a slice, together with an index into that slice, so that
/// the pair dereferences directly to the single element at `index`.
struct MapDeref<T> {
    /// The slice-like owner (for example a guard) that is kept alive by this wrapper.
    base: T,
    /// Position of the element within `base` that `deref` yields.
    index: usize,
}

impl<S, T> Deref for MapDeref<T>
where
    T: Deref<Target = [S]>,
{
    type Target = S;

    /// Borrow the element at `index`; panics if `index` is out of bounds for the slice.
    fn deref(&self) -> &S {
        let items: &[S] = &self.base;
        &items[self.index]
    }
}
133
134define_id!(pub(crate) ShardId, u32, "an identifier pointing to a shard in a sharded hash table");
135
136/// Sharding metadata used for sharding hash tables.
137///
138/// This is a separate type in order to allow other data-structures to pre-shard
139/// data bound for a particular table.
140#[derive(Copy, Clone)]
141pub(crate) struct ShardData {
142    log2_shard_count: u32,
143}
144
145impl ShardData {
146    pub(crate) fn new(n_shards: usize) -> Self {
147        Self {
148            log2_shard_count: n_shards.next_power_of_two().trailing_zeros(),
149        }
150    }
151    pub(crate) fn n_shards(&self) -> usize {
152        1 << self.log2_shard_count
153    }
154    pub(crate) fn shard_id(&self, hash: u64) -> ShardId {
155        let high_bits = (hash.wrapping_shr(64 - (self.log2_shard_count + 7)))
156            & ((1 << self.log2_shard_count) - 1);
157        ShardId::from_usize(high_bits as usize)
158    }
159    pub(crate) fn get_shard<'a, K: ?Sized, V>(&self, val: &K, table: &'a IdVec<ShardId, V>) -> &'a V
160    where
161        for<'b> &'b K: Hash,
162    {
163        let hc = {
164            let mut hasher = FxHasher::default();
165            val.hash(&mut hasher);
166            hasher.finish()
167        };
168        &table[self.shard_id(hc)]
169    }
170
171    pub(crate) fn get_shard_mut<'a, V>(
172        &self,
173        val: impl Hash,
174        table: &'a mut IdVec<ShardId, V>,
175    ) -> &'a mut V {
176        let hc = {
177            let mut hasher = FxHasher::default();
178            val.hash(&mut hasher);
179            hasher.finish()
180        };
181        &mut table[self.shard_id(hc)]
182    }
183}
184
185/// A simple helper struct used when handling incremental rebuilds that tracks the subsets of set
186/// of tables that have been passed to the tracker.
187#[derive(Clone, Default)]
188pub(crate) struct SubsetTracker {
189    last_rebuilt_at: DenseIdMap<TableId, TableVersion>,
190}
191
192impl SubsetTracker {
193    /// Hand back the subset of the table needed to be scanned in order to see all updates since
194    /// the last call to this method.
195    ///
196    /// If the given table's major version has been incremented, this method will return the whole
197    /// table. In other words, this method does not guarantee that the returned subset is disjoint
198    /// from ones that have been returned in the past.
199    pub(crate) fn recent_updates(&mut self, table_id: TableId, table: &WrappedTable) -> Subset {
200        let current_version = table.version();
201        let res = if let Some(last_version) = self.last_rebuilt_at.get(table_id) {
202            if current_version.major == last_version.major {
203                table.updates_since(last_version.minor)
204            } else {
205                table.all()
206            }
207        } else {
208            table.all()
209        };
210        self.last_rebuilt_at.insert(table_id, current_version);
211        res
212    }
213}
214
215/// Iterate over the contents of a `DashMap`, using a lower-overhead method than the built-in
216/// iterators available from `DashMap`.
217pub(crate) fn iter_dashmap_bulk<K: Hash + Eq, V>(
218    map: &mut DashMap<K, V>,
219    mut f: impl FnMut(&K, &mut V),
220) {
221    let shards = map.shards_mut();
222    for shard in shards {
223        let mut_shard = shard.get_mut();
224        // SAFETY: this iterator does not outlive `shard`: it is not returned from this function
225        // and `f` cannot store it anywhere as it takes an arbitrary lifetime.
226        for entry in unsafe { mut_shard.iter() } {
227            // SAFETY: we have exclusive access to the whole table.
228            let (k, v) = unsafe { entry.as_mut() };
229            f(k, v.get_mut());
230        }
231    }
232}