use crossbeam::queue::ArrayQueue;
use once_cell::sync::Lazy;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// A fixed-capacity pool of reusable `T` values.
///
/// Idle objects are parked in a bounded `ArrayQueue`. [`ObjectPool::get`] pops
/// one when available and otherwise builds a fresh value with `factory`.
///
/// All statistics are relaxed atomic counters: cheap to maintain, but only
/// approximately consistent with one another while other threads are using
/// the pool.
pub struct ObjectPool<T> {
    /// Idle objects waiting to be handed out again.
    objects: ArrayQueue<T>,
    /// Builds a new object whenever the queue is empty.
    factory: Arc<dyn Fn() -> T + Send + Sync>,
    /// Number of objects built by `factory`.
    stat_created: AtomicUsize,
    /// Number of `get` calls served from the queue.
    stat_reused: AtomicUsize,
    /// Number of dropped guards whose object was accepted back into the queue.
    stat_returned: AtomicUsize,
    /// Highest value `stat_in_use` has reached.
    stat_peak: AtomicUsize,
    /// Number of guards currently checked out (not yet dropped or taken).
    stat_in_use: AtomicUsize,
}

/// Point-in-time snapshot of an [`ObjectPool`]'s counters.
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Objects built by the factory because the pool was empty.
    pub objects_created: usize,
    /// Checkouts that were served by an idle pooled object.
    pub objects_reused: usize,
    /// Objects that were put back into the pool when their guard was dropped.
    pub objects_returned: usize,
    /// Largest number of guards that were checked out at the same time.
    pub peak_usage: usize,
    /// Idle objects parked in the pool when the snapshot was taken.
    pub current_pool_size: usize,
}

impl<T> ObjectPool<T> {
    /// Creates an empty pool that keeps at most `capacity` idle objects and
    /// uses `factory` to build new ones on demand.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero, because `ArrayQueue::new` rejects a
    /// zero-capacity queue.
    pub fn new<F>(capacity: usize, factory: F) -> Self
    where
        F: Fn() -> T + Send + Sync + 'static,
    {
        Self {
            objects: ArrayQueue::new(capacity),
            factory: Arc::new(factory),
            stat_created: AtomicUsize::new(0),
            stat_reused: AtomicUsize::new(0),
            stat_returned: AtomicUsize::new(0),
            stat_peak: AtomicUsize::new(0),
            stat_in_use: AtomicUsize::new(0),
        }
    }

    /// Checks an object out of the pool, building a new one if none is idle.
    ///
    /// The object is handed back automatically when the returned guard is
    /// dropped. The object is returned as-is: the pool never resets its
    /// contents.
    pub fn get(&self) -> PooledObject<'_, T> {
        let obj = match self.objects.pop() {
            Some(obj) => {
                self.stat_reused.fetch_add(1, Ordering::Relaxed);
                obj
            }
            None => {
                // Count the creation only after the factory succeeded, so a
                // panicking factory leaves the statistics untouched.
                let obj = (self.factory)();
                self.stat_created.fetch_add(1, Ordering::Relaxed);
                obj
            }
        };

        // Peak usage is tracked from the real number of live guards. Deriving
        // it from `created - pool_size` overcounts, because objects removed
        // with `take()` or discarded by a full queue never leave `created`.
        let in_use = self.stat_in_use.fetch_add(1, Ordering::Relaxed) + 1;
        self.stat_peak.fetch_max(in_use, Ordering::Relaxed);

