egglog_core_relations/pool/
mod.rs1use std::{
4 cell::{Cell, RefCell},
5 fmt,
6 hash::{Hash, Hasher},
7 mem::{self, ManuallyDrop},
8 ops::{Deref, DerefMut},
9 ptr,
10 rc::Rc,
11};
12
13use crate::{
14 AtomId,
15 numeric_id::{DenseIdMap, IdVec},
16};
17use fixedbitset::FixedBitSet;
18use hashbrown::HashTable;
19
20use crate::{
21 ColumnId, RowId,
22 action::{Instr, PredictedVals},
23 common::{HashMap, HashSet, IndexMap, IndexSet, ShardId, Value},
24 hash_index::{BufferedSubset, ColumnIndex, TableEntry},
25 offsets::SortedOffsetVector,
26 table::TableEntry as SwTableEntry,
27 table_spec::Constraint,
28};
29
30#[cfg(test)]
31mod tests;
32
/// Types whose allocations can be recycled through a [`Pool`].
///
/// A pool hands out either a previously cleared value or `Default::default()`,
/// so a value that has been cleared should be usable wherever a default value
/// would be.
pub trait Clear: Default {
    /// Empties the value in place so it can be handed out again.
    fn clear(&mut self);

    /// Reports whether this value is worth keeping for reuse.
    ///
    /// Implementations return `false` for values that should simply be dropped
    /// instead of being stored in a pool. Defaults to `true`.
    fn reuse(&self) -> bool {
        true
    }

