use std::collections::VecDeque;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::Weak;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use mea::semaphore::OwnedSemaphorePermit;
use mea::semaphore::Semaphore;
use crate::ManageObject;
use crate::ObjectStatus;
use crate::QueueStrategy;
use crate::RecycleCancelledStrategy;
use crate::RetainResult;
use crate::mutex::Mutex;
use crate::retain_spec;
/// Configuration for a [`Pool`].
///
/// Create one with [`PoolConfig::new`] and adjust it with the `with_*` builder methods.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct PoolConfig {
/// Upper bound on the pool size. [`Pool::new`] uses it as the number of semaphore permits
/// and as the initial capacity of the idle queue.
pub max_size: usize,
/// Which end of the idle queue [`Pool::get`] pops from: the front for `Fifo`, the back
/// for `Lifo`.
pub queue_strategy: QueueStrategy,
/// What happens to an idle object that was popped for a recyclability check and then
/// dropped before the check finished: `Detach` removes it from the pool, `ReturnToPool`
/// pushes it back onto the idle queue.
pub recycle_cancelled_strategy: RecycleCancelledStrategy,
}
impl PoolConfig {
    /// Creates a configuration with the given `max_size` and the default queue and
    /// recycle-cancelled strategies.
    pub fn new(max_size: usize) -> Self {
        let queue_strategy = QueueStrategy::default();
        let recycle_cancelled_strategy = RecycleCancelledStrategy::default();
        Self {
            max_size,
            queue_strategy,
            recycle_cancelled_strategy,
        }
    }

    /// Returns this configuration with `queue_strategy` replaced.
    pub fn with_queue_strategy(self, queue_strategy: QueueStrategy) -> Self {
        Self {
            queue_strategy,
            ..self
        }
    }