        PooledObject {
            object: Some(obj),
            pool: self,
        }
    }

    /// Takes an object back from a dropped guard.
    ///
    /// If the queue is already full the object is simply dropped.
    fn return_object(&self, obj: T) {
        // Release the slot before the object becomes visible in the queue:
        // the in-use count may briefly run low but can never overshoot, so
        // `peak_usage` is never inflated by a concurrent checkout.
        self.release_slot();
        if self.objects.push(obj).is_ok() {
            self.stat_returned.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Records that one checked-out guard no longer holds an object.
    ///
    /// Every call is paired with an earlier increment in `get`, so the
    /// counter cannot underflow.
    fn release_slot(&self) {
        self.stat_in_use.fetch_sub(1, Ordering::Relaxed);
    }

    /// Returns a snapshot of the pool's counters.
    pub fn stats(&self) -> PoolStats {
        PoolStats {
            objects_created: self.stat_created.load(Ordering::Relaxed),
            objects_reused: self.stat_reused.load(Ordering::Relaxed),
            objects_returned: self.stat_returned.load(Ordering::Relaxed),
            peak_usage: self.stat_peak.load(Ordering::Relaxed),
            // Ask the queue directly instead of mirroring its length in a
            // separate counter that can race with concurrent push/pop.
            current_pool_size: self.objects.len(),
        }
    }
}

/// Guard for an object checked out of an [`ObjectPool`].
///
/// Dropping the guard hands the object back to the pool; [`PooledObject::take`]
/// removes it from the pool for good.
pub struct PooledObject<'a, T> {
    /// `Some` until the object is moved out by `take` or `drop`.
    object: Option<T>,
    /// Pool the object goes back to on drop.
    pool: &'a ObjectPool<T>,
}

impl<'a, T> PooledObject<'a, T> {
    /// Shared access to the pooled object.
    ///
    /// # Panics
    ///
    /// Panics if the object has already been moved out of the guard.
    pub fn get(&self) -> &T {
        self.object
            .as_ref()
            .expect("PooledObject accessed after take")
    }

    /// Exclusive access to the pooled object.
    ///
    /// # Panics
    ///
    /// Panics if the object has already been moved out of the guard.
    pub fn get_mut(&mut self) -> &mut T {
        self.object
            .as_mut()
            .expect("PooledObject accessed after take")
    }

    /// Consumes the guard and returns the object by value.
    ///
    /// The object is *not* returned to the pool afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the object has already been moved out of the guard.
    pub fn take(mut self) -> T {
        let obj = self.object.take().expect("PooledObject already taken");
        // The guard's `Drop` sees `None` and does nothing, so the slot has to
        // be released here to keep the in-use count accurate.
        self.pool.release_slot();
        obj
    }
}

impl<'a, T> Drop for PooledObject<'a, T> {
    /// Hands the object back to the pool unless it was moved out by `take`.
    fn drop(&mut self) {
        if let Some(obj) = self.object.take() {
            self.pool.return_object(obj);
        }
    }
}
impl<'a, T> std::ops::Deref for PooledObject<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.get()
}
}
impl<'a, T> std::ops::DerefMut for PooledObject<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.get_mut()
}
}
/// Owning wrapper around a guard checked out of a `'static` [`ObjectPool`].
///
/// The wrapper itself does no clearing: the `get_*` accessors in this file
/// call `clear()` on the object before wrapping it. When the wrapper is
/// dropped, the inner guard's `Drop` hands the object back to its pool.
pub struct CleaningPooledObject<T: 'static> {
// Guard borrowed from one of the process-wide pools.
inner: PooledObject<'static, T>,
}
impl<T: 'static> CleaningPooledObject<T> {
fn new(inner: PooledObject<'static, T>) -> Self {
Self { inner }
}
pub fn take(self) -> T {
self.inner.take()
}
}
/// Forwards shared access to the wrapped pooled object.
impl<T: 'static> std::ops::Deref for CleaningPooledObject<T> {
    type Target = T;

    fn deref(&self) -> &T {
        self.inner.get()
    }
}
/// Forwards exclusive access to the wrapped pooled object.
impl<T: 'static> std::ops::DerefMut for CleaningPooledObject<T> {
    fn deref_mut(&mut self) -> &mut T {
        self.inner.get_mut()
    }
}
// Process-wide pools, each created lazily on first access.

