1use crate::domain::{DomainResult, DomainError};
7use std::{
8 collections::HashMap,
9 sync::{Arc, Mutex},
10 time::{Duration, Instant},
11};
12
13#[derive(Debug)]
15pub struct BufferPool {
16 pools: Arc<Mutex<HashMap<BufferSize, BufferBucket>>>,
17 config: PoolConfig,
18 stats: Arc<Mutex<PoolStats>>,
19}
20
/// Tunable settings for a [`BufferPool`].
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Intended cap on idle buffers kept per size bucket.
    ///
    /// NOTE(review): no code visible in this file reads this field.
    pub max_buffers_per_bucket: usize,
    /// Intended cap, in bytes, on memory held by the pool.
    ///
    /// NOTE(review): no code visible in this file reads this field.
    pub max_total_memory: usize,
    /// How long an idle buffer may go unused before `BufferPool::cleanup` discards it.
    pub buffer_ttl: Duration,
    /// Whether the pool updates its [`PoolStats`] counters.
    pub track_stats: bool,
    /// Alignment, in bytes, requested for newly allocated buffers.
    ///
    /// Buffer creation fails unless this is a power of two.
    pub simd_alignment: usize,
}
35
/// Size classes served by the pool.
///
/// Each variant's discriminant is the class size in bytes, so `size as usize`
/// yields the number of bytes requested for a buffer of that class.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum BufferSize {
    /// 1 KiB (1024 bytes).
    Small = 1 << 10,
    /// 8 KiB (8192 bytes).
    Medium = 1 << 13,
    /// 64 KiB (65536 bytes).
    Large = 1 << 16,
    /// 512 KiB (524288 bytes).
    XLarge = 1 << 19,
    /// 4 MiB (4194304 bytes).
    Huge = 1 << 22,
}
50
51#[derive(Debug)]
53struct BufferBucket {
54 buffers: Vec<AlignedBuffer>,
55 size: BufferSize,
56 last_access: Instant,
57}
58
/// Byte buffer whose capacity has been rounded up to a multiple of a requested alignment.
#[derive(Debug)]
pub struct AlignedBuffer {
    /// Backing storage.
    data: Vec<u8>,
    /// Capacity in bytes after rounding up to a multiple of `alignment`.
    capacity: usize,
    /// Requested alignment in bytes (a power of two).
    alignment: usize,
    /// When the buffer was allocated.
    created_at: Instant,
    /// When the buffer was last handed out or cleared.
    last_used: Instant,
}
68
/// Counters describing how a [`BufferPool`] has been used.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of buffer requests made to the pool.
    pub total_allocations: u64,
    /// Requests served by reusing an idle buffer.
    pub cache_hits: u64,
    /// Requests that required allocating a new buffer.
    pub cache_misses: u64,
    /// Tracked memory usage in bytes.
    ///
    /// NOTE(review): in the code visible in this file this value is only ever
    /// decreased (by `BufferPool::cleanup`, clamped at zero) and never increased.
    pub current_memory_usage: usize,
    /// Largest value `current_memory_usage` has held.
    pub peak_memory_usage: usize,
    /// Number of cleanup passes that have run.
    pub cleanup_count: u64,
}
85
86impl BufferPool {
87 pub fn new() -> Self {
89 Self::with_config(PoolConfig::default())
90 }
91
92 pub fn with_config(config: PoolConfig) -> Self {
94 Self {
95 pools: Arc::new(Mutex::new(HashMap::new())),
96 config,
97 stats: Arc::new(Mutex::new(PoolStats::new())),
98 }
99 }
100
101 pub fn get_buffer(&self, size: BufferSize) -> DomainResult<PooledBuffer> {
103 if self.config.track_stats {
104 self.increment_allocations();
105 }
106
107 let mut pools = self.pools.lock()
108 .map_err(|_| DomainError::InternalError("Failed to acquire pool lock".to_string()))?;
109
110 if let Some(bucket) = pools.get_mut(&size) {
111 if let Some(mut buffer) = bucket.buffers.pop() {
112 buffer.last_used = Instant::now();
113 bucket.last_access = Instant::now();
114
115 if self.config.track_stats {
116 self.increment_cache_hits();
117 }
118
119 return Ok(PooledBuffer::new(buffer, Arc::clone(&self.pools), size));
120 }
121 }
122
123 if self.config.track_stats {
125 self.increment_cache_misses();
126 }
127
128 let buffer = AlignedBuffer::new(size as usize, self.config.simd_alignment)?;
129 Ok(PooledBuffer::new(buffer, Arc::clone(&self.pools), size))
130 }
131
132 pub fn get_buffer_with_capacity(&self, min_capacity: usize) -> DomainResult<PooledBuffer> {
134 let size = BufferSize::for_capacity(min_capacity);
135 self.get_buffer(size)
136 }
137
138 pub fn cleanup(&self) -> DomainResult<CleanupStats> {
140 let mut pools = self.pools.lock()
141 .map_err(|_| DomainError::InternalError("Failed to acquire pool lock".to_string()))?;
142
143 let now = Instant::now();
144 let mut freed_buffers = 0;
145 let mut freed_memory = 0;
146
147 pools.retain(|_size, bucket| {
148 let old_count = bucket.buffers.len();
149 bucket.buffers.retain(|buffer| {
150 let age = now.duration_since(buffer.last_used);
151 if age > self.config.buffer_ttl {
152 freed_memory += buffer.capacity;
153 false
154 } else {
155 true
156 }
157 });
158 freed_buffers += old_count - bucket.buffers.len();
159
160 !bucket.buffers.is_empty() ||
162 now.duration_since(bucket.last_access) < self.config.buffer_ttl
163 });
164
165 if self.config.track_stats {
166 self.increment_cleanup_count();
167 self.update_current_memory_usage(-(freed_memory as i64));
168 }
169
170 Ok(CleanupStats {
171 freed_buffers,
172 freed_memory,
173 })
174 }
175
176 pub fn stats(&self) -> DomainResult<PoolStats> {
178 let stats = self.stats.lock()
179 .map_err(|_| DomainError::InternalError("Failed to acquire stats lock".to_string()))?;
180 Ok(stats.clone())
181 }
182
183 pub fn current_memory_usage(&self) -> DomainResult<usize> {
185 let pools = self.pools.lock()
186 .map_err(|_| DomainError::InternalError("Failed to acquire pool lock".to_string()))?;
187
188 let usage = pools.values()
189 .map(|bucket| bucket.buffers.iter().map(|b| b.capacity).sum::<usize>())
190 .sum();
191
192 Ok(usage)
193 }
194
195 fn increment_allocations(&self) {
198 if let Ok(mut stats) = self.stats.lock() {
199 stats.total_allocations += 1;
200 }
201 }
202
203 fn increment_cache_hits(&self) {
204 if let Ok(mut stats) = self.stats.lock() {
205 stats.cache_hits += 1;
206 }
207 }
208
209 fn increment_cache_misses(&self) {
210 if let Ok(mut stats) = self.stats.lock() {
211 stats.cache_misses += 1;
212 }
213 }
214
215 fn increment_cleanup_count(&self) {
216 if let Ok(mut stats) = self.stats.lock() {
217 stats.cleanup_count += 1;
218 }
219 }
220
221 fn update_current_memory_usage(&self, delta: i64) {
222 if let Ok(mut stats) = self.stats.lock() {
223 stats.current_memory_usage = (stats.current_memory_usage as i64 + delta).max(0) as usize;
224 stats.peak_memory_usage = stats.peak_memory_usage.max(stats.current_memory_usage);
225 }
226 }
227}
228
229impl BufferSize {
230 pub fn for_capacity(capacity: usize) -> Self {
232 match capacity {
233 0..=1024 => BufferSize::Small,
234 1025..=8192 => BufferSize::Medium,
235 8193..=65536 => BufferSize::Large,
236 65537..=524288 => BufferSize::XLarge,
237 _ => BufferSize::Huge,
238 }
239 }
240
241 pub fn all_sizes() -> &'static [BufferSize] {
243 &[
244 BufferSize::Small,
245 BufferSize::Medium,
246 BufferSize::Large,
247 BufferSize::XLarge,
248 BufferSize::Huge,
249 ]
250 }
251}
252
253impl AlignedBuffer {
254 fn new(capacity: usize, alignment: usize) -> DomainResult<Self> {
256 if !alignment.is_power_of_two() {
258 return Err(DomainError::InvalidInput(format!("Alignment {alignment} is not power of 2")));
259 }
260
261 let aligned_capacity = (capacity + alignment - 1) & !(alignment - 1);
263
264 let data = Vec::with_capacity(aligned_capacity);
267
268 let now = Instant::now();
269 Ok(Self {
270 data,
271 capacity: aligned_capacity,
272 alignment,
273 created_at: now,
274 last_used: now,
275 })
276 }
277
278 pub fn as_mut_slice(&mut self) -> &mut [u8] {
280 &mut self.data
281 }
282
283 pub fn as_slice(&self) -> &[u8] {
285 &self.data
286 }
287
288 pub fn clear(&mut self) {
290 self.data.clear();
291 self.last_used = Instant::now();
292 }
293
294 pub fn capacity(&self) -> usize {
296 self.capacity
297 }
298
299 pub fn is_aligned(&self) -> bool {
302 let ptr = self.data.as_ptr() as usize;
303 let natural_alignment = std::mem::align_of::<u64>(); let effective_alignment = if self.alignment <= natural_alignment {
307 natural_alignment
308 } else {
309 let min_acceptable = std::cmp::min(self.alignment, 16);
312 min_acceptable
313 };
314
315 ptr % effective_alignment == 0
316 }
317}
318
319pub struct PooledBuffer {
321 buffer: Option<AlignedBuffer>,
322 pool: Arc<Mutex<HashMap<BufferSize, BufferBucket>>>,
323 size: BufferSize,
324}
325
326impl PooledBuffer {
327 fn new(
328 buffer: AlignedBuffer,
329 pool: Arc<Mutex<HashMap<BufferSize, BufferBucket>>>,
330 size: BufferSize,
331 ) -> Self {
332 Self {
333 buffer: Some(buffer),
334 pool,
335 size,
336 }
337 }
338
339 pub fn buffer_mut(&mut self) -> Option<&mut AlignedBuffer> {
341 self.buffer.as_mut()
342 }
343
344 pub fn buffer(&self) -> Option<&AlignedBuffer> {
346 self.buffer.as_ref()
347 }
348
349 pub fn capacity(&self) -> usize {
351 self.buffer.as_ref().map(|b| b.capacity()).unwrap_or(0)
352 }
353
354 pub fn clear(&mut self) {
356 if let Some(buffer) = &mut self.buffer {
357 buffer.clear();
358 }
359 }
360}
361
362impl Drop for PooledBuffer {
363 fn drop(&mut self) {
364 if let Some(mut buffer) = self.buffer.take() {
365 buffer.clear(); if let Ok(mut pools) = self.pool.lock() {
368 let bucket = pools.entry(self.size).or_insert_with(|| BufferBucket {
369 buffers: Vec::new(),
370 size: self.size,
371 last_access: Instant::now(),
372 });
373
374 if bucket.buffers.len() < 50 { bucket.buffers.push(buffer);
377 bucket.last_access = Instant::now();
378 }
379 }
380 }
381 }
382}
383
/// Outcome of a single `BufferPool::cleanup` pass.
#[derive(Debug, Clone)]
pub struct CleanupStats {
    /// Number of idle buffers that were discarded.
    pub freed_buffers: usize,
    /// Combined capacity, in bytes, of the discarded buffers.
    pub freed_memory: usize,
}
390
391impl PoolConfig {
392 pub fn simd_optimized() -> Self {
394 Self {
395 max_buffers_per_bucket: 100,
396 max_total_memory: 64 * 1024 * 1024, buffer_ttl: Duration::from_secs(300), track_stats: true,
399 simd_alignment: 64, }
401 }
402
403 pub fn low_memory() -> Self {
405 Self {
406 max_buffers_per_bucket: 10,
407 max_total_memory: 8 * 1024 * 1024, buffer_ttl: Duration::from_secs(60), track_stats: false, simd_alignment: 32, }
412 }
413}
414
415impl Default for PoolConfig {
416 fn default() -> Self {
417 Self {
418 max_buffers_per_bucket: 50,
419 max_total_memory: 32 * 1024 * 1024, buffer_ttl: Duration::from_secs(180), track_stats: true,
422 simd_alignment: 32, }
424 }
425}
426
427impl PoolStats {
428 fn new() -> Self {
429 Self {
430 total_allocations: 0,
431 cache_hits: 0,
432 cache_misses: 0,
433 current_memory_usage: 0,
434 peak_memory_usage: 0,
435 cleanup_count: 0,
436 }
437 }
438
439 pub fn hit_ratio(&self) -> f64 {
441 if self.total_allocations == 0 {
442 0.0
443 } else {
444 self.cache_hits as f64 / self.total_allocations as f64
445 }
446 }
447
448 pub fn memory_efficiency(&self) -> f64 {
450 if self.peak_memory_usage == 0 {
451 1.0
452 } else {
453 self.current_memory_usage as f64 / self.peak_memory_usage as f64
454 }
455 }
456}
457
458impl Default for BufferPool {
459 fn default() -> Self {
460 Self::new()
461 }
462}
463
464static GLOBAL_BUFFER_POOL: std::sync::OnceLock<BufferPool> = std::sync::OnceLock::new();
466
467pub fn global_buffer_pool() -> &'static BufferPool {
469 GLOBAL_BUFFER_POOL.get_or_init(|| BufferPool::new())
470}
471
472pub fn initialize_global_buffer_pool(config: PoolConfig) -> DomainResult<()> {
474 GLOBAL_BUFFER_POOL.set(BufferPool::with_config(config))
475 .map_err(|_| DomainError::InternalError("Global buffer pool already initialized".to_string()))?;
476 Ok(())
477}
478
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built pool can report its statistics.
    #[test]
    fn test_buffer_pool_creation() {
        let stats = BufferPool::new().stats();
        assert!(stats.is_ok());
    }

    /// A requested buffer is at least as large as its size class.
    #[test]
    fn test_buffer_allocation() {
        let pool = BufferPool::new();
        let result = pool.get_buffer(BufferSize::Medium);
        assert!(result.is_ok());

        let pooled = result.unwrap();
        assert!(pooled.capacity() >= BufferSize::Medium as usize);
    }

    /// A buffer returned to the pool is reused by the next request of that size.
    #[test]
    fn test_buffer_reuse() {
        let pool = BufferPool::new();

        // Take a buffer and hand it straight back by dropping the handle.
        drop(pool.get_buffer(BufferSize::Small).unwrap());

        let _second = pool.get_buffer(BufferSize::Small).unwrap();

        let stats = pool.stats().unwrap();
        assert!(stats.cache_hits > 0);
    }

    /// Capacities map onto the expected size classes.
    #[test]
    fn test_buffer_size_selection() {
        let expectations = [
            (500, BufferSize::Small),
            (2000, BufferSize::Medium),
            (50000, BufferSize::Large),
            (100000, BufferSize::XLarge),
        ];
        for (capacity, expected) in expectations {
            assert_eq!(BufferSize::for_capacity(capacity), expected);
        }
    }

    /// A new buffer passes the alignment check and has at least the requested capacity.
    #[test]
    fn test_aligned_buffer_creation() {
        let buffer = AlignedBuffer::new(1024, 64).unwrap();

        let ptr = buffer.data.as_ptr() as usize;
        let natural_alignment = std::mem::align_of::<u64>();
        println!("Buffer ptr: 0x{:x}, alignment: {}, natural_alignment: {}",
            ptr, buffer.alignment, natural_alignment);
        println!("ptr % 8 = {}, ptr % 16 = {}, ptr % 32 = {}, ptr % 64 = {}",
            ptr % 8, ptr % 16, ptr % 32, ptr % 64);

        assert!(buffer.is_aligned(), "Buffer should be aligned. Ptr: 0x{:x}, Alignment: {}", ptr, buffer.alignment);
        assert!(buffer.capacity() >= 1024);
    }

    /// Power-of-two alignments are accepted; other values are rejected.
    #[test]
    fn test_alignment_validation() {
        let alignments = [1, 2, 4, 8, 16, 32, 64];

        for &alignment in &alignments {
            let result = AlignedBuffer::new(1024, alignment);
            if !alignment.is_power_of_two() {
                continue;
            }
            let buffer = result.unwrap();
            println!("Testing alignment {}: ptr=0x{:x}, aligned={}",
                alignment, buffer.data.as_ptr() as usize, buffer.is_aligned());
            assert!(buffer.is_aligned(), "Failed for alignment {alignment}");
        }

        assert!(AlignedBuffer::new(1024, 3).is_err());
        assert!(AlignedBuffer::new(1024, 17).is_err());
    }

    /// Idle buffers older than the TTL are discarded by `cleanup`.
    #[test]
    fn test_pool_cleanup() {
        let pool = BufferPool::with_config(PoolConfig {
            buffer_ttl: Duration::from_millis(1),
            ..Default::default()
        });

        // Put one idle buffer into the pool.
        drop(pool.get_buffer(BufferSize::Small).unwrap());

        // Wait until the idle buffer has outlived the TTL.
        std::thread::sleep(Duration::from_millis(10));

        let cleanup_stats = pool.cleanup().unwrap();
        assert!(cleanup_stats.freed_buffers > 0);
    }

    /// The process-wide pool serves buffers.
    #[test]
    fn test_global_buffer_pool() {
        let result = global_buffer_pool().get_buffer(BufferSize::Medium);
        assert!(result.is_ok());
    }
}