    /// Returns this configuration with `recycle_cancelled_strategy` replaced.
    pub fn with_recycle_cancelled_strategy(
        self,
        recycle_cancelled_strategy: RecycleCancelledStrategy,
    ) -> Self {
        Self {
            recycle_cancelled_strategy,
            ..self
        }
    }
}
/// A snapshot of a pool's counters, as produced by [`Pool::status`].
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct PoolStatus {
/// The pool's configured maximum size.
pub max_size: usize,
/// The pool's size counter at the time of the snapshot (incremented when an object is
/// created, decremented when one is detached or removed by `retain`).
pub current_size: usize,
/// `current_size - users` when there are fewer users than objects, otherwise `0`.
pub idle_count: usize,
/// `users - current_size` when there are at least as many users as objects, otherwise `0`.
pub wait_count: usize,
}
/// An object pool bounded by `config.max_size`, whose objects are created and checked by a
/// [`ManageObject`] implementation.
pub struct Pool<M: ManageObject> {
// The configuration the pool was created with.
config: PoolConfig,
// Creates objects, decides whether idle objects are recyclable, and is told about detaches.
manager: M,
// Number of callers currently inside `get` or holding an `Object` obtained from it.
users: AtomicUsize,
// Semaphore created with `max_size` permits; `get` acquires one and stores it in the `Object`.
permits: Arc<Semaphore>,
// Idle objects plus the size bookkeeping, guarded by a mutex.
slots: Mutex<PoolDeque<ObjectState<M::Object>>>,
}
/// The mutex-protected part of a pool: the idle queue and its size counters.
#[derive(Debug)]
struct PoolDeque<T> {
// Objects that are currently idle and available to `get`.
deque: VecDeque<T>,
// Size counter: bumped when an object is created, reduced when one is detached or removed
// by `retain`. Returning an object to the queue does not change it.
current_size: usize,
// Copy of the configured maximum, used by the invariant assertions and by `status`.
max_size: usize,
}
impl<M> std::fmt::Debug for Pool<M>
where
    M: ManageObject,
    M::Object: std::fmt::Debug,
{
    /// Formats the pool's slots, configuration, user counter and permits.
    ///
    /// The manager is left out, so `M` itself does not have to implement `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            config,
            users,
            permits,
            slots,
            ..
        } = self;

        let mut out = f.debug_struct("Pool");
        out.field("slots", slots);
        out.field("config", config);
        out.field("users", users);
        out.field("permits", permits);
        out.finish()
    }
}
impl<M: ManageObject> Pool<M> {
/// Creates an empty pool that hands out at most `config.max_size` objects at a time.
///
/// No object is created here; the idle queue only has its capacity reserved.
pub fn new(config: PoolConfig, manager: M) -> Arc<Self> {
    let max_size = config.max_size;

    Arc::new(Self {
        config,
        manager,
        users: AtomicUsize::new(0),
        permits: Arc::new(Semaphore::new(max_size)),
        slots: Mutex::new(PoolDeque {
            deque: VecDeque::with_capacity(max_size),
            current_size: 0,
            max_size,
        }),
    })
}
/// Creates idle objects ahead of demand and returns how many were added to the pool.
///
/// At most `most` permits are taken, and never more than are currently available; the
/// permits are acquired with `try_acquire`, so this does not wait for objects in use to
/// come back. Objects already sitting idle count against the requested amount. Objects
/// whose creation fails are skipped and not reported.
pub async fn replenish(&self, most: usize) -> usize {
    // Shrink the request to what the semaphore currently offers until the non-blocking
    // acquisition goes through.
    let mut wanted = most;
    let mut permit = loop {
        if let Some(acquired) = self.permits.try_acquire(wanted) {
            break acquired;
        }
        wanted = wanted.min(self.permits.available_permits());
    };

    if permit.permits() == 0 {
        return 0;
    }

    let idles = self.slots.lock().deque.len();
    if idles >= permit.permits() {
        return 0;
    }

    // Existing idle objects already cover part of the request: hand their share of the
    // permits back and keep only what is needed for the objects still to be created.
    let Some(covered) = permit.split(idles) else {
        unreachable!(
            "idles ({}) should be less than permits ({})",
            idles,
            permit.permits()
        )
    };
    drop(covered);
    let gap = permit.permits();

    let mut replenished = 0;
    for _ in 0..gap {
        if let Ok(o) = self.manager.create().await {
            let state = ObjectState {
                o,
                status: ObjectStatus::default(),
            };
            {
                let mut slots = self.slots.lock();
                slots.current_size += 1;
                slots.deque.push_back(state);
            }
            replenished += 1;
        }

        // One permit is given back per attempt, whether or not the creation succeeded.
        let Some(spent) = permit.split(1) else {
            unreachable!("permit must be greater than 0 at this point")
        };
        drop(spent);
    }

    replenished
}
/// Obtains an object from the pool, waiting for a permit if none is available.
///
/// Idle objects are tried first, in the order given by the configured queue strategy.
/// An idle object that the manager reports as not recyclable is detached and the next
/// one is tried. When the idle queue is empty a new object is created.
///
/// # Errors
///
/// Returns the manager's error if creating a new object fails.
pub async fn get(self: &Arc<Self>) -> Result<Object<M>, M::Error> {
    // Count the caller as a user immediately. The guard undoes this on every exit that
    // does not produce an object (creation error, or this future being dropped).
    self.users.fetch_add(1, Ordering::Relaxed);
    let users_guard = scopeguard::guard((), |()| {
        self.users.fetch_sub(1, Ordering::Relaxed);
    });

    let permit = self.permits.clone().acquire_owned(1).await;

    let object = loop {
        // The lock is released at the end of this block, before any `.await`.
        let popped = {
            let mut slots = self.slots.lock();
            match self.config.queue_strategy {
                QueueStrategy::Fifo => slots.deque.pop_front(),
                QueueStrategy::Lifo => slots.deque.pop_back(),
            }
        };

        let Some(idle) = popped else {
            // Nothing idle: create a fresh object and account for it.
            let o = self.manager.create().await?;
            let status = ObjectStatus::default();
            self.slots.lock().current_size += 1;
            break Object {
                state: Some(ObjectState { o, status }),
                permit,
                pool: Arc::downgrade(self),
            };
        };

        // From here until `ready` is called the object lives in an `UnreadyObject`, whose
        // `Drop` applies the recycle-cancelled strategy if this future is dropped while
        // the check below is still pending.
        let mut unready = UnreadyObject {
            state: Some(idle),
            pool: Arc::downgrade(self),
            recycle_cancelled_strategy: self.config.recycle_cancelled_strategy,
        };

        let state = unready.state();
        let status = state.status;
        let recyclable = self
            .manager
            .is_recyclable(&mut state.o, &status)
            .await
            .is_ok();

        if recyclable {
            state.status.recycle_count += 1;
            state.status.recycled = Some(std::time::Instant::now());
            break unready.ready(permit);
        }

        // Not recyclable: remove it from the pool and try the next candidate.
        unready.detach();
    };

    // Success: the user count stays incremented for as long as the `Object` is held.
    scopeguard::ScopeGuard::into_inner(users_guard);
    Ok(object)
}
/// Runs `f` over the idle objects via `retain_spec::do_vec_deque_retain` and returns its
/// result.
///
/// The pool's size counter is reduced by the number of objects reported as removed.
pub fn retain(
    &self,
    f: impl FnMut(&mut M::Object, ObjectStatus) -> bool,
) -> RetainResult<M::Object> {
    let mut slots = self.slots.lock();
    let outcome = retain_spec::do_vec_deque_retain(&mut slots.deque, f);
    let removed_count = outcome.removed.len();
    slots.current_size -= removed_count;
    outcome
}
/// Returns a snapshot of the pool's counters.
///
/// The size counters are read under the lock; the user counter is read afterwards with a
/// relaxed atomic load, so the two are not taken at exactly the same instant.
pub fn status(&self) -> PoolStatus {
    let slots = self.slots.lock();
    let current_size = slots.current_size;
    let max_size = slots.max_size;
    drop(slots);

    let users = self.users.load(Ordering::Relaxed);

    // Exactly one of the two differences can be non-zero; the other saturates to 0.
    let idle_count = current_size.saturating_sub(users);
    let wait_count = users.saturating_sub(current_size);

    PoolStatus {
        max_size,
        current_size,
        idle_count,
        wait_count,
    }
}
fn push_back(&self, o: ObjectState<M::Object>) {
self.return_to_pool(o);
self.users.fetch_sub(1, Ordering::Relaxed);
}
/// Appends `o` to the back of the idle queue without touching the user counter.
///
/// # Panics
///
/// Panics if the size counter exceeds the maximum size.
fn return_to_pool(&self, o: ObjectState<M::Object>) {
    let mut slots = self.slots.lock();
    let (current, max) = (slots.current_size, slots.max_size);
    assert!(
        current <= max,
        "invariant broken: current_size <= max_size (actual: {} <= {})",
        current,
        max,
    );
    slots.deque.push_back(o);
}
/// Removes one object from the pool's accounting and notifies the manager.
///
/// `ready` tells whether the object had been handed out as an `Object`. If so, its holder
/// is also removed from the user counter. Otherwise the counter is left alone: the callers
/// passing `false` are the `UnreadyObject` paths used inside `get`, and `get` maintains
/// the counter itself through its scope guard.
///
/// # Panics
///
/// Panics if the size counter exceeds the maximum size.
fn detach_object(&self, o: &mut M::Object, ready: bool) {
    {
        let mut slots = self.slots.lock();
        let (current, max) = (slots.current_size, slots.max_size);
        assert!(
            current <= max,
            "invariant broken: current_size <= max_size (actual: {} <= {})",
            current,
            max,
        );
        slots.current_size -= 1;
    }

    if ready {
        self.users.fetch_sub(1, Ordering::Relaxed);
    }

    self.manager.on_detached(o);
}
}
/// A handle to an object obtained from a [`Pool`].
///
/// It dereferences to the managed object. Dropping the handle gives the object back to the
/// pool if the pool still exists; [`Object::detach`] takes the object out instead.
pub struct Object<M: ManageObject> {
// `Some` for the whole usable lifetime of the handle; taken by `drop` and `detach`.
state: Option<ObjectState<M::Object>>,
// The semaphore permit acquired in `Pool::get`, held for as long as the handle exists.
permit: OwnedSemaphorePermit,
// Weak reference to the owning pool, so a handle does not keep the pool alive.
pool: Weak<Pool<M>>,
}
impl<M> std::fmt::Debug for Object<M>
where
    M: ManageObject,
    M::Object: std::fmt::Debug,
{
    /// Formats the object's state and permit; the weak pool reference is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { state, permit, .. } = self;

        let mut out = f.debug_struct("Object");
        out.field("state", state);
        out.field("permit", permit);
        out.finish()
    }
}
impl<M: ManageObject> Drop for Object<M> {
    /// Gives the object back to the pool, if both the object and the pool are still there.
    fn drop(&mut self) {
        // `detach` has already taken the state; nothing is left to return.
        let Some(state) = self.state.take() else {
            return;
        };

        // If the pool is gone, the object is simply dropped here.
        if let Some(pool) = self.pool.upgrade() {
            pool.push_back(state);
        }
    }
}
impl<M: ManageObject> Deref for Object<M> {
    type Target = M::Object;