/// Pool of `Cow`-keyed string maps: keeps up to 50 idle maps, new maps start
/// with capacity 8.
static CLEANING_COW_HASHMAP: Lazy<ObjectPool<HashMap<Cow<'static, str>, Cow<'static, str>>>> =
Lazy::new(|| ObjectPool::new(50, || HashMap::with_capacity(8)));
/// Pool of `String`-to-`String` maps: keeps up to 50 idle maps, new maps
/// start with capacity 8.
static CLEANING_STRING_HASHMAP: Lazy<ObjectPool<HashMap<String, String>>> =
Lazy::new(|| ObjectPool::new(50, || HashMap::with_capacity(8)));
/// Pool of byte buffers: keeps up to 100 idle buffers, new buffers start
/// with capacity 1024.
static CLEANING_BYTE_VEC: Lazy<ObjectPool<Vec<u8>>> =
Lazy::new(|| ObjectPool::new(100, || Vec::with_capacity(1024)));
/// Pool of string vectors: keeps up to 50 idle vectors, new vectors start
/// with capacity 16.
static CLEANING_STRING_VEC: Lazy<ObjectPool<Vec<String>>> =
Lazy::new(|| ObjectPool::new(50, || Vec::with_capacity(16)));
/// Checks a `Cow`-keyed string map out of the global pool.
///
/// The map is cleared before it is returned, so the caller always starts
/// with an empty map.
pub fn get_cow_hashmap() -> CleaningPooledObject<HashMap<Cow<'static, str>, Cow<'static, str>>> {
    let mut guard = CLEANING_COW_HASHMAP.get();
    guard.get_mut().clear();
    CleaningPooledObject::new(guard)
}
/// Checks a `String`-to-`String` map out of the global pool.
///
/// The map is cleared before it is returned, so the caller always starts
/// with an empty map.
pub fn get_string_hashmap() -> CleaningPooledObject<HashMap<String, String>> {
    let mut guard = CLEANING_STRING_HASHMAP.get();
    guard.get_mut().clear();
    CleaningPooledObject::new(guard)
}
/// Checks a byte buffer out of the global pool.
///
/// The buffer is cleared before it is returned, so the caller always starts
/// with an empty buffer.
pub fn get_byte_vec() -> CleaningPooledObject<Vec<u8>> {
    let mut guard = CLEANING_BYTE_VEC.get();
    guard.get_mut().clear();
    CleaningPooledObject::new(guard)
}
/// Checks a `Vec<String>` out of the global pool.
///
/// The vector is cleared before it is returned, so the caller always starts
/// with an empty vector.
pub fn get_string_vec() -> CleaningPooledObject<Vec<String>> {
    let mut guard = CLEANING_STRING_VEC.get();
    guard.get_mut().clear();
    CleaningPooledObject::new(guard)
}
/// Statistics for the four global pools plus totals across all of them.
#[derive(Debug, Clone)]
pub struct GlobalPoolStats {
/// Snapshot of the `Cow`-keyed map pool.
pub cow_hashmap: PoolStats,
/// Snapshot of the `String`-to-`String` map pool.
pub string_hashmap: PoolStats,
/// Snapshot of the byte-buffer pool.
pub byte_vec: PoolStats,
/// Snapshot of the `Vec<String>` pool.
pub string_vec: PoolStats,
/// Sum of `objects_created` over the four pools.
pub total_objects_created: usize,
/// Sum of `objects_reused` over the four pools.
pub total_objects_reused: usize,
/// `total_objects_reused / (total_objects_created + total_objects_reused)`,
/// or `0.0` when nothing has been created or reused yet.
pub total_reuse_ratio: f64,
}
/// Snapshots all four global pools and derives the cross-pool totals.
///
/// The reuse ratio is the fraction of checkouts served by an idle pooled
/// object; it is `0.0` when no checkout has happened yet.
pub fn get_global_pool_stats() -> GlobalPoolStats {
    let cow_hashmap = CLEANING_COW_HASHMAP.stats();
    let string_hashmap = CLEANING_STRING_HASHMAP.stats();
    let byte_vec = CLEANING_BYTE_VEC.stats();
    let string_vec = CLEANING_STRING_VEC.stats();

    let snapshots = [&cow_hashmap, &string_hashmap, &byte_vec, &string_vec];
    let total_objects_created: usize = snapshots.iter().map(|s| s.objects_created).sum();
    let total_objects_reused: usize = snapshots.iter().map(|s| s.objects_reused).sum();

    let total_reuse_ratio = match total_objects_created + total_objects_reused {
        0 => 0.0,
        checkouts => total_objects_reused as f64 / checkouts as f64,
    };

    GlobalPoolStats {
        cow_hashmap,
        string_hashmap,
        byte_vec,
        string_vec,
        total_objects_created,
        total_objects_reused,
        total_reuse_ratio,
    }
}
/// Builders that assemble `UniversalResponse` values from collections checked
/// out of the global pools.
///
/// NOTE(review): every finishing method moves the pooled collections into the
/// response with `take()`, so those objects are never handed back to their
/// pool. Each builder therefore obtains a pooled object but does not recycle
/// it.
pub mod pooled_builders {
    use super::*;
    use crate::domain::value_objects::JsonData;
    use crate::infrastructure::integration::{ResponseBody, UniversalResponse};

