use std::cmp::max;
use std::sync::Arc;
use std::sync::atomic::Ordering::*;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use crossbeam_queue::ArrayQueue;
use crate::entry::Prc;
use crate::{Entry, OwnedEntry};
/// A bounded pool of reusable `T` values.
///
/// Idle items wait in a fixed-size queue; pulling takes one from the queue or,
/// while fewer than `config.capacity` items have been allocated, builds a new
/// one with `T::default()`.
#[derive(Debug)]
pub struct Pool<T>
where
    T: Default,
{
    /// Settings the pool was built with (capacity, preallocation, reclamation tuning).
    config: Config<T>,
    /// Idle items that can be handed out without allocating.
    queue: ArrayQueue<Prc<T>>,
    /// Number of items the pool currently accounts for: idle ones plus those handed out.
    allocated: AtomicUsize,
    /// Running count of pulls that still left at least
    /// `config.idle_threshold_for_surpluspull` idle items behind; reset to zero by
    /// any pull that does not.
    surpluspulls: AtomicUsize,
    /// Set when a pull finds the queue empty; cleared by reclamation once
    /// `allocated` has dropped back to `config.prealloc` or below.
    additional_allocated: AtomicBool,
}
impl<T> Drop for Pool<T>
where
    T: Default,
{
    /// Drains every item still sitting idle in the queue and releases each one
    /// through `drop_slow`.
    fn drop(&mut self) {
        loop {
            match self.queue.pop() {
                Some(idle) => {
                    // NOTE(review): presumably sound because an item in the queue is
                    // not referenced by any live entry — confirm against the contract
                    // of `Prc::drop_slow`.
                    unsafe { idle.drop_slow() };
                }
                None => break,
            }
        }
    }
}
impl<T: Default> Pool<T> {
    /// Creates a pool that may allocate up to `capacity` items, `prealloc` of
    /// which are constructed up front. All other settings keep their defaults.
    ///
    /// # Panics
    ///
    /// Panics if `prealloc` is greater than `capacity`.
    pub fn new(prealloc: usize, capacity: usize) -> Self {
        Self::with_config(Config {
            capacity,
            prealloc,
            ..Default::default()
        })
    }

    /// Creates a pool whose `capacity` items are all constructed up front.
    pub fn with_capacity(capacity: usize) -> Self {
        Self::new(capacity, capacity)
    }

    /// Creates a pool of `capacity` items with `capacity / 2` of them
    /// constructed up front.
    pub fn with_capacity_half_prealloc(capacity: usize) -> Self {
        Self::new(capacity / 2, capacity)
    }

    /// Builds a pool from an explicit [`Config`].
    ///
    /// The config is normalised with `post_process` first, so zero thresholds
    /// are replaced by capacity-derived values. `config.prealloc` items are then
    /// created with `T::default()` and placed in the idle queue.
    ///
    /// # Panics
    ///
    /// Panics if `config.prealloc` is greater than `config.capacity`.
    pub fn with_config(mut config: Config<T>) -> Self {
        config.post_process();
        let prealloc = config.prealloc;
        assert!(
            prealloc <= config.capacity,
            "prealloc must be less than or equal to capacity"
        );
        // `ArrayQueue::new` rejects a zero capacity, so a zero-capacity pool
        // still gets a one-slot queue (which then simply stays empty).
        let queue_len = max(1, config.capacity);
        let pool = Self {
            queue: ArrayQueue::new(queue_len),
            allocated: AtomicUsize::new(prealloc),
            surpluspulls: AtomicUsize::new(0),
            additional_allocated: AtomicBool::new(false),
            config,
        };
        for _ in 0..prealloc {
            // Cannot fail: prealloc <= capacity <= queue_len.
            let _ = pool.queue.push(Prc::new_zero(T::default()));
        }
        pool
    }

    /// Turns on automatic reclamation and recomputes the derived settings.
    ///
    /// Reclamation only becomes active when `prealloc` differs from `capacity`.
    pub fn enable_auto_reclaim(&mut self) {
        self.config.auto_reclaim = true;
        self.config.post_process();
    }

