pjson_rs/infrastructure/integration/
object_pool.rs

1// Object pooling system for high-performance memory management
2//
3// This module provides thread-safe object pools for frequently allocated
4// data structures like HashMap and Vec to minimize garbage collection overhead.
5
6use crossbeam::queue::ArrayQueue;
7use once_cell::sync::Lazy;
8use std::borrow::Cow;
9use std::collections::HashMap;
10use std::sync::{Arc, Mutex};
11
12/// Thread-safe object pool for reusable data structures
13pub struct ObjectPool<T> {
14    /// Queue of available objects
15    objects: ArrayQueue<T>,
16    /// Factory function to create new objects
17    factory: Arc<dyn Fn() -> T + Send + Sync>,
18    /// Maximum pool capacity
19    max_capacity: usize,
20    /// Pool statistics
21    stats: Arc<Mutex<PoolStats>>,
22}
23
24#[derive(Debug, Clone, Default)]
25pub struct PoolStats {
26    pub objects_created: usize,
27    pub objects_reused: usize,
28    pub objects_returned: usize,
29    pub peak_usage: usize,
30    pub current_pool_size: usize,
31}
32
33impl<T> ObjectPool<T> {
34    /// Create a new object pool with specified capacity
35    pub fn new<F>(capacity: usize, factory: F) -> Self
36    where
37        F: Fn() -> T + Send + Sync + 'static,
38    {
39        Self {
40            objects: ArrayQueue::new(capacity),
41            factory: Arc::new(factory),
42            max_capacity: capacity,
43            stats: Arc::new(Mutex::new(PoolStats::default())),
44        }
45    }
46
47    /// Get an object from the pool, creating a new one if needed
48    pub fn get(&self) -> PooledObject<'_, T> {
49        let obj = if let Some(obj) = self.objects.pop() {
50            // Update stats for reuse
51            if let Ok(mut stats) = self.stats.lock() {
52                stats.objects_reused += 1;
53                stats.current_pool_size = stats.current_pool_size.saturating_sub(1);
54            }
55            obj
56        } else {
57            // Create new object
58            let obj = (self.factory)();
59            if let Ok(mut stats) = self.stats.lock() {
60                stats.objects_created += 1;
61                stats.peak_usage = stats
62                    .peak_usage
63                    .max(stats.objects_created - stats.current_pool_size);
64            }
65            obj
66        };
67
68        PooledObject {
69            object: Some(obj),
70            pool: self,
71        }
72    }
73
74    /// Return an object to the pool
75    fn return_object(&self, obj: T) {
76        // Try to return to pool
77        if self.objects.push(obj).is_ok()
78            && let Ok(mut stats) = self.stats.lock()
79        {
80            stats.objects_returned += 1;
81            stats.current_pool_size += 1;
82        }
83        // If pool is full, object is dropped (let GC handle it)
84    }
85
86    /// Clean object before returning to pool (default implementation)
87    fn clean_object(_obj: &mut T) {
88        // Default implementation does nothing
89        // Specialized implementations below for specific types
90    }
91
92    /// Get current pool statistics
93    pub fn stats(&self) -> PoolStats {
94        self.stats
95            .lock()
96            .map(|guard| guard.clone())
97            .unwrap_or_default()
98    }
99}
100
101// Trait for cleanable objects
102trait Cleanable {
103    fn clean(&mut self);
104}
105
106impl Cleanable for HashMap<Cow<'static, str>, Cow<'static, str>> {
107    fn clean(&mut self) {
108        self.clear();
109    }
110}
111
112impl Cleanable for HashMap<String, String> {
113    fn clean(&mut self) {
114        self.clear();
115    }
116}
117
118impl Cleanable for Vec<u8> {
119    fn clean(&mut self) {
120        self.clear();
121    }
122}
123
124impl Cleanable for Vec<String> {
125    fn clean(&mut self) {
126        self.clear();
127    }
128}
129
130/// RAII wrapper that automatically returns objects to pool
131pub struct PooledObject<'a, T> {
132    object: Option<T>,
133    pool: &'a ObjectPool<T>,
134}
135
136impl<'a, T> PooledObject<'a, T> {
137    /// Get a reference to the pooled object
138    pub fn get(&self) -> &T {
139        self.object
140            .as_ref()
141            .expect("PooledObject accessed after take")
142    }
143
144    /// Get a mutable reference to the pooled object
145    pub fn get_mut(&mut self) -> &mut T {
146        self.object
147            .as_mut()
148            .expect("PooledObject accessed after take")
149    }
150
151    /// Take ownership of the object (prevents return to pool)
152    pub fn take(mut self) -> T {
153        self.object.take().expect("PooledObject already taken")
154    }
155}
156
157impl<'a, T> Drop for PooledObject<'a, T> {
158    fn drop(&mut self) {
159        if let Some(obj) = self.object.take() {
160            self.pool.return_object(obj);
161        }
162    }
163}
164
165impl<'a, T> std::ops::Deref for PooledObject<'a, T> {
166    type Target = T;
167
168    fn deref(&self) -> &Self::Target {
169        self.get()
170    }
171}
172
173impl<'a, T> std::ops::DerefMut for PooledObject<'a, T> {
174    fn deref_mut(&mut self) -> &mut Self::Target {
175        self.get_mut()
176    }
177}
178
179/// Global pools for common data structures
180pub struct GlobalPools {
181    pub cow_hashmap: ObjectPool<HashMap<Cow<'static, str>, Cow<'static, str>>>,
182    pub string_hashmap: ObjectPool<HashMap<String, String>>,
183    pub byte_vec: ObjectPool<Vec<u8>>,
184    pub string_vec: ObjectPool<Vec<String>>,
185}
186
187impl GlobalPools {
188    fn new() -> Self {
189        Self {
190            cow_hashmap: ObjectPool::new(50, || {
191                let mut map = HashMap::with_capacity(8);
192                map.shrink_to_fit(); // Ensure it's clean
193                map
194            }),
195            string_hashmap: ObjectPool::new(50, || {
196                let mut map = HashMap::with_capacity(8);
197                map.shrink_to_fit(); // Ensure it's clean
198                map
199            }),
200            byte_vec: ObjectPool::new(100, || {
201                let mut vec = Vec::with_capacity(1024);
202                vec.clear(); // Ensure it's clean
203                vec
204            }),
205            string_vec: ObjectPool::new(50, || {
206                let mut vec = Vec::with_capacity(16);
207                vec.clear(); // Ensure it's clean
208                vec
209            }),
210        }
211    }
212}
213
214/// Global singleton pools
215static GLOBAL_POOLS: Lazy<GlobalPools> = Lazy::new(GlobalPools::new);
216
217/// Wrapper that ensures cleaning happens
218pub struct CleaningPooledObject<T: 'static> {
219    inner: PooledObject<'static, T>,
220}
221
222impl<T: 'static> CleaningPooledObject<T> {
223    fn new(inner: PooledObject<'static, T>) -> Self {
224        Self { inner }
225    }
226
227    pub fn take(self) -> T {
228        self.inner.take()
229    }
230}
231
232impl<T: 'static> std::ops::Deref for CleaningPooledObject<T> {
233    type Target = T;
234    fn deref(&self) -> &Self::Target {
235        &self.inner
236    }
237}
238
239impl<T: 'static> std::ops::DerefMut for CleaningPooledObject<T> {
240    fn deref_mut(&mut self) -> &mut Self::Target {
241        &mut self.inner
242    }
243}
244
245/// Global cleaning pools - direct ObjectPool instances
246static CLEANING_COW_HASHMAP: Lazy<ObjectPool<HashMap<Cow<'static, str>, Cow<'static, str>>>> =
247    Lazy::new(|| ObjectPool::new(50, || HashMap::with_capacity(8)));
248static CLEANING_STRING_HASHMAP: Lazy<ObjectPool<HashMap<String, String>>> =
249    Lazy::new(|| ObjectPool::new(50, || HashMap::with_capacity(8)));
250static CLEANING_BYTE_VEC: Lazy<ObjectPool<Vec<u8>>> =
251    Lazy::new(|| ObjectPool::new(100, || Vec::with_capacity(1024)));
252static CLEANING_STRING_VEC: Lazy<ObjectPool<Vec<String>>> =
253    Lazy::new(|| ObjectPool::new(50, || Vec::with_capacity(16)));
254
255/// Convenience functions for global cleaning pools
256pub fn get_cow_hashmap() -> CleaningPooledObject<HashMap<Cow<'static, str>, Cow<'static, str>>> {
257    let mut obj = CLEANING_COW_HASHMAP.get();
258    obj.clear(); // Clean before use
259    CleaningPooledObject::new(obj)
260}
261
262pub fn get_string_hashmap() -> CleaningPooledObject<HashMap<String, String>> {
263    let mut obj = CLEANING_STRING_HASHMAP.get();
264    obj.clear(); // Clean before use
265    CleaningPooledObject::new(obj)
266}
267
268pub fn get_byte_vec() -> CleaningPooledObject<Vec<u8>> {
269    let mut obj = CLEANING_BYTE_VEC.get();
270    obj.clear(); // Clean before use
271    CleaningPooledObject::new(obj)
272}
273
274pub fn get_string_vec() -> CleaningPooledObject<Vec<String>> {
275    let mut obj = CLEANING_STRING_VEC.get();
276    obj.clear(); // Clean before use
277    CleaningPooledObject::new(obj)
278}
279
280/// Pool statistics aggregator
281#[derive(Debug, Clone)]
282pub struct GlobalPoolStats {
283    pub cow_hashmap: PoolStats,
284    pub string_hashmap: PoolStats,
285    pub byte_vec: PoolStats,
286    pub string_vec: PoolStats,
287    pub total_objects_created: usize,
288    pub total_objects_reused: usize,
289    pub total_reuse_ratio: f64,
290}
291
292/// Get comprehensive statistics for all global pools
293pub fn get_global_pool_stats() -> GlobalPoolStats {
294    let cow_hashmap = CLEANING_COW_HASHMAP.stats();
295    let string_hashmap = CLEANING_STRING_HASHMAP.stats();
296    let byte_vec = CLEANING_BYTE_VEC.stats();
297    let string_vec = CLEANING_STRING_VEC.stats();
298
299    let total_created = cow_hashmap.objects_created
300        + string_hashmap.objects_created
301        + byte_vec.objects_created
302        + string_vec.objects_created;
303    let total_reused = cow_hashmap.objects_reused
304        + string_hashmap.objects_reused
305        + byte_vec.objects_reused
306        + string_vec.objects_reused;
307
308    let total_reuse_ratio = if total_created + total_reused > 0 {
309        total_reused as f64 / (total_created + total_reused) as f64
310    } else {
311        0.0
312    };
313
314    GlobalPoolStats {
315        cow_hashmap,
316        string_hashmap,
317        byte_vec,
318        string_vec,
319        total_objects_created: total_created,
320        total_objects_reused: total_reused,
321        total_reuse_ratio,
322    }
323}
324
325/// Optimized response building with pooled objects
326pub mod pooled_builders {
327    use super::*;
328    use crate::domain::value_objects::JsonData;
329    use crate::infrastructure::integration::{ResponseBody, UniversalResponse};
330
331    /// Response builder that uses pooled HashMap
332    pub struct PooledResponseBuilder {
333        status_code: u16,
334        headers: CleaningPooledObject<HashMap<Cow<'static, str>, Cow<'static, str>>>,
335        content_type: Cow<'static, str>,
336    }
337
338    impl PooledResponseBuilder {
339        /// Create new builder with pooled HashMap
340        pub fn new() -> Self {
341            Self {
342                status_code: 200,
343                headers: get_cow_hashmap(),
344                content_type: Cow::Borrowed("application/json"),
345            }
346        }
347
348        /// Set status code
349        pub fn status(mut self, status: u16) -> Self {
350            self.status_code = status;
351            self
352        }
353
354        /// Add header
355        pub fn header(
356            mut self,
357            name: impl Into<Cow<'static, str>>,
358            value: impl Into<Cow<'static, str>>,
359        ) -> Self {
360            self.headers.insert(name.into(), value.into());
361            self
362        }
363
364        /// Set content type
365        pub fn content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
366            self.content_type = content_type.into();
367            self
368        }
369
370        /// Build response with JSON data
371        pub fn json(self, data: JsonData) -> UniversalResponse {
372            // Take ownership of headers to avoid cloning
373            let headers = self.headers.take();
374
375            UniversalResponse {
376                status_code: self.status_code,
377                headers,
378                body: ResponseBody::Json(data),
379                content_type: self.content_type,
380            }
381        }
382
383        /// Build response with binary data using pooled Vec
384        pub fn binary(self, data: Vec<u8>) -> UniversalResponse {
385            let headers = self.headers.take();
386
387            UniversalResponse {
388                status_code: self.status_code,
389                headers,
390                body: ResponseBody::Binary(data),
391                content_type: Cow::Borrowed("application/octet-stream"),
392            }
393        }
394    }
395
396    impl Default for PooledResponseBuilder {
397        fn default() -> Self {
398            Self::new()
399        }
400    }
401
402    /// Server-Sent Events builder with pooled Vec
403    pub struct PooledSSEBuilder {
404        events: CleaningPooledObject<Vec<String>>,
405        headers: CleaningPooledObject<HashMap<Cow<'static, str>, Cow<'static, str>>>,
406    }
407
408    impl PooledSSEBuilder {
409        /// Create new SSE builder
410        pub fn new() -> Self {
411            let mut headers = get_cow_hashmap();
412            headers.insert(Cow::Borrowed("Cache-Control"), Cow::Borrowed("no-cache"));
413            headers.insert(Cow::Borrowed("Connection"), Cow::Borrowed("keep-alive"));
414
415            Self {
416                events: get_string_vec(),
417                headers,
418            }
419        }
420
421        /// Add event data
422        pub fn event(mut self, data: impl Into<String>) -> Self {
423            self.events.push(format!("data: {}\n\n", data.into()));
424            self
425        }
426
427        /// Add custom header
428        pub fn header(
429            mut self,
430            name: impl Into<Cow<'static, str>>,
431            value: impl Into<Cow<'static, str>>,
432        ) -> Self {
433            self.headers.insert(name.into(), value.into());
434            self
435        }
436
437        /// Build SSE response
438        pub fn build(self) -> UniversalResponse {
439            let events = self.events.take();
440            let headers = self.headers.take();
441
442            UniversalResponse {
443                status_code: 200,
444                headers,
445                body: ResponseBody::ServerSentEvents(events),
446                content_type: Cow::Borrowed("text/event-stream"),
447            }
448        }
449    }
450
451    impl Default for PooledSSEBuilder {
452        fn default() -> Self {
453            Self::new()
454        }
455    }
456}
457
458#[cfg(test)]
459mod tests {
460    use super::super::ResponseBody;
461    use super::*;
462    use crate::domain::value_objects::JsonData;
463
464    #[test]
465    fn test_object_pool_basic_operations() {
466        let pool = ObjectPool::new(5, || HashMap::<String, String>::with_capacity(4));
467
468        // Get object from pool
469        let mut obj1 = pool.get();
470        obj1.insert("test".to_string(), "value".to_string());
471
472        // Get another object
473        let obj2 = pool.get();
474
475        // Check stats
476        let stats = pool.stats();
477        assert_eq!(stats.objects_created, 2);
478        assert_eq!(stats.objects_reused, 0);
479
480        // Drop objects (return to pool)
481        drop(obj1);
482        drop(obj2);
483
484        // Get object again (should be reused)
485        let _obj3 = pool.get();
486        // Note: obj3 might not be empty because we're using a basic pool
487        // The cleaning happens in CleaningPooledObject, not in basic ObjectPool
488
489        let stats = pool.stats();
490        assert_eq!(stats.objects_reused, 1);
491    }
492
493    #[test]
494    fn test_pooled_object_deref() {
495        let pool = ObjectPool::new(5, || vec![1, 2, 3]);
496        let obj = pool.get();
497
498        // Test Deref
499        assert_eq!(obj.len(), 3);
500        assert_eq!(obj[0], 1);
501    }
502
503    #[test]
504    fn test_pooled_object_take() {
505        let pool = ObjectPool::new(5, || vec![1, 2, 3]);
506        let obj = pool.get();
507
508        let taken = obj.take();
509        assert_eq!(taken, vec![1, 2, 3]);
510
511        // Object should not be returned to pool
512        let stats = pool.stats();
513        assert_eq!(stats.objects_returned, 0);
514    }
515
516    #[test]
517    fn test_global_pools() {
518        let mut headers = get_cow_hashmap();
519        headers.insert(Cow::Borrowed("test"), Cow::Borrowed("value"));
520        drop(headers);
521
522        let mut bytes = get_byte_vec();
523        bytes.extend_from_slice(b"test data");
524        drop(bytes);
525
526        let stats = get_global_pool_stats();
527        // Note: Stats might be 0 initially because we're using different pools
528        // This test validates that the stats function works, not specific values
529        assert!(stats.total_reuse_ratio >= 0.0);
530    }
531
532    #[test]
533    fn test_pooled_response_builder() {
534        let response = pooled_builders::PooledResponseBuilder::new()
535            .status(201)
536            .header("X-Test", "test-value")
537            .content_type("application/json")
538            .json(JsonData::String("test".to_string()));
539
540        assert_eq!(response.status_code, 201);
541        assert_eq!(
542            response.headers.get("X-Test"),
543            Some(&Cow::Borrowed("test-value"))
544        );
545    }
546
547    #[test]
548    fn test_pooled_sse_builder() {
549        let response = pooled_builders::PooledSSEBuilder::new()
550            .event("first event")
551            .event("second event")
552            .header("X-Custom", "custom-value")
553            .build();
554
555        assert_eq!(response.status_code, 200);
556        assert_eq!(response.content_type, "text/event-stream");
557
558        if let ResponseBody::ServerSentEvents(events) = response.body {
559            assert_eq!(events.len(), 2);
560            assert!(events[0].contains("first event"));
561            assert!(events[1].contains("second event"));
562        } else {
563            panic!("Expected ServerSentEvents body");
564        }
565    }
566
567    #[test]
568    fn test_pool_capacity_limits() {
569        let pool = ObjectPool::new(2, Vec::<i32>::new);
570
571        let obj1 = pool.get();
572        let obj2 = pool.get();
573        let obj3 = pool.get(); // This should create new object
574
575        drop(obj1);
576        drop(obj2);
577        drop(obj3); // Pool is full, so this should be dropped
578
579        let stats = pool.stats();
580        assert_eq!(stats.objects_created, 3);
581        assert_eq!(stats.objects_returned, 2); // Only 2 can fit in pool
582    }
583
584    #[test]
585    fn test_concurrent_pool_access() {
586        use std::sync::Arc;
587        use std::thread;
588
589        let pool = Arc::new(ObjectPool::new(10, Vec::<i32>::new));
590        let mut handles = vec![];
591
592        for _ in 0..5 {
593            let pool_clone = Arc::clone(&pool);
594            let handle = thread::spawn(move || {
595                let mut obj = pool_clone.get();
596                obj.push(1);
597                obj.push(2);
598                // Object automatically returned when dropped
599            });
600            handles.push(handle);
601        }
602
603        for handle in handles {
604            handle.join().unwrap();
605        }
606
607        let stats = pool.stats();
608        assert!(stats.objects_created <= 10); // Should reuse objects
609        assert!(stats.objects_reused > 0 || stats.objects_created == 5);
610    }
611}