    /// Content type used by `json` when the caller did not choose one.
    const JSON_CONTENT_TYPE: &str = "application/json";
    /// Content type used by `binary` when the caller did not choose one.
    const BINARY_CONTENT_TYPE: &str = "application/octet-stream";

    /// Formats `data` as one server-sent event.
    ///
    /// Every line of `data` gets its own `data: ` field, and the event ends
    /// with a blank line. Line breaks may be `\n`, `\r\n` or a lone `\r`.
    /// Writing a multi-line payload verbatim would break the framing: the
    /// extra lines would be parsed as other fields, and an embedded blank
    /// line would end the event early.
    fn sse_frame(data: &str) -> String {
        let mut frame = String::with_capacity(data.len() + "data: \n\n".len());
        for chunk in data.split('\n') {
            // A `\r` directly before `\n` belongs to the same line break.
            let chunk = chunk.strip_suffix('\r').unwrap_or(chunk);
            for line in chunk.split('\r') {
                frame.push_str("data: ");
                frame.push_str(line);
                frame.push('\n');
            }
        }
        frame.push('\n');
        frame
    }

    /// Fluent builder for JSON and binary responses.
    pub struct PooledResponseBuilder {
        status_code: u16,
        headers: CleaningPooledObject<HashMap<Cow<'static, str>, Cow<'static, str>>>,
        /// `None` until the caller picks a content type; the finishing
        /// method then supplies the default for its body kind.
        content_type: Option<Cow<'static, str>>,
    }

    impl PooledResponseBuilder {
        /// Starts a response with status 200, no headers and no explicit
        /// content type.
        pub fn new() -> Self {
            Self {
                status_code: 200,
                headers: get_cow_hashmap(),
                content_type: None,
            }
        }

        /// Sets the HTTP status code.
        pub fn status(mut self, status: u16) -> Self {
            self.status_code = status;
            self
        }

        /// Adds a header, replacing any earlier value stored under `name`.
        pub fn header(
            mut self,
            name: impl Into<Cow<'static, str>>,
            value: impl Into<Cow<'static, str>>,
        ) -> Self {
            self.headers.insert(name.into(), value.into());
            self
        }

        /// Overrides the content type for whichever body is attached later.
        pub fn content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
            self.content_type = Some(content_type.into());
            self
        }

        /// Finishes the response with a JSON body.
        ///
        /// The content type is the one set through `content_type`, or
        /// `application/json` if none was set.
        pub fn json(self, data: JsonData) -> UniversalResponse {
            self.finish(ResponseBody::Json(data), JSON_CONTENT_TYPE)
        }

        /// Finishes the response with a binary body.
        ///
        /// The content type is the one set through `content_type`, or
        /// `application/octet-stream` if none was set. An explicitly chosen
        /// content type is no longer silently discarded.
        pub fn binary(self, data: Vec<u8>) -> UniversalResponse {
            self.finish(ResponseBody::Binary(data), BINARY_CONTENT_TYPE)
        }