    /// Returns the number of items currently handed out.
    ///
    /// The allocation counter and the queue length are read separately, so
    /// under concurrent use the result is a snapshot estimate.
    pub fn in_use(&self) -> usize {
        // Another thread may allocate and recycle an item between the two
        // reads, making the queue momentarily look longer than the counter read
        // earlier. Saturate rather than underflow in that window.
        self.allocated
            .load(Relaxed)
            .saturating_sub(self.queue.len())
    }

    /// Returns the number of items the pool currently accounts for (idle plus
    /// handed out).
    pub fn allocated(&self) -> usize {
        self.allocated.load(Acquire)
    }

    /// Returns how many more items could be pulled right now, counting both
    /// idle items and capacity that has not been allocated yet.
    pub fn available(&self) -> usize {
        self.config.capacity.saturating_sub(self.in_use())
    }

    /// Returns the number of idle items, i.e. pulls that need no allocation.
    pub fn available_noalloc(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` when [`available`](Self::available) is zero.
    pub fn is_empty(&self) -> bool {
        self.available() == 0
    }

    /// Returns the configured maximum number of items.
    pub fn capacity(&self) -> usize {
        self.config.capacity
    }

    /// Pulls an item that borrows the pool.
    ///
    /// Returns `None` when no idle item exists and the pool has already
    /// allocated `capacity` items.
    pub fn pull(&self) -> Option<Entry<'_, T>> {
        self.pull_inner().map(|item| Entry {
            item: Some(item),
            pool: self,
        })
    }

    /// Like [`pull`](Self::pull), but runs `func` on the item before returning it.
    pub fn pull_with<F>(&self, func: F) -> Option<Entry<'_, T>>
    where
        F: FnOnce(&mut T),
    {
        self.pull().map(|mut entry| {
            // NOTE(review): relies on a freshly pulled entry being the only handle
            // to its item — confirm against `Entry::get_mut_unchecked`'s contract.
            func(unsafe { entry.get_mut_unchecked() });
            entry
        })
    }

    /// Pulls an item that holds its own `Arc` to the pool instead of a borrow.
    ///
    /// Returns `None` under the same conditions as [`pull`](Self::pull).
    pub fn pull_owned(self: &Arc<Self>) -> Option<OwnedEntry<T>> {
        self.pull_inner().map(|item| crate::OwnedEntry {
            item: Some(item),
            pool: Arc::clone(self),
        })
    }

    /// Like [`pull_owned`](Self::pull_owned), but runs `func` on the item
    /// before returning it.
    pub fn pull_owned_with<F>(self: &Arc<Self>, func: F) -> Option<OwnedEntry<T>>
    where
        F: FnOnce(&mut T),
    {
        self.pull_owned().map(|mut entry| {
            // NOTE(review): relies on a freshly pulled entry being the only handle
            // to its item — confirm against `OwnedEntry::get_mut_unchecked`'s contract.
            func(unsafe { entry.get_mut_unchecked() });
            entry
        })
    }

    /// Takes an idle item from the queue, or allocates a new one while the
    /// pool is below capacity.
    ///
    /// When reclamation is active, each pull from the queue also updates the
    /// surplus-pull counter and may release one idle item via `reclaim`.
    fn pull_inner(&self) -> Option<Prc<T>> {
        match self.queue.pop() {
            None => {
                // Record that a pull found the queue empty. The flag is only
                // written when it is not already set.
                if !self.additional_allocated.load(Relaxed) {
                    self.additional_allocated.store(true, Relaxed);
                }
                // An empty queue means there is no surplus; restart the count.
                if self.config.need_process_reclamation {
                    self.surpluspulls.store(0, SeqCst);
                }
                // Reserve a slot atomically so concurrent pulls can never
                // push `allocated` past `capacity`.
                let capacity = self.config.capacity;
                self.allocated
                    .fetch_update(AcqRel, Acquire, |current| {
                        if current < capacity {
                            Some(current + 1)
                        } else {
                            None
                        }
                    })
                    .ok()
                    .map(|_| Prc::new(T::default()))
            }
            Some(item) => {
                if self.config.need_process_reclamation {
                    let left = self.queue.len();
                    if left >= self.config.idle_threshold_for_surpluspull {
                        // Enough idle items remained after this pull: count it
                        // as a surplus pull and reclaim once the threshold is hit.
                        let surpluspulls = self.surpluspulls.fetch_add(1, Relaxed) + 1;
                        if surpluspulls >= self.config.surpluspull_threshold_for_reclaim
                            && self.additional_allocated.load(Relaxed)
                        {
                            self.reclaim();
                        }
                    } else {
                        self.surpluspulls.store(0, Relaxed);
                    }
                }
                item.inc_ref();
                Some(item)
            }
        }
    }

    /// Releases one idle item, if any, and lowers the allocation count.
    ///
    /// Once the count is back at `prealloc` or below, the
    /// `additional_allocated` flag is cleared so no further reclamation is
    /// triggered until a pull finds the queue empty again.
    fn reclaim(&self) {
        if let Some(item) = self.queue.pop() {
            // NOTE(review): presumably sound because an item in the queue is not
            // referenced by any live entry — confirm against `Prc::drop_slow`.
            unsafe { item.drop_slow() };
            let current = self.allocated.fetch_sub(1, Release) - 1;
            if self.config.need_process_reclamation
                && current <= self.config.prealloc
                && self.additional_allocated.load(Relaxed)
            {
                self.additional_allocated.store(false, Relaxed);
            }
        }
    }

    /// Returns `item` to the idle queue, running `config.clear_func` on it
    /// first when one is configured.
    ///
    /// # Panics
    ///
    /// Panics if the queue is already full.
    pub(crate) fn recycle(&self, mut item: Prc<T>) {
        if let Some(func) = &self.config.clear_func {
            // NOTE(review): assumes the item being recycled has no other live
            // handle — confirm against `Prc::get_mut_unchecked`'s contract.
            func(unsafe { Prc::get_mut_unchecked(&mut item) });
        }
        if self.queue.push(item).is_err() {
            panic!("It is imposible that the pool is full when recycling an item");
        }
    }
}
/// Settings used to build a `Pool`.
///
/// Thresholds left at zero are replaced by capacity-derived values when the
/// config is passed through `post_process`.
#[derive(Debug)]
pub struct Config<T>
where
    T: Default,
{
    /// Maximum number of items the pool may have allocated at once.
    pub capacity: usize,
    /// Number of items constructed up front when the pool is built.
    pub prealloc: usize,
    /// Requests automatic reclamation of surplus idle items. It only takes
    /// effect when `prealloc` differs from `capacity`.
    pub auto_reclaim: bool,
    /// Number of counted surplus pulls at which one idle item is reclaimed.
    /// Zero means "derive from capacity".
    pub surpluspull_threshold_for_reclaim: usize,
    /// Minimum number of idle items that must remain after a pull for that
    /// pull to count as a surplus pull. Zero means "derive from capacity".
    pub idle_threshold_for_surpluspull: usize,
    /// Optional function applied to an item when it is recycled into the pool.
    pub clear_func: Option<fn(&mut T)>,
    /// Derived by `post_process`: whether pulls must run the reclamation
    /// bookkeeping.
    need_process_reclamation: bool,
}
impl<T: Default> Default for Config<T> {
fn default() -> Self {
Self {
capacity: 1024,
prealloc: 0,
auto_reclaim: false,
clear_func: None,
surpluspull_threshold_for_reclaim: 0,
idle_threshold_for_surpluspull: 0,
need_process_reclamation: false,
}
}
}
impl<T: Default> Config<T> {
pub(crate) fn post_process(&mut self) {
if self.idle_threshold_for_surpluspull == 0 {
self.idle_threshold_for_surpluspull = max(1, self.capacity / 20);
}
if self.surpluspull_threshold_for_reclaim == 0 {
self.surpluspull_threshold_for_reclaim = max(2, self.capacity / 100);
}
if self.auto_reclaim && self.prealloc != self.capacity {
self.need_process_reclamation = true;
} else {
self.need_process_reclamation = false;
}
}
}