use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::Weak;
use crate::ManageObject;
use crate::ObjectStatus;
use crate::QueueStrategy;
use crate::RecycleCancelledStrategy;
use crate::RetainResult;
use crate::mutex::Mutex;
use crate::retain_spec;
/// Configuration of an unbounded [`Pool`].
///
/// The struct is `#[non_exhaustive]`; build it with [`PoolConfig::new`] (or `Default`) and
/// adjust it through the `with_*` methods.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct PoolConfig {
    /// Selects which end of the idle queue an object is taken from
    /// (`Fifo` pops the front, `Lifo` pops the back).
    pub queue_strategy: QueueStrategy,

    /// Selects what happens to an idle object whose recycle check is abandoned before it
    /// completes: it is either detached from the pool or pushed back onto the idle queue.
    pub recycle_cancelled_strategy: RecycleCancelledStrategy,
}
impl Default for PoolConfig {
fn default() -> Self {
Self::new()
}
}
impl PoolConfig {
    /// Creates a configuration in which every strategy has its `Default` value.
    pub fn new() -> Self {
        let queue_strategy = QueueStrategy::default();
        let recycle_cancelled_strategy = RecycleCancelledStrategy::default();
        Self {
            queue_strategy,
            recycle_cancelled_strategy,
        }
    }

    /// Returns this configuration with `queue_strategy` replaced.
    pub fn with_queue_strategy(self, queue_strategy: QueueStrategy) -> Self {
        Self {
            queue_strategy,
            ..self
        }
    }

    /// Returns this configuration with `recycle_cancelled_strategy` replaced.
    pub fn with_recycle_cancelled_strategy(
        self,
        recycle_cancelled_strategy: RecycleCancelledStrategy,
    ) -> Self {
        Self {
            recycle_cancelled_strategy,
            ..self
        }
    }
}
/// Point-in-time counters reported by `Pool::status`.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct PoolStatus {
    /// Number of objects the pool currently accounts for: it grows when an object is created
    /// or added to the pool and shrinks when one is detached or removed.
    pub current_size: usize,

    /// Number of objects sitting in the idle queue at the time of the snapshot.
    pub idle_count: usize,
}
/// A zero-sized manager placeholder that stores no data about `T`.
///
/// It is the default manager type parameter of the pool types in this module.
#[derive(Debug, Copy, Clone)]
pub struct NeverManageObject<T: Send + Sync> {
    /// Ties the otherwise unused type parameter `T` to the struct.
    _marker: std::marker::PhantomData<T>,
}

impl<T: Send + Sync> Default for NeverManageObject<T> {
    /// Builds the (only possible) value of this marker type.
    fn default() -> Self {
        let _marker = std::marker::PhantomData;
        Self { _marker }
    }
}
/// Error value produced when `NeverManageObject` is asked to create an object.
///
/// The private unit field prevents construction outside this module.
pub struct PoolIsEmpty(());

impl std::fmt::Debug for PoolIsEmpty {
    /// Writes the fixed, human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("unbounded pool is empty")
    }
}

impl std::fmt::Display for PoolIsEmpty {
    /// Produces exactly the same text as the `Debug` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        <Self as std::fmt::Debug>::fmt(self, f)
    }
}

impl std::error::Error for PoolIsEmpty {}
impl<T: Send + Sync> ManageObject for NeverManageObject<T> {
type Object = T;
type Error = PoolIsEmpty;
fn create(&self) -> impl Future<Output = Result<Self::Object, Self::Error>> + Send {
std::future::ready(Err(PoolIsEmpty(())))
}
fn is_recyclable(
&self,
_: &mut Self::Object,
_: &ObjectStatus,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
std::future::ready(Ok(()))
}
}
/// An object pool without a size limit.
///
/// `T` is the pooled object type and `M` is the manager used to create objects and to check
/// idle ones before they are handed out again; `M` defaults to [`NeverManageObject`].
pub struct Pool<T, M: ManageObject<Object = T> = NeverManageObject<T>> {
    /// Strategies chosen when the pool was built.
    config: PoolConfig,

    /// Creates objects, checks idle objects and is notified when an object is detached.
    manager: M,

