use std::{
cell::{Cell, RefCell},
fmt,
hash::{Hash, Hasher},
mem::{self, ManuallyDrop},
ops::{Deref, DerefMut},
ptr,
rc::Rc,
};
use crate::{
AtomId,
numeric_id::{DenseIdMap, IdVec},
};
use fixedbitset::FixedBitSet;
use hashbrown::HashTable;
use crate::{
ColumnId, RowId,
action::{Instr, PredictedVals},
common::{HashMap, HashSet, IndexMap, IndexSet, ShardId, Value},
hash_index::{BufferedSubset, ColumnIndex, TableEntry},
offsets::SortedOffsetVector,
table::TableEntry as SwTableEntry,
table_spec::Constraint,
};
#[cfg(test)]
mod tests;
/// Types whose storage can be emptied and kept around for later reuse.
///
/// Implementors must also be `Default`; a pool with nothing cached falls back
/// to `Default::default()` when asked for a value.
pub trait Clear: Default {
    /// Empties `self` in place.
    fn clear(&mut self);

    /// Approximate size of `self` in bytes, used for pool accounting.
    fn bytes(&self) -> usize;

    /// Whether `self` is worth keeping for reuse. Defaults to `true`.
    fn reuse(&self) -> bool {
        true
    }
}
/// Pooling support for plain vectors.
impl<T> Clear for Vec<T> {
    /// Removes every element (this is the inherent `Vec::clear`).
    fn clear(&mut self) {
        Vec::clear(self)
    }

    /// Only vectors whose capacity exceeds 256 elements are kept for reuse.
    fn reuse(&self) -> bool {
        const MIN_CAPACITY: usize = 256;
        self.capacity() > MIN_CAPACITY
    }

    /// Size of the backing buffer, computed from capacity rather than length.
    fn bytes(&self) -> usize {
        let per_element = mem::size_of::<T>();
        self.capacity() * per_element
    }
}
/// Pooling support for reference-counted values.
impl<T: Clear> Clear for Rc<T> {
    /// Empties the value behind the `Rc`.
    ///
    /// When this handle is the only reference, the pointee is cleared in
    /// place. When the allocation is shared (other strong or weak references
    /// exist) it cannot be mutated, so this handle is pointed at a fresh
    /// default value instead of panicking; the other owners are unaffected.
    fn clear(&mut self) {
        match Rc::get_mut(self) {
            Some(inner) => inner.clear(),
            // Callers such as `Pooled::refresh` clear before checking
            // `reuse()`, so a shared handle can legitimately reach this arm.
            None => *self = Rc::default(),
        }
    }

    /// An `Rc` can only be recycled when nothing else refers to it.
    fn reuse(&self) -> bool {
        Rc::strong_count(self) == 1 && Rc::weak_count(self) == 0
    }

    /// Reports the size of `T` itself; heap storage owned by `T` is not counted.
    fn bytes(&self) -> usize {
        mem::size_of::<T>()
    }
}
impl<T: Clear> Clone for Pooled<Rc<T>>
where
Rc<T>: InPoolSet<PoolSet>,
{
fn clone(&self) -> Self {
Pooled {
data: self.data.clone(),
}
}
}
/// Pooling support for hash sets.
impl<T> Clear for HashSet<T> {
    /// Delegates to the set's own `clear`.
    fn clear(&mut self) {
        self.clear()
    }

    /// A set is kept for reuse once it has any capacity.
    fn reuse(&self) -> bool {
        self.capacity() != 0
    }

    /// Rough estimate: one `T` per slot of capacity.
    fn bytes(&self) -> usize {
        let per_entry = mem::size_of::<T>();
        self.capacity() * per_entry
    }
}
/// Pooling support for raw hash tables.
impl<T> Clear for HashTable<T> {
    /// Delegates to the table's own `clear`.
    fn clear(&mut self) {
        self.clear()
    }

