pjson_rs/infrastructure/integration/
object_pool.rs1use crossbeam::queue::ArrayQueue;
7use once_cell::sync::Lazy;
8use std::borrow::Cow;
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex};
11
12pub struct ObjectPool<T> {
14 objects: ArrayQueue<T>,
16 factory: Arc<dyn Fn() -> T + Send + Sync>,
18 max_capacity: usize,
20 stats: Arc<Mutex<PoolStats>>,
22}
23
/// Snapshot of an [`ObjectPool`]'s usage counters.
///
/// Snapshots are plain values: they can be cloned, compared for equality and
/// default to all-zero counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PoolStats {
    /// Number of objects built by the pool's factory.
    pub objects_created: usize,
    /// Number of checkouts that were served from the idle queue.
    pub objects_reused: usize,
    /// Number of objects successfully put back into the idle queue.
    pub objects_returned: usize,
    /// Highest observed value of `objects_created - current_pool_size`,
    /// sampled each time a new object is created.
    pub peak_usage: usize,
    /// Tracked number of idle objects (incremented on return, decremented on reuse).
    pub current_pool_size: usize,
}
32
33impl<T> ObjectPool<T> {
34 pub fn new<F>(capacity: usize, factory: F) -> Self
36 where
37 F: Fn() -> T + Send + Sync + 'static,
38 {
39 Self {
40 objects: ArrayQueue::new(capacity),
41 factory: Arc::new(factory),
42 max_capacity: capacity,
43 stats: Arc::new(Mutex::new(PoolStats::default())),
44 }
45 }
46
47 pub fn get(&self) -> PooledObject<'_, T> {
49 let obj = if let Some(obj) = self.objects.pop() {
50 if let Ok(mut stats) = self.stats.lock() {
52 stats.objects_reused += 1;
53 stats.current_pool_size = stats.current_pool_size.saturating_sub(1);
54 }
55 obj
56 } else {
57 let obj = (self.factory)();
59 if let Ok(mut stats) = self.stats.lock() {
60 stats.objects_created += 1;
61 stats.peak_usage = stats
62 .peak_usage
63 .max(stats.objects_created - stats.current_pool_size);
64 }
65 obj
66 };
67
68 PooledObject {
69 object: Some(obj),
70 pool: self,
71 }
72 }
73
74 fn return_object(&self, obj: T) {
76 if self.objects.push(obj).is_ok()
78 && let Ok(mut stats) = self.stats.lock()
79 {
80 stats.objects_returned += 1;
81 stats.current_pool_size += 1;
82 }
83 }
85
86 fn clean_object(_obj: &mut T) {
88 }
91
92 pub fn stats(&self) -> PoolStats {
94 self.stats
95 .lock()
96 .map(|guard| guard.clone())
97 .unwrap_or_default()
98 }
99}
100
/// Resets a container to an empty state so it can be reused.
trait Cleanable {
    /// Removes all contents from `self`.
    fn clean(&mut self);
}

/// Implements [`Cleanable`] for container types whose inherent `clear()` is
/// exactly the reset that is wanted.
macro_rules! impl_cleanable_via_clear {
    ($($container:ty),+ $(,)?) => {
        $(
            impl Cleanable for $container {
                fn clean(&mut self) {
                    self.clear();
                }
            }
        )+
    };
}