    /// Borrows the managed object.
    fn deref(&self) -> &M::Object {
        let state = self.state.as_ref().unwrap();
        &state.o
    }
}
impl<M: ManageObject> DerefMut for Object<M> {
    /// Mutably borrows the managed object.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let state = self.state.as_mut().unwrap();
        &mut state.o
    }
}
impl<M: ManageObject> AsRef<M::Object> for Object<M> {
fn as_ref(&self) -> &M::Object {
self
}
}
impl<M: ManageObject> AsMut<M::Object> for Object<M> {
fn as_mut(&mut self) -> &mut M::Object {
self
}
}
impl<M: ManageObject> Object<M> {
pub fn detach(mut self) -> M::Object {
let mut o = self.state.take().unwrap().o;
if let Some(pool) = self.pool.upgrade() {
pool.detach_object(&mut o, true);
}
o
}
pub fn status(&self) -> ObjectStatus {
self.state.as_ref().unwrap().status
}
}
/// An idle object that `Pool::get` has popped from the queue but not yet handed out.
///
/// It either becomes an [`Object`] through `ready`, is removed through `detach`, or, if
/// dropped while still holding its state, is handled according to
/// `recycle_cancelled_strategy`.
struct UnreadyObject<M: ManageObject> {
// `Some` until `ready`, `detach` or `drop` takes the state.
state: Option<ObjectState<M::Object>>,
// Weak reference to the owning pool.
pool: Weak<Pool<M>>,
// Copied from the pool configuration; consulted only in `drop`.
recycle_cancelled_strategy: RecycleCancelledStrategy,
}
impl<M: ManageObject> Drop for UnreadyObject<M> {
    /// Disposes of a still-held object according to the recycle-cancelled strategy.
    ///
    /// Nothing happens if the state was already taken by `ready` or `detach`, or if the
    /// pool no longer exists.
    fn drop(&mut self) {
        let Some(mut state) = self.state.take() else {
            return;
        };
        let Some(pool) = self.pool.upgrade() else {
            return;
        };

        match self.recycle_cancelled_strategy {
            RecycleCancelledStrategy::ReturnToPool => pool.return_to_pool(state),
            RecycleCancelledStrategy::Detach => pool.detach_object(&mut state.o, false),
        }
    }
}
impl<M: ManageObject> UnreadyObject<M> {
    /// Turns the checked object into an [`Object`] that holds `permit`.
    ///
    /// The state is moved out, so the subsequent drop of `self` does nothing.
    fn ready(mut self, permit: OwnedSemaphorePermit) -> Object<M> {
        let taken = self.state.take().unwrap();
        Object {
            state: Some(taken),
            permit,
            pool: self.pool.clone(),
        }
    }