    /// A table is kept for reuse once it has any capacity.
    fn reuse(&self) -> bool {
        self.capacity() != 0
    }

    /// Rough estimate: one `T` per slot of capacity.
    fn bytes(&self) -> usize {
        let per_entry = mem::size_of::<T>();
        self.capacity() * per_entry
    }
}
/// Pooling support for hash maps.
impl<K, V> Clear for HashMap<K, V> {
    /// Delegates to the map's own `clear`.
    fn clear(&mut self) {
        self.clear()
    }

    /// A map is kept for reuse once it has any capacity.
    fn reuse(&self) -> bool {
        self.capacity() != 0
    }

    /// Rough estimate: one `(K, V)` pair per slot of capacity.
    fn bytes(&self) -> usize {
        let per_entry = mem::size_of::<(K, V)>();
        self.capacity() * per_entry
    }
}
/// Pooling support for insertion-ordered maps.
impl<K, V> Clear for IndexMap<K, V> {
    /// Delegates to the map's own `clear`.
    fn clear(&mut self) {
        self.clear()
    }

    /// A map is kept for reuse once it has any capacity.
    fn reuse(&self) -> bool {
        self.capacity() != 0
    }

    /// Rough estimate: a `u64` plus one `(K, V)` pair per slot of capacity.
    fn bytes(&self) -> usize {
        let per_entry = mem::size_of::<u64>() + mem::size_of::<(K, V)>();
        self.capacity() * per_entry
    }
}
/// Pooling support for insertion-ordered sets.
impl<T> Clear for IndexSet<T> {
    /// Delegates to the set's own `clear`.
    fn clear(&mut self) {
        self.clear()
    }

    /// A set is kept for reuse once it has any capacity.
    fn reuse(&self) -> bool {
        self.capacity() != 0
    }

    /// Rough estimate: a `u64` plus one `T` per slot of capacity.
    fn bytes(&self) -> usize {
        let per_entry = mem::size_of::<u64>() + mem::size_of::<T>();
        self.capacity() * per_entry
    }
}
impl Clear for FixedBitSet {
fn clear(&mut self) {
self.clone_from(&Default::default());
}
fn reuse(&self) -> bool {
!self.is_empty()
}
fn bytes(&self) -> usize {
self.len() / 8
}
}
/// Pooling support for id-indexed vectors.
impl<K, V> Clear for IdVec<K, V> {
    /// Delegates to the vector's own `clear`.
    fn clear(&mut self) {
        self.clear()
    }

    /// A vector is kept for reuse once it has any capacity.
    fn reuse(&self) -> bool {
        self.capacity() != 0
    }

    /// Rough estimate: one `V` per slot of capacity; keys are not counted.
    fn bytes(&self) -> usize {
        let per_value = mem::size_of::<V>();
        self.capacity() * per_value
    }
}
/// Backing storage for a pool: the cached values plus byte accounting.
struct PoolState<T> {
    /// Cleared values waiting to be handed out again.
    data: Vec<T>,
    /// Running total of `bytes()` for the cached values, measured after each
    /// value was cleared.
    bytes: usize,
    /// Upper bound on `bytes`; a value that would push the total past it is
    /// not cached.
    limit: usize,
}

impl<T: Clear> PoolState<T> {
    /// Creates an empty state that caches at most `limit` bytes.
    fn new(limit: usize) -> Self {
        PoolState {
            data: Vec::new(),
            bytes: 0,
            limit,
        }
    }

    /// Offers `item` to the pool.
    ///
    /// The item is dropped instead of cached when it reports that it is not
    /// worth reusing, or when caching it would exceed the byte limit.
    fn push(&mut self, mut item: T) {
        if !item.reuse() {
            return;
        }
        // `checked_add` keeps the accounting from overflowing: a wrapped sum
        // would slip under the limit in release builds and panic in debug
        // builds, and this code runs from `Drop` implementations.
        match self.bytes.checked_add(item.bytes()) {
            Some(total) if total <= self.limit => {}
            _ => return,
        }
        item.clear();
        // Re-measure after clearing, since `bytes()` may change once cleared.
        self.bytes = self.bytes.saturating_add(item.bytes());
        self.data.push(item);
    }

