use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::error::{MemoryPoolError, MemoryPoolResult};

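/// A buffer handed out by [`MemoryPool::acquire`]. When the `PooledBuffer` is
/// dropped, its backing allocation is returned to the owning pool for reuse.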
pub struct PooledBuffer {
    data: Option<Vec<u8>>,
    size: usize,
    pool: Option<Arc<MemoryPool>>,
}

impl PooledBuffer {
    /// Returns the buffer contents as an immutable slice.
    pub fn data(&self) -> &[u8] {
        self.data.as_ref().map(|v| v.as_slice()).unwrap_or(&[])
    }

    /// Returns the buffer contents as a mutable slice.
    pub fn data_mut(&mut self) -> &mut [u8] {
        self.data.as_mut().map(|v| v.as_mut_slice()).unwrap_or(&mut [])
    }

    /// Returns the size of the buffer in bytes.
    pub fn size(&self) -> usize {
        self.size
    }

    /// Consumes the buffer and returns the underlying `Vec<u8>`, detaching it
    /// from the pool so it will not be reused.
    pub fn into_vec(mut self) -> Vec<u8> {
        if let Some(ref pool) = self.pool {
            pool.stats
                .current_memory_usage
                .fetch_sub(self.size, Ordering::Relaxed);
        }

        self.pool = None;
        self.data.take().unwrap_or_default()
    }
}

impl Drop for PooledBuffer {
    fn drop(&mut self) {
        // Hand the allocation back to the owning pool, if there is one.
        if let (Some(pool), Some(data)) = (self.pool.take(), self.data.take()) {
            pool.release_internal(data, self.size);
        }
    }
}

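/// An idle buffer held inside the pool, along with the bookkeeping used for
/// size matching and expiry.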
#[derive(Debug)]
struct BufferEntry {
    buffer: Vec<u8>,
    size: usize,
    last_used: Instant,
    use_count: u32,
}

impl BufferEntry {
    fn new(buffer: Vec<u8>, size: usize) -> Self {
        Self {
            buffer,
            size,
            last_used: Instant::now(),
            use_count: 0,
        }
    }

    /// A pooled buffer matches if it is at least as large as the request but
    /// no more than twice as large.
    fn matches_size(&self, requested_size: usize) -> bool {
        self.size >= requested_size && self.size <= requested_size * 2
    }

    fn is_expired(&self, timeout: Duration) -> bool {
        self.last_used.elapsed() > timeout
    }
}

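/// A point-in-time snapshot of pool statistics, as returned by [`MemoryPool::stats`].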
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    pub available_buffers: usize,
    pub total_buffers_created: u64,
    pub total_memory_allocated: usize,
    pub peak_memory_usage: usize,
    pub memory_reuse_count: u64,
    pub current_memory_usage: usize,
    pub pooled_memory: usize,
    pub buffer_hits: u64,
    pub buffer_misses: u64,
}

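/// A thread-safe pool of reusable byte buffers, shared via `Arc`.
///
/// Illustrative usage (a sketch, not taken from the original file):
///
/// ```ignore
/// let pool = MemoryPool::new();
/// let mut buf = pool.acquire(4096)?;
/// buf.data_mut()[0] = 42;
/// drop(buf); // the allocation is returned to the pool for reuse
/// ```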
pub struct MemoryPool {
    inner: Arc<Mutex<MemoryPoolInner>>,
    stats: Arc<PoolStatsAtomic>,
    config: PoolConfig,
}

struct MemoryPoolInner {
    buffers: VecDeque<BufferEntry>,
}

struct PoolStatsAtomic {
    total_buffers_created: AtomicU64,
    memory_reuse_count: AtomicU64,
    peak_memory_usage: AtomicUsize,
    current_memory_usage: AtomicUsize,
    buffer_hits: AtomicU64,
    buffer_misses: AtomicU64,
}

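/// Configuration for a [`MemoryPool`].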
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Maximum number of idle buffers kept in the pool.
    pub max_buffers: usize,
    /// Maximum total memory (in bytes) tracked by the pool before allocations
    /// are handed out unpooled.
    pub max_memory: usize,
    /// How long an idle buffer may sit in the pool before it is discarded.
    pub buffer_timeout: Duration,
    /// Whether to fill the pool with default-sized buffers up front.
    pub preallocate: bool,
    /// Size (in bytes) of buffers created during preallocation.
    pub default_buffer_size: usize,
}

impl Default for PoolConfig {
    fn default() -> Self {
        Self {
            max_buffers: 10,
            max_memory: 500 * 1024 * 1024, // 500 MB
            buffer_timeout: Duration::from_secs(60),
            preallocate: false,
            default_buffer_size: 1920 * 1080 * 4, // one 1080p RGBA frame
        }
    }
}

impl MemoryPool {
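    /// Creates a pool with the default [`PoolConfig`].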
    pub fn new() -> Arc<Self> {
        Self::with_config(PoolConfig::default())
    }

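    /// Creates a pool with the given configuration, optionally preallocating buffers.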
    pub fn with_config(config: PoolConfig) -> Arc<Self> {
        let pool = Arc::new(Self {
            inner: Arc::new(Mutex::new(MemoryPoolInner {
                buffers: VecDeque::new(),
            })),
            stats: Arc::new(PoolStatsAtomic {
                total_buffers_created: AtomicU64::new(0),
                memory_reuse_count: AtomicU64::new(0),
                peak_memory_usage: AtomicUsize::new(0),
                current_memory_usage: AtomicUsize::new(0),
                buffer_hits: AtomicU64::new(0),
                buffer_misses: AtomicU64::new(0),
            }),
            config,
        });

        if pool.config.preallocate {
            pool.preallocate_buffers();
        }

        pool
    }

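    /// Fills the pool with up to four buffers of `default_buffer_size` bytes
    /// (bounded by `max_buffers`).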
    fn preallocate_buffers(&self) {
        let mut inner = self.inner.lock();
        let num_buffers = self.config.max_buffers.min(4);

        for _ in 0..num_buffers {
            let buffer = vec![0u8; self.config.default_buffer_size];
            let entry = BufferEntry::new(buffer, self.config.default_buffer_size);
            inner.buffers.push_back(entry);

            self.stats
                .total_buffers_created
                .fetch_add(1, Ordering::Relaxed);
        }
    }

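    /// Acquires a buffer of at least `size` bytes, reusing a pooled buffer of a
    /// suitable size when one is available.
    ///
    /// Returns [`MemoryPoolError::InvalidBufferSize`] if `size` is zero.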
    pub fn acquire(self: &Arc<Self>, size: usize) -> MemoryPoolResult<PooledBuffer> {
        if size == 0 {
            return Err(MemoryPoolError::InvalidBufferSize { size });
        }

        let mut inner = self.inner.lock();

        self.cleanup_expired_buffers(&mut inner);

        // Log unusually large requests (> 10 MB) for diagnostics.
        if size > 10 * 1024 * 1024 {
            let current = self.stats.current_memory_usage.load(Ordering::Relaxed);
            eprintln!(
                "[MemoryPool] Large allocation requested: {:.2} MB, current pool usage: {:.2} MB / {:.2} MB",
                size as f64 / (1024.0 * 1024.0),
                current as f64 / (1024.0 * 1024.0),
                self.config.max_memory as f64 / (1024.0 * 1024.0)
            );
        }

        if let Some(index) = self.find_suitable_buffer(&inner, size) {
            let mut entry = inner.buffers.remove(index).unwrap();
            entry.last_used = Instant::now();
            entry.use_count += 1;

            // The reused buffer counts towards current usage again.
            self.stats
                .current_memory_usage
                .fetch_add(entry.size, Ordering::Relaxed);

            // Defensive: grow the buffer if it is somehow smaller than requested.
            if entry.buffer.len() < size {
                let size_diff = size - entry.size;
                entry.buffer.resize(size, 0);
                self.stats
                    .current_memory_usage
                    .fetch_add(size_diff, Ordering::Relaxed);
                entry.size = size;
            }

            self.stats.buffer_hits.fetch_add(1, Ordering::Relaxed);
            self.stats.memory_reuse_count.fetch_add(1, Ordering::Relaxed);

            // Track the actual buffer size (which may exceed the request) so the
            // release accounting matches what was added above.
            return Ok(PooledBuffer {
                data: Some(entry.buffer),
                size: entry.size,
                pool: Some(Arc::clone(self)),
            });
        }

        // No reusable buffer: release the lock before allocating a new one.
        drop(inner);
        self.stats.buffer_misses.fetch_add(1, Ordering::Relaxed);
        self.allocate_new_buffer(size)
    }

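    /// Returns the index of the first pooled buffer whose size matches the request.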
    fn find_suitable_buffer(&self, inner: &MemoryPoolInner, size: usize) -> Option<usize> {
        inner
            .buffers
            .iter()
            .position(|entry| entry.matches_size(size))
    }

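    /// Allocates a fresh zeroed buffer. If the pool's memory budget would be
    /// exceeded, the buffer is handed out without a pool reference and is simply
    /// dropped (not pooled) when released.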
    fn allocate_new_buffer(self: &Arc<Self>, size: usize) -> MemoryPoolResult<PooledBuffer> {
        let current_memory = self.stats.current_memory_usage.load(Ordering::Relaxed);
        if current_memory + size > self.config.max_memory {
            eprintln!(
                "[MemoryPool] Pool limit reached ({} MB), allocating {:.2} MB directly without pooling",
                self.config.max_memory / (1024 * 1024),
                size as f64 / (1024.0 * 1024.0)
            );

            let buffer = vec![0u8; size];
            return Ok(PooledBuffer {
                data: Some(buffer),
                size,
                pool: None, // untracked: this buffer is not returned to the pool
            });
        }

        let buffer = vec![0u8; size];
        self.stats
            .total_buffers_created
            .fetch_add(1, Ordering::Relaxed);
        self.stats
            .current_memory_usage
            .fetch_add(size, Ordering::Relaxed);
        self.update_peak_memory();

        Ok(PooledBuffer {
            data: Some(buffer),
            size,
            pool: Some(Arc::clone(self)),
        })
    }

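    /// Returns a buffer to the pool; called from `PooledBuffer`'s `Drop` impl.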
    fn release_internal(&self, buffer: Vec<u8>, size: usize) {
        let mut inner = self.inner.lock();

        // The buffer is no longer in use by a caller.
        self.stats
            .current_memory_usage
            .fetch_sub(size, Ordering::Relaxed);

        // Drop the buffer entirely if the pool is already full.
        if inner.buffers.len() >= self.config.max_buffers {
            return;
        }

        let entry = BufferEntry::new(buffer, size);
        inner.buffers.push_back(entry);
    }

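    /// Drops pooled buffers that have been idle for longer than `buffer_timeout`.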
    fn cleanup_expired_buffers(&self, inner: &mut MemoryPoolInner) {
        let timeout = self.config.buffer_timeout;

        inner.buffers.retain(|entry| !entry.is_expired(timeout));
    }

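    /// Raises the peak-memory statistic to the current usage via a compare-and-swap loop.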
    fn update_peak_memory(&self) {
        let current = self.stats.current_memory_usage.load(Ordering::Relaxed);
        let mut peak = self.stats.peak_memory_usage.load(Ordering::Relaxed);

        while current > peak {
            match self.stats.peak_memory_usage.compare_exchange(
                peak,
                current,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => peak = x,
            }
        }
    }

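    /// Discards all pooled buffers immediately.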
    pub fn clear(&self) {
        let mut inner = self.inner.lock();
        inner.buffers.clear();
    }

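    /// Returns a snapshot of the pool's current statistics.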
    pub fn stats(&self) -> PoolStats {
        let inner = self.inner.lock();

        let pooled_memory: usize = inner.buffers.iter().map(|e| e.size).sum();

        PoolStats {
            available_buffers: inner.buffers.len(),
            total_buffers_created: self.stats.total_buffers_created.load(Ordering::Relaxed),
            total_memory_allocated: pooled_memory
                + self.stats.current_memory_usage.load(Ordering::Relaxed),
            peak_memory_usage: self.stats.peak_memory_usage.load(Ordering::Relaxed),
            memory_reuse_count: self.stats.memory_reuse_count.load(Ordering::Relaxed),
            current_memory_usage: self.stats.current_memory_usage.load(Ordering::Relaxed),
            pooled_memory,
            buffer_hits: self.stats.buffer_hits.load(Ordering::Relaxed),
            buffer_misses: self.stats.buffer_misses.load(Ordering::Relaxed),
        }
    }

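    /// Returns the buffer reuse rate as a percentage (0.0 when no requests have been made).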
    pub fn hit_rate(&self) -> f64 {
        let hits = self.stats.buffer_hits.load(Ordering::Relaxed);
        let misses = self.stats.buffer_misses.load(Ordering::Relaxed);
        let total = hits + misses;

        if total == 0 {
            0.0
        } else {
            (hits as f64 / total as f64) * 100.0
        }
    }
}

impl Default for MemoryPool {
    fn default() -> Self {
        Arc::try_unwrap(Self::new()).unwrap_or_else(|arc| (*arc).clone())
    }
}

impl Clone for MemoryPool {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            stats: Arc::clone(&self.stats),
            config: self.config.clone(),
        }
    }
}

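/// A process-wide pool, lazily initialized on first access.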
static GLOBAL_POOL: once_cell::sync::Lazy<Arc<MemoryPool>> =
    once_cell::sync::Lazy::new(|| MemoryPool::new());

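/// Returns a handle to the process-wide shared [`MemoryPool`].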
pub fn global_pool() -> Arc<MemoryPool> {
    Arc::clone(&GLOBAL_POOL)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_acquire_release() {
        let pool = MemoryPool::new();
        let buffer = pool.acquire(1024).unwrap();
        assert_eq!(buffer.size(), 1024);
        drop(buffer);

        let stats = pool.stats();
        assert_eq!(stats.available_buffers, 1);
    }

    #[test]
    fn test_pool_reuse() {
        let pool = MemoryPool::new();

        let buffer1 = pool.acquire(1024).unwrap();
        drop(buffer1);

        let buffer2 = pool.acquire(1024).unwrap();
        drop(buffer2);

        let stats = pool.stats();
        assert_eq!(stats.memory_reuse_count, 1);
        assert_eq!(stats.total_buffers_created, 1);
    }

    #[test]
    fn test_pool_size_matching() {
        let pool = MemoryPool::new();

        let buffer1 = pool.acquire(1024).unwrap();
        drop(buffer1);

        // A 512-byte request should reuse the pooled 1024-byte buffer.
        let buffer2 = pool.acquire(512).unwrap();
        drop(buffer2);

        let stats = pool.stats();
        assert_eq!(stats.memory_reuse_count, 1);
    }

    #[test]
    fn test_hit_rate() {
        let pool = MemoryPool::new();

        let _buffer1 = pool.acquire(1024).unwrap();
        drop(_buffer1);
        let _buffer2 = pool.acquire(1024).unwrap();

        assert!(pool.hit_rate() > 0.0);
    }
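
    // Additional test sketch (not in the original file): `into_vec` detaches the
    // buffer from the pool, so nothing should be returned on drop.
    #[test]
    fn test_into_vec_detaches_buffer() {
        let pool = MemoryPool::new();
        let buffer = pool.acquire(256).unwrap();
        let vec = buffer.into_vec();
        assert_eq!(vec.len(), 256);

        // The buffer was not returned to the pool.
        assert_eq!(pool.stats().available_buffers, 0);
    }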
}