        /// Assembles the response, using `default_content_type` only when
        /// the caller did not set one.
        fn finish(self, body: ResponseBody, default_content_type: &'static str) -> UniversalResponse {
            let Self {
                status_code,
                headers,
                content_type,
            } = self;
            UniversalResponse {
                status_code,
                headers: headers.take(),
                body,
                content_type: content_type.unwrap_or(Cow::Borrowed(default_content_type)),
            }
        }
    }

    impl Default for PooledResponseBuilder {
        fn default() -> Self {
            Self::new()
        }
    }

    /// Fluent builder for server-sent-event responses.
    pub struct PooledSSEBuilder {
        events: CleaningPooledObject<Vec<String>>,
        headers: CleaningPooledObject<HashMap<Cow<'static, str>, Cow<'static, str>>>,
    }

    impl PooledSSEBuilder {
        /// Starts an event stream with `Cache-Control: no-cache` and
        /// `Connection: keep-alive` already set.
        pub fn new() -> Self {
            let mut headers = get_cow_hashmap();
            headers.insert(Cow::Borrowed("Cache-Control"), Cow::Borrowed("no-cache"));
            headers.insert(Cow::Borrowed("Connection"), Cow::Borrowed("keep-alive"));
            Self {
                events: get_string_vec(),
                headers,
            }
        }

        /// Appends one event carrying `data`.
        ///
        /// Single-line payloads produce `data: <payload>\n\n`. Multi-line
        /// payloads produce one `data: ` line per payload line so the event
        /// stays well-formed.
        pub fn event(mut self, data: impl Into<String>) -> Self {
            let data = data.into();
            self.events.push(sse_frame(&data));
            self
        }

        /// Adds a header, replacing any earlier value stored under `name`.
        pub fn header(
            mut self,
            name: impl Into<Cow<'static, str>>,
            value: impl Into<Cow<'static, str>>,
        ) -> Self {
            self.headers.insert(name.into(), value.into());
            self
        }

        /// Finishes the stream as a 200 `text/event-stream` response.
        pub fn build(self) -> UniversalResponse {
            let Self { events, headers } = self;
            let events = events.take();
            let headers = headers.take();
            UniversalResponse {
                status_code: 200,
                headers,
                body: ResponseBody::ServerSentEvents(events),
                content_type: Cow::Borrowed("text/event-stream"),
            }
        }
    }

    impl Default for PooledSSEBuilder {
        fn default() -> Self {
            Self::new()
        }
    }
}
#[cfg(test)]
mod tests {
    use super::super::ResponseBody;
    use super::*;
    use crate::domain::value_objects::JsonData;

    /// Creation, return and reuse are each reflected in the counters.
    #[test]
    fn test_object_pool_basic_operations() {
        let pool = ObjectPool::new(5, || HashMap::<String, String>::with_capacity(4));
        let mut obj1 = pool.get();
        obj1.insert("test".to_string(), "value".to_string());
        let obj2 = pool.get();

        let stats = pool.stats();
        assert_eq!(stats.objects_created, 2);
        assert_eq!(stats.objects_reused, 0);
        assert_eq!(stats.peak_usage, 2);

        drop(obj1);
        drop(obj2);
        let stats = pool.stats();
        assert_eq!(stats.objects_returned, 2);
        assert_eq!(stats.current_pool_size, 2);

        let _obj3 = pool.get();
        let stats = pool.stats();
        assert_eq!(stats.objects_created, 2);
        assert_eq!(stats.objects_reused, 1);
        assert_eq!(stats.current_pool_size, 1);
    }

    /// The guard dereferences to the pooled value, both shared and mutable.
    #[test]
    fn test_pooled_object_deref() {
        let pool = ObjectPool::new(5, || vec![1, 2, 3]);
        let mut obj = pool.get();
        assert_eq!(obj.len(), 3);
        assert_eq!(obj[0], 1);
        obj.push(4);
        assert_eq!(*obj, vec![1, 2, 3, 4]);
    }

    /// A taken object never goes back to the pool.
    #[test]
    fn test_pooled_object_take() {
        let pool = ObjectPool::new(5, || vec![1, 2, 3]);
        let obj = pool.get();
        let taken = obj.take();
        assert_eq!(taken, vec![1, 2, 3]);

        let stats = pool.stats();
        assert_eq!(stats.objects_returned, 0);
        assert_eq!(stats.current_pool_size, 0);

        // Nothing was returned, so the next checkout has to build a new object.
        let _again = pool.get();
        let stats = pool.stats();
        assert_eq!(stats.objects_created, 2);
        assert_eq!(stats.objects_reused, 0);
    }