    /// Takes a cached value, or builds a default one if the pool is empty.
    fn pop(&mut self) -> T {
        match self.data.pop() {
            Some(got) => {
                self.bytes = self.bytes.saturating_sub(got.bytes());
                got
            }
            None => Default::default(),
        }
    }

    /// Drops every cached value and releases the backing storage.
    fn clear_and_shrink(&mut self) {
        self.data.clear();
        self.bytes = 0;
        self.data.shrink_to_fit();
    }
}
/// A handle to a pool of reusable `T` values.
///
/// The state lives behind an `Rc<RefCell<..>>`, so clones of a `Pool` refer to
/// the same underlying storage.
pub struct Pool<T> {
// Shared, interior-mutable pool state.
data: Rc<RefCell<PoolState<T>>>,
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Self {
Pool {
data: self.data.clone(),
}
}
}
/// The default pool has a byte limit of `usize::MAX`.
impl<T: Clear> Default for Pool<T> {
    fn default() -> Self {
        let state = PoolState::new(usize::MAX);
        Pool {
            data: Rc::new(RefCell::new(state)),
        }
    }
}
impl<T: Clear + InPoolSet<PoolSet>> Pool<T> {
    /// Creates an empty pool that caches at most `limit` bytes of values.
    pub(crate) fn new(limit: usize) -> Pool<T> {
        let state = PoolState::new(limit);
        Pool {
            data: Rc::new(RefCell::new(state)),
        }
    }

    /// Hands out a value wrapped in `Pooled`: a cached one when available,
    /// otherwise a default-constructed one.
    pub(crate) fn get(&self) -> Pooled<T> {
        // The `RefCell` borrow ends with this statement, before the wrapper
        // is built.
        let recycled = self.data.borrow_mut().pop();
        Pooled {
            data: ManuallyDrop::new(recycled),
        }
    }

