egglog_core_relations/pool/
mod.rs

1//! Utilities for pooling object allocations.
2
3use std::{
4    cell::{Cell, RefCell},
5    fmt,
6    hash::{Hash, Hasher},
7    mem::{self, ManuallyDrop},
8    ops::{Deref, DerefMut},
9    ptr,
10    rc::Rc,
11};
12
13use crate::{
14    AtomId,
15    numeric_id::{DenseIdMap, IdVec},
16};
17use fixedbitset::FixedBitSet;
18use hashbrown::HashTable;
19
20use crate::{
21    ColumnId, RowId,
22    action::{Instr, PredictedVals},
23    common::{HashMap, HashSet, IndexMap, IndexSet, ShardId, Value},
24    hash_index::{BufferedSubset, ColumnIndex, TableEntry},
25    offsets::SortedOffsetVector,
26    table::TableEntry as SwTableEntry,
27    table_spec::Constraint,
28};
29
30#[cfg(test)]
31mod tests;
32
/// Types whose allocations can be recycled through a [`Pool`].
///
/// Implementors must also be [`Default`]: a pool with nothing buffered hands
/// out `Default::default()` in place of a recycled value.
pub trait Clear: Default {
    /// Reset the object.
    ///
    /// After this call the object must be equivalent to `Self::default()`.
    fn clear(&mut self);

    /// Whether this object is worth keeping around for reuse.
    ///
    /// Defaults to `true`; implementations override it to skip objects that
    /// hold no useful allocation.
    fn reuse(&self) -> bool {
        true
    }