    /// Global accessors hand out cleared objects and feed the global stats.
    #[test]
    fn test_global_pools() {
        let mut headers = get_cow_hashmap();
        headers.insert(Cow::Borrowed("test"), Cow::Borrowed("value"));
        drop(headers);
        // Whatever object comes back next must have been cleared.
        assert!(get_cow_hashmap().is_empty());

        let mut bytes = get_byte_vec();
        bytes.extend_from_slice(b"test data");
        drop(bytes);
        assert!(get_byte_vec().is_empty());

        // Other tests share the global pools, so only lower bounds are stable.
        let stats = get_global_pool_stats();
        assert!(stats.total_reuse_ratio >= 0.0 && stats.total_reuse_ratio <= 1.0);
        assert!(stats.cow_hashmap.objects_created + stats.cow_hashmap.objects_reused >= 2);
        assert!(stats.byte_vec.objects_created + stats.byte_vec.objects_reused >= 2);
        assert!(
            stats.total_objects_created
                >= stats.cow_hashmap.objects_created + stats.byte_vec.objects_created
        );
    }

    #[test]
    fn test_pooled_response_builder() {
        let response = pooled_builders::PooledResponseBuilder::new()
            .status(201)
            .header("X-Test", "test-value")
            .content_type("application/json")
            .json(JsonData::String("test".to_string()));
        assert_eq!(response.status_code, 201);
        assert_eq!(response.content_type, "application/json");
        assert_eq!(
            response.headers.get("X-Test"),
            Some(&Cow::Borrowed("test-value"))
        );
    }

    #[test]
    fn test_pooled_sse_builder() {
        let response = pooled_builders::PooledSSEBuilder::new()
            .event("first event")
            .event("second event")
            .header("X-Custom", "custom-value")
            .build();
        assert_eq!(response.status_code, 200);
        assert_eq!(response.content_type, "text/event-stream");
        assert_eq!(
            response.headers.get("Cache-Control"),
            Some(&Cow::Borrowed("no-cache"))
        );
        assert_eq!(
            response.headers.get("X-Custom"),
            Some(&Cow::Borrowed("custom-value"))
        );
        if let ResponseBody::ServerSentEvents(events) = response.body {
            assert_eq!(events.len(), 2);
            assert_eq!(events[0], "data: first event\n\n");
            assert_eq!(events[1], "data: second event\n\n");
        } else {
            panic!("Expected ServerSentEvents body");
        }
    }

    /// Objects beyond the queue capacity are discarded on return.
    #[test]
    fn test_pool_capacity_limits() {
        let pool = ObjectPool::new(2, Vec::<i32>::new);
        let obj1 = pool.get();
        let obj2 = pool.get();
        let obj3 = pool.get();
        drop(obj1);
        drop(obj2);
        drop(obj3);
        let stats = pool.stats();
        assert_eq!(stats.objects_created, 3);
        assert_eq!(stats.objects_returned, 2);
        assert_eq!(stats.current_pool_size, 2);
    }

    /// Counters stay consistent when several threads use the pool at once.
    #[test]
    fn test_concurrent_pool_access() {
        use std::sync::Arc;
        use std::thread;

        const THREADS: usize = 5;
        let pool = Arc::new(ObjectPool::new(10, Vec::<i32>::new));
        let mut handles = vec![];
        for _ in 0..THREADS {
            let pool_clone = Arc::clone(&pool);
            let handle = thread::spawn(move || {
                let mut obj = pool_clone.get();
                obj.push(1);
                obj.push(2);
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.join().unwrap();
        }

        let stats = pool.stats();
        // Every checkout is either a creation or a reuse, never both.
        assert_eq!(stats.objects_created + stats.objects_reused, THREADS);
        assert!(stats.objects_created >= 1);
        // Capacity exceeds the thread count, so every guard's object was
        // returned and every created object is idle once the threads finish.
        assert_eq!(stats.objects_returned, THREADS);
        assert_eq!(stats.current_pool_size, stats.objects_created);
    }
}