    /// Drops everything cached in this pool and releases its storage.
    pub(crate) fn clear(&self) {
        self.data.borrow_mut().clear_and_shrink();
    }
}
/// An owned `T` that is offered back to the thread's pool set when dropped.
pub struct Pooled<T: Clear + InPoolSet<PoolSet>> {
// `ManuallyDrop` so that the `Drop` impl decides whether the value is
// destroyed or moved into a pool.
data: ManuallyDrop<T>,
}
/// The default `Pooled<T>` is fetched from the current thread's pool set.
impl<T: Clear + InPoolSet<PoolSet>> Default for Pooled<T> {
    fn default() -> Self {
        let fetch = |pools: &PoolSet| pools.get::<T>();
        with_pool_set(fetch)
    }
}
/// Formats exactly like the wrapped value; the wrapper itself is not shown.
impl<T: Clear + fmt::Debug + InPoolSet<PoolSet>> fmt::Debug for Pooled<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <T as fmt::Debug>::fmt(&self.data, f)
    }
}
/// Two pooled values are equal when the values they wrap are equal.
impl<T: Clear + PartialEq + InPoolSet<PoolSet>> PartialEq for Pooled<T> {
    fn eq(&self, other: &Self) -> bool {
        let lhs: &T = &self.data;
        let rhs: &T = &other.data;
        lhs == rhs
    }
}
/// `Pooled<T>` is `Eq` whenever the wrapped `T` is.
impl<T: Clear + InPoolSet<PoolSet> + Eq> Eq for Pooled<T> {}
/// Hashes the wrapped value; the wrapper contributes nothing of its own.
impl<T: Clear + Hash + InPoolSet<PoolSet>> Hash for Pooled<T> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let inner: &T = &self.data;
        inner.hash(state)
    }
}
impl<T: Clear + InPoolSet<PoolSet> + 'static> Pooled<T> {
/// Clears `this` and, if the cleared value is not worth reusing, tries to
/// replace it with a value taken from the current thread's pool for `T`.
pub(crate) fn refresh(this: &mut Pooled<T>) {
this.data.clear();
// The cleared value still reports itself as reusable: keep it.
if this.data.reuse() {
return;
}
let pool = with_pool_set(|ps| ps.get_pool::<T>());
// `pop` yields a default-constructed value when the pool is empty.
let mut other = pool.data.borrow_mut().pop();
// The pool had nothing better to offer; `other` is dropped on return.
if !other.reuse() {
return;
}
// Install the pooled value. The previous value ends up in `other` and is
// dropped normally when this function returns.
let slot: &mut T = &mut this.data;
mem::swap(slot, &mut other);
}
/// Unwraps the value and hands ownership to the caller, so it is not offered
/// back to the pool.
pub(crate) fn into_inner(this: Pooled<T>) -> T {
// SAFETY: `this.data` is read exactly once and `this` is forgotten right
// after, so `Pooled::drop` never runs and the moved-from field is never
// observed again.
let inner = unsafe { ptr::read(&this.data) };
mem::forget(this);
ManuallyDrop::into_inner(inner)
}
/// Wraps an existing value. The value is offered to the pool when the wrapper
/// is dropped, whether or not it originally came from one.
pub(crate) fn new(data: T) -> Pooled<T> {
Pooled {
data: ManuallyDrop::new(data),
}
}
}
impl<T: Clear + Clone + InPoolSet<PoolSet>> Pooled<T> {
    /// Copies `this` into a value obtained from the pool, so that a cached
    /// allocation can be reused where `Clone::clone_from` supports it.
    pub(crate) fn cloned(this: &Pooled<T>) -> Pooled<T> {
        let mut copy = with_pool_set(|pools| pools.get::<T>());
        {
            let target: &mut T = &mut copy;
            let source: &T = this;
            target.clone_from(source);
        }
        copy
    }
}
/// On drop, the wrapped value is either destroyed or cleared and offered to
/// the current thread's pool for `T`.
impl<T: Clear + InPoolSet<PoolSet>> Drop for Pooled<T> {
fn drop(&mut self) {
let reuse = self.data.reuse();
if !reuse {
// SAFETY: `self.data` is owned by this wrapper and is not touched again
// after this point, so it is dropped exactly once.
unsafe { ManuallyDrop::drop(&mut self.data) };
return;
}
// NOTE(review): the value is cleared here, and `PoolState::push` then
// re-checks `reuse()` and clears again. For types whose `reuse()` depends
// on their contents, that second check sees the already-cleared value.
self.data.clear();
let t: &T = &self.data;
with_pool_set(|ps| {
T::with_pool(ps, |pool| {
// SAFETY: `ptr::read` moves the value out of `self.data`. The field is a
// `ManuallyDrop`, so it is not dropped again when `self` goes away, and
// it is not accessed after this read.
pool.data.borrow_mut().push(unsafe { ptr::read(t) })
})
});
}
}
/// Shared access to the wrapped value.
impl<T: Clear + InPoolSet<PoolSet>> Deref for Pooled<T> {
    type Target = T;