    /// Idle objects plus the size counter, guarded by one mutex.
    slots: Mutex<PoolDeque<ObjectState<T>>>,
}
/// The mutable part of a pool: the idle queue and the size counter kept next to it.
#[derive(Debug)]
struct PoolDeque<T> {
    /// Idle entries waiting to be handed out.
    deque: VecDeque<T>,

    /// Size counter maintained by the pool; it is not derived from `deque.len()`.
    current_size: usize,
}
impl<T, M> std::fmt::Debug for Pool<T, M>
where
    T: std::fmt::Debug,
    M: ManageObject<Object = T>,
{
    /// Formats the pool as a struct named `Pool` showing `slots` and then `config`.
    ///
    /// The manager is left out, so `M` does not need to implement `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Pool");
        out.field("slots", &self.slots);
        out.field("config", &self.config);
        out.finish()
    }
}
impl<T: Send + Sync> Pool<T> {
    /// Builds a pool whose manager is [`NeverManageObject`].
    pub fn never_manage(config: PoolConfig) -> Arc<Self> {
        let manager = NeverManageObject::<T>::default();
        Self::new(config, manager)
    }

    /// Hands out an idle object if there is one, otherwise builds a new one with `f`.
    ///
    /// An idle object is taken from the front (`Fifo`) or back (`Lifo`) of the queue; its
    /// recycle count is bumped and its recycle timestamp is set to now. When the queue is
    /// empty, `f` is awaited and the pool's size counter is incremented once it succeeds.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `f`; the pool is left untouched in that case.
    pub async fn get_or_create<E, F>(self: &Arc<Self>, f: F) -> Result<Object<T>, E>
    where
        F: AsyncFnOnce() -> Result<T, E> + Send,
    {
        // The guard lives only inside this block, so it is released before any `.await`.
        let idle = {
            let mut slots = self.slots.lock();
            match self.config.queue_strategy {
                QueueStrategy::Fifo => slots.deque.pop_front(),
                QueueStrategy::Lifo => slots.deque.pop_back(),
            }
        };

        let state = match idle {
            Some(mut reused) => {
                reused.status.recycle_count += 1;
                reused.status.recycled = Some(std::time::Instant::now());
                reused
            }
            None => {
                let o = f().await?;
                let fresh = ObjectState {
                    o,
                    status: ObjectStatus::default(),
                };
                self.slots.lock().current_size += 1;
                fresh
            }
        };

        Ok(Object {
            state: Some(state),
            pool: Arc::downgrade(self),
        })
    }
}
impl<T, M: ManageObject<Object = T>> Pool<T, M> {
    /// Creates an empty pool with the given configuration and manager.
    pub fn new(config: PoolConfig, manager: M) -> Arc<Self> {
        let empty = PoolDeque {
            deque: VecDeque::new(),
            current_size: 0,
        };
        Arc::new(Self {
            config,
            manager,
            slots: Mutex::new(empty),
        })
    }

    /// Hands out an object, reusing an idle one when the manager accepts it.
    ///
    /// Idle objects are taken from the front (`Fifo`) or back (`Lifo`) of the queue and
    /// passed to the manager's `is_recyclable`. An accepted object has its recycle count
    /// bumped and its recycle timestamp set to now; a rejected one is detached and the next
    /// idle object is tried. Once the queue is empty the manager creates a new object.
    ///
    /// # Errors
    ///
    /// Returns the manager's error when creating a new object fails.
    pub async fn get(self: &Arc<Self>) -> Result<Object<T, M>, M::Error> {
        loop {
            // The guard lives only inside this block, so it is released before any `.await`.
            let idle = {
                let mut slots = self.slots.lock();
                match self.config.queue_strategy {
                    QueueStrategy::Fifo => slots.deque.pop_front(),
                    QueueStrategy::Lifo => slots.deque.pop_back(),
                }
            };

            let Some(idle_state) = idle else {
                // Nothing idle: ask the manager for a brand-new object.
                let o = self.manager.create().await?;
                let fresh = ObjectState {
                    o,
                    status: ObjectStatus::default(),
                };
                self.slots.lock().current_size += 1;
                return Ok(Object {
                    state: Some(fresh),
                    pool: Arc::downgrade(self),
                });
            };

            // Wrap the entry before awaiting so that dropping this future mid-check still
            // runs `UnreadyObject`'s drop logic for it.
            let mut candidate = UnreadyObject {
                state: Some(idle_state),
                pool: Arc::downgrade(self),
                recycle_cancelled_strategy: self.config.recycle_cancelled_strategy,
            };

            let accepted = {
                let state = candidate.state();
                let snapshot = state.status;
                let verdict = self.manager.is_recyclable(&mut state.o, &snapshot).await;
                verdict.is_ok()
            };

            if !accepted {
                candidate.detach();
                continue;
            }

            let state = candidate.state();
            state.status.recycle_count += 1;
            state.status.recycled = Some(std::time::Instant::now());
            return Ok(candidate.ready());
        }
    }

