1use parking_lot::Mutex;
7use std::ops::{Deref, DerefMut};
8use std::sync::atomic::{AtomicUsize, Ordering};
9
10pub struct ObjectPool<T> {
12 available: Mutex<Vec<T>>,
14 factory: Box<dyn Fn() -> T + Send + Sync>,
16 max_size: usize,
18 borrowed: AtomicUsize,
20 created: AtomicUsize,
22}
23
24impl<T> ObjectPool<T> {
25 pub fn new<F>(factory: F, max_size: usize) -> Self
27 where
28 F: Fn() -> T + Send + Sync + 'static,
29 {
30 Self {
31 available: Mutex::new(Vec::with_capacity(max_size)),
32 factory: Box::new(factory),
33 max_size,
34 borrowed: AtomicUsize::new(0),
35 created: AtomicUsize::new(0),
36 }
37 }
38
39 pub fn acquire(&self) -> PooledObject<'_, T> {
41 let obj = self.available.lock().pop().unwrap_or_else(|| {
42 self.created.fetch_add(1, Ordering::Relaxed);
43 (self.factory)()
44 });
45
46 self.borrowed.fetch_add(1, Ordering::Relaxed);
47
48 PooledObject {
49 obj: Some(obj),
50 pool: self,
51 }
52 }
53
54 fn release(&self, obj: T) {
56 self.borrowed.fetch_sub(1, Ordering::Relaxed);
57
58 let mut available = self.available.lock();
59 if available.len() < self.max_size {
60 available.push(obj);
61 }
62 }
64
65 pub fn available(&self) -> usize {
67 self.available.lock().len()
68 }
69
70 pub fn borrowed(&self) -> usize {
72 self.borrowed.load(Ordering::Relaxed)
73 }
74
75 pub fn created(&self) -> usize {
77 self.created.load(Ordering::Relaxed)
78 }
79}
80
81pub struct PooledObject<'a, T> {
83 obj: Option<T>,
84 pool: &'a ObjectPool<T>,
85}
86
87impl<T> Deref for PooledObject<'_, T> {
88 type Target = T;
89
90 fn deref(&self) -> &Self::Target {
91 self.obj.as_ref().unwrap()
92 }
93}
94
95impl<T> DerefMut for PooledObject<'_, T> {
96 fn deref_mut(&mut self) -> &mut Self::Target {
97 self.obj.as_mut().unwrap()
98 }
99}
100
101impl<T> Drop for PooledObject<'_, T> {
102 fn drop(&mut self) {
103 if let Some(obj) = self.obj.take() {
104 self.pool.release(obj);
105 }
106 }
107}
108
109pub type PooledBuffer<'a> = PooledObject<'a, Vec<u8>>;
111
112pub static PACK_BUFFER_POOL: std::sync::LazyLock<ObjectPool<Vec<u8>>> =
114 std::sync::LazyLock::new(|| {
115 ObjectPool::new(
116 || Vec::with_capacity(1024 * 1024), 100,
118 )
119 });
120
121pub static IO_BUFFER_POOL: std::sync::LazyLock<ObjectPool<Vec<u8>>> =
123 std::sync::LazyLock::new(|| {
124 ObjectPool::new(
125 || Vec::with_capacity(64 * 1024), 200,
127 )
128 });
129
130#[cfg(test)]
131mod tests {
132 use super::*;
133
134 #[test]
135 fn test_pool_acquire_release() {
136 let pool: ObjectPool<Vec<u8>> = ObjectPool::new(|| Vec::with_capacity(1024), 10);
137
138 {
139 let mut buf = pool.acquire();
140 buf.extend_from_slice(b"hello");
141 assert_eq!(pool.borrowed(), 1);
142 }
143
144 assert_eq!(pool.borrowed(), 0);
145 assert_eq!(pool.available(), 1);
146 }
147
148 #[test]
149 fn test_pool_reuse() {
150 let pool: ObjectPool<Vec<u8>> = ObjectPool::new(|| Vec::with_capacity(1024), 10);
151
152 {
153 let mut buf = pool.acquire();
154 buf.extend_from_slice(b"first");
155 }
156
157 {
158 let buf = pool.acquire();
159 assert!(buf.capacity() >= 1024);
161 }
162
163 assert_eq!(pool.created(), 1); }
165
166 #[test]
167 fn test_pool_max_size() {
168 let pool: ObjectPool<u32> = ObjectPool::new(|| 0, 2);
169
170 let _a = pool.acquire();
172 let _b = pool.acquire();
173 let _c = pool.acquire();
174
175 assert_eq!(pool.borrowed(), 3);
176 assert_eq!(pool.created(), 3);
177
178 drop(_a);
179 drop(_b);
180 drop(_c);
181
182 assert_eq!(pool.available(), 2);
184 }
185
186 #[test]
187 fn test_pooled_object_deref() {
188 let pool: ObjectPool<Vec<u8>> = ObjectPool::new(Vec::new, 10);
189
190 let mut buf = pool.acquire();
191 buf.push(1);
192 buf.push(2);
193
194 assert_eq!(buf.len(), 2);
195 assert_eq!(&*buf, &[1, 2]);
196 }
197
198 #[test]
199 fn test_global_pools() {
200 let pack_buf = PACK_BUFFER_POOL.acquire();
201 assert!(pack_buf.capacity() >= 1024 * 1024);
202
203 let io_buf = IO_BUFFER_POOL.acquire();
204 assert!(io_buf.capacity() >= 64 * 1024);
205 }
206}