impl_cleanable_via_clear!(
    HashMap<Cow<'static, str>, Cow<'static, str>>,
    HashMap<String, String>,
    Vec<u8>,
    Vec<String>,
);
129
130pub struct PooledObject<'a, T> {
132 object: Option<T>,
133 pool: &'a ObjectPool<T>,
134}
135
136impl<'a, T> PooledObject<'a, T> {
137 pub fn get(&self) -> &T {
139 self.object
140 .as_ref()
141 .expect("PooledObject accessed after take")
142 }
143
144 pub fn get_mut(&mut self) -> &mut T {
146 self.object
147 .as_mut()
148 .expect("PooledObject accessed after take")
149 }
150
151 pub fn take(mut self) -> T {
153 self.object.take().expect("PooledObject already taken")
154 }
155}
156
157impl<'a, T> Drop for PooledObject<'a, T> {
158 fn drop(&mut self) {
159 if let Some(obj) = self.object.take() {
160 self.pool.return_object(obj);
161 }
162 }
163}
164
165impl<'a, T> std::ops::Deref for PooledObject<'a, T> {
166 type Target = T;
167
168 fn deref(&self) -> &Self::Target {
169 self.get()
170 }
171}
172
173impl<'a, T> std::ops::DerefMut for PooledObject<'a, T> {
174 fn deref_mut(&mut self) -> &mut Self::Target {
175 self.get_mut()
176 }
177}
178
179pub struct GlobalPools {
181 pub cow_hashmap: ObjectPool<HashMap<Cow<'static, str>, Cow<'static, str>>>,
182 pub string_hashmap: ObjectPool<HashMap<String, String>>,
183 pub byte_vec: ObjectPool<Vec<u8>>,
184 pub string_vec: ObjectPool<Vec<String>>,
185}
186
187impl GlobalPools {
188 fn new() -> Self {
189 Self {
190 cow_hashmap: ObjectPool::new(50, || {
191 let mut map = HashMap::with_capacity(8);
192 map.shrink_to_fit(); map
194 }),
195 string_hashmap: ObjectPool::new(50, || {
196 let mut map = HashMap::with_capacity(8);
197 map.shrink_to_fit(); map
199 }),
200 byte_vec: ObjectPool::new(100, || {
201 let mut vec = Vec::with_capacity(1024);
202 vec.clear(); vec
204 }),
205 string_vec: ObjectPool::new(50, || {
206 let mut vec = Vec::with_capacity(16);
207 vec.clear(); vec
209 }),
210 }
211 }
212}
213
214static GLOBAL_POOLS: Lazy<GlobalPools> = Lazy::new(GlobalPools::new);
216
217pub struct CleaningPooledObject<T: 'static> {
219 inner: PooledObject<'static, T>,
220}
221
222impl<T: 'static> CleaningPooledObject<T> {
223 fn new(inner: PooledObject<'static, T>) -> Self {
224 Self { inner }
225 }
226
227 pub fn take(self) -> T {
228 self.inner.take()
229 }
230}
231
232impl<T: 'static> std::ops::Deref for CleaningPooledObject<T> {
233 type Target = T;
234 fn deref(&self) -> &Self::Target {
235 &self.inner
236 }
237}
238
239impl<T: 'static> std::ops::DerefMut for CleaningPooledObject<T> {
240 fn deref_mut(&mut self) -> &mut Self::Target {
241 &mut self.inner
242 }
243}
244
245static CLEANING_COW_HASHMAP: Lazy<ObjectPool<HashMap<Cow<'static, str>, Cow<'static, str>>>> =
247 Lazy::new(|| ObjectPool::new(50, || HashMap::with_capacity(8)));
248static CLEANING_STRING_HASHMAP: Lazy<ObjectPool<HashMap<String, String>>> =
249 Lazy::new(|| ObjectPool::new(50, || HashMap::with_capacity(8)));
250static CLEANING_BYTE_VEC: Lazy<ObjectPool<Vec<u8>>> =
251 Lazy::new(|| ObjectPool::new(100, || Vec::with_capacity(1024)));
252static CLEANING_STRING_VEC: Lazy<ObjectPool<Vec<String>>> =
253 Lazy::new(|| ObjectPool::new(50, || Vec::with_capacity(16)));
254
255pub fn get_cow_hashmap() -> CleaningPooledObject<HashMap<Cow<'static, str>, Cow<'static, str>>> {
257 let mut obj = CLEANING_COW_HASHMAP.get();
258 obj.clear(); CleaningPooledObject::new(obj)
260}
261
262pub fn get_string_hashmap() -> CleaningPooledObject<HashMap<String, String>> {
263 let mut obj = CLEANING_STRING_HASHMAP.get();
264 obj.clear(); CleaningPooledObject::new(obj)
266}
267
268pub fn get_byte_vec() -> CleaningPooledObject<Vec<u8>> {
269 let mut obj = CLEANING_BYTE_VEC.get();
270 obj.clear(); CleaningPooledObject::new(obj)
272}
273
274pub fn get_string_vec() -> CleaningPooledObject<Vec<String>> {
275 let mut obj = CLEANING_STRING_VEC.get();
276 obj.clear(); CleaningPooledObject::new(obj)
278}
279
280#[derive(Debug, Clone)]
282pub struct GlobalPoolStats {
283 pub cow_hashmap: PoolStats,
284 pub string_hashmap: PoolStats,
285 pub byte_vec: PoolStats,
286 pub string_vec: PoolStats,
287 pub total_objects_created: usize,
288 pub total_objects_reused: usize,
289 pub total_reuse_ratio: f64,
290}
291
292pub fn get_global_pool_stats() -> GlobalPoolStats {
294 let cow_hashmap = CLEANING_COW_HASHMAP.stats();
295 let string_hashmap = CLEANING_STRING_HASHMAP.stats();
296 let byte_vec = CLEANING_BYTE_VEC.stats();
297 let string_vec = CLEANING_STRING_VEC.stats();
298
299 let total_created = cow_hashmap.objects_created
300 + string_hashmap.objects_created
301 + byte_vec.objects_created
302 + string_vec.objects_created;
303 let total_reused = cow_hashmap.objects_reused
304 + string_hashmap.objects_reused
305 + byte_vec.objects_reused
306 + string_vec.objects_reused;
307
308 let total_reuse_ratio = if total_created + total_reused > 0 {
309 total_reused as f64 / (total_created + total_reused) as f64
310 } else {
311 0.0
312 };
313
314 GlobalPoolStats {
315 cow_hashmap,
316 string_hashmap,
317 byte_vec,
318 string_vec,
319 total_objects_created: total_created,
320 total_objects_reused: total_reused,
321 total_reuse_ratio,
322 }
323}
324
325pub mod pooled_builders {
327 use super::*;
328 use crate::domain::value_objects::JsonData;
329 use crate::infrastructure::integration::{ResponseBody, UniversalResponse};
330
331 pub struct PooledResponseBuilder {
333 status_code: u16,
334 headers: CleaningPooledObject<HashMap<Cow<'static, str>, Cow<'static, str>>>,
335 content_type: Cow<'static, str>,
336 }
337
338 impl PooledResponseBuilder {
339 pub fn new() -> Self {
341 Self {
342 status_code: 200,
343 headers: get_cow_hashmap(),
344 content_type: Cow::Borrowed("application/json"),
345 }
346 }
347
348 pub fn status(mut self, status: u16) -> Self {
350 self.status_code = status;
351 self
352 }
353
354 pub fn header(
356 mut self,
357 name: impl Into<Cow<'static, str>>,
358 value: impl Into<Cow<'static, str>>,
359 ) -> Self {
360 self.headers.insert(name.into(), value.into());
361 self
362 }
363
364 pub fn content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
366 self.content_type = content_type.into();
367 self
368 }
369
370 pub fn json(self, data: JsonData) -> UniversalResponse {
372 let headers = self.headers.take();
374
375 UniversalResponse {
376 status_code: self.status_code,
377 headers,
378 body: ResponseBody::Json(data),
379 content_type: self.content_type,
380 }
381 }
382
383 pub fn binary(self, data: Vec<u8>) -> UniversalResponse {
385 let headers = self.headers.take();
386
387 UniversalResponse {
388 status_code: self.status_code,
389 headers,
390 body: ResponseBody::Binary(data),
391 content_type: Cow::Borrowed("application/octet-stream"),
392 }
393 }
394 }
395
396 impl Default for PooledResponseBuilder {
397 fn default() -> Self {
398 Self::new()
399 }
400 }
401
402 pub struct PooledSSEBuilder {
404 events: CleaningPooledObject<Vec<String>>,
405 headers: CleaningPooledObject<HashMap<Cow<'static, str>, Cow<'static, str>>>,
406 }
407
408 impl PooledSSEBuilder {
409 pub fn new() -> Self {
411 let mut headers = get_cow_hashmap();
412 headers.insert(Cow::Borrowed("Cache-Control"), Cow::Borrowed("no-cache"));
413 headers.insert(Cow::Borrowed("Connection"), Cow::Borrowed("keep-alive"));
414
415 Self {
416 events: get_string_vec(),
417 headers,
418 }
419 }
420
421 pub fn event(mut self, data: impl Into<String>) -> Self {
423 self.events.push(format!("data: {}\n\n", data.into()));
424 self
425 }
426
427 pub fn header(
429 mut self,
430 name: impl Into<Cow<'static, str>>,
431 value: impl Into<Cow<'static, str>>,
432 ) -> Self {
433 self.headers.insert(name.into(), value.into());
434 self
435 }
436
437 pub fn build(self) -> UniversalResponse {
439 let events = self.events.take();
440 let headers = self.headers.take();
441
442 UniversalResponse {
443 status_code: 200,
444 headers,
445 body: ResponseBody::ServerSentEvents(events),
446 content_type: Cow::Borrowed("text/event-stream"),
447 }
448 }
449 }
450
451 impl Default for PooledSSEBuilder {
452 fn default() -> Self {
453 Self::new()
454 }
455 }
456}
457
458#[cfg(test)]
459mod tests {
460 use super::super::ResponseBody;
461 use super::*;
462 use crate::domain::value_objects::JsonData;
463
#[test]
fn test_object_pool_basic_operations() {
    let pool = ObjectPool::new(5, || HashMap::<String, String>::with_capacity(4));

    // Two simultaneous checkouts from an empty pool must both hit the factory.
    let mut first = pool.get();
    first.insert("test".to_string(), "value".to_string());
    let second = pool.get();

    let before_return = pool.stats();
    assert_eq!(before_return.objects_created, 2);
    assert_eq!(before_return.objects_reused, 0);

    // Dropping the guards hands both maps back to the pool.
    drop(first);
    drop(second);

    // The next checkout is then served from the idle queue.
    let _third = pool.get();
    let after_reuse = pool.stats();
    assert_eq!(after_reuse.objects_reused, 1);
}
492
#[test]
fn test_pooled_object_deref() {
    let pool = ObjectPool::new(5, || vec![1, 2, 3]);
    let guard = pool.get();

    // Both calls go through `Deref` to the underlying vector.
    assert_eq!(guard.len(), 3);
    assert_eq!(guard[0], 1);
}
502
#[test]
fn test_pooled_object_take() {
    let pool = ObjectPool::new(5, || vec![1, 2, 3]);

    let owned = pool.get().take();
    assert_eq!(owned, vec![1, 2, 3]);

    // A taken object never goes back into the pool.
    let snapshot = pool.stats();
    assert_eq!(snapshot.objects_returned, 0);
}
515
#[test]
fn test_global_pools() {
    // Check a header map out of the shared pool and hand it back.
    {
        let mut header_map = get_cow_hashmap();
        header_map.insert(Cow::Borrowed("test"), Cow::Borrowed("value"));
    }

    // Same round trip for a byte buffer.
    {
        let mut buffer = get_byte_vec();
        buffer.extend_from_slice(b"test data");
    }

    let snapshot = get_global_pool_stats();
    assert!(snapshot.total_reuse_ratio >= 0.0);
}
531
#[test]
fn test_pooled_response_builder() {
    let builder = pooled_builders::PooledResponseBuilder::new()
        .status(201)
        .header("X-Test", "test-value")
        .content_type("application/json");
    let built = builder.json(JsonData::String("test".to_string()));

    assert_eq!(built.status_code, 201);

    let header_value = built.headers.get("X-Test");
    assert_eq!(header_value, Some(&Cow::Borrowed("test-value")));
}
546
#[test]
fn test_pooled_sse_builder() {
    let built = pooled_builders::PooledSSEBuilder::new()
        .event("first event")
        .event("second event")
        .header("X-Custom", "custom-value")
        .build();

    assert_eq!(built.status_code, 200);
    assert_eq!(built.content_type, "text/event-stream");

    // Events must come out in insertion order.
    let ResponseBody::ServerSentEvents(frames) = built.body else {
        panic!("Expected ServerSentEvents body");
    };
    assert_eq!(frames.len(), 2);
    assert!(frames[0].contains("first event"));
    assert!(frames[1].contains("second event"));
}
566
#[test]
fn test_pool_capacity_limits() {
    let pool = ObjectPool::new(2, Vec::<i32>::new);

    // Check out one more object than the idle queue can hold.
    let first = pool.get();
    let second = pool.get();
    let third = pool.get();

    // Only two of the three fit back into the queue; the last one is discarded.
    drop(first);
    drop(second);
    drop(third);

    let snapshot = pool.stats();
    assert_eq!(snapshot.objects_created, 3);
    assert_eq!(snapshot.objects_returned, 2);
}
583
#[test]
fn test_concurrent_pool_access() {
    use std::sync::Arc;
    use std::thread;

    let pool = Arc::new(ObjectPool::new(10, Vec::<i32>::new));

    // Five workers each check out one vector, mutate it and drop the guard.
    let workers: Vec<_> = (0..5)
        .map(|_| {
            let shared = Arc::clone(&pool);
            thread::spawn(move || {
                let mut scratch = shared.get();
                scratch.push(1);
                scratch.push(2);
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let snapshot = pool.stats();
    assert!(snapshot.objects_created <= 10);
    assert!(snapshot.objects_reused > 0 || snapshot.objects_created == 5);
}
611}