    /// Adds a single object to the idle queue; see [`Pool::extend`].
    pub fn extend_one(&self, o: T) {
        self.extend(std::iter::once(o));
    }

    /// Appends every object from `iter` to the back of the idle queue.
    ///
    /// Each object gets a default status and increases the size counter by one. The pool's
    /// lock is held for the whole iteration.
    pub fn extend(&self, iter: impl IntoIterator<Item = T>) {
        let mut slots = self.slots.lock();
        iter.into_iter().for_each(|o| {
            slots.current_size += 1;
            let status = ObjectStatus::default();
            slots.deque.push_back(ObjectState { o, status });
        });
    }

    /// Runs `f` over the idle objects via `retain_spec::do_vec_deque_retain` and lowers the
    /// size counter by the number of objects reported as removed.
    ///
    /// Returns the result produced by that helper.
    pub fn retain(
        &self,
        f: impl FnMut(&mut M::Object, ObjectStatus) -> bool,
    ) -> RetainResult<M::Object> {
        let mut slots = self.slots.lock();
        let outcome = retain_spec::do_vec_deque_retain(&mut slots.deque, f);
        let removed_count = outcome.removed.len();
        slots.current_size -= removed_count;
        outcome
    }

    /// Takes a snapshot of the size counter and the idle-queue length under one lock.
    pub fn status(&self) -> PoolStatus {
        let (current_size, idle_count) = {
            let slots = self.slots.lock();
            (slots.current_size, slots.deque.len())
        };
        PoolStatus {
            current_size,
            idle_count,
        }
    }

    /// Puts an entry at the back of the idle queue without touching the size counter.
    fn push_back(&self, o: ObjectState<T>) {
        self.slots.lock().deque.push_back(o);
    }

    /// Lowers the size counter by one, then notifies the manager through `on_detached`.
    ///
    /// The lock is released before the manager callback runs.
    fn detach_object(&self, o: &mut T) {
        self.slots.lock().current_size -= 1;
        self.manager.on_detached(o);
    }
}
/// A handle to an object handed out by a [`Pool`].
///
/// It dereferences to the pooled `T`. The pool is referenced weakly, so holding an `Object`
/// does not keep the pool alive.
pub struct Object<T, M: ManageObject<Object = T> = NeverManageObject<T>> {
    /// The pooled value and its status; `None` only after it has been moved out.
    state: Option<ObjectState<T>>,

    /// Weak reference to the pool this object came from.
    pool: Weak<Pool<T, M>>,
}
impl<T, M> std::fmt::Debug for Object<T, M>
where
    T: std::fmt::Debug,
    M: ManageObject<Object = T>,
{
    /// Formats the handle as a struct named `Object` showing only its `state`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Object");
        out.field("state", &self.state);
        out.finish()
    }
}
impl<T, M: ManageObject<Object = T>> Drop for Object<T, M> {
    /// Returns the value to the back of the pool's idle queue if the pool still exists.
    ///
    /// Nothing happens when the state was already taken; when the pool is gone the value is
    /// simply dropped.
    fn drop(&mut self) {
        let Some(state) = self.state.take() else {
            return;
        };
        if let Some(pool) = self.pool.upgrade() {
            pool.push_back(state);
        }
    }
}
impl<T, M: ManageObject<Object = T>> Deref for Object<T, M> {
    type Target = T;