    fn deref(&self) -> &T {
        let inner: &T = &self.data;
        inner
    }
}
/// Exclusive access to the wrapped value.
impl<T: Clear + InPoolSet<PoolSet>> DerefMut for Pooled<T> {
    fn deref_mut(&mut self) -> &mut T {
        let inner: &mut T = &mut self.data;
        inner
    }
}
/// Associates a type with the `Pool` that stores it inside a pool set.
///
/// Note that `PoolSet` in this declaration is a generic type parameter of the
/// trait; it only happens to share its name with the `PoolSet` struct.
pub trait InPoolSet<PoolSet>
where
Self: Sized + Clear,
{
/// Calls `f` with the pool for `Self` found in `pool_set` and returns
/// whatever `f` returns.
fn with_pool<R>(pool_set: &PoolSet, f: impl FnOnce(&Pool<Self>) -> R) -> R;
}
// Declares a struct holding one `Pool` per listed type.
//
// Input shape: `<vis> <Name> { field: Type [ byte_limit ], ... }`.
// For each entry the macro emits a `Pool<Type>` field, initializes it with
// `Pool::new(byte_limit)` in `Default`, and implements `InPoolSet<Name>` for
// `Type` so that generic code can find the right field from the type alone.
macro_rules! pool_set {
($vis:vis $name:ident { $($ident:ident : $ty:ty [ $bytes:expr ],)* }) => {
// One pool field per entry.
$vis struct $name {
$(
$ident: Pool<$ty>,
)*
}
// Every pool starts empty, with the byte limit given in the entry.
impl Default for $name {
fn default() -> Self {
$name {
$(
$ident: Pool::new($bytes),
)*
}
}
}
impl $name {
// Returns a handle (a clone of the `Pool`) to the pool for `T`.
$vis fn get_pool<T: InPoolSet<Self>>(&self) -> Pool<T> {
T::with_pool(self, Pool::clone)
}
// Fetches a value of type `T` from its pool.
$vis fn get<T: InPoolSet<Self> + Default>(&self) -> Pooled<T> {
T::with_pool(self, |pool| pool.get())
}
// Clears every pool in the set.
$vis fn clear(&self) {
$( self.$ident.clear(); )*
}
}
// Maps each listed type to its field.
$(
impl InPoolSet<$name> for $ty {
fn with_pool<R>(pool_set: &$name, f: impl FnOnce(&Pool<Self>) -> R) -> R {
f(&pool_set.$ident)
}
}
)*
}
}
// The set of pools used by this crate. The bracketed expression on each line
// is passed to `Pool::new` as that pool's byte limit (`1 << 20` is 1 MiB,
// `1 << 25` is 32 MiB, `4 << 20` is 4 MiB).
pool_set! {
pub PoolSet {
vec_vals: Vec<Value> [ 1 << 25 ],
vec_cell_vals: Vec<Cell<Value>> [ 1 << 25 ],
rows: Vec<RowId> [ 1 << 25 ],
offset_vec: SortedOffsetVector [ 1 << 20 ],
column_index: IndexMap<Value, BufferedSubset> [ 1 << 20 ],
constraints: Vec<Constraint> [ 1 << 20 ],
bitsets: FixedBitSet [ 1 << 20 ],
instrs: Vec<Instr> [ 1 << 20 ],
tuple_indexes: HashTable<TableEntry<BufferedSubset>> [ 1 << 20 ],
staged_outputs: HashTable<SwTableEntry> [ 1 << 25 ],
predicted_vals: PredictedVals [ 1 << 20 ],
shard_hist: DenseIdMap<ShardId, usize> [ 1 << 20 ],
instr_indexes: Vec<u32> [ 1 << 20 ],
cached_subsets: IdVec<ColumnId, std::sync::OnceLock<std::sync::Arc<ColumnIndex>>> [ 4 << 20 ],
intersected_on: DenseIdMap<AtomId, i64> [ 1 << 20 ],
}
}
/// Runs `f` against the current thread's `PoolSet` and returns its result.
pub(crate) fn with_pool_set<R>(f: impl FnOnce(&PoolSet) -> R) -> R {
    POOL_SET.with(|slot| {
        // The thread-local stores a `ManuallyDrop<PoolSet>`; borrow the set
        // inside it.
        let pools: &PoolSet = slot;
        f(pools)
    })
}
// Per-thread pool set. Wrapping it in `ManuallyDrop` means its destructor is
// never run when the thread exits.
// NOTE(review): presumably this keeps the pools reachable for `Pooled` values
// that are dropped during thread teardown — confirm.
thread_local! {
static POOL_SET: ManuallyDrop<PoolSet> = Default::default();
}