    /// Removes the object from the pool, regardless of the recycle-cancelled strategy.
    ///
    /// Does nothing if the state was already taken or the pool no longer exists.
    fn detach(&mut self) {
        let Some(mut state) = self.state.take() else {
            return;
        };
        let Some(pool) = self.pool.upgrade() else {
            return;
        };
        pool.detach_object(&mut state.o, false);
    }

    /// Mutably borrows the held state.
    fn state(&mut self) -> &mut ObjectState<M::Object> {
        let held = self.state.as_mut();
        held.unwrap()
    }
}
/// A managed object together with its status record.
#[derive(Debug)]
struct ObjectState<T> {
// The managed object itself.
o: T,
// Status record; `Pool::get` updates `recycle_count` and `recycled` on each successful reuse.
status: ObjectStatus,
}
impl<T> retain_spec::SealedState for ObjectState<T> {
    type Object = T;

    /// Returns a copy of the status record.
    fn status(&self) -> ObjectStatus {
        let Self { status, .. } = self;
        *status
    }

    /// Mutably borrows the managed object.
    fn mut_object(&mut self) -> &mut Self::Object {
        let Self { o, .. } = self;
        o
    }

    /// Consumes the state and returns the managed object, discarding the status.
    fn take_object(self) -> Self::Object {
        let Self { o, .. } = self;
        o
    }
}