    /// Borrows the pooled value.
    ///
    /// # Panics
    ///
    /// Panics if the state has already been taken out of this handle.
    fn deref(&self) -> &T {
        let state = self.state.as_ref().unwrap();
        &state.o
    }
}
impl<T, M: ManageObject<Object = T>> DerefMut for Object<T, M> {
    /// Mutably borrows the pooled value.
    ///
    /// # Panics
    ///
    /// Panics if the state has already been taken out of this handle.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let state = self.state.as_mut().unwrap();
        &mut state.o
    }
}
impl<T, M: ManageObject<Object = T>> AsRef<M::Object> for Object<T, M> {
    /// Borrows the pooled value by going through this handle's `Deref` implementation.
    fn as_ref(&self) -> &M::Object {
        &**self
    }
}
impl<T, M: ManageObject<Object = T>> AsMut<M::Object> for Object<T, M> {
    /// Mutably borrows the pooled value by going through this handle's `DerefMut`
    /// implementation.
    fn as_mut(&mut self) -> &mut M::Object {
        &mut **self
    }
}
impl<T, M: ManageObject<Object = T>> Object<T, M> {
pub fn detach(mut self) -> M::Object {
let mut o = self.state.take().unwrap().o;
if let Some(pool) = self.pool.upgrade() {
pool.detach_object(&mut o);
}
o
}
pub fn status(&self) -> ObjectStatus {
self.state.as_ref().unwrap().status
}
}
/// An idle entry that has been taken from the queue but not yet approved for reuse.
///
/// It carries the strategy to apply if it is dropped while it still holds its state.
struct UnreadyObject<T, M: ManageObject<Object = T> = NeverManageObject<T>> {
    /// The entry under inspection; `None` once it has been promoted or detached.
    state: Option<ObjectState<T>>,

    /// Weak reference to the pool the entry came from.
    pool: Weak<Pool<T, M>>,

    /// What to do with the entry if this wrapper is dropped while still holding it.
    recycle_cancelled_strategy: RecycleCancelledStrategy,
}
impl<T, M: ManageObject<Object = T>> Drop for UnreadyObject<T, M> {
    /// Applies the configured strategy to an entry that was never promoted or detached.
    ///
    /// `Detach` removes the entry from the pool's accounting; `ReturnToPool` pushes it back
    /// onto the idle queue. If the pool no longer exists the entry is simply dropped.
    fn drop(&mut self) {
        let Some(mut state) = self.state.take() else {
            return;
        };
        let Some(pool) = self.pool.upgrade() else {
            return;
        };
        match self.recycle_cancelled_strategy {
            RecycleCancelledStrategy::Detach => pool.detach_object(&mut state.o),
            RecycleCancelledStrategy::ReturnToPool => pool.push_back(state),
        }
    }
}
impl<T, M: ManageObject<Object = T>> UnreadyObject<T, M> {
fn ready(mut self) -> Object<T, M> {
let state = Some(self.state.take().unwrap());
let pool = self.pool.clone();
Object { state, pool }
}
fn detach(&mut self) {
if let Some(mut state) = self.state.take() {
if let Some(pool) = self.pool.upgrade() {
pool.detach_object(&mut state.o);
}
}
}
fn state(&mut self) -> &mut ObjectState<T> {
self.state.as_mut().unwrap()
}
}
/// A pooled value together with the status recorded for it.
#[derive(Debug)]
struct ObjectState<T> {
    /// The pooled value itself.
    o: T,

    /// Status kept alongside the value (for example its recycle count and recycle timestamp).
    status: ObjectStatus,
}
impl<T> retain_spec::SealedState for ObjectState<T> {
    type Object = T;

    /// Returns a copy of the recorded status.
    fn status(&self) -> ObjectStatus {
        let Self { status, .. } = self;
        *status
    }

    /// Mutably borrows the stored value.
    fn mut_object(&mut self) -> &mut Self::Object {
        let Self { o, .. } = self;
        o
    }

    /// Consumes the entry and returns the stored value, discarding the status.
    fn take_object(self) -> Self::Object {
        let Self { o, .. } = self;
        o
    }
}