    /// Approximate memory footprint of the value in bytes.
    ///
    /// Pools sum this figure over their stored values and compare it against
    /// their byte limit.
    fn bytes(&self) -> usize;
}
46
47impl<T> Clear for Vec<T> {
48 fn clear(&mut self) {
49 self.clear()
50 }
51 fn reuse(&self) -> bool {
52 self.capacity() > 256
53 }
54 fn bytes(&self) -> usize {
55 self.capacity() * mem::size_of::<T>()
56 }
57}
58
59impl<T: Clear> Clear for Rc<T> {
60 fn clear(&mut self) {
61 Rc::get_mut(self).unwrap().clear()
62 }
63 fn reuse(&self) -> bool {
64 Rc::strong_count(self) == 1 && Rc::weak_count(self) == 0
65 }
66 fn bytes(&self) -> usize {
67 mem::size_of::<T>()
68 }
69}
70
71impl<T: Clear> Clone for Pooled<Rc<T>>
72where
73 Rc<T>: InPoolSet<PoolSet>,
74{
75 fn clone(&self) -> Self {
76 Pooled {
77 data: self.data.clone(),
78 }
79 }
80}
81
82impl<T> Clear for HashSet<T> {
83 fn clear(&mut self) {
84 self.clear()
85 }
86 fn reuse(&self) -> bool {
87 self.capacity() > 0
88 }
89 fn bytes(&self) -> usize {
90 self.capacity() * mem::size_of::<T>()
91 }
92}
93
94impl<T> Clear for HashTable<T> {
95 fn clear(&mut self) {
96 self.clear()
97 }
98 fn reuse(&self) -> bool {
99 self.capacity() > 0
100 }
101 fn bytes(&self) -> usize {
102 self.capacity() * mem::size_of::<T>()
103 }
104}
105
106impl<K, V> Clear for HashMap<K, V> {
107 fn clear(&mut self) {
108 self.clear()
109 }
110 fn reuse(&self) -> bool {
111 self.capacity() > 0
112 }
113 fn bytes(&self) -> usize {
114 self.capacity() * mem::size_of::<(K, V)>()
115 }
116}
117
118impl<K, V> Clear for IndexMap<K, V> {
119 fn clear(&mut self) {
120 self.clear()
121 }
122 fn reuse(&self) -> bool {
123 self.capacity() > 0
124 }
125 fn bytes(&self) -> usize {
126 self.capacity() * (mem::size_of::<u64>() + mem::size_of::<(K, V)>())
127 }
128}
129
130impl<T> Clear for IndexSet<T> {
131 fn clear(&mut self) {
132 self.clear()
133 }
134 fn reuse(&self) -> bool {
135 self.capacity() > 0
136 }
137 fn bytes(&self) -> usize {
138 self.capacity() * (mem::size_of::<u64>() + mem::size_of::<T>())
139 }
140}
141
142impl Clear for FixedBitSet {
143 fn clear(&mut self) {
144 self.clone_from(&Default::default());
145 }
146 fn reuse(&self) -> bool {
147 !self.is_empty()
148 }
149 fn bytes(&self) -> usize {
150 self.len() / 8
151 }
152}
153
154impl<K, V> Clear for IdVec<K, V> {
155 fn clear(&mut self) {
156 self.clear()
157 }
158 fn reuse(&self) -> bool {
159 self.capacity() > 0
160 }
161 fn bytes(&self) -> usize {
162 self.capacity() * mem::size_of::<V>()
163 }
164}
165
/// Backing storage for a [`Pool`]: a stack of cleared values plus the
/// bookkeeping used to bound how much memory the stack retains.
struct PoolState<T> {
    /// Cleared values waiting to be handed out again (used as a LIFO stack).
    data: Vec<T>,
    /// Sum of `Clear::bytes` over the values currently held in `data`.
    bytes: usize,
    /// Upper bound on `bytes`; values that would exceed it are dropped
    /// instead of being stored.
    limit: usize,
}
171
172impl<T: Clear> PoolState<T> {
173 fn new(limit: usize) -> Self {
174 PoolState {
175 data: Vec::new(),
176 bytes: 0,
177 limit,
178 }
179 }
180
181 fn push(&mut self, mut item: T) {
182 if !item.reuse() {
183 return;
184 }
185 if self.bytes + item.bytes() > self.limit {
186 return;
187 }
188 item.clear();
189 self.bytes += item.bytes();
190 self.data.push(item);
191 }
192
193 fn pop(&mut self) -> T {
194 if let Some(got) = self.data.pop() {
195 self.bytes -= got.bytes();
196 got
197 } else {
198 Default::default()
199 }
200 }
201
202 fn clear_and_shrink(&mut self) {
203 self.data.clear();
204 self.bytes = 0;
205 self.data.shrink_to_fit();
206 }
207}
208
209pub struct Pool<T> {
211 data: Rc<RefCell<PoolState<T>>>,
212}
213
214impl<T> Clone for Pool<T> {
215 fn clone(&self) -> Self {
216 Pool {
217 data: self.data.clone(),
218 }
219 }
220}
221
222impl<T: Clear> Default for Pool<T> {
223 fn default() -> Self {
224 Pool {
225 data: Rc::new(RefCell::new(PoolState::new(usize::MAX))),
226 }
227 }
228}
229
230impl<T: Clear + InPoolSet<PoolSet>> Pool<T> {
231 pub(crate) fn new(limit: usize) -> Pool<T> {
232 Pool {
233 data: Rc::new(RefCell::new(PoolState::new(limit))),
234 }
235 }
236 pub(crate) fn get(&self) -> Pooled<T> {
238 let empty = self.data.borrow_mut().pop();
239
240 Pooled {
241 data: ManuallyDrop::new(empty),
242 }
243 }
244
245 pub(crate) fn clear(&self) {
247 let mut data_mut = self.data.borrow_mut();
248 data_mut.clear_and_shrink();
249 }
250}
251
252pub struct Pooled<T: Clear + InPoolSet<PoolSet>> {
255 data: ManuallyDrop<T>,
256}
257
258impl<T: Clear + InPoolSet<PoolSet>> Default for Pooled<T> {
259 fn default() -> Self {
260 with_pool_set(|ps| ps.get::<T>())
261 }
262}
263
264impl<T: Clear + fmt::Debug + InPoolSet<PoolSet>> fmt::Debug for Pooled<T> {
265 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
266 let data: &T = &self.data;
267 data.fmt(f)
268 }
269}
270impl<T: Clear + PartialEq + InPoolSet<PoolSet>> PartialEq for Pooled<T> {
271 fn eq(&self, other: &Self) -> bool {
272 <T as PartialEq>::eq(&self.data, &other.data)
274 }
275}
276
277impl<T: Clear + InPoolSet<PoolSet> + Eq> Eq for Pooled<T> {}
278
279impl<T: Clear + Hash + InPoolSet<PoolSet>> Hash for Pooled<T> {
280 fn hash<H: Hasher>(&self, state: &mut H) {
281 self.data.hash(state)
282 }
283}
284
285impl<T: Clear + InPoolSet<PoolSet> + 'static> Pooled<T> {
286 pub(crate) fn refresh(this: &mut Pooled<T>) {
295 this.data.clear();
296 if this.data.reuse() {
297 return;
298 }
299 let pool = with_pool_set(|ps| ps.get_pool::<T>());
300 let mut other = pool.data.borrow_mut().pop();
301 if !other.reuse() {
302 return;
303 }
304 let slot: &mut T = &mut this.data;
305 mem::swap(slot, &mut other);
306 }
307
308 pub(crate) fn into_inner(this: Pooled<T>) -> T {
309 let inner = unsafe { ptr::read(&this.data) };
312 mem::forget(this);
313 ManuallyDrop::into_inner(inner)
314 }
315
316 pub(crate) fn new(data: T) -> Pooled<T> {
317 Pooled {
318 data: ManuallyDrop::new(data),
319 }
320 }
321}
322
323impl<T: Clear + Clone + InPoolSet<PoolSet>> Pooled<T> {
324 pub(crate) fn cloned(this: &Pooled<T>) -> Pooled<T> {
325 let mut res = with_pool_set(|ps| ps.get::<T>());
326 res.clone_from(this);
327 res
328 }
329}
330
331impl<T: Clear + InPoolSet<PoolSet>> Drop for Pooled<T> {
332 fn drop(&mut self) {
333 let reuse = self.data.reuse();
334 if !reuse {
335 unsafe { ManuallyDrop::drop(&mut self.data) };
338 return;
339 }
340 self.data.clear();
341 let t: &T = &self.data;
342 with_pool_set(|ps| {
344 T::with_pool(ps, |pool| {
345 pool.data.borrow_mut().push(unsafe { ptr::read(t) })
346 })
347 });
348 }
349}
350
351impl<T: Clear + InPoolSet<PoolSet>> Deref for Pooled<T> {
352 type Target = T;
353
354 fn deref(&self) -> &T {
355 &self.data
356 }
357}
358
359impl<T: Clear + InPoolSet<PoolSet>> DerefMut for Pooled<T> {
360 fn deref_mut(&mut self) -> &mut T {
361 &mut self.data
362 }
363}
364
365pub trait InPoolSet<PoolSet>
368where
369 Self: Sized + Clear,
370{
371 fn with_pool<R>(pool_set: &PoolSet, f: impl FnOnce(&Pool<Self>) -> R) -> R;
372}
373
/// Declares a struct holding one [`Pool`] per listed type, together with the
/// glue that lets generic code find the right pool for a given type.
///
/// Each entry has the form `field: Type [ limit ],` where `limit` is the byte
/// limit passed to `Pool::new` for that field. The macro generates:
///
/// * the struct itself, with one `Pool<Type>` per field;
/// * a `Default` impl that builds every pool with its limit;
/// * `get_pool`, `get` and `clear` helpers on the struct;
/// * an `InPoolSet` impl per listed type that selects the matching field.
macro_rules! pool_set {
    ($vis:vis $name:ident { $($field:ident : $ty:ty [ $limit:expr ],)* }) => {
        $vis struct $name {
            $( $field: Pool<$ty>, )*
        }

        impl Default for $name {
            fn default() -> Self {
                $name {
                    $( $field: Pool::new($limit), )*
                }
            }
        }

        impl $name {
            /// Returns a handle to the pool that stores values of type `T`.
            $vis fn get_pool<T: InPoolSet<Self>>(&self) -> Pool<T> {
                T::with_pool(self, |pool| pool.clone())
            }

            /// Takes a (possibly recycled) value of type `T` from its pool.
            $vis fn get<T: InPoolSet<Self> + Default>(&self) -> Pooled<T> {
                self.get_pool::<T>().get()
            }

            /// Empties every pool in the set.
            $vis fn clear(&self) {
                $( self.$field.clear(); )*
            }
        }

        $(
            impl InPoolSet<$name> for $ty {
                fn with_pool<R>(pool_set: &$name, f: impl FnOnce(&Pool<Self>) -> R) -> R {
                    f(&pool_set.$field)
                }
            }
        )*
    }
}
414
415pool_set! {
424 pub PoolSet {
425 vec_vals: Vec<Value> [ 1 << 25 ],
426 vec_cell_vals: Vec<Cell<Value>> [ 1 << 25 ],
427 rows: Vec<RowId> [ 1 << 25 ],
430 offset_vec: SortedOffsetVector [ 1 << 20 ],
431 column_index: IndexMap<Value, BufferedSubset> [ 1 << 20 ],
432 constraints: Vec<Constraint> [ 1 << 20 ],
433 bitsets: FixedBitSet [ 1 << 20 ],
434 instrs: Vec<Instr> [ 1 << 20 ],
435 tuple_indexes: HashTable<TableEntry<BufferedSubset>> [ 1 << 20 ],
436 staged_outputs: HashTable<SwTableEntry> [ 1 << 25 ],
437 predicted_vals: PredictedVals [ 1 << 20 ],
438 shard_hist: DenseIdMap<ShardId, usize> [ 1 << 20 ],
439 instr_indexes: Vec<u32> [ 1 << 20 ],
440 cached_subsets: IdVec<ColumnId, std::sync::OnceLock<std::sync::Arc<ColumnIndex>>> [ 4 << 20 ],
441 intersected_on: DenseIdMap<AtomId, i64> [ 1 << 20 ],
442 }
443}
444
445pub(crate) fn with_pool_set<R>(f: impl FnOnce(&PoolSet) -> R) -> R {
447 POOL_SET.with(|pool_set| f(pool_set))
448}
449
450thread_local! {
451 static POOL_SET: ManuallyDrop<PoolSet> = Default::default();
460}