    /// A rough estimate of this object's in-memory overhead, in bytes.
    ///
    /// Pools compare the sum of these estimates against their byte limit.
    fn bytes(&self) -> usize;
}
46
47impl<T> Clear for Vec<T> {
48    fn clear(&mut self) {
49        self.clear()
50    }
51    fn reuse(&self) -> bool {
52        self.capacity() > 256
53    }
54    fn bytes(&self) -> usize {
55        self.capacity() * mem::size_of::<T>()
56    }
57}
58
59impl<T: Clear> Clear for Rc<T> {
60    fn clear(&mut self) {
61        Rc::get_mut(self).unwrap().clear()
62    }
63    fn reuse(&self) -> bool {
64        Rc::strong_count(self) == 1 && Rc::weak_count(self) == 0
65    }
66    fn bytes(&self) -> usize {
67        mem::size_of::<T>()
68    }
69}
70
71impl<T: Clear> Clone for Pooled<Rc<T>>
72where
73    Rc<T>: InPoolSet<PoolSet>,
74{
75    fn clone(&self) -> Self {
76        Pooled {
77            data: self.data.clone(),
78        }
79    }
80}
81
82impl<T> Clear for HashSet<T> {
83    fn clear(&mut self) {
84        self.clear()
85    }
86    fn reuse(&self) -> bool {
87        self.capacity() > 0
88    }
89    fn bytes(&self) -> usize {
90        self.capacity() * mem::size_of::<T>()
91    }
92}
93
94impl<T> Clear for HashTable<T> {
95    fn clear(&mut self) {
96        self.clear()
97    }
98    fn reuse(&self) -> bool {
99        self.capacity() > 0
100    }
101    fn bytes(&self) -> usize {
102        self.capacity() * mem::size_of::<T>()
103    }
104}
105
106impl<K, V> Clear for HashMap<K, V> {
107    fn clear(&mut self) {
108        self.clear()
109    }
110    fn reuse(&self) -> bool {
111        self.capacity() > 0
112    }
113    fn bytes(&self) -> usize {
114        self.capacity() * mem::size_of::<(K, V)>()
115    }
116}
117
118impl<K, V> Clear for IndexMap<K, V> {
119    fn clear(&mut self) {
120        self.clear()
121    }
122    fn reuse(&self) -> bool {
123        self.capacity() > 0
124    }
125    fn bytes(&self) -> usize {
126        self.capacity() * (mem::size_of::<u64>() + mem::size_of::<(K, V)>())
127    }
128}
129
130impl<T> Clear for IndexSet<T> {
131    fn clear(&mut self) {
132        self.clear()
133    }
134    fn reuse(&self) -> bool {
135        self.capacity() > 0
136    }
137    fn bytes(&self) -> usize {
138        self.capacity() * (mem::size_of::<u64>() + mem::size_of::<T>())
139    }
140}
141
142impl Clear for FixedBitSet {
143    fn clear(&mut self) {
144        self.clone_from(&Default::default());
145    }
146    fn reuse(&self) -> bool {
147        !self.is_empty()
148    }
149    fn bytes(&self) -> usize {
150        self.len() / 8
151    }
152}
153
154impl<K, V> Clear for IdVec<K, V> {
155    fn clear(&mut self) {
156        self.clear()
157    }
158    fn reuse(&self) -> bool {
159        self.capacity() > 0
160    }
161    fn bytes(&self) -> usize {
162        self.capacity() * mem::size_of::<V>()
163    }
164}
165
/// The mutable contents of a [`Pool`].
struct PoolState<T> {
    /// Cleared values waiting to be handed out again.
    data: Vec<T>,
    /// Running total of `Clear::bytes` over everything in `data`.
    bytes: usize,
    /// Upper bound on `bytes`; values that would exceed it are not buffered.
    limit: usize,
}
171
172impl<T: Clear> PoolState<T> {
173    fn new(limit: usize) -> Self {
174        PoolState {
175            data: Vec::new(),
176            bytes: 0,
177            limit,
178        }
179    }
180
181    fn push(&mut self, mut item: T) {
182        if !item.reuse() {
183            return;
184        }
185        if self.bytes + item.bytes() > self.limit {
186            return;
187        }
188        item.clear();
189        self.bytes += item.bytes();
190        self.data.push(item);
191    }
192
193    fn pop(&mut self) -> T {
194        if let Some(got) = self.data.pop() {
195            self.bytes -= got.bytes();
196            got
197        } else {
198            Default::default()
199        }
200    }
201
202    fn clear_and_shrink(&mut self) {
203        self.data.clear();
204        self.bytes = 0;
205        self.data.shrink_to_fit();
206    }
207}
208
209/// A shared pool of objects.
210pub struct Pool<T> {
211    data: Rc<RefCell<PoolState<T>>>,
212}
213
214impl<T> Clone for Pool<T> {
215    fn clone(&self) -> Self {
216        Pool {
217            data: self.data.clone(),
218        }
219    }
220}
221
222impl<T: Clear> Default for Pool<T> {
223    fn default() -> Self {
224        Pool {
225            data: Rc::new(RefCell::new(PoolState::new(usize::MAX))),
226        }
227    }
228}
229
230impl<T: Clear + InPoolSet<PoolSet>> Pool<T> {
231    pub(crate) fn new(limit: usize) -> Pool<T> {
232        Pool {
233            data: Rc::new(RefCell::new(PoolState::new(limit))),
234        }
235    }
236    /// Get an empty value of type `T`, potentially reused from the pool.
237    pub(crate) fn get(&self) -> Pooled<T> {
238        let empty = self.data.borrow_mut().pop();
239
240        Pooled {
241            data: ManuallyDrop::new(empty),
242        }
243    }
244
245    /// Clear the contents of the pool and release any memory associated with it.
246    pub(crate) fn clear(&self) {
247        let mut data_mut = self.data.borrow_mut();
248        data_mut.clear_and_shrink();
249    }
250}
251
252/// An owned value of type `T` that can be returned to a memory pool when it is
253/// no longer used.
254pub struct Pooled<T: Clear + InPoolSet<PoolSet>> {
255    data: ManuallyDrop<T>,
256}
257
258impl<T: Clear + InPoolSet<PoolSet>> Default for Pooled<T> {
259    fn default() -> Self {
260        with_pool_set(|ps| ps.get::<T>())
261    }
262}
263
264impl<T: Clear + fmt::Debug + InPoolSet<PoolSet>> fmt::Debug for Pooled<T> {
265    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
266        let data: &T = &self.data;
267        data.fmt(f)
268    }
269}
270impl<T: Clear + PartialEq + InPoolSet<PoolSet>> PartialEq for Pooled<T> {
271    fn eq(&self, other: &Self) -> bool {
272        // This form rid of a spuriou clippy warning about unconditional recursion.
273        <T as PartialEq>::eq(&self.data, &other.data)
274    }
275}
276
277impl<T: Clear + InPoolSet<PoolSet> + Eq> Eq for Pooled<T> {}
278
279impl<T: Clear + Hash + InPoolSet<PoolSet>> Hash for Pooled<T> {
280    fn hash<H: Hasher>(&self, state: &mut H) {
281        self.data.hash(state)
282    }
283}
284
285impl<T: Clear + InPoolSet<PoolSet> + 'static> Pooled<T> {
286    /// Clear the contents the wrapped object. If the object cannot be reused,
287    /// attempt to fetch another value from the pool.
288    ///
289    /// This method can be used in concert with `relinquish` to provide a
290    /// `clear` operation that hands data back to the pool, and then grabs it
291    /// back again if it needs to be reused.
292    ///
293    /// This pattern is likely only suitable for "temporary" buffers.
294    pub(crate) fn refresh(this: &mut Pooled<T>) {
295        this.data.clear();
296        if this.data.reuse() {
297            return;
298        }
299        let pool = with_pool_set(|ps| ps.get_pool::<T>());
300        let mut other = pool.data.borrow_mut().pop();
301        if !other.reuse() {
302            return;
303        }
304        let slot: &mut T = &mut this.data;
305        mem::swap(slot, &mut other);
306    }
307
308    pub(crate) fn into_inner(this: Pooled<T>) -> T {
309        // SAFETY: ownership of `this.data` is transferred to the caller. We
310        // will not drop `this` or use it again.
311        let inner = unsafe { ptr::read(&this.data) };
312        mem::forget(this);
313        ManuallyDrop::into_inner(inner)
314    }
315
316    pub(crate) fn new(data: T) -> Pooled<T> {
317        Pooled {
318            data: ManuallyDrop::new(data),
319        }
320    }
321}
322
323impl<T: Clear + Clone + InPoolSet<PoolSet>> Pooled<T> {
324    pub(crate) fn cloned(this: &Pooled<T>) -> Pooled<T> {
325        let mut res = with_pool_set(|ps| ps.get::<T>());
326        res.clone_from(this);
327        res
328    }
329}
330
331impl<T: Clear + InPoolSet<PoolSet>> Drop for Pooled<T> {
332    fn drop(&mut self) {
333        let reuse = self.data.reuse();
334        if !reuse {
335            // SAFETY: we own `self.data` and being in the drop method means no
336            // one else will access it.
337            unsafe { ManuallyDrop::drop(&mut self.data) };
338            return;
339        }
340        self.data.clear();
341        let t: &T = &self.data;
342        // SAFETY: ownership of `self.data` is transferred to the pool
343        with_pool_set(|ps| {
344            T::with_pool(ps, |pool| {
345                pool.data.borrow_mut().push(unsafe { ptr::read(t) })
346            })
347        });
348    }
349}
350
351impl<T: Clear + InPoolSet<PoolSet>> Deref for Pooled<T> {
352    type Target = T;
353
354    fn deref(&self) -> &T {
355        &self.data
356    }
357}
358
359impl<T: Clear + InPoolSet<PoolSet>> DerefMut for Pooled<T> {
360    fn deref_mut(&mut self) -> &mut T {
361        &mut self.data
362    }
363}
364
365/// Helper trait for allowing the trait resolution system to infer the correct
366/// pool type during allocation.
367pub trait InPoolSet<PoolSet>
368where
369    Self: Sized + Clear,
370{
371    fn with_pool<R>(pool_set: &PoolSet, f: impl FnOnce(&Pool<Self>) -> R) -> R;
372}
373
// Generate a struct bundling one `Pool` per listed type.
//
// Each entry is written `field: Type [ limit ],`. The macro emits:
//   * the struct itself, with one `Pool<Type>` field per entry;
//   * a `Default` impl that builds every pool with its byte `limit`;
//   * `get_pool`, `get` and `clear` methods on the struct;
//   * an `InPoolSet` impl per type that selects the matching field.
macro_rules! pool_set {
    ($vis:vis $name:ident { $($ident:ident : $ty:ty [ $bytes:expr ],)* }) => {
        $vis struct $name {
            $( $ident: Pool<$ty>, )*
        }

        impl Default for $name {
            fn default() -> Self {
                $name {
                    $( $ident: Pool::new($bytes), )*
                }
            }
        }

        impl $name {
            // A handle to the pool for `T`; it shares state with the pool
            // stored in this set.
            $vis fn get_pool<T: InPoolSet<Self>>(&self) -> Pool<T> {
                T::with_pool(self, |pool| Pool::clone(pool))
            }

            // An empty `T` taken from the matching pool.
            $vis fn get<T: InPoolSet<Self> + Default>(&self) -> Pooled<T> {
                let pool: Pool<T> = self.get_pool();
                pool.get()
            }

            // Empty every pool in the set.
            $vis fn clear(&self) {
                $( self.$ident.clear(); )*
            }
        }

        $(
            impl InPoolSet<$name> for $ty {
                fn with_pool<R>(pool_set: &$name, f: impl FnOnce(&Pool<Self>) -> R) -> R {
                    f(&pool_set.$ident)
                }
            }
        )*
    };
}
414
415// The main thread-local memory pool used for reusing allocations. The syntax is:
416//
417// <name> : <type> [ <bytes> ],
418//
419// Where `name` is not used for anything, `type` feeds into the `InPoolSet` machinery and allows
420// anything of that type to be allocated using `with_pool_set`, and `bytes` is a per-type limit on
421// the total bytes that can be buffered in a single (per-thread) memory pool.
422
423pool_set! {
424    pub PoolSet {
425        vec_vals: Vec<Value> [ 1 << 25 ],
426        vec_cell_vals: Vec<Cell<Value>> [ 1 << 25 ],
427        // TODO: work on scaffolding/DI/etc. so that we can share allocations
428        // between vec_vals and shared_vals.
429        rows: Vec<RowId> [ 1 << 25 ],
430        offset_vec: SortedOffsetVector [ 1 << 20 ],
431        column_index: IndexMap<Value, BufferedSubset> [ 1 << 20 ],
432        constraints: Vec<Constraint> [ 1 << 20 ],
433        bitsets: FixedBitSet [ 1 << 20 ],
434        instrs: Vec<Instr> [ 1 << 20 ],
435        tuple_indexes: HashTable<TableEntry<BufferedSubset>> [ 1 << 20 ],
436        staged_outputs: HashTable<SwTableEntry> [ 1 << 25 ],
437        predicted_vals: PredictedVals [ 1 << 20 ],
438        shard_hist: DenseIdMap<ShardId, usize> [ 1 << 20 ],
439        instr_indexes: Vec<u32> [ 1 << 20 ],
440        cached_subsets: IdVec<ColumnId, std::sync::OnceLock<std::sync::Arc<ColumnIndex>>> [ 4 << 20 ],
441        intersected_on: DenseIdMap<AtomId, i64> [ 1 << 20 ],
442    }
443}
444
445/// Run `f` on the thread-local [`PoolSet`].
446pub(crate) fn with_pool_set<R>(f: impl FnOnce(&PoolSet) -> R) -> R {
447    POOL_SET.with(|pool_set| f(pool_set))
448}
449
450thread_local! {
451    /// A thread-local pool set. All pooled allocations land back in the local thread.
452    ///
453    /// We don't drop this PoolSet because it does not contain any resources
454    /// that need to be released, other than memory (which will be reclaimed
455    /// when the process exits, right after drop runs).
456    ///
457    /// For large egraphs, this be a big runtime win. The main egglog binary
458    /// avoids dropping the egraph for the same reason.
459    static POOL_SET: ManuallyDrop<PoolSet> = Default::default();
460}