1use crate::domain::{DomainResult, DomainError};
7use std::{
8 collections::HashMap,
9 sync::{Arc, Mutex},
10 time::{Duration, Instant},
11};
12
13#[derive(Debug)]
15pub struct BufferPool {
16 pools: Arc<Mutex<HashMap<BufferSize, BufferBucket>>>,
17 config: PoolConfig,
18 stats: Arc<Mutex<PoolStats>>,
19}
20
21#[derive(Debug, Clone)]
23pub struct PoolConfig {
24 pub max_buffers_per_bucket: usize,
26 pub max_total_memory: usize,
28 pub buffer_ttl: Duration,
30 pub track_stats: bool,
32 pub simd_alignment: usize,
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
38pub enum BufferSize {
39 Small = 1024,
41 Medium = 8192,
43 Large = 65536,
45 XLarge = 524288,
47 Huge = 4194304,
49}
50
51#[derive(Debug)]
53struct BufferBucket {
54 buffers: Vec<AlignedBuffer>,
55 size: BufferSize,
56 last_access: Instant,
57}
58
59#[derive(Debug)]
61pub struct AlignedBuffer {
62 data: Vec<u8>,
63 capacity: usize,
64 alignment: usize,
65 created_at: Instant,
66 last_used: Instant,
67}
68
69#[derive(Debug, Clone)]
71pub struct PoolStats {
72 pub total_allocations: u64,
74 pub cache_hits: u64,
76 pub cache_misses: u64,
78 pub current_memory_usage: usize,
80 pub peak_memory_usage: usize,
82 pub cleanup_count: u64,
84}
85
86impl BufferPool {
87 pub fn new() -> Self {
89 Self::with_config(PoolConfig::default())
90 }
91
92 pub fn with_config(config: PoolConfig) -> Self {
94 Self {
95 pools: Arc::new(Mutex::new(HashMap::new())),
96 config,
97 stats: Arc::new(Mutex::new(PoolStats::new())),
98 }
99 }
100
101 pub fn get_buffer(&self, size: BufferSize) -> DomainResult<PooledBuffer> {
103 if self.config.track_stats {
104 self.increment_allocations();
105 }
106
107 let mut pools = self.pools.lock()
108 .map_err(|_| DomainError::InternalError("Failed to acquire pool lock".to_string()))?;
109
110 if let Some(bucket) = pools.get_mut(&size)
111 && let Some(mut buffer) = bucket.buffers.pop() {
112 buffer.last_used = Instant::now();
113 bucket.last_access = Instant::now();
114
115 if self.config.track_stats {
116 self.increment_cache_hits();
117 }
118
119 return Ok(PooledBuffer::new(buffer, Arc::clone(&self.pools), size));
120 }
121
122 if self.config.track_stats {
124 self.increment_cache_misses();
125 }
126
127 let buffer = AlignedBuffer::new(size as usize, self.config.simd_alignment)?;
128 Ok(PooledBuffer::new(buffer, Arc::clone(&self.pools), size))
129 }
130
131 pub fn get_buffer_with_capacity(&self, min_capacity: usize) -> DomainResult<PooledBuffer> {
133 let size = BufferSize::for_capacity(min_capacity);
134 self.get_buffer(size)
135 }
136
137 pub fn cleanup(&self) -> DomainResult<CleanupStats> {
139 let mut pools = self.pools.lock()
140 .map_err(|_| DomainError::InternalError("Failed to acquire pool lock".to_string()))?;
141
142 let now = Instant::now();
143 let mut freed_buffers = 0;
144 let mut freed_memory = 0;
145
146 pools.retain(|_size, bucket| {
147 let old_count = bucket.buffers.len();
148 bucket.buffers.retain(|buffer| {
149 let age = now.duration_since(buffer.last_used);
150 if age > self.config.buffer_ttl {
151 freed_memory += buffer.capacity;
152 false
153 } else {
154 true
155 }
156 });
157 freed_buffers += old_count - bucket.buffers.len();
158
159 !bucket.buffers.is_empty() ||
161 now.duration_since(bucket.last_access) < self.config.buffer_ttl
162 });
163
164 if self.config.track_stats {
165 self.increment_cleanup_count();
166 self.update_current_memory_usage(-(freed_memory as i64));
167 }
168
169 Ok(CleanupStats {
170 freed_buffers,
171 freed_memory,
172 })
173 }
174
175 pub fn stats(&self) -> DomainResult<PoolStats> {
177 let stats = self.stats.lock()
178 .map_err(|_| DomainError::InternalError("Failed to acquire stats lock".to_string()))?;
179 Ok(stats.clone())
180 }
181
182 pub fn current_memory_usage(&self) -> DomainResult<usize> {
184 let pools = self.pools.lock()
185 .map_err(|_| DomainError::InternalError("Failed to acquire pool lock".to_string()))?;
186
187 let usage = pools.values()
188 .map(|bucket| bucket.buffers.iter().map(|b| b.capacity).sum::<usize>())
189 .sum();
190
191 Ok(usage)
192 }
193
194 fn increment_allocations(&self) {
197 if let Ok(mut stats) = self.stats.lock() {
198 stats.total_allocations += 1;
199 }
200 }
201
202 fn increment_cache_hits(&self) {
203 if let Ok(mut stats) = self.stats.lock() {
204 stats.cache_hits += 1;
205 }
206 }
207
208 fn increment_cache_misses(&self) {
209 if let Ok(mut stats) = self.stats.lock() {
210 stats.cache_misses += 1;
211 }
212 }
213
214 fn increment_cleanup_count(&self) {
215 if let Ok(mut stats) = self.stats.lock() {
216 stats.cleanup_count += 1;
217 }
218 }
219
220 fn update_current_memory_usage(&self, delta: i64) {
221 if let Ok(mut stats) = self.stats.lock() {
222 stats.current_memory_usage = (stats.current_memory_usage as i64 + delta).max(0) as usize;
223 stats.peak_memory_usage = stats.peak_memory_usage.max(stats.current_memory_usage);
224 }
225 }
226}
227
228impl BufferSize {
229 pub fn for_capacity(capacity: usize) -> Self {
231 match capacity {
232 0..=1024 => BufferSize::Small,
233 1025..=8192 => BufferSize::Medium,
234 8193..=65536 => BufferSize::Large,
235 65537..=524288 => BufferSize::XLarge,
236 _ => BufferSize::Huge,
237 }
238 }
239
240 pub fn all_sizes() -> &'static [BufferSize] {
242 &[
243 BufferSize::Small,
244 BufferSize::Medium,
245 BufferSize::Large,
246 BufferSize::XLarge,
247 BufferSize::Huge,
248 ]
249 }
250}
251
252impl AlignedBuffer {
253 fn new(capacity: usize, alignment: usize) -> DomainResult<Self> {
255 if !alignment.is_power_of_two() {
257 return Err(DomainError::InvalidInput(format!("Alignment {alignment} is not power of 2")));
258 }
259
260 let aligned_capacity = (capacity + alignment - 1) & !(alignment - 1);
262
263 let data = Vec::with_capacity(aligned_capacity);
266
267 let now = Instant::now();
268 Ok(Self {
269 data,
270 capacity: aligned_capacity,
271 alignment,
272 created_at: now,
273 last_used: now,
274 })
275 }
276
277 pub fn as_mut_slice(&mut self) -> &mut [u8] {
279 &mut self.data
280 }
281
282 pub fn as_slice(&self) -> &[u8] {
284 &self.data
285 }
286
287 pub fn clear(&mut self) {
289 self.data.clear();
290 self.last_used = Instant::now();
291 }
292
293 pub fn capacity(&self) -> usize {
295 self.capacity
296 }
297
298 pub fn is_aligned(&self) -> bool {
301 let ptr = self.data.as_ptr() as usize;
302 let natural_alignment = std::mem::align_of::<u64>(); let effective_alignment = if self.alignment <= natural_alignment {
306 natural_alignment
307 } else {
308 std::cmp::min(self.alignment, 16)
312 };
313
314 ptr.is_multiple_of(effective_alignment)
315 }
316}
317
318pub struct PooledBuffer {
320 buffer: Option<AlignedBuffer>,
321 pool: Arc<Mutex<HashMap<BufferSize, BufferBucket>>>,
322 size: BufferSize,
323}
324
325impl PooledBuffer {
326 fn new(
327 buffer: AlignedBuffer,
328 pool: Arc<Mutex<HashMap<BufferSize, BufferBucket>>>,
329 size: BufferSize,
330 ) -> Self {
331 Self {
332 buffer: Some(buffer),
333 pool,
334 size,
335 }
336 }
337
338 pub fn buffer_mut(&mut self) -> Option<&mut AlignedBuffer> {
340 self.buffer.as_mut()
341 }
342
343 pub fn buffer(&self) -> Option<&AlignedBuffer> {
345 self.buffer.as_ref()
346 }
347
348 pub fn capacity(&self) -> usize {
350 self.buffer.as_ref().map(|b| b.capacity()).unwrap_or(0)
351 }
352
353 pub fn clear(&mut self) {
355 if let Some(buffer) = &mut self.buffer {
356 buffer.clear();
357 }
358 }
359}
360
361impl Drop for PooledBuffer {
362 fn drop(&mut self) {
363 if let Some(mut buffer) = self.buffer.take() {
364 buffer.clear(); if let Ok(mut pools) = self.pool.lock() {
367 let bucket = pools.entry(self.size).or_insert_with(|| BufferBucket {
368 buffers: Vec::new(),
369 size: self.size,
370 last_access: Instant::now(),
371 });
372
373 if bucket.buffers.len() < 50 { bucket.buffers.push(buffer);
376 bucket.last_access = Instant::now();
377 }
378 }
379 }
380 }
381}
382
383#[derive(Debug, Clone)]
385pub struct CleanupStats {
386 pub freed_buffers: usize,
387 pub freed_memory: usize,
388}
389
390impl PoolConfig {
391 pub fn simd_optimized() -> Self {
393 Self {
394 max_buffers_per_bucket: 100,
395 max_total_memory: 64 * 1024 * 1024, buffer_ttl: Duration::from_secs(300), track_stats: true,
398 simd_alignment: 64, }
400 }
401
402 pub fn low_memory() -> Self {
404 Self {
405 max_buffers_per_bucket: 10,
406 max_total_memory: 8 * 1024 * 1024, buffer_ttl: Duration::from_secs(60), track_stats: false, simd_alignment: 32, }
411 }
412}
413
414impl Default for PoolConfig {
415 fn default() -> Self {
416 Self {
417 max_buffers_per_bucket: 50,
418 max_total_memory: 32 * 1024 * 1024, buffer_ttl: Duration::from_secs(180), track_stats: true,
421 simd_alignment: 32, }
423 }
424}
425
426impl PoolStats {
427 fn new() -> Self {
428 Self {
429 total_allocations: 0,
430 cache_hits: 0,
431 cache_misses: 0,
432 current_memory_usage: 0,
433 peak_memory_usage: 0,
434 cleanup_count: 0,
435 }
436 }
437
438 pub fn hit_ratio(&self) -> f64 {
440 if self.total_allocations == 0 {
441 0.0
442 } else {
443 self.cache_hits as f64 / self.total_allocations as f64
444 }
445 }
446
447 pub fn memory_efficiency(&self) -> f64 {
449 if self.peak_memory_usage == 0 {
450 1.0
451 } else {
452 self.current_memory_usage as f64 / self.peak_memory_usage as f64
453 }
454 }
455}
456
457impl Default for BufferPool {
458 fn default() -> Self {
459 Self::new()
460 }
461}
462
463static GLOBAL_BUFFER_POOL: std::sync::OnceLock<BufferPool> = std::sync::OnceLock::new();
465
466pub fn global_buffer_pool() -> &'static BufferPool {
468 GLOBAL_BUFFER_POOL.get_or_init(BufferPool::new)
469}
470
471pub fn initialize_global_buffer_pool(config: PoolConfig) -> DomainResult<()> {
473 GLOBAL_BUFFER_POOL.set(BufferPool::with_config(config))
474 .map_err(|_| DomainError::InternalError("Global buffer pool already initialized".to_string()))?;
475 Ok(())
476}
477
478#[cfg(test)]
479mod tests {
480 use super::*;
481
482 #[test]
483 fn test_buffer_pool_creation() {
484 let pool = BufferPool::new();
485 assert!(pool.stats().is_ok());
486 }
487
488 #[test]
489 fn test_buffer_allocation() {
490 let pool = BufferPool::new();
491 let buffer = pool.get_buffer(BufferSize::Medium);
492 assert!(buffer.is_ok());
493
494 let buffer = buffer.unwrap();
495 assert!(buffer.capacity() >= BufferSize::Medium as usize);
496 }
497
498 #[test]
499 fn test_buffer_reuse() {
500 let pool = BufferPool::new();
501
502 {
504 let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
505 }
506
507 let _buffer2 = pool.get_buffer(BufferSize::Small).unwrap();
509
510 let stats = pool.stats().unwrap();
512 assert!(stats.cache_hits > 0);
513 }
514
515 #[test]
516 fn test_buffer_size_selection() {
517 assert_eq!(BufferSize::for_capacity(500), BufferSize::Small);
518 assert_eq!(BufferSize::for_capacity(2000), BufferSize::Medium);
519 assert_eq!(BufferSize::for_capacity(50000), BufferSize::Large);
520 assert_eq!(BufferSize::for_capacity(100000), BufferSize::XLarge);
521 }
522
523 #[test]
524 fn test_aligned_buffer_creation() {
525 let buffer = AlignedBuffer::new(1024, 64).unwrap();
526
527 let ptr = buffer.data.as_ptr() as usize;
529 let natural_alignment = std::mem::align_of::<u64>();
530 println!("Buffer ptr: 0x{:x}, alignment: {}, natural_alignment: {}",
531 ptr, buffer.alignment, natural_alignment);
532 println!("ptr % 8 = {}, ptr % 16 = {}, ptr % 32 = {}, ptr % 64 = {}",
533 ptr % 8, ptr % 16, ptr % 32, ptr % 64);
534
535 assert!(buffer.is_aligned(), "Buffer should be aligned. Ptr: 0x{:x}, Alignment: {}", ptr, buffer.alignment);
536 assert!(buffer.capacity() >= 1024);
537 }
538
539 #[test]
540 fn test_alignment_validation() {
541 let alignments = [1, 2, 4, 8, 16, 32, 64];
543
544 for alignment in alignments.iter() {
545 let result = AlignedBuffer::new(1024, *alignment);
546 if alignment.is_power_of_two() {
547 let buffer = result.unwrap();
548 println!("Testing alignment {}: ptr=0x{:x}, aligned={}",
549 alignment, buffer.data.as_ptr() as usize, buffer.is_aligned());
550 assert!(buffer.is_aligned(), "Failed for alignment {alignment}");
552 }
553 }
554
555 assert!(AlignedBuffer::new(1024, 3).is_err());
557 assert!(AlignedBuffer::new(1024, 17).is_err());
558 }
559
560 #[test]
561 fn test_pool_cleanup() {
562 let config = PoolConfig {
563 buffer_ttl: Duration::from_millis(1),
564 ..Default::default()
565 };
566 let pool = BufferPool::with_config(config);
567
568 {
570 let _buffer = pool.get_buffer(BufferSize::Small).unwrap();
571 }
572
573 std::thread::sleep(Duration::from_millis(10));
575
576 let cleanup_stats = pool.cleanup().unwrap();
578 assert!(cleanup_stats.freed_buffers > 0);
579 }
580
581 #[test]
582 fn test_global_buffer_pool() {
583 let pool = global_buffer_pool();
584 let buffer = pool.get_buffer(BufferSize::Medium);
585 assert!(buffer.is_ok());
586 }
587}